柏竹 柏竹
首页
后端
前端
  • 应用推荐
关于
友链
  • 分类
  • 标签
  • 归档

柏竹

奋斗柏竹
首页
后端
前端
  • 应用推荐
关于
友链
  • 分类
  • 标签
  • 归档
  • Java基础

  • JavaWeb

  • 拓展技术

  • 框架技术

    • Maven
    • Activiti
    • Shiro
    • Dubbo
    • RabbitMQ
    • Netty网络通信
    • Canal
      • 概述
      • 运作原理
      • 安装
        • docker
      • SprinBoot监听同步
  • 数据库

  • 数据结构

  • Spring

  • SpringMVC

  • SpringBoot

  • SpringClound

  • Ruoyi-Vue-Plus

  • 后端
  • 框架技术
柏竹
2023-04-05
目录

Canal

# 概述

Canal 是个开源的消息队列系统 , 基于 数据库增量 日志解析 进行同步 , 提供增量 增量数据 订阅/消费 . 用于 异步消息传递 和 事件驱动 应用程序

GitHub : https://github.com/alibaba/canal (opens new window)

Canal核心 :

  • 数据同步
  • 分布式架构
  • 多协议支持
  • 权限管理

# 运作原理

Canal 同步原理 , 基于主从同步实现 , 运作原理 :

  1. Canal 提供 MySQL 二进制文件(binary_log) , 用于记录 增 删 改 命令的事件
  2. master 将 增 删 改 事件 写入 binary_log , 写入前会根据 Canal 配置过滤规则 , 过滤binary_log
  3. slave 异步线程不断读取 master 写入的 binary_log 拷贝到 中继日志(realy_log)
  4. slave 根据 realy_log 中的事件 , 将数据再次加载一次 , 到达数据同步目的

Canal_slave

你可以把它看做是 伪装成MySQL的部分 , 去监听MySQL

# 安装

以下安装基于 MySQL 数据库 , 实现同步

准备 :

  • Java环境
  • Canal安装包 , 并上传主机
  • MySQL

MySQL授权用户

# 创建用户 canal
create user canal@'%' IDENTIFIED by 'canal';
# 授权用户
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
# 刷新生效
FLUSH PRIVILEGES;

提示

账号密码都是 canal , 后续自行更改即可!

Canal的 conf配置

  1. 进入 conf目录 , 拷贝一份 example文件夹 , 并更改为你实例的名称
  2. 进入 刚刚拷贝的文件夹 , 配置 instance.properties文件
# sql IP:Port
canal.instance.master.address=127.0.0.1:3306
# sql账号
canal.instance.dbUsername=canal
# sql密码
canal.instance.dbPassword=canal
# sql字符集
canal.instance.connectionCharset=UTF-8
# 启动同步日志
canal.instance.tsdb.enable=true
# sql库名
canal.instance.defaultDatabaseName=<库名>
# 监听 指定表名
canal.instance.filter.regex=<库名>\\..*

监听表名支持写法 :

Perl正则表达式 , 多个正则需要 , 分割 , 转义符需要 \\

格式 说明
.* / .*\\.. 所有库表
canal\\..* canal库下的 所有表
canal\\.canal.* canal库下的 以canal开头的表
canal.test1 canal库的test1表

启动验证

进入bin执行以下命令 :

# 启动
sh startup.sh
# 创建实例
sh canal.sh start
# 查看日志 (但SQL 增 改 时..)
sh ./bin/logview.sh -f -c ../conf/example/instance.properties
# 后面加 -s 选项 代表 包含删除操作
sh ./bin/logview.sh -f -c ../conf/example/instance.properties -s

# docker

MySQL配置

MySQL根据目录下的 /conf/my.cnf配置

# 配置文件路径
log-bin= /var/lib/mysql/mysql-bin
# 指定数据库
binlog-do-db=<你的数据库>

验证配置生效

# shell 重启容器
docker restart mysql
# 进入sql指定以下命令
show master status;

docker容器 通信

容器与容器之间的独立的 , 因此需要建立网络通信

创建网络

docker network create <网络名称>

MySQL连接网络

docker network connect <网络名称> <容器name/id>
# MySQL容器连接
docker network connect <网络名称> <容器name/id>

Canal配置

载入Canal镜像 , 从外部下载的

docker load -i <文件名>.tar

运行 Canal容器

docker run -p 11111:11111 --name canal \
-e canal.destinations=<库名> \
-e canal.instance.master.address=mysql:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=<库名>\\..* \
--network <网络名称> \
-d canal/canal-server:v1.1.5

验证

进入容器日志查看

# 进入容器
docker exec -it canal bash
# 查看日志
tail -f canal-server/logs/canal/canal.log
# 查看实例日志
tail -f canal-server/logs/sans/sans.log

# SprinBoot监听同步

引入依赖

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

配置

canal:
  destination: <库名> # canal的集群名字,要与安装canal时设置的名称一致
  server: <IP>:11111 # canal服务地址

修改实体类

通过 @Id/@Column /@Transient/@TableField 等注解完善 对库表的映射

在逆向工程生成中 , 可以生成这些注解 , 如果如果没有需要手动配置

监听器类

通过实现 EntryHandler<T>接口 编写监听器 , 里面包含有 增 删 改 触发的方法 , 我们需要重写他们

应用前我们需要注意 :

  • 监听类需要 @CanalTable("<表名>")注解 指定监听的表
  • EntryHandler<T>接口 泛型 需要些表中对应的 实体类
import com.heima.item.pojo.Item;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

import javax.annotation.Resource;

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
	
    // 封装的 RedisTeamp操作类(并非重点)
    @Resource
    private RedisHandler redisHandler;

    // 监控消息 增 删 改

    @Override
    public void insert(Item item) {
        String key = "item:" + item.getId();
        redisHandler.save(key, item);
    }

    /**
     * 可根据 之后/之前 的数据差异进行控制更新缓存
     * @param before 变化前对象
     * @param after 变化后对象
     */
    @Override
    public void update(Item before, Item after) {
        String key = "item:" + after.getId();
        redisHandler.save(key, after);
    }

    @Override
    public void delete(Item item) {
        String key = "item:" + item.getId();
        redisHandler.deleteByKey(key);
    }
}

运行测试

  1. 查看Redis数据
  2. 更改MySQL数据
  3. 查看Redis数据

可以发现 Reids 和 MySQL数据是同步的


目前学习 Canal 简单应用 , 发现这个的确挺好用的 , 后续学到其他知识点尝试拓展下~

#消息队列#Canal

← Netty网络通信 MySQL→

最近更新
01
HTTPS自动续签
10-21
02
博客搭建-简化版(脚本)
10-20
03
ruoyi-vue-plus-部署篇
07-13
更多文章>
Theme by Vdoing | Copyright © 2019-2024 | 桂ICP备2022009417号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式