数据同步方案
没有“唯一正确方案”,只有“是否符合你业务特性”的方案。
| 场景 | 推荐方案 |
|---|---|
| 一次性全量同步 | Logstash JDBC |
| 定时增量同步(低实时) | Logstash JDBC(增量字段) |
| 准实时 / 实时同步 | Debezium(CDC)+ Kafka + ES |
| 云厂商 / 官方方案 | Elastic 官方 Connector |
| 业务逻辑复杂 | 自研同步程序(Python/Java) |
- 数据源:MySQL / PostgreSQL
- 延迟:分钟级
- 结构化表数据
- 实现简单、稳定
DB -> JDBC -> Logstash -> ElasticSearch
通过 SQL + 增量字段(如 updated_at)拉取数据。
Logstash 配置(PostgreSQL)
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/postgresql.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://db:5432/app"
jdbc_user => "user"
jdbc_password => "pwd"
statement => "SELECT * FROM users"
}
}
output {
elasticsearch {
hosts => ["http://es:9200"]
index => "users"
document_id => "%{id}"
}
}
条件
表中必须有:
- updated_at
- 或自增 id
jdbc {
statement => "
SELECT * FROM users
WHERE updated_at > :sql_last_value
"
use_column_value => true
tracking_column => "updated_at"
schedule => "*/5 * * * *"
}
📌 Logstash 会自动保存 sql_last_value
优点
- 简单
- 成熟稳定
- 运维成本低
缺点
- 延迟高
- 无法感知 delete(需软删除)
- 高并发更新时压力大
- 高实时性(秒级)
- 数据频繁更新
- 对一致性要求高
- 大型系统
Postgres/MySQL
↓ binlog / WAL
Debezium
↓
Kafka
↓
ElasticSearch Sink
- 基于 binlog / WAL
- 不查数据库
- 可感知 INSERT / UPDATE / DELETE
- 支持回放(Kafka)
- 真正“准实时”
- 架构复杂
- 组件多(Kafka、Connect)
- 运维成本高
Debezium 基于数据库日志(binlog/WAL)
通过 CDC 捕获变更并投递到 Kafka
再由 Sink 写入 Elasticsearch
实现低延迟、高可靠、可回放的数据同步
- 官方维护
- 支持 MySQL / PostgreSQL
- 支持全量 + 增量
- 管理界面在 Kibana
架构:
DB -> Elastic Connector -> ElasticSearch
- 最省心
- 与 ES 版本强一致
- 灵活性略低
- 高级玩法不如 Debezium
架构
DB -> 程序 -> ElasticSearch
Python 示例(简化)
from elasticsearch import ElasticSearch
es = ElasticSearch("http://localhost:9200")
es.index(
index="users",
id=user.id,
document={
"name": user.name,
"email": user.email
}
)
同步策略
- DB 写成功 -> 同步写 ES
- MQ / 事件驱动
- 定时扫描补偿
- 事务一致性要自己处理
- 失败重试
- 幂等(document_id 必须稳定)
| 方案 | 方式 |
|---|---|
| Logstash JDBC | 软删除字段(is_deleted) |
| Debezium | 自动捕获 DELETE |
| 自研 | DB delete -> ES delete |
| 官方 Connector | 支持 |
必做事项
- DB 主键 -> ES _id
- keyword / text 合理 mapping
- 不要让 ES 动态建字段(dynamic=false)
- 使用 index template
- 低实时、简单同步:Logstash JDBC
- 高实时、强一致:Debezium + Kafka
- 官方省心:Elastic Connector
- 强业务定制:自研同步服务
PostgreSQL/MySQL 同步到 Elasticsearch:
- 如果是离线或分钟级同步,推荐使用 Logstash JDBC
- 如果是准实时、并且需要感知 delete,推荐使用 Debezium 基于 binlog/WAL 的 CDC 同步
用数据库主键作为 ES 的 document_id。
通过 ILM、mapping 控制索引生命周期和结构,保证长期可维护性。