The broker is an implementation detail. Most pipelines don't treat it that way.
Open almost any production .NET streaming service and you will find the broker's client SDK woven through the business logic. The Confluent consumer loop sits next to the deserialization code, which sits next to the aggregation, which sits next to the producer that writes results back out. It works — until the day someone decides to move from Kafka to Pulsar, or to ingest from RabbitMQ while a managed Kafka cluster handles the output, or simply to write a unit test that doesn't require a running broker.
At that point the cost of coupling shows up all at once. The consumer loop, the offset handling, the topic names, and the serializer are all tangled into the same methods that do the actual work. Switching brokers is not a config change; it's a rewrite. And testing the interesting part — the Map/Filter/Aggregate logic — means standing up infrastructure you don't actually want in your test suite.
Cortex draws the line in a different place. Where your data comes from and where it goes are operators attached to the edges of a pipeline. The transformation logic in the middle never names a broker.
The abstraction: ISourceOperator and sink operators
A Cortex stream has exactly one entry point and one or more exits. Both are pluggable.
A source operator implements ISourceOperator. The interface is deliberately small — two methods:
- Start(Action<object> emit): the framework hands your source an emit callback. Whenever a record arrives (from a Kafka topic, an SQS poll loop, a timer, anything), you call emit(record) and it flows downstream. Start must not block; it spawns background work and returns.
- Stop(): cease emitting and release resources.
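To make the two-method contract concrete, here is a minimal sketch of a source that emits a few integers from a background task. It does not reference Cortex at all: ISourceOperatorSketch is a stand-in that mirrors the shape described above, and CounterSource and its Completion property are hypothetical names invented for the demo. Check the real interface declaration before implementing it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Wire the source to a callback by hand, the way a framework would.
var received = new ConcurrentQueue<object>();
var source = new CounterSource(count: 3, interval: TimeSpan.FromMilliseconds(10));

source.Start(record => received.Enqueue(record)); // returns immediately
source.Completion.Wait(TimeSpan.FromSeconds(5));  // demo only: let the loop finish
source.Stop();

Console.WriteLine(string.Join(", ", received));   // prints "1, 2, 3"

// Stand-in for the two-method shape described in the text. NOT the Cortex type.
public interface ISourceOperatorSketch
{
    void Start(Action<object> emit);
    void Stop();
}

// Hypothetical source that emits the integers 1..count on a background task.
public sealed class CounterSource : ISourceOperatorSketch
{
    private readonly int _count;
    private readonly TimeSpan _interval;
    private readonly CancellationTokenSource _cts = new();

    public CounterSource(int count, TimeSpan interval)
    {
        _count = count;
        _interval = interval;
    }

    // Exposed only so the demo can wait for the loop to finish.
    public Task Completion { get; private set; } = Task.CompletedTask;

    public void Start(Action<object> emit)
    {
        var token = _cts.Token;
        // Spawn the work and return right away; Start must not block.
        Completion = Task.Run(async () =>
        {
            try
            {
                for (var i = 1; i <= _count; i++)
                {
                    token.ThrowIfCancellationRequested();
                    emit(i);
                    await Task.Delay(_interval, token);
                }
            }
            catch (OperationCanceledException)
            {
                // Stop() was called mid-loop; exit quietly.
            }
        });
    }

    public void Stop()
    {
        // Signal the loop to end, then release the token source.
        _cts.Cancel();
        _cts.Dispose();
    }
}
```

The part worth copying is the structure: Start captures the callback, kicks off a loop, and returns; Stop cancels the loop. A broker-backed source would replace the for loop with a poll or consume call.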
Internally, Cortex wraps your source in a SourceOperatorAdapter that bridges it to the rest of the pipeline and hooks telemetry. You attach a source with .Stream(sourceOperator). You attach a sink with .Sink(action) for an inline lambda, or .Sink(sinkOperator) for a reusable terminal operator that writes to an external system.
Everything between the source and the sink — .Map(...), .Filter(...), .GroupBySilently(...), .AggregateSilently(...) — operates on your domain types and has no idea what's on either end.
A note on the docs: some integration pages currently show .Source(...) and .Source(). The method on the v3 builder is .Stream(...) (with a source operator) or .Stream() (no args, in-app). The snippets below use the correct form.
Step 1: Build the pipeline against nothing
Start with an in-app stream. Call .Stream() with no arguments and Cortex gives you a pipeline you feed by hand with stream.Emit(...). No broker, no SDK, no network.
    using System;
    using Cortex.Streams;
    using Cortex.States; // InMemoryStateStore lives here

    // The business logic, defined once, fed by nothing in particular.
    var stream = StreamBuilder<Order>
        .CreateNewStream("order-totals")
        .Stream() // in-app source
        .Filter(o => o.Amount > 0)
        .Map(o => o with { Amount = o.Amount * 1.0825m }) // add tax
        .GroupBySilently(o => o.CustomerId, stateStoreName: "by-customer")
        .AggregateSilently<string, decimal>(
            o => o.CustomerId,
            (total, o) => total + o.Amount,
            stateStoreName: "customer-totals")
        .Sink(o => Console.WriteLine($"{o.CustomerId}: {o.Amount:C}"))
        .Build();

    stream.Start();

    stream.Emit(new Order("c-1", 50m, DateTime.UtcNow));
    stream.Emit(new Order("c-1", 30m, DateTime.UtcNow));
    stream.Emit(new Order("c-2", 99m, DateTime.UtcNow));

    // Read the aggregated state directly; no broker involved.
    var totals = stream.GetStateStoreByName<InMemoryStateStore<string, decimal>>("customer-totals");
    foreach (var kv in totals.GetAll())
        Console.WriteLine($"running total {kv.Key} = {kv.Value:C}");

    stream.Stop();

    // In a top-level program, type declarations must come after the statements.
    public record Order(string CustomerId, decimal Amount, DateTime PlacedAt);
This snippet is a unit test in all but name: swap the Console.WriteLine calls for assertions on the state store after emitting a known set of records. The filter, the tax calculation, and the per-customer aggregation are exercised with zero infrastructure. That is the whole point: the logic you care about is testable in isolation because the broker was never part of it.
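To pin down the numbers such a test would assert, here is a sketch that names the three pieces of logic as plain static functions and checks the arithmetic with LINQ standing in for the pipeline. OrderLogic, IsBillable, AddTax, Accumulate, and Expect are hypothetical names, and the fourth, negative order is added only to exercise the filter. Nothing here touches Cortex.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var input = new[]
{
    new Order("c-1", 50m, DateTime.UtcNow),
    new Order("c-1", 30m, DateTime.UtcNow),
    new Order("c-2", 99m, DateTime.UtcNow),
    new Order("c-3", -5m, DateTime.UtcNow), // dropped by the filter
};

// LINQ stands in for the pipeline; only the named functions are the point.
Dictionary<string, decimal> totals = input
    .Where(OrderLogic.IsBillable)
    .Select(OrderLogic.AddTax)
    .GroupBy(o => o.CustomerId)
    .ToDictionary(g => g.Key, g => g.Aggregate(0m, OrderLogic.Accumulate));

Expect(totals.Count == 2, "the negative order should be filtered out");
Expect(totals["c-1"] == 86.6m, "c-1: (50 + 30) * 1.0825");
Expect(totals["c-2"] == 107.1675m, "c-2: 99 * 1.0825");
Console.WriteLine("all expectations met");

static void Expect(bool condition, string message)
{
    if (!condition) throw new InvalidOperationException(message);
}

public record Order(string CustomerId, decimal Amount, DateTime PlacedAt);

// Hypothetical home for the filter, map, and fold used in the pipeline.
public static class OrderLogic
{
    public const decimal TaxMultiplier = 1.0825m;

    public static bool IsBillable(Order o) => o.Amount > 0;

    public static Order AddTax(Order o) => o with { Amount = o.Amount * TaxMultiplier };

    public static decimal Accumulate(decimal total, Order o) => total + o.Amount;
}
```

The same expected values (86.6 for c-1, 107.1675 for c-2) are what you would assert against the "customer-totals" state store in the in-app version.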
Step 2: Same pipeline, attach Kafka
Now take that identical downstream chain and put Kafka in front of it. The only change is the first line after .CreateNewStream(...): instead of .Stream(), pass a KafkaSourceOperator<T>.
    using Cortex.Streams;
    using Cortex.Streams.Kafka;

    var source = new KafkaSourceOperator<Order>(
        bootstrapServers: "localhost:9092",
        topic: "orders");

    var stream = StreamBuilder<Order>
        .CreateNewStream("order-totals")
        .Stream(source) // the ONLY difference
        .Filter(o => o.Amount > 0)
        .Map(o => o with { Amount = o.Amount * 1.0825m })
        .GroupBySilently(o => o.CustomerId, stateStoreName: "by-customer")
        .AggregateSilently<string, decimal>(
            o => o.CustomerId,
            (total, o) => total + o.Amount,
            stateStoreName: "customer-totals")
        .Sink(o => Console.WriteLine($"{o.CustomerId}: {o.Amount:C}"))
        .Build();

    stream.Start(); // Cortex calls source.Start(emit) for you
The KafkaSourceOperator<T> wraps the Confluent.Kafka client. The constructor takes bootstrapServers and topic; it generates a random group id and uses JSON deserialization by default. For Avro, Protobuf, or opaque binary payloads, pass a custom IDeserializer<T> (or a ConsumerConfig to control GroupId / AutoOffsetReset). The five downstream operators (Filter, Map, GroupBySilently, AggregateSilently, and Sink) did not change a single character.
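For a sense of what a custom deserializer looks like, here is a minimal sketch against Confluent.Kafka's IDeserializer<T> interface. OrderJsonDeserializer is a hypothetical name, and it still decodes JSON (with case-insensitive property names) to stay short; an Avro or Protobuf decoder would sit in the same Deserialize method. How the instance is passed to KafkaSourceOperator<T> is not shown here because the parameter name does not appear in this article; check the connector documentation.

```csharp
using System;
using System.Text;
using System.Text.Json;
using Confluent.Kafka;

// Quick check without a broker: feed the deserializer raw bytes by hand.
var payload = Encoding.UTF8.GetBytes(
    "{\"customerId\":\"c-1\",\"amount\":50,\"placedAt\":\"2024-01-01T00:00:00Z\"}");
var order = new OrderJsonDeserializer()
    .Deserialize(payload, isNull: false, SerializationContext.Empty);
Console.WriteLine($"{order.CustomerId}: {order.Amount}");

public record Order(string CustomerId, decimal Amount, DateTime PlacedAt);

// Hypothetical deserializer: UTF-8 JSON with case-insensitive property names.
public sealed class OrderJsonDeserializer : IDeserializer<Order>
{
    private static readonly JsonSerializerOptions Options = new()
    {
        PropertyNameCaseInsensitive = true,
    };

    public Order Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // Kafka allows null values (tombstones); this sketch treats them as errors.
        if (isNull)
            throw new InvalidOperationException($"Null payload on topic '{context.Topic}'.");

        return JsonSerializer.Deserialize<Order>(data, Options)
            ?? throw new InvalidOperationException("Payload deserialized to null.");
    }
}
```

Because the deserializer is an ordinary class, it can be tested the same way as the pipeline body: hand it bytes, assert on the result, no broker required.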
One rule worth internalizing: a stream has exactly one entry point. If you attach a source with .Stream(source), you may not call stream.Emit(...) — that throws. Emit is only for in-app streams created with .Stream(). This is enforced on purpose so a pipeline never has two competing data sources.
Step 3: Swap the source — Pulsar, RabbitMQ, SQS, Service Bus
Every connector ships as its own NuGet package and exposes a source operator with the same role. The pipeline body is unchanged in every case; only the constructor differs. The constructors below are taken directly from each connector's documentation.
Apache Pulsar (Cortex.Streams.Pulsar, built on DotPulsar). JSON by default, auto-generated subscription, earliest position, automatic acknowledgement:
    using Cortex.Streams.Pulsar;

    var source = new PulsarSourceOperator<Order>(
        serviceUrl: "pulsar://localhost:6650",
        topic: "persistent://public/default/orders");

    // .Stream(source) → same downstream chain
RabbitMQ (Cortex.Streams.RabbitMQ, wraps RabbitMQ.Client). The queue is declared durable; messages are acknowledged only after your pipeline processes them, giving at-least-once semantics:
    using Cortex.Streams.RabbitMQ;

    var source = new RabbitMQSourceOperator<Order>(
        hostname: "localhost",
        queueName: "orders",
        username: "guest",
        password: "guest");

    // .Stream(source) → same downstream chain
Amazon SQS (Cortex.Streams.AWSSQS). Long-polls the queue (up to 10 messages, 20-second wait) and deletes each message after successful processing; defaults to RegionEndpoint.USEast1:
    using Cortex.Streams.AWSSQS;
    using Amazon;

    var source = new SQSSourceOperator<Order>(
        queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
        region: RegionEndpoint.USEast1);

    // .Stream(source) → same downstream chain
Azure Service Bus (Cortex.Streams.AzureServiceBus, on Azure.Messaging.ServiceBus). PeekLock mode, single concurrency by default; messages are completed after processing and abandoned on error:
    using Cortex.Streams.AzureServiceBus;

    var source = new AzureServiceBusSourceOperator<Order>(
        connectionString: Environment.GetEnvironmentVariable("SERVICEBUS_CONNSTR"),
        queueOrTopicName: "orders");

    // .Stream(source) → same downstream chain
In every one of these, you take the seven-line transformation chain from Step 1, drop the source operator into .Stream(...), and you're done. The package and the constructor are the only broker-specific code in your service.
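One way to keep that broker-specific code in a single place is to pick the source from configuration. The sketch below uses only the constructors and builder calls shown in this article; the ORDERS_BROKER environment variable is hypothetical, and it assumes the object returned by CreateNewStream(...) can be held in a variable before .Stream(...) is called on it. It needs the Cortex packages and a reachable broker to actually run.

```csharp
using System;
using Cortex.Streams;
using Cortex.Streams.Kafka;
using Cortex.Streams.RabbitMQ;

// Hypothetical setting; any configuration mechanism would do.
var broker = Environment.GetEnvironmentVariable("ORDERS_BROKER") ?? "kafka";

var builder = StreamBuilder<Order>.CreateNewStream("order-totals");

// The only broker-specific code: choose a source and attach it.
var withSource = broker switch
{
    "kafka" => builder.Stream(new KafkaSourceOperator<Order>(
        bootstrapServers: "localhost:9092",
        topic: "orders")),
    "rabbitmq" => builder.Stream(new RabbitMQSourceOperator<Order>(
        hostname: "localhost",
        queueName: "orders",
        username: "guest",
        password: "guest")),
    _ => throw new InvalidOperationException($"Unsupported broker '{broker}'."),
};

// The downstream chain from Step 1, unchanged.
var stream = withSource
    .Filter(o => o.Amount > 0)
    .Map(o => o with { Amount = o.Amount * 1.0825m })
    .GroupBySilently(o => o.CustomerId, stateStoreName: "by-customer")
    .AggregateSilently<string, decimal>(
        o => o.CustomerId,
        (total, o) => total + o.Amount,
        stateStoreName: "customer-totals")
    .Sink(o => Console.WriteLine($"{o.CustomerId}: {o.Amount:C}"))
    .Build();

stream.Start();

public record Order(string CustomerId, decimal Amount, DateTime PlacedAt);
```

Adding Pulsar, SQS, or Service Bus would mean one more arm in the switch and one more package reference; nothing below the switch changes.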
Step 4: Sinks are interchangeable too
The exit is just as pluggable. A console lambda is fine for development; for production you swap in a sink operator and write the same results to a broker, a search index, or object storage.
Kafka sink (Cortex.Streams.Kafka) — fan results back out to a topic:
    using Cortex.Streams.Kafka;

    var sink = new KafkaSinkOperator<Order>(
        bootstrapServers: "localhost:9092",
        topic: "orders-processed");

    // ... .Map(...) ...
    // .Sink(sink)
    // .Build();
Elasticsearch sink (Cortex.Streams.Elasticsearch) — buffered bulk writes with a flush interval and automatic retry of failed documents via a state store. You supply a configured ElasticsearchClient:
    using Cortex.Streams.Elasticsearch;
    using Elastic.Clients.Elasticsearch;

    var client = new ElasticsearchClient(
        new ElasticsearchClientSettings(new Uri("http://localhost:9200")));

    var sink = new ElasticsearchSinkOperator<Order>(
        client,
        indexName: "orders",
        batchSize: 100,
        flushInterval: TimeSpan.FromSeconds(10),
        retryInterval: TimeSpan.FromMinutes(1));

    // ... .Sink(sink) ...
The Elasticsearch doc page currently shows an older StreamBuilder.NewStream<T>().Pipeline(...) shape and refers to a Buildersoft.Cortex.Streams.Elasticsearch package. On v3 the package id is Cortex.Streams.Elasticsearch and you attach this operator the same way as any other sink: .Sink(sink). The ElasticsearchSinkOperator<T> constructor above is the verified one.
Amazon S3 sink (Cortex.Streams.S3). Two flavors: S3SinkOperator<T> writes one JSON file per record; S3SinkBulkOperator<T> batches into .jsonl files (default batch size 100, flush every 10 seconds):
    using Cortex.Streams.S3;
    using Amazon.S3;

    var s3 = new AmazonS3Client();

    var sink = new S3SinkBulkOperator<Order>(
        bucketName: "my-bucket",
        folderPath: "orders-processed",
        s3Client: s3,
        batchSize: 50,
        flushInterval: TimeSpan.FromSeconds(15));

    // ... .Sink(sink) ...
For a relational target, the same approach applies: terminate the pipeline with .Sink(record => ...) and write through your DbContext or Dapper inside the lambda, or use one of the state-store backends (SQL Server, PostgreSQL, SQLite, ClickHouse, MongoDB, and others) to persist aggregated state.
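As a rough illustration of the lambda approach, here is a sketch of a sink body that writes each record with Dapper. It assumes the Dapper and Microsoft.Data.Sqlite packages; the in-memory SQLite database, the processed_orders table, and the writeOrder delegate are hypothetical choices made only to keep the example self-contained. The delegate is invoked directly here; in a pipeline it would be the body you hand to .Sink(...), assuming the lambda overload accepts an Action<Order>.

```csharp
using System;
using Dapper;
using Microsoft.Data.Sqlite;

// In-memory SQLite: the data lives only while this connection stays open.
using var connection = new SqliteConnection("Data Source=:memory:");
connection.Open();
connection.Execute(
    "CREATE TABLE processed_orders (customer_id TEXT, amount TEXT, placed_at TEXT)");

// The terminal step: one parameterized INSERT per record.
// Dapper binds @CustomerId, @Amount, and @PlacedAt from the record's properties.
Action<Order> writeOrder = o => connection.Execute(
    "INSERT INTO processed_orders (customer_id, amount, placed_at) " +
    "VALUES (@CustomerId, @Amount, @PlacedAt)",
    o);

// Called by hand here; in a pipeline this is what the sink would run.
writeOrder(new Order("c-1", 54.125m, DateTime.UtcNow));
writeOrder(new Order("c-2", 107.1675m, DateTime.UtcNow));

var rows = connection.ExecuteScalar<long>("SELECT COUNT(*) FROM processed_orders");
Console.WriteLine($"rows written: {rows}");

public record Order(string CustomerId, decimal Amount, DateTime PlacedAt);
```

A single shared connection is fine for a sketch, but it is not safe if the sink is ever invoked concurrently. For production, opening a pooled connection per write or batching inserts is the more usual shape.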
What this buys you
Testing without a broker. The Step 1 in-app version is your test harness. Emit records, read the state store, assert. Your CI doesn't need Docker Compose with Kafka and ZooKeeper.
Migration without touching business logic. Moving from RabbitMQ to Kafka is a one-line swap from RabbitMQSourceOperator<T> to KafkaSourceOperator<T>. The filter, the tax math, the aggregation, and the sink are untouched and stay tested.
Multi-broker topologies. Because the source and sink are independent operators, ingesting from one system and emitting to another is the default, not a special case. Read orders from RabbitMQ, fan the processed results to a Kafka topic:
    var stream = StreamBuilder<Order>
        .CreateNewStream("rabbit-to-kafka")
        .Stream(new RabbitMQSourceOperator<Order>("localhost", "orders", "guest", "guest"))
        .Filter(o => o.Amount > 0)
        .Map(o => o with { Amount = o.Amount * 1.0825m })
        .Sink(new KafkaSinkOperator<Order>("localhost:9092", "orders-processed"))
        .Build();

    stream.Start();
Ingest from a legacy queue, publish to your modern event backbone, run both during a migration window — without forking the logic in the middle.
Honest caveats: the API is unified, the semantics are not
Cortex unifies how you wire brokers into a pipeline. It does not erase the operational differences between them, and pretending otherwise would set you up to fail.
| Concern | What differs across brokers |
|---|---|
| Delivery semantics | Kafka commits offsets automatically; RabbitMQ and Service Bus ack after your processing completes (at-least-once); SQS deletes after success. The guarantees you get depend on the broker, not on Cortex. |
| Ordering | Kafka orders within a partition; Pulsar within a partition/subscription; classic queues and SQS standard queues offer weaker ordering. The abstraction won't impose ordering a broker doesn't provide. |
| Offset / ack model | Offset commit (Kafka), explicit ack/nack (RabbitMQ), peek-lock complete/abandon (Service Bus), receive-then-delete (SQS) — each has different failure and redelivery behavior. |
| Partitioning & scaling | Consumer groups, shared subscriptions, competing consumers, and queue-per-consumer scale differently. Throughput and parallelism characteristics do not transfer. |
| Back-pressure | Cortex does not currently provide built-in back-pressure for sources. If your downstream can't keep up, throttle or batch at the source. |
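Since back-pressure is left to the source, one common way to throttle there is a bounded buffer between the poll loop and the emit callback. The sketch below uses System.Threading.Channels from the .NET base library and no Cortex types; the poller task, the capacity of 2, and the simulated 20 ms of downstream work are all stand-ins chosen for the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 2: the poll loop can run at most two records ahead of the consumer.
var buffer = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait, // make the writer wait instead of dropping
});

// Stand-in for a broker poll loop that produces as fast as it can.
var poller = Task.Run(async () =>
{
    for (var i = 1; i <= 10; i++)
        await buffer.Writer.WriteAsync(i); // waits here while the buffer is full
    buffer.Writer.Complete();
});

// Stand-in for the emit callback handed to Start(...), with a slow downstream.
var emitted = 0;
Action<int> emit = record =>
{
    Thread.Sleep(20); // simulate downstream work
    emitted++;
};

// Drain loop: hands records to emit one at a time, at the downstream's pace.
await foreach (var record in buffer.Reader.ReadAllAsync())
    emit(record);

await poller;
Console.WriteLine($"emitted {emitted} of 10 records"); // prints "emitted 10 of 10 records"
```

Nothing is dropped: the poller simply stops pulling while the buffer is full, which is the behavior you want when the broker can hold records longer than your process can.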
Treat the abstraction as a portability layer for your code, not a guarantee that a pipeline behaves identically on every broker. When you migrate, re-validate delivery and ordering against the target broker's model. The business logic carries over cleanly; the operational contract is the broker's, and you still own understanding it.
That trade is the right one. You keep the parts that are genuinely yours — the transformations, the aggregations, the tests — decoupled and portable, while staying explicit about the parts that are inherently broker-specific.
Where to go next
- Source and sink operators — the interfaces and lifecycle in depth: Cortex docs
- Source on GitHub — connectors, operators, and examples: github.com/buildersoftio/cortex
- Recipes — end-to-end pipelines you can adapt: /cortex/recipes
- Community — questions, patterns, and help on Discord
Install the core and the connectors you need:
    dotnet add package Cortex.Streams
    dotnet add package Cortex.States
    dotnet add package Cortex.Streams.Kafka # or .Pulsar / .RabbitMQ / .AWSSQS / .AzureServiceBus
Build the pipeline against .Stream(), test it with no broker, then attach whichever source and sink production actually uses.