通过


NotebookUtils 在 Fabric 中的会话管理

在 Microsoft Fabric 中使用 notebookutils.session 管理笔记本会话的生命周期。 可以停止交互式会话或重启 Python 解释器以清理、资源管理和错误恢复。

下表列出了可用的会话管理方法:

方法 Signature 说明
stop stop(): void 异步停止当前交互式会话并释放资源。 在 PySpark、Scala 和 R 笔记本中,接受可选 detach 参数。
restartPython restartPython(): void 重启 Python 解释器,同时保持 Spark 上下文不变。 仅在 Python 和 PySpark 笔记本中可用。

注释

此方法 stop() 在 Python、PySpark、Scala 和 R 笔记本中可用。 在 PySpark、Scala 和 R 笔记本中,stop()接受可选detach参数: stop(detach=True) detach 如果True为 (默认值),会话会从高并发会话中分离,而不是完全停止会话。

注释

该方法 restartPython() 仅在 Python 和 PySpark 笔记本中可用。 它在 Scala 或 R 笔记本中不可用。

重要

在管道执行模式下,会话在代码完成后自动停止。 session.stop() API 主要用于在交互式会话中,以编程方式停止会话,而不是依靠手动点击停止按钮。

注释

该方法 stop() 在后台异步运行并释放 Spark 会话资源,以便它们可供同一池中的其他会话使用。

停止交互式会话

可以通过在代码中调用 API 来停止交互式会话,而不是手动选择停止按钮。

notebookutils.session.stop()

API 在 notebookutils.session.stop() 后台异步停止当前交互式会话。 它还会停止 Spark 会话并释放会话占用的资源,因此它们可用于同一池中的其他会话。

注释

session.stop() 之后的代码不执行。 会话停止后,所有内存中数据和变量都将丢失。 在调用 session.stop()之前保存重要数据。

返回行为

该方法 stop() 不返回值。 启动会话的异步关闭过程。

重启 Python 解释器

用于 notebookutils.session.restartPython() 重启 Python 解释器。

注释

在 PySpark(Spark)笔记本中, restartPython() 仅重启 Python 解释器,同时保持 Spark 上下文不变。 在没有 Spark 上下文的 Python 笔记本中, restartPython() 重启整个 Python 进程。

notebookutils.session.restartPython()

返回行为

该方法 restartPython() 不返回值。 重启完成后,代码执行将继续在下一个单元格中。

请记住以下注意事项:

  • 在笔记本引用运行案例中, restartPython() 仅重启正在引用的当前笔记本的 Python 解释器。 它不会影响父笔记本。
  • 在极少数情况下,由于 Spark 反射机制,命令可能会失败。 添加重试可以缓解问题。
  • 调用 restartPython()后,代码执行将继续在下一个单元格中。 在后续单元中导入新安装的包。

使用模式

在停止之前优雅地清理

使用try-finally 块来确保清理在会话停止之前运行。

try:
    print("Starting data processing...")
    # ... processing logic here ...

except Exception as e:
    print(f"Processing failed: {str(e)}")
    raise

finally:
    print("Performing cleanup...")
    try:
        notebookutils.fs.unmount("/mnt/data")
    except:
        pass

    notebookutils.session.stop()

安装包并重启解释器

使用 pip 安装新包后,请重启 Python 解释器,以便包可用:

import subprocess
import sys

packages = ["pandas==2.0.0", "numpy==1.24.0"]

print("Installing packages...")
for package in packages:
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

print("Restarting Python interpreter...")
notebookutils.session.restartPython()

注释

调用 restartPython()后,代码执行将继续在下一个单元格中。 在后续单元格中导入新安装的包。

使用重启解释器进行错误恢复

如果 Python 解释器达到损坏状态,可以通过重启它来尝试恢复:

def recover_from_error():
    """Attempt to recover from errors by restarting Python."""

    try:
        test_value = 1 + 1
    except Exception as e:
        print(f"Python interpreter error: {str(e)}")
        print("Restarting Python interpreter...")
        notebookutils.session.restartPython()
        return False

    return True

if not recover_from_error():
    print("Recovery attempted - check next cell")

停止前进行资源清理

在终止会话之前清理装载的路径、临时文件和缓存:

try:
    df = spark.range(0, 1000000)
    df.cache()
    result = df.count()
    print(f"Processing completed: {result}")

except Exception as e:
    print(f"Operation failed: {str(e)}")
    raise

finally:
    spark.catalog.clearCache()
    print("Stopping session to free resources...")
    notebookutils.session.stop()

仅限交互式模式的条件停止

在停止之前检查执行上下文,以避免在管道模式下进行不必要的调用:

context = notebookutils.runtime.context

if not context['isForPipeline']:
    print("Interactive mode: stopping session...")
    notebookutils.session.stop()
else:
    print("Pipeline mode: session stops automatically after execution")

小窍门

在调用session.stop()session.restartPython()之前,请始终保存重要的结果,例如将数据帧写入存储或记录输出日志。 这两个操作都会丢弃内存中的所有状态。