Příspěvek

4. CDC in action

4. CDC in action

Back to the series parent

This article concludes our CDC series by demonstrating all previously discussed concepts in practice.

To recap, we’ll use the following stack:

  • microservices written in Kotlin and Spring Boot (though the specific framework isn’t critical)
  • Redis Streams
  • PostgreSQL with logical replication enabled
  • Debezium

Based on our discussion thus far, we have a solid plan for implementing data change propagation between beehive-control and external consumers:

  • PostgreSQL’s Write-Ahead Log (WAL) serves as the CDC source
  • Relevant data changes will be written to the outbox table as part of the same transaction that made the original modification
  • Each change will include a new aggregate representing the corresponding domain event
  • We subscribe to outbox table changes via a dedicated PostgreSQL publication and replication slot. Debezium manages this replication and publishes events to a Redis Stream.
  • Consumers of beehive-control domain events can simply subscribe to the Redis Stream and decode the payload

Let’s begin!

Beehive Aggregates

At this point, you might be wondering what DDD aggregates are as we haven’t defined them explicitly yet. Let’s address that now.

DDD encourages modeling in terms of business events rather than low-level CRUD operations. Events like HiveNameChanged or HiveColonySizeUpdated may not be meaningful to other components, potentially leading to unnecessary coupling. In the context of our beehive management system, key lifecycle events for a hive could be represented by, but are not limited to, the following aggregates:

1. HiveEstablished

Signifies the creation or establishment of a new beehive. The essential information we’d want to capture at this point would likely include a unique identifier for the hive and its initial location.

Example:

1
2
3
4
5
6
7
8
{
  "hiveId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
  "location": {
    "latitude": 48.7654,
    "longitude": 16.8765
  },
  "establishmentDateTime": "2025-04-19T11:09:00Z"
}

2. HiveRelocated

Indicates that an existing beehive has been moved from one location to another. We’d need to identify the hive that was moved and its new location.

Example:

1
2
3
4
5
6
7
8
9
10
11
12
{
  "hiveId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
  "newLocation": {
    "latitude": 48.8765,
    "longitude": 16.9876
  },
  "previousLocation": {
    "latitude": 48.7654,
    "longitude": 16.8765
  },
  "relocationDateTime": "2025-04-19T11:09:00Z"
}

3. HiveDisposed

Signifies the end of a beehive’s lifecycle. We primarily need to identify the hive that was disposed, and optionally, the reason of disposal.

Example:

1
2
3
4
5
{
  "hiveId": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
  "disposalReason": "PEST_INFESTATION",
  "disposalDateTime": "2025-04-19T11:09:00Z"
}

Hive management deployment

Below is a shortened, redacted version of the Docker Compose file I used extensively while experimenting with CDC:

Click to show the docker-compose.yml
services:
  # This is the main microservice which will create new domain events
  # by pushing new rows to the outbox table.
  control:
    container_name: beehive-control
    ...
    depends_on:
      postgres:
        condition: service_healthy
      flyway:
        condition: service_completed_successfully
    networks:
      - beehive-network
    ...
  
  # Creating a fresh Postgres instance proved to be extremely useful
  # for debugging PostgreSQL publication and replication slot behaviour.
  postgres:
    container_name: beehive-control-db
    image: postgis/postgis:17-3.5-alpine
    environment:
      - POSTGRES_DB=beehive
      - POSTGRES_USER=beehive
      - POSTGRES_PASSWORD=beehive
    ports:
      - "54321:5432"
    networks:
      - beehive-network
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready -U beehive -d beehive" ]
      interval: 5s
      timeout: 5s
      retries: 5
    # This was discussed in detail in earlier articles, and is critical
    # for the CDC pipeline to function correctly.
    command: >
      postgres
        -c wal_level=logical
        -c max_wal_senders=10
        -c max_replication_slots=10
  
  # Once PostgreSQL is up and running, run a set of Flyway
  # changesets, including those which creates a PostgreSQL
  # publication and a replication slot.
  flyway:
    container_name: beehive-control-flyway
    command: migrate
    build:
      context: .
      dockerfile: flyway.Dockerfile
    image: "..."
    environment:
      - TAG=${TAG}
      - FLYWAY_URL=...
      ...
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - beehive-network

  # This will be our Debezium sink.
  # Kafka setup was intentionally skipped for simplicity.
  redis:
    container_name: beehive-redis
    image: redis:7.2-alpine
    ports:
      - "6379:6379"
    networks:
      - beehive-network
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5
    # This is also somewhat helpful - mounting a volume restores the
    # previous state of the Redis streams which capture outbox messages,
    # so you get a clean perspective of what the messages looked like before.
    volumes:
      - redis-data:/data

  debezium:
    container_name: debezium
    image: quay.io/debezium/server:3.0.7.Final
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - beehive-network
    # Some properties are quite awkward and clunky to put
    # as environment variables, so I decided to use a dedicated
    # configuration file which will be discussed later.
    #
    # Note: you could also use a docker volume here, so
    # that debezium does not always start from scratch, but
    # from my experience, that always led to an inconsistent
    # Debezium state. It's much better to only rely on 
    # the replication slot state persisted directly in Postgres.
    volumes:
      - ./etc/debezium.properties:/debezium/config/application.properties

networks:
  beehive-network:
    driver: bridge

volumes:
  redis-data:


The only other necessary comment related to Debezium is how I configured it and why.

Debezium configuration

Debezium is highly configurable and well-documented, so always make sure you consult it thoroughly before deploying Debezium to your environment.

Core Debezium Configuration

In our playground, domain event volume is low, so high-throughput considerations are unnecessary and we’re we’re willing to tolerate some lag in exchange for increased robustness, guaranteed delivery and putting less strain on the database server.

That’s why we’re okay with a 3s poll interval which will check whether there are any new events in the WAL. The only schema we’re interested in is outbox, as that’s where everything interesting to other services happens.

1
2
3
4
5
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.topic.prefix=beehive
debezium.source.poll.interval.ms=3000
debezium.source.heartbeat.interval.ms=30000
debezium.source.schema.include.list=outbox

debezium.source.heartbeat.interval.ms is a setting relevant for high-throughput databases with publication configured for low-latency tables. It is a “positive integer value that specifies the number of milliseconds the connector waits for new change events to appear before it starts processing a batch of events”.This part of configuration aptly explained in a Debezium article.

PostgreSQL Connection Settings

Next, we’ll need to set up the publication and replication slot. I’ve written dedicated Flyway changesets which do it for me:

Publication changeset
DO $$
  DECLARE
    wal_level text;
  BEGIN
    SELECT current_setting('wal_level') INTO 
      wal_level;
    -- Only create publication if WAL level is set to logical
    IF wal_level = 'logical' THEN
      -- Create a publication if it doesn't exist
      IF NOT EXISTS (
        SELECT 1 
        FROM pg_publication 
        WHERE pubname = 'dbz_publication'
      ) THEN
        CREATE PUBLICATION dbz_publication
          FOR TABLE outbox.outbox
          WITH (
            publish = 'insert, update, delete, truncate'
          );
      END IF;
    ELSE
      RAISE NOTICE 
        'WAL level is set to %. Skipping publication creation.', 
        wal_level;
    END IF;
  END
$$;
Replication slot changeset
DO $$
  DECLARE
    wal_level text;
  BEGIN
    SELECT current_setting('wal_level') INTO 
      wal_level;
    -- Only create replication slot if WAL level is set to logical
    IF wal_level = 'logical' THEN
      -- Create a logical replication slot if it doesn't exist
      IF NOT EXISTS (
        SELECT 1 
        FROM pg_replication_slots 
        WHERE slot_name = 'debezium'
      ) THEN
        PERFORM pg_create_logical_replication_slot(
          'debezium', 
          'pgoutput'
        );
      END IF;
    ELSE
      RAISE NOTICE 
        'WAL level is set to %. Skipping replication slot creation.', 
        wal_level;
    END IF;
  END
$$;


Combining this with the docker compose file configuration, we get the next Debezium configuration part:

1
2
3
4
5
6
7
8
9
10
11
12
13
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=beehive
debezium.source.database.password=beehive
debezium.source.database.dbname=beehive
debezium.source.database.server.name=beehive-postgresql

debezium.source.plugin.name=pgoutput
debezium.source.publication.name=dbz_publication
debezium.source.slot.name=debezium
# adjust this to your preference:
# https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-snapshot-mode-options
debezium.source.snapshot.mode=initial

Redis Sink Configuration

With the PostgreSQL connector set up, we now need to send the CDC Debezium messages somewhere to be consumed by subscribers. This component is known in Debezium terminology as a sink. Let’s configure Redis Streams accordingly:

1
2
3
4
5
debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.sink.redis.record.key.mapper=io.debezium.server.redis.RedisStandardRecordKey
debezium.sink.redis.stream.name.mapper=topic
debezium.sink.redis.field.name=payload

Message Converters and Transformations

By default, Debezium sends quite extensive messages to configured sinks as it cares about type safety, as described by Jiří Pechanec. Nevertheless, it might be desirable to send the data payload only, without schema definiton, which is what the unwrap transformation does:

1
2
3
4
5
6
7
8
debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.drop.tombstones=false
debezium.transforms.unwrap.delete.handling.mode=rewrite

As you might expect, unwrap is not the only option. See the Debezium SMT documentation if you want to learn more. We’ll return to the issue of typing later.

Logging Configuration

This section should be self-explanatory. I’ll just add that extensive and thorough Debezium logging has been of immense help to me, so in my opinion, it’s worth checking out.

1
2
3
quarkus.log.level=DEBUG
quarkus.log.console.level=DEBUG
debezium.log.level=TRACE

Producing CDC events

Once the Docker Compose services are deployed, the beehive-control API becomes accessible.

To showcase the integration on the client side, let’s now consider the establishment of a hive.

Our HTTP endpoint will be nothing out of the ordinary:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@PreAuthorize("hasAuthority('beehive:management:hive')")
@RestController
@RequestMapping("\${api.api-base-url}/hives", produces = [MediaType.APPLICATION_JSON_VALUE], consumes = [MediaType.APPLICATION_JSON_VALUE])
class HiveResource(
    private val hiveCreateUseCase: HiveCreateUseCase,
    ...
) {
    ...

    @PostMapping
    fun createHive(
        @CurrentUser userId: UUID,
        @RequestBody creationRequest: HiveCreateRequest,
    ): HiveCreateResponse = hiveCreateUseCase.create(userId, creationRequest)

    ...
}

The key part is the implementation of HiveCreateUseCase, where we emit an event to Spring’s internal event bus:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import cz.beehive.control.apiary.ApiaryService
import cz.beehive.control.apiary.HiveService
import cz.beehive.control.db.hive.tables.pojos.Hive
import cz.beehive.control.event.BeehiveSpringEventBusPublisher
import cz.beehive.control.extras.spring.classSpecificLogger
import cz.beehive.control.geo.GeoJsonPoint
import cz.beehive.control.hive.api.HiveCreateRequest
import cz.beehive.control.hive.api.HiveCreateResponse
import cz.beehive.control.hive.dto.HiveEstablished
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.Clock
import java.time.ZonedDateTime
import java.util.UUID

@Service
class HiveCreateUseCase(
    private val apiaryService: ApiaryService,
    private val hiveService: HiveService,
    private val beehiveSpringEventBusPublisher: BeehiveSpringEventBusPublisher,
    private val clock: Clock,
) {
    @Transactional
    fun create(
        userId: UUID,
        hiveCreateRequest: HiveCreateRequest,
    ): HiveCreateResponse {
        ...
        val newHive =
            hiveService.create(
                Hive(
                    name = hiveCreateRequest.name,
                    apiaryId = hiveCreateRequest.apiaryId,
                    location = hiveCreateRequest.location,
                ),
            )
      
       
        // this `HiveEstablished` event will be picked up by @EventListener
        beehiveSpringEventBusPublisher.publish(
            HiveEstablished(
                hiveId = requireNotNull(newHive.id),
                location = newHive.location as GeoJsonPoint,
                establishmentDateTime = ZonedDateTime.now(clock),
            ),
        )

        return HiveCreateResponse(requireNotNull(newHive.id))
    }

    companion object {
        val logger = classSpecificLogger()
    }
}

This event is then processed by @TransactionalEventListener which writes to the outbox table. This gives us clear separation of concerns.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import cz.beehive.control.hive.dto.HiveEstablished
import org.springframework.stereotype.Service
import org.springframework.transaction.event.TransactionalEventListener

@Service
class BeehiveEventListener(
    private val outboxService: OutboxService,
) {

    @TransactionalEventListener(HiveEstablished::class)
    fun hiveEstablishmentEvent(event: HiveEstablished) {
        outboxService.insert(
            aggregateId = event.hiveId,
            event = event,
        )
    }
}

By executing:

1
2
3
4
5
curl \
    -v -X POST \
    -H "Content-Type: application/json" \
    --data "{ \"name\": \"Ivory 50D\", \"apiaryId\": \"73d9bbef-a855-7cbd-c530-880abefcf048\", \"location\": { \"type\": \"Point\", \"coordinates\": [42.1, 12.34]  }  }" \
    localhost:8080/api/hives

we create a new entry in the outbox table and the rest of the work will be done by PostgreSQL and Debezium.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
beehive-control          | o.s.web.servlet.DispatcherServlet        : POST "/api/hives", parameters={}
beehive-control          | s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to cz.beehive.control.hive.api.HiveResource#createHive(UUID, HiveCreateRequest)
beehive-control          | m.m.a.RequestResponseBodyMethodProcessor : Read "application/json;charset=UTF-8" to [HiveCreateRequest(name=Ivory 50D, apiaryId=73d9bbef-a855-7cbd-c530-880abefcf048, location=GeoJsonPoi (truncated)...]
beehive-control          | c.b.c.h.u.HiveCreateUseCase$Companion    : Creating hive Ivory 50D in apiary 73d9bbef-a855-7cbd-c530-880abefcf048 at GeoJsonPoint(type=Point, coordinates=[42.1, 12.34])
...
debezium  | checking for more records...
debezium  | polling records...
debezium  | no records available or batch size not reached yet, sleeping a bit...
debezium  | refreshing DB schema for table 'outbox.outbox'
debezium  | Mapping table 'outbox.outbox' to schemas under 'beehive.outbox.outbox'
debezium  | field 'id' (STRING) from column id uuid(2147483647, 0) NOT NULL DEFAULT VALUE gen_random_uuid()
debezium  | field 'id' (STRING) from column id uuid(2147483647, 0) NOT NULL DEFAULT VALUE gen_random_uuid()
debezium  | field 'aggregate_id' (STRING) from column aggregate_id text(2147483647, 0) NOT NULL
debezium  | field 'type' (STRING) from column type text(2147483647, 0) NOT NULL
debezium  | field 'type_version' (STRING) from column type_version text(2147483647, 0) NOT NULL
debezium  | field 'type_group' (STRING) from column type_group text(2147483647, 0) NOT NULL
debezium  | field 'payload' (STRING) from column payload jsonb(2147483647, 0) NOT NULL
debezium  | field 'inserted' (INT64) from column inserted timestamp(29, 6) NOT NULL DEFAULT VALUE now()
debezium  | Mapped primary key for table 'outbox.outbox' to schema: {"name" : "beehive.outbox.outbox.Key", "type" : "STRUCT", "optional" : "false", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"name" : "io.debezium.data.Uuid", "type" : "STRING", "optional" : "false", "default" : "00000000-0000-0000-0000-000000000000", "version" : "1"}}]}
debezium  | Mapped columns for table 'outbox.outbox' to schema: {"name" : "beehive.outbox.outbox.Value", "type" : "STRUCT", "optional" : "true", "default" : null, "fields" : [{"name" : "id", "index" : "0", "schema" : {"name" : "io.debezium.data.Uuid", "type" : "STRING", "optional" : "false", "default" : "00000000-0000-0000-0000-000000000000", "version" : "1"}}, {"name" : "aggregate_id", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}, {"name" : "type", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}, {"name" : "type_version", "index" : "3", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}, {"name" : "type_group", "index" : "4", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}, {"name" : "payload", "index" : "5", "schema" : {"name" : "io.debezium.data.Json", "type" : "STRING", "optional" : "false", "default" : null, "version" : "1"}}, {"name" : "inserted", "index" : "6", "schema" : {"name" : "io.debezium.time.MicroTimestamp", "type" : "INT64", "optional" : "false", "default" : "0", "version" : "1"}}]}
debezium  | Relation '19859' resolved to table 'outbox.outbox'

(Note I’ve truncated the timestamp and all other irrelevant attributes of all logs so that it’s a bit more compact.)

And, indeed, we can now see new messages to be consumed in the Redis stream beehive.outbox.outbox:

XINFO STREAM beehive.outbox.outbox;
{
  "id": "febdad72-762d-46bc-9347-e42c92e685ef",
  "aggregate_id": "14e30f4d-b2e2-4a9f-a0ba-d0a3d0024476",
  "type": "HiveEstablished",
  "type_version": "1.1.0",
  "type_group": "cz.beehive.control",
  "payload": "{"hiveId": "14e30f4d-b2e2-4a9f-a0ba-d0a3d0024476", "location": {"type": "Point", "coordinates": [42.1, 12.34]}, "establishmentDateTime": "2025-05-01T17:31:34.764418754Z"}",
  "inserted": 1746114062001,
  "__deleted": "false"
}

(Again, this is a prettified and redacted output of the command.)

The only issue is that the consumer has no idea what the incoming type should look like as there is no schema attached to it. We can see the schema being correctly detected in debezium logs but since we configured Debezium to strip it off, the consumer has no way of inferring it reliably itself.

Restoring Debezium type safety by using schema registry

Generally speaking, the Redis Stream consumer side is trivial enough that we won’t discuss it here. The only issue we have to deal with is schema inference.

1. Send schema with every payload

Indeed, this might seem like the most straighforward solution. However, it has two major downsides:

  • By design, you will have a dynamic API. This prevents clients from generating type-safe schema beforehand. This might not be a downside, though, when the schema is bound to frequent change.
  • The schema will most likely constitute the largest part of any message in the stream, which might be a consideration when it comes to infrastructure cost, if your throughput is expected to be high.

2. Send a reference type and use Schema Registry

We won’t discuss it in great detail here. The main idea is to have a schema registry of types from which the client can generate types in advance. You could use Apicurio for that as it integrates nicely with Debezium but any schema registry will do (even static git repository might be more than enough). In Apicurio, our HiveEstablished type might be available on this resource:

1
GET /v3/groups/cz.beehive.control/artifacts/HiveEstablished/versions/1.1.0

which goes nicely with the payload we designed for Redis Streams.

As of May 2025, the downside of this approach is that async API generators are sparse and not as well maintained as, say, OpenAPI generators. There is indeed an AsyncAPI initiative, so you might want to also check it out. The Apache AVRO format is also a suitable option if you want to only define types.

3. Hybrid between 1. and 2.

As a part of a Redis Stream message, you could also send a type reference, alonside with its version, and dynamically fetch the schema in runtime and deserialize it this way.

This concludes our series. Happy software engineering!


Summary

1. How do you implement the Outbox pattern with Spring Boot?
Use Spring's @TransactionalEventListener to capture domain events and write them to an outbox table within the same transaction as your data changes. This maintains atomicity while providing clear separation of concerns.
2. What are DDD aggregates in the context of CDC?
DDD aggregates represent meaningful business events rather than low-level CRUD operations. For example, instead of "HiveNameChanged," use events like "HiveEstablished" or "HiveRelocated" to model domain events that communicate essential business changes to other services.
3. How do you handle schema evolution in CDC event streams?
There are at least 3 options:
  • Include the full schema with each message (simple but verbose);
  • Use a schema registry to store and reference schemas (efficient but requires infrastructure);
  • Use a hybrid approach sending type references with messages that consumers can use to fetch schemas dynamically.
This post is licensed under CC BY 4.0 by the author.

Music Fun Fact

Loading a fun music fact...