Debezium: syncing MySQL to Elasticsearch
Using Debezium to sync data from MySQL into Elasticsearch.
Contents: architecture -> how it works -> hands-on -> key configuration -> common pitfalls
MySQL
↓ (binlog, ROW)
Debezium MySQL Connector
↓
Kafka Topic
↓
Kafka Connect Elasticsearch Sink
↓
Elasticsearch Index
- 👉 Debezium is only responsible for CDC
- 👉 The component that actually writes to ES is the Elasticsearch Sink Connector
- Debezium is the Source Connector
- Elasticsearch is the Sink
- Separation of responsibilities:
  - Debezium: captures changes
  - ES Sink: index / upsert / delete
👉 This is the officially recommended architecture
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
After restarting MySQL, confirm:
SHOW VARIABLES LIKE 'binlog_format';
CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
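If in doubt, the privileges actually granted to the account can be listed (standard MySQL syntax):

```sql
SHOW GRANTS FOR 'debezium'@'%';
```

The output should include `REPLICATION SLAVE`, `REPLICATION CLIENT`, and `SELECT` on `*.*`.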
| Component | Recommended version |
|---|---|
| MySQL | 8.0.x |
| Kafka | ≥ 3.x |
| Debezium | ≥ 2.x |
| Elasticsearch | 7.x / 8.x |
For learning / PoC / testing only.
docker-compose.yml (simplified; an Elasticsearch service would also be needed for the sink step)
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root
    ports:
      - "3306:3306"
  connect:
    image: debezium/connect:2.5
    ports:
      - "8083:8083"
    depends_on: [kafka, mysql]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
POST http://localhost:8083/connectors
{
  "name": "mysql-debezium",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "184054",
    "topic.prefix": "mysql01",
    "database.include.list": "testdb",
    "table.include.list": "testdb.users",
    "snapshot.mode": "initial",
    "include.schema.changes": "false",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mysql"
  }
}
(Note: Debezium 2.x renamed `database.server.name` to `topic.prefix` and `database.history.kafka.*` to `schema.history.internal.kafka.*`; the old names are rejected by the `debezium/connect:2.5` image used above.)
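Because of those 2.x renames, a quick sanity check of the payload before POSTing can save a failed deployment. A minimal sketch (this helper is hypothetical, not part of Debezium):

```python
# Hypothetical pre-flight check for a Debezium MySQL connector config.
# Property names required since Debezium 2.x; their pre-2.x equivalents are rejected.
REQUIRED_2X_KEYS = {
    "connector.class",
    "topic.prefix",
    "schema.history.internal.kafka.bootstrap.servers",
    "schema.history.internal.kafka.topic",
}
LEGACY_KEYS = {"database.server.name", "database.history.kafka.topic"}

def check_config(config: dict) -> list:
    """Return a list of problems found in a connector 'config' dict."""
    problems = [f"missing: {k}" for k in sorted(REQUIRED_2X_KEYS - config.keys())]
    problems += [f"legacy key (pre-2.x): {k}" for k in sorted(LEGACY_KEYS & config.keys())]
    return problems
```

Running `check_config` on a config that still uses `database.server.name` flags both the missing `topic.prefix` and the legacy key.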
The resulting Kafka topic:
mysql01.testdb.users
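By default the Debezium MySQL connector names topics `<server-name>.<database>.<table>` (the server name is the `topic.prefix` value in Debezium 2.x), which can be sketched as:

```python
def topic_name(prefix: str, database: str, table: str) -> str:
    # Default Debezium topic naming: <topic.prefix>.<database>.<table>
    return f"{prefix}.{database}.{table}"

print(topic_name("mysql01", "testdb", "users"))  # → mysql01.testdb.users
```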
POST http://localhost:8083/connectors
{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "mysql01.testdb.users",
    "key.ignore": "false",
    "schema.ignore": "true",
    "write.method": "upsert",
    "behavior.on.null.values": "delete",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
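The upsert/delete semantics configured above can be illustrated with a toy in-memory model (a sketch of the behavior, not the connector's actual code): a record with a non-null value is upserted under its key, and a null value (tombstone) deletes the document.

```python
def apply_record(index: dict, key: str, value) -> None:
    """Toy model of the ES sink with write.method=upsert and
    behavior.on.null.values=delete (not the connector's real code)."""
    if value is None:
        index.pop(key, None)  # tombstone -> delete the document
    else:
        index[key] = {**index.get(key, {}), **value}  # upsert by _id

index = {}
apply_record(index, "1", {"id": 1, "name": "Zhang San"})    # insert
apply_record(index, "1", {"id": 1, "name": "Zhang San 2"})  # update
apply_record(index, "1", None)                              # delete
print(index)  # → {}
```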
| MySQL operation | ES behavior |
|---|---|
| insert | index |
| update | update / upsert |
| delete | delete |
Key settings:
write.method=upsert
behavior.on.null.values=delete
The raw Debezium change event (large):
{
  "before": {...},
  "after": {...},
  "op": "u"
}
After unwrap (recommended):
{
  "id": 1,
  "name": "Zhang San",
  "updated_at": "2025-01-01"
}
👉 ES only cares about `after`
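What the `unwrap` SMT does can be approximated as follows. This is a simplification of `ExtractNewRecordState` (ignoring headers and its delete-handling options): keep only the `after` state, and turn a delete into a null value for the sink.

```python
def unwrap(event):
    """Rough approximation of io.debezium.transforms.ExtractNewRecordState:
    keep only the 'after' state; a delete yields None (seen by the sink
    as a null value when tombstones are kept)."""
    if event is None:            # already a tombstone
        return None
    if event.get("op") == "d":   # delete -> null value
        return None
    return event.get("after")

update_event = {
    "before": {"id": 1, "name": "old"},
    "after": {"id": 1, "name": "Zhang San"},
    "op": "u",
}
print(unwrap(update_event))  # → {'id': 1, 'name': 'Zhang San'}
```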
- MySQL primary key -> Kafka record key -> ES `_id`
PUT users
{
  "mappings": {
    "properties": {
      "id": { "type": "long" },
      "name": { "type": "keyword" },
      "created_at": { "type": "date" }
    }
  }
}
👉 Define the mapping explicitly to avoid dynamic-mapping surprises
❌ Updates create duplicate documents: upsert not enabled
✅ write.method=upsert
❌ Deletes never reach ES: tombstones dropped
✅ behavior.on.null.values=delete
❌ The same event arrives more than once
👉 Normal:
- Debezium is at-least-once
- ES simply overwrites by `_id`
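Because the primary key becomes the ES `_id`, replaying a change is harmless: indexing the same document twice leaves the index in the same state. A minimal illustration:

```python
index = {}

def upsert(doc: dict) -> None:
    # _id comes from the MySQL primary key, so a replayed event
    # simply overwrites the same document (idempotent).
    index[str(doc["id"])] = doc

event = {"id": 1, "name": "Zhang San"}
upsert(event)
upsert(event)  # at-least-once replay
print(len(index))  # → 1
```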
❌ update/delete behave incorrectly
👉 MySQL CDC strongly requires a primary key
- Only sync the tables / columns you actually need
- Bulk-load large tables offline first, then switch to CDC
- Let the ES Sink write in batches
- Partition Kafka topics sensibly
- Don't let the binlog accumulate
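For the batching point, the Confluent ES sink exposes tuning knobs such as `batch.size`, `linger.ms`, `max.buffered.records`, and `flush.timeout.ms`. The values below are illustrative starting points only, not recommendations; check the connector's configuration reference for defaults:

```json
{
  "batch.size": "2000",
  "linger.ms": "100",
  "max.buffered.records": "20000",
  "flush.timeout.ms": "180000"
}
```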
MySQL -> Debezium -> Kafka -> ES Sink
= currently the most robust, scalable, and operable option for real-time sync