4. CDC in action
This article concludes our CDC series by demonstrating all previously discussed concepts in practice.
To recap, we’ll use the following stack:
- microservices written in Kotlin and Spring Boot (though the specific framework isn’t critical)
- Redis Streams
- PostgreSQL with logical replication enabled
- Debezium
Based on our discussion thus far, we have a solid plan for implementing data change propagation between beehive-control and external consumers:
- PostgreSQL’s Write-Ahead Log (WAL) serves as the CDC source
- Relevant data changes will be written to the outbox table as part of the same transaction that made the original modification
- Each change will include a new aggregate representing the corresponding domain event
- We subscribe to outbox table changes via a dedicated PostgreSQL publication and replication slot. Debezium manages this replication and publishes events to a Redis Stream.
- Consumers of beehive-control domain events can simply subscribe to the Redis Stream and decode the payload
Let’s begin!
Beehive Aggregates
At this point, you might be wondering what DDD aggregates are as we haven’t defined them explicitly yet. Let’s address that now.
DDD encourages modeling in terms of business events rather than low-level CRUD operations. Fine-grained, CRUD-like events such as HiveNameChanged or HiveColonySizeUpdated may not be meaningful to other components and can lead to unnecessary coupling. In the context of our beehive management system, key lifecycle events for a hive could be represented by, but are not limited to, the following aggregates:
1. HiveEstablished
Signifies the creation or establishment of a new beehive. The essential information we’d want to capture at this point would likely include a unique identifier for the hive and its initial location.
Example:
{
"hiveId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"location": {
"latitude": 48.7654,
"longitude": 16.8765
},
"establishmentDateTime": "2025-04-19T11:09:00Z"
}
2. HiveRelocated
Indicates that an existing beehive has been moved from one location to another. We’d need to identify the hive that was moved and its new location.
Example:
{
"hiveId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"newLocation": {
"latitude": 48.8765,
"longitude": 16.9876
},
"previousLocation": {
"latitude": 48.7654,
"longitude": 16.8765
},
"relocationDateTime": "2025-04-19T11:09:00Z"
}
3. HiveDisposed
Signifies the end of a beehive’s lifecycle. We primarily need to identify the hive that was disposed of and, optionally, the reason for disposal.
Example:
{
"hiveId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"disposalReason": "PEST_INFESTATION",
"disposalDateTime": "2025-04-19T11:09:00Z"
}
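Before wiring these into the pipeline, here is a minimal Kotlin sketch of how such events might be modeled in beehive-control. The field names follow the JSON examples above; the Location type, the sealed interface and the exact shapes are illustrative assumptions (the actual implementation shown later uses a GeoJsonPoint for locations).

import java.time.ZonedDateTime
import java.util.UUID

// Hypothetical location type for illustration; the real service uses GeoJSON points.
data class Location(val latitude: Double, val longitude: Double)

// Common marker so the outbox layer can treat every domain event uniformly.
sealed interface BeehiveDomainEvent

data class HiveEstablished(
    val hiveId: UUID,
    val location: Location,
    val establishmentDateTime: ZonedDateTime,
) : BeehiveDomainEvent

data class HiveRelocated(
    val hiveId: UUID,
    val newLocation: Location,
    val previousLocation: Location,
    val relocationDateTime: ZonedDateTime,
) : BeehiveDomainEvent

data class HiveDisposed(
    val hiveId: UUID,
    val disposalReason: String,
    val disposalDateTime: ZonedDateTime,
) : BeehiveDomainEvent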
Hive management deployment
Below is a shortened, redacted version of the Docker Compose file I used extensively while experimenting with CDC:
services:
# This is the main microservice which will create new domain events
# by pushing new rows to the outbox table.
control:
container_name: beehive-control
...
depends_on:
postgres:
condition: service_healthy
flyway:
condition: service_completed_successfully
networks:
- beehive-network
...
# Creating a fresh Postgres instance proved to be extremely useful
# for debugging PostgreSQL publication and replication slot behaviour.
postgres:
container_name: beehive-control-db
image: postgis/postgis:17-3.5-alpine
environment:
- POSTGRES_DB=beehive
- POSTGRES_USER=beehive
- POSTGRES_PASSWORD=beehive
ports:
- "54321:5432"
networks:
- beehive-network
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U beehive -d beehive" ]
interval: 5s
timeout: 5s
retries: 5
# This was discussed in detail in earlier articles, and is critical
# for the CDC pipeline to function correctly.
command: >
postgres
-c wal_level=logical
-c max_wal_senders=10
-c max_replication_slots=10
# Once PostgreSQL is up and running, run a set of Flyway
  # changesets, including those which create a PostgreSQL
# publication and a replication slot.
flyway:
container_name: beehive-control-flyway
command: migrate
build:
context: .
dockerfile: flyway.Dockerfile
image: "..."
environment:
- TAG=${TAG}
- FLYWAY_URL=...
...
depends_on:
postgres:
condition: service_healthy
networks:
- beehive-network
# This will be our Debezium sink.
# Kafka setup was intentionally skipped for simplicity.
redis:
container_name: beehive-redis
image: redis:7.2-alpine
ports:
- "6379:6379"
networks:
- beehive-network
command: redis-server --appendonly yes
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 5
# This is also somewhat helpful - mounting a volume restores the
# previous state of the Redis streams which capture outbox messages,
# so you get a clean perspective of what the messages looked like before.
volumes:
- redis-data:/data
debezium:
container_name: debezium
image: quay.io/debezium/server:3.0.7.Final
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
networks:
- beehive-network
# Some properties are quite awkward and clunky to put
# as environment variables, so I decided to use a dedicated
# configuration file which will be discussed later.
#
# Note: you could also use a docker volume here, so
# that debezium does not always start from scratch, but
# from my experience, that always led to an inconsistent
# Debezium state. It's much better to only rely on
# the replication slot state persisted directly in Postgres.
volumes:
- ./etc/debezium.properties:/debezium/config/application.properties
networks:
beehive-network:
driver: bridge
volumes:
redis-data:
The only remaining thing to cover is how I configured Debezium, and why.
Debezium configuration
Debezium is highly configurable and well-documented, so always make sure you consult the documentation thoroughly before deploying Debezium to your environment.
Core Debezium Configuration
In our playground, domain event volume is low, so high-throughput considerations are unnecessary, and we’re willing to tolerate some lag in exchange for increased robustness, guaranteed delivery, and less strain on the database server. That’s why we’re fine with a 3s poll interval for checking whether there are any new events in the WAL. The only schema we’re interested in is outbox, as that’s where everything interesting to other services happens.
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.topic.prefix=beehive
debezium.source.poll.interval.ms=3000
debezium.source.heartbeat.interval.ms=30000
debezium.source.schema.include.list=outbox
debezium.source.poll.interval.ms is the “positive integer value that specifies the number of milliseconds the connector waits for new change events to appear before it starts processing a batch of events”. debezium.source.heartbeat.interval.ms, in turn, is relevant for high-traffic databases whose published tables change only rarely: periodic heartbeats let the connector keep confirming its WAL position so the replication slot does not hold back old WAL segments. This part of the configuration is aptly explained in a Debezium article.
PostgreSQL Connection Settings
Next, we’ll need to set up the publication and replication slot. I’ve written dedicated Flyway changesets which do it for me:
Publication changeset
DO $$
DECLARE
wal_level text;
BEGIN
SELECT current_setting('wal_level') INTO
wal_level;
-- Only create publication if WAL level is set to logical
IF wal_level = 'logical' THEN
-- Create a publication if it doesn't exist
IF NOT EXISTS (
SELECT 1
FROM pg_publication
WHERE pubname = 'dbz_publication'
) THEN
CREATE PUBLICATION dbz_publication
FOR TABLE outbox.outbox
WITH (
publish = 'insert, update, delete, truncate'
);
END IF;
ELSE
RAISE NOTICE
'WAL level is set to %. Skipping publication creation.',
wal_level;
END IF;
END
$$;
Replication slot changeset
DO $$
DECLARE
wal_level text;
BEGIN
SELECT current_setting('wal_level') INTO
wal_level;
-- Only create replication slot if WAL level is set to logical
IF wal_level = 'logical' THEN
-- Create a logical replication slot if it doesn't exist
IF NOT EXISTS (
SELECT 1
FROM pg_replication_slots
WHERE slot_name = 'debezium'
) THEN
PERFORM pg_create_logical_replication_slot(
'debezium',
'pgoutput'
);
END IF;
ELSE
RAISE NOTICE
'WAL level is set to %. Skipping replication slot creation.',
wal_level;
END IF;
END
$$;
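Both changesets assume the outbox.outbox table already exists. Its migration isn’t shown here, but judging from the column mapping Debezium logs later in this article, a sketch of that changeset could look roughly like this:
CREATE SCHEMA IF NOT EXISTS outbox;

CREATE TABLE IF NOT EXISTS outbox.outbox (
    -- primary key; Debezium uses it as the record key
    id           uuid      NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY,
    -- identifier of the aggregate the event relates to (e.g. the hive id)
    aggregate_id text      NOT NULL,
    -- event type, its version and group, e.g. 'HiveEstablished' / '1.1.0' / 'cz.beehive.control'
    type         text      NOT NULL,
    type_version text      NOT NULL,
    type_group   text      NOT NULL,
    -- the serialized domain event itself
    payload      jsonb     NOT NULL,
    inserted     timestamp NOT NULL DEFAULT now()
);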
Combining this with the Docker Compose configuration, we arrive at the next part of the Debezium configuration:
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=beehive
debezium.source.database.password=beehive
debezium.source.database.dbname=beehive
debezium.source.database.server.name=beehive-postgresql
debezium.source.plugin.name=pgoutput
debezium.source.publication.name=dbz_publication
debezium.source.slot.name=debezium
# adjust this to your preference:
# https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-snapshot-mode-options
debezium.source.snapshot.mode=initial
Redis Sink Configuration
With the PostgreSQL connector set up, we now need to send the CDC Debezium messages somewhere to be consumed by subscribers. This component is known in Debezium terminology as a sink. Let’s configure Redis Streams accordingly:
debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.sink.redis.record.key.mapper=io.debezium.server.redis.RedisStandardRecordKey
debezium.sink.redis.stream.name.mapper=topic
debezium.sink.redis.field.name=payload
Message Converters and Transformations
By default, Debezium sends quite extensive messages to configured sinks because it cares about type safety, as described by Jiří Pechanec. Nevertheless, it might be desirable to send only the data payload, without the schema definition, which is what the unwrap transformation does:
debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.drop.tombstones=false
debezium.transforms.unwrap.delete.handling.mode=rewrite
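To make the difference concrete: without unwrap (and with schemas enabled), each record arrives wrapped in the standard Debezium envelope, roughly like this simplified, illustrative sketch (values reuse the example data from this article; the schema part is abbreviated):
{
  "schema": { "...": "full type description of every field in the payload" },
  "payload": {
    "before": null,
    "after": {
      "id": "febdad72-762d-46bc-9347-e42c92e685ef",
      "aggregate_id": "14e30f4d-b2e2-4a9f-a0ba-d0a3d0024476",
      "type": "HiveEstablished",
      "payload": "{...}"
    },
    "source": { "connector": "postgresql", "schema": "outbox", "table": "outbox", "...": "..." },
    "op": "c",
    "ts_ms": 1746114062001
  }
}
With schemas.enable=false and the unwrap transformation, only the flattened after state is emitted, which is exactly what we’ll later see in the Redis stream.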
As you might expect, unwrap is not the only option. See the Debezium SMT documentation if you want to learn more.
We’ll return to the issue of typing later.
Logging Configuration
This section should be self-explanatory. I’ll just add that extensive and thorough Debezium logging has been of immense help to me, so in my opinion, it’s worth checking out.
quarkus.log.level=DEBUG
quarkus.log.console.level=DEBUG
debezium.log.level=TRACE
Producing CDC events
Once the Docker Compose services are deployed, the beehive-control API becomes accessible.
To showcase the integration on the client side, let’s now consider the establishment of a hive.
Our HTTP endpoint will be nothing out of the ordinary:
@PreAuthorize("hasAuthority('beehive:management:hive')")
@RestController
@RequestMapping("\${api.api-base-url}/hives", produces = [MediaType.APPLICATION_JSON_VALUE], consumes = [MediaType.APPLICATION_JSON_VALUE])
class HiveResource(
private val hiveCreateUseCase: HiveCreateUseCase,
...
) {
...
@PostMapping
fun createHive(
@CurrentUser userId: UUID,
@RequestBody creationRequest: HiveCreateRequest,
): HiveCreateResponse = hiveCreateUseCase.create(userId, creationRequest)
...
}
The key part is the implementation of HiveCreateUseCase, where we emit an event to Spring’s internal event bus:
import cz.beehive.control.apiary.ApiaryService
import cz.beehive.control.apiary.HiveService
import cz.beehive.control.db.hive.tables.pojos.Hive
import cz.beehive.control.event.BeehiveSpringEventBusPublisher
import cz.beehive.control.extras.spring.classSpecificLogger
import cz.beehive.control.geo.GeoJsonPoint
import cz.beehive.control.hive.api.HiveCreateRequest
import cz.beehive.control.hive.api.HiveCreateResponse
import cz.beehive.control.hive.dto.HiveEstablished
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.Clock
import java.time.ZonedDateTime
import java.util.UUID
@Service
class HiveCreateUseCase(
private val apiaryService: ApiaryService,
private val hiveService: HiveService,
private val beehiveSpringEventBusPublisher: BeehiveSpringEventBusPublisher,
private val clock: Clock,
) {
@Transactional
fun create(
userId: UUID,
hiveCreateRequest: HiveCreateRequest,
): HiveCreateResponse {
...
val newHive =
hiveService.create(
Hive(
name = hiveCreateRequest.name,
apiaryId = hiveCreateRequest.apiaryId,
location = hiveCreateRequest.location,
),
)
// this `HiveEstablished` event will be picked up by @EventListener
beehiveSpringEventBusPublisher.publish(
HiveEstablished(
hiveId = requireNotNull(newHive.id),
location = newHive.location as GeoJsonPoint,
establishmentDateTime = ZonedDateTime.now(clock),
),
)
return HiveCreateResponse(requireNotNull(newHive.id))
}
companion object {
val logger = classSpecificLogger()
}
}
This event is then processed by a @TransactionalEventListener, which writes to the outbox table. This gives us a clear separation of concerns.
import cz.beehive.control.hive.dto.HiveEstablished
import org.springframework.stereotype.Service
import org.springframework.transaction.event.TransactionPhase
import org.springframework.transaction.event.TransactionalEventListener

@Service
class BeehiveEventListener(
    private val outboxService: OutboxService,
) {
    // BEFORE_COMMIT ensures the outbox row is written within the same transaction
    // as the original modification, as the outbox pattern requires (the default
    // AFTER_COMMIT phase would run outside of it).
    @TransactionalEventListener(HiveEstablished::class, phase = TransactionPhase.BEFORE_COMMIT)
    fun hiveEstablishmentEvent(event: HiveEstablished) {
        outboxService.insert(
            aggregateId = event.hiveId,
            event = event,
        )
    }
}
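The OutboxService referenced above simply inserts a row into outbox.outbox. Its real implementation isn’t shown in this article; a minimal sketch, assuming Jackson for serialization and Spring’s JdbcTemplate (the actual project may use a different persistence layer), could look like this:

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.stereotype.Service
import java.util.UUID

@Service
class OutboxService(
    private val jdbcTemplate: JdbcTemplate,
    private val objectMapper: ObjectMapper,
) {
    // Runs in the caller's transaction, so the outbox row is committed
    // (or rolled back) together with the original modification.
    fun insert(aggregateId: UUID, event: Any) {
        jdbcTemplate.update(
            """
            INSERT INTO outbox.outbox (aggregate_id, type, type_version, type_group, payload)
            VALUES (?, ?, ?, ?, ?::jsonb)
            """.trimIndent(),
            aggregateId.toString(),
            event::class.simpleName,   // e.g. "HiveEstablished"
            "1.1.0",                   // event schema version, hardcoded here for brevity
            "cz.beehive.control",      // event group, matching the registry group used later
            objectMapper.writeValueAsString(event),
        )
    }
}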
By executing:
curl \
-v -X POST \
-H "Content-Type: application/json" \
--data "{ \"name\": \"Ivory 50D\", \"apiaryId\": \"73d9bbef-a855-7cbd-c530-880abefcf048\", \"location\": { \"type\": \"Point\", \"coordinates\": [42.1, 12.34] } }" \
localhost:8080/api/hives
we create a new entry in the outbox table and the rest of the work will be done by PostgreSQL and Debezium.
beehive-control | o.s.web.servlet.DispatcherServlet : POST "/api/hives", parameters={}
beehive-control | s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to cz.beehive.control.hive.api.HiveResource#createHive(UUID, HiveCreateRequest)
beehive-control | m.m.a.RequestResponseBodyMethodProcessor : Read "application/json;charset=UTF-8" to [HiveCreateRequest(name=Ivory 50D, apiaryId=73d9bbef-a855-7cbd-c530-880abefcf048, location=GeoJsonPoi (truncated)...]
beehive-control | c.b.c.h.u.HiveCreateUseCase$Companion : Creating hive Ivory 50D in apiary 73d9bbef-a855-7cbd-c530-880abefcf048 at GeoJsonPoint(type=Point, coordinates=[42.1, 12.34])
...
debezium | checking for more records...
debezium | polling records...
debezium | no records available or batch size not reached yet, sleeping a bit...
debezium | refreshing DB schema for table 'outbox.outbox'
debezium | Mapping table 'outbox.outbox' to schemas under 'beehive.outbox.outbox'
debezium | field 'id' (STRING) from column id uuid(2147483647, 0) NOT NULL DEFAULT VALUE gen_random_uuid()
debezium | field 'id' (STRING) from column id uuid(2147483647, 0) NOT NULL DEFAULT VALUE gen_random_uuid()
debezium | field 'aggregate_id' (STRING) from column aggregate_id text(2147483647, 0) NOT NULL
debezium | field 'type' (STRING) from column type text(2147483647, 0) NOT NULL
debezium | field 'type_version' (STRING) from column type_version text(2147483647, 0) NOT NULL
debezium | field 'type_group' (STRING) from column type_group text(2147483647, 0) NOT NULL
debezium | field 'payload' (STRING) from column payload jsonb(2147483647, 0) NOT NULL
debezium | field 'inserted' (INT64) from column inserted timestamp(29, 6) NOT NULL DEFAULT VALUE now()
debezium | Mapped primary key for table 'outbox.outbox' to schema: {"name" : "beehive.outbox.outbox.Key", "type" : "STRUCT", "optional" : "false", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"name" : "io.debezium.data.Uuid", "type" : "STRING", "optional" : "false", "default" : "00000000-0000-0000-0000-000000000000", "version" : "1"}}]}
debezium | Mapped columns for table 'outbox.outbox' to schema: {"name" : "beehive.outbox.outbox.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"name" : "io.debezium.data.Uuid", "type" : "STRING", "optional" : "false", "default" : "00000000-0000-0000-0000-000000000000", "version" : "1"}}, {"name" : "aggregate_id", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}, {"name" : "type", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}, {"name" : "type_version", "index" : "3", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}, {"name" : "type_group", "index" : "4", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}, {"name" : "payload", "index" : "5", "schema" : {"name" : "io.debezium.data.Json", "type" : "STRING", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "inserted", "index" : "6", "schema" : {"name" : "io.debezium.time.MicroTimestamp", "type" : "INT64", "optional" : "false", "default" : "0", "version" : "1"}}]}
debezium | Relation '19859' resolved to table 'outbox.outbox'
(Note: I’ve truncated the timestamps and other irrelevant attributes of the logs to keep things compact.)
And, indeed, we can now see new messages to be consumed in the Redis stream beehive.outbox.outbox:
XINFO STREAM beehive.outbox.outbox
{
"id": "febdad72-762d-46bc-9347-e42c92e685ef",
"aggregate_id": "14e30f4d-b2e2-4a9f-a0ba-d0a3d0024476",
"type": "HiveEstablished",
"type_version": "1.1.0",
"type_group": "cz.beehive.control",
"payload": "{"hiveId": "14e30f4d-b2e2-4a9f-a0ba-d0a3d0024476", "location": {"type": "Point", "coordinates": [42.1, 12.34]}, "establishmentDateTime": "2025-05-01T17:31:34.764418754Z"}",
"inserted": 1746114062001,
"__deleted": "false"
}
(Again, this is a prettified and redacted output of the command.)
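For completeness, a bare-bones consumer only needs to read the stream and decode the payload field. Here is a minimal Kotlin sketch using Spring Data Redis; offset tracking, consumer groups and error handling are deliberately left out, and the exact field layout of the stream entries depends on the Redis sink settings, so treat it as illustrative:

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.core.StringRedisTemplate

class BeehiveOutboxStreamReader(
    private val redisTemplate: StringRedisTemplate,
    private val objectMapper: ObjectMapper,
) {
    fun pollOnce() {
        // Read everything currently in the stream; a real consumer would remember
        // its last processed offset or use a consumer group (XREADGROUP).
        val records = redisTemplate.opsForStream<String, String>()
            .read(StreamOffset.fromStart("beehive.outbox.outbox")) ?: return

        for (record in records) {
            // Each entry carries the unwrapped outbox row as JSON.
            val outboxRow = record.value.values.firstOrNull()
                ?.let { objectMapper.readTree(it) } ?: continue
            val type = outboxRow["type"]?.asText()          // e.g. "HiveEstablished"
            val payload = outboxRow["payload"]?.asText()
                ?.let { objectMapper.readTree(it) }         // the domain event itself
            println("Received $type at ${record.id}: $payload")
        }
    }
}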
The only issue is that the consumer has no idea what the incoming type should look like, as there is no schema attached to it. We can see the schema being correctly detected in the Debezium logs, but since we configured Debezium to strip it out, the consumer has no way of reliably inferring it by itself.
Restoring Debezium type safety by using a schema registry
Generally speaking, the Redis Stream consumer side is trivial, as the sketch above suggests, so we won’t dwell on it here. The only issue we have to deal with is schema inference.
1. Send schema with every payload
Indeed, this might seem like the most straightforward solution. However, it has two major downsides:
- By design, you will have a dynamic API. This prevents clients from generating type-safe models in advance, which might not actually be a downside when the schema is subject to frequent change.
- The schema will most likely constitute the largest part of any message in the stream, which might be a consideration when it comes to infrastructure cost, if your throughput is expected to be high.
2. Send a reference type and use Schema Registry
We won’t discuss it in great detail here. The main idea is to have a schema registry of types from which the client can generate types in advance.
You could use Apicurio for that, as it integrates nicely with Debezium, but any schema registry will do (even a static Git repository might be more than enough).
In Apicurio, our HiveEstablished type might be available on this resource:
GET /v3/groups/cz.beehive.control/artifacts/HiveEstablished/versions/1.1.0
which goes nicely with the payload we designed for Redis Streams.
As of May 2025, the downside of this approach is that code generators for asynchronous APIs are sparse and not as well maintained as, say, OpenAPI generators. There is the AsyncAPI initiative, so you might want to check it out as well. The Apache Avro format is also a suitable option if you only want to define types.
3. A hybrid of 1. and 2.
As part of a Redis Stream message, you could also send a type reference along with its version, fetch the referenced schema at runtime, and deserialize the payload accordingly, as sketched below.
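A rough sketch of that lookup, assuming the type, type_group and type_version fields from the stream message and the Apicurio-style URL shown above (the registryBaseUrl and the exact content endpoint are assumptions to adjust for your registry):

import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Fetches the schema referenced by a stream message's type metadata.
fun fetchSchema(
    registryBaseUrl: String, // hypothetical, e.g. "http://apicurio:8080/apis/registry"
    typeGroup: String,       // e.g. "cz.beehive.control"
    type: String,            // e.g. "HiveEstablished"
    typeVersion: String,     // e.g. "1.1.0"
): String {
    val uri = URI.create("$registryBaseUrl/v3/groups/$typeGroup/artifacts/$type/versions/$typeVersion")
    val request = HttpRequest.newBuilder(uri).GET().build()
    val response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())
    require(response.statusCode() == 200) { "Schema lookup failed: ${response.statusCode()}" }
    return response.body()
}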
Summary
To restore type information for consumers, you can:
- Include the full schema with each message (simple but verbose);
- Use a schema registry to store and reference schemas (efficient but requires infrastructure);
- Use a hybrid approach: send type references with messages that consumers can use to fetch schemas dynamically.
This concludes our series. Happy software engineering!