大家好,我是一安。上一篇《如何监听 MySQL 实现数据变化后的实时通知》介绍了如何实时监听 MySQL 并把变更发送到 RabbitMQ,文中也提到大家可以自己试着改成发送到 Kafka 或 ES。但有的小伙伴卡在了推送 ES 这一步,而 ES 现在又比较流行,所以今天就再介绍一下如何把数据推送到 ES。
这里需要引入一个第三方组件 canal-adapter,顺便简要介绍一下 canal-server、canal-adapter、canal-admin 分别是什么:
canal-server(canal-deploy):可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。
canal-adapter:相当于canal的客户端,会从canal-server中获取数据,然后对数据进行同步,可以同步到MySQL、Elasticsearch和HBase等存储中去。
canal-admin:为 canal 提供整体配置管理、节点运维等面向运维的功能,并提供相对友好的 WebUI 操作界面,方便更多用户快速、安全地操作。
环境准备
接上一篇《如何监听 MySQL 实现数据变化后的实时通知》
ES搭建《如何快速搭建一套生产级Elasticsearch》
安装canal-adapter
1.拉取镜像
# Pull the canal-adapter v1.1.5 image
docker image pull slpcat/canal-adapter:v1.1.5
2.首次运行
# First run: this container is only used to copy the default config out
docker run -d \
  --name canal-adapter \
  -p 8081:8081 \
  slpcat/canal-adapter:v1.1.5
3.复制容器中的配置文件到本地
# Create the host directory first: `docker cp` refuses a destination path that
# ends with "/" when that directory does not exist yet.
mkdir -p /mydata/canal-adapter/conf
# Copy the adapter config and the es7 mapping directory out of the container
docker cp canal-adapter:/opt/canal-adapter/conf/application.yml /mydata/canal-adapter/conf/
docker cp canal-adapter:/opt/canal-adapter/conf/es7 /mydata/canal-adapter/conf/
再次提醒:
这一步的目的是找到配置文件 application.yml 和 mytest_user.yml(位于 es7 目录下)所在的目录,再把这些配置文件挂载到宿主机上,方便之后在容器外修改。
修改配置
application.yml
注意:
1.配置项很多,只需关注原文中标红的配置(主要是 canal-server 地址 canal.tcp.server.host、源数据库 srcDataSources 以及 es7 适配器配置),其他配置保持不变即可。
2.配置 ES 适配器时,name 一定要和配置目录的名称保持一致。例如小编使用的是 es7 目录,这里就配置为:
- name: es7
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal-server address (host:port)
    canal.tcp.server.host: 192.168.5.128:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    # Key referenced by dataSourceKey in the es7 mapping files
    defaultDS:
      # Source database address
      url: jdbc:mysql://192.168.5.128:3307/demo?useUnicode=true
      username: root
      password: root

  canalAdapters:
    - instance: example # canal instance Name or mq topic name
      groups:
        - groupId: g1
          outerAdapters:
            - name: logger
            # - name: rdb
            #   key: mysql1
            #   properties:
            #     jdbc.driverClassName: com.mysql.jdbc.Driver
            #     jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
            #     jdbc.username: root
            #     jdbc.password: 121212
            # - name: rdb
            #   key: oracle1
            #   properties:
            #     jdbc.driverClassName: oracle.jdbc.OracleDriver
            #     jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
            #     jdbc.username: mytest
            #     jdbc.password: m121212
            # - name: rdb
            #   key: postgres1
            #   properties:
            #     jdbc.driverClassName: org.postgresql.Driver
            #     jdbc.url: jdbc:postgresql://localhost:5432/postgres
            #     jdbc.username: postgres
            #     jdbc.password: 121212
            #     threads: 1
            #     commitSize: 3000
            # - name: hbase
            #   properties:
            #     hbase.zookeeper.quorum: 127.0.0.1
            #     hbase.zookeeper.property.clientPort: 2181
            #     zookeeper.znode.parent: /hbase
            # The adapter name must match the mapping directory name (conf/es7)
            - name: es7
              # Elasticsearch HTTP address in rest mode, e.g. 127.0.0.1:9200
              hosts: 192.168.5.128:9201
              properties:
                mode: rest # or transport
                # security.auth: test:123456 # only used for rest mode
                cluster.name: my-es
            # - name: kudu
            #   key: kudu
            #   properties:
            #     kudu.master.address: 127.0.0.1 # ',' split multi address
mytest_user.yml
注意:
1.删除 es7 目录下多余的配置文件
# Remove the mapping files this demo does not use; only mytest_user.yml is kept.
# Use explicit paths so the result does not depend on the current directory.
rm -f /mydata/canal-adapter/conf/es7/biz_order.yml /mydata/canal-adapter/conf/es7/customer.yml
2.修改 mytest_user.yml,重点是以下两项:
_index:ES 索引名称
sql:用于同步数据的 SQL 语句
# Data source key; must match a key under srcDataSources in application.yml
dataSourceKey: defaultDS
# canal instance name, or MQ topic name
destination: example
# Must match the groupId configured in application.yml (g1)
groupId: g1
esMapping:
  _index: user
  _id: _id
  upsert: true
  # pk: id
  # Every column must come from a table in the FROM clause. Only `user` is
  # listed (aliased as `a`), so email is selected as a.email; there is no `b`.
  sql: "select a.id as _id, a.name, a.address, a.email from user a"
  # objFields:
  #   _labels: array:;
  # etlCondition: "where a.c_time>={}"
  commitBatch: 3000
重新创建 canal-adapter 容器并挂载配置文件
# Stop the container started for the first run
docker stop canal-adapter
# Remove it so the name canal-adapter can be reused
docker rm canal-adapter
# Re-run canal-adapter with the externalized config. Mount exactly what was
# copied out (application.yml and the es7 directory) rather than the whole
# conf directory: a bind mount over /opt/canal-adapter/conf would hide every
# file in the image's conf directory that was not copied to the host.
docker run --name canal-adapter -p 8081:8081 \
  -v /mydata/canal-adapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
  -v /mydata/canal-adapter/conf/es7:/opt/canal-adapter/conf/es7 \
  -d slpcat/canal-adapter:v1.1.5
至此,环境准备完成,开始测试。
演示
建ES索引
这里小编指定了3个分片,一个副本
# Create index "user": 3 primary shards, 1 replica, keyword fields
curl -X PUT 'http://192.168.5.128:9201/user' \
  -H 'Content-Type: application/json' \
  -d '{
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": "1",
      "max_result_window": "2147483647"
    },
    "mappings": {
      "properties": {
        "id":      { "type": "keyword" },
        "name":    { "type": "keyword" },
        "address": { "type": "keyword" },
        "email":   { "type": "keyword" }
      }
    }
  }'
mysql新增数据记录
INSERT INTO user (name, address, email)
VALUES ('lisi', 'beijing', '1234');
查看ES
# Quote the URL: an unquoted "?" is a shell glob character, and zsh aborts
# with "no matches found" when the pattern matches no file.
curl -XGET -H "Content-Type: application/json" "http://192.168.5.128:9201/user/_search?pretty"
{
"took" : 10,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "user",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"name" : "lisi",
"address" : "beijing",
"email" : "1234"
}
}
]
}
}
mysql更新数据记录
UPDATE user
SET name = 'zhangsan'
WHERE id = 3;
查看ES
# Quote the URL: an unquoted "?" is a shell glob character, and zsh aborts
# with "no matches found" when the pattern matches no file.
curl -XGET -H "Content-Type: application/json" "http://192.168.5.128:9201/user/_search?pretty"
{
"took" : 23,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "user",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"name" : "zhangsan",
"address" : "beijing",
"email" : "1234"
}
}
]
}
}
数据删除大家可以亲自实验一下,小编验证正常。
号外!号外!
如果这篇文章对你有所帮助,或者有所启发的话,帮忙点赞、在看、转发、收藏,你的支持就是我坚持下去的最大动力!
原文始发于微信公众号(一安未来):实时监听MySQL同步数据到ElasticSearch
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/44932.html