Syncing PostgreSQL data to Elasticsearch with Debezium
Contents: architecture -> how it works -> hands-on setup -> key configuration -> production notes
Focus: PostgreSQL (logical replication)
```
PostgreSQL
    ↓  (WAL / logical decoding)
Debezium PostgreSQL Connector
    ↓
Kafka Topic
    ↓
Kafka Connect Elasticsearch Sink
    ↓
Elasticsearch Index
```
Debezium is responsible for capturing changes; the ES Sink is responsible for writing them into the index.

- PostgreSQL writes data -> WAL
- Debezium reads the WAL through a logical replication slot
- Changes are converted into CDC events and written to Kafka
- The ES Sink consumes from Kafka and upserts / deletes documents in ES
📌 CDC on PG depends heavily on the replication slot

In postgresql.conf:

```conf
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
```

Restart PostgreSQL for the settings to take effect.
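A quick way to confirm the settings after the restart (a minimal check, assuming you can reach the instance with psql as a superuser):

```bash
# Confirm logical decoding is enabled
psql -U postgres -c "SHOW wal_level;"              # expect: logical
psql -U postgres -c "SHOW max_replication_slots;"
```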
- ✅ Every synced table must have a primary key
- ❌ Without a primary key, update / delete events cannot be applied correctly (see the example table below)
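For reference, a CDC-friendly table might look like this (an illustrative schema for the public.users table used in the connector config later; the column names are assumptions):

```sql
-- Illustrative schema: the primary key becomes the Kafka record key and, later, the ES _id
CREATE TABLE public.users (
    id         BIGSERIAL PRIMARY KEY,
    name       TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
```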
Create a dedicated replication user:

```sql
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
```
The Docker Compose file below is a minimal working setup; for production, split the services into separate deployments.
```yaml
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  postgres:
    image: postgres:15
    # enable logical decoding so Debezium can create a replication slot
    command: ["postgres", "-c", "wal_level=logical"]
    environment:
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
  connect:
    image: debezium/connect:2.5
    ports:
      - "8083:8083"
    depends_on: [kafka, postgres]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
```
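Once the file is in place, bring the stack up and check that the Kafka Connect REST API responds (a quick smoke test; no connectors exist yet at this point):

```bash
docker compose up -d
# An empty list means Connect is up and no connectors are registered yet
curl http://localhost:8083/connectors
```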
Register the Debezium source connector:

POST http://localhost:8083/connectors

```json
{
  "name": "pg-debezium",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "password",
    "database.dbname": "postgres",
    "slot.name": "debezium_slot",
    "plugin.name": "pgoutput",
    "schema.include.list": "public",
    "table.include.list": "public.users",
    "snapshot.mode": "initial",
    "include.schema.changes": "false",
    "topic.prefix": "pg01"
  }
}
```
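One way to submit it and confirm the connector is running (assumes the JSON above is saved as pg-debezium.json; the file name is arbitrary):

```bash
curl -X POST -H "Content-Type: application/json" \
     --data @pg-debezium.json \
     http://localhost:8083/connectors

# Both the connector and its task should report state RUNNING
curl http://localhost:8083/connectors/pg-debezium/status
```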
Kafka topics are named <topic.prefix>.<schema>.<table>, so the users table ends up in:

pg01.public.users
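To eyeball the CDC events before wiring up the sink, you can tail the topic with the console consumer bundled in the Kafka image (a sketch; adjust the service name if your compose file differs):

```bash
docker compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic pg01.public.users \
  --from-beginning
```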
Register the Elasticsearch sink. Note that the debezium/connect image ships only the Debezium source connectors, so the Confluent Elasticsearch sink plugin has to be installed into the Connect image, and an Elasticsearch service (reachable as http://elasticsearch:9200) added to the stack.

POST http://localhost:8083/connectors

```json
{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "pg01.public.users",
    "key.ignore": "false",
    "schema.ignore": "true",
    "write.method": "upsert",
    "behavior.on.null.values": "delete",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
```
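By default this sink writes to an index named after the topic (pg01.public.users). If you would rather have a plain users index, matching the mapping shown further down, one common option is to extend the transforms chain with a RegexRouter SMT. The fragment below replaces the transforms.* lines in the sink config; the route alias and regex are illustrative:

```json
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "pg01\\.public\\.(.*)",
"transforms.route.replacement": "$1"
```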
| PG operation | Kafka event | ES behavior |
|---|---|---|
| INSERT | op=c | index |
| UPDATE | op=u | upsert |
| DELETE | op=d + tombstone | delete |
Key settings:

write.method=upsert
behavior.on.null.values=delete
Raw Debezium event:

```json
{
  "before": { ... },
  "after": { ... },
  "source": { ... },
  "op": "u"
}
```
After unwrap (ES-friendly):

```json
{
  "id": 1,
  "name": "Alice",
  "updated_at": "2025-01-01T10:00:00"
}
```
👉 ES only needs the after state
- PG primary key -> Kafka key -> ES _id
Pre-create an explicit mapping instead of relying on dynamic mapping:

PUT users

```json
{
  "mappings": {
    "properties": {
      "id": { "type": "long" },
      "name": { "type": "keyword" },
      "created_at": { "type": "date" }
    }
  }
}
```
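After inserting a few rows in PostgreSQL, you can check that they arrive in the index (a quick sanity check; the index name depends on how you routed the topic, see the note above):

```bash
curl "http://localhost:9200/users/_search?pretty"
```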
A classic production failure mode:

- Debezium stops
- WAL stops being cleaned up
- the disk fills up

Troubleshooting:

```sql
SELECT slot_name, active FROM pg_replication_slots;
```
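Two follow-up queries that are often useful here (a sketch; pg_wal_lsn_diff is available in PG 10+, and dropping the slot is only safe if the connector is being retired for good):

```sql
-- How much WAL each slot is currently holding back
SELECT slot_name, active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- Drop an abandoned slot so PostgreSQL can recycle the WAL
SELECT pg_drop_replication_slot('debezium_slot');
```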
Another pitfall: huge transactions.

- a single UPDATE touching millions of rows
- produces a massive burst of WAL
- and the ES writes get overwhelmed

👉 split the transaction and throttle the load, e.g. as sketched below
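A common way to split such an update into bounded batches (purely illustrative; the status column and the batch size are assumptions, and the statement is re-run until it affects 0 rows):

```sql
-- Each run updates at most 10,000 rows, keeping individual transactions (and WAL bursts) small
UPDATE users
SET    status = 'archived'
WHERE  id IN (
    SELECT id
    FROM   users
    WHERE  status = 'active'
      AND  created_at < now() - interval '1 year'
    LIMIT  10000
);
```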
- Unreliable update / delete 👉 the CDC design has failed (usually a missing primary key)
Production checklist:

- Only sync the tables / columns you actually need
- Bulk-load large tables into ES offline first
- Let CDC handle only the incremental changes
- Use batched writes in the ES Sink
- Monitor replication slots / WAL growth / Kafka consumer lag (see the lag check below)
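For the Kafka lag part, the consumer group of a Connect sink is named connect-<connector name> by convention, so for the es-sink connector above (a sketch using the CLI bundled in the Kafka image):

```bash
docker compose exec kafka kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --describe --group connect-es-sink
```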
PostgreSQL -> Debezium -> Kafka -> ES

remains one of the most stable, controllable, and scalable real-time sync pipelines available today.