1. Canal 简介
Canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
Canal 工作原理:
-
canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave,向 mysql master 发送 dump 协议 -
mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal) -
canal 解析 binary lo g对象(原始为 byte 流)
2. 使用 Docker 快速安装 MySQL 5.7
-
拉取 mysql:5.7 官方镜像:
docker pull mysql:5.7
-
创建 mysql 配置和数据映射目录(目录可以按自己的喜好):
cd /docker/mysqld/data
cd /docker/mysqld/conf
-
在 /docker/mysqld/conf 目录下建立配置文件 config-file.cnf :
config-file.cnf 内容如下,打开 mysql 的 binlog :
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
character-set-server=utf8mb4
collation-server=utf8mb4_unicode_ci
-
创建并启动容器,注意 初始密码 和 目录映射:
docker run --name mysql_5.7 -v /docker/mysqld/data:/var/lib/mysql -v /docker/mysqld/conf:/etc/mysql/conf.d -p 3306:3306 -e MYSQL_ROOT_PASSWORD=88021120 -d mysql:5.7
-
进入 mysql,查看 binlog 开启情况:
SHOW VARIABLES LIKE '%log_bin%';
-
去 /docker/mysqld/data 目录下查看 binlog 已存在:
-
新建 canal 账户并授权操作,为后续做准备:
-- 新建用户
CREATE USER canal IDENTIFIED BY 'canal';
-- 授权操作
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
-- 刷新权限
FLUSH PRIVILEGES;
3. 使用 Docker 快速安装 Redis 6.2
-
拉取 redis:6.2 官方镜像:
docker pull redis:6.2
-
创建 redis 配置和数据映射目录(目录可以按自己的喜好):
cd /docker/redis/data
cd /docker/redis/conf
-
在 /docker/redis/conf 目录下建立配置文件 redis.conf :
redis.conf 可以去官网下载:http://download.redis.io/redis-stable/
注意修改两处配置,否则会导致远程连接不上:
# 注释掉这个配置
# bind 127.0.0.1 -::1
# 保护模式‘yes’改为‘no’
protected-mode no
-
创建并启动容器,注意 开启持久化 和 目录映射:
docker run --name redis_6.2 -v /docker/redis/data:/data -v /docker/redis/conf:/usr/local/etc/redis -p 6379:6379 -d redis:6.2 redis-server /usr/local/etc/redis/redis.conf --appendonly yes
4. 安装 Canal-Admin 和 Canal-Deployer 1.1.4
-
去官网下载 1.1.4 版本的 canal.admin-1.1.4.tar.gz 和 canal.deployer-1.1.4.tar.gz :
canal 的官网下载地址:https://github.com/alibaba/canal/releases
-
解压并上传到任意的目录:
-
进入 /root/canal-admin/conf 目录,将 canal_manager.sql 导入 MySQL 中:
-
修改 /root/canal-admin/conf 目录下 application.yml 配置:
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: 123456
-
进入 /root/canal-admin/bin 目录,启动 canal-admin :
sh /root/canal-admin/bin/startup.sh
记得打开防火墙端口:
firewall-cmd --zone=public --add-port=8089/tcp --permanent
firewall-cmd --reload
-
进入 canal-admin 的控制台,用户名 admin,密码 123456 :
http://(安装机器的IP):8089/
现在我们来安装 canal-deployer,虚拟机资源有限,我们采用单机部署方式。
-
进入 /root/canal-deployer/conf 目录,修改 canal_local.properties 配置:
# register ip
canal.register.ip = (远程连接用的IP)
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
-
进入 /root/canal-deployer/conf 目录,把 examlpe 文件夹复制一个 instance:
-
进入 /root/canal-deployer/bin 目录,以 local 配置启动 canal-deployer :
sh /root/canal-deployer/bin/startup.sh local
记得打开防火墙端口:
firewall-cmd --zone=public --add-port=11110-11112/tcp --permanent
firewall-cmd --reload
-
进入 canal-admin 的控制台,如果你的配置正确,server 列表里会自动出现启动:
-
在 instance 管理中新建一个 instance,注意名字要和刚才复制的文件夹名字对应:
-
载入配置模板,修改配置:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\..*
#################################################
-
过几秒钟以后,如果你的配置正确,instance 列表里会自动出现启动:
点击详情查看日志如下:
2021-03-31 18:03:01.644 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2021-03-31 18:03:01.649 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [instance/instance.properties]
2021-03-31 18:03:01.675 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2021-03-31 18:03:01.676 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [instance/instance.properties]
2021-03-31 18:03:01.707 [canal-instance-scan-0] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-instance
2021-03-31 18:03:01.708 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*..*$
2021-03-31 18:03:01.708 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2021-03-31 18:03:01.721 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
5. 在 Spring 项目中使用 Canal-Client 同步数据
-
这里大家自己去建立一个最简单的 SpringBoot项目,要求能连接 MySQL 和 Redis 操作即可,随便建一些比较简单的表,下边作者可以给出一些简单的参考性配置,简易搭建一个 SpingBoot+MybatisPlus+Redis 的项目:
pom.xml 文件依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yly</groupId>
<artifactId>region</artifactId>
<version>1.0.0</version>
<name>region</name>
<description></description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml 文件配置:
server:
port: 8080
tomcat:
uri-encoding: UTF-8
spring:
redis:
# Redis数据库索引,默认为0
database: 0
# Redis端口
port: 6379
# Redis服务器主机
host: 192.168.138.128
lettuce:
pool:
# 连接池最大连接数
max-active: 8
# 连接池最大空闲
max-idle: 8
# 连接池最小空闲
min-idle: 2
# 连接池最大阻塞等待时间
max-wait: 1ms
# 超时时间
shutdown-timeout: 100ms
datasource:
name: yly_region
# 基本属性
url: jdbc:mysql://192.168.138.128:3306/yly_region?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&autoReconnect=true&rewriteBatchedStatements=true&allowMultiQueries=true
username: root
password: 88021120
type: com.alibaba.druid.pool.DruidDataSource
# druid相关配置
druid:
# 监控统计拦截的filters
filters: stat
# 配置初始化大小/最小/最大
initial-size: 10
min-idle: 10
max-active: 100
# 获取连接等待超时时间
max-wait: 60000
# 间隔多久进行一次检测,检测需要关闭的空闲连接
time-between-eviction-runs-millis: 60000
# 一个连接在池中最小生存的时间
min-evictable-idle-time-millis: 300000
validation-query: SELECT 'x'
test-while-idle: true
test-on-borrow: false
test-on-return: false
# 打开PSCache,并指定每个连接上PSCache的大小。oracle设为true,mysql设为false。分库分表较多推荐设置为false
pool-prepared-statements: false
max-pool-prepared-statement-per-connection-size: 20
# WebStatFilter配置
web-stat-filter:
enabled: true
url-pattern: /*
exclusions: /druid/*,*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico
session-stat-enable: true
session-stat-max-count: 100
# StatViewServlet配置
stat-view-servlet:
enabled: true
url-pattern: /druid/*
reset-enable: true
login-username: caijiaqi
login-password: 88021120
jackson:
date-format: yyyy-MM-dd HH:mm:ss
mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
MybatisPlusConfig.java 文件配置:
@Configuration
@EnableTransactionManagement
@MapperScan("com.yly.region.mapper")
public class MybatisPlusConfig {
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
// 设置请求的页面大于最大页后操作, true调回到首页,false继续请求
paginationInterceptor.setOverflow(false);
// 设置最大单页限制数量,默认 500 条,-1 不受限制
paginationInterceptor.setLimit(500);
// 开启 count 的 join 优化,只针对部分 left join
paginationInterceptor.setCountSqlParser(new JsqlParserCountOptimize(true));
return paginationInterceptor;
}
}
RedisRepositoryConfig.java 文件配置:
@Configuration
@EnableRedisRepositories
public class RedisRepositoryConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
Jackson2JsonRedisSerializer<?> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
-
pom 文件中引用依赖,上面例子已包含:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
-
新建一个 canal 客户端,并且依赖 ApplicationRunner,在 Spring 容器启动完成后开启守护线程同步任务(注意 import 时选择 canal 包下的类):
@Slf4j
@Component
public class CanalClient implements ApplicationRunner {
@Resource
private RedisTemplate<String, String> redisTemplate;
private static final String TABLE_NAME = "yly_rule";
private static final String PRIMARY_KEY = "id";
private static final String SEPARATOR = ":";
private static final String CANAL_SERVER_HOST = "192.168.138.131";
private static final int CANAL_SERVER_PORT = 11111;
private static final String CANAL_INSTANCE = "instance";
private static final String USERNAME = "canal";
private static final String PASSWORD = "canal";
@Override
public void run(ApplicationArguments args) throws Exception {
this.initCanal();
}
public void initCanal() {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(CANAL_SERVER_HOST, CANAL_SERVER_PORT),
CANAL_INSTANCE, USERNAME, PASSWORD);
int batchSize = 1000;
try {
log.info("启动 canal 数据同步...");
connector.connect();
connector.subscribe(".*\..*");
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
// 时间间隔1000毫秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
syncEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
private void syncEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
log.info("================> binlog[{}:{}] , name[{},{}] , eventType : {}",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType);
String tableName = entry.getHeader().getTableName();
if (!TABLE_NAME.equalsIgnoreCase(tableName)) continue;
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
redisInsert(tableName, rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.UPDATE) {
printColumn(rowData.getAfterColumnsList());
redisUpdate(tableName, rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
redisDelete(tableName, rowData.getBeforeColumnsList());
}
}
}
}
private void redisInsert(String tableName, List<CanalEntry.Column> columns) {
JSONObject json = new JSONObject();
for (CanalEntry.Column column : columns) {
json.put(column.getName(), column.getValue());
}
for (CanalEntry.Column column : columns) {
if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
String key = tableName + SEPARATOR + column.getValue();
redisTemplate.opsForValue().set(key, json.toJSONString());
log.info("redis数据同步新增,key:" + key);
break;
}
}
}
private void redisUpdate(String tableName, List<CanalEntry.Column> columns) {
JSONObject json = new JSONObject();
for (CanalEntry.Column column : columns) {
json.put(column.getName(), column.getValue());
}
for (CanalEntry.Column column : columns) {
if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
String key = tableName + SEPARATOR + column.getValue();
redisTemplate.opsForValue().set(key, json.toJSONString());
log.info("redis数据同步更新,key:" + key);
break;
}
}
}
private void redisDelete(String tableName, List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
String key = tableName + SEPARATOR + column.getValue();
redisTemplate.delete(key);
log.info("redis数据同步删除,key:" + key);
break;
}
}
}
private void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
-
根据个人喜好随便建一个表来测试,作者用 yly_rule 表:
测试 INSERT:
现在我们来新增一条数据:
INSERT INTO `yly_rule`(`id`, `create_time`, `name`, `store_type`, `kilometers`)
VALUES (4, '2021-03-31 18:36:32', '玩具', 4, 10);
我们看到项目中输出日志:
我们去 Redis 中查看数据,数据已经同步插入:
测试 UPDATE:
现在我们来修改一条数据:
UPDATE `yly_rule` SET `kilometers` = 8 WHERE `id` = 4;
我们看到项目中输出日志:
我们去 Redis 中查看数据,数据已经同步更新:
测试 DELETE:
现在我们来删除一条数据:
DELETE FROM `yly_region`.`yly_rule` WHERE `id` = 4;
我们看到项目中输出日志:
我们去 Redis 中查看数据,已经不存在了。
6. 回顾与思考
Canal 的好处在于对业务代码没有侵入,因为是基于监听 binlog 日志去进行同步数据的。实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。
以上只是一个测试的案例。Canal 根据偏移量增量同步 MySQL 的 binlog,可以为每个 instance 配置路由规则,只同步部分内容,业务代码也可以自行修改,不仅仅同步到 Redis,也可以同步到其他存储介质中,不仅仅同步相同数据,可以自定义数据模型结构进行转换。
上述代码中,我们得到 binlog 同步数据对象 proto,简单介绍格式:
EntryProtocol.proto
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为string文本]
由此可见,可以根据 DDL(data definition language) 数据定义语言, DML(data manipulation language) 数据操纵语言 区分自己不同的操作,DML 操作也能得到具体 数据库名表名以及字段名和数据变更前变更后的各种详细数据信息,我们可以选择性地结合业务来同步数据。
当然这仅仅只是入门,实际项目中一般配置 MQ 模式,配合 RocketMQ 或者 Kafka,Canal 会把数据发送到 MQ 的 topic 中,然后通过消息队列的消费者进行处理。Canal 的部署也是支持集群的,需要配合 ZooKeeper 进行集群管理。
原文始发于微信公众号(白菜说技术):Canal 解决 MySQL 和 Redis 数据同步问题
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/172642.html