Backpressure and Resilience: Cortex Pipelines That Don't Fall Over

How to keep Cortex.Streams pipelines alive under load: bounded buffers, backpressure strategies, concurrency and batching trade-offs, observability, graceful shutdown, and poison-message handling.

By Buildersoft Team

The failure mode nobody plans for

Most streaming bugs in production are not logic bugs. They are physics bugs.

You wire up a fast source — a Kafka topic, a CDC feed, an HTTP ingest endpoint — into a pipeline that ends in a slow sink: a database write, an external API call, a disk flush. Under light load everything is fine, because the sink keeps up with the source. Then traffic spikes. The source keeps producing at full speed; the sink keeps draining at its own fixed rate. Items pile up somewhere in between.

If that "somewhere" is unbounded, you have a slow-motion crash. Memory climbs, the GC thrashes, latency balloons, and eventually the process is killed by the OOM reaper — usually at the worst possible time, under peak load, taking in-flight data with it.

Backpressure is the mechanism that prevents this. It puts a hard bound on how much work can be in flight, and then forces a decision when that bound is hit: slow the producer down, drop some data, or fail loudly. There is no fourth option that also keeps your memory flat. The whole point of this post is to make that decision explicit and deliberate instead of accidental.
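
To put rough numbers on the "physics" above, here is a minimal sketch of how fast an unbounded backlog grows. BacklogMath is a hypothetical helper written for this post, not part of Cortex, and the rates, item size, and memory limit in the usage note are illustrative assumptions.

```csharp
using System;

// Hypothetical helper: back-of-the-envelope backlog arithmetic.
public static class BacklogMath
{
    // Items queued after `seconds` when the source outpaces the sink.
    public static long BacklogItems(long producedPerSec, long drainedPerSec, double seconds)
    {
        var surplusPerSec = Math.Max(0, producedPerSec - drainedPerSec);
        return (long)(surplusPerSec * seconds);
    }

    // Seconds until the backlog alone occupies `memoryLimitBytes`.
    public static double SecondsUntilLimit(
        long producedPerSec, long drainedPerSec, long bytesPerItem, long memoryLimitBytes)
    {
        var surplusPerSec = Math.Max(0, producedPerSec - drainedPerSec);
        if (surplusPerSec == 0) return double.PositiveInfinity; // the sink keeps up
        return (double)memoryLimitBytes / (surplusPerSec * bytesPerItem);
    }
}
```

With a source at 10,000 items/sec, a sink at 6,000 items/sec, and 1 KiB per item, SecondsUntilLimit(10_000, 6_000, 1024, 2L * 1024 * 1024 * 1024) comes out at roughly 524 seconds: under nine minutes from the start of a spike to a 2 GiB backlog.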

Cortex.Streams (framework v3.1.2, targeting .NET 6.0 through .NET 10) ships an opt-in buffered processing model that makes all of this configurable. Everything below is verified against Cortex.Streams.Performance.StreamPerformanceOptions and the canonical builder API.

The default: synchronous and unbuffered

Without any performance configuration, a Cortex stream processes items synchronously. The producer thread runs the entire pipeline and only returns when the sink is done.

var stream = StreamBuilder<int>.CreateNewStream("BasicStream")
    .Stream()
    .Map(x => x * 2)
    .Sink(Console.WriteLine)
    .Build();

stream.Start();
stream.Emit(42);            // Blocks until the Sink completes
await stream.EmitAsync(42); // Runs on the thread pool, still awaits completion
stream.Stop();

This is perfectly fine for in-process work where the producer naturally throttles itself: if Emit blocks until the sink is done, the producer cannot outrun the sink. The "backpressure" here is implicit — the caller's own thread is the bound.

The trouble starts when you want producers to be non-blocking, or when the producer is an external broker that does not care how fast your sink is. That is when you opt into buffered processing.

Opting into buffered processing

You enable buffering by passing StreamPerformanceOptions via .WithPerformanceOptions(...), which goes before .Stream(...) on the builder.

using Cortex.Streams;
using Cortex.Streams.Performance;

var stream = StreamBuilder<int>.CreateNewStream("FastStream")
    .WithPerformanceOptions(new StreamPerformanceOptions
    {
        EnableBufferedProcessing = true,
        BufferCapacity = 10_000,
        BackpressureStrategy = BackpressureStrategy.Block
    })
    .Stream()
    .Map(x => x * 2)
    .Sink(ProcessItem)
    .Build();

stream.Start();

// Fire-and-forget: returns as soon as the item is buffered (with Block, it waits for space if the buffer is full)
stream.EmitAndForget(42);

// Async: awaits buffer space, returns once buffered (not once processed)
await stream.EmitAsync(42);

// Graceful shutdown: waits for the buffer to drain
await stream.StopAsync();

Three things change once EnableBufferedProcessing = true:

  • A bounded internal buffer of BufferCapacity items sits between emission and the pipeline.
  • One or more async consumer tasks drain that buffer and run the pipeline.
  • Emission decouples from processing. EmitAndForget(x) returns as soon as the item is buffered (or dropped); await EmitAsync(x) awaits buffer space, not pipeline completion.
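
The three changes above can be pictured with a small stand-alone sketch. It uses System.Threading.Channels from the .NET base class library as a stand-in for the internal buffer; that is an assumption made for illustration, not a description of how Cortex implements it, and BufferedPipelineSketch is a hypothetical name.

```csharp
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustration only: a bounded buffer, one async consumer, decoupled emission.
public static class BufferedPipelineSketch
{
    public static async Task<int[]> RunAsync(int itemCount, int capacity)
    {
        // Bounded buffer; the default full mode makes writers wait for space.
        var channel = Channel.CreateBounded<int>(capacity);
        var results = new ConcurrentQueue<int>();

        // Consumer task: drains the buffer and runs the "pipeline" (x * 2).
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
                results.Enqueue(item * 2);
        });

        // Producer: each write awaits buffer space, not pipeline completion.
        for (var i = 0; i < itemCount; i++)
            await channel.Writer.WriteAsync(i);

        channel.Writer.Complete(); // no more items will arrive
        await consumer;            // wait for the buffer to drain
        return results.ToArray();
    }
}
```

Calling await BufferedPipelineSketch.RunAsync(10, 2) pushes ten items through a two-slot buffer. The producer is throttled whenever both slots are occupied, and with a single consumer the results come back in emission order.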

The full set of options, verified from StreamPerformanceOptions:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| EnableBufferedProcessing | bool | false | Enable the internal buffer and async consumers |
| BufferCapacity | int | 10,000 | Maximum items held in the buffer |
| BackpressureStrategy | BackpressureStrategy | Block | What to do when the buffer is full |
| BatchSize | int | 1 | Items processed per batch (1 = immediate) |
| BatchTimeout | TimeSpan | 100ms | Max wait for a batch to fill |
| ConcurrencyLevel | int | 1 | Number of parallel consumer tasks |
| BlockingTimeout | TimeSpan | 30s | Timeout for blocking emits |
| OnItemDropped | Action<object, DropReason> | null | Callback when an item is dropped |

BufferCapacity is the single most important number in this whole model. It is the hard ceiling on in-flight work. A good starting heuristic: size the buffer to hold 2–5 seconds of peak throughput. At 10,000 items/sec and a 3-second cushion, that is a 30,000-item buffer: large enough to absorb a transient spike without forcing a decision, small enough that you notice when the sink is genuinely falling behind.
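
The sizing heuristic is simple enough to write down as code. This is a minimal sketch; BufferSizing and CapacityFor are hypothetical names, and the result is meant to be passed to BufferCapacity as a starting point, not a tuned value.

```csharp
using System;

// Hypothetical helper: buffer capacity = peak rate x cushion.
public static class BufferSizing
{
    // Capacity that holds `cushionSeconds` of traffic at the peak rate.
    public static int CapacityFor(int peakItemsPerSecond, double cushionSeconds)
    {
        if (peakItemsPerSecond <= 0)
            throw new ArgumentOutOfRangeException(nameof(peakItemsPerSecond));
        if (cushionSeconds <= 0)
            throw new ArgumentOutOfRangeException(nameof(cushionSeconds));

        // Round up so a fractional cushion never undersizes the buffer.
        return checked((int)Math.Ceiling(peakItemsPerSecond * cushionSeconds));
    }
}
```

BufferSizing.CapacityFor(10_000, 3) gives 30,000, matching the example above. Measure the peak rate from production traffic where you can; a guessed peak gives a guessed buffer.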

What happens when the buffer fills

When the buffer hits BufferCapacity, the BackpressureStrategy decides what happens next. There are exactly four choices, and each one trades a different thing away.

[Diagram: producer, bounded buffer, and consumer, with a feedback arrow from the consumer back to the buffer]

The feedback arrow from consumer back to buffer is the essence of backpressure: the consumer's drain rate, combined with the bound, is what pushes back on the producer.

  • Block (the default). EmitAsync blocks asynchronously and EmitAndForget blocks synchronously until space frees up. If nothing frees up within BlockingTimeout, it throws OperationCanceledException. This is the only zero-loss strategy: the producer is forced to slow to the sink's pace.
  • DropOldest. New items are always accepted; the oldest buffered items are evicted to make room. The evicted item is handed to OnItemDropped. Use this when the freshest data is the only data that matters.
  • DropNewest. The buffer is left intact and incoming items are silently dropped while it is full. Writes always "succeed" (return true); use OnItemDropped to count the casualties.
  • ThrowException. Throws BufferFullException on the emitting call. The caller owns the recovery decision.

Handling that exception at the call site looks like this:

try
{
    stream.EmitAndForget(item);
}
catch (BufferFullException ex)
{
    Console.WriteLine($"Buffer full. Capacity: {ex.BufferCapacity}");
    // Apply your own retry / shed / alert logic here
}
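
To see the drop semantics outside of Cortex, here is a small sketch built on System.Threading.Channels, whose BoundedChannelFullMode offers comparable policies. It is an analogy, not Cortex's implementation. One naming trap: the behaviour described above for DropNewest (the incoming item is discarded) corresponds to DropWrite in the Channels API, while Channels' own DropNewest evicts the most recently buffered item instead. BoundedBufferDemo is a hypothetical name, and the itemDropped callback overload assumes .NET 6 or later.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

// Illustration only: overfill a bounded channel and see what survives.
public static class BoundedBufferDemo
{
    public static (List<int> Kept, List<int> Dropped) Fill(
        BoundedChannelFullMode mode, int capacity, int itemCount)
    {
        var dropped = new List<int>();
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity) { FullMode = mode },
            item => dropped.Add(item)); // invoked for every evicted or discarded item

        // TryWrite never blocks. Drop modes always return true;
        // Wait mode returns false once the channel is full.
        for (var i = 1; i <= itemCount; i++)
            channel.Writer.TryWrite(i);

        channel.Writer.Complete();

        // Read back whatever is still buffered.
        var kept = new List<int>();
        while (channel.Reader.TryRead(out var item))
            kept.Add(item);

        return (kept, dropped);
    }
}
```

Writing 1 through 5 into a three-slot buffer, DropOldest keeps 3, 4, 5 and reports 1 and 2 as dropped, while DropWrite keeps 1, 2, 3 and reports 4 and 5. Either way the write call itself reports success, which is why a drop callback or counter is the only place the loss shows up.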

Choosing a strategy

| Strategy | When to use it | What you trade away |
| --- | --- | --- |
| Block | Financial transactions, orders, anything that must not be lost; log aggregation with a generously sized buffer | Producer latency: fast producers get throttled to sink speed |
| DropOldest | "Latest wins" telemetry: stock tickers, sensor readings, live dashboards | Older data points |
| DropNewest | Live video frames, sampled metrics where catching up beats completeness | Newest data while saturated |
| ThrowException | Critical alerts and fail-fast designs where silent loss is unacceptable and you want explicit control | Simplicity: every caller must handle the exception |

Cortex ships three presets so you do not have to assemble these by hand for common cases:

// Maximum throughput: large buffer, batching, parallel consumers, Block
var ht = StreamPerformanceOptions.HighThroughput(
    bufferCapacity: 100_000,
    concurrencyLevel: 8);

// Minimal latency: immediate processing, single ordered consumer, Block
var ll = StreamPerformanceOptions.LowLatency(
    bufferCapacity: 10_000);

// Latest-wins: DropOldest with a drop callback wired in
var dropOldest = StreamPerformanceOptions.DropOldest(
    bufferCapacity: 10_000,
    onItemDropped: (item, reason) => metrics.Track("dropped", item));

HighThroughput() is Block with batching and Environment.ProcessorCount consumers; LowLatency() is Block with BatchSize = 1 and a single consumer for ordering; DropOldest() is exactly what it says.
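
Two of the three presets use Block, so it is worth seeing what a blocked emit with a timeout looks like from the caller's side. The sketch below again uses System.Threading.Channels as a stand-in for the buffer; it mirrors the "wait for space, give up after a timeout" behaviour described earlier, but it is not Cortex code, and TryEmitAsync is a hypothetical helper.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustration only: a blocking write with an upper bound on the wait.
public static class BlockingEmitSketch
{
    // True if the item was buffered; false if no space freed up within `timeout`.
    public static async Task<bool> TryEmitAsync<T>(
        ChannelWriter<T> writer, T item, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            // Awaits buffer space; cancelled when the timeout elapses.
            await writer.WriteAsync(item, cts.Token);
            return true;
        }
        catch (OperationCanceledException)
        {
            return false; // the producer decides: retry, shed, or alert
        }
    }
}
```

Against a one-slot channel with no consumer, the first call returns true and the second returns false once the timeout passes. Translating the exception into a boolean is a design choice for this sketch; letting it propagate is equally valid when the caller cannot continue without the write.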

Concurrency and batching: throughput versus ordering and latency

Two more knobs decide how the consumers drain the buffer, and both involve a real trade-off.

ConcurrencyLevel sets how many consumer tasks pull from the buffer in parallel. Raising it increases throughput for I/O-bound sinks — but parallel consumers mean order is no longer guaranteed, and your operators must be thread-safe. With ConcurrencyLevel = 1 you keep strict ordering at the cost of throughput.

// Not thread-safe under ConcurrencyLevel > 1: List<T> is not synchronized
var list = new List<int>();
// ... in the builder chain:
    .Sink(x => list.Add(x))

// Safe under parallel consumers
var bag = new System.Collections.Concurrent.ConcurrentBag<int>();
// ... in the builder chain:
    .Sink(x => bag.Add(x))

// Also safe: atomic operations
var counter = 0;
// ... in the builder chain:
    .Sink(x => System.Threading.Interlocked.Increment(ref counter))
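
The thread-safety point can be checked in isolation. This sketch runs several consumer tasks against one bounded channel (System.Threading.Channels, used as a stand-in for the Cortex buffer) and records items with the two safe techniques shown above. ParallelConsumersSketch is a hypothetical name and the channel capacity of 64 is arbitrary.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustration only: N consumers draining one buffer with thread-safe sinks.
public static class ParallelConsumersSketch
{
    public static async Task<(int Counted, int Collected)> RunAsync(int itemCount, int consumers)
    {
        var channel = Channel.CreateBounded<int>(64);
        var bag = new ConcurrentBag<int>();
        var counter = 0;

        // Each item is delivered to exactly one of the consumers.
        var workers = Enumerable.Range(0, consumers).Select(_ => Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                bag.Add(item);                      // thread-safe collection
                Interlocked.Increment(ref counter); // atomic counter
            }
        })).ToArray();

        for (var i = 0; i < itemCount; i++)
            await channel.Writer.WriteAsync(i);

        channel.Writer.Complete();
        await Task.WhenAll(workers);
        return (counter, bag.Count);
    }
}
```

Both numbers equal itemCount regardless of how many consumers run. Swap the bag for a List<int> or the Interlocked call for counter++ and that no longer holds under contention. Also note that the bag's contents come back in no particular order, which is the ordering trade-off in practice.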

BatchSize and BatchTimeout group items before handing them to the consumer. For I/O-bound sinks — a database insert, an HTTP POST — batching is the single biggest throughput win, because you amortize per-call overhead across many items. The cost is latency: an item may wait up to BatchTimeout for its batch to fill before it is processed.

var options = new StreamPerformanceOptions
{
    EnableBufferedProcessing = true,
    BufferCapacity = 50_000,
    BatchSize = 100,                                  // amortize I/O over 100 items
    BatchTimeout = TimeSpan.FromMilliseconds(100),    // but never wait more than 100ms
    ConcurrencyLevel = Environment.ProcessorCount,
    BackpressureStrategy = BackpressureStrategy.Block
};

Read those two settings as a contract: "process up to BatchSize items together, but never make any single item wait longer than BatchTimeout." If you need per-item latency, set BatchSize = 1 (or just use LowLatency()).
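
That contract can be sketched as a small batch reader: collect up to a size limit, but stop waiting once a deadline passes. This is an illustration over System.Threading.Channels, not Cortex's batching code; BatchReader is a hypothetical name, and the timer here starts when the read begins, which may differ from how Cortex measures BatchTimeout.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustration only: "up to batchSize items, but never wait past batchTimeout".
public static class BatchReader
{
    public static async Task<List<T>> ReadBatchAsync<T>(
        ChannelReader<T> reader, int batchSize, TimeSpan batchTimeout)
    {
        var batch = new List<T>(batchSize);
        using var cts = new CancellationTokenSource(batchTimeout);
        try
        {
            // WaitToReadAsync returns false once the channel is completed and empty.
            while (batch.Count < batchSize && await reader.WaitToReadAsync(cts.Token))
            {
                while (batch.Count < batchSize && reader.TryRead(out var item))
                    batch.Add(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Deadline reached: hand over whatever has been collected so far.
        }
        return batch;
    }
}
```

With five items waiting and a batch size of three, the first call returns three items immediately; the second returns the remaining two after the timeout expires. That second call is the latency cost of batching made visible: a partial batch waits out the full timeout before it moves.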

Observability: you cannot tune what you cannot see

A bounded buffer turns an invisible failure (memory creeping up until OOM) into a visible, measurable one. GetBufferStatistics() returns a live snapshot — but only when buffered processing is enabled; otherwise it returns null.

BufferStatistics stats = stream.GetBufferStatistics();
if (stats != null)
{
    Console.WriteLine($"In buffer:   {stats.CurrentCount}/{stats.Capacity}");
    Console.WriteLine($"Utilization: {stats.UtilizationPercent:F1}%");
    Console.WriteLine($"Enqueued:    {stats.TotalEnqueued}");
    Console.WriteLine($"Processed:   {stats.TotalProcessed}");
    Console.WriteLine($"Dropped:     {stats.TotalDropped}");
}

The two numbers to watch in production:

  • UtilizationPercent climbing toward 100% means the sink is losing the race. Sustained high utilization is your early warning to scale consumers or shed load — well before anything crashes.
  • TotalDropped increasing at all (under any Drop strategy) is direct evidence of data loss. If it must always be zero, you are on the wrong strategy.

A simple watchdog turns that into an alert:

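// Keep a reference to the timer and dispose it on shutdown; an unreferenced Timer can be garbage-collected and stop firing
var timer = new System.Threading.Timer(_ =>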
{
    var s = stream.GetBufferStatistics();
    if (s != null && s.UtilizationPercent > 80)
        logger.Warn($"Buffer utilization high: {s.UtilizationPercent:F1}%");
}, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));

And OnItemDropped gives you a per-drop hook for metrics, so dropped data never goes unnoticed:

OnItemDropped = (item, reason) =>  // reason is a DropReason
    metrics.IncrementDropped(reason.ToString())
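
If you want one signal instead of two, the utilization and drop checks fold into a single health evaluation. The sketch below is a hypothetical helper that takes plain numbers (such as the values read from a statistics snapshot) so it stays independent of any Cortex type; the 80% threshold is the same illustrative value used in the watchdog.

```csharp
// Hypothetical health classification for a bounded buffer.
public enum BufferHealth { Healthy, Saturating, LosingData }

public static class BufferHealthCheck
{
    public static BufferHealth Evaluate(
        long currentCount, long capacity,
        long previousTotalDropped, long totalDropped,
        double warnAtPercent = 80)
    {
        // Any new drop since the last check is data loss, whatever the fill level.
        if (totalDropped > previousTotalDropped)
            return BufferHealth.LosingData;

        var utilization = capacity > 0 ? 100.0 * currentCount / capacity : 0;
        return utilization > warnAtPercent
            ? BufferHealth.Saturating
            : BufferHealth.Healthy;
    }
}
```

Drops are checked first on purpose: a buffer on a drop strategy can sit at moderate utilization while still shedding items, and that should never be reported as healthy.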

Graceful shutdown: drain, don't slam the door

A buffer that absorbs spikes also holds data when you shut down. Calling the synchronous Stop() stops immediately and can leave buffered items unprocessed. For a clean drain, use StopAsync(), which waits for the buffer to empty before returning.

// Risky: may discard whatever is still buffered
stream.Stop();

// Correct: waits for the buffer to drain
await stream.StopAsync();

// Bounded drain: drain, but don't hang forever
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await stream.StopAsync(cts.Token);

Wire StopAsync() into your host's shutdown path (for example, IHostApplicationLifetime.ApplicationStopping) so a rolling deploy or scale-in does not silently lose in-flight items.
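
When the shutdown path needs to know whether the drain actually finished, a small wrapper around the drain task helps. This is a sketch under assumptions: ShutdownSketch is a hypothetical helper, it works on any Task (for example the one returned by a stop call), and it relies on Task.WaitAsync, which is available from .NET 6.

```csharp
using System;
using System.Threading.Tasks;

// Illustration only: report whether a drain completed before a deadline.
public static class ShutdownSketch
{
    // True if `drain` finished within `timeout`; false if the deadline passed first.
    public static async Task<bool> DrainWithinAsync(Task drain, TimeSpan timeout)
    {
        try
        {
            await drain.WaitAsync(timeout);
            return true;
        }
        catch (TimeoutException)
        {
            return false; // log how much was left before the process exits
        }
    }
}
```

Note that WaitAsync only stops waiting; it does not cancel the underlying drain. Pair it with a cancellation token, as in the bounded drain above, when the drain itself must be abandoned.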

Resilience against poison messages

Backpressure keeps you alive under volume. The other thing that takes pipelines down is a single bad item — a malformed record, a transient downstream timeout — that throws inside an operator. Without a policy, one poison message can kill the consumer loop.

.WithErrorHandling(...) attaches that policy. Like the performance options, it goes before .Stream(...), and it takes a StreamExecutionOptions instance. You set a default ErrorHandlingStrategy and, optionally, an OnError callback for per-error decisions. The callback receives a StreamErrorContext — which carries the exception, the operator name, the offending input, and the retry attempt — and returns an ErrorHandlingDecision.

using Cortex.Streams;
using Cortex.Streams.ErrorHandling;

var executionOptions = new StreamExecutionOptions
{
    ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, // applied when OnError is null
    MaxRetries = 3,
    RetryDelay = TimeSpan.FromSeconds(1),
    OnError = ctx =>
    {
        _logger.LogError(ctx.Exception,
            "Error in {Operator} processing {Item} (attempt {Attempt})",
            ctx.OperatorName, ctx.Input, ctx.Attempt);
        return ErrorHandlingDecision.Retry; // or Skip, Stop, Rethrow
    }
};

var stream = StreamBuilder<Order>.CreateNewStream("OrderProcessor")
    .WithErrorHandling(executionOptions)
    .Stream()
    .FanOut(fanOut => fanOut
        .To("database", order => SaveOrder(order))
        .To("kafka", order => PublishEvent(order)))
    .Build();

The ErrorHandlingDecision you return decides the fate of the pipeline:

| ErrorHandlingDecision | Effect |
| --- | --- |
| Rethrow | Propagate the exception so the failure surfaces to the caller (the same behaviour as the default None strategy) |
| Skip | Discard the offending item and keep processing; best for tolerable, isolated failures |
| Retry | Re-attempt up to MaxRetries, waiting RetryDelay between attempts; ideal for transient downstream errors |
| Stop | Halt the pipeline gracefully; for unrecoverable, must-not-proceed conditions |

If you do not supply an OnError callback, the global ErrorHandlingStrategy (None, Skip, Retry, or Stop) applies to every error. Retry with a small MaxRetries and a short RetryDelay handles the common case of a flaky network call or a momentarily-locked row. For genuinely poisoned data that will never succeed, return Skip (after logging it, or routing it to a dead-letter destination from inside the handler) so one bad record cannot stall the stream.
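
The retry-then-skip pattern is easy to reason about as a stand-alone loop. The sketch below is not Cortex's retry implementation; RetrySketch and its parameters are hypothetical, and it assumes that the retry limit counts re-attempts after the first try (so three retries means up to four calls), which this post does not confirm for MaxRetries.

```csharp
using System;
using System.Threading.Tasks;

// Illustration only: retry with a fixed delay, then dead-letter and move on.
public static class RetrySketch
{
    // True if the handler eventually succeeded; false if the item was dead-lettered.
    public static async Task<bool> ProcessWithRetryAsync<T>(
        T item,
        Func<T, Task> handler,
        int maxRetries,
        TimeSpan retryDelay,
        Action<T, Exception> deadLetter)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await handler(item);
                return true;
            }
            catch (Exception ex)
            {
                if (attempt >= maxRetries)
                {
                    deadLetter(item, ex); // poison message: record it and skip
                    return false;
                }
                await Task.Delay(retryDelay); // fixed delay between attempts
            }
        }
    }
}
```

A handler that fails twice and then succeeds returns true on the third call. A handler that always fails is called maxRetries + 1 times, the item goes to the dead-letter callback exactly once, and the caller is free to continue with the next item.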

Putting it together

Here is a realistic order processor that combines all of it: a bounded buffer with Block so no order is silently dropped, four parallel consumers for throughput, retries with a fixed one-second delay for transient errors, drop monitoring, and a graceful drain on shutdown.

using Cortex.Streams;
using Cortex.Streams.ErrorHandling;
using Cortex.Streams.Performance;

public class OrderProcessor
{
    private readonly IStream<Order> _stream;

    public OrderProcessor()
    {
        _stream = StreamBuilder<Order>.CreateNewStream("OrderProcessor")
            .WithPerformanceOptions(new StreamPerformanceOptions
            {
                EnableBufferedProcessing = true,
                BufferCapacity = 50_000,
                BackpressureStrategy = BackpressureStrategy.Block,
                ConcurrencyLevel = 4,
                OnItemDropped = (item, reason) =>
                    Logger.Warn($"Order dropped ({reason}): {((Order)item).Id}")
            })
            .WithErrorHandling(new StreamExecutionOptions
            {
                ErrorHandlingStrategy = ErrorHandlingStrategy.Retry,
                MaxRetries = 3,
                RetryDelay = TimeSpan.FromSeconds(1)
            })
            .Stream()
            .Filter(order => order.IsValid)
            .Map(order => EnrichOrder(order))
            .Map(order => ProcessOrder(order))
            .Sink(order => SaveToDatabase(order))
            .Build();
    }

    public void Start() => _stream.Start();

    public async Task StopAsync() => await _stream.StopAsync();

    public async Task SubmitAsync(Order order) =>
        await _stream.EmitAsync(order);

    public void LogStats()
    {
        var s = _stream.GetBufferStatistics();
        if (s != null)
            Logger.Info($"Buffer {s.CurrentCount}/{s.Capacity} " +
                        $"({s.UtilizationPercent:F1}%), processed {s.TotalProcessed}, " +
                        $"dropped {s.TotalDropped}");
    }
}

None of this is exotic. It is a bounded buffer, a backpressure policy, a couple of dials, a metrics hook, and an error policy — each one a deliberate answer to a question that production will ask you eventually. Answer them on purpose, while you are writing the pipeline, rather than at 3 a.m. while it falls over.

Where to go next

  • Documentation — start at the Cortex docs home, and read the Stream Performance & Async Processing guide for the full options reference.
  • Source — browse or contribute on GitHub.
  • Recipes — copy-paste patterns on our recipes page.
  • Community — bring questions and war stories to the Buildersoft Discord.

Install with dotnet add package Cortex.Streams (add Cortex.States and any connector or CDC packages as needed).
