Canal as a Data Synchronization Solution



Canal Overview

canal [kə'næl], meaning waterway/pipe/channel, is primarily used to parse MySQL incremental database logs and provide incremental data subscription and consumption.

In its early days, Alibaba ran dual data centers in Hangzhou and the US, which created a need for cross-datacenter synchronization; this was initially implemented with business-level triggers to capture incremental changes. Starting in 2010, teams gradually moved to parsing database logs to obtain incremental changes, which spawned a large ecosystem of incremental subscription and consumption services.

Put simply, canal monitors changes to database data, capturing newly inserted and modified rows.

Use cases built on log-based incremental subscription and consumption include:

Database mirroring
Real-time database backup
Index building and real-time maintenance (splitting heterogeneous indexes, inverted indexes, etc.)
Business cache refresh
Incremental data processing with custom business logic

GitHub: https://github.com/alibaba/canal

Releases: https://github.com/alibaba/canal/releases

Documentation: https://github.com/alibaba/canal/wiki/Docker-QuickStart

How It Works

MySQL Master-Slave Replication


The MySQL master writes data changes to its binary log (binlog; the individual records are called binary log events and can be inspected with SHOW BINLOG EVENTS).

The MySQL slave copies the master's binary log events into its relay log.

The MySQL slave replays the events in the relay log, applying the changes to its own data.

How Canal Works

Canal speaks the MySQL slave protocol: it masquerades as a MySQL slave and sends a dump request to the MySQL master.

On receiving the dump request, the MySQL master starts pushing binary log events to the slave (i.e. canal).

Canal parses the binary log objects (raw byte streams).

Environment Setup

Enable binlog in MySQL

1. Check whether binlog is currently enabled:

SHOW VARIABLES LIKE '%log_bin%';

If log_bin is OFF, binlog is disabled; ON means it is already enabled.

2. Edit /etc/my.cnf to enable binlog:

[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW format
server_id=1         # MySQL server_id; must not clash with canal's slaveId

After editing, restart the mysqld service.

3. Log in to MySQL:

mysql -h localhost -u root -p

4. Create an account for testing

Using the root account, create the user and grant it the required privileges:

# create the account
create user canal@'%' IDENTIFIED by 'canal';
# grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
# apply the changes
FLUSH PRIVILEGES;

Install and Configure the Canal Server

Pull the canal-server image:

docker pull canal/canal-server:v1.1.5

Start the container:

docker run -p 11111:11111 --name canal -id canal/canal-server:v1.1.5

Enter the canal container:

docker exec -it canal bash

Edit the instance configuration inside the container:

vi canal-server/conf/example/instance.properties

Make three changes:

# any value that does not clash with MySQL's server_id
canal.instance.mysql.slaveId=666

# database address
canal.instance.master.address=IP:3306

# the account and password created above
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

Full instance.properties for reference:

[root@064fb8202b09 admin]# cat canal-server/conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# any value that does not clash with MySQL's server_id
canal.instance.mysql.slaveId=666

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# database address
canal.instance.master.address=IP:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# the account and password created above
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
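The canal.instance.filter.regex and canal.instance.filter.black.regex entries are Perl-style regular expressions matched against schema.table names. As a quick sketch (treating them as plain Java regexes, which behave the same for these patterns), here is how the defaults classify two table names:

```java
// Standalone sketch: how canal's default table filters behave when
// evaluated as ordinary Java regexes against "schema.table" names.
public class FilterRegexDemo {
    public static void main(String[] args) {
        String filter = ".*\\..*";          // default: match every schema.table
        String black = "mysql\\.slave_.*";  // default blacklist: MySQL's internal slave tables

        System.out.println("test_db.tb_user".matches(filter));        // true -> captured
        System.out.println("mysql.slave_master_info".matches(black)); // true -> filtered out
    }
}
```

To watch only specific tables, narrow the filter, e.g. test_db\..* for one schema or test_db.tb_user for a single table.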

Restart the container:

docker restart canal

Check the instance logs:

tail -n60 -f canal-server/logs/example/example.log


Basic Usage of Canal

Add the dependency:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

Client code

    public static void main(String[] args) {
        // create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP",
                11111), "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            // connect
            connector.connect();
            // subscribe to all schemas and tables
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            // poll canal in a loop
            while (emptyCount < totalEmptyCount) {
                // fetch up to batchSize entries without acknowledging them
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                // System.out.println("binlog messages in this batch: " + size);

                // check whether the batch contains data
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);

                    // no data; wait one second
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    // data present; parse it
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on failure, roll the batch back
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }


    private static void printEntry(List<CanalEntry.Entry> entrys) {
        // iterate over the received entries
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                // parse the stored payload
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            // the event type: INSERT, UPDATE, DELETE, ...
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));


            // the changed rows (incremental data)
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }

                // values before the change
                rowData.getBeforeColumnsList().forEach((c) -> System.out.println("before: " + c.getName() + "::" + c.getValue()));
                // values after the change
                rowData.getAfterColumnsList().forEach((c) -> System.out.println("after: " + c.getName() + "::" + c.getValue()));
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

Run the test

Modify a row in the database; the console output looks like:

empty count : 65
================> binlog[mysql-bin.000019:17797] , name[test_db,tb_user] , eventType : UPDATE
empty count : 1
empty count : 2
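Each event in the output above is tagged with a binlog file name and byte offset (e.g. mysql-bin.000019:17797), which together identify a replication position. A tiny helper for splitting such a position string, purely illustrative and not part of the Canal API:

```java
// Illustrative helper (not part of the Canal API): split a binlog
// position string like "mysql-bin.000019:17797" into file and offset.
public class BinlogPosition {
    public final String file;
    public final long offset;

    public BinlogPosition(String position) {
        int sep = position.lastIndexOf(':');
        this.file = position.substring(0, sep);
        this.offset = Long.parseLong(position.substring(sep + 1));
    }

    public static void main(String[] args) {
        BinlogPosition p = new BinlogPosition("mysql-bin.000019:17797");
        System.out.println(p.file + " @ " + p.offset); // mysql-bin.000019 @ 17797
    }
}
```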

Integrating Canal with Spring Boot

Add the dependencies:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.7</version>
</dependency>

Database configuration:

spring.datasource.url=jdbc:mysql://localhost:3306/demo?useUnicode=true&characterEncoding=UTF-8&useSSL=false
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=123456

Canal client code

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
@Slf4j
public class CanalClient {

    /**
     * Queue of SQL statements waiting to be applied.
     */
    private final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * Poll canal and apply captured changes to the local database.
     */
    public void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP", 11111), "example", "canal", "canal");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    // fetch up to batchSize entries without acknowledging them
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }

                    // acknowledge the batch
                    connector.ack(batchId);

                    // drain the queue once it holds at least one statement
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException | InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Execute all SQL statements currently in the queue.
     */
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            log.info("Executing sql ---> {}", sql);
            this.execute(sql);
        }
    }

    /**
     * Dispatch row-change entries by event type.
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();

                if (eventType == EventType.DELETE) {
                    // DELETE statement
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    // UPDATE statement
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    // INSERT statement
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * Build an UPDATE statement from the row change.
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuilder sql = new StringBuilder("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        // only single-column primary keys are supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Build a DELETE statement from the row change.
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuilder sql = new StringBuilder("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        // only single-column primary keys are supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Build an INSERT statement from the row change.
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuilder sql = new StringBuilder("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Execute a single SQL statement against the local DataSource.
     */
    public void execute(String sql) {
        if (sql == null) {
            return;
        }
        Connection con = null;
        try {
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}
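One caveat with the class above: column values are concatenated straight into the SQL text, so a value containing a single quote produces a broken (or injectable) statement. A minimal sketch of a quoting helper (hypothetical, not in the original code); in production, binding values as PreparedStatement parameters, e.g. via QueryRunner's parameterized overloads, is the safer choice:

```java
// Hypothetical helper: quote a column value so the generated SQL stays
// valid when the value contains single quotes or backslashes.
// Prefer PreparedStatement parameter binding in production code.
public class SqlValueEscaper {
    public static String quote(String value) {
        if (value == null) {
            return "NULL";
        }
        return "'" + value.replace("\\", "\\\\").replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        System.out.println(quote("O'Brien")); // 'O''Brien'
    }
}
```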

spring-boot-starter-canal

spring-boot-starter-canal is a third-party open-source project that integrates canal with Spring Boot, ready to use directly.

Search GitHub for spring-boot-starter-canal, pick one of the repositories, download its source, install it into your local Maven repository, and reference it from your project.

Add the dependency:

<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

Canal startup configuration

On application startup, run the canal client listener:

@Component
public class CanalCommandLineRunner implements CommandLineRunner {

    @Resource
    private CanalClient canalClient;

    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }
}

Listener code

@SpringBootApplication
// mark this application as a canal client
@EnableCanalClient
public class CanalApplication {
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}
// mark this class as a canal listener
@CanalEventListener
public class MyEventListener {

    @InsertListenPoint
    public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("InsertListenPoint fired...");
        System.out.println("eventType: " + eventType.toString());
        // values before the change
        System.out.println("before: " + rowData.getBeforeColumnsList());
        // values after the change
        System.out.println("after: " + rowData.getAfterColumnsList());
        System.out.println("-----------------");
    }

    @UpdateListenPoint(schema = "test_db", table = "tb_user")
    public void onEvent1(CanalEntry.RowData rowData) {
        System.out.println("UpdateListenPoint fired...");
        printData(null, rowData);
    }

    @DeleteListenPoint
    public void onEvent3(CanalEntry.EventType eventType) {
        System.out.println("DeleteListenPoint fired...");
        printData(eventType, null);
    }

    @ListenPoint(destination = "example", schema = "test_db", table = {"tb_user", "tb_role"}, eventType = CanalEntry.EventType.UPDATE)
    public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("ListenPoint fired...");
        printData(eventType, rowData);
    }

    private void printData(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (eventType != null) {
            System.out.println("eventType: " + eventType.toString());
        }
        if (rowData != null) {
            // values before the change
            System.out.println("before: " + rowData.getBeforeColumnsList());
            // values after the change
            System.out.println("after: " + rowData.getAfterColumnsList());
        }
        System.out.println("-----------------");
    }
}

Run the test

Modify a row in the database; the listener output looks like:

UpdateListenPoint fired...
before: [index: 0
sqlType: 12
name: "username"
isKey: true
updated: false
............

Incremental Data Subscription and Consumption

Configure RabbitMQ

@Configuration
public class RabbitMQConfig {
    // exchange name
    public static final String CANAL_EXCHANGE = "canal_exchange";

    // queue name
    public static final String CANAL_QUEUE = "canal_queue";

    /**
     * Declare the queue.
     */
    @Bean(CANAL_QUEUE)
    public Queue CANAL_QUEUE() {
        return new Queue(CANAL_QUEUE);
    }

    /**
     * Declare the exchange.
     */
    @Bean(CANAL_EXCHANGE)
    public Exchange CANAL_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange(CANAL_EXCHANGE).durable(true).build();
    }

    /**
     * Bind the queue to the exchange.
     */
    @Bean
    public Binding CANAL_EXCHANGE_BINDING(@Qualifier(CANAL_QUEUE) Queue queue, @Qualifier(CANAL_EXCHANGE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
}

Configure application.yml:

spring:
  rabbitmq:
    host: ip
    username: admin
    password: admin

Subscription: the producer

@CanalEventListener
public class ProduceListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param eventType the type of the database operation
     * @param rowData   the row data of the operation
     */
    @ListenPoint(schema = "test_db", table = "tb_user")
    public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if ("insert".equalsIgnoreCase(eventType.toString())) {
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if ("email".equals(column.getName()) && StringUtils.isNotBlank(column.getValue())) {
                    System.out.println("Sending email address to MQ: " + column.getValue());
                    // publish the message
                    rabbitTemplate.convertAndSend(RabbitMQConfig.CANAL_EXCHANGE, "", column.getValue());
                }
            }
        }
    }
}

Consumption: the consumer

@Component
public class ConsumeListener {

    @RabbitListener(queues = RabbitMQConfig.CANAL_QUEUE)
    public void receiveMessage(String message) {
        System.out.println("Message received from MQ: " + message);
    }
}

Run the test

Sending email address to MQ: 123456789@qq.om
INFO 12652 --- [pool-2-thread-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [ip:5672]
INFO 12652 --- [pool-2-thread-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#4ed5eb72:0/SimpleConnection@7adf1cd7 [delegate=amqp://admin@ip:5672/, localPort= 7468]
Message received from MQ: 123456789@qq.om
