Consume JSON events from a Kafka topic
Attach a KafkaSourceOperator to a stream to consume and deserialize JSON messages from a Kafka topic. JSON (de)serialization is the default; pass a custom IDeserializer<T> for Avro/Protobuf/Base64.
using Cortex.Streams;
using Cortex.Streams.Kafka;
public record PageView(string UserId, string Url, DateTime At);
// KafkaSourceOperator: bootstrapServers + topic; JSON deserialization by default.
var source = new KafkaSourceOperator<PageView>(
bootstrapServers: "localhost:9092",
topic: "page-views");
var stream = StreamBuilder<PageView>
.CreateNewStream("PageViewIngest")
.Stream(source) // external source attached
.Sink(pv => Console.WriteLine($"{pv.UserId} -> {pv.Url}"))
.Build();
stream.Start();
// A stream with an attached source pulls on its own; do NOT call Emit() here.
Console.ReadLine();
stream.Stop();