通过


用于 Fabric 的 NotebookUtils 用户数据功能(UDF)实用工具

notebookutils.udf 模块提供用于将笔记本代码与用户数据函数(UDF)项集成的实用工具。 可以从同一工作区或不同工作区中的 UDF 项访问函数,然后根据需要调用这些函数。 UDF 项可提升代码可重用性、集中维护和团队协作。

使用 UDF 实用工具可以:

  • 函数检索 – 按名称从 UDF 项访问函数。
  • 跨工作区访问 - 使用来自其他工作区中的 UDF 项的函数。
  • 函数发现 – 检查可用的函数及其签名。
  • 灵活的调用 – 调用函数时, 使用符合语言的参数。

注释

需要对目标工作区中的 UDF 项进行读取访问权限才能检索其函数。 UDF 函数中的异常传播到调用笔记本。

下表列出了可用的 UDF 方法:

方法 Signature 说明
getFunctions getFunctions(udf: String, workspaceId: String = ""): UDF 按项目 ID 或名称从 UDF 项检索所有函数。 返回具有可调用函数属性的对象。

返回的对象公开以下属性:

财产 类型 说明
functionDetails 列表 函数元数据字典的列表。 每个字典包括: Name (函数名称)、 Description (函数说明)、 Parameters (参数定义列表)、 FunctionReturnType (返回类型)和 DataSourceConnections (使用的数据源连接)。
itemDetails 字典 UDF 项元数据的字典,其中包含键:Id (工件 ID)、Name (项名称)、WorkspaceId (工作区 ID)和 CapacityId (容量 ID)。
<functionName> Callable UDF 项中的每个函数都将成为返回的对象上的可调用方法。 用 myFunctions.functionName(...) 调用。

小窍门

检索一次 UDF 函数并缓存包装对象。 避免在循环中重复调用 getFunctions() - 缓存结果以最大程度地减少开销。

从 UDF 检索函数

使用 notebookutils.udf.getFunctions() 从 UDF 项目中获取所有函数。 可以为跨工作区访问选择性地指定工作区 ID。

# Get functions from a UDF item in the current workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName')

# Get functions from a UDF item in another workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')

调用函数

从 UDF 项检索函数后,按名称调用它们。 Python 支持位置参数和命名参数。 Scala 和 R 示例使用位置参数。

# Positional parameters
myFunctions.functionName('value1', 'value2')

# Named parameters (recommended for clarity)
myFunctions.functionName(parameter1='value1', parameter2='value2')

显示详细信息

可以编程方式检查 UDF 项元数据和函数签名。

显示 UDF 项目详细信息

display(myFunctions.itemDetails)

显示函数详细信息

display(myFunctions.functionDetails)

小窍门

使用新的 UDF 项时,请始终检查 functionDetails 。 这有助于在调用之前验证可用函数及其预期参数类型。

错误处理

将 UDF 调用包装在相应语言的异常处理中,以优雅地管理缺少函数或意外的参数类型。 在调用该函数之前,请始终验证 UDF 项中是否存在函数。

import json

try:
    validators = notebookutils.udf.getFunctions('DataValidators')

    # Check if function exists before calling
    functions_info = json.loads(validators.functionDetails)
    function_names = [f['Name'] for f in functions_info]

    if 'validateSchema' in function_names:
        is_valid = validators.validateSchema(
            schema='sales_schema',
            data_path='Files/data/sales.csv'
        )
        print(f"Schema validation: {'passed' if is_valid else 'failed'}")
    else:
        print("validateSchema function not available in this UDF item")
        print(f"Available functions: {', '.join(function_names)}")

except AttributeError as e:
    print(f"Function not found: {e}")
except TypeError as e:
    print(f"Parameter type mismatch: {e}")
except Exception as e:
    print(f"Error invoking UDF: {e}")

在数据管道中使用 UDF 函数

可以编写 UDF 函数以生成可重用的 ETL 步骤:

etl_functions = notebookutils.udf.getFunctions('ETLUtilities')

df = spark.read.csv('Files/raw/sales.csv', header=True)
cleaned_df = etl_functions.removeOutliers(df, columns=['amount'])
enriched_df = etl_functions.addCalculatedColumns(cleaned_df)
validated_df = etl_functions.validateAndFilter(enriched_df)

validated_df.write.mode('overwrite').parquet('Files/processed/sales.parquet')
print("ETL pipeline completed using UDF functions")

重要

UDF 调用具有开销。 如果重复使用相同的参数调用同一函数,请考虑缓存结果。 尽可能避免在紧密循环中调用 UDF 函数。