The problem: your primary database is not the only place that needs to know
Almost every non-trivial system ends up with more than one copy of the truth. The primary database holds the records; then there is a cache that serves reads faster, a search index that powers full-text queries, and an analytics store that the business team queries for dashboards. The primary is authoritative. Everything else is a derived view that has to stay in sync with it.
Keeping those views fresh is where it gets painful. The two common approaches both have well-known failure modes:
- Polling: periodically running SELECT ... WHERE updated_at > @last against the primary. It is laggy by construction (you only see changes at the next poll), it hammers the database with repeated scans, and it silently misses hard deletes unless you build tombstones yourself.
- Dual writes: having the application write to the database and the cache/index/queue in the same code path. This is unreliable: the second write can fail after the first succeeds, and now the two stores disagree with no record of how. There is no transaction spanning your database and your search cluster.
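To make the hard-delete problem concrete, here is a toy in-memory simulation. It is not tied to any database: a Dictionary stands in for the table, an integer counter stands in for updated_at, and the Poll helper is hypothetical. The watermark query sees the insert and the update, but once the row is deleted there is nothing left for the WHERE clause to match:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory "table": id -> (name, updatedAt). A counter stands in for updated_at.
var table = new Dictionary<int, (string Name, long UpdatedAt)>();
long clock = 0;
long lastSeen = 0; // the poller's watermark, i.e. @last

// Equivalent of SELECT ... WHERE updated_at > @last, then advance the watermark.
List<int> Poll()
{
    var changed = table.Where(kv => kv.Value.UpdatedAt > lastSeen)
                       .Select(kv => kv.Key)
                       .ToList();
    if (table.Count > 0)
        lastSeen = Math.Max(lastSeen, table.Values.Max(v => v.UpdatedAt));
    return changed;
}

table[1] = ("Ada", ++clock);           // INSERT
var firstPoll = Poll();                // sees row 1

table[1] = ("Ada Lovelace", ++clock);  // UPDATE
var secondPoll = Poll();               // sees row 1 again

table.Remove(1);                       // hard DELETE: the row is simply gone
var thirdPoll = Poll();                // nothing matches, so the delete is invisible

Console.WriteLine($"{firstPoll.Count} {secondPoll.Count} {thirdPoll.Count}"); // 1 1 0
```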
Change Data Capture (CDC) sidesteps both. Instead of asking the database "what changed?", CDC reads the database's own change log — the write-ahead log, the CDC tables, the oplog — and turns every committed INSERT, UPDATE, and DELETE into an event. Because it reads the same log the database uses for durability and replication, it sees every change, in commit order, including deletes, with minimal added load.
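By contrast, a log-based reader never queries the table at all. The sketch below is a simplified model, not how any particular database or Cortex implements it: an append-only list of (position, operation, key) entries stands in for the WAL, CDC tables, or oplog, and the consumer only remembers the last position it read. The Commit and ReadChanges helpers are hypothetical. Every change, including the delete, comes out in commit order:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-in for a database change log: each committed change is appended
// with a monotonically increasing position (think LSN or resume token).
var log = new List<(long Position, string Operation, int Id)>();
long nextPosition = 0;
void Commit(string op, int id) => log.Add((++nextPosition, op, id));

long readUpTo = 0; // the consumer's position in the log

// Return every entry after the last processed position, in commit order.
List<string> ReadChanges()
{
    var batch = log.Where(e => e.Position > readUpTo)
                   .Select(e => $"{e.Operation}:{e.Id}")
                   .ToList();
    if (log.Count > 0) readUpTo = log[^1].Position;
    return batch;
}

Commit("INSERT", 1);
Commit("UPDATE", 1);
Commit("DELETE", 1);

var changes = ReadChanges();
Console.WriteLine(string.Join(", ", changes)); // INSERT:1, UPDATE:1, DELETE:1
```

A second ReadChanges() call returns an empty batch, because the consumer's position already points at the end of the log.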
This post shows how to do CDC in .NET with the Cortex Data Framework. The class names, constructor parameters, and settings here are taken directly from the Cortex CDC documentation.
How Cortex does CDC
In Cortex, CDC is not a separate product bolted on the side. As the docs put it, "CDC is implemented as a source operator that integrates with various database systems to capture and emit change events into a stream processing pipeline."
That is the key idea. A CDC source is just another source operator — the same slot you would otherwise fill with a Kafka consumer, an HTTP endpoint, or an in-app emitter. You attach it with .Stream(source), then the change events flow through the exact same Map / Filter / Sink / AddBranch operators you already use. Nothing about the downstream pipeline is CDC-specific.
There are three adapters, each its own NuGet package:
| Database | Package | Source operator | Settings type | Change record |
|---|---|---|---|---|
| PostgreSQL | Cortex.Streams.CDC.PostgreSQL | PostgresSourceOperator | PostgresSettings | PostgresRecord |
| SQL Server | Cortex.Streams.CDC.MSSqlServer | SqlServerCDCSourceOperator | SqlServerSettings | SqlServerRecord |
| MongoDB | Cortex.Streams.CDC.MongoDb | MongoDbCDCSourceOperator | MongoDbCDCSettings | MongoDbRecord |
The change-event shape is the same across adapters: every record carries an operation type, the row or document data, and metadata such as the change time. Concretely, each record exposes record.Operation (a string such as "INSERT", "UPDATE", "DELETE", or "InitialLoad"), record.Data (a Dictionary<string, object> mapping column or field names to values, so you can read a column with record.Data["Severity"]), and record.ChangeTime.
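Because every adapter uses the same Operation values, handler logic can be written once. The sketch below is plain C# with no Cortex dependency: a hypothetical tuple mirrors the three members named above (Operation, Data, ChangeTime), and a switch expression maps each operation to an action. The "id" and "Name" columns and the "ignore" fallback are illustrative assumptions:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a change record with the three members every adapter exposes.
(string Operation, Dictionary<string, object> Data, DateTime ChangeTime) MakeRecord(
    string op, int id, string name) =>
    (op, new Dictionary<string, object> { ["id"] = id, ["Name"] = name }, DateTime.UtcNow);

// Map an operation to what a downstream view should do with it.
string Classify(string operation) => operation switch
{
    "INSERT" or "UPDATE" or "InitialLoad" => "upsert",
    "DELETE" => "evict",
    _ => "ignore" // unknown operation types are skipped rather than guessed at
};

var record = MakeRecord("UPDATE", 42, "Ada");
var action = Classify(record.Operation);
Console.WriteLine($"{action} id={record.Data["id"]}"); // upsert id=42
```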
A worked example: streaming PostgreSQL changes end to end
PostgreSQL is the most illustrative case because the Cortex adapter builds directly on the database's logical replication. Per the docs, the PostgresSourceOperator "uses logical replication and the wal2json plugin to capture changes from PostgreSQL in real-time. It optionally sets up or verifies a publication and logical replication slot if configured to do so."
Start by installing the packages:
dotnet add package Cortex.Streams
dotnet add package Cortex.States
dotnet add package Cortex.Streams.CDC.PostgreSQL
1. Configure the source
PostgresSettings controls the behaviour of the capture. The constructor of PostgresSourceOperator takes the connection string, the schema, the table, the replication slot name and publication name, and the settings object. These are the exact parameter names from the docs:
using Cortex.Streams;
using Cortex.Streams.PostgreSQL;
using Cortex.States;
string connectionString = "Host=myHost;Database=myDB;Username=myUser;Password=myPass;";
string schemaName = "public";
string tableName = "Customers";
var postgresSettings = new PostgresSettings
{
DoInitialLoad = true, // snapshot the table first, then stream
PullInterval = TimeSpan.FromSeconds(3), // how often to read from the WAL
ConfigureCDCInServer = true, // create publication + slot if missing
ReplicaIdentity = ReplicaIdentityMode.Full // full row image for UPDATE/DELETE
};
var pgCdcOperator = new PostgresSourceOperator(
connectionString,
schemaName,
tableName,
slotName: "my_slot",
publicationName: "my_publication",
postgresSettings);
Two settings are worth calling out. ConfigureCDCInServer = true tells the operator to "create a publication (my_publication) and a logical replication slot (my_slot) if they do not exist". That is convenient in development, though in production you will usually provision those by hand. ReplicaIdentity = ReplicaIdentityMode.Full matters for updates and deletes: Postgres logs the complete old row only when the table's REPLICA IDENTITY is FULL. Under the default setting the old image contains just the primary-key columns, so a DELETE event tells you little more than which key was removed.
2. Attach, branch, project, and sink
Now wire the source into a pipeline. The change records arrive as PostgresRecord; from there it is ordinary Cortex. Here we route inserts and updates to a cache-upsert sink and deletes to a cache-eviction sink using AddBranch:
using System.Collections.Generic; // keep with the other using directives at the top of the file
var stream = StreamBuilder<PostgresRecord>
    .CreateNewStream("customers-cdc")
    .Stream(pgCdcOperator)
    // PostgresRecord.Operation is one of INSERT, UPDATE, DELETE, or InitialLoad
    .AddBranch("upserts", branch =>
    {
        // inserts, updates, and the initial-load snapshot rows
        branch
            .Filter(r => r.Operation is "INSERT" or "UPDATE" or "InitialLoad")
            .Map(r => new CacheEntry(
                Key: r.Data["id"]?.ToString(),
                Payload: r.Data))
            .Sink(entry =>
                Console.WriteLine($"UPSERT {entry.Key} -> {entry.Payload.Count} cols"));
    })
    .AddBranch("deletes", branch =>
    {
        // deletes: evict from the cache
        branch
            .Filter(r => r.Operation == "DELETE")
            .Map(r => r.Data["id"]?.ToString())
            .Sink(id =>
                Console.WriteLine($"EVICT {id}"));
    })
    .Build();
stream.Start();
// In a top-level program, type declarations must come after all statements,
// so the record that carries cache entries is declared at the end of the file.
public record CacheEntry(string? Key, Dictionary<string, object> Payload);
StreamBuilder<PostgresRecord> is the single-generic builder — you parameterise it only on the type entering the pipeline. .Stream(pgCdcOperator) attaches the CDC source; from then on the Filter, Map, AddBranch, and Sink operators behave identically to any other Cortex stream. Each AddBranch(...) defines an independent sub-pipeline, so inserts and deletes are handled by completely separate logic without any shared if/else.
In a real system the two sinks would write to Redis, Elasticsearch, or another Cortex sink operator instead of the console — but the structure stays exactly the same. (If a branch only needs a plain sink with no per-branch transform, .FanOut(fo => fo.To("name", item => ...)) is the lighter-weight alternative to AddBranch.)
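To see what the two sinks would actually do to a derived view, here is a self-contained sketch in which a Dictionary plays the part of the cache. It follows the same routing rule as the pipeline above (upsert on INSERT, UPDATE, or InitialLoad; evict on DELETE) but uses none of the Cortex types; the ApplyChange helper and the "id" key column are illustrative assumptions:

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for Redis or a search index: key -> row payload.
var cache = new Dictionary<string, Dictionary<string, object>>();

// Apply one change event the way the "upserts" and "deletes" branches would.
void ApplyChange(string operation, Dictionary<string, object> data)
{
    var key = data["id"]?.ToString();
    if (key is null) return; // no key, nothing to address in the cache

    switch (operation)
    {
        case "INSERT" or "UPDATE" or "InitialLoad":
            cache[key] = data;  // upsert: last write wins
            break;
        case "DELETE":
            cache.Remove(key);  // evict; a no-op if the key is absent
            break;
    }
}

ApplyChange("InitialLoad", new() { ["id"] = 1, ["Name"] = "Ada" });
ApplyChange("INSERT",      new() { ["id"] = 2, ["Name"] = "Grace" });
ApplyChange("UPDATE",      new() { ["id"] = 1, ["Name"] = "Ada Lovelace" });
ApplyChange("DELETE",      new() { ["id"] = 2 });

Console.WriteLine($"{cache.Count} {cache["1"]["Name"]}"); // 1 Ada Lovelace
```

Both operations are idempotent: applying the same upsert or eviction twice leaves the cache in the same state, which is what makes an occasionally redelivered event harmless.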
Shutting down cleanly
When the process stops, stop the stream so the operator can persist its position:
stream.Stop();
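Because the last processed log position is checkpointed (more on this below), the next start resumes from where it left off rather than replaying the whole table. That only holds across process restarts if the checkpoint lives in a persistent state store; the default in-memory store is lost when the process exits.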
Operational features that make it production-grade
The capture loop is the easy part. What separates a toy from something you can run is how it handles restarts, duplicates, and observability. Cortex bakes these in.
Initial snapshot / load
When DoInitialLoad = true, the operator does a one-time full read before it starts streaming incremental changes. For Postgres this is "a single SELECT * FROM schema.table before streaming from the WAL"; for SQL Server it "performs a single pass over the table"; for MongoDB "the entire collection is read once." This is how a brand-new search index or cache gets bootstrapped to the current state of the data and then kept live — without a separate migration job. (The snapshot rows arrive with Operation == "InitialLoad", which is why the upsert branch above accepts that value too.)
Checkpointing / offsets
Each adapter records where it is in the change log so it can resume after a restart without losing or replaying data:
- PostgreSQL "maintains the last processed LSN in the state store to resume from the same position upon restart."
- SQL Server "stores last processed LSN (Log Sequence Number) plus a last-record hash."
- MongoDB "stores a resume token from the change stream plus a record hash."
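The checkpoint idea itself is small. The sketch below is plain C#, not Cortex's code: a Dictionary<string, byte[]> plays the role of the IDataStore<string, byte[]> state store, the last processed position is saved as bytes after each event, and a simulated restart resumes after that position instead of starting over. The key name and the long-valued position are illustrative assumptions; real LSNs and resume tokens have their own formats:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for the state store; it lives outside Run(), so it survives the simulated restart.
var stateStore = new Dictionary<string, byte[]>();
const string CheckpointKey = "customers-cdc:last-position"; // hypothetical key name

// A fixed change log with positions 1..5.
var changeLog = Enumerable.Range(1, 5)
    .Select(i => (Position: (long)i, Op: $"change-{i}"))
    .ToList();

// Process entries after the stored checkpoint, saving the position after each one.
// 'stopAfter' simulates a crash partway through.
List<string> Run(int stopAfter)
{
    long start = stateStore.TryGetValue(CheckpointKey, out var bytes)
        ? BitConverter.ToInt64(bytes, 0)
        : 0; // no checkpoint yet: start from the beginning
    var processed = new List<string>();
    foreach (var entry in changeLog.Where(e => e.Position > start).Take(stopAfter))
    {
        processed.Add(entry.Op);                                           // emit downstream
        stateStore[CheckpointKey] = BitConverter.GetBytes(entry.Position); // then checkpoint
    }
    return processed;
}

var firstRun = Run(stopAfter: 3);              // processes 1..3, then "crashes"
var secondRun = Run(stopAfter: int.MaxValue);  // resumes at 4

Console.WriteLine(string.Join(",", firstRun) + " | " + string.Join(",", secondRun));
```

Emitting before checkpointing means a crash between the two steps re-delivers at most the last event rather than losing it, which is the restart-boundary case the hash-based duplicate check is meant to catch.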
By default that checkpoint lives in an in-memory store, which is fine for development but is lost on restart. For production you pass a persistent state store. Every adapter accepts one — typed as IDataStore<string, byte[]> — as a trailing constructor argument:
using Cortex.States.RocksDb;
var checkpointStore = new RocksDbStateStore<string, byte[]>("customers-cdc-checkpoint", "./data");
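// Assumes connectionString, schemaName, tableName, and sqlServerSettings (a SqlServerSettings
// instance) are already defined for the SQL Server table you are capturing.
var cdcSourceOperator = new SqlServerCDCSourceOperator(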
connectionString,
schemaName,
tableName,
sqlServerSettings,
checkpointStore);
With RocksDB backing the checkpoint, the operator resumes from the last processed LSN after a crash or deployment, "preventing data loss or duplication."
Duplicate handling
Polling and resume-token mechanics can re-deliver a change at the boundary of a restart. To guard against that, the adapters deduplicate by content. The SQL Server operator "computes an MD5 hash of each record's data" and "compares it with the hash of the last emitted record to avoid duplicates"; the Mongo and Postgres adapters keep a record hash alongside their checkpoint for the same reason. The best-practices guide notes the one thing you must get right for this to work: "ensure unique or stable column sets for hashing."
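The sketch below approximates the behaviour described for the SQL Server operator: hash each record's data with MD5 and skip it if the hash equals the last emitted one. It is not Cortex's code; in particular, the serialization (columns sorted by name and joined as name=value) is an assumption chosen so that column order cannot change the hash. It also shows why stable column sets matter: if a volatile column such as a timestamp were part of the hashed data, the redelivered record would hash differently and slip through.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

string? lastHash = null;

// Hash a record's data deterministically: sort columns so key order does not matter.
// (A sketch: values containing '|' or '=' could collide under this simple encoding.)
string HashRecord(Dictionary<string, object> data)
{
    var canonical = string.Join("|", data.OrderBy(kv => kv.Key, StringComparer.Ordinal)
                                         .Select(kv => $"{kv.Key}={kv.Value}"));
    return Convert.ToHexString(MD5.HashData(Encoding.UTF8.GetBytes(canonical)));
}

// Emit only if this record differs from the last emitted one.
bool TryEmit(Dictionary<string, object> data)
{
    var hash = HashRecord(data);
    if (hash == lastHash) return false; // same content as last time: treat as a duplicate
    lastHash = hash;
    return true;
}

var row       = new Dictionary<string, object> { ["id"] = 7, ["Status"] = "open" };
var reordered = new Dictionary<string, object> { ["Status"] = "open", ["id"] = 7 };
var changed   = new Dictionary<string, object> { ["id"] = 7, ["Status"] = "closed" };

bool first = TryEmit(row);              // nothing emitted yet
bool redelivered = TryEmit(reordered);  // same columns and values, different order
bool next = TryEmit(changed);           // content differs
Console.WriteLine($"{first} {redelivered} {next}"); // True False True
```

Note that comparing only against the last emitted hash catches a re-delivery at the restart boundary, not arbitrary duplicates further back in the stream.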
Telemetry
CDC operators integrate with Cortex's telemetry, "providing observability through integrated metrics and tracing." You enable it on the builder before attaching the source — .WithTelemetry(...) (like .WithPerformanceOptions(...) and .WithErrorHandling(...)) must come before .Stream(...):
using Cortex.Telemetry.OpenTelemetry;
var stream = StreamBuilder<PostgresRecord>
.CreateNewStream("customers-cdc")
.WithTelemetry(new OpenTelemetryProvider())
.Stream(pgCdcOperator)
.Sink(record => Console.WriteLine($"{record.Operation}"))
.Build();
Per-database notes and prerequisites
The three adapters present the same programming model, but the engines underneath are different and each has its own setup. Here is the high-level picture.
PostgreSQL — logical replication / WAL
PostgreSQL exposes changes through its write-ahead log via logical replication. Prerequisites from the docs:
- Set wal_level = logical in postgresql.conf and restart the server.
- The connecting role needs replication privileges.
- The wal2json plugin must be installed (often bundled; on Debian/Ubuntu, sudo apt-get install postgresql-14-wal2json, substituting your server's major version for 14).
- If ConfigureCDCInServer = false, create the publication and slot yourself: CREATE PUBLICATION my_publication FOR TABLE my_schema.my_table; and SELECT * FROM pg_create_logical_replication_slot('my_slot', 'wal2json');
SQL Server — CDC tables
SQL Server has built-in Change Data Capture that records row-level changes into shadow tables, which the operator polls with cdc.fn_cdc_get_all_changes_<capture_instance>. Prerequisites:
- SQL Server Agent must be running — CDC relies on it to capture changes in the background.
- Enabling CDC needs elevated rights: sys.sp_cdc_enable_db requires membership in the sysadmin role, and sys.sp_cdc_enable_table requires db_owner. With ConfigureCDCInServer = true the operator runs both procedures for you, so its login needs those rights; otherwise enable CDC manually first.
- CDC is available from SQL Server 2008 (Enterprise), in Standard Edition from 2016 SP1, and in the Developer Edition used for local Docker setups. Watch transaction-log growth, since CDC increases log usage.
MongoDB — change streams
MongoDB CDC uses change streams, which "reliably capture changes from a replica set or sharded cluster without manual polling." Prerequisites:
- A replica set or sharded cluster; change streams do not work on a standalone node. For a single-node dev box, start mongod with the --replSet option and run rs.initiate() once.
- The user needs the changeStream privilege (or oplog read access).
- MongoDB 3.6+.
The Mongo operator is constructed from a driver IMongoDatabase, the collection name, and MongoDbCDCSettings, which uses Delay and MaxBackOffSeconds rather than PullInterval:
using Cortex.Streams.MongoDb;
using MongoDB.Driver;
var database = new MongoClient("mongodb://localhost:27017").GetDatabase("myDb");
var mongoCdcSettings = new MongoDbCDCSettings
{
DoInitialLoad = true,
Delay = TimeSpan.FromSeconds(3),
MaxBackOffSeconds = 60
};
var cdcOperator = new MongoDbCDCSourceOperator(database, "Products", mongoCdcSettings);
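The name MaxBackOffSeconds suggests that the wait between reads grows when there is nothing to do or when errors occur, up to a ceiling. The exact policy is not described in the docs quoted here, so treat the following as an illustration of a capped exponential backoff under that assumption: start at Delay, double on each consecutive idle or failed attempt, and never exceed the cap. The NextWait helper is hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Assumed policy for illustration only: start at 'delay', double per consecutive
// idle or failed attempt, never exceed 'maxBackOffSeconds'.
TimeSpan NextWait(TimeSpan delay, int consecutiveMisses, int maxBackOffSeconds)
{
    double seconds = delay.TotalSeconds * Math.Pow(2, consecutiveMisses);
    return TimeSpan.FromSeconds(Math.Min(seconds, maxBackOffSeconds));
}

// Same values as the MongoDbCDCSettings example: Delay = 3s, MaxBackOffSeconds = 60.
var waits = new List<double>();
for (int misses = 0; misses <= 6; misses++)
    waits.Add(NextWait(TimeSpan.FromSeconds(3), misses, 60).TotalSeconds);

Console.WriteLine(string.Join(", ", waits)); // 3, 6, 12, 24, 48, 60, 60
```

Whatever the real policy, the cap is what bounds worst-case latency during a quiet period: with these values, a change is never picked up more than about a minute late because of backoff alone.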
Note that each operator targets one table or collection. To monitor several, instantiate one operator per table (or, for Postgres, add multiple tables to a single publication).
Best practices
The Cortex CDC best-practices guide distils production experience into a short checklist:
- Use persistent state stores. In production, always back checkpoints with a reliable IDataStore (RocksDB or another external store) so they survive restarts.
- Pick sensible intervals. Tune PullInterval (or the Mongo Delay) to balance data freshness against database load; higher intervals reduce load but raise latency.
- Keep deduplication honest. The hash-based mechanism only works if your column sets are unique and stable.
- Apply least privilege. Each database user needs exactly the CDC/replication rights it requires, and credentials must be protected.
- Monitor lag and errors. In high-throughput systems, confirm the operator keeps up with the change volume, and capture logs.
- Watch the logs that drive CDC. Keep an eye on WAL (Postgres), transaction log (SQL Server), and oplog retention/size (MongoDB).
- Test with realistic workloads. Validate initial loads, incremental changes, restarts, and failure conditions before going live.
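The interval trade-off is simple arithmetic. With a fixed pull interval, each operator issues 3600 / interval reads per hour, and a change committed just after a read waits up to roughly one interval before it is picked up (ignoring processing time). A quick calculation for a few candidate values, in plain C#:

```csharp
using System;
using System.Collections.Generic;

// For each candidate pull interval: reads per hour for one operator, and the
// worst-case extra delay before a change is seen (about one interval).
var rows = new List<(double IntervalSeconds, double ReadsPerHour, double WorstCaseLagSeconds)>();
foreach (var seconds in new[] { 1.0, 3.0, 10.0, 30.0 })
{
    var interval = TimeSpan.FromSeconds(seconds);
    rows.Add((seconds, TimeSpan.FromHours(1) / interval, interval.TotalSeconds));
}

foreach (var (i, reads, lag) in rows)
    Console.WriteLine($"PullInterval {i,4}s -> {reads,6} reads/hour, up to ~{lag}s added latency");
```

Since each operator targets one table, multiply the reads per hour by the number of monitored tables: ten tables at the 3-second interval used in this post means about 12,000 reads per hour against the database.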
One practical addition follows from the pipeline model itself: filter early. Because change events flow through ordinary Cortex operators, a .Filter(...) placed right after .Stream(...) drops irrelevant operations or rows before they reach expensive downstream work, which is exactly the "robust filtering" the docs recommend for optimising resource usage.
Where to go next
CDC turns your database's change log into a first-class event stream, and Cortex lets you consume it with the same pipeline you already know — initial snapshot, checkpointing, deduplication, and telemetry included.
- Docs: cortex.buildersoft.io/get-started/home
- GitHub: github.com/buildersoftio/cortex
- Recipes: /cortex/recipes
- Discord: Join the community