通过


你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

智能体应用模式

使用 AI 生成代理应用程序有两种常规方法:

  • 确定性工作流 - 代码定义控制流。 使用标准编程构造编写步骤、分支、并行度和错误处理序列。 LLM 在每个步骤中执行工作,但不控制整体流。
  • 代理定向工作流(代理循环) - LLM 驱动控制流。 代理决定按什么顺序以及任务完成时间调用哪些工具。 提供工具和说明,但代理确定运行时的执行路径。

这两种方法都受益于 持久执行 ,并且可以使用 Durable Task 编程模型实现。 本文介绍如何使用代码示例生成每个模式。

小窍门

这些模式与Anthropic的构建有效代理中所述的代理工作流设计保持一致。 Durable Task 编程模型与这些模式自然映射:编排定义工作流控制流并自动进行检查点保存,而活动封装非确定性操作,例如 LLM 调用、工具调用和 API 请求。

选择方法

下表可帮助你确定何时使用每个方法。

使用确定性工作流... 使用代理循环...
提前知道步骤序列。 任务是开放式的,无法预测步骤。
需要对代理行为制定显式的约束。 你希望 LLM 决定使用哪些工具以及何时使用。
合规性或可审核性需要可审核的控制流。 代理需要根据中间结果调整其方法。
你想要在单个工作流中合并多个 AI 框架。 你正在生成具有工具调用功能的聊天代理。

两种方法都提供自动检查点、重试策略、分布式扩展以及通过持久执行实现的人机协同支持。

确定性工作流模式

在确定性工作流中,代码控制执行路径。 LLM 在工作流中被调用作为一个步骤,但是不决定接下来会发生什么。 Durable Task 编程模型自然映射到此方法:

  • 编排 定义了工作流控制流(顺序、分支、并行、错误处理)并自动创建检查点。
  • 活动 包装非确定性操作,例如 LLM 调用、工具调用和 API 请求。 活动可以在任何可用的计算实例上运行。

以下示例使用 Durable Functions,它在具有无服务器托管的 Azure Functions 上运行。

以下示例使用可移植可移植任务 SDK,这些 SDK 在任何主机计算(包括 Azure Container Apps、Kubernetes、虚拟机或本地)上运行。

提示链

提示链是最简单的智能体模式。 你将复杂任务拆分为一系列顺序 LLM 交互,每一步的输出作为下一步的输入。 由于每次活动调用都会自动地设置检查点,因此如果在管道中途发生崩溃,不必从头开始并重新消耗昂贵的 LLM 令牌;执行将从上一次完成的步骤继续。

还可以在步骤之间插入编程验证入口。 例如,生成大纲后,可以在将大纲传递给起草步骤之前验证它是否满足长度或主题约束。

此模式直接映射到 Durable Task 编程模型中的 函数链接 模式。

何时使用: 内容生成管道、多步骤文档处理、顺序数据扩充、需要中间验证入口的工作流。

[Function(nameof(PromptChainingOrchestration))]
public async Task<string> PromptChainingOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var topic = context.GetInput<string>();

    // Step 1: Generate research outline
    string outline = await context.CallActivityAsync<string>(
        nameof(GenerateOutlineAgent), topic);

    // Step 2: Write first draft from outline
    string draft = await context.CallActivityAsync<string>(
        nameof(WriteDraftAgent), outline);

    // Step 3: Refine and polish the draft
    string finalContent = await context.CallActivityAsync<string>(
        nameof(RefineDraftAgent), draft);

    return finalContent;
}

注释

编排的状态会在每个 await 语句处自动保存检查点。 如果宿主进程崩溃或虚拟机回收,编排将从最后完成的步骤自动恢复,而不是重新开始。

[DurableTask]
public class PromptChainingOrchestration : TaskOrchestrator<string, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, string topic)
    {
        // Step 1: Generate research outline
        string outline = await context.CallActivityAsync<string>(
            nameof(GenerateOutlineAgent), topic);

        // Step 2: Write first draft from outline
        string draft = await context.CallActivityAsync<string>(
            nameof(WriteDraftAgent), outline);

        // Step 3: Refine and polish the draft
        string finalContent = await context.CallActivityAsync<string>(
            nameof(RefineDraftAgent), draft);

        return finalContent;
    }
}

注释

编排的状态会在每个 await 语句处自动保存检查点。 如果宿主进程崩溃或虚拟机回收,编排将从最后完成的步骤自动恢复,而不是重新开始。

路线规划

路由使用分类步骤来确定哪个下游代理或模型应处理请求。 编排首先调用分类器活动,然后根据结果分支到相应处理器。 使用此方法可以独立定制每个处理程序的提示、模型和工具集,例如,将计费问题定向到具有付款 API 访问权限的专用代理,同时将常规问题发送到较轻量级模型。

何时使用: 客户支持会审、针对专用代理的意向分类、基于任务复杂性的动态模型选择。

[Function(nameof(RoutingOrchestration))]
public async Task<string> RoutingOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var request = context.GetInput<SupportRequest>();

    // Classify the request type
    string category = await context.CallActivityAsync<string>(
        nameof(ClassifyRequestAgent), request.Message);

    // Route to the appropriate specialized agent
    return category switch
    {
        "billing" => await context.CallActivityAsync<string>(
            nameof(BillingAgent), request),
        "technical" => await context.CallActivityAsync<string>(
            nameof(TechnicalSupportAgent), request),
        "general" => await context.CallActivityAsync<string>(
            nameof(GeneralInquiryAgent), request),
        _ => await context.CallActivityAsync<string>(
            nameof(GeneralInquiryAgent), request),
    };
}
[DurableTask]
public class RoutingOrchestration : TaskOrchestrator<SupportRequest, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, SupportRequest request)
    {
        // Classify the request type
        string category = await context.CallActivityAsync<string>(
            nameof(ClassifyRequestAgent), request.Message);

        // Route to the appropriate specialized agent
        return category switch
        {
            "billing" => await context.CallActivityAsync<string>(
                nameof(BillingAgent), request),
            "technical" => await context.CallActivityAsync<string>(
                nameof(TechnicalSupportAgent), request),
            _ => await context.CallActivityAsync<string>(
                nameof(GeneralInquiryAgent), request),
        };
    }
}

并行化

当存在多个相互独立的子任务时,可以并行派发活动调用,并在所有结果返回后再继续。 持久任务计划程序会自动将这些活动分发到所有可用计算实例,这意味着增加工作节点可以直接缩短整体耗时。

常见的变体是多模型投票:将相同的提示发送到多个模型(或具有不同温度的相同模型),然后从响应中聚合或选择。 由于每个并行分支都是独立检查点的,因此一个分支中的暂时性故障不会影响其他分支。

此模式直接映射到 Durable Task 中的 扇出/扇入 模式。

何时使用: 对文档进行批量分析、并行工具调用、多模型评估、内容审查和多个审阅者。

[Function(nameof(ParallelResearchOrchestration))]
public async Task<string> ParallelResearchOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var request = context.GetInput<ResearchRequest>();

    // Fan-out: research multiple subtopics in parallel
    var researchTasks = request.Subtopics
        .Select(subtopic => context.CallActivityAsync<string>(
            nameof(ResearchSubtopicAgent), subtopic))
        .ToList();
    string[] researchResults = await Task.WhenAll(researchTasks);

    // Aggregate: synthesize all research into a single summary
    string summary = await context.CallActivityAsync<string>(
        nameof(SynthesizeAgent),
        new { request.Topic, Research = researchResults });

    return summary;
}
[DurableTask]
public class ParallelResearchOrchestration : TaskOrchestrator<ResearchRequest, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, ResearchRequest request)
    {
        // Fan-out: research multiple subtopics in parallel
        var researchTasks = request.Subtopics
            .Select(subtopic => context.CallActivityAsync<string>(
                nameof(ResearchSubtopicAgent), subtopic))
            .ToList();
        string[] researchResults = await Task.WhenAll(researchTasks);

        // Aggregate: synthesize all research into a single summary
        string summary = await context.CallActivityAsync<string>(
            nameof(SynthesizeAgent),
            new { request.Topic, Research = researchResults });

        return summary;
    }
}

编排器-工作器

在此模式中,中心编排器首先通过活动调用 LLM 进行任务规划。 根据 LLM 的输出,业务流程协调程序随后确定需要哪些子任务。 编排器随后将这些子任务分发给专用的工作编排。 与并行化的主要区别在于,设计时未固定子任务集;业务流程协调程序在运行时动态确定它们。

此模式使用子编排,它们是独立检查点的子工作流。 每个工作编排本身也可以包含多个步骤、重试以及嵌套并行。

何时使用: 深入研究管道、编码修改多个文件、多代理协作的代理工作流,其中每个代理具有不同的角色。

[Function(nameof(OrchestratorWorkersOrchestration))]
public async Task<string> OrchestratorWorkersOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var request = context.GetInput<ResearchRequest>();

    // Central orchestrator: determine what research is needed
    string[] subtasks = await context.CallActivityAsync<string[]>(
        nameof(PlanResearchAgent), request.Topic);

    // Delegate to worker orchestrations in parallel
    var workerTasks = subtasks
        .Select(subtask => context.CallSubOrchestratorAsync<string>(
            nameof(ResearchWorkerOrchestration), subtask))
        .ToList();
    string[] results = await Task.WhenAll(workerTasks);

    // Synthesize results
    string finalReport = await context.CallActivityAsync<string>(
        nameof(SynthesizeAgent),
        new { request.Topic, Research = results });

    return finalReport;
}
[DurableTask]
public class OrchestratorWorkersOrchestration : TaskOrchestrator<ResearchRequest, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, ResearchRequest request)
    {
        // Central orchestrator: determine what research is needed
        string[] subtasks = await context.CallActivityAsync<string[]>(
            nameof(PlanResearchAgent), request.Topic);

        // Delegate to worker orchestrations in parallel
        var workerTasks = subtasks
            .Select(subtask => context.CallSubOrchestratorAsync<string>(
                nameof(ResearchWorkerOrchestration), subtask))
            .ToList();
        string[] results = await Task.WhenAll(workerTasks);

        // Synthesize results
        string finalReport = await context.CallActivityAsync<string>(
            nameof(SynthesizeAgent),
            new { request.Topic, Research = results });

        return finalReport;
    }
}

评估优化器

计算器优化器模式将生成器代理与精简循环中的计算器代理配对。 生成器生成输出,评估程序根据质量条件对其进行评分并提供反馈,循环将重复,直到输出通过或达到最大迭代计数。 由于每个循环迭代都经过检查点,因此在三轮优化成功后发生崩溃不会丢失该进度。

当质量可以以编程方式进行测量时,此模式特别有用,例如验证生成的代码能够编译,或者翻译能够保留命名实体。

何时使用: 使用自动审阅、文学翻译、迭代内容优化、需要多轮分析的复杂搜索任务生成代码。

[Function(nameof(EvaluatorOptimizerOrchestration))]
public async Task<string> EvaluatorOptimizerOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var request = context.GetInput<ContentRequest>();
    int maxIterations = 5;
    string content = "";
    string feedback = "";

    for (int i = 0; i < maxIterations; i++)
    {
        // Generate or refine content
        content = await context.CallActivityAsync<string>(
            nameof(GenerateContentAgent),
            new { request.Prompt, PreviousContent = content, Feedback = feedback });

        // Evaluate quality
        var evaluation = await context.CallActivityAsync<EvaluationResult>(
            nameof(EvaluateContentAgent), content);

        if (evaluation.MeetsQualityBar)
            return content;

        feedback = evaluation.Feedback;
    }

    return content; // Return best effort after max iterations
}
[DurableTask]
public class EvaluatorOptimizerOrchestration : TaskOrchestrator<ContentRequest, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, ContentRequest request)
    {
        int maxIterations = 5;
        string content = "";
        string feedback = "";

        for (int i = 0; i < maxIterations; i++)
        {
            // Generate or refine content
            content = await context.CallActivityAsync<string>(
                nameof(GenerateContentAgent),
                new { request.Prompt, PreviousContent = content, Feedback = feedback });

            // Evaluate quality
            var evaluation = await context.CallActivityAsync<EvaluationResult>(
                nameof(EvaluateContentAgent), content);

            if (evaluation.MeetsQualityBar)
                return content;

            feedback = evaluation.Feedback;
        }

        return content; // Return best effort after max iterations
    }
}

代理循环

在典型的 AI 代理实现中,LLM 在一个循环中调用工具并做出决策,直到任务完成或满足停止条件。 与确定性工作流不同,不会预定义执行路径。 代理根据前面步骤的结果确定在每个步骤中要执行的操作。

代理循环非常适合无法预测步骤的数量或顺序的任务。 常见示例包括开放式编码代理、自主研究和具有工具调用功能的聊天机器人。

有两种使用 Durable Task 编程模型实现代理循环的建议方法:

方法 说明 何时使用
基于编排 将智能体循环写为持久化编排。 工具调用通过活动来实现,人工输入使用外部事件。 编排控制循环结构,而 LLM 控制其中的决策。 你需要对循环进行精细控制,包括每个工具的重试策略、工具调用的分布式负载均衡,或者能够在 IDE 中使用断点来调试循环。
基于实体 每个代理实例都是一个持久实体。 代理框架在内部管理循环,实体提供持久化状态和会话持久化功能。 你使用的是已实现代理循环的代理框架(如 Microsoft Agent Framework),并且希望通过最少的代码更改添加持久性。

基于编排的代理循环

基于编排的智能体循环结合了多项持久任务能力:使用“永续编排(continue-as-new)”限制内存增长,使用扇出/扇入并行执行工具,以及通过外部事件实现人机协同交互。 循环的每个迭代:

  1. 通过活动或有状态实体将当前对话上下文发送给 LLM。
  2. 接收 LLM 的响应,其中包括工具调用。
  3. 以活动的形式执行任何工具调用(分布在可用计算中)。
  4. (可选)使用外部事件等待人工输入。
  5. 使用更新后的状态继续循环,或在代理发出完成信号时完成。
[Function(nameof(AgentLoopOrchestration))]
public async Task<string> AgentLoopOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    // Get state from input (supports continue-as-new)
    var state = context.GetInput<AgentState>() ?? new AgentState();

    int maxIterations = 100;
    while (state.Iteration < maxIterations)
    {
        // Send conversation history to the LLM
        var llmResponse = await context.CallActivityAsync<LlmResponse>(
            nameof(CallLlmAgent), state.Messages);

        state.Messages.Add(llmResponse.Message);

        // If the LLM returned tool calls, execute them in parallel
        if (llmResponse.ToolCalls is { Count: > 0 })
        {
            var toolTasks = llmResponse.ToolCalls
                .Select(tc => context.CallActivityAsync<ToolResult>(
                    nameof(ExecuteTool), tc))
                .ToList();
            ToolResult[] toolResults = await Task.WhenAll(toolTasks);

            foreach (var result in toolResults)
                state.Messages.Add(result.ToMessage());
        }
        // If the LLM needs human input, wait for it
        else if (llmResponse.NeedsHumanInput)
        {
            string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
            state.Messages.Add(new Message("user", humanInput));
        }
        // LLM is done
        else
        {
            return llmResponse.FinalAnswer;
        }

        state.Iteration++;

        // Periodically continue-as-new to keep the history bounded
        if (state.Iteration % 10 == 0)
        {
            context.ContinueAsNew(state);
            return null!; // Orchestration will restart with updated state
        }
    }

    return "Max iterations reached.";
}
[DurableTask]
public class AgentLoopOrchestration : TaskOrchestrator<AgentState, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, AgentState? state)
    {
        state ??= new AgentState();

        int maxIterations = 100;
        while (state.Iteration < maxIterations)
        {
            // Send conversation history to the LLM
            var llmResponse = await context.CallActivityAsync<LlmResponse>(
                nameof(CallLlmAgent), state.Messages);

            state.Messages.Add(llmResponse.Message);

            // If the LLM returned tool calls, execute them
            if (llmResponse.ToolCalls is { Count: > 0 })
            {
                var toolTasks = llmResponse.ToolCalls
                    .Select(tc => context.CallActivityAsync<ToolResult>(
                        nameof(ExecuteTool), tc))
                    .ToList();
                ToolResult[] toolResults = await Task.WhenAll(toolTasks);

                foreach (var result in toolResults)
                    state.Messages.Add(result.ToMessage());
            }
            // If the LLM needs human input, wait for it
            else if (llmResponse.NeedsHumanInput)
            {
                string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
                state.Messages.Add(new Message("user", humanInput));
            }
            // LLM is done
            else
            {
                return llmResponse.FinalAnswer;
            }

            state.Iteration++;

            // Periodically continue-as-new to keep the history bounded
            if (state.Iteration % 10 == 0)
            {
                context.ContinueAsNew(state);
                return null!;
            }
        }

        return "Max iterations reached.";
    }
}

基于实体的代理循环

如果使用已实现其自己的代理循环的代理框架,则可以将其包装在 持久实体 中,以添加持久性,而无需重写循环逻辑。 每个实体实例表示单个代理会话。 实体接收消息后,内部将其委托给代理框架,并在交互过程中持续保留会话状态。

此方法的主要优点是简单性:使用首选框架编写代理,并将持久性添加为托管问题,而不是重新设计代理的控制流。 实体充当持久包装器,自动处理会话持久性和恢复。

以下示例演示如何将现有代理 SDK 包装为持久实体。 实体公开了一个message操作,客户端可以调用该操作以发送用户输入。 在内部,实体委托给代理框架,该框架管理自己的工具调用循环。

// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
    private readonly IChatClient _chatClient;

    public ChatAgentEntity(IChatClient chatClient)
    {
        _chatClient = chatClient;
    }

    // Called by clients to send a message to the agent
    public async Task<string> Message(string userMessage)
    {
        // Add the user message to the conversation history
        State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));

        // Delegate to the agent SDK for the LLM call (with tool loop)
        ChatResponse response = await _chatClient.GetResponseAsync(
            State.Messages, State.Options);

        // Persist the response in the entity state
        State.Messages.AddRange(response.Messages);

        return response.Text;
    }

    // Azure Functions entry point for the entity
    [Function(nameof(ChatAgentEntity))]
    public Task RunEntityAsync([EntityTrigger] TaskEntityDispatcher dispatcher)
    {
        return dispatcher.DispatchAsync<ChatAgentEntity>();
    }
}
// Define the entity that wraps an existing agent SDK
[DurableTask(Name = "ChatAgent")]
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
    private readonly IChatClient _chatClient;

    public ChatAgentEntity(IChatClient chatClient)
    {
        _chatClient = chatClient;
    }

    // Called by clients to send a message to the agent
    public async Task<string> Message(string userMessage)
    {
        // Add the user message to the conversation history
        State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));

        // Delegate to the agent SDK for the LLM call (with tool loop)
        ChatResponse response = await _chatClient.GetResponseAsync(
            State.Messages, State.Options);

        // Persist the response in the entity state
        State.Messages.AddRange(response.Messages);

        return response.Text;
    }
}

Microsoft Agent Framework 的 Durable 任务扩展使用此方法。 它将 Microsoft Agent Framework 代理包装为持久实体,提供持久会话、自动检查点和具有单行配置的内置 API 终结点。

后续步骤