Condividi tramite


flussi di lavoro di Microsoft Agent Framework - Checkpoint

Questa pagina offre una panoramica di Checkpoints nel sistema del flusso di lavoro Microsoft Agent Framework.

Informazioni generali

I checkpoint consentono di salvare lo stato di un flusso di lavoro in punti specifici durante l'esecuzione e riprendere da tali punti in un secondo momento. Questa funzionalità è particolarmente utile per gli scenari seguenti:

  • Flussi di lavoro con esecuzione prolungata in cui si desidera evitare di perdere lo stato di avanzamento in caso di errori.
  • Flussi di lavoro a esecuzione prolungata in cui si vuole sospendere e riprendere l'esecuzione in un secondo momento.
  • Flussi di lavoro che richiedono il salvataggio periodico dello stato per scopi di controllo o conformità.
  • Flussi di lavoro di cui è necessario eseguire la migrazione in ambienti o istanze diversi.

Quando vengono creati i checkpoint?

Tenere presente che i flussi di lavoro vengono eseguiti in superstep, come documentato nei concetti di base. I checkpoint vengono creati alla fine di ogni passaggio superiore, dopo che tutti gli executor in tale passaggio hanno completato l'esecuzione. Un checkpoint acquisisce l'intero stato del flusso di lavoro, tra cui:

  • Stato corrente di tutti gli esecutori
  • Tutti i messaggi in sospeso nel flusso di lavoro per il passaggio successivo
  • Richieste e risposte in sospeso
  • Stati condivisi

Cattura dei checkpoint

Per abilitare il checkpointing, è necessario fornire un oggetto CheckpointManager durante l'esecuzione del flusso di lavoro. È quindi possibile accedere a un checkpoint tramite un SuperStepCompletedEventoggetto o tramite la Checkpoints proprietà nell'esecuzione.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Per abilitare il checkpoint, è necessario specificare un CheckpointStorage durante la creazione di un flusso di lavoro. È quindi possibile accedere a un checkpoint tramite l'archivio. Agent Framework offre tre implementazioni predefinite: selezionare quella che corrisponde alle esigenze di durabilità e distribuzione:

Provider Package Durability Ideale per
InMemoryCheckpointStorage agent-framework Solo in corso Test, demo, flussi di lavoro di breve durata
FileCheckpointStorage agent-framework Disco locale Flussi di lavoro con computer singolo, sviluppo locale
CosmosCheckpointStorage agent-framework-azure-cosmos Azure Cosmos DB Flussi di lavoro di produzione, distribuiti e tra processi

Tutti e tre implementano lo stesso CheckpointStorage protocollo, in modo da poter scambiare provider senza modificare il codice del flusso di lavoro o dell'executor.

InMemoryCheckpointStorage mantiene i checkpoint nella memoria del processo. Ideale per test, demo e flussi di lavoro di breve durata in cui non è necessaria la durabilità tra i riavvii.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Riprendere dai checkpoint

È possibile riprendere un flusso di lavoro da un checkpoint specifico direttamente nella stessa esecuzione.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

È possibile riprendere un flusso di lavoro da un checkpoint specifico direttamente nella stessa istanza del flusso di lavoro.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Riattivazione da checkpoint

In alternativa, è possibile riattivare un flusso di lavoro da un checkpoint in una nuova istanza di esecuzione.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

In alternativa, è possibile ri-idratare una nuova istanza del flusso di lavoro da un checkpoint.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Salvare gli stati dell'esecutore

Per assicurarsi che lo stato di un executor venga acquisito in un checkpoint, l'executor deve sovrascrivere il metodo OnCheckpointingAsync e salvare il suo stato nel contesto del flusso di lavoro.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Inoltre, per assicurarsi che lo stato venga ripristinato correttamente durante la ripresa da un checkpoint, l'executor deve eseguire l'override del metodo e caricarne lo OnCheckpointRestoredAsync stato dal contesto del flusso di lavoro.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Per assicurarsi che lo stato di un executor venga acquisito in un checkpoint, l'executor deve eseguire l'override del metodo on_checkpoint_save e restituire il suo stato come un dizionario.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Inoltre, per assicurarsi che lo stato venga ripristinato correttamente durante la ripresa da un checkpoint, l'executor deve eseguire l'override del metodo e ripristinarne lo on_checkpoint_restore stato dal dizionario di stato fornito.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Considerazioni sulla sicurezza

Importante

L'archiviazione del checkpoint è un limite di attendibilità. Indipendentemente dal fatto che si usino le implementazioni di archiviazione predefinite o una personalizzata, il back-end di archiviazione deve essere considerato attendibile e privato. Non caricare mai checkpoint da origini non attendibili o potenzialmente manomesse.

Assicurarsi che l'ubicazione di archiviazione utilizzata per i checkpoint sia protetta in modo appropriato. Solo i servizi autorizzati e gli utenti devono avere accesso in lettura o scrittura ai dati del checkpoint.

Serializzazione con pickle

Sia FileCheckpointStorage che CosmosCheckpointStorage usare il modulo pickle di Python per serializzare lo stato nativo non JSON, ad esempio dataclassi, datetime e oggetti personalizzati. Per attenuare i rischi di esecuzione arbitraria del codice durante la deserializzazione, entrambi i provider usano un restricted unpickler per impostazione predefinita. Durante la deserializzazione sono consentiti solo un set predefinito di tipi di Python sicuri (primitive, datetime, uuid, Decimal, raccolte comuni e così via) e tutti i tipi interni /agent_framework. Qualsiasi altro tipo rilevato in un checkpoint provoca l'esito negativo della deserializzazione con WorkflowCheckpointException.

Per consentire tipi specifici dell'applicazione aggiuntivi, passarli tramite il parametro usando allowed_checkpoint_types"module:qualname" formato:

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

CosmosCheckpointStorage accetta lo stesso parametro:

from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

Se il modello di minaccia non consente affatto la serializzazione basata su pickle, si prega di usare InMemoryCheckpointStorage o implementare una versione personalizzata di CheckpointStorage con una strategia di serializzazione alternativa.

Responsabilità della posizione di archiviazione

FileCheckpointStorage richiede un parametro esplicito storage_path : non esiste alcuna directory predefinita. Mentre il framework convalida contro gli attacchi di path traversal, la protezione della directory di archiviazione stessa (permessi sui file, crittografia dei dati inattivi, controlli di accesso) è responsabilità dello sviluppatore. Solo i processi autorizzati devono avere accesso in lettura o scrittura alla cartella del checkpoint.

CosmosCheckpointStorage si basa su Azure Cosmos DB per l'archiviazione. Utilizzare l'identità gestita/RBAC, ove possibile, delimitare il database e il contenitore per il servizio di flusso di lavoro e ruotare le chiavi dell'account se si usa l'autenticazione basata su chiave. Come per l'archiviazione di file, solo le entità autorizzate devono avere accesso in lettura o scrittura al contenitore di Cosmos DB che contiene documenti di checkpoint.

Passaggi successivi