Debezium: CDC from PostgreSQL to RabbitMQ

2024-11-28

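Configure PostgreSQL

Add the following to the database configuration file (postgresql.conf), then restart PostgreSQL so the change takes effect: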

wal_level=logical

Configure RabbitMQ

Create a topic exchange named demo.public.document, and bind the queue document_queue to it with the routing key document_key.
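
The same topology can also be declared from code instead of the management UI. The following is a minimal sketch, assuming the third-party pika client is installed and the broker is reachable with the guest credentials used later in this post. The durable=True flags and the function names declare_topology and main are illustrative assumptions, not something the original setup prescribes.

```python
EXCHANGE = "demo.public.document"   # exchange name from this post
QUEUE = "document_queue"            # queue name from this post
ROUTING_KEY = "document_key"        # binding / routing key from this post


def declare_topology(channel):
    """Declare the topic exchange and the queue, then bind them together."""
    # durable=True is an assumption so the objects survive a broker restart
    channel.exchange_declare(exchange=EXCHANGE, exchange_type="topic", durable=True)
    channel.queue_declare(queue=QUEUE, durable=True)
    channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key=ROUTING_KEY)


def main(host="your.host.name", port=5672):
    """Connect to RabbitMQ and create the topology (host is a placeholder)."""
    import pika  # imported lazily so the helper above has no hard dependency

    params = pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host="/",
        credentials=pika.PlainCredentials("guest", "guest"),
    )
    connection = pika.BlockingConnection(params)
    try:
        declare_topology(connection.channel())
    finally:
        connection.close()
```

Call main() once against the broker before starting Debezium. Because the routing key contains no wildcards, the topic exchange delivers only messages published with exactly document_key.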

Configure Debezium

debezium.sink.type=rabbitmq

debezium.sink.rabbitmq.connection.host=your.host.name
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.connection.virtual.host=/
debezium.sink.rabbitmq.routingKey=document_key

debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0

debezium.source.database.hostname=your.host.name
debezium.source.database.port=5432
debezium.source.database.user=username
debezium.source.database.password=password
debezium.source.database.dbname=document

debezium.source.plugin.name=pgoutput
debezium.source.tombstones.on.delete=false

debezium.source.topic.prefix=demo
debezium.source.schema.include.list=public
debezium.source.table.include.list=public.document
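
The exchange name used in the RabbitMQ step follows from this configuration: Debezium names the topic for a PostgreSQL table as <topic.prefix>.<schema>.<table>, and the RabbitMQ sink, as far as I understand its defaults, publishes to an exchange with that name. The sketch below derives the expected names from the properties text. It assumes table.include.list holds literal schema.table names (the setting also accepts regular expressions, which this does not handle); parse_properties and expected_topics are hypothetical helper names.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def expected_topics(props):
    """Return one '<prefix>.<schema>.<table>' name per included table."""
    prefix = props["debezium.source.topic.prefix"]
    tables = props["debezium.source.table.include.list"].split(",")
    return [f"{prefix}.{table.strip()}" for table in tables if table.strip()]


# Excerpt of the configuration shown above
SAMPLE = """\
debezium.source.topic.prefix=demo
debezium.source.schema.include.list=public
debezium.source.table.include.list=public.document
"""

TOPICS = expected_topics(parse_properties(SAMPLE))
print(TOPICS)
```

If another table is added to table.include.list, a matching exchange and binding would be needed for it as well.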

Start the Debezium container

docker run -it --name debezium -p 8081:8081 -v /opt/apps/debezium/config:/debezium/conf -v /opt/apps/debezium/data/data:/debezium/data quay.io/debezium/server:3.0.0.Final

After you update rows in the database, messages with a structure similar to the following appear in the queue:

{
	"schema": {
		"type": "struct",
		"fields": [
			...
		],
		"optional": false,
		"name": "demo.public.document.Envelope",
		"version": 2
	},
	"payload": {
		"before": {
			...
		},
		"after": {
			...
		},
		"source": {
			...
		},
		"transaction": null,
		"op": "u"
	}
}
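
A consumer usually cares about the op code and the difference between before and after. The following is a minimal sketch of unpacking such a message with the standard library only. The columns id and title in the sample are hypothetical, since the post elides the real fields, and summarize_change is an illustrative name. The function also tolerates before being null.

```python
import json

# Debezium operation codes and their meaning
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_change(body):
    """Extract op, before, after and the changed columns from an envelope."""
    envelope = json.loads(body)
    # With schemas enabled the data sits under "payload"; otherwise at top level
    payload = envelope.get("payload", envelope)
    before = payload.get("before")
    after = payload.get("after")
    changed = {}
    if before is not None and after is not None:
        for column, new_value in after.items():
            old_value = before.get(column)
            if old_value != new_value:
                changed[column] = (old_value, new_value)
    op = payload.get("op")
    return {"op": OP_NAMES.get(op, op), "before": before, "after": after, "changed": changed}


# Hypothetical message; the real one also carries "schema" and "source"
SAMPLE_MESSAGE = json.dumps({
    "payload": {
        "before": {"id": 1, "title": "old"},
        "after": {"id": 1, "title": "new"},
        "transaction": None,
        "op": "u",
    }
})

RESULT = summarize_change(SAMPLE_MESSAGE)
print(RESULT["op"], RESULT["changed"])
```

When before is null, changed stays empty and the consumer can only see the new row state.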

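Notes

1. The /debezium/data directory must be readable and writable.
2. To receive the before image of a changed row (that is, payload.before), run the following against the target table: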

ALTER TABLE document REPLICA IDENTITY FULL;