The failure mode nobody plans for
Most streaming bugs in production are not logic bugs. They are physics bugs.
You wire up a fast source — a Kafka topic, a CDC feed, an HTTP ingest endpoint — into a pipeline that ends in a slow sink: a database write, an external API call, a disk flush. Under light load everything is fine, because the sink keeps up with the source. Then traffic spikes. The source keeps producing at full speed; the sink keeps draining at its own fixed rate. Items pile up somewhere in between.
If that "somewhere" is unbounded, you have a slow-motion crash. Memory climbs, the GC thrashes, latency balloons, and eventually the process dies, either killed by the OOM killer or brought down by an OutOfMemoryException. That usually happens at the worst possible time: under peak load, taking in-flight data with it.
Backpressure is the mechanism that prevents this. It puts a hard bound on how much work can be in flight, and then forces a decision when that bound is hit: slow the producer down, drop some data, or fail loudly. There is no fourth option that also keeps your memory flat. The whole point of this post is to make that decision explicit and deliberate instead of accidental.
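The physics is plain arithmetic. When the source outpaces the sink, the backlog grows at the difference between the two rates, and any bound fills in capacity divided by that difference. A small sketch of the numbers (BacklogMath is a hypothetical helper for illustration, not part of Cortex.Streams):

```csharp
using System;

// Hypothetical helper: the arithmetic of a fast source feeding a slow sink.
public static class BacklogMath
{
    // Items waiting after `seconds` when the source produces sourcePerSecond
    // and the sink drains sinkPerSecond. If the sink keeps up, nothing piles up.
    public static long ItemsAfter(long sourcePerSecond, long sinkPerSecond, long seconds)
        => Math.Max(0L, sourcePerSecond - sinkPerSecond) * seconds;

    // Seconds until a buffer of `capacity` items is full, or null if it never fills.
    public static double? SecondsUntilFull(long capacity, long sourcePerSecond, long sinkPerSecond)
    {
        long surplus = sourcePerSecond - sinkPerSecond;
        return surplus <= 0 ? (double?)null : (double)capacity / surplus;
    }
}
```

With a spike of 10,000 items/sec against a sink that drains 8,000 items/sec, ItemsAfter(10_000, 8_000, 600) is 1,200,000 items after ten minutes, and an unbounded queue never stops that number from growing. A 30,000-item bound fills in SecondsUntilFull(30_000, 10_000, 8_000) = 15 seconds, and at that point one of the three decisions has to be made.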
Cortex.Streams (framework v3.1.2, targeting .NET 6.0 through .NET 10) ships an opt-in buffered processing model that makes all of this configurable. Everything below is verified against Cortex.Streams.Performance.StreamPerformanceOptions and the canonical builder API.
The default: synchronous and unbuffered
Without any performance configuration, a Cortex stream processes items synchronously. The producer thread runs the entire pipeline and only returns when the sink is done.
```csharp
using System;
using Cortex.Streams;

var stream = StreamBuilder<int>.CreateNewStream("BasicStream")
    .Stream()
    .Map(x => x * 2)
    .Sink(Console.WriteLine)
    .Build();

stream.Start();
stream.Emit(42);            // Blocks until the Sink completes
await stream.EmitAsync(42); // Runs on the thread pool, still awaits completion
stream.Stop();
```
This is perfectly fine for in-process work where the producer naturally throttles itself: if Emit blocks until the sink is done, the producer cannot outrun the sink. The "backpressure" here is implicit — the caller's own thread is the bound.
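A minimal sketch of that implicit bound, using a hypothetical InlinePipeline type rather than Cortex's actual implementation: Emit runs the transform and the sink on the caller's thread, so nothing is buffered and the call cannot return before the sink is done.

```csharp
using System;

// Hypothetical synchronous pipeline: no buffer, so nothing can pile up.
public sealed class InlinePipeline<TIn, TOut>
{
    private readonly Func<TIn, TOut> _map;
    private readonly Action<TOut> _sink;

    public InlinePipeline(Func<TIn, TOut> map, Action<TOut> sink)
    {
        _map = map;
        _sink = sink;
    }

    // Runs map and sink on the caller's thread; returns only after the sink finishes,
    // so a producer calling Emit in a loop is throttled to the sink's speed.
    public void Emit(TIn item) => _sink(_map(item));
}
```

A producer looping over Emit against a sink that takes 10 ms per item can therefore emit at most about 100 items per second, whatever the source would like to do.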
The trouble starts when you want producers to be non-blocking, or when the producer is an external broker that does not care how fast your sink is. That is when you opt into buffered processing.
Opting into buffered processing
You enable buffering by passing StreamPerformanceOptions via .WithPerformanceOptions(...), which goes before .Stream(...) on the builder.
```csharp
using Cortex.Streams;
using Cortex.Streams.Performance;

var stream = StreamBuilder<int>.CreateNewStream("FastStream")
    .WithPerformanceOptions(new StreamPerformanceOptions
    {
        EnableBufferedProcessing = true,
        BufferCapacity = 10_000,
        BackpressureStrategy = BackpressureStrategy.Block
    })
    .Stream()
    .Map(x => x * 2)
    .Sink(ProcessItem) // ProcessItem: your own sink method taking an int
    .Build();

stream.Start();

// Returns as soon as the item is buffered (under Block, waits only while the buffer is full)
stream.EmitAndForget(42);

// Async: awaits buffer space, returns once buffered (not once processed)
await stream.EmitAsync(42);

// Graceful shutdown: waits for the buffer to drain
await stream.StopAsync();
```
Three things change once EnableBufferedProcessing = true:
- A bounded internal buffer of BufferCapacity items sits between emission and the pipeline.
- One or more async consumer tasks drain that buffer and run the pipeline.
- Emission decouples from processing. EmitAndForget(x) returns as soon as the item is buffered (or dropped); await EmitAsync(x) awaits buffer space, not pipeline completion.
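The same three ingredients can be sketched with System.Threading.Channels, which ships with .NET. This is an analogy for the model described above, not Cortex's internals, and BufferedPipeline is a hypothetical type: TryEmit plays the role of EmitAndForget, EmitAsync completes once the item is in the buffer, and a background task drains the buffer into the sink. One difference worth noting: under Block, Cortex's EmitAndForget waits synchronously for space, whereas the channel's TryWrite simply returns false when the buffer is full.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch of a buffered pipeline: bounded buffer + one consumer task.
public sealed class BufferedPipeline<T>
{
    private readonly Channel<T> _buffer;
    private readonly Task _consumer;

    public BufferedPipeline(int capacity, Action<T> sink)
    {
        _buffer = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // Block-style: writers wait for space
            SingleReader = true
        });

        // The consumer drains the buffer and runs the sink, off the producer's thread.
        _consumer = Task.Run(async () =>
        {
            await foreach (var item in _buffer.Reader.ReadAllAsync())
                sink(item);
        });
    }

    // Fire-and-forget style: never waits, returns false if the buffer is full.
    public bool TryEmit(T item) => _buffer.Writer.TryWrite(item);

    // EmitAsync style: waits for buffer space, completes once buffered (not processed).
    public ValueTask EmitAsync(T item, CancellationToken ct = default)
        => _buffer.Writer.WriteAsync(item, ct);

    // Refuse new items, then wait until everything already buffered has been processed.
    public Task DrainAsync()
    {
        _buffer.Writer.TryComplete();
        return _consumer;
    }
}
```

Because the consumer runs on its own task, an EmitAsync that has completed says nothing about whether the sink has run yet; only DrainAsync gives that guarantee.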
The full set of options, verified from StreamPerformanceOptions:
| Property | Type | Default | Description |
|---|---|---|---|
| EnableBufferedProcessing | bool | false | Enable the internal buffer and async consumers |
| BufferCapacity | int | 10,000 | Maximum items held in the buffer |
| BackpressureStrategy | BackpressureStrategy | Block | What to do when the buffer is full |
| BatchSize | int | 1 | Items processed per batch (1 = immediate) |
| BatchTimeout | TimeSpan | 100 ms | Maximum wait for a batch to fill |
| ConcurrencyLevel | int | 1 | Number of parallel consumer tasks |
| BlockingTimeout | TimeSpan | 30 s | Timeout for blocking emits |
| OnItemDropped | Action<object, DropReason> | null | Callback invoked when an item is dropped |
The BufferCapacity is the single most important number in this whole model. It is the hard ceiling on in-flight work. A good starting heuristic: size the buffer to hold 2–5 seconds of peak throughput. At 10,000 items/sec and a 3-second cushion, that is a 30,000-item buffer — enough to absorb a transient spike without forcing a decision, small enough that you notice when the sink is genuinely falling behind.
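The heuristic is a single multiplication. A hedged sketch (BufferSizing and its methods are hypothetical, and the per-item size is an assumption you would measure for your own payloads) that also turns the capacity into the worst-case memory the bound commits you to:

```csharp
using System;

// Hypothetical sizing helpers for the "2-5 seconds of peak throughput" heuristic.
public static class BufferSizing
{
    // Capacity that absorbs `cushion` worth of peak throughput.
    public static int SuggestBufferCapacity(int peakItemsPerSecond, TimeSpan cushion)
        => checked((int)Math.Ceiling(peakItemsPerSecond * cushion.TotalSeconds));

    // Worst-case memory held by a full buffer, given an assumed average item size.
    public static long WorstCaseBytes(int capacity, int approxBytesPerItem)
        => (long)capacity * approxBytesPerItem;
}
```

SuggestBufferCapacity(10_000, TimeSpan.FromSeconds(3)) returns 30,000, and at an assumed 1,024 bytes per item a full buffer of that size holds 30,720,000 bytes (about 30 MB): a fixed ceiling rather than an open-ended one.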
What happens when the buffer fills
When the buffer hits BufferCapacity, the BackpressureStrategy decides what happens next. There are exactly four choices, and each one trades a different thing away.
That feedback loop from consumer back to buffer is the essence of backpressure: the consumer's drain rate, combined with the bound, is what pushes back on the producer.
- Block (the default). EmitAsync waits asynchronously and EmitAndForget blocks synchronously until space frees up. If nothing frees up within BlockingTimeout, the call throws OperationCanceledException. This is the only strategy that never drops or rejects an item on its own (short of that timeout): the producer is forced to slow to the sink's pace.
- DropOldest. New items are always accepted; the oldest buffered items are evicted to make room, and each evicted item is handed to OnItemDropped. Use this when the freshest data is the only data that matters.
- DropNewest. The buffer is left intact and incoming items are silently dropped while it is full. Writes always "succeed" (return true); use OnItemDropped to count the casualties.
- ThrowException. Throws BufferFullException on the emitting call, so the caller owns the recovery decision.
```csharp
try
{
    stream.EmitAndForget(item);
}
catch (BufferFullException ex)
{
    Console.WriteLine($"Buffer full. Capacity: {ex.BufferCapacity}");
    // Apply your own retry / shed / alert logic here
}
```
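To see the four behaviours side by side without a stream, System.Threading.Channels has close analogues. The mapping is ours, not Cortex's: Block corresponds to BoundedChannelFullMode.Wait, DropOldest to BoundedChannelFullMode.DropOldest, and Cortex's DropNewest (reject the incoming item) to BoundedChannelFullMode.DropWrite. It does not correspond to the channel mode named DropNewest, which evicts the most recently buffered item instead. ThrowException has no built-in mode, so this sketch throws when the non-blocking TryWrite fails under Wait. FullBufferDemo and BufferFullDemoException are hypothetical stand-ins; the itemDropped callback overload of Channel.CreateBounded requires .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical stand-in for Cortex's BufferFullException.
public sealed class BufferFullDemoException : Exception
{
    public BufferFullDemoException(int capacity)
        : base($"Buffer full. Capacity: {capacity}") { }
}

public static class FullBufferDemo
{
    // Writes `items` into a bounded channel using the given full mode,
    // then reports what is still buffered and what the channel dropped.
    public static (List<int> Buffered, List<int> Dropped) Fill(
        BoundedChannelFullMode mode, int capacity, params int[] items)
    {
        var dropped = new List<int>();
        // itemDropped plays the role of OnItemDropped; it runs synchronously inside TryWrite.
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity) { FullMode = mode },
            item => dropped.Add(item));

        foreach (var item in items)
        {
            // Drop modes always accept the write; Wait makes TryWrite return false when full.
            if (!channel.Writer.TryWrite(item))
                throw new BufferFullDemoException(capacity); // ThrowException-style
        }

        var buffered = new List<int>();
        while (channel.Reader.TryRead(out var x))
            buffered.Add(x);
        return (buffered, dropped);
    }
}
```

Writing 1, 2, 3 into a capacity-2 channel keeps [2, 3] under DropOldest and [1, 2] under DropWrite (the incoming 3 is rejected, matching Cortex's DropNewest description), while the channel's own DropNewest keeps [1, 3]. For the true Block analogue you would await channel.Writer.WriteAsync(item, token) with a token from new CancellationTokenSource(timeout), which throws OperationCanceledException when the wait runs out, much like BlockingTimeout.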
Choosing a strategy
| Strategy | When to use it | What you trade away |
|---|---|---|
| Block | Financial transactions, orders, anything that must not be lost; log aggregation with a generously sized buffer | Producer latency — fast producers get throttled to sink speed |
| DropOldest | "Latest wins" telemetry: stock tickers, sensor readings, live dashboards | Older data points |
| DropNewest | Live video frames, sampled metrics where catching up beats completeness | Newest data while saturated |
| ThrowException | Critical alerts and fail-fast designs where silent loss is unacceptable and you want explicit control | Simplicity — every caller must handle the exception |
Cortex ships three presets so you do not have to assemble these by hand for common cases:
```csharp
// Maximum throughput: large buffer, batching, parallel consumers, Block
var ht = StreamPerformanceOptions.HighThroughput(
    bufferCapacity: 100_000,
    concurrencyLevel: 8);

// Minimal latency: immediate processing, single ordered consumer, Block
var ll = StreamPerformanceOptions.LowLatency(
    bufferCapacity: 10_000);

// Latest-wins: DropOldest with a drop callback wired in
var dropOldest = StreamPerformanceOptions.DropOldest(
    bufferCapacity: 10_000,
    onItemDropped: (item, reason) => metrics.Track("dropped", item));
```
HighThroughput() is Block with batching and Environment.ProcessorCount consumers; LowLatency() is Block with BatchSize = 1 and a single consumer for ordering; DropOldest() is exactly what it says.
Concurrency and batching: throughput versus ordering and latency
Two more knobs decide how the consumers drain the buffer, and both involve a real trade-off.
ConcurrencyLevel sets how many consumer tasks pull from the buffer in parallel. Raising it increases throughput for I/O-bound sinks — but parallel consumers mean order is no longer guaranteed, and your operators must be thread-safe. With ConcurrencyLevel = 1 you keep strict ordering at the cost of throughput.
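```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Not thread-safe under ConcurrencyLevel > 1: List<T> does not support concurrent writers
var list = new List<int>();
Action<int> listSink = x => list.Add(x);

// Safe under parallel consumers
var bag = new ConcurrentBag<int>();
Action<int> bagSink = x => bag.Add(x);

// Also safe: atomic operations
var counter = 0;
Action<int> counterSink = x => Interlocked.Increment(ref counter);
// Pass the sink you choose to .Sink(...) on the builder
```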
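Both halves of that trade-off show up in a small experiment with several consumer tasks draining one bounded channel. This is a hedged sketch, not Cortex code, and ParallelDrain is a hypothetical type: the Interlocked counter always ends at the number of items written, but the order in which items reach the sink depends on scheduling, and only a single consumer guarantees first-in, first-out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ParallelDrain
{
    // Pushes `count` items through a bounded channel drained by `consumers` tasks.
    // Returns how many items were processed and the order the sink saw them in.
    public static async Task<(int Processed, int[] Order)> RunAsync(int count, int consumers)
    {
        if (consumers < 1) throw new ArgumentOutOfRangeException(nameof(consumers));

        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1_000)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = consumers == 1 // lets the channel optimise the single-consumer case
        });
        int processed = 0;
        var order = new ConcurrentQueue<int>(); // thread-safe, unlike List<int>

        var workers = Enumerable.Range(0, consumers)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var item in channel.Reader.ReadAllAsync())
                {
                    order.Enqueue(item);
                    Interlocked.Increment(ref processed); // atomic across consumers
                }
            }))
            .ToArray();

        for (int i = 0; i < count; i++)
            await channel.Writer.WriteAsync(i); // waits while the buffer is full (Block)
        channel.Writer.Complete();
        await Task.WhenAll(workers);

        return (processed, order.ToArray());
    }
}
```

With one consumer the items come back in exactly the order they were written; with four, the count is still exact but the order is not guaranteed, which is the ordering cost described above.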
BatchSize and BatchTimeout group items before handing them to the consumer. For I/O-bound sinks — a database insert, an HTTP POST — batching is the single biggest throughput win, because you amortize per-call overhead across many items. The cost is latency: an item may wait up to BatchTimeout for its batch to fill before it is processed.
```csharp
var options = new StreamPerformanceOptions
{
    EnableBufferedProcessing = true,
    BufferCapacity = 50_000,
    BatchSize = 100,                                // amortize I/O over 100 items
    BatchTimeout = TimeSpan.FromMilliseconds(100),  // but never wait more than 100ms
    ConcurrencyLevel = Environment.ProcessorCount,
    BackpressureStrategy = BackpressureStrategy.Block
};
```
Read those two settings as a contract: "process up to BatchSize items together, but never make any single item wait longer than BatchTimeout." If you need per-item latency, set BatchSize = 1 (or just use LowLatency()).
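The BatchSize/BatchTimeout contract can be sketched as a batch reader over a System.Threading.Channels reader. This illustrates the technique only; it is not Cortex's consumer loop, ReadBatchAsync is a hypothetical helper, and starting the timeout clock at the first item of a batch is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class Batching
{
    // Collects up to batchSize items. Waits as long as needed for the first item,
    // then at most batchTimeout (measured from that point) for the rest.
    // With a single reader, an empty result means the channel is completed and drained.
    public static async Task<List<T>> ReadBatchAsync<T>(
        ChannelReader<T> reader, int batchSize, TimeSpan batchTimeout)
    {
        var batch = new List<T>(batchSize);
        if (!await reader.WaitToReadAsync())
            return batch; // writer completed and nothing is left

        using var deadline = new CancellationTokenSource(batchTimeout);
        while (true)
        {
            // Take everything already buffered, up to the batch size, without waiting.
            while (batch.Count < batchSize && reader.TryRead(out var item))
                batch.Add(item);
            if (batch.Count == batchSize)
                break; // full batch: hand it over immediately

            try
            {
                // Partial batch: wait for more items, but only until the deadline.
                if (!await reader.WaitToReadAsync(deadline.Token))
                    break; // writer completed: flush what we have
            }
            catch (OperationCanceledException)
            {
                break; // timeout: flush the partial batch
            }
        }
        return batch;
    }
}
```

A consumer would loop on ReadBatchAsync and hand each non-empty batch to one bulk insert or one HTTP call, which is where the per-call overhead gets amortized.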
Observability: you cannot tune what you cannot see
A bounded buffer turns an invisible failure (memory creeping up until OOM) into a visible, measurable one. GetBufferStatistics() returns a live snapshot — but only when buffered processing is enabled; otherwise it returns null.
```csharp
BufferStatistics stats = stream.GetBufferStatistics();
if (stats != null)
{
    Console.WriteLine($"In buffer:   {stats.CurrentCount}/{stats.Capacity}");
    Console.WriteLine($"Utilization: {stats.UtilizationPercent:F1}%");
    Console.WriteLine($"Enqueued:    {stats.TotalEnqueued}");
    Console.WriteLine($"Processed:   {stats.TotalProcessed}");
    Console.WriteLine($"Dropped:     {stats.TotalDropped}");
}
```
The two numbers to watch in production:
- UtilizationPercent climbing toward 100% means the sink is losing the race. Sustained high utilization is your early warning to scale consumers or shed load, well before anything crashes.
- TotalDropped increasing at all (under any Drop strategy) is direct evidence of data loss. If it must always be zero, you are on the wrong strategy.
A simple watchdog turns that into an alert:
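```csharp
// Keep a reference to the timer (for example in a field) for as long as it should run;
// an unreferenced System.Threading.Timer can be garbage-collected and stop firing.
var timer = new System.Threading.Timer(_ =>
{
    var s = stream.GetBufferStatistics();
    if (s != null && s.UtilizationPercent > 80)
        logger.Warn($"Buffer utilization high: {s.UtilizationPercent:F1}%");
}, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
```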
And OnItemDropped gives you a per-drop hook for metrics, so dropped data never goes unnoticed:
```csharp
OnItemDropped = (item, reason) => // reason is a DropReason
    metrics.IncrementDropped(reason.ToString())
```
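Counters like these are cheap to keep yourself when you want the same signals around a component that has no built-in statistics. A hypothetical sketch (BufferCounters is not Cortex's BufferStatistics; the property names merely mirror it) using Interlocked so producers, consumers, and drop callbacks can update it concurrently:

```csharp
using System.Threading;

// Hypothetical counters; property names mirror BufferStatistics for readability only.
public sealed class BufferCounters
{
    private long _enqueued, _processed, _dropped, _current;

    public BufferCounters(int capacity) => Capacity = capacity;

    public int Capacity { get; }
    public long TotalEnqueued => Interlocked.Read(ref _enqueued);
    public long TotalProcessed => Interlocked.Read(ref _processed);
    public long TotalDropped => Interlocked.Read(ref _dropped);
    public long CurrentCount => Interlocked.Read(ref _current);
    public double UtilizationPercent => 100.0 * CurrentCount / Capacity;

    // An item entered the buffer.
    public void OnEnqueued()
    {
        Interlocked.Increment(ref _enqueued);
        Interlocked.Increment(ref _current);
    }

    // The sink finished with an item taken from the buffer.
    public void OnProcessed()
    {
        Interlocked.Increment(ref _processed);
        Interlocked.Decrement(ref _current);
    }

    // evicted = true: a buffered item was pushed out (DropOldest-style).
    // evicted = false: the incoming item was rejected and never buffered (DropNewest-style).
    public void OnDropped(bool evicted)
    {
        Interlocked.Increment(ref _dropped);
        if (evicted) Interlocked.Decrement(ref _current);
    }

    // Threshold check for a watchdog, e.g. IsAbove(80).
    public bool IsAbove(double utilizationPercent) => UtilizationPercent > utilizationPercent;
}
```

Call OnEnqueued when an item enters the buffer, OnProcessed when the sink is done with it, and OnDropped from whatever drop callback you have.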
Graceful shutdown: drain, don't slam the door
A buffer that absorbs spikes also holds data when you shut down. Calling the synchronous Stop() stops immediately and can leave buffered items unprocessed. For a clean drain, use StopAsync(), which waits for the buffer to empty before returning.
```csharp
// Risky: may discard whatever is still buffered
stream.Stop();

// Correct: waits for the buffer to drain
await stream.StopAsync();

// Bounded drain: drain, but don't hang forever
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await stream.StopAsync(cts.Token);
```
Wire StopAsync() into your host's shutdown path (for example, IHostApplicationLifetime.ApplicationStopping) so a rolling deploy or scale-in does not silently lose in-flight items.
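The drain-with-a-deadline pattern can also be sketched with a channel. In this hedged example DrainingWorker is hypothetical and not how Cortex implements StopAsync: completing the writer refuses new items, the consumer still finishes everything already buffered, and Task.WaitAsync (available from .NET 6) bounds how long shutdown is willing to wait.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical worker with a bounded buffer and a drain-on-stop shutdown path.
public sealed class DrainingWorker
{
    private readonly Channel<int> _buffer = Channel.CreateBounded<int>(1_000);
    private readonly Task _consumer;
    public ConcurrentQueue<int> Processed { get; } = new();

    public DrainingWorker(TimeSpan perItemWork)
    {
        _consumer = Task.Run(async () =>
        {
            await foreach (var item in _buffer.Reader.ReadAllAsync())
            {
                await Task.Delay(perItemWork); // simulate a slow sink
                Processed.Enqueue(item);
            }
        });
    }

    public bool TryEmit(int item) => _buffer.Writer.TryWrite(item);

    // Returns true if every buffered item was processed before the deadline.
    public async Task<bool> StopAsync(TimeSpan drainTimeout)
    {
        _buffer.Writer.TryComplete(); // refuse new items; buffered ones still drain
        try
        {
            await _consumer.WaitAsync(drainTimeout);
            return true;
        }
        catch (TimeoutException)
        {
            return false; // gave up waiting; the consumer may still be running
        }
    }
}
```

Returning a bool instead of throwing lets the host log that the drain was incomplete when the deadline wins, which is exactly the case a rolling deploy needs to know about.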
Resilience against poison messages
Backpressure keeps you alive under volume. The other thing that takes pipelines down is a single bad item — a malformed record, a transient downstream timeout — that throws inside an operator. Without a policy, one poison message can kill the consumer loop.
.WithErrorHandling(...) attaches that policy. Like the performance options, it goes before .Stream(...), and it takes a StreamExecutionOptions instance. You set a default ErrorHandlingStrategy and, optionally, an OnError callback for per-error decisions. The callback receives a StreamErrorContext — which carries the exception, the operator name, the offending input, and the retry attempt — and returns an ErrorHandlingDecision.
```csharp
using Cortex.Streams;
using Cortex.Streams.ErrorHandling;

var executionOptions = new StreamExecutionOptions
{
    ErrorHandlingStrategy = ErrorHandlingStrategy.Retry, // default when OnError is null
    MaxRetries = 3,
    RetryDelay = TimeSpan.FromSeconds(1),
    OnError = ctx =>
    {
        _logger.LogError(ctx.Exception,
            "Error in {Operator} processing {Item} (attempt {Attempt})",
            ctx.OperatorName, ctx.Input, ctx.Attempt);
        return ErrorHandlingDecision.Retry; // or Skip, Stop, Rethrow
    }
};

var stream = StreamBuilder<Order>.CreateNewStream("OrderProcessor")
    .WithErrorHandling(executionOptions)
    .Stream()
    .FanOut(fanOut => fanOut
        .To("database", order => SaveOrder(order))
        .To("kafka", order => PublishEvent(order)))
    .Build();
```
The ErrorHandlingDecision you return decides the fate of the pipeline:
| ErrorHandlingDecision | Effect |
|---|---|
| Rethrow | Propagate the exception so the failure surfaces to the caller (the behaviour of the None strategy) |
| Skip | Discard the offending item and keep processing; best for tolerable, isolated failures |
| Retry | Re-attempt up to MaxRetries times, waiting RetryDelay between attempts; ideal for transient downstream errors |
| Stop | Halt the pipeline gracefully; for unrecoverable, must-not-proceed conditions |
If you do not supply an OnError callback, the global ErrorHandlingStrategy (None, Skip, Retry, or Stop) applies to every error. Retry with a small MaxRetries and a short RetryDelay handles the common case of a flaky network call or a momentarily-locked row. For genuinely poisoned data that will never succeed, return Skip (after logging it, or routing it to a dead-letter destination from inside the handler) so one bad record cannot stall the stream.
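The retry-then-skip policy is worth seeing in isolation. This hedged sketch is independent of Cortex: Decision, ErrorPolicy.RunWithPolicy, the dead-letter callback, and the attempt semantics (one initial try plus up to maxRetries retries) are assumptions for illustration, not the semantics of StreamExecutionOptions. The handler sees each failure with its attempt number and chooses what happens next; once retries are exhausted, the item is parked and the loop moves on.

```csharp
using System;
using System.Threading;

// Hypothetical mirror of the four decisions described above.
public enum Decision { Retry, Skip, Stop, Rethrow }

public static class ErrorPolicy
{
    // Runs process(input), consulting onError after each failure.
    // Returns true if the item was processed, false if it was skipped.
    // Stop is signalled with OperationCanceledException so the caller can halt its loop.
    public static bool RunWithPolicy<T>(
        T input,
        Action<T> process,
        Func<Exception, int, Decision> onError,
        int maxRetries,
        TimeSpan retryDelay,
        Action<T, Exception>? deadLetter = null)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                process(input);
                return true;
            }
            catch (Exception ex)
            {
                var decision = onError(ex, attempt);
                if (decision == Decision.Retry && attempt <= maxRetries)
                {
                    Thread.Sleep(retryDelay); // fixed delay between attempts
                    continue;
                }
                if (decision == Decision.Rethrow)
                    throw;
                if (decision == Decision.Stop)
                    throw new OperationCanceledException("Pipeline stopped by error policy", ex);

                // Skip, or Retry with no retries left: park the item and move on.
                deadLetter?.Invoke(input, ex);
                return false;
            }
        }
    }
}
```

The dead-letter callback is where a poisoned record would be written to a separate topic or table for later inspection, so one bad item costs a few retries rather than the whole stream.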
Putting it together
Here is a realistic order processor that combines all of it: a bounded buffer with Block so no order is ever silently dropped, four parallel consumers for throughput, retries with a delay for transient errors, drop monitoring, and a graceful drain on shutdown.
```csharp
using System;
using System.Threading.Tasks;
using Cortex.Streams;
using Cortex.Streams.ErrorHandling;
using Cortex.Streams.Performance;

public class OrderProcessor
{
    private readonly IStream<Order> _stream;

    public OrderProcessor()
    {
        _stream = StreamBuilder<Order>.CreateNewStream("OrderProcessor")
            .WithPerformanceOptions(new StreamPerformanceOptions
            {
                EnableBufferedProcessing = true,
                BufferCapacity = 50_000,
                BackpressureStrategy = BackpressureStrategy.Block,
                ConcurrencyLevel = 4,
                OnItemDropped = (item, reason) =>
                    Logger.Warn($"Order dropped ({reason}): {((Order)item).Id}")
            })
            .WithErrorHandling(new StreamExecutionOptions
            {
                ErrorHandlingStrategy = ErrorHandlingStrategy.Retry,
                MaxRetries = 3,
                RetryDelay = TimeSpan.FromSeconds(1)
            })
            .Stream()
            .Filter(order => order.IsValid)
            .Map(order => EnrichOrder(order))
            .Map(order => ProcessOrder(order))
            .Sink(order => SaveToDatabase(order))
            .Build();
    }

    public void Start() => _stream.Start();

    // Drains the buffer before returning
    public async Task StopAsync() => await _stream.StopAsync();

    // Waits for buffer space (Block), not for processing
    public async Task SubmitAsync(Order order) =>
        await _stream.EmitAsync(order);

    public void LogStats()
    {
        var s = _stream.GetBufferStatistics();
        if (s != null)
            Logger.Info($"Buffer {s.CurrentCount}/{s.Capacity} " +
                        $"({s.UtilizationPercent:F1}%), processed {s.TotalProcessed}, " +
                        $"dropped {s.TotalDropped}");
    }
}
```
None of this is exotic. It is a bounded buffer, a backpressure policy, a couple of dials, a metrics hook, and an error policy — each one a deliberate answer to a question that production will ask you eventually. Answer them on purpose, while you are writing the pipeline, rather than at 3 a.m. while it falls over.
Where to go next
- Documentation — start at the Cortex docs home, and read the Stream Performance & Async Processing guide for the full options reference.
- Source — browse or contribute on GitHub.
- Recipes — copy-paste patterns on our recipes page.
- Community — bring questions and war stories to the Buildersoft Discord.
Install with dotnet add package Cortex.Streams (add Cortex.States and any connector or CDC packages as needed).