☘️ Septvean's Documents

Debezium sync PostgreSQL to Elasticsearch

Using Debezium to sync data from PostgreSQL to Elasticsearch

Contents: architecture -> principles -> hands-on -> key configuration -> production notes

Focus: PostgreSQL (logical replication)

1. Overall architecture (standard approach)

PostgreSQL
  ↓  (WAL / logical decoding)
Debezium PostgreSQL Connector
  ↓
Kafka Topic
  ↓
Kafka Connect Elasticsearch Sink
  ↓
Elasticsearch Index

Debezium captures the changes; the ES sink writes them into the index.

2. Core principles (PostgreSQL-specific)

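  • PostgreSQL writes every change to the WAL
  • Debezium reads the WAL through a logical replication slot
  • The changes are converted into CDC events and written to Kafka
  • The ES sink consumes them from Kafka and upserts / deletes documents in ES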

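📌 CDC on PostgreSQL depends on the replication slot: the slot remembers how far Debezium has read, and PostgreSQL retains WAL from that point until Debezium confirms it.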
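To make the slot dependency concrete, here is a deliberately simplified toy model in plain Python (not PostgreSQL's real implementation; the class and method names are hypothetical). WAL records stay on disk until the consumer confirms them, so a consumer that stops confirming makes the retained WAL grow without bound.

```python
from dataclasses import dataclass, field


@dataclass
class ToyReplicationSlot:
    """Toy model of a logical replication slot (not PostgreSQL's real code)."""

    confirmed_lsn: int = 0                   # last position the consumer confirmed
    wal: list = field(default_factory=list)  # (lsn, change) records still kept

    def write(self, lsn: int, change: str) -> None:
        # Every committed change is appended to the WAL first.
        self.wal.append((lsn, change))

    def read_pending(self) -> list:
        # Logical decoding streams what the consumer has not confirmed yet.
        return [change for lsn, change in self.wal if lsn > self.confirmed_lsn]

    def confirm(self, lsn: int) -> None:
        # Only after the consumer confirms can older WAL be recycled.
        self.confirmed_lsn = max(self.confirmed_lsn, lsn)
        self.wal = [(pos, c) for pos, c in self.wal if pos > self.confirmed_lsn]

    def retained(self) -> int:
        return len(self.wal)


slot = ToyReplicationSlot()
for lsn, change in enumerate(["insert id=1", "update id=1", "delete id=1"], start=1):
    slot.write(lsn, change)
print(slot.read_pending())  # ['insert id=1', 'update id=1', 'delete id=1']
slot.confirm(2)
print(slot.retained())      # 1
```

If confirm() is never called (Debezium is down), retained() only ever grows, which is the disk-filling risk described in the pitfalls section.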

3. Prerequisites (required)

1️⃣ PostgreSQL parameters

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

Restart PostgreSQL for the changes to take effect (all three parameters are only read at server start).
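After the restart, the values can be read back with SELECT name, setting FROM pg_settings WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');. The helper below is a hypothetical sketch that checks those rows, passed in as a plain dict so it runs without a database driver; the minimums simply mirror the values above and are not hard limits (each Debezium PostgreSQL connector uses one slot and one WAL sender).

```python
# Minimums taken from the settings above; raise them if you run more connectors.
REQUIRED_MINIMUMS = {"max_replication_slots": 4, "max_wal_senders": 4}


def check_pg_settings(settings: dict) -> list:
    """Return human-readable problems; an empty list means the settings look fine.

    `settings` maps pg_settings.name to pg_settings.setting (both strings),
    e.g. the rows returned by the query above collected into a dict.
    """
    problems = []
    wal_level = settings.get("wal_level")
    if wal_level != "logical":
        problems.append(f"wal_level is {wal_level!r}, expected 'logical'")
    for name, minimum in REQUIRED_MINIMUMS.items():
        value = int(settings.get(name, "0"))
        if value < minimum:
            problems.append(f"{name} is {value}, expected at least {minimum}")
    return problems


print(check_pg_settings({"wal_level": "replica", "max_replication_slots": "10", "max_wal_senders": "10"}))
# ["wal_level is 'replica', expected 'logical'"]
```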

2️⃣ Table requirements (very important)

  • ✅ Every synced table must have a primary key
  • ❌ Without a primary key, UPDATE / DELETE cannot be synced correctly
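Why the key matters can be shown with a toy in-memory "index" (hypothetical code, not the ES sink's): with a key, an INSERT followed by an UPDATE lands on the same document; without one, each event has to be stored under a freshly generated id, so the UPDATE becomes a duplicate and a DELETE has nothing to target.

```python
import itertools


def apply_events(events, use_key: bool) -> dict:
    """Apply (key, row) events to a dict standing in for an ES index.

    row=None means delete. When use_key is False we simulate a table without a
    primary key: every event gets a new generated id, as ES would assign.
    """
    index = {}
    auto_ids = itertools.count(1)  # stand-in for ES auto-generated ids
    for key, row in events:
        doc_id = key if use_key else f"auto-{next(auto_ids)}"
        if row is None:
            # A delete only works if doc_id identifies the original document.
            index.pop(doc_id, None)
        else:
            index[doc_id] = row
    return index


events = [(1, {"id": 1, "name": "Alice"}), (1, {"id": 1, "name": "Alicia"})]
print(apply_events(events, use_key=True))        # {1: {'id': 1, 'name': 'Alicia'}}
print(len(apply_events(events, use_key=False)))  # 2
```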

3️⃣ Database user permissions

CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;

4. Docker Compose (recommended for validation / PoC)

This is a minimal working setup; split the components into separate deployments for production.

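version: "3.8"

services:
  zookeeper:
    # Pin a 7.x tag: Confluent Platform 8.x images no longer support ZooKeeper
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: postgres:15
    # Debezium needs wal_level=logical; the image default is "replica"
    command: ["postgres", "-c", "wal_level=logical"]
    environment:
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"

  elasticsearch:
    # Target of the ES sink in step 6 (connection.url http://elasticsearch:9200)
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.15
    environment:
      discovery.type: single-node
    ports:
      - "9200:9200"

  connect:
    # debezium/connect does not bundle the Confluent Elasticsearch sink;
    # install that plugin into a derived image before creating the sink
    image: debezium/connect:2.5
    ports:
      - "8083:8083"
    depends_on: [kafka, postgres, elasticsearch]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status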

5. Configure the Debezium PostgreSQL connector

POST http://localhost:8083/connectors
{
  "name": "pg-debezium",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",

    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "password",
    "database.dbname": "postgres",

    "slot.name": "debezium_slot",
    "plugin.name": "pgoutput",

    "schema.include.list": "public",
    "table.include.list": "public.users",

    "snapshot.mode": "initial",

    "include.schema.changes": "false",

    "topic.prefix": "pg01"
  }
}
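The POST above can also be scripted. This is a minimal sketch using only the standard library, assuming Kafka Connect's REST API at http://localhost:8083; the function names are hypothetical. Kafka Connect expects a JSON body with "name" and "config" and answers 201 on success.

```python
import json
import urllib.request


def build_create_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build the POST /connectors request that Kafka Connect expects."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(connect_url: str, name: str, config: dict) -> dict:
    """Send the request and return the connector definition Kafka Connect stores."""
    with urllib.request.urlopen(build_create_request(connect_url, name, config)) as resp:
        return json.load(resp)
```

Usage: create_connector("http://localhost:8083", "pg-debezium", config), where config is the "config" object from the JSON above. Re-posting an existing name fails with HTTP 409, which urlopen raises as an HTTPError.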

Kafka topic naming (<topic.prefix>.<schema>.<table>)

pg01.public.users

6. Configure the Elasticsearch sink connector

POST http://localhost:8083/connectors
{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

    "connection.url": "http://elasticsearch:9200",

    "topics": "pg01.public.users",

    "key.ignore": "false",
    "schema.ignore": "true",

    "write.method": "upsert",
    "behavior.on.null.values": "delete",

    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id"
  }
}
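Once both connectors are registered, their health can be checked with GET http://localhost:8083/connectors/es-sink/status. The sketch below is a hypothetical helper that scans such a response; the sample JSON is made up but follows the shape the Kafka Connect REST API returns (a "connector" object plus a "tasks" list, where FAILED tasks also carry a "trace").

```python
import json


def unhealthy_parts(status: dict) -> list:
    """List the parts of a /connectors/<name>/status response that are not RUNNING."""
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector {status['name']}: {connector_state}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            # FAILED tasks include a "trace" field with the Java stack trace.
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


sample = json.loads("""
{"name": "es-sink",
 "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
 "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083", "trace": "..."}],
 "type": "sink"}
""")
print(unhealthy_parts(sample))  # ['task 0: FAILED']
```

Note that a connector can report RUNNING while its task has failed, which is why the tasks are checked separately.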

7. PostgreSQL -> ES behavior mapping

PG operation        Kafka event         ES action
INSERT              op=c                index
UPDATE              op=u                upsert
DELETE              op=d + tombstone    delete
Initial snapshot    op=r                index

Key settings:

write.method=upsert
behavior.on.null.values=delete
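The table can be read as a small decision procedure. This is a simplified model of the configuration above (hypothetical code, not the connector's): the unwrap transform drops the op=d envelope itself, the tombstone that follows it has a null value, and behavior.on.null.values=delete turns that null into a delete. Because write.method=upsert, every non-null value is written as an upsert.

```python
# Non-null values are upserted (write.method=upsert); op=r comes from the snapshot.
OP_TO_ES_ACTION = {"c": "upsert", "r": "upsert", "u": "upsert"}


def sink_actions(messages):
    """Turn (key, message) pairs from pg01.public.users into ES actions.

    `message` is a Debezium envelope dict, or None for a tombstone.
    """
    actions = []
    for key, message in messages:
        if message is None:
            # Tombstone: null value -> delete (behavior.on.null.values=delete).
            actions.append(("delete", key))
        elif message["op"] in OP_TO_ES_ACTION:
            actions.append((OP_TO_ES_ACTION[message["op"]], key))
        # op == "d": dropped by the unwrap transform in this configuration.
    return actions


msgs = [(1, {"op": "c"}), (1, {"op": "u"}), (1, {"op": "d"}), (1, None)]
print(sink_actions(msgs))  # [('upsert', 1), ('upsert', 1), ('delete', 1)]
```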

8. Slimming the events (strongly recommended)

Raw Debezium event

{
  "before": {...},
  "after": {...},
  "source": {...},
  "op": "u"
}

After unwrap (ES-friendly)

{
  "id": 1,
  "name": "Alice",
  "updated_at": "2025-01-01T10:00:00"
}

👉 ES only needs the after state
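What the unwrap step does to a single event can be sketched as follows. This is a simplified stand-in written for illustration, not Debezium's ExtractNewRecordState source: it keeps only after, and optionally copies top-level metadata under a "__" prefix, similar to what the SMT's transforms.unwrap.add.fields option does (e.g. "op" becomes "__op").

```python
def unwrap(envelope: dict, add_fields=()):
    """Simplified stand-in for ExtractNewRecordState on one change event.

    Returns a copy of the flat `after` row, or None when there is no `after`
    (delete events), where the real transform drops the record or emits a
    tombstone depending on its delete-handling setting.
    """
    after = envelope.get("after")
    if after is None:
        return None
    row = dict(after)
    for name in add_fields:
        # Only top-level fields are handled in this sketch.
        row[f"__{name}"] = envelope[name]
    return row


event = {
    "before": {"id": 1, "name": "Alice"},
    "after": {"id": 1, "name": "Alicia"},
    "source": {"db": "postgres", "table": "users"},
    "op": "u",
}
print(unwrap(event))                      # {'id': 1, 'name': 'Alicia'}
print(unwrap(event, add_fields=("op",)))  # {'id': 1, 'name': 'Alicia', '__op': 'u'}
```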

9. ES index design (production focus)

1️⃣ Primary key mapping

  • PG primary key -> Kafka key -> ES _id
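Debezium's Kafka key is a struct of the primary-key columns, e.g. {"id": 42}, while the Confluent ES sink expects a primitive key to use as _id; that is why Debezium's own examples add Kafka's ExtractField$Key transform. The sketch below only mimics that chain with hypothetical helpers, and assumes a single-column key named id (a composite key needs a different approach).

```python
def extract_field_key(key: dict, field: str = "id"):
    """Mimic ExtractField$Key: replace the struct key with one field's value."""
    return key[field]


def document_id(key) -> str:
    """Model of the sink's choice of _id: the primitive record key as a string."""
    if isinstance(key, dict):
        raise TypeError("struct key: extract the primary-key field first")
    return str(key)


debezium_key = {"id": 42}  # Debezium's key: a struct of the PK columns
print(document_id(extract_field_key(debezium_key)))  # 42
```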

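2️⃣ Create the mapping in advance (to avoid dynamic-mapping surprises)

By default the sink writes into an index named after the topic, so the mapping goes on pg01.public.users:

PUT pg01.public.users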
{
  "mappings": {
    "properties": {
      "id": { "type": "long" },
      "name": { "type": "keyword" },
      "created_at": { "type": "date" }
    }
  }
}
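When a table has many columns, the mapping body can be generated from the column types. The type table below is a hypothetical choice for illustration, not an official PostgreSQL-to-Elasticsearch mapping; adjust it per column. One caveat worth checking: by default Debezium sends PostgreSQL timestamp columns as epoch microseconds, while a plain date field parses numbers as epoch milliseconds, so timestamp columns may need a converter or a matching format.

```python
import json

# Hypothetical PostgreSQL -> Elasticsearch type choices; adjust per column.
PG_TO_ES = {
    "bigint": "long",
    "integer": "integer",
    "text": "keyword",
    "varchar": "keyword",
    "boolean": "boolean",
    "timestamp": "date",
}


def build_mapping(columns: dict) -> dict:
    """Build a PUT <index> body from {column_name: postgres_type}."""
    properties = {name: {"type": PG_TO_ES[pg_type]} for name, pg_type in columns.items()}
    return {"mappings": {"properties": properties}}


print(json.dumps(build_mapping({"id": "bigint", "name": "varchar", "created_at": "timestamp"})))
```

For the users example this produces the same body as the mapping above; an unknown type raises KeyError on purpose so a column is never mapped silently.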

10. PostgreSQL-specific pitfalls (must read)

1️⃣ Replication slot backlog (high risk)

  • Debezium stops
  • The WAL is no longer recycled
  • The disk fills up

Check:

SELECT slot_name, active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
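For an automated check, a monitoring script can fetch slot rows with SELECT slot_name, active, restart_lsn FROM pg_replication_slots; and the current position with SELECT pg_current_wal_lsn();, then compute the backlog itself. A minimal sketch follows; the 1 GiB threshold and the function names are assumptions. An LSN such as 16/B374D848 is two hex numbers, the high and low 32 bits of a byte position.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '16/B374D848' into a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL the server keeps for a slot: current position - restart_lsn."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)


def risky_slots(rows, current_lsn: str, limit_bytes: int = 1 << 30) -> list:
    """Flag slots that are inactive or hold back more than limit_bytes (default 1 GiB)."""
    flagged = []
    for slot_name, active, restart_lsn in rows:
        if restart_lsn is None:
            continue  # restart_lsn is NULL when the slot reserves no WAL
        lag = retained_wal_bytes(current_lsn, restart_lsn)
        if not active or lag > limit_bytes:
            flagged.append((slot_name, active, lag))
    return flagged


rows = [("debezium_slot", False, "0/1000000")]
print(risky_slots(rows, current_lsn="1/0"))  # [('debezium_slot', False, 4278190080)]
```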

2️⃣ Large transactions

  • A single UPDATE touches a million rows
  • The WAL grows huge
  • ES writes get overwhelmed

👉 Split large transactions and throttle them
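One common way to split a bulk update is to walk the primary key in fixed-size ranges and commit each range separately. The sketch below only generates the ranges; UPDATE_SQL is a hypothetical statement with psycopg-style placeholders, and running each batch in its own transaction (optionally pausing between batches) is left to the caller.

```python
# Hypothetical statement: one batch per transaction, parameters (start, end).
UPDATE_SQL = "UPDATE users SET name = upper(name) WHERE id >= %s AND id < %s"


def id_batches(min_id: int, max_id: int, batch_size: int):
    """Yield half-open [start, end) id ranges covering min_id..max_id inclusive."""
    start = min_id
    while start <= max_id:
        end = min(start + batch_size, max_id + 1)
        yield start, end
        start = end


print(list(id_batches(1, 10, 4)))  # [(1, 5), (5, 9), (9, 11)]
```

Smaller batches mean smaller transactions in the WAL and a steadier stream of events for the ES sink, at the cost of more round trips.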

3️⃣ No primary key

  • UPDATE / DELETE sync becomes unreliable 👉 the CDC design fails from the start

11. Production recommendations

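  • Sync only the tables / columns you actually need
  • Bulk-load large tables into ES offline first
  • Let CDC handle only the incremental changes
  • Write to ES in batches from the sink
  • Monitor replication slots, WAL size, and Kafka consumer lag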
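For the Kafka side, a sink connector consumes as the group connect-<connector name> by default, here connect-es-sink, and kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group connect-es-sink lists CURRENT-OFFSET, LOG-END-OFFSET and LAG per partition. The helper below is a minimal sketch of the same arithmetic over offsets you have already collected; the input format is an assumption.

```python
def consumer_lag(end_offsets: dict, committed: dict) -> dict:
    """Per-partition lag: log-end offset minus the group's committed offset.

    A partition with no committed offset is counted from 0, which overstates
    the lag if old segments were already deleted by retention.
    """
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


def total_lag(end_offsets: dict, committed: dict) -> int:
    return sum(consumer_lag(end_offsets, committed).values())


# Offsets per partition of pg01.public.users (illustrative numbers).
print(total_lag({0: 100, 1: 50}, {0: 90}))  # 60
```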

12. Summary

PostgreSQL -> Debezium -> Kafka -> ES

is currently the most stable, controllable, and scalable approach to real-time sync.