通过


System.Threading.Channels 库

System.Threading.Channels 命名空间提供了用于在生成者和使用者之间以异步方式传递数据的一组同步数据结构。 该库面向 .NET、.NET Standard 和 .NET Framework,适用于所有 .NET 实现。

此库在 System.Threading.Channels NuGet 包中📦可用。 但是,如果使用 .NET Core 3.0 或更高版本,包将作为共享框架的一部分包含在内。

生成者/使用者概念编程模型

通道是生成者/使用者概念编程模型的实现。 在此编程模型中,生成者异步生成数据,使用者异步使用该数据。 换句话说,此模型通过先进先出(“FIFO”)队列将数据从一方传递到另一方。 将通道视为任何其他通用泛型集合类型,例如 List<T>。 主要区别在于,此集合管理同步,并通过工厂创建选项提供各种消耗模型。 这些选项控制通道的行为,例如:

  • 允许存储的元素数以及达到该限制时会发生什么情况。
  • 通道是否被多个生产者或多个消费者同时访问。

基本用法

以下示例演示了信道的基本用法,其中生产者写入条目,消费者读取这些条目。

static async Task BasicUsageAsync()
{
    Channel<int> channel = Channel.CreateUnbounded<int>();

    Task producer = ProduceAsync(channel.Writer);
    Task consumer = ConsumeAsync(channel.Reader);

    await Task.WhenAll(producer, consumer);

    static async Task ProduceAsync(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 5; i++)
        {
            await writer.WriteAsync(i);
        }

        writer.Complete();
    }

    static async Task ConsumeAsync(ChannelReader<int> reader)
    {
        await foreach (int item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Received: {item}");
        }
    }
}

边界限制策略

根据 Channel<T> 的创建方式,其读取器和编写器的行为会有所不同。

若要创建指定最大容量的通道,请调用 Channel.CreateBounded。 若要创建由任意数量的读取器和编写器并发使用的通道,请调用 Channel.CreateUnbounded。 每个边界策略都会公开各种由创建者定义的选项,分别是 BoundedChannelOptionsUnboundedChannelOptions

注意

无论边界策略如何,当通道在关闭后被使用时,它总是会抛出ChannelClosedException异常。

无界通道

若要创建无界通道,请调用 Channel.CreateUnbounded 重载之一:

var channel = Channel.CreateUnbounded<T>();

默认情况下,创建无界通道时,该通道可以同时供任意数量的读取器和编写器使用。 或者,在创建无界通道时,可通过提供 UnboundedChannelOptions 实例来指定非默认行为。 该通道的容量不受限制,并且所有写入均以同步方式执行。 如需更多示例,请参阅无界创建模式

有界通道

若要创建有界通道,请调用 Channel.CreateBounded 重载之一:

var channel = Channel.CreateBounded<T>(7);

前面的代码创建了一个最大容量为 7 个项的通道。 创建有界通道时,该通道将绑定到最大容量。 达到边界时,默认行为是通道异步阻止生成者,直到空间变得可用。 可以通过在创建通道时指定选项来配置此行为。 可以使用任何大于零的容量值创建有界通道。 有关其他示例,请参阅有界创建模式

全模式行为

使用有界通道时,可指定在达到配置的边界时通道会遵循的行为。 下表列出了每个 BoundedChannelFullMode 值的完整模式行为:

行为
BoundedChannelFullMode.Wait 这是默认值。 对 WriteAsync 的调用要等到空间可用才能完成写入操作。 调用 TryWrite 会立即返回 false
BoundedChannelFullMode.DropNewest 删除并忽略通道中的最新项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropOldest 删除并忽略通道中的最旧项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropWrite 删除要写入的项。

重要

每当 Channel<TWrite,TRead>.Writer 的生成速度快于 Channel<TWrite,TRead>.Reader 的使用速度时,通道的编写器都会遇到反压力。

生成者 API

生成者功能在 Channel<TWrite,TRead>.Writer 上公开。 下表详细介绍了生成者 API 和预期行为:

API 预期行为
ChannelWriter<T>.Complete 将通道标记为已完成,这意味着不会再向该通道写入项。
ChannelWriter<T>.TryComplete 尝试将通道标记为“已完成”,这表示不再往该通道写入数据。
ChannelWriter<T>.TryWrite 尝试将指定的项写入到通道。 当与无界通道一起使用时,除非通道的编写器通过 trueChannelWriter<T>.Complete 发出完成信号,否则这将始终返回 ChannelWriter<T>.TryComplete
ChannelWriter<T>.WaitToWriteAsync 返回在有可用空间来写入项时完成的 ValueTask<TResult>
ChannelWriter<T>.WriteAsync 以异步方式将元素写入到通道。

使用者 API

使用者功能在 Channel<TWrite,TRead>.Reader 上公开。 下表详细介绍了使用者 API 和预期行为:

API 预期行为
ChannelReader<T>.ReadAllAsync 创建允许从通道中读取所有数据的 IAsyncEnumerable<T>
ChannelReader<T>.ReadAsync 以异步方式从通道中读取项。
ChannelReader<T>.TryPeek 尝试从通道中查看项。
ChannelReader<T>.TryRead 尝试从通道中读取项。
ChannelReader<T>.WaitToReadAsync 返回在有数据可读取时完成的 ValueTask<TResult>

常见使用模式

频道有多种使用模式:

API 设计为简单、一致且尽可能灵活。 所有异步方法都返回一个表示轻量级异步操作的 ValueTask(或 ValueTask<bool>),如果操作同步完成,甚至可能异步完成,则可以避免分配。 此外,API 被设计为可组合,因为通道的创建者承诺了其预期用途。 当使用某些参数创建通道时,内部实现可以在知道这些承诺的情况下更高效地运行。

创建模式

假设你正在为全球定位系统 (GPS) 创建生成者/使用者解决方案。 你想要跟踪设备随时间推移的坐标。 示例坐标对象可能如下所示:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

无界创建模式

一种常见的使用模式是创建默认 的未绑定 通道:

var channel = Channel.CreateUnbounded<Coordinates>();

设想你想要创建一个无界通道,并配有多个生产者和消费者。 设置 SingleWriter = falseSingleReader = false 在频道选项中:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

在这种情况下,所有写入都是同步的,即使是 WriteAsync。 发生此行为是因为未绑定的通道始终具有可立即写入的空间。 通过将AllowSynchronousContinuations设置为true,写入操作可能最终执行与读取器相关的工作,这是因为它们执行了延续操作。 此设置不会影响操作的同步性。

有界创建模式

使用 边界 通道时,使用者应知道通道的配置性,以帮助确保适当的使用。 也就是说,使用者应了解在达到配置的边界时,通道会表现出什么行为。 以下示例演示了一些常见的有限创建模式。

创建绑定通道的最简单方法是指定容量。 以下代码创建最大容量为 1 的有限通道。

var channel = Channel.CreateBounded<Coordinates>(1);

其他选项可用。 某些选项与未绑定通道相同,而其他选项则特定于绑定通道。 在以下代码中,通道被创建为一个限制为 1,000 个项目的有界通道,其中有单个写入者,但有许多读取者。 其完整模式行为被定义为 DropWrite,这意味着如果通道已满,它会删除正在写入的项。

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

使用有界通道时,若要观察被丢弃的项目,请注册itemDropped 回调:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

每当通道已满且添加了新项时,都会调用 itemDropped 回调。 在此示例中,提供的回调将项写入控制台,但你可以自由执行所需的任何其他操作。

生产者模式

假设此场景中的生产者正在向通道写入新坐标。 生成者可以通过调用 TryWrite 实现这一点。

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

前面的生成者代码:

  • 接受 Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) 作为参数,以及初始 Coordinates
  • 定义一个条件 while 循环,该循环尝试使用 TryWrite 移动坐标。

替代生成者可能使用 WriteAsync 方法:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

同样,Channel<Coordinates>.Writer 被用在 while 循环中。 但这次调用了 WriteAsync 方法。 该方法仅在写入坐标后继续。 当 while 循环退出时,会调用 Complete,这表示不再向通道写入数据。

另一种生成者模式是使用 WaitToWriteAsync 方法,请考虑以下代码:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

作为条件 while 的一部分,WaitToWriteAsync 调用的结果用于确定是否继续循环。

使用者模式

有几种常见的通道使用者模式。 当通道永不结束,意味着它将无限地生成数据时,使用者可使用 while (true) 循环,并在数据可用时读取数据:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

注意

如果通道关闭,此代码将引发异常。

替代使用者可以通过使用嵌套的 while 循环来避免这种问题,如以下代码所示:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

在前面的代码中,使用者等待读取数据。 数据可用后,使用者便会尝试读取数据。 这些循环会继续评估,直到通道的生成者表示它不再具有要读取的数据。 也就是说,当已知生成者具有生成的有限数量的项,并且当生成者发出完成信号时,使用者可使用 await foreach 语义循环访问这些项:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

前面的代码使用 ReadAllAsync 方法从通道读取所有坐标。

多个生产者和消费者

通道支持多个同时并发的生成者和消费者。 若要启用此功能,请在通道选项中创建包含 SingleWriter = falseSingleReader = false 的通道。 然后在多个生成者任务中进行扇出写操作,并在多个使用者任务中进行扇入读操作。

static async Task UseMultipleProducersAndConsumersAsync()
{
    Channel<Coordinates> channel = Channel.CreateUnbounded<Coordinates>(
        new UnboundedChannelOptions
        {
            SingleWriter = false,
            SingleReader = false
        });

    // Start three concurrent producer tasks.
    Task[] producerTasks = Enumerable.Range(0, 3)
        .Select(id => ProduceAsync(id, channel))
        .ToArray();

    // Start two concurrent consumer tasks.
    Task[] consumerTasks = Enumerable.Range(0, 2)
        .Select(_ => ConsumeAsync(channel))
        .ToArray();

    // Wait for all producers to finish, then mark the channel as complete.
    await Task.WhenAll(producerTasks);
    channel.Writer.Complete();

    // Wait for all consumers to finish.
    await Task.WhenAll(consumerTasks);

    static async Task ProduceAsync(int id, Channel<Coordinates> channel)
    {
        Coordinates coordinates = new(
            DeviceId: Guid.NewGuid(),
            Latitude: -90 + (id * 30),
            Longitude: -180 + (id * 60));

        while (coordinates is { Latitude: < 90, Longitude: < 180 })
        {
            coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + 0.5,
                Longitude = coordinates.Longitude + 1
            };
                
            await channel.Writer.WriteAsync(coordinates);
        }
    }

    static async Task ConsumeAsync(Channel<Coordinates> channel)
    {
        await foreach (Coordinates coordinates in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(coordinates);
        }
    }
}

前面的代码:

  • 创建显式支持多个并发编写器和读取器的未绑定通道。
  • 启动三个并发的生产者任务,每个任务都编写一系列具有唯一设备标识符的坐标。
  • 启动两个并发使用者任务,每个任务都使用 ReadAllAsync同一通道读取。
  • 等待所有生成者完成,然后调用 Complete 以指示不再向通道写入任何数据。
  • 等待所有使用者完成从通道中清空剩余数据。

小窍门

使用多个生成者时,仅在channel.Writer.Complete()生成者完成写入后调用。 这表示不再写入任何数据,允许 ReadAllAsync() 在使用所有剩余项目后完成。

另请参阅