Debezium Topic Reroute SMT
In this tutorial, we will go over how to set up Debezium's topic routing SMT (ByLogicalTableRouter) and how it works with Transfer.
Available in artielabs/transfer:2.0.3 and later.
Why is this useful?
By default, Debezium writes each table to its own topic. There are two common scenarios where you might want something different:
- A partitioned database. You may want messages from every partition to land on the same topic rather than on one topic per partition.
- You don’t want to create topics manually. When you use the Pub/Sub sink, Debezium does not create topics automatically, so you have to create one topic per table yourself, which quickly becomes tedious.
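To make the second point concrete, here is a minimal Python sketch that derives the per-table topic names Debezium would use without rerouting. It assumes Debezium's default PostgreSQL naming of `<topic.prefix>.<schema>.<table>` and reuses the `topic.prefix` and `table.include.list` values from the configuration in this guide. For simplicity it treats each include-list entry as a literal `schema.table` name, although Debezium actually interprets those entries as regular expressions. Each returned name is a Pub/Sub topic you would otherwise have to create by hand.

```python
# Sketch only: assumes Debezium's default PostgreSQL topic naming,
# <topic.prefix>.<schema>.<table>, and treats include-list entries as
# literal schema.table names (Debezium really treats them as regexes).
TOPIC_PREFIX = "dbserver1"
INCLUDE_LIST = "inventory.customers,inventory.orders,inventory.products,inventory.users"


def default_topics(prefix: str, include_list: str) -> list[str]:
    """Return one topic name per included table, as Debezium would emit by default."""
    return [f"{prefix}.{entry.strip()}" for entry in include_list.split(",") if entry.strip()]


topics = default_topics(TOPIC_PREFIX, INCLUDE_LIST)
for topic in topics:
    print(topic)  # One topic per table, each needing to exist in Pub/Sub.
```

With four tables that is four topics to create and maintain, and the count grows with every table you add.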
Debezium application.properties
# Offset storage
debezium.source.offset.storage.file.filename=/tmp/foo
debezium.source.offset.flush.interval.ms=0
# Pubsub setup: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_google_cloud_pubsub
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=artie-labs
debezium.sink.pubsub.ordering.enabled=true
# Postgres
debezium.source.snapshot.mode=initial
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=dbserver1
# Sync the customers, orders, products and users tables.
debezium.source.table.include.list=inventory.customers,inventory.orders,inventory.products,inventory.users
debezium.source.plugin.name=pgoutput
# Re-route them all to the same topic.
debezium.transforms=Reroute
debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
# Matches the topic of every table in the inventory schema, e.g. dbserver1.inventory.customers
debezium.transforms.Reroute.topic.regex=(.*).inventory(.*)
# Rewrites the topic to <topic.prefix>.inventory.all_tables, i.e. dbserver1.inventory.all_tables
debezium.transforms.Reroute.topic.replacement=$1.inventory.all_tables
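A quick way to sanity-check the regex before deploying is to replay it against the default topic names. The following Python sketch approximates `ByLogicalTableRouter` under the assumption that a topic is rewritten only when it fully matches `topic.regex`. Python writes the first capture group as `\1`, whereas Debezium's Java-style replacement string uses `$1`.

```python
import re

# Values copied from the Reroute transform above.
TOPIC_REGEX = r"(.*).inventory(.*)"
# Python's replacement syntax uses \1 where Debezium's Java-style syntax uses $1.
TOPIC_REPLACEMENT = r"\1.inventory.all_tables"


def reroute(topic: str) -> str:
    """Approximate the router: rewrite only topics that fully match the regex."""
    match = re.fullmatch(TOPIC_REGEX, topic)
    if match is None:
        return topic  # Non-matching topics keep their original name.
    return match.expand(TOPIC_REPLACEMENT)


for table in ["customers", "orders", "products", "users"]:
    original = f"dbserver1.inventory.{table}"
    print(original, "->", reroute(original))
```

Every table topic collapses onto `dbserver1.inventory.all_tables`. Note that the unescaped `.` in the regex matches any character, which is harmless here but means the pattern is slightly looser than it looks.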
Transfer config.yaml
outputSource: bigquery
queue: pubsub
flushIntervalSeconds: 30
bufferRows: 99999
pubsub:
  projectID: artie-labs
  pathToCredentials: /tmp/bq.json
  topicConfigs:
    - db: customers
      schema: public
      topic: dbserver1.inventory.all_tables
      cdcFormat: debezium.postgres.wal2json
      cdcKeyFormat: org.apache.kafka.connect.json.JsonConverter
bigquery:
  pathToCredentials: /tmp/bq.json
  projectID: artie-labs
  defaultDataset: fake
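Because every table now shares `dbserver1.inventory.all_tables`, a consumer has to tell tables apart from the message itself. Debezium records the originating schema and table in the `source` block of each change event's envelope. The following Python sketch groups a few hand-written, heavily trimmed example messages by that field. The message bodies are hypothetical, and the sketch illustrates the idea rather than how Transfer is implemented internally.

```python
import json
from collections import defaultdict

# Hypothetical, heavily trimmed messages as they might arrive on
# dbserver1.inventory.all_tables (JSON envelope with "schema" and "payload").
raw_messages = [
    '{"schema": {}, "payload": {"op": "c", "after": {"id": 1}, "source": {"schema": "inventory", "table": "customers"}}}',
    '{"schema": {}, "payload": {"op": "c", "after": {"id": 7}, "source": {"schema": "inventory", "table": "orders"}}}',
    '{"schema": {}, "payload": {"op": "u", "after": {"id": 1}, "source": {"schema": "inventory", "table": "customers"}}}',
]


def source_table(raw: str) -> str:
    """Return the originating schema.table recorded in the Debezium envelope."""
    source = json.loads(raw)["payload"]["source"]
    return f'{source["schema"]}.{source["table"]}'


def group_by_table(messages: list[str]) -> dict[str, list[dict]]:
    """Bucket change-event payloads by the table they came from."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for raw in messages:
        grouped[source_table(raw)].append(json.loads(raw)["payload"])
    return dict(grouped)


batches = group_by_table(raw_messages)
for table, rows in batches.items():
    print(table, len(rows))
```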