Data Synchronization with Canal
Canal Overview
canal [kə'næl], meaning "waterway/pipeline/channel", is a tool whose main purpose is parsing MySQL's incremental binary log to provide incremental data subscription and consumption.
In its early days, Alibaba ran dual data centers in Hangzhou and the US, which created a need for cross-datacenter synchronization; this was initially implemented with business-level triggers that captured incremental changes. Starting in 2010, teams gradually moved to parsing database logs to obtain incremental changes, which spawned a large family of incremental subscription and consumption applications.
Simply put, canal can be used to watch a database for changes and capture newly inserted or modified rows.
Use cases built on log-based incremental subscription and consumption include:
database mirroring
real-time database backup
index building and real-time maintenance (split/heterogeneous indexes, inverted indexes, etc.)
business cache refresh
incremental data processing with business logic
GitHub: https://github.com/alibaba/canal
Releases: https://github.com/alibaba/canal/releases
Documentation: https://github.com/alibaba/canal/wiki/Docker-QuickStart
How It Works
MySQL master-slave replication
1. The MySQL master writes data changes to its binary log (the individual records are called binary log events and can be inspected with SHOW BINLOG EVENTS).
2. A MySQL slave copies the master's binary log events into its relay log.
3. The slave replays the events from the relay log, applying the changes to its own data.
How canal works
1. canal implements the MySQL slave protocol: it masquerades as a MySQL slave and sends a dump request to the MySQL master.
2. The master accepts the dump request and starts pushing the binary log to the "slave" (i.e. canal).
3. canal parses the binary log objects (received as a raw byte stream).
Environment Setup
Enable binlog in MySQL
1. Check whether binlog is currently enabled:
SHOW VARIABLES LIKE '%log_bin%';
If log_bin is OFF, binlog is disabled; ON means it is already enabled.
2. Edit /etc/my.cnf to enable binlog:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1       # MySQL server_id; must not collide with canal's slaveId
After saving the changes, restart the mysqld service.
3. Log in to MySQL:
mysql -h localhost -u root -p
4. Create an account for testing
Using the root account, create a user and grant it privileges:
# create the account
CREATE USER canal@'%' IDENTIFIED BY 'canal';
# grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
# flush and apply
FLUSH PRIVILEGES;
Installing and configuring the canal server
Pull the canal server image:
docker pull canal/canal-server:v1.1.5
Start the container:
docker run -p 11111:11111 --name canal -d canal/canal-server:v1.1.5
Enter the canal container:
docker exec -it canal bash
Edit the instance configuration inside the container:
vi canal-server/conf/example/instance.properties
Three settings need to change:
# any value that does not collide with MySQL's server_id
canal.instance.mysql.slaveId=666
# database address
canal.instance.master.address=IP:3306
# the account and password created above
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
[root@064fb8202b09 admin]# cat canal-server/conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# any value that does not collide with MySQL's server_id
canal.instance.mysql.slaveId=666
# enable gtid use true/false
canal.instance.gtidon=false
# position info
# database address
canal.instance.master.address=IP:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# the account and password created above
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
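The canal.instance.filter.regex and canal.instance.filter.black.regex values above are regular expressions matched against schema.table names: a change is delivered when the allow pattern matches and the black-list pattern does not. A stdlib-only sketch of that matching logic, using the default patterns from the file above (the matches helper here is illustrative, not canal's actual implementation):

```java
import java.util.regex.Pattern;

public class FilterRegexDemo {
    // A table is forwarded when the allow regex matches the full
    // "schema.table" name and the black-list regex does not.
    static boolean matches(Pattern allow, Pattern deny, String schemaDotTable) {
        return allow.matcher(schemaDotTable).matches()
                && !deny.matcher(schemaDotTable).matches();
    }

    public static void main(String[] args) {
        // Defaults from instance.properties above
        Pattern allow = Pattern.compile(".*\\..*");          // every schema.table
        Pattern deny  = Pattern.compile("mysql\\.slave_.*"); // MySQL bookkeeping tables

        System.out.println(matches(allow, deny, "test_db.tb_user"));         // true
        System.out.println(matches(allow, deny, "mysql.slave_master_info")); // false
    }
}
```

With the defaults, every table is forwarded except the mysql.slave_* bookkeeping tables excluded by the black list.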
Restart the container:
docker restart canal
Tail the logs:
tail -n60 -f canal-server/logs/example/example.log
Basic Usage of Canal
Add the dependency
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
Code
public static void main(String[] args) {
    // create the connector
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("IP", 11111), "example", "canal", "canal");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        // connect
        connector.connect();
        // subscribe to all schemas and tables
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        // keep polling canal for new messages
        while (emptyCount < totalEmptyCount) {
            // fetch up to batchSize entries without auto-acknowledging
            Message message = connector.getWithoutAck(batchSize);
            long batchId = message.getId();
            int size = message.getEntries().size();
            // System.out.println("binlog messages in this batch: " + size);
            if (batchId == -1 || size == 0) {
                // no data: count the empty poll and wait one second
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                // data available: reset the counter and parse it
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }
            connector.ack(batchId); // acknowledge the batch
            // connector.rollback(batchId); // on failure, roll the batch back
        }
        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
    // iterate over the received entries
    for (CanalEntry.Entry entry : entrys) {
        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }
        CanalEntry.RowChange rowChange;
        try {
            // parse the stored payload
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
        }
        // event type: INSERT / UPDATE / DELETE / ...
        CanalEntry.EventType eventType = rowChange.getEventType();
        System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                eventType));
        // the changed rows (the incremental data)
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == CanalEntry.EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == CanalEntry.EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------> before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------> after");
                printColumn(rowData.getAfterColumnsList());
            }
            // values before the change
            rowData.getBeforeColumnsList().forEach(c -> System.out.println("before: " + c.getName() + "::" + c.getValue()));
            // values after the change
            rowData.getAfterColumnsList().forEach(c -> System.out.println("after: " + c.getName() + "::" + c.getValue()));
        }
    }
}

private static void printColumn(List<CanalEntry.Column> columns) {
    for (CanalEntry.Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    }
}
Run the test
Update a row in the database, then watch the console output:
empty count : 65
================> binlog[mysql-bin.000019:17797] , name[test_db,tb_user] , eventType : UPDATE
empty count : 1
empty count : 2
Integrating Canal with Spring Boot
Add the dependencies
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.7</version>
</dependency>
Database configuration
spring.datasource.url = jdbc:mysql://localhost:3306/demo?useUnicode=true&characterEncoding=UTF-8&useSSL=false
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username = root
spring.datasource.password = 123456
Canal client code
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
@Slf4j
public class CanalClient {

    /**
     * queue of SQL statements to replay
     */
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * main loop: pull changes from canal and persist them
     */
    public void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("IP", 11111), "example", "canal", "canal");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    // fetch up to batchSize entries
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    // acknowledge the batch
                    connector.ack(batchId);
                    // flush once the queue reaches the threshold (1 here, i.e. immediately)
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException | InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }
    /**
     * execute the queued SQL statements
     */
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            log.info("Executing sql ---> {}", sql);
            this.execute(sql);
        }
    }
    /**
     * dispatch row data by event type
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    // DELETE statement
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    // UPDATE statement
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    // INSERT statement
                    saveInsertSql(entry);
                }
            }
        }
    }
    /**
     * build an UPDATE statement from the row change
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuilder sql = new StringBuilder("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        // only a single primary key is supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * build a DELETE statement from the row change
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuilder sql = new StringBuilder("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        // only a single primary key is supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * build an INSERT statement from the row change
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuilder sql = new StringBuilder("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * execute a SQL statement against the target database
     */
    public void execute(String sql) {
        if (sql == null) {
            return;
        }
        Connection con = null;
        try {
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}
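The three save*Sql methods above all assemble SQL by plain string concatenation. The insert case can be sketched with ordinary lists to make the quoting behavior visible (buildInsert is a hypothetical stand-in for saveInsertSql, not part of canal):

```java
import java.util.List;
import java.util.stream.Collectors;

public class InsertSqlSketch {
    // Mirrors saveInsertSql above with plain lists instead of CanalEntry types.
    // Values are single-quoted exactly as in the original code.
    static String buildInsert(String table, List<String> names, List<String> values) {
        String cols = String.join(",", names);
        String vals = values.stream()
                .map(v -> "'" + v + "'")
                .collect(Collectors.joining(","));
        return "insert into " + table + " (" + cols + ") VALUES (" + vals + ")";
    }

    public static void main(String[] args) {
        String sql = buildInsert("tb_user",
                List.of("id", "username", "email"),
                List.of("1", "tom", "tom@example.com"));
        // insert into tb_user (id,username,email) VALUES ('1','tom','tom@example.com')
        System.out.println(sql);
    }
}
```

Note that values are wrapped in single quotes but never escaped, so a value containing a quote would break the statement; a PreparedStatement with placeholders would be the safer choice in production.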
spring-boot-starter-canal
spring-boot-starter-canal is a third-party open-source project that integrates canal with Spring Boot so it can be used directly.
Search GitHub for spring-boot-starter-canal, pick one of the repositories, download its source, install it into your local Maven repository, and reference it in your project.
Add the dependency
<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
Canal startup configuration
Run the canal client listener when the application starts:
@Component
public class CanalCommandLineRunner implements CommandLineRunner {

    @Resource
    private CanalClient canalClient;

    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }
}
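One caveat: CanalClient.run() loops forever, so calling it directly from a CommandLineRunner blocks Spring Boot startup on that thread. A minimal sketch of one way around this, running the loop on a dedicated daemon thread (the start helper and the listenLoop stand-in are assumptions for illustration, not part of the starter):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CanalRunnerSketch {
    // Start the long-running canal loop on its own daemon thread so the
    // Spring Boot startup sequence is not blocked. listenLoop stands in
    // for canalClient.run() from the section above.
    static ExecutorService start(Runnable listenLoop) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "canal-client");
            t.setDaemon(true); // let the JVM exit even if the loop never returns
            return t;
        });
        executor.submit(listenLoop);
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = start(() -> System.out.println("canal listener started"));
        Thread.sleep(100); // give the stand-in task time to run
        executor.shutdown();
    }
}
```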
Code
@SpringBootApplication
// declare this service as a canal client
@EnableCanalClient
public class CanalApplication {
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}

// declare this class as a canal listener
@CanalEventListener
public class MyEventListener {

    @InsertListenPoint
    public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("InsertListenPoint fired.....");
        System.out.println("eventType: " + eventType.toString());
        // data before the change
        System.out.println("before: " + rowData.getBeforeColumnsList());
        // data after the change
        System.out.println("after: " + rowData.getAfterColumnsList());
        System.out.println("-----------------");
    }

    @UpdateListenPoint(schema = "test_db", table = "tb_user")
    public void onEvent1(CanalEntry.RowData rowData) {
        System.out.println("UpdateListenPoint fired.....");
        printData(null, rowData);
    }

    @DeleteListenPoint
    public void onEvent3(CanalEntry.EventType eventType) {
        System.out.println("DeleteListenPoint fired.....");
        printData(eventType, null);
    }

    @ListenPoint(destination = "example", schema = "test_db", table = {"tb_user", "tb_role"}, eventType = CanalEntry.EventType.UPDATE)
    public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("ListenPoint fired.....");
        printData(eventType, rowData);
    }

    private void printData(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (eventType != null) {
            System.out.println("eventType: " + eventType.toString());
        }
        if (rowData != null) {
            // data before the change
            System.out.println("before: " + rowData.getBeforeColumnsList());
            // data after the change
            System.out.println("after: " + rowData.getAfterColumnsList());
        }
        System.out.println("-----------------");
    }
}
Run the test
Update a row in the database:
UpdateListenPoint fired.....
before: [index: 0
sqlType: 12
name: "username"
isKey: true
updated: false
............
Incremental Data Subscription and Consumption
Configure RabbitMQ
@Configuration
public class RabbitMQConfig {

    // exchange name
    public static final String CANAL_EXCHANGE = "canal_exchange";
    // queue name
    public static final String CANAL_QUEUE = "canal_queue";

    /**
     * declare the queue
     */
    @Bean(CANAL_QUEUE)
    public Queue canalQueue() {
        return new Queue(CANAL_QUEUE);
    }

    /**
     * declare the exchange
     */
    @Bean(CANAL_EXCHANGE)
    public Exchange canalExchange() {
        return ExchangeBuilder.fanoutExchange(CANAL_EXCHANGE).durable(true).build();
    }

    /**
     * bind the queue to the exchange
     */
    @Bean
    public Binding canalBinding(@Qualifier(CANAL_QUEUE) Queue queue, @Qualifier(CANAL_EXCHANGE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
}
Configure application.yml
spring:
rabbitmq:
host: ip
username: admin
password: admin
Subscription side (producer)
@CanalEventListener
public class ProduceListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param eventType the type of the current database operation
     * @param rowData   the data affected by the current operation
     */
    @ListenPoint(schema = "test_db", table = "tb_user")
    public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if ("insert".equalsIgnoreCase(eventType.toString())) {
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if ("email".equals(column.getName()) && StringUtils.isNotBlank(column.getValue())) {
                    System.out.println("Sending email address to MQ: " + column.getValue());
                    // publish the message
                    rabbitTemplate.convertAndSend(RabbitMQConfig.CANAL_EXCHANGE, "", column.getValue());
                }
            }
        }
    }
}
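The producer's inner loop simply scans the after-image columns for a non-blank email value. That scan can be sketched independently of the CanalEntry types (extractEmail is a hypothetical helper over a plain map, for illustration only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EmailScanSketch {
    // Mirrors the loop over rowData.getAfterColumnsList() above:
    // return the "email" column's value when it is non-blank, else null.
    static String extractEmail(Map<String, String> afterColumns) {
        String v = afterColumns.get("email");
        return (v != null && !v.trim().isEmpty()) ? v : null;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("username", "tom");
        row.put("email", "tom@example.com");
        // tom@example.com
        System.out.println(extractEmail(row));
    }
}
```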
Consumption side (consumer)
@Component
public class ConsumeListener {

    @RabbitListener(queues = RabbitMQConfig.CANAL_QUEUE)
    public void receiveMessage(String message) {
        System.out.println("Message received from MQ: " + message);
    }
}
Run the test
Insert a row containing an email address, then watch the logs:
Sending email address to MQ: 123456789@qq.om
INFO 12652 --- [pool-2-thread-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [ip:5672]
INFO 12652 --- [pool-2-thread-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#4ed5eb72:0/SimpleConnection@7adf1cd7 [delegate=amqp://admin@ip:5672/, localPort= 7468]
Message received from MQ: 123456789@qq.om