通过


你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

异步请求-答复模式

当后端处理需要异步运行时,从前端主机分离后端处理,但前端需要明确的响应。

上下文和问题

在现代应用程序开发中,客户端应用程序通常依赖于远程 API 来提供业务逻辑和撰写功能。 许多应用程序在 Web 浏览器中运行代码,其他环境也托管客户端代码。 这些 API 可能与应用程序直接相关,或者作为外部服务的共享服务运行。 大多数 API 调用使用 HTTP 或 HTTPS,并遵循 REST 语义。

在大多数情况下,客户端应用程序的 API 以大约 100 毫秒(ms)或更少的方式响应。 许多因素可能会影响响应延迟:

  • 应用程序的托管堆栈
  • 安全组件
  • 调用方和后端的相对地理位置
  • 网络基础结构
  • 当前负载
  • 请求有效负载的大小
  • 处理队列长度
  • 后端处理请求的时间

这些因素可能会增加响应的延迟。 可以通过横向扩展后端来缓解某些因素。 其他因素(如网络基础结构)不在应用程序开发人员的控制之外。 大多数 API 的响应速度足够快,以便响应通过同一连接返回。 应用程序代码可以通过非阻止方式进行同步 API 调用,从而提供异步处理的外观。 我们建议针对输入和输出(I/O)绑定的操作使用此方法。

在某些情况下,后端会执行长时间运行且需要几秒钟的工作。 在其他情况下,后端会执行需耗时几分钟或更长时间的后台工作。 在这些情况下,在发送响应之前,无法等待工作完成。 这种情况可能会为同步请求-回复模式创建问题。 有关设计后端处理的指南,请参阅 后台作业

一些体系结构使用消息代理来分离请求和响应阶段,从而解决此问题。 许多系统通过 Queue-Based 负载调配模式实现这种分离。 这种分离使客户端进程和后端 API 可以独立扩展。 当客户端需要成功通知时,它还引入了额外的复杂性,因为该步骤也必须成为异步步骤。

适用于客户端应用程序的许多相同注意事项也适用于分布式系统中的服务器到服务器 REST API 调用,例如在微服务体系结构中。

解决方案

此问题的一种解决方案是使用 HTTP 轮询。 当回调终结点不可用或长期运行的连接增加过多复杂性时,轮询机制适用于客户端的代码。 即使可以进行回调,它们需要的额外库和服务也会增加复杂性。

以下步骤描述了解决方案:

  • 客户端应用程序对 API 进行同步调用,以在后端触发一个耗时的操作。

  • API 会尽快地进行同步响应。 它返回 HTTP 202(已接受)状态代码,确认它收到了处理请求。

    注释

    API 应在启动长时间运行的进程之前验证请求和要执行的操作。 如果请求无效,请使用 HTTP 400(错误请求)等错误代码立即回复。

  • 响应包括一个位置引用,该引用指向一个客户端可以轮询的终结点,以检查长时间运行操作的结果。

  • API 将处理卸载到另一个组件,例如消息队列。

  • 对于每次成功调用状态终结点,终结点将返回 HTTP 200(确定)。 当工作正在进行时,状态终结点返回一个指示该状态的资源。 状态响应正文应包含足够的信息,以便客户端了解操作的当前状态。

    当工作完成时,状态终结点返回一个资源,该资源指示完成或重定向到另一个资源 URL。 例如,如果异步操作执行创建新资源,那么状态终结点会重定向到该资源的 URL。

下图显示了典型的流。

显示异步 HTTP 请求的请求和响应流的关系图。

  1. 客户端发送请求并接收 HTTP 202(已接受)响应。

  2. 客户端向状态终结点发送 HTTP GET 请求。 由于工作仍处于待处理状态,因此该调用返回 HTTP 200。

  3. 在某些时候,工作完成,状态终结点返回 HTTP 303(请参阅其他)以重定向到资源。

  4. 客户端从指定的 URL 提取资源。

问题和注意事项

在决定如何实现此模式时,请考虑以下几点:

  • 有多种方法可以通过 HTTP 实现此模式,上游服务并不总是使用相同的语义。 例如,某些实现不使用单独的状态终结点。 相反,客户端会直接轮询目标资源 URL,并在创建资源之前接收 HTTP 404(找不到)。 此响应有意义,因为资源尚不存在。 但是,如果为无效的请求 ID 返回了 404,则此方法可能不明确。 用于返回包含状态正文的 HTTP 200 的专用状态端点,如此模式中所述,可避免这种歧义。

  • HTTP 202 响应指示客户端轮询的位置以及频率。 它应包含以下标头。

    Header 说明 备注
    Location 用于客户端轮询响应状态的 URL 此 URL 可以是共享访问签名令牌。 当此位置需要访问控制时, 辅助密钥模式 非常有效。 当需要将响应轮询切换到另一个后端时,该模式也适用。
    Retry-After 估计处理何时完成 此标头旨在防止轮询客户端向后端发送过多请求。

    设计此响应时,请考虑预期的客户端行为。 你控制的客户端可以完全遵循这些响应值。 其他人创作的客户端(包括使用无代码或低代码工具生成的客户端,如 Azure Logic Apps)可以应用自己的 HTTP 202 处理。

  • 请考虑在状态终结点响应中包含以下字段。

    领域 说明 备注
    status 操作的当前状态,例如 挂起正在运行成功失败已取消 使用一致的、记录的终端和非终端值集。
    createdAt 操作被接受的时间。 帮助客户端检测过时或已放弃的操作。
    lastUpdatedAt 上次更新状态的时间。 允许客户端将停止的操作与正在积极进行的操作区分开来。
    percentComplete 可选进度指示器。 当后端可以有意义地估计进度时非常有用。
    error 状态为 “失败”时的结构化错误对象。 请考虑使用 RFC 9457 格式保持一致性。
  • 可能需要使用处理代理来根据所使用的基础服务来调整响应标头或有效负载。

  • 如果状态终结点在完成后重定向,请使用 HTTP 303(请参阅其他)。 303 指示客户端向重定向 URL 发出 GET 请求,而不考虑原始请求方法。 此行为是此模式的正确语义,因为客户端正在检索不同的结果资源,而不是重新提交原始操作。 HTTP 302(已找到) 不能保证方法更改;某些客户端在重定向时重播原始方法,这可能会导致意外的副作用,例如重复的 POST 请求。

  • 服务器成功处理请求后,标头指定的资源 Location 将返回 HTTP 状态代码(如 200、201(已创建)或 204(无内容)。

  • 如果在处理过程中发生错误,请在标头指定的资源 URL Location 中保留错误,并从该资源返回与失败匹配的 4xx 状态代码。 使用结构化错误格式(例如 RFC 9457(HTTP API 的问题详细信息), 以便客户端可以编程方式分析和处理故障。

  • 状态资源和任何存储的结果使用存储和计算。 定义一个留存策略,以在合理的时间段后清理数据,并考虑通过 Expires 状态响应的头部信息将留存时间段传达给客户端。

  • 解决方案并不是都以相同的方式实现此模式,某些服务包括额外的或备用标头。 例如,Azure Resource Manager 使用一种此模式的修改变体。 有关详细信息,请参阅 Resource Manager 异步操作

  • 旧版客户端可能不支持此模式。 在这种情况下,可能需要在异步 API 上放置外观,以对原始客户端隐藏异步处理。 例如,逻辑应用原生支持此模式,并且可以将其用作异步 API 与进行同步调用的客户端之间的集成层。 有关详细信息,请参阅 Azure 逻辑应用中的异步请求响应行为

  • 在某些情况下,可能希望为客户端提供一种方法来取消长时间运行的请求。 在这种情况下,对状态终结点资源公开 DELETE 操作。 此请求应将取消指令转发到后端处理组件。 后端处理取消后,应更新状态资源以反映已取消状态。 此过程有助于防止不完整的工作无限期地消耗资源。 请考虑该操作是支持部分回滚,还是应该视为补偿性交易。

  • 考虑在提交初始请求时要求客户端提供幂等密钥(例如,在 Idempotency-Key 请求标头中)。 如果后端收到重复密钥,它应返回现有状态资源,而不是排队第二个工作项。 此方法可防止网络故障,导致客户端重试服务器已接受的 POST。 此模式尤其重要,因为客户端无法区分丢失的响应与从未收到的请求。

注释

此模式描述 HTTP 轮询,客户端会定期发出新请求来检查状态。 长轮询是一种相关但独特的技术:客户端发送请求,服务器将连接保持打开状态,直到有新数据可用或发生超时。 与定期轮询相比,长轮询减少了响应延迟,但对连接管理和超时带来了复杂性。

何时使用此模式

在以下情况下使用此模式:

  • 在处理客户端代码(例如浏览器应用程序)时,由于这些约束,可能会导致回调终端点难以提供,或者长时间运行的连接会增加过多的复杂性。

  • 调用仅使用 HTTP 协议的服务,并且返回服务由于客户端上的防火墙限制而无法发送回调。

  • 与不支持现代回调机制(如 WebSocket 或 Webhook)的应用程序集成。

在以下情况下,此模式可能不适用:

  • 可以改用为异步通知生成的服务,例如Azure Event Grid。

  • 响应必须实时流式传输到客户端。 请考虑服务器发送事件(SSE),这种技术提供了一种从服务器到客户端的轻量级、原生于HTTP的单向推送通道,无需客户端进行轮询。

  • 客户端需要收集许多结果,并且这些结果的延迟非常重要。 考虑使用消息中间件。

  • 可以使用 WebSocket 或 SignalR 等服务器端持久性网络连接。 可以使用这些连接通知调用方结果。

  • 网络设计支持开放端口以接收异步回调或网络钩子。

工作负载设计

架构师应评估如何在工作负荷设计中使用异步请求-应答模式,以解决Azure Well-Architected Framework 的原则支柱中涵盖的目标和原则。

支柱 此模式如何支持支柱目标
通过缩放、数据和代码的优化,性能效率可帮助工作负荷高效地满足需求 通过分离不需要即时响应的进程的请求和回复阶段,可以提高响应能力和可伸缩性。 异步方法会增加并发能力,使服务器能够在容量可用时进行工作计划。

- PE:05 缩放和分区
- PE:07 代码和基础结构

与任何设计决策一样,请考虑权衡此模式可能引入的其他支柱的目标。

示例

以下代码演示了使用Azure Functions实现此模式的应用程序的摘录。 此解决方案具有三个功能:

  • 异步 API 端点
  • 状态终结点
  • 一个后端函数,它获取排队的工作项并运行它们

函数中异步请求答复模式的结构示意图。

GitHub logo. 此示例在 GitHub 上提供。

实现使用托管标识通过 Azure 服务总线和 Azure Blob 存储进行身份验证,从而避免存储连接字符串或帐户密钥。 依赖项在Program.cs中使用DefaultAzureCredential进行注册,并通过主构造函数注入。

AsyncProcessingWorkAcceptor 函数

AsyncProcessingWorkAcceptor函数实现一个终结点,接受来自客户端应用程序的工作,并将其排队进行处理。

  • 该函数生成请求 ID 并将其作为元数据添加到队列消息中。

  • HTTP 响应包括指向 Location 状态终结点的标头,以及 Retry-After 建议轮询间隔的标头。 请求 ID 显示在 URL 路径中。

public class AsyncProcessingWorkAcceptor(ServiceBusClient _serviceBusClient)
{
    [Function("AsyncProcessingWorkAcceptor")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req,
        [FromBody] CustomerPOCO customer)
    {
        if (string.IsNullOrEmpty(customer.id) || string.IsNullOrEmpty(customer.customername))
        {
            return new BadRequestResult();
        }

        string requestId = Guid.NewGuid().ToString();

        string statusUrl = $"https://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{requestId}";

        var messagePayload = JsonConvert.SerializeObject(customer);
        var message = new ServiceBusMessage(messagePayload);
        message.ApplicationProperties.Add("RequestGUID", requestId);
        message.ApplicationProperties.Add("RequestSubmittedAt", DateTime.UtcNow);
        message.ApplicationProperties.Add("RequestStatusURL", statusUrl);
        var sender = _serviceBusClient.CreateSender("outqueue");

        await sender.SendMessageAsync(message);

        req.HttpContext.Response.Headers["Retry-After"] = "5";

        return new AcceptedResult(statusUrl, null);
    }
}

AsyncProcessingBackgroundWorker 函数

AsyncProcessingBackgroundWorker 函数从队列中读取操作,根据消息有效负载处理操作,并将结果写入存储帐户。

public class AsyncProcessingBackgroundWorker(BlobContainerClient _blobContainerClient)
{
    [Function("AsyncProcessingBackgroundWorker")]
    public async Task Run(
        [ServiceBusTrigger("outqueue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
    {
        // Perform an actual action against the blob data source for the async readers to be able to check against.
        // This is where your actual service worker processing will be performed

        var requestGuid = message.ApplicationProperties["RequestGUID"].ToString();
        string blobName = $"{requestGuid}.blobdata";

        var blobClient = _blobContainerClient.GetBlobClient(blobName);
        using (MemoryStream memoryStream = new MemoryStream())
        using (StreamWriter writer = new StreamWriter(memoryStream))
        {
            writer.Write(message.Body.ToString());
            writer.Flush();
            memoryStream.Position = 0;

            await blobClient.UploadAsync(memoryStream, overwrite: true);
        }
    }
}

AsyncOperationStatusChecker 函数

AsyncOperationStatusChecker 函数实现状态终结点。 此函数检查请求的状态:

public class AsyncOperationStatusChecker(ILogger<AsyncOperationStatusChecker> _logger)
{
    [Function("AsyncOperationStatusChecker")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "RequestStatus/{requestId}")] HttpRequest req,
        [BlobInput("data/{requestId}.blobdata", Connection = "DataStorage")] BlockBlobClient inputBlob, string requestId)
    {
        OnCompleteEnum OnComplete = Enum.Parse<OnCompleteEnum>(req.Query["OnComplete"].FirstOrDefault() ?? "Redirect");
        OnPendingEnum OnPending = Enum.Parse<OnPendingEnum>(req.Query["OnPending"].FirstOrDefault() ?? "OK");

        _logger.LogInformation("Received status request for {RequestId} - OnComplete {OnComplete} - OnPending {OnPending}",
            requestId, OnComplete, OnPending);

        // Check whether the blob exists.
        if (await inputBlob.ExistsAsync())
        {
            // If the blob exists, the function uses the OnComplete parameter to determine the next action.
            return await OnCompleted(OnComplete, inputBlob, requestId, req);
        }
        else
        {
            // If the blob doesn't exist, the function uses the OnPending parameter to determine the next action.
            switch (OnPending)
            {
                case OnPendingEnum.OK:
                    {
                        // Return an HTTP 200 status code.
                        return new OkObjectResult(new { status = "In progress", Location = rqs });
                    }

                case OnPendingEnum.Synchronous:
                    {
                        // Long polling example: hold the connection open and check for completion
                        // using exponential backoff. Time out after approximately one minute.
                        int backoff = 250;

                        while (!await inputBlob.ExistsAsync() && backoff < 64000)
                        {
                            _logger.LogInformation("Synchronous mode {RequestId} - retrying in {Backoff} ms", requestId, backoff);
                            backoff = backoff * 2;
                            await Task.Delay(backoff);
                        }

                        if (await inputBlob.ExistsAsync())
                        {
                            _logger.LogInformation("Synchronous mode {RequestId} - completed after {Backoff} ms", requestId, backoff);
                            return await OnCompleted(OnComplete, inputBlob, requestId, req);
                        }
                        else
                        {
                            _logger.LogInformation("Synchronous mode {RequestId} - NOT FOUND after timeout {Backoff} ms", requestId, backoff);
                            return new NotFoundResult();
                        }
                    }

                default:
                    {
                        throw new InvalidOperationException($"Unexpected value: {OnPending}");
                    }
            }
        }
    }

    private async Task<IActionResult> OnCompleted(OnCompleteEnum OnComplete, BlockBlobClient inputBlob, string requestId, HttpRequest req)
    {
        switch (OnComplete)
        {
            case OnCompleteEnum.Redirect:
                {
                    // Generate a user delegation SAS URI using managed identity credentials.
                    BlobServiceClient blobServiceClient = inputBlob.GetParentBlobContainerClient().GetParentBlobServiceClient();
                    var userDelegationKey = await blobServiceClient.GetUserDelegationKeyAsync(DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddDays(7));

                    // Return 303 See Other to redirect the client to the result resource.
                    // GenerateUserDelegationSasUri is a custom helper; see the full implementation on GitHub.
                    req.HttpContext.Response.Headers.Location = GenerateUserDelegationSasUri(inputBlob, userDelegationKey);;
                    return new StatusCodeResult(StatusCodes.Status303SeeOther);
                }

            case OnCompleteEnum.Stream:
                {
                    // Download the file and return it directly to the caller.
                    // For larger files, use a stream to minimize RAM usage.
                    return new OkObjectResult(await inputBlob.DownloadContentAsync());
                }

            default:
                {
                    throw new InvalidOperationException($"Unexpected value: {OnComplete}");
                }
        }
    }
}

public enum OnCompleteEnum
{
    Redirect,
    Stream
}

public enum OnPendingEnum
{
    OK,
    Synchronous
}

后续步骤