Introducing Cortex 3: Build a Real-Time Pipeline in .NET in 15 Minutes

Real-time stream processing in .NET no longer means reaching for the JVM. Cortex 3 is an open-source, modular .NET SDK with a fluent pipeline API, pluggable state, telemetry, and broker connectors. This hands-on quickstart walks from your first in-app stream to a stateful aggregation backed by Kafka.

By Buildersoft Team

Real-time in .NET has been the awkward one

If you have ever needed to do serious stream processing on the .NET platform, you know the routine. You start sketching a pipeline, you reach for stateful aggregations or windowed joins, and within an hour you are looking at the JVM. Kafka Streams, Flink, Spark Structured Streaming — all excellent, all built somewhere else. The alternative was to hand-roll a consumer loop, bolt on your own state handling, and slowly reinvent half of a stream processor while telling yourself it was "just a background service."

Neither path is great. Bringing in the JVM means a second runtime, a second deployment story, and a context switch every time you cross the boundary. Hand-rolling means you own all the hard parts — backpressure, state, exactly the things those frameworks exist to solve.

Cortex is the third option: a real stream-processing toolkit that is idiomatic C#, runs in your existing .NET process, and deploys like any other .NET app.

What Cortex is

Cortex Data Framework is an open-source, modular SDK for building real-time data pipelines in .NET. It is not a server you stand up — it is a set of NuGet packages you compose into your application. The pieces:

  • Cortex.Streams — a fluent pipeline API for sources, operators, and sinks.
  • Cortex.States — pluggable state stores, from in-memory to RocksDB to a full set of database backends.
  • Cortex.Mediator — a CQRS / Mediator implementation for clean, modular application architecture.
  • Cortex.Telemetry — OpenTelemetry-based metrics and tracing for your pipelines.
  • Cortex.Types — complex types like OneOf, AllOf, and AnyOf.
  • Connectors — Kafka, Pulsar, RabbitMQ, SQS, Azure Service Bus, Elasticsearch, S3, Azure Blob, files, HTTP, and CDC adapters.

Everything is plain C# with the conventions you already use. The current release is v3.1.2, targeting .NET 6.0 and up (multi-targeted through .NET 10).

This post is a quickstart. We will build an in-app pipeline, add a stateful aggregation, read the results back, and then swap the in-app source for a live Kafka topic — without touching the rest of the pipeline.

Install

Two packages cover the quickstart:

# Core streaming
dotnet add package Cortex.Streams

# State stores (used for aggregation later)
dotnet add package Cortex.States

Connectors and state backends are separate packages you add only when you need them. We will add the Kafka connector near the end.

Your first pipeline

A Cortex pipeline is a chain: a source feeds events in, operators transform and filter them, and a sink consumes the result. You start from StreamBuilder<T>, where T is the type of element entering the pipeline. The shape never changes — a source on the left, operators in the middle, a sink on the right:

Rendering diagram…
using Cortex.Streams;

var stream = StreamBuilder<int>.CreateNewStream("ExampleStream")
    .Stream()
    .Map(x => x * 2)
    .Filter(x => x > 10)
    .Sink(Console.WriteLine)
    .Build();

stream.Start();

// Push data in by hand
stream.Emit(2);   // 4  -> filtered out
stream.Emit(8);   // 16 -> printed

A few things to note, because they are the load-bearing parts of the whole API:

  • StreamBuilder<T> is single-generic. Older snippets floating around show StreamBuilder<int, int> with two type parameters — that is pre-v3. In Cortex 3 it is one type parameter: the input element type.
  • .Stream() decides where data comes from. Called with no arguments, you get an in-app stream: nothing feeds it automatically, and you push events yourself with stream.Emit(...). Called with a source operator — .Stream(sourceOperator) — the pipeline is source-attached and pulls from that source on its own.
  • There is no .Source(...), .From(...), or .To(...). The method is always .Stream(). (You may see .Source(...) in a couple of doc pages; that is a documentation slip — the real method is .Stream().)
  • .Sink(...) is the terminal step. Pass an Action<T> for a simple side effect like Console.WriteLine, or pass a sink operator to write to an external system.
  • .Build() finalizes the pipeline. Then stream.Start() brings it online, and stream.Stop() (or await stream.StopAsync()) shuts it down.

The in-app form is ideal for tests, for feeding events from existing application code, and for getting comfortable with the operators before you wire in a broker.

Add state and aggregation

Console output is fine for a demo, but the reason to use a stream processor is stateful computation — counts, sums, rolling aggregates that survive across events. Let's build the canonical example: tracking clicks per page in real time.

Start with the event:

public class ClickEvent
{
    public string PageUrl { get; set; }
    public DateTime Timestamp { get; set; }
}

Now the pipeline. We filter out empty URLs, group by page, and keep a running count per page in a state store:

using Cortex.States;
using Cortex.Streams;

var stream = StreamBuilder<ClickEvent>.CreateNewStream("ClickStream")
    .Stream()
    .Filter(e => !string.IsNullOrEmpty(e.PageUrl))
    .GroupBySilently(
        e => e.PageUrl,                          // key selector: group by PageUrl
        stateStoreName: "ClickGroupStore")
    .AggregateSilently<string, int>(
        e => e.PageUrl,                          // key selector for aggregation
        (count, e) => count + 1,                 // aggregation function: increment
        stateStoreName: "ClickAggregateStore")
    .Sink(e => Console.WriteLine($"Page: {e.PageUrl}"))
    .Build();

stream.Start();

The key idea here is the "silently" suffix. Cortex offers grouping and aggregation in two modes:

ModeMethodBehavior
EmittingGroupBy / AggregateEmits KeyValuePair<TKey, Aggregate> downstream whenever new data arrives.
SilentGroupBySilently / AggregateSilentlyUpdates the state store behind the scenes; the original event keeps flowing down the pipeline unchanged.

The silent variants are what you want when you need to maintain state and keep the original events moving — the aggregate is tracked in the background while the ClickEvent continues to the sink. The AggregateSilently<TKey, TAgg> call is generic over the key type and the aggregate type; here we key on string (the page URL) and accumulate an int count.

Feed it some traffic:

var random = new Random();
var pages = new[] { "/home", "/about", "/contact", "/products" };

while (true)
{
    var click = new ClickEvent
    {
        PageUrl = pages[random.Next(pages.Length)],
        Timestamp = DateTime.UtcNow
    };

    stream.Emit(click);
    Thread.Sleep(100); // simulate a click rate
}

Reading the state back

The counts live in the named state store. Pull it off the stream and read it like a dictionary:

var aggregateStore = stream
    .GetStateStoreByName<InMemoryStateStore<string, int>>("ClickAggregateStore");

if (aggregateStore != null)
{
    Console.WriteLine("\nAggregated Click Counts:");
    foreach (var kvp in aggregateStore.GetAll())
    {
        Console.WriteLine($"Page: {kvp.Key}, Clicks: {kvp.Value}");
    }
}

GetStateStoreByName<T>(name) returns the typed store, and from there GetAll() gives you every key/value pair while Get(key) fetches a single one. By default the store is an InMemoryStateStore<TKey, TVal>, but the store type is pluggable — more on that below.

Plug in a real broker

Here is the payoff for keeping the source separate from the pipeline. So far we have been pushing events with stream.Emit(...). To feed the exact same pipeline from Kafka, you only change one thing: pass a Kafka source operator to .Stream(...).

Add the connector package:

dotnet add package Cortex.Streams.Kafka

Then attach the source:

using Cortex.Streams;
using Cortex.Streams.Kafka;

var source = new KafkaSourceOperator<ClickEvent>(
    bootstrapServers: "localhost:9092",
    topic: "clicks");

var stream = StreamBuilder<ClickEvent>.CreateNewStream("ClickStream")
    .Stream(source)                              // <- the only change
    .Filter(e => !string.IsNullOrEmpty(e.PageUrl))
    .GroupBySilently(
        e => e.PageUrl,
        stateStoreName: "ClickGroupStore")
    .AggregateSilently<string, int>(
        e => e.PageUrl,
        (count, e) => count + 1,
        stateStoreName: "ClickAggregateStore")
    .Sink(e => Console.WriteLine($"Page: {e.PageUrl}"))
    .Build();

stream.Start();
// No more Emit() calls — the Kafka source drives the pipeline.

Everything downstream — the filter, the grouping, the aggregation, the state store — is byte-for-byte identical. The only difference is that .Stream(source) makes the pipeline source-attached, so it consumes from the clicks topic on its own instead of waiting for Emit.

The Kafka source uses JSON deserialization by default, so a ClickEvent POCO maps straight onto a JSON payload with no extra setup. To produce back to Kafka, the mirror image is new KafkaSinkOperator<ClickEvent>(bootstrapServers: "localhost:9092", topic: "clicks") passed to .Sink(...). For Avro, Protobuf, or other binary formats, supply a custom IDeserializer<T> / ISerializer<T> to the operator's constructor.

This is the whole point of the design: develop and test in-app with Emit, then attach a broker for production without rewriting your processing logic.

The ecosystem

The quickstart touches two packages, but Cortex is deliberately modular — you pull in only what you use.

  • Cortex.Streams — the fluent pipeline API: Map, Filter, Sink, grouping, aggregation, and JoinStream for windowed stream-stream joins.
  • Cortex.States — state management with nine state stores: an in-memory store plus eight persistent backends — RocksDB, SQL Server, PostgreSQL, SQLite, ClickHouse, MongoDB, Cassandra, and DuckDB. Swap the store type without changing pipeline code.
  • Cortex.Mediator — a Mediator/CQRS implementation for Vertical Slice Architecture, with optional FluentValidation and transactional behaviors.
  • Cortex.Telemetry — OpenTelemetry integration. Add .WithTelemetry(new OpenTelemetryProvider()) before .Stream(...) and your pipeline emits metrics and traces.
  • Cortex.TypesOneOf, AllOf, AnyOf, and related complex types.
  • Connectors — Kafka, Pulsar, RabbitMQ, AWS SQS, Azure Service Bus, Elasticsearch (sink), S3 (sink), Azure Blob (sink), files, and HTTP — each a separate NuGet package following the same .Stream(...) / .Sink(...) shape.
  • CDC adapters — change data capture for SQL Server, PostgreSQL, and MongoDB, so you can turn database changes into a stream.

Because state is pluggable, the click-tracking example can be made durable without touching the processing logic — you construct a persistent store and hand it to the aggregation operator. The store types share a small key/value API; here is RocksDB on its own:

using Cortex.States.RocksDb;

var store = new RocksDbStateStore<string, int>("ExampleStateStore", "./data");
store.Put("k", 1);
Console.WriteLine(store.Get("k"));  // 1

To use that store in the pipeline instead of the default in-memory one, pass it to the aggregation operator via its stateStore: parameter alongside stateStoreName:. The operators do not care which backend they write to — that decision lives entirely in the store you configure.

Where to go next

You now have the full arc: an in-app stream, a stateful aggregation read back from a state store, and the same pipeline fed live from Kafka. From here:

Cortex is open source under the MIT license. If it saves you a trip to the JVM, give the repo a star and tell us what you build.

Ready to Transform Your Business?

Let's discuss how we can help you build innovative solutions that drive real results.