Two systems that never talk to each other still produce events that belong together. Your order service publishes an Order the moment a customer checks out. Your fulfillment system publishes a Shipment minutes or hours later, from a completely different process, often a completely different team. Nobody emits an "order-and-shipment" event, because nobody owns both.
Correlating those two streams is the job of a stream-stream join, and it is genuinely awkward to do by hand. This post walks through doing it properly with the Cortex Data Framework and its StreamStreamJoinOperator.
Why this is hard without the right tool
The naive options all fall down:
- A database join assumes both sides are sitting still in tables. They aren't. They are unbounded streams arriving at different times. By the time you could query, the data may not be there yet.
- A simple in-memory buffer ("hold orders, wait for shipments") works until you ask the hard questions: How long do you wait? What happens to the order that never ships? When do you free the memory? How do you handle a shipment that arrives a few seconds after your window closed?
A stream-stream join answers all of those. It buffers events from both sides, matches them on a shared key within a time window, emits the result, and cleans up expired state on a timer so memory does not grow forever. You configure the window and the semantics; the operator handles the bookkeeping.
The classic shape of the problem: match each order with its shipment, if it shipped within an hour.
The model
Three records: the left input, the right input, and the joined output. These are the exact shapes from the Cortex docs.
public record Order(string OrderId, int CustomerId, decimal Amount, DateTime Timestamp);
public record Shipment(string ShipmentId, string OrderId, string Carrier, DateTime ShippedAt);
public record OrderShipment(Order Order, Shipment? Shipment, bool IsShipped);
Two things to notice. Both inputs carry the shared key (OrderId) and a timestamp (Order.Timestamp, Shipment.ShippedAt). The join uses the key to find candidate matches and the timestamps to decide whether two events fall inside the same window. On the output, Shipment? is nullable on purpose: depending on the join type, an order may be emitted before, or without ever seeing, a shipment.
The join operator
StreamStreamJoinOperator<TLeft, TRight, TKey, TOut> takes six arguments. Here it is fully wired:
var joinOperator = new StreamStreamJoinOperator<Order, Shipment, string, OrderShipment>(
// 1 & 2 - key selectors: how to extract the join key from each side
order => order.OrderId,
shipment => shipment.OrderId,
// 3 & 4 - timestamp selectors: which field places each event on the timeline
order => order.Timestamp,
shipment => shipment.ShippedAt,
// 5 - join function: build the output; right may be null
(order, shipment) => new OrderShipment(order, shipment, shipment != null),
// 6 - configuration: window size + semantics
StreamJoinConfiguration.InnerJoin(TimeSpan.FromHours(1)));
The four generic parameters are the left type (Order), the right type (Shipment), the key type (string), and the output type (OrderShipment). The six constructor arguments, in order:
- Left key selector
order => order.OrderId— pulls the join key from a left event. - Right key selector
shipment => shipment.OrderId— pulls the matching key from a right event. Both must produce the sameTKey. - Left timestamp selector
order => order.Timestamp— where the order sits on the timeline. - Right timestamp selector
shipment => shipment.ShippedAt— where the shipment sits. The operator compares these to decide if two events are within the window. - Join function
(order, shipment) => ...— producesTOut. The right argument can be null (for unmatched left events in a left/outer join), so guard accordingly. - Configuration — the window and the join semantics. Here, a one-hour inner join via the
StreamJoinConfiguration.InnerJoin(...)factory.
Note the argument order: both key selectors come first, then both timestamp selectors, then the join function, then the configuration.
Wiring it: the left stream drives the pipeline
This is the part that trips people up the first time. The join has two inputs but the Cortex pipeline has one entry point. The left side is wired into the stream and drives the pipeline; the right side is fed directly into the operator with ProcessRight.
var orderStream = StreamBuilder<Order>.CreateNewStream("OrderShipmentJoin")
.Stream() // in-app source; we push with Emit(...)
.JoinStream(joinOperator) // the join sits in the pipeline
.Sink(result =>
{
Console.WriteLine($"Order {result.Order.OrderId} shipped via {result.Shipment?.Carrier}!");
NotifyCustomer(result.Order.CustomerId, result.Shipment);
})
.Build();
orderStream.Start();
// LEFT side: orders flow through the stream
orderStream.Emit(new Order("ORD-001", 100, 150.00m, DateTime.UtcNow));
orderStream.Emit(new Order("ORD-002", 101, 75.50m, DateTime.UtcNow));
// RIGHT side: shipments are pushed straight into the operator
joinOperator.ProcessRight(new Shipment("SHP-001", "ORD-001", "FedEx", DateTime.UtcNow)); // matches ORD-001
joinOperator.ProcessRight(new Shipment("SHP-002", "ORD-002", "UPS", DateTime.UtcNow)); // matches ORD-002
Output:
Order ORD-001 shipped via FedEx!
Order ORD-002 shipped via UPS!
StreamBuilder<Order> is single-generic. .Stream() with no arguments creates an in-app source you feed with Emit(...). .JoinStream(joinOperator) inserts the join. The .Sink(...) receives each OrderShipment the join produces. And critically, joinOperator.ProcessRight(shipment) is the only way the right side gets data — it never goes through the builder.
The dual-source pattern
In production the two sides usually come from two different places. A common setup is Kafka on the left and a separate consumer pushing the right side. The left stream takes a source operator instead of a bare .Stream():
var joinOp = new StreamStreamJoinOperator<Order, Shipment, string, OrderShipment>(
o => o.OrderId, s => s.OrderId,
o => o.Timestamp, s => s.ShippedAt,
(o, s) => new OrderShipment(o, s, s != null),
StreamJoinConfiguration.InnerJoin(TimeSpan.FromHours(1)));
// LEFT side from Kafka - the source operator drives the pipeline, no manual Emit
var orderStream = StreamBuilder<Order>.CreateNewStream("OrderShipmentJoin")
.Stream(new KafkaSourceOperator<Order>(
bootstrapServers: "localhost:9092",
topic: "orders"))
.JoinStream(joinOp)
.Sink(result => NotifyCustomer(result.Order.CustomerId, result.Shipment))
.Build();
orderStream.Start();
The right side never goes through a builder — it just needs something that calls joinOp.ProcessRight(...) for each message. Since KafkaSourceOperator is a source (it can only drive a stream), the right-side consumer is a plain loop over the underlying Confluent.Kafka client that Cortex.Streams.Kafka wraps:
// RIGHT side from a separate consumer - each message calls ProcessRight
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "shipment-join-consumer",
AutoOffsetReset = AutoOffsetReset.Earliest
};
var shipmentConsumer = new ConsumerBuilder<Ignore, Shipment>(consumerConfig)
.SetValueDeserializer(new DefaultJsonDeserializer<Shipment>())
.Build();
shipmentConsumer.Subscribe("shipments");
var cts = new CancellationTokenSource();
_ = Task.Run(() =>
{
while (!cts.IsCancellationRequested)
{
var result = shipmentConsumer.Consume(cts.Token);
joinOp.ProcessRight(result.Message.Value); // feed the right side
}
}, cts.Token);
The right side does not have to be Kafka. ProcessRight is a plain method call, so an HTTP webhook works just as well:
// ASP.NET Core endpoint feeding the right side from a carrier webhook
app.MapPost("/webhooks/shipments", (Shipment shipment) =>
{
joinOp.ProcessRight(shipment);
return Results.Ok();
});
KafkaSourceOperator<T> ships in the Cortex.Streams.Kafka package and defaults to JSON (de)serialization; pass a custom IDeserializer<T> (the package includes DefaultProtobufDeserializer<T> and DefaultBase64StringDeserializer<T>) if you are on Protobuf or an opaque binary format. Any right-side source works: a message queue, a polling loop, a webhook — anything that can call ProcessRight.
Join semantics
The fifth question from earlier — "what about the order that never ships?" — is answered by the join type. It controls what happens to unmatched events on each side.
| Join type | Left unmatched | Right unmatched | Use case |
|---|---|---|---|
| Inner | Dropped | Dropped | Only care about matched pairs |
| Left | Emitted (with null right) | Dropped | Must process every left event |
| Right | Dropped | Emitted (with null left) | Must process every right event |
| Outer | Emitted | Emitted | Need full visibility of both streams |
You set the type either through a factory method or directly on StreamJoinConfiguration:
// Factory methods - window only, sensible defaults for the rest
var inner = StreamJoinConfiguration.InnerJoin(TimeSpan.FromMinutes(10));
var left = StreamJoinConfiguration.LeftJoin(TimeSpan.FromMinutes(10));
var outer = StreamJoinConfiguration.OuterJoin(TimeSpan.FromMinutes(10));
// Full control over every knob
var config = new StreamJoinConfiguration
{
WindowSize = TimeSpan.FromMinutes(10), // how long to buffer for a match
JoinType = StreamJoinType.Left, // Inner | Left | Right | Outer
GracePeriod = TimeSpan.FromMinutes(2), // extra wait for late events
CleanupInterval = TimeSpan.FromSeconds(30), // how often expired events are purged
MaxBufferSizePerKey = 1000 // cap per key to bound memory
};
The configuration knobs:
WindowSize— the maximum time gap between a left and a right event for them to count as a match. A shipment that arrives 90 minutes after a one-hour-window order will not match.JoinType— the semantics from the table above (StreamJoinType.Inner,Left,Right, orOuter).GracePeriod— extra time the operator waits past the window before treating an event as permanently unmatched. This absorbs late arrivals.CleanupInterval— how often the internal timer sweeps out expired events. Shorter intervals mean lower peak memory at the cost of more frequent housekeeping.MaxBufferSizePerKey— a hard cap on buffered events per key, your safety valve against a single hot key consuming all available memory. When the cap is hit, the oldest buffered element for that key is dropped to make room.
Unmatched and late events
For a left or outer join, an order that hits the end of its window without a shipment is still emitted — with a null right side. That is why the join function and the output record must tolerate null:
var orderShipmentJoin = new StreamStreamJoinOperator<Order, Shipment, string, OrderShipment>(
o => o.OrderId, s => s.OrderId,
o => o.Timestamp, s => s.ShippedAt,
(order, shipment) => new OrderShipment(order, shipment, shipment != null),
new StreamJoinConfiguration
{
WindowSize = TimeSpan.FromHours(1),
JoinType = StreamJoinType.Left, // emit unshipped orders too
GracePeriod = TimeSpan.FromMinutes(5) // tolerate slightly late shipments
});
// ... in the sink:
.Sink(result =>
{
if (result.IsShipped)
Console.WriteLine($"Order {result.Order.OrderId} shipped via {result.Shipment!.Carrier}");
else
EscalateUnshippedOrder(result.Order); // window expired, no shipment ever arrived
})
The lifecycle of a single order under a left join looks like this:
- The order arrives and is buffered.
- If a shipment with the same
OrderIdarrives within the window, the pair is emitted withIsShipped = true. - If no shipment arrives by the window's end, the operator waits out the grace period in case one is merely late.
- When window plus grace elapse with no match, the order is emitted with a
nullshipment andIsShipped = false, then purged on the next cleanup pass.
For an outer join, the same applies in reverse: a shipment whose order was never seen (perhaps the order event was lost) is emitted with a null left side. Always handle that: (left, right) => left?.OrderId ?? right!.OrderId is the typical defensive pattern when either side can be absent.
Production notes
A stream-stream join is stateful by definition, and that state lives in memory. Buffered state grows roughly with window size x event rate x key cardinality x event size. A 24-hour window over a high-volume stream with millions of distinct keys will hurt. The good news is every factor in that product has a lever:
| Factor | Effect | Lever |
|---|---|---|
| Window size | Bigger window, more buffered events | Use the smallest window that still catches real matches |
| Event rate | Higher rate, more events in flight | Pre-aggregate or sample upstream |
| Key cardinality | More keys, more buffers | Set MaxBufferSizePerKey |
| Event size | Fat events, fat buffers | Project to only the fields the join function needs |
A memory-conscious configuration:
var config = new StreamJoinConfiguration
{
WindowSize = TimeSpan.FromMinutes(5), // tight window
JoinType = StreamJoinType.Inner,
MaxBufferSizePerKey = 100, // cap per key
CleanupInterval = TimeSpan.FromSeconds(10) // sweep aggressively
};
If you only need the order ID, customer ID, and carrier in the output, do not carry the entire order payload through the join — map down to a lean projection before .JoinStream(...) so the buffers stay small.
Watch the buffers in production. The operator exposes live counts:
Console.WriteLine(
$"Left buffer: {joinOp.GetLeftBufferCount()}, " +
$"Right buffer: {joinOp.GetRightBufferCount()}");
Wire these into your metrics. A buffer that climbs without falling back means one side is starved (the other stream stopped arriving), a key is hot, or your window is too wide for the volume.
Dispose when done. The operator runs an internal cleanup timer; disposing it stops the timer and releases buffers:
await orderStream.StopAsync();
joinOp.Dispose();
Where to go next
A stream-stream join is the right tool when both sides are live streams. If one side is slowly-changing reference data — a customer table, a product catalog — reach for a stream-table join instead, which looks up against a state store rather than buffering a second stream.
- Docs — cortex.buildersoft.io/get-started/home
- GitHub — github.com/buildersoftio/cortex
- Recipes — on-site recipes
- Discord — join the community
Install with dotnet add package Cortex.Streams (add Cortex.Streams.Kafka for the Kafka source). Cortex Data Framework v3.1.2 targets .NET 6.0 and up.