
Syncing MySQL to Elasticsearch with Debezium

Using Debezium to sync data from MySQL to Elasticsearch

Contents: architecture -> how it works -> hands-on -> key configuration -> common pitfalls

1. Overall architecture (the big picture first)

MySQL
  ↓  (binlog, ROW)
Debezium MySQL Connector
  ↓
Kafka Topic
  ↓
Kafka Connect Elasticsearch Sink
  ↓
Elasticsearch Index
  • 👉 Debezium is only responsible for CDC (change data capture)
  • 👉 The Elasticsearch Sink Connector is what actually writes to ES

2. Why not have Debezium write to ES directly

  • Debezium is a Source Connector
  • ES is the Sink
  • Separation of responsibilities:
    • Debezium: captures changes
    • ES Sink: handles index / upsert / delete

👉 This is the officially recommended architecture

3. Prerequisites

1️⃣ MySQL configuration (required)

[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL

After restarting MySQL, verify:

SHOW VARIABLES LIKE 'binlog_format';
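
It can also be worth confirming the other two settings from the snippet above (expected values: log_bin = ON, binlog_row_image = FULL):

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_row_image';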

2️⃣ MySQL account privileges

CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%'; -- RELOAD / SHOW DATABASES are also used by Debezium during the initial snapshot
FLUSH PRIVILEGES;

3️⃣ Recommended component versions

Component       Recommendation
MySQL           8.0.x
Kafka           ≥ 3.x
Debezium        ≥ 2.x
Elasticsearch   7.x / 8.x

4. One-command environment with Docker Compose (recommended)

For learning / PoC / testing

docker-compose.yml (simplified)

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "3306:3306"

  connect:
    image: debezium/connect:2.5
    ports:
      - "8083:8083"
    depends_on: [kafka, mysql]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
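
The simplified file above leaves out Elasticsearch, even though the sink connector below points at http://elasticsearch:9200. A minimal single-node service could be added like this (a sketch; the image tag and the disabled security are assumptions for local testing only):

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.1
    environment:
      discovery.type: single-node        # single-node dev cluster
      xpack.security.enabled: "false"    # disable TLS/auth for local testing
    ports:
      - "9200:9200"

Bring the whole stack up with docker compose up -d.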

5. Configure the Debezium MySQL Connector (CDC)

POST http://localhost:8083/connectors
{
  "name": "mysql-debezium",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "mysql01",
    "database.include.list": "testdb",
    "table.include.list": "testdb.users",

    "snapshot.mode": "initial",

    "include.schema.changes": "false",

    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.mysql"
  }
}
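
After POSTing the JSON above (with curl or any HTTP client), check that the connector and its task are both RUNNING via the Kafka Connect REST API; a RUNNING state means the snapshot / binlog streaming is active:

GET http://localhost:8083/connectors/mysql-debezium/status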

Generated Kafka topic

mysql01.testdb.users

6. Configure the Elasticsearch Sink Connector

POST http://localhost:8083/connectors
{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

    "connection.url": "http://elasticsearch:9200",

    "topics": "mysql01.testdb.users",

    "key.ignore": "false",
    "schema.ignore": "true",

    "write.method": "upsert",

    "behavior.on.null.values": "delete",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

    "transforms.unwrap.drop.tombstones": "false"
  }
}
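
One caveat worth verifying in your environment: the stock debezium/connect image only bundles the Debezium source connectors, so the Confluent Elasticsearch sink plugin has to be installed into the Connect plugin path before this POST will be accepted. Once registered, check it the same way as the source connector:

GET http://localhost:8083/connectors/es-sink/status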

7. MySQL -> ES data mapping

MySQL operation    ES behavior
insert             index
update             update / upsert
delete             delete

Key settings:

write.method=upsert
behavior.on.null.values=delete
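
A quick way to see this mapping end to end (a sketch assuming the testdb.users table from the connector configs has id and name columns, and that the document id resolves to the primary key):

-- in MySQL
INSERT INTO testdb.users (id, name) VALUES (1, 'alice');   -- a new document is indexed
UPDATE testdb.users SET name = 'bob' WHERE id = 1;         -- the same document is upserted
DELETE FROM testdb.users WHERE id = 1;                     -- the document is deleted

After each statement, the index can be inspected with:

GET http://localhost:9200/mysql01.testdb.users/_search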

8. Slimming down the events (very important)

Raw Debezium event (large)

{
  "before": {...},
  "after": {...},
  "op": "u"
}

After unwrap (recommended)

{
  "id": 1,
  "name": "张三",
  "updated_at": "2025-01-01"
}

👉 ES only cares about the after state

9. ES index design recommendations (key for production)

1️⃣ Consistent primary keys

  • MySQL PK -> Kafka key -> ES _id
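
To confirm this chain, fetch a document and check that _id matches the MySQL primary-key value, e.g.:

GET http://localhost:9200/mysql01.testdb.users/_search?size=1

One caveat, depending on the sink connector version: Debezium's message key is a struct such as {"id":1}, which some sink versions will not accept as a document id; in that case a key-extraction SMT (e.g. org.apache.kafka.connect.transforms.ExtractField$Key on the id field) is commonly added to the sink configuration.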

2️⃣ Create the field mapping up front

PUT users
{
  "mappings": {
    "properties": {
      "id": { "type": "long" },
      "name": { "type": "keyword" },
      "created_at": { "type": "date" }
    }
  }
}

👉 Avoid getting burned by dynamic mapping

10. Common problems & pitfalls

1️⃣ ES has data, but updates don't take effect

❌ upsert not enabled

✅ write.method=upsert

2️⃣ Deletes don't sync

❌ Tombstone records are being dropped

✅ behavior.on.null.values=delete

3️⃣ Duplicate data

👉 This is expected

  • Debezium = at-least-once
  • ES simply overwrites by _id
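
Overwriting by _id is what makes at-least-once delivery safe: a replayed change event rewrites the same document rather than creating a second one. A simple consistency check is to compare the ES document count with the row count in MySQL:

GET http://localhost:9200/mysql01.testdb.users/_count

which should match SELECT COUNT(*) FROM testdb.users once the pipeline has caught up.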

4️⃣ The table has no primary key

❌ update/delete misbehave

👉 MySQL CDC effectively requires a primary key

11. Production-grade tuning tips (important)

  • Sync only the necessary tables / columns
  • For large tables, do an offline bulk import first, then switch to CDC
  • Batch writes in the ES sink (see the sketch below)
  • Partition Kafka topics sensibly
  • Don't let the binlog pile up
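
For the batch-writing point above, the relevant knobs live in the Elasticsearch sink connector config; a sketch with illustrative values (tune against your own write volume, these are not recommendations):

"batch.size": "2000",
"linger.ms": "1000",
"max.buffered.records": "20000",
"flush.timeout.ms": "30000"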

12. Summary

MySQL -> Debezium -> Kafka -> ES Sink

= currently the most reliable, scalable, and operationally manageable real-time sync solution