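Canal: Listening for Database Table Changes

1. Introduction

Canal is an open-source project from Alibaba, written in pure Java. It parses a database's incremental log (the MySQL binlog) and provides incremental data subscription and consumption.

How it works

How MySQL master-slave replication works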

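  • The MySQL master writes data changes to its binary log (the records are called binary log events and can be viewed with show binlog events)
  • The MySQL slave copies the master's binary log events to its relay log
  • The MySQL slave replays the events in the relay log, applying the changes to its own data

How canal works

  • canal emulates the MySQL slave protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
  • canal parses the binary log objects (originally a byte stream)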

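The MySQL binlog

The binlog records all DDL and DML statements (data query statements excluded) as events, together with the time each statement took to execute. It is mainly used for backup and data synchronization.

The binlog has three formats: STATEMENT, ROW, and MIXED

  1. STATEMENT records the SQL statements that were executed
  2. ROW records the actual row data that changed
  3. MIXED combines 1 and 2: it logs in STATEMENT format by preference and switches to ROW when a statement cannot be logged safely as a statement

For example:

update user set age = 33

In STATEMENT format only the SQL that ran is recorded, whereas in ROW format the same statement may produce thousands of row records (depending, of course, on how many rows your user table has).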
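
To make the difference concrete, here is a small and purely illustrative Java sketch. It does not read a real binlog, and BinlogModeDemo and its methods are hypothetical names invented for this example. It only counts how many entries a single UPDATE without a WHERE clause would produce under each format, assuming the table holds 10,000 rows.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: counts log entries per binlog format for one UPDATE.
final class BinlogModeDemo {

    // STATEMENT format: one entry holding the SQL text, however many rows it touches.
    static List<String> statementEntries(String sql, int affectedRows) {
        return List.of(sql);
    }

    // ROW format: one row image per affected row.
    static List<String> rowEntries(int affectedRows) {
        List<String> entries = new ArrayList<>();
        for (int id = 1; id <= affectedRows; id++) {
            entries.add("user[id=" + id + "] age -> 33");
        }
        return entries;
    }

    public static void main(String[] args) {
        String sql = "update user set age = 33";
        int rowsInTable = 10_000; // assumed table size
        System.out.println("STATEMENT entries: " + statementEntries(sql, rowsInTable).size());
        System.out.println("ROW entries: " + rowEntries(rowsInTable).size());
    }
}
```

This is why ROW format costs more log volume, but it is also what gives a consumer such as canal the actual column values of every changed row.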

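2. What it can be used for

Use cases built on incremental log subscription and consumption include:

  • Database mirroring
  • Real-time database backup
  • Index building and real-time maintenance (splitting heterogeneous indexes, inverted indexes, and so on)
  • Business cache refresh
  • Incremental data processing with business logic

canal currently supports these source MySQL versions: 5.1.x, 5.5.x, 5.6.x, 5.7.x, 8.0.x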

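3. Installation

For a native installation, see: https://github.com/alibaba/canal/wiki/QuickStart

3.1 Installing with docker-compose

3.1.1 Create the replication user

For the relevant MySQL server configuration, see the master configuration in https://www.cnblogs.com/ludangxin/p/16358928.html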

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3.1.2 Edit the configuration files

First start a temporary container so that the configuration files inside it can be copied to the host.

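# run a temporary container
docker run --name canal-temp -d --rm canal/canal-server:v1.1.6
# copy the configuration files into the current directory (docker cp needs the parent directory to exist)
mkdir -p ./canal-server
docker cp canal-temp:/home/admin/canal-server/conf ./canal-server/conf
# stop the temporary container; --rm removes it automatically
docker stop canal-temp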

The canal-server/conf directory is structured as follows

canal-server/conf
├── canal.properties # canal server settings, e.g. service port, cluster parameters, server mode (tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ)
├── canal_local.properties
├── example # canal instance configuration; to listen to several MySQL servers, copy this directory and add the new directory name to canal.destinations in canal.properties
│   ├── h2.mv.db
│   ├── instance.properties
│   └── meta.dat
├── logback.xml
...

Edit the configuration

For canal.properties we keep the defaults, i.e. canal.serverMode = tcp

Configure the MySQL server connection in example/instance.properties as follows:

# position info
# MySQL address; here I use the MySQL container name directly
canal.instance.master.address=my_mysql:3306
# binlog file name to read from, e.g. mysql-bin.000007
canal.instance.master.journal.name=
# offset within the binlog file
canal.instance.master.position=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

# table regex
# the default syncs all schemas and tables
#canal.instance.filter.regex=.*\\..*
# listen only to the user table in the test schema; for several tables use a regular expression or separate the entries with commas
canal.instance.filter.regex=test.user

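The tables that MySQL data parsing should watch, written as Perl regular expressions.
Separate multiple expressions with commas (,); escapes need a double backslash (\\)
Common examples:
All tables: .* or .*\\..*
All tables in the canal schema: canal\\..*
Tables in canal whose names start with canal: canal\\.canal.*
One table in the canal schema: canal.test1
Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)
Note: this filter only works for ROW-format data (mixed/statement events are not parsed as SQL, so the table name cannot be extracted reliably for filtering)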
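
To get a feel for how such a comma-separated rule list behaves, the sketch below matches "schema.table" names against it with java.util.regex. TableFilterDemo is a hypothetical helper written for this article, not canal's own filter class, and canal's real matching may differ in details such as case handling. Note that in Java source the double backslash is the string-literal escape for a single regex backslash.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not canal's implementation: tests "schema.table" names
// against a comma-separated list of regular expressions.
final class TableFilterDemo {
    private final List<Pattern> patterns = new ArrayList<>();

    TableFilterDemo(String commaSeparatedRegex) {
        for (String part : commaSeparatedRegex.split(",")) {
            patterns.add(Pattern.compile(part.trim()));
        }
    }

    // True if the whole "schema.table" name matches at least one rule.
    boolean accepts(String schemaDotTable) {
        for (Pattern p : patterns) {
            if (p.matcher(schemaDotTable).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        TableFilterDemo filter = new TableFilterDemo("canal\\..*,mysql.test1");
        System.out.println(filter.accepts("canal.orders")); // true
        System.out.println(filter.accepts("mysql.test1"));  // true
        System.out.println(filter.accepts("mysql.test2"));  // false
    }
}
```

One consequence worth knowing: an unescaped dot, as in test.user, matches any character, so under plain regex semantics it would also accept a name such as testXuser. That rarely matters in practice, but escaping the dot is the stricter form.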

3.1.3 Start canal

docker-compose.yaml is shown below

Because canal needs to read the MySQL server's binlog, it has to join the MySQL server's network.

version: '3'

services:
  canal:
    image: canal/canal-server:v1.1.6
    hostname: canal
    container_name: canal
    restart: "no"
    ports:
      - "11111:11111"
    volumes:
      - "./canal-server/conf:/home/admin/canal-server/conf"
      - "./canal-server/logs:/home/admin/canal-server/logs"
    networks:
      - mysql_mysql

networks:
  mysql_mysql:
    external: true

4. Spring Boot test

Tip: see https://github.com/NormanGyllenhaal/canal-client for reference

4.1 Add dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

4.2 Add configuration

canal:
  server: localhost:11111
  destination: example

logging:
  level:
    top.javatool.canal.client.client: OFF

4.3 Listen for canal data

package com.ldx.canaldemo.handler;

import com.ldx.canaldemo.domain.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@Slf4j
@Component
// listen to the user table
@CanalTable(value = "user")
public class UserHandler implements EntryHandler<User> {

    @Override
    public void insert(User user) {
        log.info("insert info {}", user);
    }

    @Override
    public void update(User before, User after) {
        log.info("update before {} ", before);
        log.info("update after {}", after);
    }

    @Override
    public void delete(User user) {
        log.info("delete {}", user);
    }
}

package com.ldx.canaldemo.domain;

import lombok.Data;

import javax.persistence.Column;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;

@Data
@Table(name = "user")
public class User implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id")
    private Integer id;

    @Column(name = "username")
    private String username;

    @Column(name = "password")
    private String password;

    // the age column appears in the table and in the logged output below
    @Column(name = "age")
    private Integer age;

    @Column(name = "sex")
    private Integer sex;
}

4.4 Test

The user table is defined as follows

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `password` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `age` int(1) DEFAULT NULL,
  `sex` int(1) COLLATE utf8mb4_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

Test an insert

INSERT INTO `test`.`user`(`id`, `username`, `password`, `age`, `sex`) VALUES (5, 'zhangtieniu', '123456', 28, 1);

The console output is as follows

2022-06-14 13:50:38.144  INFO 71053 --- [xecute-thread-1] com.ldx.canaldemo.handler.UserHandler    : insert info User(id=5, username=zhangtieniu, password=123456, age=28, sex=1) 

Test an update

UPDATE `test`.`user` SET `username` = 'zhangsan', age = 23 WHERE id = 5;

The console output is as follows

2022-06-14 13:54:55.997  INFO 71053 --- [xecute-thread-2] com.ldx.canaldemo.handler.UserHandler    : update before User(id=null, username=zhangtieniu, password=null, age=28, sex=null)
2022-06-14 13:54:55.997  INFO 71053 --- [xecute-thread-2] com.ldx.canaldemo.handler.UserHandler    : update after User(id=5, username=zhangsan, password=123456, age=23, sex=1)

Test a delete

DELETE FROM `test`.`user` WHERE id = 5;

The console output is as follows

2022-06-14 13:56:46.359  INFO 71053 --- [xecute-thread-3] com.ldx.canaldemo.handler.UserHandler    : delete User(id=5, username=zhangsan, password=123456, age=23, sex=1) 

5. RabbitMQ test

5.1 Modify the canal configuration

Change canal.properties as follows

# change serverMode to rabbitMQ
canal.serverMode = rabbitMQ

# add the RabbitMQ settings
rabbitmq.host = rabbitmq:5672
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin123

Add the routing rule to example/instance.properties

canal.mq.topic=canal_routing_key 

5.2 Spring Boot consumer

5.2.1 Add the dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.2.2 Add configuration

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # RabbitMQ's default virtual host
    virtual-host: /
    # RabbitMQ username and password
    username: admin
    password: admin123
    listener:
      simple:
        # manual acknowledgement
        acknowledge-mode: manual

5.2.3 Add the consumer

package com.ldx.canaldemo.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author ludangxin
 * @date 2022/6/14
 */
@Slf4j
@Component
public class RabbitMQConsumer {
   @RabbitListener(bindings =
         {@QueueBinding(
               value = @Queue(value = MqConstant.CANAL_QUEUE, durable = "true"),
               exchange = @Exchange(value = MqConstant.CANAL_EXCHANGE),
               key = MqConstant.CANAL_ROUTING_KEY)
         })
   public void helloRabbitMq(Message message, Channel channel) throws IOException {
      MessageProperties messageProperties = message.getMessageProperties();
      try {
         log.info(message.toString());
         // canal publishes the change as a JSON string; decode it explicitly as UTF-8
         log.info(new String(message.getBody(), StandardCharsets.UTF_8));
         channel.basicAck(messageProperties.getDeliveryTag(), false);
      } catch(Exception e) {
         // is this a redelivered message, i.e. one that has already gone back to the queue once?
         if(messageProperties.getRedelivered()) {
            log.info("Message failed again after redelivery, rejecting it...");
            // reject the message without requeueing
            channel.basicReject(messageProperties.getDeliveryTag(), false);
         }
         else {
            log.info("Message will be returned to the queue for another attempt...");
            channel.basicNack(messageProperties.getDeliveryTag(), false, true);
         }
      }
   }
}

package com.ldx.canaldemo.rabbitmq;

/**
 * @author ludangxin
 * @date 2022/6/14
 */
public interface MqConstant {
    String CANAL_EXCHANGE = "canal.exchange";
    String CANAL_QUEUE = "canal_queue";
    String CANAL_ROUTING_KEY = "canal_routing_key";
}
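
The acknowledgement branching in the consumer above can be looked at on its own, without any AMQP dependency. The sketch below restates it as a pure function; AckPolicyDemo and its Action enum are hypothetical names used only for illustration, and the mapping to basicAck, basicNack with requeue, and basicReject without requeue follows the consumer code.

```java
// Hypothetical names; mirrors the consumer's branching as a side-effect-free function.
final class AckPolicyDemo {
    enum Action {
        ACK,     // corresponds to channel.basicAck(tag, false)
        REQUEUE, // corresponds to channel.basicNack(tag, false, true)
        REJECT   // corresponds to channel.basicReject(tag, false)
    }

    // A message gets exactly one retry: the first failure requeues it,
    // a failure on the redelivered copy rejects it for good.
    static Action decide(boolean processedOk, boolean redelivered) {
        if (processedOk) {
            return Action.ACK;
        }
        return redelivered ? Action.REJECT : Action.REQUEUE;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false));  // ACK
        System.out.println(decide(false, false)); // REQUEUE
        System.out.println(decide(false, true));  // REJECT
    }
}
```

Keeping the decision separate like this makes the retry-once policy easy to unit test. A rejected message is dropped unless the queue has a dead-letter exchange configured, which the setup in this article does not include.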

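5.3 Start and test

Start the project first so that the application automatically creates the required exchange and queue in RabbitMQ.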

Test an insert

INSERT INTO `test`.`user`(`id`, `username`, `password`, `age`, `sex`) VALUES (8, 'zhangtieniu', '123456', 28, 1);

The console output is as follows

2022-06-14 14:42:04.818  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : (Body:'[B@189a76a(byte[414])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=4, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:42:04.818  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : {"data":[{"id":"8","username":"zhangtieniu","password":"123456","age":"28","sex":"1"}],"database":"test","es":1655188924000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655188924822,"type":"INSERT"}

Test an update

UPDATE `test`.`user` SET `username` = 'zhangsan', age = 23 WHERE id = 8;

The console output is as follows

2022-06-14 14:56:23.471  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : (Body:'[B@6a3a1f0(byte[446])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=5, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:56:23.471  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : {"data":[{"id":"8","username":"zhangsan","password":"123456","age":"23","sex":"1"}],"database":"test","es":1655189783000,"id":7,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":[{"username":"zhangtieniu","age":"28"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655189783493,"type":"UPDATE"}
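
In the UPDATE message above, data holds the row after the change while old holds only the columns that changed, with their previous values. Assuming that holds in general (it is what this sample output suggests), the full row as it was before the update can be rebuilt by overlaying old on data. The sketch below does this with plain maps; BeforeImageDemo is a hypothetical name, and parsing the JSON into maps is left to whichever JSON library you already use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: rebuilds the pre-update row from canal's "data" and "old" fields.
final class BeforeImageDemo {

    // Start from the current row, then overwrite the changed columns with their old values.
    static Map<String, String> beforeImage(Map<String, String> data, Map<String, String> old) {
        Map<String, String> before = new LinkedHashMap<>(data);
        if (old != null) { // "old" is null for INSERT and DELETE messages
            before.putAll(old);
        }
        return before;
    }

    public static void main(String[] args) {
        Map<String, String> data = new LinkedHashMap<>();
        data.put("id", "8");
        data.put("username", "zhangsan");
        data.put("password", "123456");
        data.put("age", "23");
        data.put("sex", "1");

        Map<String, String> old = new LinkedHashMap<>();
        old.put("username", "zhangtieniu");
        old.put("age", "28");

        // prints {id=8, username=zhangtieniu, password=123456, age=28, sex=1}
        System.out.println(beforeImage(data, old));
    }
}
```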

Test a delete

DELETE FROM `test`.`user` WHERE id = 8;

The console output is as follows

2022-06-14 14:57:06.407  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : (Body:'[B@628caa50(byte[411])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=canal.exchange, receivedRoutingKey=canal_routing_key, deliveryTag=6, consumerTag=amq.ctag-KXSHZ8D0wMQo7z2_L2LKsg, consumerQueue=canal_queue])
2022-06-14 14:57:06.408  INFO 73549 --- [ntContainer#0-1] c.l.canaldemo.rabbitmq.RabbitMQConsumer  : {"data":[{"id":"8","username":"zhangsan","password":"123456","age":"23","sex":"1"}],"database":"test","es":1655189826000,"id":8,"isDdl":false,"mysqlType":{"id":"int(11)","username":"varchar(255)","password":"varchar(255)","age":"int(11)","sex":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12,"password":12,"age":4,"sex":12},"table":"user","ts":1655189826419,"type":"DELETE"}

6. Managing canal with canal admin

Details: https://gitee.com/zhengqingya/docker-compose

User guide: https://github.com/alibaba/canal/wiki/Canal-Admin-Guide

Native installation: https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart

  1. Create the canal admin database canal_manager

  2. Run the initialization SQL script Liunx/canal/canal_admin/canal_manager.sql

    The file contents are as follows:

    CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;

    USE `canal_manager`;

    SET NAMES utf8;
    SET FOREIGN_KEY_CHECKS = 0;

    -- ----------------------------
    -- Table structure for canal_adapter_config
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_adapter_config`;
    CREATE TABLE `canal_adapter_config` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `category` varchar(45) NOT NULL,
      `name` varchar(45) NOT NULL,
      `status` varchar(45) DEFAULT NULL,
      `content` text NOT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    -- ----------------------------
    -- Table structure for canal_cluster
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_cluster`;
    CREATE TABLE `canal_cluster` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `name` varchar(63) NOT NULL,
      `zk_hosts` varchar(255) NOT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    -- ----------------------------
    -- Table structure for canal_config
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_config`;
    CREATE TABLE `canal_config` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `cluster_id` bigint(20) DEFAULT NULL,
      `server_id` bigint(20) DEFAULT NULL,
      `name` varchar(45) NOT NULL,
      `status` varchar(45) DEFAULT NULL,
      `content` text NOT NULL,
      `content_md5` varchar(128) NOT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      UNIQUE KEY `sid_UNIQUE` (`server_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    -- ----------------------------
    -- Table structure for canal_instance_config
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_instance_config`;
    CREATE TABLE `canal_instance_config` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `cluster_id` bigint(20) DEFAULT NULL,
      `server_id` bigint(20) DEFAULT NULL,
      `name` varchar(45) NOT NULL,
      `status` varchar(45) DEFAULT NULL,
      `content` text NOT NULL,
      `content_md5` varchar(128) DEFAULT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      UNIQUE KEY `name_UNIQUE` (`name`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    -- ----------------------------
    -- Table structure for canal_node_server
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_node_server`;
    CREATE TABLE `canal_node_server` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `cluster_id` bigint(20) DEFAULT NULL,
      `name` varchar(63) NOT NULL,
      `ip` varchar(63) NOT NULL,
      `admin_port` int(11) DEFAULT NULL,
      `tcp_port` int(11) DEFAULT NULL,
      `metric_port` int(11) DEFAULT NULL,
      `status` varchar(45) DEFAULT NULL,
      `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    -- ----------------------------
    -- Table structure for canal_user
    -- ----------------------------
    DROP TABLE IF EXISTS `canal_user`;
    CREATE TABLE `canal_user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `username` varchar(31) NOT NULL,
      `password` varchar(128) NOT NULL,
      `name` varchar(31) NOT NULL,
      `roles` varchar(31) NOT NULL,
      `introduction` varchar(255) DEFAULT NULL,
      `avatar` varchar(255) DEFAULT NULL,
      `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    SET FOREIGN_KEY_CHECKS = 1;

    -- ----------------------------
    -- Records of canal_user
    -- ----------------------------
    BEGIN;
    INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
    COMMIT;

    SET FOREIGN_KEY_CHECKS = 1;

  3. docker-compose.yaml is shown below

    version: '3'
    services:
      canal_admin:
        image: canal/canal-admin:v1.1.6
        container_name: canal_admin
        restart: unless-stopped
        volumes:
          - "./canal/canal-admin/logs:/home/admin/canal-admin/logs"
        environment:
          TZ: Asia/Shanghai
          LANG: en_US.UTF-8
          canal.adminUser: admin
          canal.adminPasswd: 123456
          spring.datasource.address: my_mysql:3306
          spring.datasource.database: canal_manager
          spring.datasource.username: root
          spring.datasource.password: 123456
        ports:
          - "8089:8089"
        networks:
          - canal
          - mysql_mysql
      canal_server:
        image: canal/canal-server:v1.1.6
        container_name: canal_server
        restart: unless-stopped
        volumes:
          - "./canal/canal-server/logs:/home/admin/canal-server/logs"
        environment:
          TZ: Asia/Shanghai
          LANG: en_US.UTF-8
          canal.admin.manager: canal_admin:8089
          canal.admin.port: 11110
          canal.admin.user: admin
          canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
        ports:
          - "11110:11110"
          - "11111:11111"
          - "11112:11112"
        depends_on:
          - canal_admin
        links:
          - canal_admin
        networks:
          - canal
          - mysql_mysql
    networks:
      canal:
      mysql_mysql:
        external: true

  4. Start the services and open http://localhost:8089/#/canalServer/nodeServers

    Log in with username/password: admin/123456
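
The value used for canal.admin.passwd and stored in canal_user.password is not the plain password. As far as I can tell it is the output of MySQL's legacy PASSWORD() function without the leading asterisk, i.e. the upper-case hex of SHA1(SHA1(password)). Treat that as an assumption to verify against the Canal Admin guide. Under that assumption, the sketch below computes the value for a password of your choice using only the JDK; CanalAdminPasswdDemo is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: upper-case hex of SHA1(SHA1(plain)), which is what MySQL's
// legacy PASSWORD() returns after the leading '*'.
final class CanalAdminPasswdDemo {

    static String scramble(String plain) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] first = sha1.digest(plain.getBytes(StandardCharsets.UTF_8));
            byte[] second = sha1.digest(first); // digest() resets the instance after each call
            StringBuilder hex = new StringBuilder();
            for (byte b : second) {
                hex.append(String.format("%02X", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a mandatory algorithm on every Java platform
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
        System.out.println(scramble("123456"));
    }
}
```

For 123456 this gives the same string that appears in the compose file and in the canal_user insert, which is why admin/123456 works as the login. If you change the admin password, the value in canal.admin.passwd has to be regenerated the same way.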
