本页概述了 Microsoft Agent Framework 工作流系统中的 Checkpoints。
概述
检查点允许你在执行过程中在特定点保存工作流的状态,并在以后从这些点恢复。 此功能对于以下方案特别有用:
- 长时间运行的工作流,你希望在发生故障时避免丢失进度。
- 在长时间运行的工作流中,您希望可以在以后暂停和恢复执行。
- 出于审核或合规性目的,需要定期保存状态的工作流。
- 需要跨不同环境或实例迁移的工作流。
何时创建检查点?
请记住,工作流在 超级步骤中执行,如 核心概念中所述。 检查点在每个超级步骤结束时创建,之后该超级步骤中的所有执行程序都已完成执行。 检查点捕获工作流的整个状态,包括:
- 所有执行器的当前状态
- 下一个超级步骤工作流中的所有挂起消息
- 待处理的请求和响应
- 共享状态
捕获检查点
若要启用检查点,需要在运行工作流时提供 CheckpointManager。 可以通过 SuperStepCompletedEvent 或运行中的 Checkpoints 属性来访问检查点。
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
若要启用检查点,创建工作流时需要提供 CheckpointStorage。 然后,可以通过存储访问检查点。 代理框架提供三个内置实现 - 选择符合持久性和部署需求的实现:
| Provider | Package | Durability | 最适用于 |
|---|---|---|---|
InMemoryCheckpointStorage |
agent-framework |
仅限于进程内 | 测试、演示、短期工作流 |
FileCheckpointStorage |
agent-framework |
本地磁盘 | 单机工作流,本地开发 |
CosmosCheckpointStorage |
agent-framework-azure-cosmos |
Azure Cosmos DB | 生产、分布式、跨进程工作流 |
这三者都实现相同的 CheckpointStorage 协议,因此无需更改工作流或执行程序代码即可交换提供程序。
InMemoryCheckpointStorage 将检查点保留在进程内存中。 最适合用于测试、演示和无需在重启后保持持久性的短期工作流。
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
从检查点恢复
可以直接在同一次运行中从特定检查点恢复工作流。
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
可以直接在同一工作流实例上从特定检查点恢复工作流。
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
从检查点恢复
或者,可以将工作流从检查点解除冻结到新的运行实例。
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
或者,可以从检查点解除冻结新的工作流实例。
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
保存执行器状态
为了确保执行器的状态在检查点中被捕获,执行器必须重写OnCheckpointingAsync方法并将其状态保存到工作流上下文。
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
此外,为了确保从检查点恢复时正确还原状态,执行程序必须重写 OnCheckpointRestoredAsync 该方法并从工作流上下文加载其状态。
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
若要确保在检查点中捕获执行程序的状态,执行程序必须重写 on_checkpoint_save 该方法,并将其状态作为字典返回。
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
此外,为了确保从检查点恢复时能正确还原状态,执行程序必须重写该方法on_checkpoint_restore,并从提供的状态字典中恢复其状态。
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])
安全注意事项
重要
检查点存储是信任边界。 无论是使用内置存储实现还是自定义存储实现,都必须将存储后端视为受信任的专用基础结构。 切勿从不受信任或可能被篡改的源加载检查点。
确保适当保护用于检查点的存储位置。 只有经过授权的服务和用户才能读取或写入检查点数据。
“Pickle” 序列化
FileCheckpointStorage 和 CosmosCheckpointStorage都使用 Python 的 pickle 模块序列化非 JSON 本机状态,例如数据类、日期时间和自定义对象。 为了缓解反序列化期间任意代码执行的风险,两个提供程序默认使用 受限的解压缩程序 。 反序列化期间,仅允许内置安全Python类型(基元、datetime、uuid、Decimal、通用集合等)和所有agent_framework内部类型。 检查点中遇到的任何其他类型都会导致反序列化失败,并导致出现WorkflowCheckpointException。
若要允许其他特定于应用程序的类型,请使用allowed_checkpoint_types格式通过"module:qualname"参数传递这些类型:
from agent_framework import FileCheckpointStorage
storage = FileCheckpointStorage(
"/tmp/checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
CosmosCheckpointStorage 接受相同的参数:
from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage
storage = CosmosCheckpointStorage(
endpoint="https://my-account.documents.azure.com:443/",
credential=DefaultAzureCredential(),
database_name="agent-db",
container_name="checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
如果您的威胁模型完全不允许使用基于 pickle 的序列化,请使用 InMemoryCheckpointStorage 或实现具有替代序列化策略的自定义 CheckpointStorage 。
存储位置责任
FileCheckpointStorage 需要显式 storage_path 参数 - 没有默认目录。 虽然框架根据路径遍历攻击进行验证,但保护存储目录本身(文件权限、静态加密、访问控制)是开发人员的责任。 只有经过授权的进程才能对检查点目录具有读取或写入访问权限。
CosmosCheckpointStorage依赖 Azure Cosmos DB 作为存储。 尽可能使用托管标识/RBAC,将数据库和容器范围限定为工作流服务,并在使用基于密钥的身份验证时轮换帐户密钥。与文件存储一样,只有经过授权的主体才能对保存检查点文档的 Cosmos DB 容器拥有读取或写入访问权限。