Executores

Os executores são os blocos de construção fundamentais que processam mensagens em um fluxo de trabalho. Eles são unidades de processamento autônomas que recebem mensagens digitadas, executam operações e podem produzir mensagens de saída ou eventos.

Visão geral

Cada executor tem um identificador exclusivo e pode lidar com tipos de mensagens específicos. Os executores podem ser:

  • Componentes lógicos personalizados — dados de processo, APIs de chamadas ou mensagens de transformação
  • Agentes de IA — utilizam LLMs para gerar respostas (ver Agentes em Workflows)

Importante

A forma recomendada de definir manipuladores de mensagens executores em C# é usar o [MessageHandler] atributo em métodos dentro de uma partial classe que deriva de Executor. Isto utiliza geração de código-fonte em tempo de compilação para registo de handlers, proporcionando melhor desempenho, validação em tempo de compilação e compatibilidade nativa com AOT.

Estrutura Básica do Executor

Os executores derivam da Executor classe base e usam o [MessageHandler] atributo para declarar métodos handler. A classe deve ser marcada partial para permitir a geração de código-fonte.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class UppercaseExecutor() : Executor("UppercaseExecutor")
{
    [MessageHandler]
    private ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        string result = message.ToUpperInvariant();
        return ValueTask.FromResult(result); // Return value is automatically sent to connected executors
    }
}

Também pode enviar mensagens manualmente sem devolver um valor:

internal sealed partial class UppercaseExecutor() : Executor("UppercaseExecutor")
{
    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        string result = message.ToUpperInvariant();
        await context.SendMessageAsync(result); // Manually send messages to connected executors
    }
}

Sugestão

Os executores podem manter um estado mutável. Se um executor com estado for partilhado entre execuções de workflow, deve implementar IResettableExecutor para eliminar o estado obsoleto entre execuções. Consulte Resetable Executors para mais detalhes.

Múltiplos Tipos de Entrada

Lidar com múltiplos tipos de entrada definindo múltiplos [MessageHandler] métodos:

internal sealed partial class SampleExecutor() : Executor("SampleExecutor")
{
    [MessageHandler]
    private ValueTask<string> HandleStringAsync(string message, IWorkflowContext context)
    {
        return ValueTask.FromResult(message.ToUpperInvariant());
    }

    [MessageHandler]
    private ValueTask<int> HandleIntAsync(int message, IWorkflowContext context)
    {
        return ValueTask.FromResult(message * 2);
    }
}

Executores Baseados em Função

Crie um executor a partir de uma função usando o BindExecutor método de extensão:

Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

O Objeto IWorkflowContext

O IWorkflowContext fornece métodos para interagir com o fluxo de trabalho durante a execução.

  • SendMessageAsync — enviar mensagens para executores ligados
  • YieldOutputAsync — produzir saídas de fluxo de trabalho retornadas/transmitidas ao chamador
internal sealed partial class OutputExecutor() : Executor("OutputExecutor")
{
    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        await context.YieldOutputAsync("Hello, World!");
    }
}

Se um handler não enviar mensagens nem gerar saídas, pode simplesmente realizar efeitos secundários:

internal sealed partial class LogExecutor() : Executor("LogExecutor")
{
    [MessageHandler]
    private void Handle(string message, IWorkflowContext context)
    {
        Console.WriteLine("Doing some work...");
    }
}

Estrutura Básica do Executor

Os executores herdam da Executor classe base. Cada executor utiliza métodos decorados com o @handler decorador. Os manipuladores devem ter anotações de tipo adequadas para especificar os tipos de mensagens que processam.

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class UpperCase(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        await ctx.send_message(text.upper())

Executores Baseados em Função

Crie um executor a partir de uma função usando o @executor decorador:

from agent_framework import (
    WorkflowContext,
    executor,
)

@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Convert the input to uppercase and forward it to the next node."""
    await ctx.send_message(text.upper())

Múltiplos Tipos de Entrada

Lidar com múltiplos tipos de entrada definindo múltiplos handlers:

class SampleExecutor(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(text.upper())

    @handler
    async def double_integer(self, number: int, ctx: WorkflowContext[int]) -> None:
        await ctx.send_message(number * 2)

Parâmetros de Tipo Explícitos

Como alternativa às anotações de tipos, pode especificar os tipos explicitamente através dos parâmetros do decorador:

Importante

Ao usar parâmetros de tipo explícitos, deve especificar todos os tipos através do decorador — não pode misturar parâmetros explícitos com anotações de tipos. O input parâmetro é obrigatório; output e workflow_output são opcionais.

class ExplicitTypesExecutor(Executor):

    @handler(input=str, output=str)
    async def to_upper_case(self, text, ctx) -> None:
        await ctx.send_message(text.upper())

    @handler(input=str | int, output=str)
    async def handle_mixed(self, message, ctx) -> None:
        await ctx.send_message(str(message).upper())

    @handler(input=str, output=int, workflow_output=bool)
    async def process_with_workflow_output(self, message, ctx) -> None:
        await ctx.send_message(len(message))
        await ctx.yield_output(True)

O Objeto WorkflowContext

O WorkflowContext fornece métodos para interagir com o fluxo de trabalho durante a execução.

  • send_message — enviar mensagens para executores ligados
  • yield_output — produzir saídas de fluxo de trabalho retornadas/transmitidas ao chamador
class OutputExecutor(Executor):

    @handler
    async def handle(self, message: str, ctx: WorkflowContext[Never, str]) -> None:
        await ctx.yield_output("Hello, World!")

Se um handler não enviar mensagens nem fornecer saídas, não é necessário nenhum parâmetro de tipo:

class LogExecutor(Executor):

    @handler
    async def handle(self, message: str, ctx: WorkflowContext) -> None:
        print("Doing some work...")

Próximos passos