该 notebookutils.udf 模块提供用于将笔记本代码与用户数据函数(UDF)项集成的实用工具。 可以从同一工作区或不同工作区中的 UDF 项访问函数,然后根据需要调用这些函数。 UDF 项可提升代码可重用性、集中维护和团队协作。
使用 UDF 实用工具可以:
- 函数检索 – 按名称从 UDF 项访问函数。
- 跨工作区访问 - 使用来自其他工作区中的 UDF 项的函数。
- 函数发现 – 检查可用的函数及其签名。
- 灵活的调用 – 调用函数时, 使用符合语言的参数。
注释
需要对目标工作区中的 UDF 项进行读取访问权限才能检索其函数。 UDF 函数中的异常传播到调用笔记本。
下表列出了可用的 UDF 方法:
| 方法 | Signature | 说明 |
|---|---|---|
getFunctions |
getFunctions(udf: String, workspaceId: String = ""): UDF |
按项目 ID 或名称从 UDF 项检索所有函数。 返回具有可调用函数属性的对象。 |
返回的对象公开以下属性:
| 财产 | 类型 | 说明 |
|---|---|---|
functionDetails |
列表 | 函数元数据字典的列表。 每个字典包括: Name (函数名称)、 Description (函数说明)、 Parameters (参数定义列表)、 FunctionReturnType (返回类型)和 DataSourceConnections (使用的数据源连接)。 |
itemDetails |
字典 | UDF 项元数据的字典,其中包含键:Id (工件 ID)、Name (项名称)、WorkspaceId (工作区 ID)和 CapacityId (容量 ID)。 |
<functionName> |
Callable | UDF 项中的每个函数都将成为返回的对象上的可调用方法。 用 myFunctions.functionName(...) 调用。 |
小窍门
检索一次 UDF 函数并缓存包装对象。 避免在循环中重复调用 getFunctions() - 缓存结果以最大程度地减少开销。
从 UDF 检索函数
使用 notebookutils.udf.getFunctions() 从 UDF 项目中获取所有函数。 可以为跨工作区访问选择性地指定工作区 ID。
# Get functions from a UDF item in the current workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName')
# Get functions from a UDF item in another workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
调用函数
从 UDF 项检索函数后,按名称调用它们。 Python 支持位置参数和命名参数。 Scala 和 R 示例使用位置参数。
# Positional parameters
myFunctions.functionName('value1', 'value2')
# Named parameters (recommended for clarity)
myFunctions.functionName(parameter1='value1', parameter2='value2')
显示详细信息
可以编程方式检查 UDF 项元数据和函数签名。
显示 UDF 项目详细信息
显示函数详细信息
小窍门
使用新的 UDF 项时,请始终检查 functionDetails 。 这有助于在调用之前验证可用函数及其预期参数类型。
错误处理
将 UDF 调用包装在相应语言的异常处理中,以优雅地管理缺少函数或意外的参数类型。 在调用该函数之前,请始终验证 UDF 项中是否存在函数。
import json
try:
validators = notebookutils.udf.getFunctions('DataValidators')
# Check if function exists before calling
functions_info = json.loads(validators.functionDetails)
function_names = [f['Name'] for f in functions_info]
if 'validateSchema' in function_names:
is_valid = validators.validateSchema(
schema='sales_schema',
data_path='Files/data/sales.csv'
)
print(f"Schema validation: {'passed' if is_valid else 'failed'}")
else:
print("validateSchema function not available in this UDF item")
print(f"Available functions: {', '.join(function_names)}")
except AttributeError as e:
print(f"Function not found: {e}")
except TypeError as e:
print(f"Parameter type mismatch: {e}")
except Exception as e:
print(f"Error invoking UDF: {e}")
在数据管道中使用 UDF 函数
可以编写 UDF 函数以生成可重用的 ETL 步骤:
etl_functions = notebookutils.udf.getFunctions('ETLUtilities')
df = spark.read.csv('Files/raw/sales.csv', header=True)
cleaned_df = etl_functions.removeOutliers(df, columns=['amount'])
enriched_df = etl_functions.addCalculatedColumns(cleaned_df)
validated_df = etl_functions.validateAndFilter(enriched_df)
validated_df.write.mode('overwrite').parquet('Files/processed/sales.parquet')
print("ETL pipeline completed using UDF functions")
重要
UDF 调用具有开销。 如果重复使用相同的参数调用同一函数,请考虑缓存结果。 尽可能避免在紧密循环中调用 UDF 函数。