实时监听MySQL同步数据到ElasticSearch


大家好,我是一安,上一篇《如何监听 MySQL 实现数据变化后的实时通知》介绍了通过实时监听Mysql并发送给RabbitMQ,文中有提到让大家自己试着修改发送到Kafka或者ES,但有的小伙伴卡在了推送ES这块,ES现在又比较流行,那今天就再介绍一下如何推送到ES

这里需要引入一个第三方组件canal-adapter,顺便简要概括一下canal-server canal-adapter canal-admin分别是什么

canal-server(canal-deploy):可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。

canal-adapter:相当于canal的客户端,会从canal-server中获取数据,然后对数据进行同步,可以同步到MySQL、Elasticsearch和HBase等存储中去。

canal-admin:为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

环境准备

接上一篇《如何监听 MySQL 实现数据变化后的实时通知

ES搭建《如何快速搭建一套生产级Elasticsearch

安装canal-adapter

1.拉取镜像
  docker pull slpcat/canal-adapter:v1.1.5
2.首次运行
  docker run --name canal-adapter -p 8081:8081 -d slpcat/canal-adapter:v1.1.5
3.复制容器中的配置文件到本地
docker cp canal-adapter:/opt/canal-adapter/conf/application.yml /mydata/canal-adapter/conf/
docker cp canal-adapter:/opt/canal-adapter/conf/es7 /mydata/canal-adapter/conf/

再次提醒:

目的是为了找到配置文件application.ymlmytest_user.yml的目录,然后把配置文件挂载出来,方便之后在容器之外修改

修改配置

application.yml

注意:

1.配置信息很多,注意标红配置,其他配置信息保持不变

2.配置ES的时候,一定要保证和配置目录名称一致,例如小编建的是es7目录,这里配置也是- name: es7

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal-server地址
    canal.tcp.server.host: 192.168.5.128:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      #源数据库地址
      url: jdbc:mysql://192.168.5.128:3307/demo?useUnicode=true
      username: root
      password: root

  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        hosts: 192.168.5.128:9201
 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: my-es
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

mytest_user.yml

注意:

1.删除多余配置rm -rf biz_order.yml customer.yml

2.修改_index: 索引名称sql: "sql语句"

#数据源的key值,对应application.yml中srcDataSources
dataSourceKey: defaultDS 
#canal的instance或MQ的topic
destination: example
groupId: g1
esMapping:
  _index: user
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name, a.address, b.email from user a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}"
  commitBatch: 3000

重新创建canal-adapter容器并挂在配置文件

#停止容器
  docker stop canal-adapter
#删除容器
  docker rm canal-adapter
#重新运行canal-adapter,使用挂载出来的配置文件
  docker run --name canal-adapter -p 8081:8081 -v /mydata/canal-adapter/conf/:/opt/canal-adapter/conf -v /mydata/canal-adapter/conf/es7:/opt/canal-adapter/conf/es7 -d slpcat/canal-adapter:v1.1.5

至此,环境准备完成,开始测试。

演示

建ES索引

这里小编指定了3个分片,一个副本

curl  -XPUT  http://192.168.5.128:9201/user -H 'Content-Type: application/json' -d '{
 "settings" : {
     "number_of_shards" : 3,
     "number_of_replicas": "1",
     "max_result_window":"2147483647"   
 },
  "mappings": {   
      "properties": {
        "id": {
          "type": "keyword"
        },
        "name": {
          "type": "keyword"
        },
        "address": {
          "type": "keyword"
        },
        "email": {
          "type": "keyword"
        }
      }
    }
}'

mysql新增数据记录

insert into user (name,address,email) values ('lisi','beijing','1234');

查看ES

 curl -XGET  -H "Content-Type: application/json" http://192.168.5.128:9201/user/_search?pretty
 
 
 {
  "took" : 10,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "user",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "name" : "lisi",
          "address" : "beijing",
          "email" : "1234"
        }
      }
    ]
  }
}

mysql更新数据记录

  update user set name='zhangsan' where id = 3;

查看ES

 curl -XGET  -H "Content-Type: application/json" http://192.168.5.128:9201/user/_search?pretty
 
 
 {
  "took" : 23,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "user",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "name" : "zhangsan",
          "address" : "beijing",
          "email" : "1234"
        }
      }
    ]
  }
}

数据删除大家可以亲自实验一下,小编验证正常。

号外!号外!

如果这篇文章对你有所帮助,或者有所启发的话,帮忙点赞、在看、转发、收藏,你的支持就是我坚持下去的最大动力!

实时监听MySQL同步数据到ElasticSearch

多线程+EasyExcel实现报表优雅导出


面试官:MQ 消息丢失、重复、积压问题,如何解决?


面试官:在浏览器输入URL回车之后发生了什么


实时监听MySQL同步数据到ElasticSearch


原文始发于微信公众号(一安未来):实时监听MySQL同步数据到ElasticSearch

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/44932.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!