Buffer Sources and Sinks

This section explains the BufferSource and BufferSink concepts for zero-copy I/O where the callee owns the buffers.

Prerequisites

Callee-Owns-Buffers Pattern

With streams and sources/sinks, the caller provides buffers:

// Caller owns the buffer
char my_buffer[1024];
co_await stream.read_some(make_buffer(my_buffer));

Data flows: source → caller’s buffer → processing

With buffer sources/sinks, the callee provides buffers:

// Callee owns the buffers
const_buffer arr[8];
auto [ec, bufs] = co_await source.pull(arr);
// bufs is the filled prefix of arr; each buffer points into the source's internal storage

Data flows: source’s internal buffer → processing (no copy!)

BufferSource

A BufferSource provides read-only buffers from its internal storage:

template<typename T>
concept BufferSource =
    requires(T& src, std::span<const_buffer> dest, std::size_t n) {
        { src.pull(dest) } -> IoAwaitable;
        // pull await-returns (error_code, std::span<const_buffer>)
        src.consume(n);
    };

pull Semantics

IoAwaitable auto pull(std::span<const_buffer> dest);
void consume(std::size_t n);

pull await-returns (error_code, std::span<const_buffer>). The source fills dest with descriptors pointing into its internal storage and returns a span over the filled prefix:

  • On success: !ec, the returned span describes the available data

  • On exhaustion: ec == cond::eof with an empty span

  • On error: ec set to the error condition

The returned buffers point into the source’s internal storage and remain valid until you call consume. Calling pull again without an intervening consume returns the same unconsumed data. Call consume(n) to mark n bytes as consumed and advance the source.

Example

template<BufferSource Source>
task<> process_source(Source& source)
{
    const_buffer arr[8];

    for (;;)
    {
        auto [ec, bufs] = co_await source.pull(arr);

        if (ec == cond::eof)
            break;  // Source exhausted

        if (ec)
            throw std::system_error(ec);

        // Process buffers (zero-copy!)
        std::size_t total = 0;
        for (auto const& b : bufs)
        {
            process_data(b.data(), b.size());
            total += b.size();
        }

        source.consume(total);
    }
}

BufferSink

A BufferSink provides writable buffers for direct write access:

template<typename T>
concept BufferSink =
    requires(T& sink, std::span<mutable_buffer> dest, std::size_t n) {
        { sink.prepare(dest) } -> std::same_as<std::span<mutable_buffer>>;
        { sink.commit(n) } -> IoAwaitable;       // await-returns (error_code)
        { sink.commit_eof(n) } -> IoAwaitable;   // await-returns (error_code)
    };

prepare Semantics

std::span<mutable_buffer> prepare(std::span<mutable_buffer> dest);

Synchronous operation. The sink fills dest with writable buffer descriptors from its internal storage and returns a span over the filled prefix (which may be shorter than dest).

commit Semantics

IoAwaitable auto commit(std::size_t n);
IoAwaitable auto commit_eof(std::size_t n);

commit(n) finalizes n bytes of prepared data. commit_eof(n) finalizes the final n bytes and signals end-of-stream; pass 0 to signal end-of-stream with no further data. Both await-return (error_code).

Example

template<BufferSink Sink>
task<> write_to_sink(Sink& sink, std::span<char const> data)
{
    std::size_t written = 0;

    while (written < data.size())
    {
        mutable_buffer arr[8];
        auto bufs = sink.prepare(arr);

        if (bufs.empty())
            throw std::runtime_error("sink full");

        // Copy into sink's buffers
        std::size_t copied = 0;
        for (auto& b : bufs)
        {
            if (written >= data.size())
                break;
            std::size_t chunk = (std::min)(
                b.size(),
                data.size() - written);
            std::memcpy(b.data(), data.data() + written, chunk);
            written += chunk;
            copied += chunk;
        }

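        // Finalize this batch and check the result of each commit
        if (written == data.size())
        {
            auto [ec] = co_await sink.commit_eof(copied);
            if (ec)
                throw std::system_error(ec);
        }
        else
        {
            auto [ec] = co_await sink.commit(copied);
            if (ec)
                throw std::system_error(ec);
        }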
    }
}

Zero-Copy Benefits

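Because the callee hands out buffers that point into its own storage, buffer sources and sinks avoid the intermediate copy that caller-owned buffers require: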

Memory-Mapped Files

BufferSource is a concept, not a base class, so a type models it simply by providing the required members. A memory-mapped file, for example, can hand out buffers that point directly into the mapped region:

class mmap_source  // models BufferSource
{
    void* mapped_region_;
    std::size_t size_;
    std::size_t offset_ = 0;

public:
    io_task<std::span<const_buffer>> pull(std::span<const_buffer> dest)
    {
        if (offset_ >= size_)
            co_return {error::eof, {}};  // Exhausted

        if (dest.empty())
            co_return {{}, dest};  // No room for a descriptor

        // Return pointer into mapped memory: no copy!
        dest[0] = const_buffer(
            static_cast<char*>(mapped_region_) + offset_,
            size_ - offset_);

        co_return {{}, dest.subspan(0, 1)};
    }

    void consume(std::size_t n)
    {
        offset_ += n;
    }
};

Hardware Buffers

DMA buffers, GPU memory, and network card ring buffers can all be exposed through BufferSource/BufferSink without intermediate copying.

Type-Erasing Wrappers

any_buffer_source

#include <boost/capy/io/any_buffer_source.hpp>

// Owning: takes ownership of the source
template<BufferSource S>
any_buffer_source(S source);

// Reference: wraps without ownership, source must outlive the wrapper
template<BufferSource S>
any_buffer_source(S* source);

any_buffer_sink

#include <boost/capy/io/any_buffer_sink.hpp>

// Owning: takes ownership of the sink
template<BufferSink S>
any_buffer_sink(S sink);

// Reference: wraps without ownership, sink must outlive the wrapper
template<BufferSink S>
any_buffer_sink(S* sink);
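The difference between the owning and reference constructors can be shown with a small type-erasure sketch. Everything here is hypothetical and greatly simplified: `toy_any_source` erases only `consume` plus an invented `remaining()` member, and `counting_source` is a made-up concrete type. It is not how any_buffer_source is implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>

// Hypothetical wrapper, kept small by erasing only two members.
class toy_any_source
{
    struct iface
    {
        virtual ~iface() = default;
        virtual std::size_t remaining() const = 0;
        virtual void consume(std::size_t n) = 0;
    };

    // Holds the source by value.
    template<class S>
    struct owning final : iface
    {
        S s;
        explicit owning(S v) : s(std::move(v)) {}
        std::size_t remaining() const override { return s.remaining(); }
        void consume(std::size_t n) override { s.consume(n); }
    };

    // Holds only a pointer to a source that lives elsewhere.
    template<class S>
    struct referencing final : iface
    {
        S* s;
        explicit referencing(S* p) : s(p) {}
        std::size_t remaining() const override { return s->remaining(); }
        void consume(std::size_t n) override { s->consume(n); }
    };

    std::unique_ptr<iface> impl_;

public:
    // Owning: the wrapper stores its own source object.
    template<class S>
    toy_any_source(S source)
        : impl_(std::make_unique<owning<S>>(std::move(source))) {}

    // Reference: the wrapper stores a pointer; *source must outlive it.
    template<class S>
    toy_any_source(S* source)
        : impl_(std::make_unique<referencing<S>>(source)) {}

    std::size_t remaining() const { return impl_->remaining(); }
    void consume(std::size_t n) { impl_->consume(n); }
};

// Hypothetical concrete source that only counts bytes.
struct counting_source
{
    std::size_t left = 0;
    std::size_t remaining() const { return left; }
    void consume(std::size_t n) { left -= n; }
};
```

In this sketch, consuming through an owning wrapper leaves the object it was constructed from untouched, while consuming through a reference wrapper changes the original object, which is why the original must outlive the wrapper.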

Example: Compression Pipeline

// Compressed input is pulled from a BufferSource (zero-copy);
// each decompressed block is written to a write sink

task<> decompress_stream(any_buffer_source& compressed, any_write_sink& output)
{
    const_buffer arr[8];

    for (;;)
    {
        auto [ec, bufs] = co_await compressed.pull(arr);
        if (ec == cond::eof)
            break;
        if (ec)
            throw std::system_error(ec);

        std::size_t total = 0;
        for (auto const& b : bufs)
        {
            auto decompressed = decompress_block(b);
            co_await output.write(make_buffer(decompressed));
            total += b.size();
        }

        compressed.consume(total);
    }

    co_await output.write_eof();
}

Reference

Header                                    Description

<boost/capy/concept/buffer_source.hpp>    BufferSource concept definition
<boost/capy/concept/buffer_sink.hpp>      BufferSink concept definition
<boost/capy/io/any_buffer_source.hpp>     Type-erased buffer source wrapper
<boost/capy/io/any_buffer_sink.hpp>       Type-erased buffer sink wrapper

You have now learned about buffer sources and sinks for zero-copy I/O. Continue to Transfer Algorithms to learn about composed read/write operations.