柏竹 柏竹
首页
后端
前端
  • 应用推荐
关于
友链
  • 分类
  • 标签
  • 归档

柏竹

奋斗柏竹
首页
后端
前端
  • 应用推荐
关于
友链
  • 分类
  • 标签
  • 归档
  • Java基础

  • JavaWeb

  • 拓展技术

  • 框架技术

    • Maven
    • Activiti
    • Shiro
    • Dubbo
    • RabbitMQ
      • 介绍
        • MQ特性
        • 可靠性
        • 顺序性
        • 幂等性
        • MQ分类
        • ActiveMQ
        • Kafka
        • RocketMQ
        • RabbitMQ (学习)
        • 安装
        • Web管理
        • 安装
        • 用户管理
      • 核心
        • 简单模式 Hello World
        • 工具类优化
        • 工作模式 Work queues
        • 轮询分发
        • 消息应答
        • 自动应答
        • 手动应答
        • 持久化
        • 队列持久化
        • 消息持久化
        • 信道堆积
        • 发布确认模式 Publisher Confirms
        • 发布/订阅 模式 Publish/Subscribe
        • 交换机 Exchange
        • 临时队列
        • 绑定 binding
        • direct
        • topic
        • fnout
        • 代码实战
        • direct点对点测试
        • topic发布订阅
        • 广播 fnout
      • 进阶
        • 死信队列
        • 测试
        • 整合SpringBoot
        • 快速入门
        • 操作对象
        • 消费者
        • 延迟队列
        • 应用方式
        • 代码示例
        • 代码测试
        • 插件拓展
        • 应用方式
        • 代码示例
        • 代码测试
        • 发布确认高级
        • 代码测试
        • 回退消息
        • 代码示例
        • 代码测试
        • 备用交换机
        • 代码示例
        • 代码测试
        • 队列优先级
        • 测试
        • 惰性队列
      • 集群
        • Docker
    • Netty网络通信
    • Canal
  • 数据库

  • 数据结构

  • Spring

  • SpringMVC

  • SpringBoot

  • SpringClound

  • Ruoyi-Vue-Plus

  • 后端
  • 框架技术
柏竹
2023-03-10
目录

RabbitMQ

# 介绍

RabbitMQ 是个中间件 , 负责 接收/存储/转发 消息数据 . 类似 平时快递派送的过程

是什么?

MQ 本质是个队列 , 先进先出 消息推送 , 是种跨进程通信机制的上下游传递消息 . 主要解决不同对象通信的序列

为什么用?

  • 流量消锋 : 如果系统最大多处理1W条请求 , 且还是高峰期的时候 , 很有可能会突破1W导致宕机 . MQ可以队列缓冲 , 防止宕机 (高峰期体验差 , 但能保障了不会宕机)
  • 应用解耦 : 系统有多个子系统 , 在业务涉及多个子系统完成时 , 当中的一个子系统宕机了 , 导致该业务异常无法运作 . MQ可以将要处理的业务缓存到消息队列中 , 由消息队列进行访问子系统执行业务 , 防止不一致运作问题 , 提高可用性
  • 异步处理 : 假如 A调用B , 但B需要执行很长一段时间 , 但A想知道B执行的进度 , 以往 会通过 A调用API查B进度 , 显然不优雅 . MQ可以使用消息总线 , A调用B后 , MQ会监控B进度(A实时得到B进度) , B处理完后 会发消息给MQ , 由MQ转发至A .

# MQ特性

# 可靠性

消息丢失一般有3情况 :

  • 生产者丢失 通过 [发布确认](#发布确认模式 Publisher Confirms) , 确保 生产者 发送消息到MQ (SpringBoot应用
  • MQ丢失 通过 持久化/死信队列/备用交换机/回退消息 , 确保消息在MQ中不会丢失
  • 消费者丢失 通过 消息应答 , 确保 消费者 消费成功ack响应

# 顺序性

  • 1个消费者消费1个队列是没有顺序问题的
  • 多个消费者消费同一个队列时就出现消费顺序的问题 . 可以考虑将一个队列分为多个队列 , 将需要保证顺序的消息发到一个队列里 , 一个队列对应一个消费者
  • 当消息在消费者端用多线程处理时 , 也会出现顺序问题 . 可以考虑在内存中维护多个队列 , 将MQ发来的需要保证顺序的消息放在同一个内存队列里 , 然后一个线程处理一个队列里的消息

# 幂等性

保证一条消息不会被重复消费 , 也不会对数据库产生影响

场景 :

  • 手机验证码 , 只能使用一次 , 再次发送验证码 , 则会刷新原旧的验证码
  • 订单支付 , 每个订单只能支付一次

解决方案 :

  • 消息使用 全局ID (可 通过时间戳/UUID等方式) , 确保唯一性 , 当写入数据时先判断是否存在 , 存在就没必要插入了 , 保证了不会重复插入现象
  • 采用 Redis 自带的天然幂等性 setnx

参考文章 : https://blog.csdn.net/zw791029369/article/details/109561457 (opens new window)

# MQ分类

# ActiveMQ

优点 : 单机 万级吞吐量 , 时效性s级 , 可用性高 , 基于主从架构实现高可用性 , 消息可靠性较低的概率丢失数据 缺点 : 官方社区对 ActiveMQ5.x 维护越来越少 , 高吞吐量场景较少使用

# Kafka

Kafka是大数据消息的中间件 , 满受大厂的采纳

优点 : 单机 百万级吞吐量 , 时效性ms级 , 运作稳定 ; 分布式 , 少数宕机 , 也不会造成影响 . 消息有序 , 有UI管理页面 , 日志实时更新

缺点 : 单机超过64个队列 , 消息队列多 , 响应长 (轮询) , 实时性取决轮询间隔 , 业务失败不能重试 , 社区更新慢

# RocketMQ

自 阿里巴巴 开源产品 , Java实现 , 参考了 Kafka设计 的改进版

优点 : 单机 十万级吞吐量 , 可用性高 , 分布式架构 , 消息0丢失 , 支持 10亿级别的消息堆积 , 数据堆积不会影响性能

缺点 : 语言拓展少 , 现阶段Java/C++实现 , 社区活跃一般

# RabbitMQ (学习)

是当前主流的消息中间件之一

优点 : 高并发 , 性能高 , 单机万级吞吐量 , 跨平台 , 多语言支持 , 文档齐全 , 社区活跃高 , 更新频繁

缺点 : 商业收费 , 学习成本高

# 安装

官方 : https://www.rabbitmq.com/download.html (opens new window)

下载

  • MQ : RabbitMQ 下载地址 (opens new window) 选择以 noarch.rpm 结尾的安装包
  • Erlang : Erlang 下载地址 (opens new window) , Erlang 和 RabbitMQ 版本对照 (opens new window) (MQ采用Erang语言开发 , 因此需要安装环境)

注意Linux版本支持

**安装步骤 : **

  1. Linux上传文件 , 创建目录文件放置里面 ==mkdir /usr/local/rabbitmq==

  2. 安装 Erlang , RabbitMQ , socat(MQ依赖插件)

    # Erlang
    rpm -ivh erlang-21.3-1.el7.x86_64.rpm
    	# 检查版本 quit退出
    	erl -v
    # socat 依赖插件
    yum install socat -y
    # RabbitMQ
    rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
    	# 启动服务
    	systemctl start rabbitmq-server
    	# 查看服务状态 (active绿色表示成功)
    	systemctl status rabbitmq-server
    

# Web管理

方便 查阅/操作 MQ

# 安装

  1. 执行指令安装 ==rabbitmq-plugins enable rabbitmq_management==
  2. 重启MQ服务
  3. Web访问 http://ip:15672 (IP为 Linux地址)
  4. 账号密码为 guest (账号密码相同)
  5. 在终端中添加账号 , 并且给予权限

注意 :

  • 安装前提关闭MQ服务
  • Linux防火墙开放 15672端口(Web管理) , 5672端口(API连接)

# 用户管理

创建用户 ==rabbitmqctl add_user <用户名> <密码>==

查看用户 ==rabbitmqctl list_users==

修改密码 ==rabbitmqctl change_password <用户名> <新密码>==

删除用户

==rabbitmqctl delete_user <用户名>==

设置用户 ==rabbitmqctl set_user_tags <用户名> <角色>==

角色 说明
administrator 可以登录控制台、查看所有信息、并对rabbitmq进行管理
monToring 监控者;登录控制台,查看所有信息
policymaker 策略制定者;登录控制台指定策略
managment 普通管理员;登录控制

权限分配

# 为用户添加资源权限,添加配置、写、读权限
# rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" bozhu ".*" ".*" ".*"

# 核心

官方API : https://rabbitmq.github.io/rabbitmq-java-client/api/current/ (opens new window)

以下几大模式通过 Java API实现 , 模式的发送过程 , 在项目中需要引入以下依赖

点击查看 Maven配置
<dependencies>
    <!--rabbitmq 依赖客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!--操作文件流的一个依赖-->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>
<!--指定 jdk 编译版本-->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

# 简单模式 Hello World

Java API实现 , 模拟发送接收过程

基本方法

返回 方法 说明
- ==queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)== 发送消息
String ==basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)== 接收消息 , 返回消息序列号

实现步骤 :

  1. 消息生产者

    点击查看代码
    public class Producer {
        // 队列名称
        public static final String QUERY_NAME = "hello";
    
        // 发消息
        public static void main(String[] args) throws Exception {
            // 连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 基础信息
            factory.setHost("192.168.186.128");
            factory.setUsername("bozhu");
            factory.setPassword("123123");
            Connection connection = factory.newConnection();
            // 频道
            Channel channel = connection.createChannel();
            /**
             * 生成队列
             * 参数 :
             *  1. 队列名称
             *  2. 队列消息是否持久化(是否磁盘存储)
             *  3. 队列是否进行消息共享(多个消费者共享)
             *  4. 是否自动删除(最后消费者开端连接后)
             *  5. 其他参数
             */
            channel.queueDeclare(QUERY_NAME, false, false, false, null);
    
            // 发消息
            String msg = "hello world";
    
            /**
             * 发送消息
             *  1. 指定交换机
             *  2. 路由key值(本次队列名称)
             *  3. 其他参数信息
             *  4. 发送消息的消息体
             */
            channel.basicPublish("", QUERY_NAME, null, msg.getBytes());
            System.out.println("send Success");
        }
    }
    
  2. 消息消费者

    点击查看代码
    public class Consumer {
    
        public static final String QUERY_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            // 连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 基础信息
            factory.setHost("192.168.186.128");
            factory.setUsername("bozhu");
            factory.setPassword("123123");
            Connection connection = factory.newConnection();
            // 频道
            Channel channel = connection.createChannel();
    
            /**
             * 接收消息
             *  1. 指定队列
             *  2. 成功后是否自动应答
             *  3. 未成功回调信息
             *  4. 取消回调信息
             */
            channel.basicConsume(QUERY_NAME, true,
                    (consumerTag, message) -> {
                        System.out.println(new String(message.getBody()));
                    },
                    consumerTag -> {
                        System.out.println("消息被中断");
                    }
            );
        }
    }
    
  3. 测试 , 运行 生产者 => Web查看业务队列 => 运行 消费者 => Web查看业务队列

以上步骤中看出 生产者在运行后 , Web管理中可以看到队列新增了条 消息 (需要等待消费者消费) . 当消费者运行后会消耗掉该 消息

消费者类 不能用 junit测试 接口写 , 否则没有监听的效果

# 工具类优化

实现复用 , 减少代码重写

public class RabbitMqUtils {

    public static Channel getChanel() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.186.128");
        factory.setUsername("bozhu");
        factory.setPassword("123123");
        Channel channel;
        try {
            Connection connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        }
        return channel;
    }
    
}

# 工作模式 Work queues

工作模式 主要思想是为了避免消息密集型的形式堆积 , 工作模式可以在多线程消费者中进行分发任务

# 轮询分发

轮询消费队列中的数据 , 消费者们会轮询进行消费消息 (每个消息只能被消费一次)

大致实现 :

  1. 批发生产者
  2. 多线程消费者
  3. 运行测试 , 查看消费者的消费情况

利用上面的 工具类 获取频道 (复用代码)

生产者

/**
 * 生产者
 */
public class Producer {

    // 队列名称
    public static final String QUERY_NAME = "hello";

    // 发消息
    public static void main(String[] args) throws Exception {
        Channel chanel = RabbitMqUtils.getChanel();
        String msg = "hello world";
        chanel.queueDeclare(QUERY_NAME, false, false, false, null);
        for (int i = 0; i < 10; i++) {
            // 发消息
            String str = msg + i;
            chanel.basicPublish("", QUERY_NAME, null, str.getBytes());
            System.out.println("发送成功 => " + str);
        }
    }

}

消费者

服务形式多线程运行 , 可以区分出 消费者分别为 C1/C2/C3 , 在当中配置字符 , 来区分消费者

/**
 * 消费者
 */
public class Consumer {

    public static final String QUERY_NAME = "hello";

    public static void main(String[] args) throws IOException {

        System.out.println(args[0]+" 已运行!");

        Channel chanel = RabbitMqUtils.getChanel();

        chanel.basicConsume(QUERY_NAME, false, (consumerTag, message) -> {
            System.out.println("成功 =>"+consumerTag+" : "+new String(message.getBody()));
        }, consumerTag -> {
            System.out.println("失败");
        });
    }
    
}

运行测试

先运行所有 生产者(此时消费者会处于等待消费的状态) , 后运行 消费者 , 运行后可以看出消费者消费方式是轮询形式的

多服务运行

main方法运行会传递 args 参数 , 我们可以在以下进行传参运行

  1. 配置好 , 填充参数 (如果多参数需要空格分开)
  2. 打开服务(Alt+8) , 批量运行 image-20230227222618814

# 消息应答

为了保证发送过程不丢失信息 , MQ引入了消息应答机制 . 例如 : 注册账号 , 填写表单信息 , 确认提交的过程

应答机制 : 在消费者消耗处理后 , 才会告诉 MQ 进行删除消息

MQ有两种应答机制 :

  • 自动应答(默认)
  • 手动应答

可以在 ==Channel.basicConsume()== 方法的 autoAck参数 进行控制 手动/自动

一般情况建议选择手动应答 , 防止数据丢失问题

# 自动应答

自动应答 是为了解决 高吞吐/安全传输 方面做出了权衡 .

MQ不在乎消费者是否处理完成 , 都会告诉MQ删除队列 .

情况 :

  • 消费者 处理失败也没有异常 , 会 自动补偿 , MQ会重新向消费者投递消息
  • 消费者 异常了 , MQ会认为消费成功 , 会对消息进行删除 , 导致数据丢失

**重新入队机制 : **

消费者处理消息过程 , 突然宕机 , 没有ack确认 , MQ得知消息未完全处理 , 会将其消息重新排队列 , 由其他消费者处理

# 手动应答

手动应答 , 消费者处理后 , 两种可能 :

  • 消费者手动 ack(确认应答) , 告诉MQ消息完成进行删除
  • 消费者自动 nack(拒绝应答) , 告诉MQ处理失败 , 消息不会删除

应答方法

返回 方法 说明
void ==basicAck(long deliveryTag, boolean multiple)== 确认收到
void ==basicReject(long deliveryTag, boolean requeue)== 拒绝消息
void ==basicNack(long deliveryTag, boolean multiple, boolean requeue)== 拒绝收到

手动应答 和 重入队列机制 代码实例

生产者

/**
 * 消息在手动应答是不丢失、放回队列中重新消费
 * @author Sans
 */
public class Task {

    public static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel chanel = RabbitMqUtils.getChanel();

        // 声明对队列
        chanel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()) {
            String msg = sc.next();
            chanel.basicPublish("", QUEUE_NAME,null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息 => "+msg);
        }
        
    }
}

消费者

说明 : 配置两个消费者 , 一个消费者等待1s ; 另一个消费者等待10s . 参数分别是 :

  • ==c1 1000==
  • ==c2 10000==
/**
 * 消费者
 * @author Sans
 */
public class Consumer {

    public static final String QUEUE_NAME = "ack_queue";

    /**
     * @param args [线程名 , 休眠执行时长]
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        Channel chanel = RabbitMqUtils.getChanel();

        System.out.println(args[0]+" : 运行");

        chanel.basicConsume(QUEUE_NAME, false, (consumerTag, message)->{
            // 等待
            try {
                Thread.sleep(Long.parseLong(args[1]));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println("接受到的消息:"+new String(message.getBody(), StandardCharsets.UTF_8));
            /*
             * 手动应答
             * 1.消息的标记Tag
             * 2.是否批量应答 false表示不批量应答信道中的消息
             */
            chanel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        },(consumerTag)->{
        });
    }

}

宕机模拟测试

先运行 生产者 , 后运行2个消费者 , 最后生产者输入消息 , 以时间线来分析运作过程

时间 生产者 消费者1(1s) 消费者2(10s)
5s 发送 11 ; 发送22 收到11->ack
10s - 收到22->ack
15s 发送 33 : 发送44 收到33->ack 收到44->...(关闭)
16s 收到44->ack

# 持久化

持久化是将队列数据存储到磁盘中 , 并非在内存中 . 哪怕宕机停掉 , 不至于数据丢失的情况

# 队列持久化

在队列声明queueDeclare方法 中的第二个参数设为 true , 启动 队列持久化

==queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)==

注意 :

  • 原先队列中有非持久化 , 且队列名相同 , 那么会抛出错误 , 需要删除原先队列 , 并重新声明
  • 队列持久化 , 并不能进行对队列中的消息进行持久化
# 消息持久化

在消息发送的basicPublish方法 中的第二个参数设为 MessageProperties.PERSISTENT_TEXT_PLAIN , 启动 队列持久化

==basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)==

代码复用上面的即可 , 无需展示!

# 信道堆积

信道堆积 是指MQ发送到消费者的信道消息堆积数(缓冲区) . 缓冲区中默认情况是可以无限堆积的 , 因此需要自行控制堆积数 , 以防不必要的消息等待处理

信道堆积主要通过 ==basicQos(prefetchCount)==方法 控制信道堆积数(默认0->无限)

情况测试

当生产者大量生产消息 , 且又有多个消费者(效率不同)时 :

  • 未指定堆积数 : 处理慢的消费者会堆积多个消息等待处理 , 处理快的可能会处于闲置状态
  • 指定堆积数 : 处理慢的消费者 , 当消息堆积到达指定数值 , 轮询分发消息会跳过该消费者

轮询分发 : 会将所有消息平均分发被每个消费者中 , 等待消费

默认情况下每条信道能够堆积无数条

注意 :

  • 应答需要设为 手动应答 , 否则qos , 不会生效

# 发布确认模式 Publisher Confirms

发布确认 是保证了消息完好的推送到MQ队列中 , 确保数据不会丢失 , 以便消费者消费使用

大致步骤 :

  1. 获取信道
  2. 信道启动 发布确认模式 ==confirmSelect()==方法
  3. 向MQ推送消息 ==basicPublish()==方法
  4. 向MQ发送确认 ==waitForConfirms()==方法

异步发布 走以下步骤

  1. MQ返回状态 至 监听器 进行回调 ack/nack (成功/失败)
  2. 失败重新推送 (回至步骤3操作)

主要方法

返回 方法 说明
void ==confirmSelect()== 启动 发布确认模式
boolean ==waitForConfirms()== 向MQ发送 发布确认
ConfirmListener ==addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)== 监听器 , 监听发布状态 成功/失败 , 以lambda形式 回调它们

确认发布 可分为以下类型 :

  • 单体 同步等待确认 , 简单 , 吞吐量有限
  • 批量 批量同步等待确认 , 简单 , 一旦出问题难以判断
  • 异步 高性能 , 采用 监听器监听发布状态 和 ConcurrentSkipListMap哈希表 多线程管理 , 能够准确异常

耗时 : 单体 > 批量 > 异步

三种模式代码示例 :

秒表工具类

/**
 * 秒表计时工具
 */
public class StopWatchUtils {

    public static long start;
    public static long totalTime;

    public static void start(){
        start = System.currentTimeMillis();
    }

    public static void stop(){
        totalTime = System.currentTimeMillis() - start;
    }

    public static long getTotalTime() {
        return totalTime;
    }

}

测试耗时

点击查看代码
/**
 * 1000条消息发布测试
 */
public class Producer {

    public static long MSG_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        // 单体发布测试 (耗时 => 1391ms
        //publishMesIndividually();
        // 批量发布测试 (耗时 => 85ms
        //publishMesBatch();
        // 异步发布测试 (耗时 => 28ms
        publishMesAsync();
    }

    /**
     * 异步发布
     * 只管发 , 成功/失败 由监听器管
     */
    private static void publishMesAsync() throws IOException {
        Channel chanel = RabbitMqUtils.getChanel();
        String queueName = UUID.randomUUID().toString();
        // 队列声明 (持久化
        chanel.queueDeclare(queueName, true, false, false, null);
        // 启动确认发布
        chanel.confirmSelect();

        /*
          线程安全有序的哈希表 , 使用高并发情况
            - 轻松记录 序号与消息 的关联
            - 通过序号轻松批量删除条目
            - 支持多线程
         */
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        /*
          监听器 (监听消息是否成功)
          参数1: 消息确认回调
          参数2: 消息失败回调
            - deliveryTag 消息标记
            - multiple 是否批量
         */
        chanel.addConfirmListener(
                (deliveryTag, multiple) -> {
                    // 是否批量
                    if (multiple) {
                        // 获取已经确认的视图
                        ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                        // 清除视图内容
                        confirmed.clear();
                    } else {
                        // 直接删除序号
                        outstandingConfirms.remove(deliveryTag);
                    }
                    System.out.println("确认消息 => " + deliveryTag);
                },
                (deliveryTag, multiple) -> {
                    String msg = outstandingConfirms.remove(deliveryTag);
                    System.out.println("失败消息[" + msg + "] => " + deliveryTag);
                });

        StopWatchUtils.start();
        // 发消息
        for (int i = 1; i <= MSG_COUNT; i++) {
            String msg = "msg => " + i;
            chanel.basicPublish("", queueName, null, msg.getBytes(StandardCharsets.UTF_8));
            // 存下所有 生产者发送的消息 K(消息序列号):V(消息内容)
            outstandingConfirms.put(chanel.getNextPublishSeqNo(), msg);
        }
        StopWatchUtils.stop();
        System.out.println("耗时 => " + StopWatchUtils.getTotalTime() + "ms");
    }

    /**
     * 批量发布
     * 发布多个确认一次
     */
    private static void publishMesBatch() throws Exception {
        Channel chanel = RabbitMqUtils.getChanel();
        String queueName = UUID.randomUUID().toString();
        // 队列声明 (持久化
        chanel.queueDeclare(queueName, true, false, false, null);
        // 启动确认发布
        chanel.confirmSelect();

        // 批单位
        int batchSize = 100;

        StopWatchUtils.start();
        for (int i = 1; i <= MSG_COUNT; i++) {
            String msg = "msg => " + i;
            chanel.basicPublish("", queueName, null, msg.getBytes(StandardCharsets.UTF_8));
            if (i % batchSize == 0) {
                if (chanel.waitForConfirms()) System.out.println("第" + i / batchSize + "批发送成功" + msg);
            }
        }
        StopWatchUtils.stop();
        System.out.println("耗时 => " + StopWatchUtils.getTotalTime() + "ms");

    }

    /**
     * 单体发布
     * 发布一次确认一次
     */
    public static void publishMesIndividually() throws Exception {
        Channel chanel = RabbitMqUtils.getChanel();
        String queueName = UUID.randomUUID().toString();
        // 队列声明 (持久化
        chanel.queueDeclare(queueName, true, false, false, null);
        // 启动确认发布
        chanel.confirmSelect();

        StopWatchUtils.start();
        for (int i = 0; i < MSG_COUNT; i++) {
            String msg = "msg => " + i;
            chanel.basicPublish("", queueName, null, msg.getBytes(StandardCharsets.UTF_8));
            if (chanel.waitForConfirms()) {
                System.out.println("发送成功 " + msg);
            }
        }
        StopWatchUtils.stop();
        System.out.println("耗时 => " + StopWatchUtils.getTotalTime() + "ms");
    }

}

# 发布/订阅 模式 Publish/Subscribe

发布订阅模式是生产者推送的消息 , 其他消费者都均可收到该消息

大致流程

  1. 声明交换机 , 并设置 fanout 类型
  2. 生产者发送消息 到交换机
  3. 消费者队列绑定交换机 , 并配置 RoutingKey路由规则 (类似订阅)
  4. 消费者接收消息

# 交换机 Exchange

生产者生产的消息不会直接发送到队列中的 , 而是发送到交换机 , 由交换机推入队列

交换机类型 : (点击跳转代码示例)

  • direct(点对点) (默认) 交换机会匹配 生产者发送的 RoutingKey 与 消费者队列绑定交换机的 RoutingKey . 相同才能实现点对点发送 . 如果没有匹配到一个 , 很有可能会丢失数据
  • topic(发布订阅) 交换机会 通配符匹配 生产者发送的 RoutingKey 与 消费者队列绑定交换机的 RoutingKey , 符合条件的队列都会收到分发的消息
  • fnout(广播) 只要消费者绑定有该类型交换机 , 不管RoutingKey是否匹配 , 都会接收广播消息
  • ...

交换机 声明方法

==Channel.exchangeDeclare(String exchange, String type)==

  • exchange : 交换机名称
  • type : 交换类型

# 临时队列

临时队列 , 字面意思暂时使用的队列

队列特性 :

  • 随机名称
  • 断开消费者连接队列自动删除

Web管理页中 , 可以看到队列状态是 AD Excl (自动删除)

临时队列创建

==queueDeclare()==

一般去会获取队列标识进行食用 ==chanel.queueDeclare().getQueue();==

# 绑定 binding

Exchange交换机 创建后需要绑定队列才会进行推送消息至消费者 . 可以绑定多个队列 , 消息推送是根据 Routing Kye(路由规则) 来确定指定队列

点击查看代码

rabbitmq-temp01

交换机只负责转发消息 , 并没有存储消息的能力 , 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列 , 那么消息会丢失

绑定方法

==queueBind(String queue, String exchange, String routingKey)==

  • queue : 队列名
  • exchange : 交换机
  • routing Key : 路由Key

重载方法 , 可在最后面携带参数 , 详细自行API

Web管理页 绑定

Exchanges -> 指定Exchange交换机 -> Bindings

rabbitmq-temp022

# direct

交换机会匹配 生产者发送的 RoutingKey 与 消费者队列绑定交换机的 RoutingKey . 相同才能实现点对点发送 . 如果没有匹配到一个 , 很有可能会丢失数据

代码示例 : 点击跳转

# topic

交换机会根据 通配符匹配 生产者发送的 RoutingKey 与 消费者队列绑定交换机的 RoutingKey , 符合条件的队列都会收到分发的消息

通配符说明 :

  • * 代替一个单词
  • # 代替0个/多个单词

特殊情况 :

  • 如果只有一个 # 那么将会接收通道的所有数据
  • 如果没有 #/* 出现 , 默认采用 direct

匹配案例 :

RoutingKey 通配值 说明
com.sans.color *.sans.* 匹配3个单词中的中间单词 sans
com.sans.color.red #.red 匹配最后为 red
com.sans.color.blue con.# 匹配开头为 com

代码示例 : 点击跳转

# fnout

只要消费者绑定有该类型交换机 , 不管RoutingKey是否匹配 , 都会接收广播消息

代码示例 : [点击跳转](#广播 fnout)

# 代码实战

通用代码 , 参数自控

食用说明 :

  1. 采用Main传参控制 , 跳转了解
  2. 采用信道获取工具类 , 跳转了解

生产者

/**
 * 广播发送
 * @author Sans
 */
public class Producer {

    /**
     * @param args [交换机名, 交换机类型, 路由key]
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        String exchangeName = args[0];
        String exchangeTypeParam = args[1];
        // 如果未赋予值 , 默认为 ""
        String routingKey = args.length == 2 ? "" : args[2];

        System.out.println("[交换机名, 交换机类型, 路由key]");
        System.out.println(Arrays.toString(args));

        // 枚举验证
        BuiltinExchangeType exchangeType = null;
        for (BuiltinExchangeType value : BuiltinExchangeType.values()) {
            if (value.getType().equals(exchangeTypeParam)) exchangeType = value;
        }
        if (exchangeType == null) return;

        Channel channel = RabbitMqUtils.getChanel();
        /*
          声明一个exchange
          1.exchange的名称
          2.exchange的类型
         */
        channel.exchangeDeclare(exchangeName, exchangeType);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            // 发布消息
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息:" + message);
        }
    }

}

消费者

/**
 * 消费者
 * @author Sans
 */
public class Consumer {

    /**
     * @param args [交换机名, 路由key]
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        String exchangeName = args[0];
        String routingKey = args[1];
        System.out.println("[交换机名, 路由key]");
        System.out.println(Arrays.toString(args));


        Channel chanel = RabbitMqUtils.getChanel();
        // 临时队列
        String queueName = chanel.queueDeclare().getQueue();
        // 绑定交换机
        chanel.queueBind(queueName, exchangeName, routingKey);

        // 接收消息
        chanel.basicConsume(queueName, true,(consumerTag,message)->{
            System.out.println("收到消息 => "+new String(message.getBody(), StandardCharsets.UTF_8));
        },consumerTag->{});

    }
}

以上Main中接收参数分别说明

身份 接收参数
生产者 [交换机名, 交换机类型, 路由key]
消费者 [交换机名, 路由key]
# direct点对点测试
运行顺序 应用程序服务 参数传递(直接复制即可)
1 生产者 P1 ==color direct blue==
2 消费者 C1 ==color blue==
3 消费者 C2 ==color black==

结果 : C1消费了 , C2无消费 . 只有 RoutingKey 匹配的消费者消费消息

# topic发布订阅
运行顺序 应用程序服务 传递参数(直接复制即可)
1 生产者 P1 ==color topic com.sans.red==
2 消费者 C1 ==color #.red==
3 消费者 C2 ==color com.#==
4 消费者 C3 ==color com.*==
5 消费者 C4 ==color *.red==
6 消费者 C5 ==color #==
7 消费者 C6 ==color *==

结果 : C1 , C2 , C5 消费者消费了 , 其余未消费 . 只有 RoutingKey通配符匹配 的消费者消费消息

# 广播 fnout
运行顺序 应用程序服务 传递参数(直接复制即可)
1 生产者 P1 ==color fnout red==
2 消费者 C1 ==color black==
3 消费者 C2 ==color blue==
4 消费者 C3 ==color yellow==

结果 : 所有消费者都消费了 . 消费者收到消息不会受到 RoutingKey的影响 , 只需绑定就可以收到通知

# 进阶

# 死信队列

死信 是无法被消费的消息

情况 : 生产者 发送消息 MQ , 消费者 从 队列 取出 , 由于某些原因导致 队列 中的某些消息无法被消费 , 这样的消息没有得到处理 , 称之为死信

场景 : 用户下订单时 , 点击支付 , 但又未在指定时间支付 , 死信队列机制会误认为异常消息 , 消息将会投入死信队列中

触发机制 :

  • 消息 TTL 过期
  • 队列到达最大长度
  • 消息被拒 , 使用 channel.basicNack/channel.basicReject 应答 , 并且参数requeue为false(不回流队列)

交换机 和 队列 中的配置 , 一旦有修改 需要删除 重新运行

Web管理页中 , 可以看到队列状态是 DLX(死信交换机) , DLK (死信routingKey)

**代码示例 : **

初始化构架声明

点击查看代码
/**
 * 初始化架构
 * 初始化 交换机 , 队列 结构信息
 * @author Sans
 */
public class InitialArchitecture {

    // 普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";

    // 普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    // 死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";
    
    public static void main(String[] args) throws Exception {
        Channel chanel = RabbitMqUtils.getChanel();

        String normalRoutingKey = "sans";
        String deadRoutingKey = "dead";
        
        // 重新初始化 , 删除以往相同名称的 交换机和队列
        chanel.exchangeDelete(NORMAL_EXCHANGE);
        chanel.exchangeDelete(DEAD_EXCHANGE);
        chanel.queueDelete(NORMAL_QUEUE);
        chanel.queueDelete(DEAD_QUEUE);

        // 声明 普通,死信 交换机
        chanel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        chanel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        Map<String, Object> arguments = new HashMap<>();
        // 正常的队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 设置死信routingKey
        arguments.put("x-dead-letter-routing-key", deadRoutingKey);
        /* 超出长度 测试
           设置队列最大长度
        */ 
        //arguments.put("x-max-length", 6);

        // 声明 普通队列 , 死信队列
        chanel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        chanel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // 普通队列 绑定 普通交换机
        chanel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, normalRoutingKey);
        // 死信队列 绑定 死信交换机
        chanel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, deadRoutingKey);
    }
}

生产者

点击查看代码
/**
 * 生产者
 * @author Sans
 */
public class Producer {

    // 普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel chanel = RabbitMqUtils.getChanel();

        String routingKey = "sans";

        for (int i = 0; i < 10; i++) {
            String msg = "msg " + i;
            /** TTL 超时测试
                参数3 : 构造者构建参数 ttl时间参数
            */
            chanel.basicPublish(NORMAL_EXCHANGE,
                    routingKey,
                    //new AMQP.BasicProperties().builder().expiration("10000").build(),
                    null,
                    msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送成功 => " + msg);
        }

    }

}

消费者1

点击查看代码
/**
 * 消费者1
 * @author Sans
 */
public class Consumer1 {

    // 普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";

    public static void main(String[] args) throws Exception {
        Channel chanel = RabbitMqUtils.getChanel();

        // 处理消息
        chanel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收消息 => " + msg);
            // 消息拒绝 测试
            //long deliveryTag = message.getEnvelope().getDeliveryTag();
            //if ("msg 4".equals(msg)) {
            //    /* 拒绝 消息/收到
            //        拒绝收到 (参数3: false防止回流队列): chanel.basicNack(deliveryTag, false, false);
            //        拒绝消息 (参数2: false防止回流队列): chanel.basicReject(deliveryTag, false);
            //     */
            //    //chanel.basicNack(deliveryTag, false, false);
            //    chanel.basicReject(deliveryTag, false);
            //    System.out.println("拒绝 => " + msg);
            //}else{
            //    chanel.basicAck(deliveryTag, false);
            //    System.out.println("接收消息 => " + msg);
            //}
        }, consumerTag -> {
        });

    }
}

消费者2 (死信消费)

点击查看代码
/**
 * 消费者2
 * @author Sans
 */
public class Consumer2 {

    // 死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel chanel = RabbitMqUtils.getChanel();

        // 处理消息
        chanel.basicConsume(DEAD_QUEUE, true, (consumerTag, message) -> {
            System.out.println("接收消息 => " + new String(message.getBody(), StandardCharsets.UTF_8));
        }, consumerTag -> {
        });

    }
}

# 测试

触发死信机制 : 有3种触发条件 , 分别围绕它们的条件进行测试

过期触发TTL

  1. 配置 生产者发送方法 ==basicPublish()== 中的第三参数 添加消息属性 (采用构造者模式构造类) ==new AMQP.BasicProperties().builder().expiration("10000").build()== (TTL设置10s过期)
  2. 运行 初始化架构
  3. 运行 生产者
  4. 观察 Web管理页 , 普通队列中的所有消息 过期推送至 死信队列中
  5. 运行 消费者2 , 清除死信队列中的消息

队列到达最大长度

  1. 配置 初始化架构 , 在 声明普通队列 ==queueDeclare()==方法 中的第五个参数 编辑队列属性 Map集合新增 : ==arguments.put("x-max-length", 6);== (队列设置最大长度为6条消息)
  2. 运行 初始化架构
  3. 运行 生产者
  4. 运行 消费者1
  5. 观察 Web管理页 , 普通队列中的消息 , 有部分消息会被排挤到 死信队列中
  6. 运行 消费者2 , 清除死信队列中的消息

注意 :

  • 初始化架构类中的配置 一旦修改了 , 则需要删除掉原旧的 交换机/队列 , 在运行 (配置更改了会冲突)
  • 为了尽可能的展现消息进入 死信队列中 , 要确保上一次所修改的配置是否还原 , 以防上次的配置影响数据混乱

消息被拒绝

  1. 配置 消费者1 , 手动应答请求 , 指定部分拒绝接收

    chanel.basicConsume(NORMAL_QUEUE, false, (consumerTag, message) -> {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        long deliveryTag = message.getEnvelope().getDeliveryTag();
        if ("msg 4".equals(msg)) {
            /* 拒绝 消息/收到
                拒绝收到 (参数3: false防止回流队列): chanel.basicNack(deliveryTag, false, false);
                拒绝消息 (参数2: false防止回流队列): chanel.basicReject(deliveryTag, false);
             */
            //chanel.basicNack(deliveryTag, false, false);
            chanel.basicReject(deliveryTag, false);
            System.out.println("拒绝 => " + msg);
        }else{
            chanel.basicAck(deliveryTag, false);
            System.out.println("接收消息 => " + msg);
        }
    }, consumerTag -> {
    });
    
  2. 运行 初始化架构

  3. 运行 生产者1

  4. 运行 消费者1

  5. 观察 Web管理页 , 普通队列中的消息 , 有一条 msg4消息 被拒绝 至死信队列中

  6. 运行 消费者2 , 清除死信队列中的消息

# 整合SpringBoot

Springboot文档 : https://docs.spring.io/spring-amqp/reference/html/amqp.html

# 快速入门

引入依赖

<!--RabbitMQ 依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application配置

server:
    port: 8088
spring:
    rabbitmq:
        host: 8.130.47.114
        port: 5672
        username: bozhu
        password: 123123

端口容易冲突 , 建议修改

RabbitmqConfig配置类

交换机和队列 , 以及绑定 , 等操作是通过实例化形式进行的 , 以下由配置类形式展示

简单结构

rabbitmq-temp03

点击查看代码
@Configuration
public class RabbitmqConfig {
   
   // 交换机
   String aExchange = "a_exchange";
   String deadExchange = "dead_exchange";
   // 队列
   String aQueue = "a_queue";
   String deadQueue = "dead_queue";
   
   String routingKeyA = "RKA"
   String routingKeyB = "RKB"
   
   /**
    * 交换机
    */
   @Bean
   public DirectExchange aExchange() {
       return new DirectExchange(aExchange);
   }
   @Bean
   public DirectExchange deadExchange() {
       return new DirectExchange(deadExchange);
   }
   
   /**
    * 队列
    */
   @Bean
   public Queue aQueue() {
       // 配置 死信交换机, 死信RoutingKey, 过期TTL
       return QueueBuilder.durable(aExchange)
               .deadLetterExchange(deadExchange)
               .deadLetterRoutingKey(routingKeyB)
               .ttl(10*1000)
               .build();
   }
   @Bean
   public Queue deadQueue() {
       return QueueBuilder.durable(deadQueue).build();
   }
   
   /**
    * 绑定
    */
   @Bean
   public Binding bindingAqueueToAexchange(Queue aQueue , DirectExchange aExchange) {
       return BindingBuilder.bind(aQueue).to(aExchange).with(routingKeyA);
   }
   @Bean
   public Binding queueBbindX(Queue deadQueue , DirectExchange deadExchange) {
       return BindingBuilder.bind(deadQueue).to(deadExchange).with(routingKeyB);
   }
}

# 操作对象

类 说明
RabbitTemplate 简化的 发送/接收 消息类
AmqpAdmin 携带式对AMQP管理操作类

生产者发送消息 : ==RabbitTemplate.convertAndSend()==

# 消费者

@Component
public class QueueConsumer {
    // 队列处理消息
    // 注解指定队列名
    @RabbitListener(queues = "QA")
    public void receiveDead(Message message, Channel channel) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("处理消息 => " + msg);
    }
}

# 延迟队列

延迟队列 是用来存放到指定时间才被释放消息的队列

延迟队列机制 和 死信 TTL过期 相似 , 但延迟队列对时间的控制较灵活 , 应用应用场景广泛

场景 :

  1. 下单十分钟未支付自动取消订单
  2. 新建店铺 , 如果十天未上传商品 , 则自动发送消息提醒
  3. 新用户注册后 , 三天未登录则进行短信提醒
  4. 用户发起退款 , 如果三天未处理则通知相关运营人员
  5. 会议预定 , 指定时间的前十分钟提醒参加会议

可以控制任意时间点发送通知

# 应用方式

延迟配置

在 队列 中配置延迟属性

/* 变量说明
	NORMAL_QUEUE_A : 队列名称
	DEAD_EXCHANGE_Y : 死信交换机名称
	ROUTING_DEAD_QUEUE_KEY : 死信RoutingKey
*/
@Bean
public Queue aQueue() {
    // 配置 死信交换机, 死信RoutingKey, 过期TTL
    return QueueBuilder.durable(NORMAL_QUEUE_A)
            .deadLetterExchange(DEAD_EXCHANGE_Y)
            .deadLetterRoutingKey(ROUTING_DEAD_QUEUE_KEY)
            .ttl(10*1000)
            .build();
}

在 生产者 消息发送方法 中配置延迟

指定消息TTL在队列中超时时长 : ==message.getMessageProperties().setExpiration(ttl);==

// 发送消息方法
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_X, ROUTING_KEY_QUEUE_C, msg,
        message -> {
            message.getMessageProperties().setExpiration(ttl);
            return message;
        }
);

# 代码示例

架构 :

rabbitmq-temp04

初始化架构 RabbitmqConfig

点击查看代码
@Configuration
public class RabbitmqConfig {

    // 普通/死信 交换机
    public static final String NORMAL_EXCHANGE_X = "EX";
    public static final String DEAD_EXCHANGE_Y = "EY";
    // 死信 队列
    public static final String DEAD_QUEUE = "QD";
    // 普通 队列
    public static final String NORMAL_QUEUE_A = "QA";
    public static final String NORMAL_QUEUE_B = "QB";
    public static final String NORMAL_QUEUE_C = "QC";

    // RoutingKey
    public static final String ROUTING_KEY_QUEUE_A = "RQA";
    public static final String ROUTING_KEY_QUEUE_B = "RQB";
    public static final String ROUTING_KEY_QUEUE_C = "RQC";
    // 死信 RoutingKey
    public static final String ROUTING_DEAD_QUEUE_KEY = "RQD";

    /**
     * 交换机
     */
    @Bean
    public DirectExchange xExchange() {
        return new DirectExchange(NORMAL_EXCHANGE_X);
    }

    @Bean
    public DirectExchange yExchange() {
        return new DirectExchange(DEAD_EXCHANGE_Y);
    }

    /**
     * 队列
     */
    @Bean
    public Queue aQueue() {
        // 配置 死信交换机, 死信RoutingKey, 过期TTL
        return QueueBuilder.durable(NORMAL_QUEUE_A)
                .deadLetterExchange(DEAD_EXCHANGE_Y)
                .deadLetterRoutingKey(ROUTING_DEAD_QUEUE_KEY)
                .ttl(10*1000)
                .build();
    }

    @Bean
    public Queue bQueue() {
        // 配置 死信交换机, 死信RoutingKey, 过期TTL
        return QueueBuilder.durable(NORMAL_QUEUE_B)
                .deadLetterExchange(DEAD_EXCHANGE_Y)
                .deadLetterRoutingKey(ROUTING_DEAD_QUEUE_KEY)
                .ttl(40*1000)
                .build();
    }

    @Bean
    public Queue cQueue() {
        // 配置 死信交换机, 死信RoutingKey
        return QueueBuilder.durable(NORMAL_QUEUE_C)
                .deadLetterExchange(DEAD_EXCHANGE_Y)
                .deadLetterRoutingKey(ROUTING_DEAD_QUEUE_KEY)
                .build();
    }


    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /**
     * 绑定
     * 比较示例 : 普通队列绑定 普通队列 --- 普通交换机
     * chanel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, normalRoutingKey);
     */
    @Bean
    public Binding queueAbindingX(Queue aQueue , DirectExchange xExchange) {
        return BindingBuilder.bind(aQueue).to(xExchange).with(ROUTING_KEY_QUEUE_A);
    }
    @Bean
    public Binding queueBbindingX(Queue bQueue , DirectExchange xExchange) {
        return BindingBuilder.bind(bQueue).to(xExchange).with(ROUTING_KEY_QUEUE_B);
    }
    @Bean
    public Binding queueCbindingX(Queue cQueue, DirectExchange xExchange) {
        return BindingBuilder.bind(cQueue).to(xExchange).with(ROUTING_KEY_QUEUE_C);
    }
    @Bean
    public Binding queueDbindingY(Queue deadQueue , DirectExchange yExchange) {
        return BindingBuilder.bind(deadQueue).to(yExchange).with(ROUTING_DEAD_QUEUE_KEY);
    }
}

生产者 ProducerSendMessageController

点击查看代码
@RestController
@RequestMapping("/producer")
public class ProducerSendMessageController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/{msg}")
    public void sendMessage(@PathVariable String msg) {
        System.out.println("收到请求 => " + msg);

        // 发送消息
        rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_X, ROUTING_KEY_QUEUE_B, "B队列(40 000ms) =>" + msg);
        rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_X, ROUTING_KEY_QUEUE_A, "A队列(10 000ms) =>" + msg);
    }
	
    /**
     * QC 发送消息(自定义时长)
     * @param msg 消息
     * @param ttl 过期时长 ms
     */
    @GetMapping("/qcSend/{msg}/{ttl}")
    public void sendMessage(@PathVariable String msg, @PathVariable String ttl) {
        System.out.printf("收到请求 => [%s, %s]%n", msg, ttl);
        rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_X, ROUTING_KEY_QUEUE_C, msg,
                message -> {
                    message.getMessageProperties().setExpiration(ttl);
                    return message;
                }
        );
    }

}

死信消费者 DeadQueueConsumer

@Component
public class DeadQueueConsumer {
	// 死信处理
    @RabbitListener(queues = DEAD_QUEUE)
    public void receiveDead(Message message, Channel channel) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("死信消费 => " + new Date() + " : " + msg);
    }
}

# 代码测试

测试 QA 和 QB 队列 TTL过期测试

  1. 访问 http://localhost:8088/producer/send/lisi
  2. 观察 控制台 , 死信队列打印的消息(等待10s/40s)

他们队列是分开分发一条消息 互不干扰 , 等待时间分别是 10s/40s

测试 QC 队列多消息

  1. 连续访问 http://localhost:8088/producer/qcSend/GOGO/10000

  2. 连续访问 http://localhost:8088/producer/qcSend/GOGOGO/15000

  3. 连续访问 http://localhost:8088/producer/qcSend/GO/5000

  4. 观察 控制台 , 消息是共同在队列中进行等待时间

    收到请求 => [GOGO, 10000]
    收到请求 => [GOGOGO, 15000]
    收到请求 => [GO, 5000]
    死信消费 => Mon Mar 06 11:25:59 CST 2023 : GOGO
    死信消费 => Mon Mar 06 11:26:05 CST 2023 : GOGOGO
    死信消费 => Mon Mar 06 11:26:05 CST 2023 : GO
    

在 多消息同一队列中 , 消息的TTL过期时间是同时加载的 , 并且是有序的 .

问题不难发现 , 同一时间发送两条消息 , 如果 第一个消息的TTL 大于 第二条消息的TTL , 即使 第二条消息的TTL已超时 , 也必须等待 第一条消息的TTL过期 , 最后也是按照先后顺序处理消息

# 插件拓展

该插件正是解决上面的问题 , 采用交换机的插件类型 , 能够实现 消息TTL等待是在交换机中等待执行 , 避免了在队列中排队的问题

插件官网 : https://www.rabbitmq.com/community-plugins.html

插件GitHub : https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

安装 :

  1. 进入MQ插件目录 , 并将下载好的插件放进去 .ez格式 (选择自己的版本号) ==cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins==

  2. 安装插件 (不需 填写版本和文件后缀) ==rabbitmq-plugins enable rabbitmq_delayed_message_exchange==

  3. 重启RabbitMQ ==systemctl restart rabbitmq-server==

  4. 观察 Web管理页 , 新增交换机 , 查看类型是否多出了 x-delayed-message类型

下载注意版本兼容问题

# 应用方式

交换机声明

采用 自定义交换机 进行实例化声明

@Bean
public CustomExchange delayedExchange() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-delayed-type", "direct");
    return new CustomExchange(
            DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}

消费者延迟发送

往消息添加 x-delay头 属性 , 延迟功能 ==message.getMessageProperties().setDelay(ttl);==

/* 变量说明
	DELAYED_EXCHANGE_NAME : 交换机名称
	DELAYED_ROUTING_KEY : RoutingKey
	msg : 发送消息
	ttl : 消息延迟时长
*/
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY ,msg , message -> {
    // 为消息属性添加延迟功能
    message.getMessageProperties().setDelay(ttl);
    return message;
});
# 代码示例

例图就不展示了 , 一个交换机: delayed.exchange , 一个队列: delayed.queue , 生产者 , 消费者

主要测试 队列TTL过期循序问题

初始化架构 DelayedQueueConfig

@Configuration
public class DelayedQueueConfig {

    // 交换机, 队列, routingKey
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    /** 自定义交换机
     *  由于是自定义类型
     */
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(
                DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public Binding delayedQueueBindingDelayedExchange(Queue delayedQueue, CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}

生产者

@RestController
@RequestMapping("/producer")
public class ProducerSendMessageController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/delayed/{msg}/{ttl}")
    public void sendDelayedMessage(@PathVariable String msg, @PathVariable Integer ttl) {
        System.out.printf("收到请求 => [%s, %s]%n", msg, ttl);
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY,msg , message -> {
            // 设置方式和以往不同 , 这个是设置延迟
            // message.getMessageProperties().setExpiration(ttl + "");
            message.getMessageProperties().setDelay(ttl);
            return message;
        });
    }
}

消费者

/**
 * 消费者
 * @author Sans
 */
@Component
public class QueueConsumer {
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void delayedConsumer(Message message, Channel channel) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("消费消息 => " + new Date() + " : " + msg);
    }
}
# 代码测试

队列多消息测试

  1. 连续访问 http://localhost:8088/producer/delayed/GOGO/10000

  2. 连续访问 http://localhost:8088/producer/delayed/GOGOGO/15000

  3. 连续访问 http://localhost:8088/producer/delayed/GO/5000

  4. 观察 控制台 , 消息的延迟时间 是否按照有小到大的顺序 进行处理

    收到请求 => [GOGO, 10000]
    收到请求 => [GOGOGO, 15000]
    收到请求 => [GO, 5000]
    消费消息 => Mon Mar 06 11:29:26 CST 2023 : GO
    消费消息 => Mon Mar 06 11:29:29 CST 2023 : GOGO
    消费消息 => Mon Mar 06 11:29:35 CST 2023 : GOGOGO
    

在Web管理页中 , 是在交换机查看 延迟的消息 , 并非在队列中查看 , 消息直到延迟时间到期才会释放

# 发布确认高级

生产者发送消息后会进行备份到缓存中 , 如果成功则从缓存删除该备份的消息 , 否则在缓存中执行定时任务 , 重新从缓存中重新发布至 交换机 , 直到成功为止

大致流程

  1. 配置 ==spring.rabbitmq.publisher-confirm-type: correlated==

  2. 声明基本架构 (交换机/队列/绑定)

  3. 生产者正常发送消息

  4. 消费者正常接收消息

  5. 编写 回调类 , 实现 RabbitTemplate.ConfirmCallback回调接口 (交换机)

  6. 重写 confirm()回调方法 (成功/失败 都会走该方法)

  7. 内部类接口注入 (由于是内部类不能直接拿去使用 , 不过可以通过以下形式注入其中)

    @Resource
    RabbitTemplate rabbitTemplate;
    /* @PostConstruct注解 
        在配置类 执行的构造函数 和 自动注入 后执行初始化的方法(类似servlet的init()方法) 
    */
    @PostConstruct
    public void init() {
        // 发布确认
        rabbitTemplate.setConfirmCallback(this);
    }
    
  8. 测试

配置

生产者的类型确认使用 spring.rabbitmq.publisher-confirm-type

配置值 说明
none(默认) 不做任何确认操作
correlated 消息到交换机触发 回调
simple 通过手动 waitForConfirms()返回结果回答(少用)

# 代码示例

例图就不展示了 , 一个交换机: confirm.exchange , 一个队列: confirm.queue , 生产者 , 消费者

配置

==spring.rabbitmq.publisher-confirm-type: correlated==

基本架构

@Configuration
public class ConfirmConfig {

    // 交换机, 队列, routingKey
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String CONFIRM_ROUTING_KEY = "confirm.routingkey";

    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding confirmQueueBindingConfirmExchange(Queue confirmQueue, DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

}

生产者

@RestController
@RequestMapping("/producer")
public class ProducerSendMessageController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/confirm/{msg}")
    public void confirmSendMessage(@PathVariable String msg) {
        System.out.println("confirmSend => " + msg);
        // 回调相关数据对象
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, msg, correlationData);
    }

}

消费者

@Component
public class QueueConsumer {
    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void confirmConsumer(Message message, Channel channel) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("confirmConsumer => " + new Date() + " : " + msg);
    }
}

回调类

@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Resource
    RabbitTemplate rabbitTemplate;
	
    // 注入内部类接口
    @PostConstruct
    public void init() {
        // 发布确认
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机确认回调方法
     * @param correlationData 回调的相关数据
     * @param ack ack为true,nack为false
     * @param cause 原因,对于nack,如果可用,否则为null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData == null ? "" : correlationData.getId();
        if (ack) {
            System.out.println("Success => " + id);
        } else {
            System.out.println("Failure => " + id + " [" + cause + "] ");
        }
    }
}

# 代码测试

根据可控变量分析 , 可分析出可能情况 :

  • 生产者 找不到交换机
  • 找到交换机 , 但找不到队列

验证请求 : http://localhost:8088/producer/confirm/GO

生产者 找不到交换机 模拟故障操作 : 更改 生产者发送方法所指定的交换机名 , 试图寻找个不存在的交换机 测试结果 : 触发 ConfirmCallback()回调函数 , 失败 , 找不到交换机

confirmSend => GO
Failure => confirm2 [channel error; protocol method: #method&lt;channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'no' in vhost '/', class-id=60, method-id=40)] 

找到交换机 , 但找不到队列

模拟故障操作 : (以下两个故障结果是一致的)

  • 更改 生产者发送方法指定的RoutingKey
  • 新建交换机 , 不进行绑定队列

测试结果 : 触发 ConfirmCallback()回调函数 , 成功 , 但消息没有得到消费 . 由于找不到队列消息而丢失

confirmSend => GO
Success => confirm3

通过以上情况测试不难发现 , 仅靠 交换机 的确认是不行的 , 还需要在 队列 中进行确认消息 !

可以通过退回消息 , 解决该问题

# 回退消息

回退消息 主要功能是 确认消息发到队列中 . 也解决了上面进入队列确认的问题

实现 基于 发布确认代码 新增 配置/代码

  1. 配置 ==spring.rabbitmq.publisher-returns: true==

  2. 编写 回调类 , 实现 RabbitTemplate.ReturnsCallback回调接口 (队列)

  3. 重写 returnedMessage()回调方法 , 找不到交换机失败回调

  4. 内部类接口注入 (由于是内部类不能直接拿去使用 , 不过可以通过以下形式注入其中)

# 代码示例

复用上面的 发布确认代码

配置 ==spring.rabbitmq.publisher-returns: true==

生产者

点击查看代码
@GetMapping("/messageFallback/{msg}")
public void messageFallbackSend(@PathVariable String msg) {
    System.out.println("messageFallbackSend => " + msg);
    // 发送成功
    CorrelationData correlationData1 = new CorrelationData("messageFallback1");
    rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, msg, correlationData1);
    // 生产者 找不到交换机
    //CorrelationData correlationData2 = new CorrelationData("messageFallback2");
    //rabbitTemplate.convertAndSend("no", "", msg, correlationData2);
    // 交换机 找不到RoutingKey
    //CorrelationData correlationData3 = new CorrelationData("messageFallback3");
    //rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "no", msg, correlationData3);
    // 交换机 找不到队列
    //CorrelationData correlationData4 = new CorrelationData("messageFallback4");
    //rabbitTemplate.convertAndSend("testNoNull", "", msg, correlationData4);
}

回调类

点击查看代码
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Resource
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // 发布确认
        rabbitTemplate.setConfirmCallback(this);
        // 消息回退
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交换机确认回调方法
     * @param correlationData 回调的相关数据
     * @param ack ack为true,nack为false
     * @param cause 原因,对于nack,如果可用,否则为null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData == null ? "" : correlationData.getId();
        if (ack) {
            System.out.println("Success => " + id);
        } else {
            System.out.println("Failure => " + id + " [" + cause + "] ");
        }
    }

    /**
     * 回退消息
     *  当消息过程不能达到目的地 , 则将消息返回给生产者
     * @param returned 返回的消息和元数据
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("回退消息 ==>");
        System.out.println("  消息: " + new String(returned.getMessage().getBody(), StandardCharsets.UTF_8));
        System.out.println("  交换机名: " + returned.getExchange());
        System.out.println("  RoutingKey: " + returned.getRoutingKey());
        System.out.println("  退回原因: " + returned.getReplyText());
    }
}

# 代码测试

根据可控变量分析 , 可分析出可能情况 :

  • 生产者 找不到交换机
  • 生产者 找到交换机 , 但找不到队列

验证请求 : http://localhost:8088/producer/messageFallback/GO

生产者 找不到交换机 模拟故障操作 : 更改 生产者发送方法所指定的交换机名 , 试图寻找个不存在的交换机 测试结果 : 触发 ConfirmCallback()回调函数 , 失败 , 找不到交换机

messageFallbackSend => GO
Failure => messageFallback2 [channel error; protocol method: #method&lt;channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'no' in vhost '/', class-id=60, method-id=40)] 

生产者 找到交换机 , 但找不到队列

模拟故障操作 : (以下两个故障结果是一致的)

  • 更改 生产者发送方法指定的RoutingKey
  • 新建交换机 , 不进行绑定队列

测试结果 : 触发 ConfirmCallback()(成功)和returnedMessage()(失败) 回调函数 , 但消息没有得到消费 . 由于找不到队列消息而丢失

messageFallbackSend => GO
回退消息 ==>
  消息: GO
  交换机名: confirm.exchange
  RoutingKey: no
  退回原因: NO_ROUTE
Success => messageFallback3

# 备用交换机

备用交换机 , 字面意思 , 当某一交换机匹配不到RoutingKey指定的队列 , 那么会交给 备用交换机 处理 .

一般情况 备用交换机 , 用来处理消费者 监控/报警 等操作

应用方式

在配置交换机Bean的时候可以通过 构造者模式中的 alternate()方法 指定备用交换机

/* 变量说明
	BACKUP_EXCHANGE_NORMAL: 普通交换机名
	BACKUP_EXCHANGE_BACKUP: 备份交换机名
*/
@Bean
public DirectExchange backupNormalExchange() {
    return ExchangeBuilder.directExchange(BACKUP_EXCHANGE_NORMAL)
            .durable(true)
            // 备用
            .alternate(BACKUP_EXCHANGE_BACKUP)
            .build();
}

# 代码示例

架构图

rabbitmq-temp05

基本架构

点击查看代码
@Configuration
public class BackupConfig {

    // 交换机
    public static final String BACKUP_EXCHANGE_NORMAL = "backup.exchange.normal";
    public static final String BACKUP_EXCHANGE_BACKUP = "backup.exchange.backup";

    // 队列
    public static final String BACKUP_QUEUE_NORMAL = "backup.queue.normal";
    public static final String BACKUP_QUEUE_BACKUP = "backup.queue.backup";
    public static final String BACKUP_QUEUE_WARNING = "backup.queue.warning";

    // routingKey
    public static final String BACKUP_ROUTING_KEY_NORMAL = "backup.routingkey.normal";

    /**
     * 交换机
     */
    @Bean
    public DirectExchange backupNormalExchange() {
        return ExchangeBuilder.directExchange(BACKUP_EXCHANGE_NORMAL)
                .durable(true)
                // 备用
                .alternate(BACKUP_EXCHANGE_BACKUP)
                .build();
    }
    @Bean
    public FanoutExchange backupExchange() {
        // 广播
        return new FanoutExchange(BACKUP_EXCHANGE_BACKUP);
    }

    /**
     * 队列
     */
    @Bean
    public Queue backupNormalQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NORMAL).build();
    }
    @Bean
    public Queue backupBackupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_BACKUP).build();
    }
    @Bean
    public Queue backupWarningQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_WARNING).build();
    }

    /**
     * 绑定
     */
    @Bean
    public Binding bindingNormalQueueToNormalExchange(Queue backupNormalQueue, DirectExchange backupNormalExchange) {
        return BindingBuilder.bind(backupNormalQueue).to(backupNormalExchange).with(BACKUP_ROUTING_KEY_NORMAL);
    }
    @Bean
    public Binding bindingBackupBackupQueueToBackupExchange(Queue backupBackupQueue, FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupBackupQueue).to(backupExchange);
    }
    @Bean
    public Binding bindingBackupWarningQueueToBackupExchange(Queue backupWarningQueue, FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupWarningQueue).to(backupExchange);
    }
}

生产者

点击查看代码
@RestController
@RequestMapping("/producer")
public class ProducerSendMessageController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/backup/{msg}")
    public void backupSend(@PathVariable String msg) {
        System.out.println("backupSend => " + msg);
        // 发送成功
        CorrelationData correlationData1 = new CorrelationData("backupSend>1");
        rabbitTemplate.convertAndSend(BACKUP_EXCHANGE_NORMAL, BACKUP_ROUTING_KEY_NORMAL, msg, correlationData1);
        // 生产者 找不到交换机
        //CorrelationData correlationData2 = new CorrelationData("backupSend>2");
        //rabbitTemplate.convertAndSend(BACKUP_EXCHANGE_NORMAL+"123", BACKUP_ROUTING_KEY_NORMAL, msg, correlationData2);
        // 交换机 找不到RoutingKey
        //CorrelationData correlationData3 = new CorrelationData("backupSend>3");
        //rabbitTemplate.convertAndSend(BACKUP_EXCHANGE_NORMAL, BACKUP_ROUTING_KEY_NORMAL+"123", msg, correlationData3);
        // 交换机 找不到队列
        //CorrelationData correlationData4 = new CorrelationData("backupSend>4");
        //rabbitTemplate.convertAndSend("testNoNull", "", msg, correlationData4);
    }
}

消费者

点击查看代码
@Component
public class QueueConsumer {
    /**
     * 备份交换机
     */
    // 正常消费者
    @RabbitListener(queues = BACKUP_QUEUE_NORMAL)
    public void backupNormalConsumer(Message message, Channel channel) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("backupNormalConsumer => " + new Date() + " : " + msg);
    }
    // 备份消费者
    @RabbitListener(queues = BACKUP_QUEUE_BACKUP)
    public void backupConsumer(Message message, Channel channel) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("backupConsumer => " + new Date() + " : " + msg);
    }
    // 警告消费者
    @RabbitListener(queues = BACKUP_QUEUE_WARNING)
    public void backupWarningConsumer(Message message, Channel channel) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("backupWarningConsumer => " + new Date() + " : " + msg);
    }

}

# 代码测试

根据可控变量分析 , 可分析出可能情况 :

  • 生产者 找不到交换机
  • 交换机 RoutingKey匹配不到队列
  • 交换机 未绑定队列
  • 备用交换机 未绑定队列

验证请求 : http://localhost:8088/producer/messageFallback/GO

模拟故障操作 , 在生产者类中写有 , 去掉注释测试即可

生产者 找不到交换机

模拟故障操作 : 更改 生产者发送方法所指定的交换机名 , 试图寻找个不存在的交换机 测试结果 : ConfirmCallback() 回调失败

backupSend => GO
Failure => backupSend>2 [channel error; protocol method: #method&lt;channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'backup.exchange.normal123' in vhost '/', class-id=60, method-id=40)] 

交换机 RoutingKey匹配不到队列

模拟故障操作 : 更改 生产者发送方法指定的RoutingKey

测试结果 : ConfirmCallback() 回调成功 , 并且 备用交换机处理消息

backupSend => GO 
Success => backupSend>3
backupWarningConsumer => Tue Mar 07 14:40:37 CST 2023 : GO
backupConsumer => Tue Mar 07 14:40:37 CST 2023 : GO

交换机 未绑定队列

模拟故障操作 : 新建交换机 , 不进行绑定队列 测试结果 : ConfirmCallback() 回调成功 , 在交换机中 触发returnedMessage()退回消息 , 备用交换机未处理消息

backupSend => GO
回退消息 ==>
  消息: GO
  交换机名: testNoNull
  RoutingKey: 
  退回原因: NO_ROUTE
Success => backupSend>4

备用交换机 未绑定队列

模拟故障操作 : 备用交换机 , 不进行绑定队列 (在Web管理页 解绑 重发消息即可实现)

测试结果 : ConfirmCallback() 回调成功 , 在备用交换机中 触发returnedMessage()退回消息 , 备用交换机未处理消息

backupSend => GO
回退消息 ==>
  消息: GO
  交换机名: backup.exchange.normal
  RoutingKey: backup.routingkey.normal123
  退回原因: NO_ROUTE
Success => backupSend>3

在上面4种情况观察分析 , 可以发现 交换机一旦未绑定队列 , 会使 消息回退 , 也不会执行备用交换机方案

# 队列优先级

不难想象 , 在双十一高峰期 , 订单会非常多 . 有时 公司出于利益方面 , 划分出客户等级 , 等级越高的客户他们的单子往往在拥挤的时候优先得到解决

在MQ当中优先级取值范围 : 0 ~ 255 (数值越大越优先)

应用方式

配置

在 队列 中配置 优先级属性

/* 变量说明
    CONFIRM_QUEUE_NAME: 队列名
*/
@Bean
public Queue priorityQueue() {
    return QueueBuilder.durable("priority.Queue")
            // 优先级 优先级最大值
            .maxPriority(40)
            .build();
}

在 生产者 消息发送方法 中配置优先级

指定 消息 在队列中的优先级 : ==message.getMessageProperties().setPriority(n);==

@GetMapping("/priority/{msg}")
public void priorityBend(@PathVariable String msg) {
    // 优先级倒过来 ,
    System.out.println("priorityBend => " + msg);
    for (int i = 0; i < 20; i++) {
        CorrelationData correlationData = new CorrelationDat22a(msg + i);
        int finalI = i;
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, msg + i, message -> {
            message.getMessageProperties().setPriority(finalI);
            return message;
        }, correlationData);
    }
}

# 测试

源代码基本架构 点击跳转

测试步骤 :

  1. 注释掉消费者代码
  2. 运行项目
  3. 访问 http://localhost:8088/producer/priority/GO (发送消息)
  4. 观察方式 , 有两种: Web管理页 (能看到消息明细) ; 控制台(测试顺序)

Web观察

进入 Queues -> 选择队列 -> Get Messages

rabbitmq-temp06

控制台观察 : 恢复消费者代码(去除先前注解) , 重新运行项目

confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO19
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO18
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO17
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO16
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO15
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO14
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO13
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO12
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO11
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO10
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO9
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO8
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO7
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO6
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO5
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO4
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO3
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO2
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO1
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO0

# 惰性队列

惰性队列 主要作用的将消息尽可能的存到磁盘中 , 而消费者响应消息的时候才会被加载到内存中 , 设计初衷主要是容纳更多的消息 , 以免高峰期导致内存爆炸现象

不同情况下的队列 :

  • 普通队列(default) : 消息保存到内存中 (尽可能提高性能)
  • 惰性队列(lazy) : 消息保存到磁盘中 (尽可能存储更多的消息)

惰性优点 :

  • 消费者失效 , 消息堆积情况
  • 大量消息 , 占用内存小

应用 :

  1. 在 队列 中配置 模式属性 : ==x-queue-mode: lazy==
  2. 大量发送消息
  3. 观察 Web内存占用情况

SpringBoot 配置

在 队列 中配置 惰性属性

@Bean
public Queue lazyQueue() {
    return QueueBuilder.durable("lazy.Queue")
            // 惰性
        	.lazy()
            .build();
}

高压测试

在 100W 条消息的内存情况下

  • 普通 : 5.2MiB ≈ 5.078125MB ≈ 5078.1KB
  • 惰性 : 1.1MiB ≈ 1.07421875MB ≈ 1074.2KB

GB 是 生厂商为了方便计算 , 以十进制 10的3次方运算 . 如 : 1000MB = 1GB

GiB 而是 操作系统是采用 , 以二进制 2的10次方运算 . 如 : 1MiB = 1024KiB

为了便于理解 可以将 MiB为MB (其他单位也是如此)

采用 异步发布消息 100W条消息

普通 :

rabbitmq-temp07

惰性 :

rabbitmq-temp08

# 集群

以往的操作都是处于一台机器操作 , 那么该机器宕机了 , 就不能服务 , 因此我们只可以使用多台服务器连接形成集群 , 提高可用率 , 哪怕其中一台宕机了也可以完好的将数据保留

下面采用Docker模拟多台服务器应用

# Docker

在Docker中 部署安装RabbitMQ

  1. 安装Docker (网上教程烂大街..)

  2. 查看版本 https://hub.docker.com/_/rabbitmq , 并拉取下载 建议下载含有Web管理页 , 镜像中带有 mangement版本的

  3. 启动docker容器

    docker run -d --name [容器名称]  \
    -p 5672:5672  \
    -p 15672:15672  \
    -v `pwd`/data:/home/rabbitmq  \
    --hostname [节点名称]  \
    -e RABBITMQ_DEFAULT_USER=admin  \
    -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:[tag标签] \
    

    我个人的应用方式 (端口小修一下)

    docker run -d --name rabbitmq03  \
    -p 5674:5672  \
    -p 15674:15672  \
    -v `pwd`/data:/home/rabbitmq  \
    --hostname node03  \
    -e RABBITMQ_DEFAULT_USER=admin  \
    -e RABBITMQ_DEFAULT_PASS=admin  \
    rabbitmq:3.11.10-management
    

    选项说明 :

    选项 说明
    -d 后台运行
    --name 指定容器名称
    -p 指定端口 [外部端口]:[容器端口] (5672:连接访问; 15672: Web管理页)
    -v 映射 目录/文件
    --hostname 主机名 (较为重要 集群作为 节点名称 使用)
    -e 指定环境变量 (默认账号密码)
  4. Web打开 http://ip:15672/ / http://ip:15673/ / http://ip:15674/

  5. 账号密码 : admin

**检查 : **

  • 检查docker容器 ==docker ps -a==
  • 检查端口是否调试好 ==docker port {容器id}==
  • 查看防火墙是否开放端口

MQ容器不同版本的端口很有可能不同 , 可通过 ==docker ps -a== 进行检查

#消息队列#RabbitMQ
上次更新: 2023/03/12, 00:43:49

← Dubbo Netty网络通信→

最近更新
01
HTTPS自动续签
10-21
02
博客搭建-简化版(脚本)
10-20
03
ruoyi-vue-plus-部署篇
07-13
更多文章>
Theme by Vdoing | Copyright © 2019-2024 | 桂ICP备2022009417号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式