Cortex Recipes

Short, runnable patterns for building real-time pipelines in .NET. Copy one, adapt it, ship it — every snippet uses the Cortex v3 fluent API.

Ingest

2 recipes

Consume JSON events from a Kafka topic

Attach a KafkaSourceOperator to a stream to consume and deserialize JSON messages from a Kafka topic. JSON (de)serialization is the default; pass a custom IDeserializer<T> for Avro/Protobuf/Base64.

using Cortex.Streams;
using Cortex.Streams.Kafka;

public record PageView(string UserId, string Url, DateTime At);

// KafkaSourceOperator: bootstrapServers + topic; JSON deserialization by default.
var source = new KafkaSourceOperator<PageView>(
    bootstrapServers: "localhost:9092",
    topic: "page-views");

var stream = StreamBuilder<PageView>
    .CreateNewStream("PageViewIngest")
    .Stream(source)                       // external source attached
    .Sink(pv => Console.WriteLine($"{pv.UserId} -> {pv.Url}"))
    .Build();

stream.Start();
// A stream with an attached source pulls on its own; do NOT call Emit() here.
Console.ReadLine();
stream.Stop();

In-app stream for tests and manual emit

Call .Stream() with no arguments to create an in-app source you feed yourself. Ideal for unit tests, demos, or pushing records from your own code with stream.Emit(x).

using Cortex.Streams;

// No external source: you push records in manually with Emit().
var stream = StreamBuilder<int>
    .CreateNewStream("Numbers")
    .Stream()                         // in-app source (no args)
    .Filter(x => x % 2 == 0)
    .Sink(Console.WriteLine)
    .Build();

stream.Start();

stream.Emit(1);   // dropped by filter
stream.Emit(2);   // -> 2
stream.Emit(3);
stream.Emit(4);   // -> 4

stream.Stop();

Transform

2 recipes

Map: reshape each record

Use .Map(x => ...) to transform every record from one shape to another - converting types, computing values, or projecting fields as data flows through the pipeline.

using Cortex.Streams;

var stream = StreamBuilder<int>
    .CreateNewStream("DoubleStream")
    .Stream()
    .Map(x => x * 2)                  // transform each item
    .Sink(Console.WriteLine)
    .Build();

stream.Start();
for (int i = 1; i <= 5; i++)
    stream.Emit(i);                  // -> 2, 4, 6, 8, 10
stream.Stop();

Filter then Map: clean and enrich in one pipeline

Chain .Filter(...) to drop unwanted records and .Map(...) to enrich what remains. Filtering early reduces work for downstream operators.

using Cortex.Streams;

public record Order(string Id, decimal Amount, string Status);
public record OrderEvent(string Id, decimal Amount, DateTime ProcessedAt);

var stream = StreamBuilder<Order>
    .CreateNewStream("OrderPipeline")
    .Stream()
    .Filter(o => o.Status == "Confirmed")          // keep only confirmed
    .Map(o => new OrderEvent(o.Id, o.Amount, DateTime.UtcNow)) // enrich
    .Sink(e => Console.WriteLine($"{e.Id} @ {e.ProcessedAt:O}"))
    .Build();

stream.Start();
stream.Emit(new Order("ORD-1", 120m, "Confirmed")); // passes through
stream.Emit(new Order("ORD-2", 50m, "Pending"));    // dropped
stream.Stop();

Aggregate & Window

2 recipes

Stateful click-count with GroupBySilently + AggregateSilently

Group events by key and maintain a running count in a named state store while data keeps flowing, then read the aggregated counts back via GetStateStoreByName.

using Cortex.Streams;
using Cortex.States;

public record Click(string UserId, string Button);

var stream = StreamBuilder<Click>
    .CreateNewStream("ClickCounter")
    .Stream()
    .GroupBySilently(c => c.UserId, stateStoreName: "group-store")
    .AggregateSilently<string, int>(
        c => c.UserId,
        (count, _) => count + 1,           // increment per click
        stateStoreName: "click-counts")
    .Sink(c => Console.WriteLine($"processed click for {c.UserId}"))
    .Build();

stream.Start();
stream.Emit(new Click("u1", "buy"));
stream.Emit(new Click("u1", "cart"));
stream.Emit(new Click("u2", "buy"));

// Read aggregated state back out
var store = stream.GetStateStoreByName<InMemoryStateStore<string, int>>("click-counts");
foreach (var kv in store.GetAll())
    Console.WriteLine($"{kv.Key} => {kv.Value} clicks");

stream.Stop();

Tumbling window: count events per fixed interval

Group events into fixed-size, non-overlapping time windows with .TumblingWindow<TKey>(...), then map each window's Items to a count and other summary metrics.

using System.Linq;
using Cortex.Streams;
using Cortex.Streams.Operators.Windows;

public record PageView(string PageUrl, string UserId, DateTime Timestamp);

var stream = StreamBuilder<PageView>
    .CreateNewStream("TrafficAnalytics")
    .Stream()
    .TumblingWindow<string>(
        keySelector: pv => pv.PageUrl,
        timestampSelector: pv => pv.Timestamp,
        windowSize: TimeSpan.FromMinutes(15))
    .Map(window => new
    {
        Page = window.Key,
        ViewCount = window.Items.Count,
        UniqueVisitors = window.Items.Select(pv => pv.UserId).Distinct().Count(),
        window.WindowStart,
        window.WindowEnd
    })
    .Sink(s => Console.WriteLine(
        $"{s.Page}: {s.ViewCount} views, {s.UniqueVisitors} uniques " +
        $"[{s.WindowStart:HH:mm}-{s.WindowEnd:HH:mm}]"))
    .Build();

stream.Start();

Join

2 recipes

Stream-table enrichment with a state-store lookup

Enrich streaming events by looking up reference data held in a state store. .Join is an inner join (drops unmatched); .LeftJoin always emits with a null right side for missing keys.

using Cortex.Streams;
using Cortex.States;

public record Order(string OrderId, int CustomerId, decimal Amount);
public record Customer(int Id, string Name, string Tier);
public record EnrichedOrder(string OrderId, decimal Amount, string CustomerName, string Tier);

// Reference table (a state store) - pre-populate or update while running
var customers = new InMemoryStateStore<int, Customer>("CustomerStore");
customers.Put(1001, new Customer(1001, "Alice", "Gold"));
customers.Put(1002, new Customer(1002, "Bob", "Silver"));

var stream = StreamBuilder<Order>
    .CreateNewStream("OrderEnrichment")
    .Stream()
    .Join(
        customers,
        order => order.CustomerId,                 // key selector
        (order, c) => new EnrichedOrder(order.OrderId, order.Amount, c.Name, c.Tier))
    .Sink(e => Console.WriteLine($"{e.OrderId}: {e.CustomerName} ({e.Tier})"))
    .Build();

stream.Start();
stream.Emit(new Order("ORD-1", 1001, 150m));  // emits - Alice exists
stream.Emit(new Order("ORD-2", 9999, 200m));  // dropped - no match (inner join)
stream.Stop();

Stream-stream windowed join: match orders with shipments

Correlate two unbounded streams on a shared key within a time window. The left stream drives the pipeline via Emit; feed the right stream through join.ProcessRight from another source.

using Cortex.Streams;
using Cortex.Streams.Operators;

public record Order(string OrderId, int CustomerId, DateTime Timestamp);
public record Shipment(string ShipmentId, string OrderId, string Carrier, DateTime ShippedAt);
public record OrderShipment(Order Order, Shipment? Shipment, bool IsShipped);

var join = new StreamStreamJoinOperator<Order, Shipment, string, OrderShipment>(
    order => order.OrderId,                 // left key
    shipment => shipment.OrderId,           // right key
    order => order.Timestamp,               // left timestamp
    shipment => shipment.ShippedAt,         // right timestamp
    (order, shipment) => new OrderShipment(order, shipment, shipment != null),
    StreamJoinConfiguration.InnerJoin(TimeSpan.FromHours(1)));

var stream = StreamBuilder<Order>
    .CreateNewStream("OrderShipmentJoin")
    .Stream()
    .JoinStream(join)
    .Sink(r => Console.WriteLine($"Order {r.Order.OrderId} via {r.Shipment?.Carrier}"))
    .Build();

stream.Start();
stream.Emit(new Order("ORD-1", 100, DateTime.UtcNow));   // feed LEFT (drives pipeline)
join.ProcessRight(new Shipment("SHP-1", "ORD-1", "FedEx", DateTime.UtcNow)); // feed RIGHT

// monitor: join.GetLeftBufferCount() / join.GetRightBufferCount();
stream.Stop();
join.Dispose();             // stops the cleanup timer

State

2 recipes

Persist aggregate state with RocksDB

Back a stateful aggregation with a durable RocksDbStateStore so counts survive restarts. Pass the store instance to AggregateSilently and dispose it on shutdown.

using Cortex.Streams;
using Cortex.States.RocksDb;

// Durable, on-disk key-value store: ctor is (name, dbPath)
var wordCounts = new RocksDbStateStore<string, int>("WordCountStore", "./data/rocksdb");

var stream = StreamBuilder<string>
    .CreateNewStream("WordCount")
    .Stream()
    .AggregateSilently<string, int>(
        word => word,                       // group by the word
        (count, _) => count + 1,            // increment
        stateStoreName: "WordCountStore",
        stateStore: wordCounts)             // persistent backend
    .Sink(word => Console.WriteLine($"counted: {word}"))
    .Build();

stream.Start();
foreach (var w in new[] { "apple", "banana", "apple" })
    stream.Emit(w);

Console.WriteLine($"apple => {wordCounts.Get("apple")}");  // 2

stream.Stop();
wordCounts.Dispose();        // flush + release resources

Analytical state with a DuckDB store

Use DuckDbKeyValueStateStore when you want columnar, analytics-friendly state with native Parquet/CSV export. Put/Get values directly and export the table for downstream analysis.

using Cortex.States.DuckDb;

public record OrderSummary(decimal Total, string Status);

// Persistent DuckDB-backed key-value store
var store = new DuckDbKeyValueStateStore<string, OrderSummary>(
    name: "OrderStore",
    databasePath: "./data/orders.duckdb",
    tableName: "Orders");

store.Put("ORD-001", new OrderSummary(99.99m, "Completed"));
store.Put("ORD-002", new OrderSummary(150.00m, "Pending"));

var one = store.Get("ORD-001");
Console.WriteLine($"ORD-001 total: {one.Total}, count: {store.Count()}");

// Native analytics export
store.ExportToParquet("./exports/orders.parquet");
store.Checkpoint();          // flush to disk for persistent DBs
store.Dispose();

Deliver

3 recipes

Publish results to a Kafka topic (sink)

Send pipeline output to Kafka with KafkaSinkOperator passed to .Sink(...). JSON serialization is the default; supply a custom ISerializer<T> for other formats.

using Cortex.Streams;
using Cortex.Streams.Kafka;

public record OrderEvent(string OrderId, decimal Amount);

var sink = new KafkaSinkOperator<OrderEvent>(
    bootstrapServers: "localhost:9092",
    topic: "order-events");

var stream = StreamBuilder<OrderEvent>
    .CreateNewStream("OrderPublisher")
    .Stream()                       // in-app source: we emit, it publishes
    .Sink(sink)
    .Build();

stream.Start();
stream.Emit(new OrderEvent("ORD-1", 42m));  // serialized + sent to Kafka
stream.Stop();

Index documents into Elasticsearch (sink)

Write records to an Elasticsearch index in bulk. The ElasticsearchSinkOperator takes a configured ElasticsearchClient and an index name; failed documents are buffered in a state store and retried automatically.

using Cortex.Streams;
using Cortex.Streams.Elasticsearch;
using Elastic.Clients.Elasticsearch;

public record LogEntry(string Service, string Level, string Message, DateTime At);

// Configure the Elasticsearch 8.x client
var client = new ElasticsearchClient(new Uri("http://localhost:9200"));

// ctor: (client, indexName, batchSize = 50, flushInterval = null, retryInterval = null, ...)
var sink = new ElasticsearchSinkOperator<LogEntry>(
    client: client,
    indexName: "app-logs",
    batchSize: 50,
    flushInterval: TimeSpan.FromSeconds(5));

var stream = StreamBuilder<LogEntry>
    .CreateNewStream("LogIndexer")
    .Stream()
    .Filter(l => l.Level != "Debug")
    .Sink(sink)
    .Build();

stream.Start();
stream.Emit(new LogEntry("api", "Error", "boom", DateTime.UtcNow));
stream.Stop();

FanOut: broadcast one record to multiple sinks

Send the same record to several destinations in one pass with .FanOut(...). Use named .To(...) sinks, add a predicate for filtered delivery, or .ToWithTransform(...) to reshape per sink.

using Cortex.Streams;

public record Order(string Id, decimal Amount, bool IsPriority);
public record OrderEvent(string Id, string State, DateTime At);

var stream = StreamBuilder<Order>
    .CreateNewStream("OrderFanOut")
    .Stream()
    .FanOut(fanOut => fanOut
        // all orders -> database
        .To("database", o => SaveToDatabase(o))
        // only high-value -> alerts (filtered sink)
        .To("alerts", o => o.Amount > 10_000, o => SendAlert(o))
        // transform per sink before publishing
        .ToWithTransform("kafka-events",
            o => new OrderEvent(o.Id, "Created", DateTime.UtcNow),
            evt => PublishEvent(evt)))
    .Build();

stream.Start();
stream.Emit(new Order("ORD-1", 12_000m, true));
stream.Stop();

void SaveToDatabase(Order o) { }
void SendAlert(Order o) { }
void PublishEvent(OrderEvent e) { }

Operate

2 recipes

Add OpenTelemetry metrics and traces

Wire telemetry into a pipeline with .WithTelemetry(provider) placed BEFORE .Stream(...). Cortex then records per-operator metrics and traces automatically.

using Cortex.Streams;
using Cortex.Telemetry.OpenTelemetry;

public record Order(string Id, decimal Amount);

// Telemetry must be configured before .Stream(...)
var telemetry = new OpenTelemetryProvider();

var stream = StreamBuilder<Order>
    .CreateNewStream("InstrumentedOrders")
    .WithTelemetry(telemetry)            // <-- before Stream()
    .Stream()
    .Filter(o => o.Amount > 0)
    .Map(o => o with { Amount = o.Amount * 1.2m })
    .Sink(o => Console.WriteLine($"{o.Id}: {o.Amount}"))
    .Build();

stream.Start();
stream.Emit(new Order("ORD-1", 100m));
stream.Stop();

Backpressure, buffering, and graceful shutdown

Enable buffered async processing with .WithPerformanceOptions(...) for non-blocking EmitAndForget and a configurable backpressure strategy, then drain the buffer cleanly with StopAsync.

using Cortex.Streams;
using Cortex.Streams.Performance;

var stream = StreamBuilder<int>
    .CreateNewStream("FastStream")
    .WithPerformanceOptions(new StreamPerformanceOptions
    {
        EnableBufferedProcessing = true,
        BufferCapacity = 10_000,
        BackpressureStrategy = BackpressureStrategy.Block,
        ConcurrencyLevel = 4,
        OnItemDropped = (item, reason) =>
            Console.WriteLine($"dropped {item}: {reason}")
    })
    .Stream()
    .Map(x => x * 2)
    .Sink(Console.WriteLine)
    .Build();

stream.Start();

stream.EmitAndForget(21);          // non-blocking (requires buffering)
await stream.EmitAsync(42);        // awaits buffer space, not processing

var stats = stream.GetBufferStatistics();
Console.WriteLine($"buffer {stats.UtilizationPercent:F1}%");

// Graceful shutdown: waits for the buffer to drain
await stream.StopAsync();
← Back to Cortex

Ready to build your next streaming application?

Browse the full documentation, or dive into the source on GitHub.