Skip to main content
☘️ Septvean's Documents
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

数据同步方案

没有“唯一正确方案”,只有“是否符合你业务特性”的方案。

一、先给结论(选型速览)

场景 推荐方案
一次性全量同步 Logstash JDBC
定时增量同步(低实时) Logstash JDBC(增量字段)
准实时 / 实时同步 Debezium(CDC)+ Kafka + ES
云厂商 / 官方方案 Elastic 官方 Connector
业务逻辑复杂 自研同步程序(Python/Java)

二、方案一(最常用):Logstash JDBC 同步

✅ 适合场景

  • 数据源:MySQL / PostgreSQL
  • 延迟:分钟级
  • 结构化表数据
  • 实现简单、稳定

🔧 同步原理

DB -> JDBC -> Logstash -> ElasticSearch

通过 SQL + 增量字段(如 updated_at)拉取数据。

🧱 全量同步示例

Logstash 配置(PostgreSQL)

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/postgresql.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://db:5432/app"
    jdbc_user => "user"
    jdbc_password => "pwd"
    statement => "SELECT * FROM users"
  }
}

output {
  elasticsearch {
    hosts => ["http://es:9200"]
    index => "users"
    document_id => "%{id}"
  }
}

🔁 增量同步(生产必用)

条件

表中必须有:

  • updated_at
  • 或自增 id
jdbc {
  statement => "
    SELECT * FROM users
    WHERE updated_at > :sql_last_value
  "
  use_column_value => true
  tracking_column => "updated_at"
  schedule => "*/5 * * * *"
}

📌 Logstash 会自动保存 sql_last_value

⚠️ 优缺点

优点

  • 简单
  • 成熟稳定
  • 运维成本低

缺点

  • 延迟高
  • 无法感知 delete(需软删除)
  • 高并发更新时压力大

三、方案二(生产级实时):Debezium CDC + Kafka

✅ 适合场景

  • 高实时性(秒级)
  • 数据频繁更新
  • 对一致性要求高
  • 大型系统

🔥 同步架构

Postgres/MySQL
   ↓ binlog / WAL
Debezium
Kafka
ElasticSearch Sink

🚀 优点(非常重要)

  • 基于 binlog / WAL
  • 不查数据库
  • 可感知 INSERT / UPDATE / DELETE
  • 支持回放(Kafka)
  • 真正“准实时”

❌ 缺点

  • 架构复杂
  • 组件多(Kafka、Connect)
  • 运维成本高

☘️ 总结

Debezium 基于数据库日志(binlog/WAL)
通过 CDC 捕获变更并投递到 Kafka
再由 Sink 写入 Elasticsearch
实现低延迟、高可靠、可回放的数据同步

四、方案三:Elastic 官方 Connector(8.x 推荐)

✅ 特点

  • 官方维护
  • 支持 MySQL / PostgreSQL
  • 支持全量 + 增量
  • 管理界面在 Kibana

架构:

DB -> Elastic Connector -> ElasticSearch

优点

  • 最省心
  • 与 ES 版本强一致

缺点

  • 灵活性略低
  • 高级玩法不如 Debezium

五、方案四:自研同步程序(Python / Java)

✅ 适合你(你有 FastAPI + SQLAlchemy 背景)

架构

DB -> 程序 -> ElasticSearch

Python 示例(简化)

from elasticsearch import ElasticSearch

es = ElasticSearch("http://localhost:9200")

es.index(
    index="users",
    id=user.id,
    document={
        "name": user.name,
        "email": user.email
    }
)

同步策略

  • DB 写成功 -> 同步写 ES
  • MQ / 事件驱动
  • 定时扫描补偿

⚠️ 注意

  • 事务一致性要自己处理
  • 失败重试
  • 幂等(document_id 必须稳定)

六、DELETE 如何同步?

方案 方式
Logstash JDBC 软删除字段(is_deleted)
Debezium 自动捕获 DELETE
自研 DB delete -> ES delete
官方 Connector 支持

七、索引设计(非常关键)

必做事项

  • DB 主键 -> ES _id
  • keyword / text 合理 mapping
  • 不要让 ES 动态建字段(dynamic=false)
  • 使用 index template

八、方案对比总结

  • 低实时、简单同步:Logstash JDBC
  • 高实时、强一致:Debezium + Kafka
  • 官方省心:Elastic Connector
  • 强业务定制:自研同步服务

九、总结

PostgreSQL/MySQL 同步到 Elasticsearch:

  • 如果是离线或分钟级同步,推荐使用 Logstash JDBC
  • 如果是准实时、并且需要感知 delete,推荐使用 Debezium 基于 binlog/WAL 的 CDC 同步

用数据库主键作为 ES 的 document_id。

通过 ILM、mapping 控制索引生命周期和结构,保证长期可维护性。