Edit

Share via


System.IO.Pipelines

System.IO.Pipelines is a library designed to make high-performance I/O in .NET easier. The package targets .NET Standard for broad compatibility, .NET Framework, and modern .NET. In modern .NET versions, System.IO.Pipelines is included in the shared framework and doesn't require a separate NuGet package.

The library is also available as the System.IO.Pipelines NuGet package.

What problem does System.IO.Pipelines solve?

Apps that parse streaming data are composed of boilerplate code having many specialized and unusual code flows. The boilerplate and special case code is complex and difficult to maintain.

System.IO.Pipelines was architected to:

  • Have high performance parsing streaming data.
  • Reduce code complexity.

This code is typical for a TCP server that receives line-delimited messages (delimited by '\n') from a client:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

The preceding code has several problems:

  • The entire message (end of line) might not be received in a single call to ReadAsync.
  • It's ignoring the result of stream.ReadAsync. stream.ReadAsync returns how much data was read.
  • It doesn't handle the case where multiple lines are read in a single ReadAsync call.
  • It allocates a byte array with each read.

To fix the preceding problems, make these changes:

  • Buffer the incoming data until a new line is found.

  • Parse all the lines returned in the buffer.

  • The line might be bigger than 1 KB (1024 bytes). The code needs to resize the input buffer until the delimiter is found to fit the complete line inside the buffer.

    • If the buffer is resized, more buffer copies are made as longer lines appear in the input.
    • To reduce wasted space, compact the buffer used for reading lines.
  • Consider using buffer pooling to avoid allocating memory repeatedly.

  • This code addresses some of these problems:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

The previous code is complex and doesn't address all the problems identified. High-performance networking usually means writing complex code to maximize performance. System.IO.Pipelines was designed to make writing this type of code easier.

Pipe

Use the Pipe class to create a PipeWriter/PipeReader pair. All data written into the PipeWriter is available in the PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Pipe basic usage

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Two loops handle the reading and writing:

  • FillPipeAsync reads from the Socket and writes to the PipeWriter.
  • ReadPipeAsync reads from the PipeReader and parses incoming lines.

No explicit buffers are allocated. All buffer management is delegated to the PipeReader and PipeWriter implementations. Delegating buffer management makes it easier for consuming code to focus solely on the business logic.

In the first loop:

In the second loop, the PipeReader consumes the buffers written by PipeWriter. The buffers come from the socket. The call to PipeReader.ReadAsync:

  • Returns a ReadResult that contains two important pieces of information:

    • The data that was read in the form of ReadOnlySequence<T>.
    • A boolean IsCompleted that indicates if the end of data (EOF) has been reached.

After finding the end of line (EOL) delimiter and parsing the line:

  • The logic processes the buffer to skip what's already processed.
  • PipeReader.AdvanceTo is called to tell the PipeReader how much data has been consumed and examined.

The reader and writer loops end by calling PipeReader.Complete and PipeWriter.Complete. Calling Complete releases the memory the underlying Pipe allocated.

Backpressure and flow control

Ideally, reading and parsing work together:

  • The reading thread consumes data from the network and puts it in buffers.
  • The parsing thread is responsible for constructing the appropriate data structures.

Typically, parsing takes more time than just copying blocks of data from the network:

  • The reading thread gets ahead of the parsing thread.
  • The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.

For optimal performance, there's a balance between frequent pauses and allocating more memory.

To solve the preceding problem, the Pipe has two settings to control the flow of data:

Diagram with ResumeWriterThreshold and PauseWriterThreshold

PipeWriter.FlushAsync:

  • Returns an incomplete ValueTask<FlushResult> when the amount of data in the Pipe crosses PauseWriterThreshold.
  • Completes ValueTask<FlushResult> when it becomes lower than ResumeWriterThreshold.

Two values are used to prevent rapid cycling, which can occur if one value is used.

Examples

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Typically when using async and await, asynchronous code resumes on either a TaskScheduler or the current SynchronizationContext.

When doing I/O, it's important to have fine-grained control over where the I/O is performed. This control allows taking advantage of CPU caches effectively. Efficient caching is critical for high-performance apps like web servers. PipeScheduler provides control over where asynchronous callbacks run. By default:

  • The current SynchronizationContext is used.
  • If there's no SynchronizationContext, it uses the thread pool to run callbacks.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool is the PipeScheduler implementation that queues callbacks to the thread pool. PipeScheduler.ThreadPool is the default and generally the best choice. PipeScheduler.Inline can cause unintended consequences such as deadlocks.

Pipe reset

Reusing the Pipe object is often efficient. To reset the pipe, call PipeReader Reset when both the PipeReader and PipeWriter are complete.

PipeReader

PipeReader manages memory on the caller's behalf. Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync. This lets the PipeReader know when the caller is done with the memory so that it can be tracked. The ReadOnlySequence<T> returned from PipeReader.ReadAsync is only valid until the call to PipeReader.AdvanceTo. It's illegal to use ReadOnlySequence<T> after calling PipeReader.AdvanceTo.

PipeReader.AdvanceTo takes two SequencePosition arguments:

  • The first argument determines how much memory was consumed.
  • The second argument determines how much of the buffer was observed.

Marking data as consumed means that the pipe can return the memory to the underlying buffer pool. Marking data as observed controls what the next call to PipeReader.ReadAsync does. Marking everything as observed means that the next call to PipeReader.ReadAsync won't return until there's more data written to the pipe. Any other value makes the next call to PipeReader.ReadAsync return immediately with the observed and unobserved data, but not data that has already been consumed.

Read streaming data scenarios

A couple of typical patterns emerge when reading streaming data:

  • Given a stream of data, parse a single message.
  • Given a stream of data, parse all available messages.

These examples use the TryParseLines method for parsing messages from a ReadOnlySequence<T>. TryParseLines parses a single message and updates the input buffer to trim the parsed message from the buffer. TryParseLines isn't part of .NET; it's a user-written method used in the following sections.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Read a single message

This code reads a single message from a PipeReader and returns it to the caller.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

The preceding code:

  • Parses a single message.
  • Updates the consumed SequencePosition and examined SequencePosition to point to the start of the trimmed input buffer.

The two SequencePosition arguments are updated because TryParseLines removes the parsed message from the input buffer. Generally, when parsing a single message from the buffer, the examined position should be one of the following:

  • The end of the message.
  • The end of the received buffer if no message was found.

The single message case has the most potential for errors. Passing the wrong values to examined might result in an out of memory exception or an infinite loop. For more information, see the PipeReader common problems section in this article.

Important

ReadSingleMessageAsync doesn't call PipeReader.CompleteAsync. The caller is responsible for completing the PipeReader. Calling PipeReader.CompleteAsync inside ReadSingleMessageAsync signals that no more data can be read, which prevents reading subsequent messages.

Read multiple messages

This code reads all messages from a PipeReader and calls ProcessMessageAsync on each.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Because ProcessMessagesAsync owns the complete message-reading loop, it calls PipeReader.CompleteAsync when it's done. Unlike the single-message case, the caller doesn't need to complete the reader. ProcessMessagesAsync takes full ownership of the PipeReader lifetime.

Cancellation

PipeReader.ReadAsync:

  • Supports passing a CancellationToken.
  • Throws an OperationCanceledException if the CancellationToken is canceled while there's a read pending.
  • Supports a way to cancel the current read operation via PipeReader.CancelPendingRead, which avoids raising an exception. Calling PipeReader.CancelPendingRead causes the current or next call to PipeReader.ReadAsync to return a ReadResult with IsCanceled set to true. This is useful for halting the existing read loop in a non-destructive and non-exceptional way.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

PipeReader common problems

  • Passing the wrong values to consumed or examined might result in reading already read data.

  • Passing buffer.End as examined might result in:

    • Stalled data
    • An eventual out-of-memory (OOM) exception if data isn't consumed. For example, PipeReader.AdvanceTo(position, buffer.End) when processing a single message at a time from the buffer.
  • Passing the wrong values to consumed or examined might result in an infinite loop. For example, PipeReader.AdvanceTo(buffer.Start) if buffer.Start hasn't changed causes the next call to PipeReader.ReadAsync to return immediately before new data arrives.

  • Passing the wrong values to consumed or examined might result in infinite buffering (eventual OOM).

  • Using ReadOnlySequence<T> after calling PipeReader.AdvanceTo might result in memory corruption (use after free).

  • Failing to call Complete/CompleteAsync might result in a memory leak.

  • Checking ReadResult.IsCompleted and exiting the reading logic before processing the buffer results in data loss. The loop exit condition should be based on ReadResult.Buffer.IsEmpty and ReadResult.IsCompleted. Doing this incorrectly could result in an infinite loop.

Problematic code

Data loss

The ReadResult can return the final segment of data when IsCompleted is set to true. Not reading that data before exiting the read loop results in data loss.

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader common problems.

Infinite loop

The following logic might result in an infinite loop if the Result.IsCompleted is true but there's never a complete message in the buffer.

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader common problems.

Here's another piece of code with the same problem. It's checking for a non-empty buffer before checking ReadResult.IsCompleted. Because it's in an else if, it loops forever if there's never a complete message in the buffer.

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader common problems.

Unresponsive application

Unconditionally calling PipeReader.AdvanceTo with buffer.End in the examined position might result in the application becoming unresponsive when parsing a single message. The next call to PipeReader.AdvanceTo won't return until:

  • There's more data written to the pipe.
  • And the new data wasn't previously examined.

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader common problems.

Out of Memory (OOM)

With the following conditions, this code keeps buffering until an OutOfMemoryException occurs:

  • There's no maximum message size.
  • The data returned from the PipeReader doesn't make a complete message. For example, the other side is writing a large message (for example, a 4-GB message).

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader common problems.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader common problems.

Memory Corruption

When writing helpers that read the buffer, copy any returned payload before calling Advance. The following example returns memory that the Pipe has discarded and might reuse it for the next operation (read/write).

Warning

Do NOT use the following code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The following sample is provided to explain PipeReader common problems.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Warning

Do NOT use the preceding code. Using this sample will result in data loss, hangs, security issues and should NOT be copied. The preceding sample is provided to explain PipeReader common problems.

PipeWriter

The PipeWriter manages buffers for writing on the caller's behalf. PipeWriter implements IBufferWriter<byte>. IBufferWriter<byte> provides access to buffers to perform writes without extra buffer copies.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

The previous code:

  • Requests a buffer of at least 5 bytes from the PipeWriter using GetMemory.
  • Writes bytes for the ASCII string "Hello" to the returned Memory<byte>.
  • Calls Advance to indicate how many bytes were written to the buffer.
  • Flushes the PipeWriter, which sends the bytes to the underlying device.

The previous method of writing uses the buffers provided by the PipeWriter. It also could use PipeWriter.WriteAsync, which:

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Cancellation

FlushAsync supports passing a CancellationToken. Passing a CancellationToken results in an OperationCanceledException if the token is canceled while there's a flush pending. PipeWriter.FlushAsync supports a way to cancel the current flush operation via PipeWriter.CancelPendingFlush without raising an exception. Calling PipeWriter.CancelPendingFlush causes the current or next call to PipeWriter.FlushAsync or PipeWriter.WriteAsync to return a FlushResult with IsCanceled set to true. This is useful for halting the yielding flush in a non-destructive and non-exceptional way.

PipeWriter common problems

  • GetSpan and GetMemory return a buffer with at least the requested amount of memory. Don't assume exact buffer sizes.
  • Successive calls aren't guaranteed to return the same buffer or the same-sized buffer.
  • A new buffer must be requested after calling Advance to continue writing more data. The previously acquired buffer can't be written to.
  • Calling GetMemory or GetSpan while there's an incomplete call to FlushAsync isn't safe.
  • Calling Complete or CompleteAsync while there's unflushed data might result in memory corruption.

Tips for PipeReader and PipeWriter

Use these tips to successfully use the System.IO.Pipelines classes:

IDuplexPipe

IDuplexPipe is a contract for types that support both reading and writing. For example, a network connection is represented by an IDuplexPipe.

Unlike Pipe, which contains a PipeReader and a PipeWriter, IDuplexPipe represents a single side of a full duplex connection. What you write to the PipeWriter won't be read from the PipeReader.

Streams

When reading or writing stream data, you typically read data using a de-serializer and write data using a serializer. Most of these read and write stream APIs have a Stream parameter. To make it easier to integrate with these existing APIs, PipeReader and PipeWriter expose an AsStream method. AsStream returns a Stream implementation around the PipeReader or PipeWriter.

Stream example

Create PipeReader and PipeWriter instances using the static Create methods given a Stream object and optional corresponding creation options.

The StreamPipeReaderOptions allow for control over the creation of the PipeReader instance with the following parameters:

The StreamPipeWriterOptions allow for control over the creation of the PipeWriter instance with the following parameters:

Important

When creating PipeReader and PipeWriter instances using the Create methods, consider the Stream object lifetime. If you need access to the stream after the reader or writer is done with it, set the LeaveOpen flag to true on the creation options. Otherwise, the stream is closed.

This code demonstrates creating PipeReader and PipeWriter instances using the Create methods from a stream.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

The application uses a StreamReader to read the lorem-ipsum.txt file as a stream, and it must end with a blank line. The FileStream is passed to PipeReader.Create, which instantiates a PipeReader object. The console application then passes its standard output stream to PipeWriter.Create using Console.OpenStandardOutput(). The example supports cancellation.