通过


NotebookUtils 笔记本运行和编排

使用笔记本实用工具运行笔记本、并行运行多个笔记本或退出具有值的笔记本。 运行以下命令以概要了解可用的方法:

notebookutils.notebook.help()

下表列出了可用的笔记本运行和业务流程方法:

方法 Signature 说明
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str 运行笔记本并返回其退出值。
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] 同时运行多个笔记本,同时支持依赖项关系。
validateDAG validateDAG(dag: Any): bool 验证 DAG 定义是否正确结构化。
exit exit(value: str): None 使用一个值退出并返回当前笔记本。

有关笔记本 CRUD 操作(创建、获取、更新、删除、列出),请参阅 “管理笔记本项目”。

注释

config参数runMultiple()仅在 Python 中可用。 Scala 和 R 不支持此参数。

注释

笔记本实用工具不适用于 Apache Spark 作业定义 (SJD)。

引用笔记本

该方法 run() 引用笔记本并返回其退出值。 可以在笔记本中以交互方式或在管道中运行嵌套函数调用。 被引用的笔记本将在调用此函数的笔记本的 Spark 池上运行。

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

例如:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

返回值

该方法 run() 返回在 notebookutils.notebook.exit(value) 子笔记本中传递的确切字符串。 如果在 exit() 子笔记本中未调用,则返回一个空字符串("")。

面料笔记本还支持通过指定 工作区 ID 来跨工作区引用笔记本。

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

打开单元格输出中的快照链接以查看参考运行情况。 快照捕获运行结果,并帮助调试引用的笔记本。

参考运行结果的屏幕截图。

快照示例的屏幕截图。

设置子笔记本以接收参数

创建通过 run()runMultiple()调用的子笔记本时,请设置参数单元格,以便笔记本可以从父级接收参数:

  1. 创建具有默认参数值的代码单元。
  2. 通过在笔记本 UI 中选择 “将单元格标记为参数 ”,将单元格标记为参数单元格。
  3. 在执行期间,参数单元格值将替换为从父级传递的参数。
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

小窍门

退出值始终是字符串。 如果需要父笔记本中的数值,在检索后转换结果(例如, int(result))。

注意事项

  • 跨工作区引用笔记本由运行时版本 1.2 及更高版本支持。
  • 如果使用笔记本资源下的文件,请在引用的笔记本中使用 notebookutils.nbResPath,以确保它指向与交互式运行相同的文件夹。
  • 引用运行仅当子笔记本使用与父级相同的 Lakehouse、继承父级的 Lakehouse 或两者都未定义时,才能运行。 如果子级指定的湖仓与父笔记本的不同,则会阻止执行。 若要绕过此检查,请在参数中设置 useRootDefaultLakehouse: True
  • 不要在try-catch块内调用notebookutils.notebook.exit(value)。 退出调用在异常处理中进行包装时不会生效。

并行运行多个笔记本

使用 notebookutils.notebook.runMultiple() 在并行或预定义的拓扑结构中运行多个 Jupyter 笔记本。 API 在 Spark 会话中使用多线程实现,这意味着引用的笔记本运行共享计算资源。

通过 notebookutils.notebook.runMultiple(),您可以:

  • 同时执行多个笔记本,而无需等待每个笔记本完成。

  • 使用简单的 JSON 格式为笔记本指定依赖项和执行顺序。

  • 优化 Spark 计算资源的使用,并降低 Fabric 项目的成本。

  • 在输出中查看每个笔记本运行记录的快照,并方便地调试/监视笔记本任务。

  • 获取每个执行活动的退出值,并在下游任务中使用它们。

运行 notebookutils.notebook.help("runMultiple") 以查看更多示例和使用情况详细信息。

运行简单的笔记本列表

以下示例并行运行笔记本列表:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

根笔记本中的执行结果如下所示:

参考笔记列表的屏幕截图。

返回值

该方法 runMultiple() 返回一个字典,其中每个键都是活动名称,每个值都是具有以下键的字典:

  • exitVal:子笔记本调用返回的 exit() 字符串;如果未 exit() 调用,则为空字符串。
  • exception:活动失败或 None 成功时出错对象。

使用 DAG 结构运行笔记本

下面的示例使用 notebookutils.notebook.runMultiple() 在 DAG 结构中运行笔记本。

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

根笔记本中的执行结果如下所示:

显示参数笔记本列表的屏幕截图。

DAG 参数参考

下表描述了可在 DAG 定义中使用的每个字段:

领域 级别 必需 说明
activities 是的 定义要运行的笔记本的活动对象的列表。
timeoutInSeconds 整个 DAG 的最大超时时间。 默认值为 43200 (12 小时)。
concurrency 要并发运行的笔记本的最大数目。 默认值是可用 CPU 核心计数的 3 倍。 如果需要更严格的控制,请显式设置此值;或者使用 0 实现无限并发。
name 活动 是的 活动的唯一名称。 用于标识结果并定义依赖项。
path 活动 是的 要执行的笔记本项名称或路径。
timeoutPerCellInSeconds 活动 子笔记本中每个单元格的最大超时时间。 默认值为 90 秒。
args 活动 传递给子笔记本的参数配置字典。
workspace 活动 笔记本所在的工作区名称或 ID。 默认情况下,子笔记本在与调用方相同的工作区中运行。
retry 活动 如果活动失败,则重试尝试次数。 默认为 0。
retryIntervalInSeconds 活动 重试尝试之间的等待时间(以秒为单位)。 默认为 0。
dependencies 活动 必须在此活动开始之前完成的活动名称列表。

活动间出口值的参考

可以通过使用 @activity() 表达式来引用 args 字段中的依赖项活动的返回值。 此模式允许在 DAG 中的笔记本之间传递数据。

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

小窍门

使用字段中的@activity('activity_name').exitValue()args表达式将一个活动的结果传递到 DAG 中的另一个活动。

生成动态 DAG

可以以编程的方式为跨多个分区的扇出处理等场景生成 DAG 结构。

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

验证 DAG

用于 validateDAG() 在执行之前验证 DAG 结构是否有效。 它捕获重复活动名称、缺少依赖项和循环引用等问题。

notebookutils.notebook.validateDAG(DAG)

返回值

此方法 validateDAG() 返回 True DAG 结构是否有效,或者在验证失败时引发异常。

小窍门

始终先在生产工作流中调用validateDAG()runMultiple(),以尽早捕获结构错误。

处理“runMultiple”失败

该方法 runMultiple() 返回一个字典,其中每个键都是活动名称,每个值都包含一个 exitVal (字符串)和一个 exception (错误对象或 None)。 即使某些活动失败,也可以检查部分结果:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

注意事项

  • 多个笔记本运行的并行度受限于 Spark 会话的总可用计算资源。
  • 并发笔记本的默认数目是 可用 CPU 核心计数的 3 倍。 可以自定义此值,但由于计算资源使用率过高,过度并行可能会导致稳定性和性能问题。 如果出现相关问题,请考虑将笔记本拆分为多个 runMultiple 调用,或者通过调整 DAG 参数中的“并发”字段来减少并发。
  • 整个 DAG 的默认超时为 12 小时,子笔记本中每个单元格的默认超时时间为 90 秒。 可通过在 DAG 参数中设置“timeoutInSeconds”和“timeoutPerCellInSeconds”字段更改超时。
  • 配置 retryretryIntervalInSeconds 以应对因暂时性问题(例如网络超时或临时服务不可用)可能导致失败的活动。
  • 并行笔记本在单个 Spark 会话中共享计算资源。 监视资源利用率,以避免内存压力和 CPU 争用。

退出笔记本应用程序

exit()方法返回一个值,退出笔记本。 可以在笔记本中以交互方式或在管道中运行嵌套函数调用。

  • 以交互方式从笔记本调用 exit() 函数时,Fabric 笔记本会引发异常,跳过运行后续单元格,并使 Spark 会话保持活动状态。

  • 在调用 exit() 函数的管道中协调笔记本时,笔记本活动会返回退出值。 这会完成管道运行并停止 Spark 会话。

  • 在被引用的笔记本中调用函数 exit() 时,Fabric Spark 会停止对引用的笔记本的进一步执行,并继续在调用该 run() 函数的主笔记本中运行下一个单元格。 例如:Notebook1 有三个单元格,调用第二个单元格中的 exit() 函数。 Notebook2 有五个单元格,调用第三个单元格中的 run(notebook1) 函数。 运行 Notebook2 时,Notebook1 在调用 exit() 函数时会在第二个单元格处停止。 Notebook2 会继续运行其第四个和第五个代码单元格。

notebookutils.notebook.exit("value string")

返回行为

该方法 exit() 不返回值。 它会终止当前笔记本,并将提供的字符串传递给调用笔记本或管道。

注释

exit() 函数覆盖当前单元格输出。 为了避免丢失其他代码语句的输出,请调用单独的单元格中的 notebookutils.notebook.exit()

重要

不要在try-catch块内调用notebookutils.notebook.exit()。 退出在被异常处理包装时不会生效。 调用 exit() 必须位于代码的顶层才能正常工作。

例如:

Sample1 笔记本包含以下两个单元格:

  • 单元格 1 定义 input 参数,默认值设为 10。

  • 单元 2 以input作为退出值退出笔记本。

显示退出函数示例的笔记本屏幕截图。

可以在另一个具有默认值的笔记本中运行 Sample1

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

输出:

10

可以在另一个笔记本中运行 Sample1 ,并将 输入 值设置为 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

输出:

20