Condividi tramite


Orchestrazioni dei flussi di lavoro di Microsoft Agent Framework - Sequenziale

Nell'orchestrazione sequenziale gli agenti sono organizzati in una pipeline. Ogni agente elabora l'attività a sua volta, passando l'output all'agente successivo nella sequenza. Questo è ideale per i flussi di lavoro in cui ogni passaggio si basa su quello precedente, ad esempio la revisione dei documenti, le pipeline di elaborazione dei dati o il ragionamento a più fasi.

Orchestrazione sequenziale

Importante

La cronologia completa della conversazione degli agenti precedenti viene passata all'agente successivo nella sequenza. Ogni agente può visualizzare tutti i messaggi precedenti, consentendo l'elaborazione compatibile con il contesto.

Cosa Imparerai

  • Come creare una pipeline sequenziale di agenti
  • Come concatenare gli agenti in cui ognuno si basa sull'output precedente
  • Come aggiungere l'approvazione umana per le chiamate a strumenti sensibili
  • Come combinare agenti con executor personalizzati per attività specializzate
  • Come tenere traccia del flusso della conversazione attraverso la pipeline

Definire gli agenti

Nell'orchestrazione sequenziale, gli agenti sono organizzati in una pipeline in cui ogni agente elabora l'attività a sua volta, passando l'output all'agente successivo nella sequenza.

Configurare il client OpenAI di Azure

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Avviso

DefaultAzureCredential è utile per lo sviluppo, ma richiede un'attenta considerazione nell'ambiente di produzione. Nell'ambiente di produzione prendere in considerazione l'uso di credenziali specifiche ,ad esempio ManagedIdentityCredential, per evitare problemi di latenza, probe di credenziali indesiderate e potenziali rischi per la sicurezza dai meccanismi di fallback.

Creare agenti specializzati che funzioneranno in sequenza:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Configurare l'orchestrazione sequenziale

Compilare il flusso di lavoro usando AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Eseguire il flusso di lavoro sequenziale

Eseguire il flusso di lavoro ed elaborare gli eventi:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Output di esempio

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Orchestrazione sequenziale con human-in-the-loop

Le orchestrazioni sequenziali supportano le interazioni umane nel ciclo tramite l'approvazione degli strumenti. Quando gli agenti usano strumenti avvolti con ApprovalRequiredAIFunction, il flusso di lavoro si sospende e genera un RequestInfoEvent contenente ToolApprovalRequestContent. I sistemi esterni (ad esempio un operatore umano) possono esaminare la chiamata dello strumento, approvarla o rifiutarla e il flusso di lavoro riprende di conseguenza.

Orchestrazione sequenziale con intervento umano

Suggerimento

Per altri dettagli sul modello di richiesta e risposta, vedere Human-in-the-Loop.

Definire gli agenti con strumenti che richiedono approvazione

Creare agenti in cui gli strumenti sensibili vengono incapsulati con ApprovalRequiredAIFunction:

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Compilare ed eseguire con la gestione delle approvazioni

Creare il flusso di lavoro sequenziale come di consueto. Il flusso di approvazione viene gestito tramite il flusso di eventi:

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Annotazioni

AgentWorkflowBuilder.BuildSequential() supporta l'approvazione degli strumenti di default — non è necessaria alcuna configurazione aggiuntiva. Quando un agente chiama uno strumento wrappato con ApprovalRequiredAIFunction, il flusso di lavoro si sospende automaticamente e genera un RequestInfoEvent.

Suggerimento

Per un esempio completo eseguibile di questo flusso di approvazione, vedere l'esempioGroupChatToolApproval. Lo stesso RequestInfoEvent modello di gestione si applica ad altre orchestrazioni.

Concetti chiave

  • Elaborazione sequenziale: ogni agente elabora l'output dell'agente precedente nell'ordine
  • AgentWorkflowBuilder.BuildSequential(): crea un flusso di lavoro della pipeline da una raccolta di agenti
  • ChatClientAgent: rappresenta un agente supportato da un client di chat con istruzioni specifiche
  • InProcessExecution.RunStreamingAsync(): esegue il flusso di lavoro e restituisce un StreamingRun per lo streaming di eventi in tempo reale
  • Gestione eventi: monitorare l'avanzamento attraverso AgentResponseUpdateEvent e il completamento attraverso WorkflowOutputEvent
  • Approvazione degli strumenti: Avvolgere gli strumenti sensibili con ApprovalRequiredAIFunction per richiedere l'approvazione umana prima dell'esecuzione
  • RequestInfoEvent: generato quando uno strumento richiede l'approvazione; contiene ToolApprovalRequestContent con i dettagli della chiamata dello strumento

Nell'orchestrazione sequenziale, ogni agente elabora l'attività a sua volta, con il flusso dell'output che passa da uno a quello successivo. Per iniziare, definire gli agenti per un processo in due fasi:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Configurare l'orchestrazione sequenziale

La SequentialBuilder classe crea una pipeline in cui gli agenti elaborano le attività in ordine. Ogni agente vede la cronologia completa della conversazione e aggiunge la risposta:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Eseguire il flusso di lavoro sequenziale

Eseguire il flusso di lavoro e raccogliere la conversazione finale che mostra il contributo di ogni agente:

from typing import Any, cast
from agent_framework import Message, WorkflowEvent

# 3) Run and print final conversation
outputs: list[list[Message]] = []
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output":
        outputs.append(cast(list[Message], event.data))

if outputs:
    print("===== Final Conversation =====")
    messages: list[Message] = outputs[-1]
    for i, msg in enumerate(messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
        print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

Output di esempio

===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Avanzato: integrazione di agenti con executor personalizzati

L'orchestrazione sequenziale supporta la combinazione di agenti con executor personalizzati per l'elaborazione specializzata. Ciò è utile quando è necessaria una logica personalizzata che non richiede un LLM:

Definire un executor personalizzato

Annotazioni

Quando un executor personalizzato segue un agente nella sequenza, il relativo gestore riceve un oggetto AgentExecutorResponse (perché gli agenti sono internamente avvolti da AgentExecutor). Usare agent_response.full_conversation per accedere alla cronologia completa della conversazione.

from agent_framework import AgentExecutorResponse, Executor, WorkflowContext, handler
from agent_framework import Message

class Summarizer(Executor):
    """Simple summarizer: consumes full conversation and appends an assistant summary."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[list[Message]]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.send_message([Message("assistant", ["No conversation to summarize."])])
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.send_message(list(agent_response.full_conversation) + [summary])

Creare un flusso di lavoro sequenziale misto

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Output di esempio con executor personalizzato

------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1

Orchestrazione sequenziale con human-in-the-loop

Le orchestrazioni sequenziali supportano le interazioni umane nel ciclo in due modi: l'approvazione dello strumento per controllare le chiamate agli strumenti sensibili e richiedere informazioni per la sospensione dopo ogni risposta dell'agente per raccogliere feedback.

Orchestrazione sequenziale con intervento umano

Suggerimento

Per altri dettagli sul modello di richiesta e risposta, vedere Human-in-the-Loop.

Approvazione degli strumenti nei flussi di lavoro sequenziali

Usare @tool(approval_mode="always_require") per contrassegnare gli strumenti che richiedono l'approvazione umana prima dell'esecuzione. Il flusso di lavoro sospende e genera un request_info evento quando l'agente tenta di chiamare lo strumento.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Elaborare il flusso di eventi e gestire le richieste di approvazione:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Suggerimento

Per un esempio eseguibile completo, vedere sequential_builder_tool_approval.py. L'approvazione degli strumenti funziona con SequentialBuilder senza alcuna configurazione aggiuntiva del builder.

Richiedi informazioni per commenti e suggerimenti dell'agente

Usare .with_request_info() per sospendere dopo la risposta di agenti specifici, consentendo l'input esterno (ad esempio la revisione umana) prima che inizi l'agente successivo:

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Concetti chiave

  • Contesto condiviso: ogni partecipante riceve la cronologia completa della conversazione, inclusi tutti i messaggi precedenti
  • L'ordine è importante: gli agenti vengono eseguiti rigorosamente nell'ordine specificato nell'elenco participants
  • Partecipanti flessibili: è possibile combinare agenti ed esecutori personalizzati in qualsiasi ordine
  • Flusso di conversazione: ciascun agente/esecutore aggiunge alla conversazione, creando un dialogo completo.
  • Approvazione degli strumenti: usare @tool(approval_mode="always_require") per operazioni sensibili che richiedono una revisione umana
  • Richiedi informazioni: usare .with_request_info(agents=[...]) per sospendere dopo agenti specifici per commenti e suggerimenti esterni

Passaggi successivi