大家好,我是一安,之前介绍《实际开发中,如何保证数据库和缓存双写一致性》一文中,最后有提到使用Canal监听Mysql,今天就完整的介绍一下。
前言
在日常的工作中有没有遇到这样的场景,很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。
一般遇到这种情况下,在实时性要求不高的场景我们有两种处理模式,一种是写任务定时推送数据同步到缓存中,另一个是下游服务定时自动拉取。这两种模式都依赖服务自己的定时周期时间,很多时候不好设定具体要多久执行一次,定时时间太短在数据没有变化的时候会有很多无效的操作,如果定时时间太长可能很多时候数据的延迟会比较大,某些时候影响也不好。
canal简介
Canal是阿里巴巴旗下的一款开源项目,纯Java开发,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
基于日志增量订阅和消费的业务包括
-
数据库镜像 -
数据库实时备份 -
索引构建和实时维护(拆分异构索引、倒排索引等) -
业务 cache 刷新 -
带业务逻辑的增量数据处理
工作原理
MySQL主备复制原理
-
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看) -
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log) -
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
-
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议 -
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) -
canal 解析 binary log 对象(原始为 byte 流)
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
环境准备
小编这里演示canal并推送mysql消息到rabbitmq,有兴趣的可以参考,修改接收方,比如kafka、ES
mysql和rabbitmq安装
安装mysql和rabbitmq,之前有专门介绍两个安装
这里直接使用之前搭的环境,使用过程中遇到一点小问题,下面会提到
-
开启mysql的bin-log日志
小编之前的搭的主从复制,所以是小编这里是开启的,如果你是自己搭的环境,检查是否开启,没有的话开启即可,命令如下:
1.查看mysql是否开启bin-log日志
SHOW VARIABLES LIKE '%log_bin%'
2.没有开启的话,增加如下配置重启mysql
server-id=1
log-bin=mysql-bin
binlog-format=ROW
-
创建canal用户获取bin-log日志
进入mysql执行如下命令即可:
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by "canal";
flush privileges;
-
rabbitmq新建一个topic类型交换机canal.topic,然后新增队列:canal.topic, 绑定canal.topic交换机, RoutingKey:canal.topic
canal安装
docker pull canal/canal-server:v1.1.5
# 创建一个容器
docker run --name canal -p 11111:11111 -v /mydata/canal-server/logs:/home/admin/canal-server/logs -d canal/canal-server:v1.1.5
# 复制容器中的配置文件到本地
docker cp canal:/home/admin/canal-server/conf/canal.properties /mydata/canal-server/conf/
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /mydata/canal-server/conf/
注意:
容器里配置复制到外面是为了后面挂载,以后修改配置不需要每次都进入容器内部 小编这里一开始未复制出容器里配置,而且直接外面建了两个空的配置文件,然后追加了配置信息,启动一直报空指针 2022-08-23 11:06:51.844 [main] ERROR com.alibaba.otter.canal.deployer.CanalLauncher - ## S>>omething goes wrong when starting up the canal Server:
java.lang.NullPointerException: null
at com.alibaba.otter.canal.deployer.CanalStarter.start(CanalStarter.java:68) ~[can>>al.deployer-1.1.5.jar:na]
at com.alibaba.otter.canal.deployer.CanalLauncher.main(CanalLauncher.java:117) ~[canal.deployer-1.1.5.jar:na]
查看源码部分后,启动需要加载很多配置,而小编使用的自己建的配置文件,只有部分配置
修改配置
instance.properties
# 不能和mysql重复
canal.instance.mysql.slaveId=2
# 使用mysql的虚拟ip和端口
canal.instance.master.address=192.168.5.128:3307
# binlog日志名称(非必选)
canal.instance.master.journal.name=mysql-bin.000006
# mysql主库链接时起始的binlog偏移量(非必选
canal.instance.master.position=4
# 使用已创建的canal用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# rabbitmq中配置的绑定的 routingkey
canal.mq.topic=canal.topic
canal.properties
目前rabbitMQ没有支持端口配置,默认是5672
canal.serverMode = rabbitMQ
rabbitmq.host = 192.168.5.128
rabbitmq.virtual.host = /
# rabbitmq中新建的Exchange
rabbitmq.exchange = canal.topic
rabbitmq.username = guest
rabbitmq.password = guest
#exchange的模式
rabbitmq.deliveryMode = topic
重新创建canal容器并挂在配置文件
1.先停掉原来启动的
docker stop canal
2.删除原来的容器
docker rm canal
3.重新创建容器并挂载配置文件
docker run --name canal -p 11111:11111 -v /mydata/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /mydata/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties -v /mydata/canal-server/logs:/home/admin/canal-server/logs -d canal/canal-server:v1.1.5
至此,环境准备完成,开始测试。
演示
新建数据库
-
新建测试数据库:demo
create database demo;
问题出现了,mysql,rabbitmq,canal都启动正常,但rabbitmq收不到消息,小编这里找了一个小时终于找到了,是之前搭建mysql环境配置的问题,原来配置主从复制时指定了需要同步的数据库,注释掉就可以了(自己刨的坑自己填,大家要注意自己的环境)
#需要同步的数据库
#binlog-do-db=demo_0
#binlog-do-db=demo_1
{"data":null,"database":"","es":1661232453000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `demo` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'","sqlType":null,"table":"","ts":1661232453785,"type":"QUERY"}
新增表
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(30) NOT NULL DEFAULT '',
`address` varchar(200) DEFAULT NULL,
`email` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
{"data":null,"database":"demo","es":1661232589000,"id":5,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `user` (n `id` int(11) NOT NULL AUTO_INCREMENT,n `name` varchar(30) NOT NULL DEFAULT '',n `address` varchar(200) DEFAULT NULL,n `email` varchar(100) DEFAULT NULL,n PRIMARY KEY (`id`)n) ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"user","ts":1661232589258,"type":"CREATE"}
新增数据记录
insert into user (name,address,email) values ('lisi','beijing','1234');
{"data":[{"id":"1","name":"lisi","address":"beijing","email":"1234"}],"database":"demo","es":1661232730000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(30)","address":"varchar(200)","email":"varchar(100)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"address":12,"email":12},"table":"user","ts":1661232730878,"type":"INSERT"}
数据修改,删除,字段变更就不一一测试了,留给大家一些操作空间。
Java客户端连接
注意:客户端连接需要修改配置canal.properties中canal.serverMode = tcp
1.引入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
2.代码示例
public static void main(String[] args){
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.5.128", 11111), "example", "", "");
canalConnector.connect();
//订阅所有消息
canalConnector.subscribe(".*\..*");
// 只订阅test数据库下的所有表
//canalConnector.subscribe("test");
//恢复到之前同步的那个位置
canalConnector.rollback();
for(;;){
//获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
Message message = canalConnector.getWithoutAck(100);
//获取消息id
long batchId = message.getId();
if(batchId != -1){
printEnity(message.getEntries());
//提交确认
//canalConnector.ack(batchId);
//处理失败,回滚数据
//canalConnector.rollback(batchId);
}
}
}
private static void printEnity(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if(entry.getEntryType() != CanalEntry.EntryType.ROWDATA){
continue;
}
try{
// 序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
System.out.println(rowChange.getEventType());
switch (rowChange.getEventType()){
//如果希望监听多种事件,可以手动增加case
case INSERT:
// 表名
String tableName = entry.getHeader().getTableName();
System.out.println("表名:"+tableName);
//测试users表进行映射处
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for(CanalEntry.Column c:afterColumnsList){
System.out.println("字段:"+c.getName()+",值:"+c.getValue());
}
System.out.println("插入的数据是:" + afterColumnsList);
break;
case UPDATE:
List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
System.out.println("更新的数据是:" + afterColumnsList2);
break;
case DELETE:
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
System.out.println("被删除的数据是:" + beforeColumnsList);
break;
default:
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
}
3.输出
INSERT
表名:user
字段:id,值:2
字段:name,值:lisi
字段:address,值:beijing
字段:email,值:1234
插入的数据是:
[index: 0 sqlType: 4 name: "id" isKey: true updated: true isNull: false value: "2" mysqlType: "int(11)"
, index: 1 sqlType: 12 name: "name" isKey: false updated: true isNull: false value: "lisi" mysqlType: "varchar(30)"
, index: 2 sqlType: 12 name: "address" isKey: false updated: true isNull: false value: "beijing" mysqlType: "varchar(200)"
, index: 3 sqlType: 12 name: "email" isKey: false updated: true isNull: false value: "1234" mysqlType: "varchar(100)"]
号外!号外!
如果这篇文章对你有所帮助,或者有所启发的话,帮忙点赞、在看、转发、收藏,你的支持就是我坚持下去的最大动力!
原文始发于微信公众号(一安未来):如何监听 MySQL 实现数据变化后的实时通知
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/44937.html