文章目录
- 消息中间件引言
- 3.消息中间件示例
- 4.MQ选型对比文档
- RabbitMQ基础
- 4.Hello World
- docker部署RabbitMq
- RabbitMq常见工作模式
- 6.主题(Topic)模式
- Springboot整合RabbitMq
- RabbitMq消息幂等性
- RabbitMq的消息持久化
- RabbitMq消息确认机制之confirm模式
- RabbitMq死信队列和备份交换机
- RabbitMq综合总结
-
- RabbitMQ基础
-
- 1.RabbitMQ简单介绍
- 2.为什么要使用rabbitmq,使用rabbitmq的场景
- 3.如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?
- 4.如何避免消息重复投递或重复消费?
- 5.消息怎么路由?
- 6.如何判断一个消息是死信消息
- 7.消息基于什么传输?
- 8.消息如何分发?
- 9.如何确保消息不丢失?
- 10.mq的缺点
- 11.如何保证RabbitMQ消息的顺序性?
- 12.死信队列+消息过期完成未支付订单自动取消
- 13.如何解决丢数据的问题?
- 14.死信队列&死信交换器
- 15、vhost 是什么?起什么作用?
- 16、RabbitMQ 概念里的 channel、exchange 和 queue 是逻辑概念,还是对应着进程实体?分别起什么作用?
- 17、RabbitMQ 上的一个 queue 中存放的 message 是否有数量限制?
- 18、向不存在的 exchange 发 publish 消息会发生什么?向不存在的 queue 执行consume 动作会发生什么?
- 19、routing_key 和 binding_key 的最大长度是多少
- 20、“dead letter”queue 的用途?
- 21、为什么说保证 message 被可靠持久化的条件是 queue 和 exchange 具有durable 属性,同时 message 具有 persistent 属性才行?
- 22、Basic.Reject 的用法是什么?
- 23、RabbitMQ 的高可用性如何保证?
- 消息幂等性
- RabbitMq交换机类型
- RabbitMq高可用
- 总结
消息中间件引言
1.消息队列概述
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等
message queue MQ
2.消息队列应用场景
以下介绍消息队列在实际应用中常用的使用场景。异步处理
,应用解耦
,流量削峰
和消息通讯
四个场景。
2.1.异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。
传统的做法有两种 1.串行的方式; 2.并行方式
a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。
2.2.应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:
下单流程
订单服务 新增订单
库存服务 减库存
传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合
如何解决以上问题呢?引入应用消息队列后的方案,如下图:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
2.3.流量削峰
流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理
2.4.日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
Kafka消息队列,负责日志数据的接收,存储和转发
日志处理应用:订阅并消费kafka队列中的日志数据
2.5.消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
点对点通讯:
客户端A和客户端B使用同一队列,进行消息通讯。
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。
3.消息中间件示例
3.1.电商系统
消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket MQ。
(1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
(2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。
(3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。
3.2.日志收集系统
分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(Other App)四部分组成。
Zookeeper注册中心,提出负载均衡和地址查找服务日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列
Kafka集群:接收,路由,存储,转发等消息处理
Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据
4.MQ选型对比文档
Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,0.8开始支持复制,不支持事务,适合产生大量数据的互联网服务的数据收集业务。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
1、 RabbitMq比kafka成熟,在可用性上,稳定性上,可靠性上,RabbitMq超过kafka
2、 Kafka设计的初衷就是处理日志的,可以看做是一个日志系统,针对性很强,所以它并没有具备一个成熟MQ应该具备的特性
3、 但是Kafka的性能(吞吐量、tps)比RabbitMq要强,Kafka 自身服务和消费者都需要依赖 Zookeeper。
- RabbitMQ 在有大量消息堆积的情况下性能会下降,Kafka不会。毕竟AMQP设计的初衷不是用来持久化海量消息的,而Kafka一开始是用来处理海量日志的。
总的来说,RabbitMQ 和 Kafka 都是十分优秀的分布式的消息代理服务,只要合理部署,不作,基本上可以满足生产条件下的任何需求。
RabbitMQ基础
1.RabbitMQ初识
1.1.RabbitMQ简介
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
1.2.消息队列应用场景
开发中消息队列通常有如下应用场景:
1、任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
1.3.市场上常见的消息队列产品
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis(发布订阅模型)。
1.4.Why RabbitMQ?
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言(高并发语言)。
5、Spring Boot默认已集成RabbitMQ,SpringBoot把RabbitMQ自动整合!自动化配置!
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
2.相关概念
2.1.AMQP是什么
AMQP,即Advanced Message Queuing Protocol
,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
总结:AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
RabbitMQ 就是基于 AMQP 协议实现的。
3.RabbitMQ工作原理
下图是RabbitMQ的基本结构:
组成部分说明如下:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
消息接收流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
4.RabbitMQ安装
4.1. 下载安装
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需 要安装Erlang/OTP,并保持版本匹配,
如下图: RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
本项目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。
1)下载erlang 地址如下: http://erlang.org/download/otp_win64_20.3.exe
以管理员方式运行此文件,安装。 erlang安装完成需要配置erlang环境变量: ERLANG_HOME=G:\softDevelopment\erlang\erl10.1 在path中添 加%ERLANG_HOME%\bin;
2)安装RabbitMQ https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3
以管理员方式运行此文件,安装。
3.2. 启动
启动安装成功后会自动创建RabbitMQ服务并且启动。
1)从开始菜单启动RabbitMQ 完成在开始菜单找到RabbitMQ的菜单:
RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 停止
3.3. 启动安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ 管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
3.4. 注意事项
1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang 搜索RabbitMQ、ErlSrv,将对应的项全部删除。
4.Hello World
4.1.需求分析
按照官方教程(http://www.rabbitmq.com/getstarted.html)
4.2.搭建环境
Java客户端:生产者和消费者
生产者和消费者都属于客户端,rabbitMQ的java客户端如下:
我们先用 rabbitMQ官方提供的java client测试,目的是对RabbitMQ的交互过程有个清晰的认识。 参考 :https://github.com/rabbitmq/rabbitmq-java-client/
创建maven工程 创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程
项目中所需的依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
4.3.生产者
在生产者工程下的test中创建测试类如下:
package com.bruceliu.producer001;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author bruceliu
* @create 2019-10-23 11:34
* @description
*/
public class Procducer001 {
//队列名称
private static final String QUEUE = "helloworld";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
factory.setVirtualHost("/");
// 创建与RabbitMQ服务的TCP连接
connection = factory.newConnection();
// /创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
channel = connection.createChannel();
/*** 声明队列,如果Rabbit中没有此队列将自动创建
* param1:队列名称
* param2:是否持久化
* param3:队列是否独占此连接
* param4:队列不再使用时是否自动删除此队列
* param5:队列参数
***/
channel.queueDeclare(QUEUE, true, false, false, null);
String message = "helloworld---我是一条测试消息" + System.currentTimeMillis();
/***
* 消息发布方法
* param1:Exchange的名称,如果没有指定,则使用Default Exchange
* 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 示绑定或解除绑定
* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
* param3:消息包含的属性
* param4:消息体
*/
channel.basicPublish("", QUEUE, null, message.getBytes());
System.out.println("Send Mes!!sage is:'" + message + "'");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
}
}
}
}
4.4.消费者
在消费者工程下的test中创建测试类如下:
package com.bruceliu.consumer001;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author bruceliu
* @create 2019-10-23 11:45
* @description
*/
public class Consumer001 {
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在服务器的ip和端口
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE, true, false, false, null);
// 定义消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*** 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
* @param properties
* @param body
* @throws IOException*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
// /路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String msg = new String(body,"UTF-8");
System.out.println("receive message.." + msg);
}
};
/*** 监听队列
* String queue, boolean autoAck,Consumer callback
* 参数明细
* 1、队列名称
* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复
* 3、消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE, true, consumer);
}
}
4.5.总结
发送端操作流程
1)创建连接
2)创建通道
3)声明队列
4)发送消息
接收端
1)创建连接
2)创建通道
3)声明队列
4)监听队列
5)接收消息
6)ack回复
docker部署RabbitMq
1、查询rabbitmq镜像
docker search rabbitmq:management
2、拉取rabbitmq镜像
docker pull rabbitmq:management
3、创建并启动容器
创建和启动(同时设置用户和密码)
docker run -d --hostname rabbit --name rabbitmq --restart always -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -v /etc/localtime:/etc/localtime:ro -v /usr/local/rabbitmq/data:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
其中:
15672:控制台端口号 Web插件访问端口
5672:应用访问端口号 程序访问端口
控制台端口用于管理rabbitmq,应用访问端口号为应用程序访问
4、查看rabbitmq运行状况
docker logs rabbit
5、访问
http://ip:15672
6、登录
默认账户名:guest
密码:guest
启动修改过账号: admin admin
RabbitMq常见工作模式
1.简介
最近,在看一些消息中间件的内容,之前都没有好好学习一下消息中间件。本文将对RabbitMQ中五种常用的工作模式做一个简单的介绍和总结。RabbitMQ常用的工作模式有:简单队列模式、工作队列模式、发布订阅模式、路由模式、主题模式。本文参照RabbitMQ官网示例总结,详细可以到官网查看:https://www.rabbitmq.com/getstarted.html。
2.简单队列模式(Simple Queue)
【模型图】
只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列接收消息。(单生产单消费)
上图中,“P”是我们的生产者,“C”是我们的消费者。
【获取MQ连接对象工具类】
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MQConnecitonUtils {
private static final String RABBITMQ_HOST = "127.0.0.1";
private static final Integer RABBITMQ_PORT = 5672;
private static final String RABBITMQ_VHOST = "/";
private static final String RABBITMQ_USERNAME = "guest";
private static final String RABBITMQ_PASSWORD = "guest";
public static Connection getConnection() {
//定义MQ连接对象
Connection connection = null;
//创建MQ连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置MQ主机名称
connectionFactory.setHost(RABBITMQ_HOST);
// 设置MQ AMQP端口号
connectionFactory.setPort(RABBITMQ_PORT);
// 设置MQ 连接的virtual host
connectionFactory.setVirtualHost(RABBITMQ_VHOST);
// 设置MQ 用户名称
connectionFactory.setUsername(RABBITMQ_USERNAME);
// 设置MQ 用户密码
connectionFactory.setPassword(RABBITMQ_PASSWORD);
try {
connection = connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
//返回连接对象
return connection;
}
}
【生产者】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Date;
public class ProducerTest01 {
private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";
private static final String SIMPLE_QUEUE_MESSAGE = "Hello World!"+new Date();
public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
//创建通道
channel = connection.createChannel();
//创建Queue队列
channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
//发送消息到队列MQ_SIMPLE_QUEUE
//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish("", SIMPLE_QUEUE_NAME, null, SIMPLE_QUEUE_MESSAGE.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【消费者】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest001 {
private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
Channel channel;
try {
//创建消息通道对象
channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("receive message: " + message);
}
};
//监听消息队列
channel.basicConsume(SIMPLE_QUEUE_NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.工作队列模式(Work Queues)
【模型图】
多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费。工作队列有轮训分发和公平分发两种模式。
下面先说说**轮训分发(round-robin)**方式:
【消息生产者】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Date;
/**
* @description
* 说明:
* 消费者1与消费者2处理的消息是均分的,而且消息是轮训分发的(轮训分发 round-robin)
*/
public class ProducerTest02 {
private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
private static final String WORK_QUEUE_MESSAGE = "hello world!! ------> "+new Date();
public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建Queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//发送10条消息到工作队列
for (int i = 1; i <= 10; i++) {
StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append(i);
//发送消息
channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【消息消费者A】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest02_A {
private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
Channel channel = null;
try {
//创建消息通道对象
channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer-A】receive message: " + message);
try {
//模拟延迟
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听消息队列
channel.basicConsume(WORK_QUEUE_NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【消息消费者B】
package com.bruceliu.consumer003;
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class ConsumerTest02_B {
private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
Channel channel = null;
try {
//创建消息通道对象
channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer-B】receive message: " + message);
try {
//模拟延迟
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听消息队列
channel.basicConsume(WORK_QUEUE_NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【运行结果】
由上面图可见,消费者1和消费者2处理的消息是均分的(消费的消息条数一样),而且消息是轮询分发的,也就是说同一个消息只能被一个消费者消费。上面的消费者1和消费者2处理消息的效率不同,但是最后接收到的消息还是一样多,如果需要让工作效率高的消费者消费更多的消息,那么可以使用公平分发,下面介绍一下工作队列的公平分发模式(能者多劳)。
【生产者】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @description 工作队列 - 消息生产者 (公平分发方式Fair dispatch)
* 说明:
* 1. 生产者、消费者指定:channel.basicQos(1);
* 2. 消费者消费完消息自动发送确认消息:channel.basicAck(envelope.getDeliveryTag(), false);
* 3. 消费者必须关闭自动应答:autoAck = false;
* 4. 一般消费者如果处理消息的时间较短(效率较高),那么它处理的消息会比较多一些;
*/
public class ProducerTest03 {
private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
private static final String WORK_QUEUE_MESSAGE = "hello world!! ------> ";
public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建Queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
//每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者(同一时刻服务器只会发送一条消息给消费者),消费者端发送了ack后才会接收下一个消息。
channel.basicQos(1);
//发送10条消息到工作队列
for (int i = 1; i <= 10; i++) {
StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append(i);
//发送消息
channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【消费者A】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest03_A {
private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer-A】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回)
boolean autoAck = false;
//监听消息队列
channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【消费者B】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest03_B {
private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//声明queue队列
channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body,"UTF-8");
System.out.println(("【CustomConsumer-B】receive message: " + message));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【运行结果】
由此可见,消费者2的效率相对较高,所以消费者2消费消息比消费者1多一些,这样就可以充分发挥消费者处理消息的能力。
注意点:
1. 生产者、消费者指定:channel.basicQos(1);
2. 消费者消费完消息自动发送确认消息:channel.basicAck(envelope.getDeliveryTag(), false);
3. 消费者必须关闭自动应答:autoAck = false;
4. 一般消费者如果处理消息的时间较短(效率较高),那么它处理的消息会比较多一些;
4.发布-订阅模式(Publish/Subscribe)
【模型图】
生产者将消息发送到交换器,然后交换器绑定到多个队列,监听该队列的所有消费者消费消息。
【生产者】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* 说明:可实现一条消息被多个消费者消费
* a. 一个生产者,多个消费者;
* b. 每一个消费者都有自己的消息队列;
* c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;
* d. 每个队列都需要绑定到交换机上;
* e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;
*/
public class ProducerTest04 {
private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
//类型:分发 广播
private static final String PUBLISH_SUBSCRIBE_EXCHANGE_TYPE = "fanout";
public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建交换机对象publish_subscribe_exchange_fanout
channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_TYPE);
//发送消息到交换机exchange上
String msg = "hello world!!!";
channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "", null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【消费者A】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest04_A {
private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name01";
private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上
channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");
channel.basicQos(1);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer01】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【消费者B】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class ConsumerTest04_B {
private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name02";
private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上
channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");
channel.basicQos(1);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer02】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
a. 一个生产者,多个消费者;
b. 每一个消费者都有自己的消息队列,分别绑定到不同的队列上;
c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;
d. 每个队列都需要绑定到交换机上;
e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;
f. 如果消息发送到没有队列绑定的交换器时,消息将会丢失,因为交换器没有存储消息的能力,只有队列才有存储消息的能力;
5.路由模式(Routing)
【模型图】
生产者将消息发送到direct交换器,它会把消息路由到那些binding key与routing key完全匹配的Queue中,这样就能实现消费者有选择性地去消费消息。
【生产者】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @description 说明:生产者发送消息的时候指定routing key,然后消费者绑定队列的时候也指定一些binding key,只有binding key与routing key一致的消费者才能接收到此消息
*/
public class ProducerTest05 {
private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
//交换机类型:direct
private static final String EXCHANGE_TYPE = "direct";
private static final String EXCHANGE_ROUTE_KEY = "info";
public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建交换机对象
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
//发送消息到交换机exchange上
String msg = "hello world!!!!";
//指定routing key为info
channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【消费者A】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer05_A {
private static final String QUEUE_NAME = "routing_direct_queue_name";
private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
//binding key
private static final String EXCHANGE_ROUTE_KEY = "error";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上,并且指定routing_key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
channel.basicQos(1);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer01】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【消费者B】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer05_B {
private static final String QUEUE_NAME = "routing_direct_queue_name02";
private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
//binding key
private static final String EXCHANGE_ROUTE_KEY01 = "error";
private static final String EXCHANGE_ROUTE_KEY02 = "info";
private static final String EXCHANGE_ROUTE_KEY03 = "warning";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上,并且指定routing_key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY01);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY02);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY03);
channel.basicQos(1);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer02】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【运行结果】
因为生产者发布消息的时候指定了routing key为info, 消费者绑定队列的时候指定的binding key 为error,显然消费者1接收不到此消息,因为消费者2绑定队列的时候指定了binding key为error、info、warning,所以消费者2能够成功接收该消息进行消费。
6.主题(Topic)模式
【模型图】
类似于正则表达式匹配的一种模式。主要使用#、*进行匹配。
【生产者】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* 说明:
* #: 代表一个或者多个
* *: 代表一个
* 举例:
* 比如发送消息的时候指定了routing key为news.insert,
* 如果消费者指定binding key 为news.* 或者news.#都能接收到该消息;
*
*/
public class ProducerTest06 {
private static final String EXCHANGE_NAME = "exchange_topic";
//交换机类型:topic 类似正则匹配模式
private static final String EXCHANGE_TYPE = "topic";
//指定routing key
private static final String EXCHANGE_ROUTE_KEY = "news.insert";
public static void main(String[] args) {
//获取MQ连接
Connection connection = MQConnecitonUtils.getConnection();
//从连接中获取Channel通道对象
Channel channel = null;
try {
channel = connection.createChannel();
//创建交换机对象
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
//发送消息到交换机exchange上
String msg = "hello world!!!";
channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【消费者A】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer06_A {
private static final String QUEUE_NAME = "topic_queue_name1";
private static final String EXCHANGE_NAME = "exchange_topic";
//binding key
private static final String EXCHANGE_ROUTE_KEY = "news.insert";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上,并且指定routing_key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
channel.basicQos(1);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer01】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【消费者B】
import com.bruceliu.utils.MQConnecitonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer06_B {
private static final String QUEUE_NAME = "topic_queue_name2";
private static final String EXCHANGE_NAME = "exchange_topic";
//binding key
private static final String EXCHANGE_ROUTE_KEY = "news.#";
public static void main(String[] args) {
//获取MQ连接对象
Connection connection = MQConnecitonUtils.getConnection();
try {
//创建消息通道对象
final Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机上,并且指定routing_key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
channel.basicQos(1);
//创建消费者对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消息消费者获取消息
String message = new String(body, "UTF-8");
System.out.println("【CustomConsumer02】receive message: " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//消费完一条消息需要自动发送确认消息给MQ
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//使用公平分发必须关闭自动应答
boolean autoAck = false;
//监听消息队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
【运行结果】
生产者发送消息绑定的routing key 为news.insert;消费者1监听的队列和交换器binding key 为news.insert;消费者2监听的队列和交换器bindingkey为news.#,很显然,两个消费者都将接收到该消息。
RabbitMQ的常见交换机类型:
https://www.cnblogs.com/linyufeng/p/9885020.html
Springboot整合RabbitMq
1. 使用Direct交换机
1.1 前言
使用SpringBoot对RabbitMQ进行整合,模拟生产者服务器(9000)向向消费者服务器(8088)发送消息的过程,消息生产者通过接受Http请求向消息队列发送消息(Controller层、Service层),接收端则直接监听队列接收消息。这里Demo中通过请求两个不同的接口向不同的队列发送消息,在消费者端将会接收到对应监听队列的消息。
关于RabbitMQ的搭建及搭建中常见的问题参考连接:RabbitMQ搭建及问题
1.2 简介
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据路由键全文匹配去寻找队列
1.3 添加依赖
在pom.xml文件中添加依赖,主要是springboot中web和amqp的starter
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
1.4 application.yml配置
消息生产者(发送端)配置,端口9000
server:
port: 9000
spring:
application:
name: direct-sender
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
消费者(接收端)配置,端口8088
server:
port: 8088
spring:
application:
name: direct-receiver
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
复制代码
1.5 消息生产者(发送端)代码
配置SendConfig.java
配置中创建两个队列,分别为direct.queue.1和direct.queue.2,并且分别通过路由键direct.route.key.1和direct.route.key.2进行绑定。使用两个队列进行演示目的是展示交换机通过路由键将消息进行分发
@Configuration
public class SendConfig {
public static final String DIRECT_QUEUE_1 = "direct.queue.1";
public static final String DIRECT_QUEUE_2 = "direct.queue.2";
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_ROUTE_KEY_1 = "direct.route.key.1";
public static final String DIRECT_ROUTE_KEY_2 = "direct.route.key.2";
@Bean
public Queue directQueue1() {
return new Queue(DIRECT_QUEUE_1);
}
@Bean
public Queue directQueue2() {
return new Queue(DIRECT_QUEUE_2);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with(DIRECT_ROUTE_KEY_1);
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with(DIRECT_ROUTE_KEY_2);
}
}
复制代码
Controller层代码
设置访问路径和参数调用service层
@RestController
@RequestMapping("/direct")
public class SendController {
@Autowired
private DirectSendService directSendService;
@GetMapping("/queue1/{msg}")
public void sendQueue1(@PathVariable String msg) {
directSendService.sendQueue1(msg);
}
@GetMapping("/queue2/{msg}")
public void sendQueue2(@PathVariable String msg) {
directSendService.sendQueue2(msg);
}
}
Service层代码
使用RabbitTemplate的convertAndSend方法进行发送,方法有很多重载方法,选用convertAndSend(String exchange, String routingKey, Object object),指定交换机、路由键和发送的消息
@Service
public class DirectSendServiceImpl implements DirectSendService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendQueue1(String msg) {
System.out.println("发送到队列1:" + msg);
rabbitTemplate.convertAndSend(SendConfig.DIRECT_EXCHANGE, SendConfig.DIRECT_ROUTE_KEY_1, msg);
}
@Override
public void sendQueue2(String msg) {
System.out.println("发送到队列2:" + msg);
rabbitTemplate.convertAndSend(SendConfig.DIRECT_EXCHANGE, SendConfig.DIRECT_ROUTE_KEY_2, msg);
}
}
1.6 消费者(接收端)代码
消息接收端通过@RabbitListener注解监听队列,当队列有消息时自动读取
@Component
public class DirectReceiver {
@RabbitListener(queues = "direct.queue.1")
public void receiveDirect1(String msg) {
System.out.println("接收到direct.queue.1的消息:" + msg);
}
@RabbitListener(queues = "direct.queue.2")
public void receiveDirect2(String msg) {
System.out.println("接收到direct.queue.2的消息:" + msg);
}
}
复制代码
1.7 验证运行结果sss
启动服务器,分别调用发送端接口
http://127.0.0.1:9000/direct/queue1/direct
http://127.0.0.1:9000/direct/queue2/direct
复制代码
调用成功后RabbitMQ管理页面中便会新增两个队列,对列名为SendConfig中设置的名字
此时在消费者端控制台中会显示接收到的数据
此时如果关闭接收端服务器,然后一直调用发送接口,消息会都积累在队列中,在管理页面中显示为ready。
当再次启动接收端时便会以此将消息从队列中读取出来
2. 使用Fanout交换机
2.1 前言
使用SpringBoot对RabbitMQ进行整合,模拟生产者服务器(9000)向两台消费者服务器(8001和8002)发送消息的过程,消息生产者通过接受Http请求向消息队列发送消息(Controller层、Service层),接收端则直接监听队列接收消息。这里Demo中通过请求接口向绑定在交换机上的队列发送消息,在两个消费者服务器均会队列的消息。
关于RabbitMQ的搭建及搭建中常见的问题参考连接:RabbitMQ搭建及问题
2.2 简介
Fanout交换机会忽略路由键,将消息分发到所有绑定到交换机上的队列,即消息广播。
2.3 添加依赖
在pom.xml文件中添加依赖,主要是springboot中web和amqp的starter
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
复制代码
2.4 application.yml配置
demo模拟一个发送端,两个接收端的情况
消息生产者(发送端)配置,端口9001
server:
port: 9001
spring:
application:
name: direct-sender
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
消费者1(接收端)配置,端口8001
server:
port: 8001
spring:
application:
name: fanout-receiver-1
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
消费者2(接收端)配置,端口8002
server:
port: 8002
spring:
application:
name: fanout-receiver-1
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
复制代码
2.5 消息生产者(发送端)
SendConfig.java配置信息
配置信息中创建两个队列分别为fanout.queue.1、fanout.queue.2,通过FanoutExchange类创建Fanout交换机。由于Fanout交换机不关心路由键,因此在创建绑定时不用设置路由键。
@Configuration
public class SendConfig {
public static final String FANOUT_QUEUE_1 = "fanout.queue.1";
public static final String FANOUT_QUEUE_2 = "fanout.queue.2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
@Bean
public Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE_1);
}
@Bean
public Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE_2);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
复制代码
Controller层
@RestController
@RequestMapping("/fanout")
class SendController{
@Autowired
private SendService sendService;
@GetMapping("/send/{msg}")
public void fanoutSend(@PathVariable String msg){
sendService.send(msg);
}
}
复制代码
Service层
Service层中调用convertAndSend方法进行消息发送,由于Fanout不关心路由键,因此第二个参数可以随意设置,在这里选用空字符串
@Service
public class SendServiceImpl implements SendService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void send(String msg) {
System.out.println("Fanout发送数据:" + msg);
rabbitTemplate.convertAndSend(SendConfig.FANOUT_EXCHANGE, "", msg);
}
}
复制代码
2.6 消费者(接收端)
消费者
@Component
public class FanoutReceiver {
@Value("${server.port}")
private String port;
@RabbitListener(queues = "fanout.queue.1")
public void receiveDirect1(String msg) {
System.out.println(port+"收到fanout的消息:" + msg);
}
}
复制代码
消费者2
@Component
public class FanoutReceiver {
@Value("${server.port}")
private String port;
@RabbitListener(queues = "fanout.queue.2")
public void receiveDirect1(String msg) {
System.out.println(port + "接收到fanout的消息:" + msg);
}
}
复制代码
在这里将配置文件中的端口号设置到变量中,已便于区分消息来源
2.7 验证结果
启动生产者和两个消费者服务器,调用接口发送消息
http://127.0.0.1:9001/fanout/send/fanout
复制代码
在RabbitMQ管理界面中可以看到两个队列已经成功声明 此时,调用一次接口,在两个接收端均可以收到接口发送的消息
3. 使用Topic交换机
3.1 前言
使用SpringBoot对RabbitMQ进行整合,模拟生产者服务器(9002)向消费者服务器(8003)发送消息的过程,消息生产者通过接收 HTTP 请求向消息队列发送消息(Controller层、Service层),接收端则直接监听队列接收消息。这里的Demo分别设置有三个接口,每个接口向不同绑定路由规则的队列发送消息,观察消费者端接收情况。
关于RabbitMQ的搭建及搭建中常见的问题参考连接:RabbitMQ搭建及问题
3.2 简介
Topic交换机与Direct相似,通过与交换机绑定队列的路由键进行消息分发。不同的是Topic可以通过使用通配符( * 和 #)将消息分发到一个或者多个队列当中
通配符 | 说明 | 示例 |
---|---|---|
* | 匹配一个或多个内容 | bigdata. * 可以匹配到 bigdata.spark 或者 bigdata.hadoop.hive 等 |
# | 匹配一个内容 | bigdata.# 只能匹配到 bigdata.spark 或者 bigdata.hadoop |
3.3 添加依赖
在 pom.xml 文件中添加依赖,主要是 SpringBoot中 WEB 依赖 starter 和 amqp 的依赖 starter
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
复制代码
3.4 application.yml 配置
消息生产者(发送端)配置,端口9002
server:
port: 9002
spring:
application:
name: topic-sender
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
复制代码
消费者(接收端)配置,端口8003
server:
port: 8003
spring:
application:
name: topic-receiver
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
复制代码
3.5 消息生产者(发送端)
SendConfig.java配置信息
在配置信息中创建三个队列分别为topic.queue.1、topic.queue.2、topic.queue.3,并且绑定到交换机上的路由键分别为topic.message.rabbit、topic.message.*、topic.#
@Component
public class SendConfig {
public static final String TOPIC_QUEUE_1 = "topic.queue.1";
public static final String TOPIC_QUEUE_2 = "topic.queue.2";
public static final String TOPIC_QUEUE_3 = "topic.queue.3";
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_ROUTING_KEY_1 = "topic.message.rabbit";
public static final String TOPIC_ROUTING_KEY_2 = "topic.message.*";
public static final String TOPIC_ROUTING_KEY_3 = "topic.#";
@Bean
public Queue queue1() {
return new Queue(TOPIC_QUEUE_1);
}
@Bean
public Queue queue2() {
return new Queue(TOPIC_QUEUE_2);
}
@Bean
public Queue queue3() {
return new Queue(TOPIC_QUEUE_3);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY_1);
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY_2);
}
@Bean
public Binding binding3() {
return BindingBuilder.bind(queue3()).to(topicExchange()).with(TOPIC_ROUTING_KEY_3);
}
}
复制代码
Controller层
demo中有三个接口发送消息,分别发送消息到三个不同的路由键上,用于观察哪些接收端接收到了对应消息
@RestController
@RequestMapping("/topic")
public class SendController {
@Autowired
private TopicService topicService;
@GetMapping("/send1/{msg}")
public void send1(@PathVariable String msg) {
topicService.send1(msg);
}
@GetMapping("/send2/{msg}")
public void send2(@PathVariable String msg) {
topicService.send2(msg);
}
@GetMapping("/send3/{msg}")
public void send3(@PathVariable String msg) {
topicService.send3(msg);
}
}
复制代码
Service层
- send1方法发送信息到路由键topic.message.rabbit
- send2方法发送信息到路由键topic.message.kafka
- send3方法发送信息到路由键topic
@Service
public class TopicServiceImpl implements TopicService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void send1(String msg) {
System.out.println("路由键 topic.message.rabbit 发送消息:"+msg);
rabbitTemplate.convertAndSend(SendConfig.TOPIC_EXCHANGE, "topic.message.rabbit", msg);
}
@Override
public void send2(String msg) {
System.out.println("路由键 topic.message.kafka 发送消息:"+msg);
rabbitTemplate.convertAndSend(SendConfig.TOPIC_EXCHANGE, "topic.message.kafka", msg);
}
@Override
public void send3(String msg) {
System.out.println("路由键 topic 发送消息:"+msg);
rabbitTemplate.convertAndSend(SendConfig.TOPIC_EXCHANGE, "topic", msg);
}
}
复制代码
3.6 消费者(接收端)
@Component
public class TopicReceive {
@RabbitListener(queues = "topic.queue.1")
public void receiveTopic1(String msg) {
System.out.println("topic.queue.1 接收到数据:" + msg);
}
@RabbitListener(queues = "topic.queue.2")
public void receiveTopic2(String msg) {
System.out.println("topic.queue.2 接收到数据:" + msg);
}
@RabbitListener(queues = "topic.queue.3")
public void receiveTopic3(String msg) {
System.out.println("topic.queue.3 接收到数据:" + msg);
}
}
3.7 测试运行结果
1.调用接口send1,发送到路由键topic.message.rabbit
http://127.0.0.1:9002/topic/send1/topic
控制台信息: 可以看到三个队列均接收到了信息 2.调用接口send2,发送到路由键topic.message.kafka
http://127.0.0.1:9002/topic/send2/topic
复制代码
只有队列2和队列3接收到数据 3.调用接口send3,发送到路由键topic
http://127.0.0.1:9002/topic/send3/topic
复制代码
4. 使用Headers交换机
4.1 前言
使用SpringBoot对RabbitMQ进行整合,模拟生产者服务器(9003)向消费者服务器(8004)发送消息的过程,消息生产者通过接受Http请求向消息队列发送消息(Controller层、Service层),接收端则直接监听队列接收消息。这里的Demo分别设置两种不同的绑定类型,观察消费者端接收消息的情况。
关于RabbitMQ的搭建及搭建中常见的问题参考连接:RabbitMQ搭建及问题
4.2 简介
头交换机(headers exchange)使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等
4.3 添加依赖
在pom.xml文件中添加依赖,主要是springboot中web和amqp的starter
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
复制代码
4.4 application.yml配置
消息生产者(发送端),端口9003
server:
port: 9003
spring:
application:
name: header-sender
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
复制代码
消费者(接收端),端口8004
server:
port: 8004
spring:
application:
name: header-receiver-1
rabbitmq:
username: admin
password: admin
host: 192.168.108.128
port: 5672
复制代码
4.5 消息生产者(发送端)
SendConfig.java配置信息
配置信息中创建两个队列分别为header.queue.1和header.queue.2,在绑定的时候调用方法whereAll和whereAny,whereAll用于匹配所有头信息,whereAny只需要匹配到一条即可。
@Component
public class SendConfig {
public static final String HEADER_QUEUE_1 = "header.queue.1";
public static final String HEADER_QUEUE_2 = "header.queue.2";
public static final String HEADER_EXCHANGE = "header.exchange";
@Bean
public Queue queue1() {
return new Queue(HEADER_QUEUE_1);
}
@Bean
public Queue queue2() {
return new Queue(HEADER_QUEUE_2);
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADER_EXCHANGE);
}
@Bean
public Binding binding1() {
HashMap<String, Object> header = new HashMap<>();
header.put("queue", "queue1");
header.put("bindType", "whereAll");
return BindingBuilder.bind(queue1()).to(headersExchange()).whereAll(header).match();
}
@Bean
public Binding binding2() {
HashMap<String, Object> header = new HashMap<>();
header.put("queue", "queue2");
header.put("bindType", "whereAny");
return BindingBuilder.bind(queue2()).to(headersExchange()).whereAny(header).match();
}
}
复制代码
Controller层
@RestController
@RequestMapping("/header")
public class SendController {
@Autowired
private HeaderService headerService;
@GetMapping("/send1/{msg}")
public void send1(@PathVariable String msg) {
headerService.send1(msg);
}
@GetMapping("/send2/{msg}")
public void send2(@PathVariable String msg) {
headerService.send2(msg);
}
}
复制代码
Service层
与前面不同的是,这里通过使用MessageProperties对象封装头信息,通过Message对象传递消息
@Service
public class HeaderServiceImpl implements HeaderService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void send1(String msg) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("queue", "queue1");
messageProperties.setHeader("bindType", "whereAll");
Message message = new Message(msg.getBytes(), messageProperties);
System.out.println("发送消息:"+msg);
rabbitTemplate.convertAndSend(SendConfig.HEADER_EXCHANGE, null, message);
}
@Override
public void send2(String msg) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("queue", "queue2");
messageProperties.setHeader("bindType", "whereAny");
Message message = new Message(msg.getBytes(), messageProperties);
System.out.println("发送消息:"+msg);
rabbitTemplate.convertAndSend(SendConfig.HEADER_EXCHANGE, null, message);
}
}
复制代码
4.6 消费者(接收端)
@Component
public class receive {
@RabbitListener(queues = "header.queue.1")
public void receive1(String msg) {
System.out.println("接收到 header.queue.1 发送的消息:" + msg);
}
@RabbitListener(queues = "header.queue.2")
public void receive2(String msg) {
System.out.println("接收到 header.queue.2 发送的消息:" + msg);
}
}
RabbitMq消息幂等性
1.简介
消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将会被 重复发送给其他消费者进行消费,实际上这条消息已经被消费过了,这就是重复消费的问题。
如何避免重复消费的问题?
消费者端实现幂等性,意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息。通常有两种方式来避免消费重复消费:
-
方式1: 消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)
-
方式2: 利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可。
本文将通过一个示例展示第一种方式避免消息重复消费。
2.消息全局ID
示例使用springboot + mysql, 首先得在mysql中创建一张表,用于记录消息是否被消费记录。
- 数据库创建表语句:
-- ----------------------------
-- Table structure for message_idempotent
-- ----------------------------
DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (
`message_id` varchar(50) NOT NULL COMMENT '消息ID',
`message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
pom.xml依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bruceliu.springboot.message</groupId>
<artifactId>springboot_rabbitmq_message_idempotent_consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml配置文件:
server:
port: 9098
spring:
application:
name: rabbitmq_message_idempotent_producer
rabbitmq:
host: localhost
virtual-host: /
username: guest
password: guest
publisher-confirms: true
port: 5672
datasource:
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/rabbitmq_message_idempotent?characterEncoding=utf8
jpa:
database: MySQL
show-sql: true
hibernate:
naming_strategy: org.hibernate.cfg.ImprovedNamingStrategy
enable_lazy_load_no_trans: true
rabbitmq配置类:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
private static final String EXCHANGE_NAME = "message_idempotent_exchange";
private static final String QUEUE_NAME = "message_idempotent_queue";
private static final String ROUTE_KEY = "message.#";
/**
* 创建通配符交换机实例
*
* @return 通配符交换机实例
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
/**
* 创建队列实例,并持久化
*
* @return 队列实例
*/
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true);
}
/**
* 绑定队列到交换机
*
* @return 绑定对象
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTE_KEY);
}
}
实体类:
@Entity
@Table(name = "message_idempotent")
public class MessageIdempotent implements Serializable {
@Id
@Column(name = "message_id")
private String messageId;
@Column(name = "message_content")
private String messageContent;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getMessageContent() {
return messageContent;
}
public void setMessageContent(String messageContent) {
this.messageContent = messageContent;
}
}
repository JPA接口:
@Repository
public interface MessageIdempotentRepository extends JpaRepository<MessageIdempotent, String> {
}
消息发送者:
@Component
public class MessageIdempotentProducer {
private static final String EXCHANGE_NAME = "message_idempotent_exchange";
private static final String ROUTE_KEY = "message.insert";
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
public void sendMessage() {
//创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
Message message = new Message("hello,message idempotent!".getBytes(), messageProperties);
System.out.println("生产消息:" + message.toString());
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, message);
}
}
注意:这里通过设置UUID为消息全局ID,当然也可以使用时间戳或者业务标识+UUID都可以,只要保证消息ID唯一即可。
消息消费者:
import com.bruceliu.bean.MessageIdempotent;
import com.bruceliu.mapper.MessageIdempotentRepository;
import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@Component
public class MessageIdempotentConsumer {
private static final Logger logger = LoggerFactory.getLogger(MessageIdempotentConsumer.class);
@Autowired
private MessageIdempotentRepository messageIdempotentRepository;
@RabbitHandler
//org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。
//注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器
@RabbitListener(queues = "message_idempotent_queue")
@Transactional
public void handler(Message message, Channel channel) throws IOException {
/**
* 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。
*/
// 获取消息Id
String messageId = message.getMessageProperties().getMessageId();
if (StringUtils.isBlank(messageId)) {
logger.info("获取消费ID为空!");
return;
}
MessageIdempotent messageIdempotent=null;
Optional<MessageIdempotent> list = messageIdempotentRepository.findById(messageId);
if(list.isPresent()){
messageIdempotent=list.get();
}
//System.out.println("messageIdempotent="+messageIdempotent+"--->messageId:"+messageId);
//如果找不到,则进行消费此消息
if (null == messageIdempotent) {
//获取消费内容
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
logger.info("-----获取生产者消息-------------------->" + "messageId:" + messageId + ",消息内容:" + msg);
//手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//存入到表中,标识该消息已消费
MessageIdempotent idempotent = new MessageIdempotent();
idempotent.setMessageId(messageId);
idempotent.setMessageContent(msg);
messageIdempotentRepository.save(idempotent);
} else {
//如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
logger.error("该消息已消费,无须重复消费!");
}
}
}
需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话,说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息,并且将消息记录进行入库。
测试用例:
import com.bruceliu.producer.MessageIdempotentProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqMessageIdempotentApplicationTests {
@Autowired
private MessageIdempotentProducer producer;
@Test
public void contextLoads() {
producer.sendMessage();
}
}
运行结果:
Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
2019-10-29 17:12:11.175 INFO 6156 --- [tContainer#0-10] c.b.consumer.MessageIdempotentConsumer : -----获取生产者消息-------------------->messageId:25d4f91c-e408-4129-b5e4-f9204f5bd298,消息内容:hello,message idempotent!
Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
Hibernate: insert into message_idempotent (message_content, message_id) values (?, ?)
可以看到,消息成功被消费者进行 消费,并且将消费记录存到数据表中,用于后面消费的时候进行判断,这样就可以有效避免消息被重复消费的问题。
思路总结:
就是首先我们需要根据消息生成一个全局唯一ID,目的就是为了保障操作是绝对唯一的。将消息全局ID作为数据库表主键,因为主键不可能重复。即在消费消息前,先去数据库查询这条消息是否存在消费记录,没有就执行insert操作,如果有就代表已经被消费了,则不进行处理。
3.总结
以上就是使用全局消息ID避免消息重复消费的问题,这种方式实现起来相对简单,但是缺点也很明显,就是在高并发下,需要频繁读写数据库,无形中增加了数据库的压力。
RabbitMq的消息持久化
1.简介
在RabbitMQ中,如果遇到RabbitMQ服务停止或者挂掉,那么我们的消息将会出现丢失的情况,为了在RabbitMQ服务重启的情况下,不丢失消息,我们可以将Exchange(交换机)、Queue(队列)与Message(消息)都设置为可持久化的(durable)。这样的话,能够保证绝大部分的消息不会被丢失,但是还有有一些小概率会发生消息丢失的情况。下面通过一个简单的示例总结在RabbitMQ中如何进行消息持久化。
2.核心API
消息持久化主要是将交换机、队列以及消息设置为durable = true(可持久化的),要点主要有三个:
- 声明交换机Exchange的时候设置 durable=true;
//public TopicExchange(String name, boolean durable, boolean autoDelete)
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME,true,false);
}
- 声明队列Queue的时候设置 durable=true;
//public Queue(String name, boolean durable)
@Bean
public Queue queue() {
//durable:是否将队列持久化 true表示需要持久化 false表示不需要持久化
return new Queue(QUEUE_NAME, false);
}
- 发送消息的时候设置消息的 deliveryMode = 2;
使用convertAndSend方式发送消息,消息默认就是持久化的.
new MessageProperties() --> DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT --> deliveryMode = 2;
3.试验步骤
这里我们声明两种队列,一个是持久化的,一个是非持久化的,我们测试在RabbitMQ服务重启的情况下,未被消费的消息是否还存在,消费者重启之后能否重新消费之前的消息。
RabbitMQ配置类: RabbitMQ配置信息,绑定交换器、队列、路由键设置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
*
* 如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,可以将交换机、队列、消息都进行持久化,这样可以保证绝大部分情况下消息不会丢失。
* 但还是会有小概率发生消息丢失的情况(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),
* 如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。(transaction/confirm机制)
*
* <p>
* 说明:
* 1. 队列持久化:需要在声明队列的时候设置durable=true,如果只对队列进行持久化,那么mq重启之后队列里面的消息不会保存
* 如果需要队列里面的消息也保存下来,那么还需要对消息进行持久化;
* <p>
* 2. 消息持久化:设置消息的deliveryMode = 2,消费者重启之后还能够继续消费持久化之后的消息;
* 使用convertAndSend方式发送消息,消息默认就是持久化的,下面是源码:
* new MessageProperties() --> DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT --> deliveryMode = 2;
* <p>
* 3.重启mq: CMD命令行下执行 net stop RabbitMQ && net start RabbitMQ
*/
@Component
public class RabbitMQConfig {
private static final String ROUTING_KEY = "user.#";
private static final String DURABLE_QUEUE_NAME = "durable_queue_name";
private static final String DURABLE_EXCHANGE_NAME = "durable_exchange_name";
private static final String QUEUE_NAME = "not_durable_queue_name";
private static final String EXCHANGE_NAME = "not_durable_exchange_name";
@Bean
public Queue durableQueue() {
//public Queue(String name, boolean durable)
//durable:是否将队列持久化 true表示需要持久化 false表示不需要持久化
return new Queue(DURABLE_QUEUE_NAME, true);
}
@Bean
public TopicExchange durableExchange() {
//声明交换机的时候默认也是持久化的
return new TopicExchange(DURABLE_EXCHANGE_NAME,true,false);
}
@Bean
public Binding durableBinding() {
return BindingBuilder.bind(durableQueue()).to(durableExchange()).with(ROUTING_KEY);
}
@Bean
public Queue queue() {
//public Queue(String name, boolean durable)
//durable:是否将队列持久化 true表示需要持久化 false表示不需要持久化
return new Queue(QUEUE_NAME, false);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME,true,false);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY);
}
}
生产者:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String DURABLE_EXCHANGE_NAME = "not_durable_exchange_name";
private static final String ROUTING_KEY = "user.add";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 1; i <= 3; i++) {
String message = "消息" + i;
logger.info("【生产者】发送消息:" + message);
rabbitTemplate.convertAndSend(DURABLE_EXCHANGE_NAME, ROUTING_KEY, message, new CorrelationData(UUID.randomUUID().toString()));
}
}
}
消费者:
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitListener(queues = "not_durable_queue_name")
public void receiveMessage(String msg, Message message, Channel channel) {
logger.info("【Consumer receiveMessage】接收到消息为:[{}]", msg);
}
}
应用配置文件
server:
port: 9999
spring:
application:
name: mq-message-persistence
rabbitmq:
host: 127.0.0.1
virtual-host: /
username: guest
password: guest
port: 5672
connection-timeout: 10000
测试用例
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbooyRabbitmqMessagePersistenceApplicationTests {
@Autowired
private Producer producer;
@Test
public void contextLoads() {
producer.sendMessage();
}
}
运行结果
为了测试在MQ重启之后,消费者能够消费持久化之后的消息,这里可以先将消费者监听队列暂时注释掉,让生产者发送三条消息,但是没有消费者去消费,这样MQ重启之后,队列中还是存在三条消息。
启动项目,通过管理控制台可以看到成功创建两个交换机以及两个队列。
这时候,我们重启RabbitMQ服务,检查一下重启之后队列、消息是否还存在。
同理,交换机也类似。
这时候放开之前注释的消费者代码块,重启项目,通过控制台可以看到消费者成功消费了之前持久化的三条消息,由此证明了在MQ重启之后,消费者可以继续消费之前的消息
2019-10-28 23:19:20.968 INFO 16636 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer receiveMessage】接收到消息为:[消息1]
2019-10-28 23:19:20.969 INFO 16636 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer receiveMessage】接收到消息为:[消息2]
2019-10-28 23:19:20.969 INFO 16636 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer receiveMessage】接收到消息为:[消息3]
4.关于默认持久化的说明
我们使用new Queue() 创建队列的时候,默认就是持久化的,即durable=true,下面是部分源码:
创建交换机的时候,也是默认持久化交换机的,下面是构造器源码:
使用convertAndSend方式发送消息,消息默认就是持久化的,下面是源码:
public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) throws AmqpException {
this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
}
protected Message convertMessageIfNecessary(Object object) {
return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
public MessageProperties() {
this.deliveryMode = DEFAULT_DELIVERY_MODE;
this.priority = DEFAULT_PRIORITY;
}
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE;
static {
DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
DEFAULT_PRIORITY = 0;
}
public static int toInt(MessageDeliveryMode mode) {
switch(mode) {
case NON_PERSISTENT:
return 1;
case PERSISTENT:
return 2;
default:
return -1;
}
}
6.总结
RabbitMQ中实现消息和队列的持久化,虽然不能说百分之百保证消息不会丢失,但是能够保证绝大部分不会丢失。在实际项目中,通常需要对消息进行持久化,因为不可能保证服务器永远不会出现down机情况。
RabbitMq消息确认机制之confirm模式
1.简介
在RabbitMQ中,消息确认主要有生产者发送确认和消费者接收确认:
生产者发送确认:指生产者发送消息后到RabbitMQ服务器,如果RabbitMQ服务器收到消息,则会给我们生产者一个应答,用于告诉生产者该条消息已经成功到达RabbitMQ服务器中。
消费者接收确认:用于确认消费者是否成功消费了该条消息。
消息确认的实现方式主要有两种,一种是通过事务的方式(channel.txSelect()、channel.txCommit()、channel.txRollback()),另外一种是confirm确认机制。因为事务模式比较消耗性能,在实际工作中也用的不多,这里主要介绍通过confirm机制来实现消息的确认,保证消息的准确性。
2.生产者发送确认
在RabbitMQ中实现生产者发送确认的方法(本文使用springboot项目),主要有两点:
配置文件中配置消息发送确认
spring.rabbitmq.publisher-confirms = true
生产者实现 RabbitTemplate.ConfirmCallback接口,重写方法
confirm(CorrelationData correlationData, boolean isSendSuccess, String error)
当然也可以通过注入的方式自定义confirm listener.
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
.........
}
}
3.消费者接收确认
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。确认模式主要分为下面三种:
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
注意:在springboot项目中通过在配置文件中指定消息确认的模式,如下指定手动确认模式:
手动确认与自动确认的区别:
自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失。
手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况。
手动确认主要使用的方法有下面几个:
public void basicAck(long deliveryTag, boolean multiple):
deliveryTag 表示该消息的index(long类型的数字);
multiple 表示是否批量(true:将一次性ack所有小于deliveryTag的消息);
如果成功消费消息,一般调用下面的代码用于确认消息成功处理完
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
手动拒绝消息:
public void basicNack(long deliveryTag, boolean multiple, boolean requeue):告诉服务器这个消息我拒绝接收,basicNack可以一次性拒绝多个消息。
deliveryTag: 表示该消息的index(long类型的数字);
multiple: 是否批量(true:将一次性拒绝所有小于deliveryTag的消息);
requeue:指定被拒绝的消息是否重新回到队列;
示例:
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
public void basicReject(long deliveryTag, boolean requeue):也是用于拒绝消息,但是只能拒绝一条消息,不能同时拒绝多个消息。
deliveryTag: 表示该消息的index(long类型的数字);
requeue:指定被拒绝的消息是否重新回到队列;
4.示例
下面通过一个示例说明在RabbitMQ中实现发送确认和消费者确认的使用方法。
引入pom.xml依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件application.yml:
server:
port: 6666
spring:
application:
name: mq-message-confirm2
rabbitmq:
host: 127.0.0.1
virtual-host: /vhost
username: guest
password: guest
port: 5672
#消息发送确认回调
publisher-confirms: true
#指定消息确认模式为手动确认
listener:
simple:
acknowledge-mode: manual
#发送返回监听回调
publisher-returns: true
这里需要注意两点:
spring.rabbitmq.publisher-confirms = true
spring.rabbitmq.listener.simple.acknowledge-mode = manual
自定义消息发送、返回回调监听类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
/**
* @description 自定义消息发送确认的回调
* 实现接口:implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback
* ConfirmCallback:只确认消息是否正确到达交换机中,不管是否到达交换机,该回调都会执行;
* ReturnCallback:如果消息从交换机未正确到达队列中将会执行,正确到达则不执行;
*/
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private static final Logger logger = LoggerFactory.getLogger(CustomConfirmAndReturnCallback.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.
*/
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(this);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(this);
}
/**
* 消息从交换机成功到达队列,则returnedMessage方法不会执行;
* 消息从交换机未能成功到达队列,则returnedMessage方法会执行;
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode
+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
}
/**
* 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;
* 如果消息正确到达交换机,则该方法中isSendSuccess = true;
*/
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
logger.info("confirm回调方法>>>回调消息ID为: " + correlationData.getId());
if (isSendSuccess) {
logger.info("confirm回调方法>>>消息发送到交换机成功!");
} else {
logger.info("confirm回调方法>>>消息发送到交换机失败!,原因 : [{}]", error);
}
}
}
注意这里我同时也实现了RabbitTemplate.ReturnCallback返回回调接口,并且重写了returnedMessage()方法,返回回调主要指的是:如果消息从交换机未正确到达队列中将会执行,正确到达则不执行returnedMessage()。
RabbitMQ配置信息
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @description RabbitMQ配置信息,绑定交换器、队列、路由键设置
* 说明:
* 1. 声明Exchange交换器;
* 2. 声明Queue队列;
* 3. 绑定:BindingBuilder绑定队列到交换器,并设置路由键;
*/
@Component
public class RabbitMQConfig {
private static final String EXCHANGE_NAME = "message_confirm_exchange";
private static final String QUEUE_NAME = "message_confirm_queue";
private static final String ROUTING_KEY = "user.#";
@Bean
private TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
private Queue queue() {
return new Queue(QUEUE_NAME);
}
@Bean
private Binding bindingDirect() {
return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTING_KEY);
}
}
发送者:这里发送了三条消息,如果三条消息中某条消息已经被拒绝过一次,那么触发basicNack()重新回到队列中,如果该消息再次被拒绝,那么消费者将会调用basicReject()直接拒绝该条消息,以后也不会再次接收该消息。
package com.bruceliu.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String EXCHANGE_NAME = "message_confirm_exchange";
private static final String ROUTING_KEY = "user.";
@Autowired
public RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 1; i <= 3; i++) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
logger.info("【Producer】发送的消费ID = {}", correlationData.getId());
String msg = "hello confirm message" + i;
logger.info("【Producer】发送的消息 = {}", msg);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg, correlationData);
}
}
}
消费者1:这里模拟了在处理消息的时候触发一个空指针异常,用于触发拒绝某个消息。
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Consumer01 {
private static final Logger logger = LoggerFactory.getLogger(Consumer01.class);
@RabbitListener(queues = "message_confirm_queue")
public void receiveMessage01(String msg, Channel channel, Message message) throws IOException {
try {
// 这里模拟一个空指针异常,
String string = null;
string.length();
logger.info("【Consumer01成功接收到消息】>>> {}", msg);
// 确认收到消息,只确认当前消费者的一个消息收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
logger.info("【Consumer01】消息已经回滚过,拒绝接收消息 : {}", msg);
// 拒绝消息,并且不再重新进入队列
//public void basicReject(long deliveryTag, boolean requeue)
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
logger.info("【Consumer01】消息即将返回队列重新处理 :{}", msg);
//设置消息重新回到队列处理
// requeue表示是否重新回到队列,true重新入队
//public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
e.printStackTrace();
}
}
}
消费者2:该消费者为正常消费消息。
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Consumer02 {
private static final Logger logger = LoggerFactory.getLogger(Consumer02.class);
@RabbitListener(queues = "message_confirm_queue")
public void receiveMessage02(String msg, Channel channel, Message message) throws IOException {
try {
logger.info("【Consumer02成功接收到消息】>>> {}", msg);
// 确认收到消息,只确认当前消费者的一个消息收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
logger.info("【Consumer02】消息已经回滚过,拒绝接收消息 : {}", msg);
// 拒绝消息,并且不再重新进入队列
//public void basicReject(long deliveryTag, boolean requeue)
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
logger.info("【Consumer02】消息即将返回队列重新处理 :{}", msg);
//设置消息重新回到队列处理
// requeue表示是否重新回到队列,true重新入队
//public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
e.printStackTrace();
}
}
}
启动项目,查看运行结果(运行结果不固定)
2019-10-28 12:05:06.023 INFO 6672 --- [ntContainer#1-1] com.bruceliu.consumer.Consumer02 : 【Consumer02成功接收到消息】>>> hello confirm message2
2019-10-28 12:05:06.023 INFO 6672 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer01 : 【Consumer01】消息即将返回队列重新处理 :hello confirm message1
java.lang.NullPointerException
at com.bruceliu.consumer.Consumer01.receiveMessage01(Consumer01.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:127)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$429/509806761.invokeListener(Unknown Source)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
at java.lang.Thread.run(Thread.java:745)
2019-10-28 12:05:06.023 java.lang.NullPointerException
INFO 6672 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer01 : 【Consumer01】消息即将返回队列重新处理 :hello confirm message3
at com.bruceliu.consumer.Consumer01.receiveMessage01(Consumer01.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:127)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$429/509806761.invokeListener(Unknown Source)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
at java.lang.Thread.run(Thread.java:745)
2019-10-28 12:05:06.039 INFO 6672 --- [ntContainer#1-1] com.bruceliu.consumer.Consumer02 : 【Consumer02成功接收到消息】>>> hello confirm message1
2019-10-28 12:05:06.039 INFO 6672 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer01 : 【Consumer01】消息已经回滚过,拒绝接收消息 : hello confirm message3
java.lang.NullPointerException
at com.bruceliu.consumer.Consumer01.receiveMessage01(Consumer01.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:127)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$429/509806761.invokeListener(Unknown Source)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
at java.lang.Thread.run(Thread.java:745)
由控制台结果可见,hello confirm message3这条消息第一次被consumer1拒绝了一次,执行basicNack重新回到队列,第二次又被判断为该条消息已经回滚过,调用basicReject方法又被拒绝并且禁止重新回到队列,这样该条消息将不会被消费者重新消费。
hello confirm message2这条消息成功被consumer2消费掉,hello confirm message3这条消息第一次也被 consumer1拒绝了,但是在重新回到队列之后,被consumer2成功消费了。
同时可以看到,三条消息都正确从发送者到达交换机,所以都执行了 confirm(CorrelationData correlationData, boolean isSendSuccess, String error)回调方法。
2019-10-28 12:05:06.023 INFO 19012 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: b70413cb-c642-47ec-b8e2-a0e290c66de3
2019-10-28 12:05:06.023 INFO 19012 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机成功!
2019-10-28 12:05:06.023 INFO 19012 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: c9f6da2d-731b-4849-8cba-187a3d7f0ae4
2019-10-28 12:05:06.023 INFO 19012 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机成功!
2019-10-28 12:05:06.023 INFO 19012 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: 28a58376-78e3-4d5d-80da-81663152a292
2019-10-28 12:05:06.023 INFO 19012 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机成功!
2019-10-28 12:05:06.023 INFO 19012 --- [ Thread-2] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
因为消息都成功从交换机正确到达队列中,所有监听的returnCallback的returnedMessage()方法并没有被执行。下面我们测试一下,假设指定一个binding key不匹配的。修改下面的路由键,让消息无法从交换机正确路由到队列上:
首先在RabbitMQ管理控制台将之前的user.#绑定键解除绑定:
重新启动项目,查看控制台日志:
2019-10-28 12:13:09.223 INFO 25984 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消费ID = def5a3bc-d8f9-4dfb-99b5-b57465185451
2019-10-28 12:13:09.223 INFO 25984 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消息 = hello confirm message1
2019-10-28 12:13:09.223 INFO 25984 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-10-28 12:13:09.285 INFO 25984 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#50cf5a23:0/SimpleConnection@7c551ad4 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 65013]
2019-10-28 12:13:09.316 INFO 25984 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消费ID = 71878808-41be-4933-853c-99e4339a8611
2019-10-28 12:13:09.316 INFO 25984 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消息 = hello confirm message2
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : returnedMessage回调方法>>>hello confirm message1,replyCode:312,replyText:NO_ROUTE,exchange:message_confirm_exchange,routingKey:user456.#
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: def5a3bc-d8f9-4dfb-99b5-b57465185451
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机成功!
2019-10-28 12:13:09.316 INFO 25984 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消费ID = c7c26855-118c-4fb4-99da-2a5700a08776
2019-10-28 12:13:09.316 INFO 25984 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消息 = hello confirm message3
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : returnedMessage回调方法>>>hello confirm message2,replyCode:312,replyText:NO_ROUTE,exchange:message_confirm_exchange,routingKey:user456.#
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: 71878808-41be-4933-853c-99e4339a8611
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机成功!
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : returnedMessage回调方法>>>hello confirm message3,replyCode:312,replyText:NO_ROUTE,exchange:message_confirm_exchange,routingKey:user456.#
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: c7c26855-118c-4fb4-99da-2a5700a08776
2019-10-28 12:13:09.316 INFO 25984 --- [ 127.0.0.1:5672] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机成功!
下面我们测试一下消息从发送者未能正确到达交换机的情况,这里主要修改一个不存在的交换机名称,这样消息就不能正确到达消费者监听队列所在的交换机message_confirm_exchange,从而触发confirmCallback中发送失败的情况,error为错误原因。
运行结果
2019-10-28 12:19:05.810 INFO 5092 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消费ID = c75f5631-7fc4-451f-a32a-72f8926666fa
2019-10-28 12:19:05.826 INFO 5092 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消息 = hello confirm message1
2019-10-28 12:19:05.826 INFO 5092 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-10-28 12:19:05.857 INFO 5092 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#50cf5a23:0/SimpleConnection@7c551ad4 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 65190]
2019-10-28 12:19:05.904 INFO 5092 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消费ID = fb4e1898-3a3b-4caa-8ef1-8d6093987c3d
2019-10-28 12:19:05.904 INFO 5092 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消息 = hello confirm message2
2019-10-28 12:19:05.904 ERROR 5092 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange123456' in vhost '/', class-id=60, method-id=40)
2019-10-28 12:19:05.904 INFO 5092 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消费ID = 937f06c5-16a6-49a9-9fb0-83f0562cd96c
2019-10-28 12:19:05.904 INFO 5092 --- [ main] com.bruceliu.producer.Producer : 【Producer】发送的消息 = hello confirm message3
2019-10-28 12:19:05.904 INFO 5092 --- [nectionFactory1] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: c75f5631-7fc4-451f-a32a-72f8926666fa
2019-10-28 12:19:05.904 INFO 5092 --- [nectionFactory1] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange123456' in vhost '/', class-id=60, method-id=40)]
2019-10-28 12:19:05.904 ERROR 5092 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange123456' in vhost '/', class-id=60, method-id=40)
2019-10-28 12:19:05.904 INFO 5092 --- [nectionFactory1] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: fb4e1898-3a3b-4caa-8ef1-8d6093987c3d
2019-10-28 12:19:05.904 INFO 5092 --- [nectionFactory1] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange123456' in vhost '/', class-id=60, method-id=40)]
2019-10-28 12:19:05.904 ERROR 5092 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange123456' in vhost '/', class-id=60, method-id=40)
2019-10-28 12:19:05.904 INFO 5092 --- [nectionFactory1] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: 937f06c5-16a6-49a9-9fb0-83f0562cd96c
2019-10-28 12:19:05.904 INFO 5092 --- [nectionFactory1] c.b.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange123456' in vhost '/', class-id=60, method-id=40)]
5.总结
本文主要介绍了RabbitMQ中生产者消息发送确认和消费者接收确认,同时也扩展了监听返回回调returnCallback,在实际项目中,一般都用手动确认的方式,再加上一些补偿措施,这样可以保证绝大部分的消息不会出现丢失的情况。
RabbitMq死信队列和备份交换机
1.引言
死信交换机(Dead-Letter-Exchange):当消息在一个队列中由于过期、被拒绝等原因变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是死信交换机,绑定死信交换机的队列就称之为死信队列。
判断一个消息是否是死信消息(Dead Message)的依据:
a. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false;
b. 消息过期; 消息过期时间设置主要有两种方式:
1.设置队列的过期时间,这样该队列中所有的消息都存在相同的过期时间(在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒);
2.单独设置某个消息的过期时间,每条消息的过期时间都不一样;(设置消息属性的 expiration 参数的值,单位为 毫秒);
3.如果同时使用了两种方式设置过期时间,以两者之间较小的那个数值为准;
c. 队列已满(队列满了,无法再添加消息到mq中);
使用方法:申明队列的时候设置 x-dead-letter-exchange 参数
备份交换器(alternate-exchange):未被正确路由的消息将会经过此交换器
使用方法:申明交换器的时候设置 alternate-exchange 参数
2.案例模拟
下面通过一个简单的示例说明在RabbitMQ中死信队列和备份交换机的使用方法。
2.1.示意图
2.2.配置文件
RabbitMQ配置信息,绑定交换器、队列、路由键设置
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @Description: RabbitMQ配置信息,绑定交换器、队列、路由键设置
* @author: weishihuai
* @Date: 2019/6/27 15:38
* <p>
* 说明:
* <p>
* 死信交换机(Dead-Letter-Exchange): 当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上
* <p>
* 使用方法:申明队列的时候设置 x-dead-letter-exchange 参数
* <p>
* 判断一个消息是否是死信消息(Dead Message)的依据:
* a. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false;
* b. 消息过期; 消息过期时间设置主要有两种方式:
* 1.设置队列的过期时间,这样该队列中所有的消息都存在相同的过期时间(在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒)
* 2.单独设置某个消息的过期时间,每条消息的过期时间都不一样;(设置消息属性的 expiration 参数的值,单位为 毫秒)
* 3.如果同时使用了两种方式设置过期时间,以两者之间较小的那个数值为准;
* c. 队列已满(队列满了,无法再添加数据到mq中);
* <p>
* 备份交换器(alternate-exchange):未被正确路由的消息将会经过此交换器
* 使用方法:申明交换器的时候设置 alternate-exchange 参数
*/
/**
* @author bruceliu
* @create 2019-11-03 17:41
* @description
*/
@Component
public class RabbitMQConfig {
//没有被正确路由的消息 备份队列
private static final String MESSAGE_BAK_QUEUE_NAME = "un_routing_queue_name";
//没有被正确路由的消息 备份交换机
private static final String MESSAGE_BAK_EXCHANGE_NAME = "un_routing_exchange_name";
//死信队列
private static final String DEAD_LETTERS_QUEUE_NAME = "dead_letters_queue_name";
//死信交换机
private static final String DEAD_LETTERS_EXCHANGE_NAME = "dead_letters_exchange_name";
//目标队列
private static final String QUEUE_NAME = "test_dlx_queue_name";
//目标消息交换机
private static final String EXCHANGE_NAME = "test_dlx_exchange_name";
//ROUTING_KEY
private static final String ROUTING_KEY = "user.add";
/**
* 声明备份队列、备份交换机、绑定队列到备份交换机
* 建议使用FanoutExchange广播式交换机
*/
@Bean
public Queue msgBakQueue() {
return new Queue(MESSAGE_BAK_QUEUE_NAME);
}
@Bean
public FanoutExchange msgBakExchange() {
return new FanoutExchange(MESSAGE_BAK_EXCHANGE_NAME);
}
@Bean
public Binding msgBakBinding() {
return BindingBuilder.bind(msgBakQueue()).to(msgBakExchange());
}
/**
* 声明死信队列、死信交换机、绑定队列到死信交换机
* 建议使用FanoutExchange广播式交换机
*/
@Bean
public Queue deadLettersQueue() {
return new Queue(DEAD_LETTERS_QUEUE_NAME);
}
@Bean
public FanoutExchange deadLettersExchange() {
return new FanoutExchange(DEAD_LETTERS_EXCHANGE_NAME);
}
@Bean
public Binding deadLettersBinding() {
return BindingBuilder.bind(deadLettersQueue()).to(deadLettersExchange());
}
/**
* 声明普通队列,并指定相应的备份交换机、死信交换机
*/
@Bean
public Queue queue() {
Map<String, Object> arguments = new HashMap<>(10);
//指定死信发送的Exchange
arguments.put("x-dead-letter-exchange", DEAD_LETTERS_EXCHANGE_NAME);
return new Queue(QUEUE_NAME, true, false, false, arguments);
}
@Bean
public Exchange exchange() {
Map<String, Object> arguments = new HashMap<>(10);
//声明备份交换机
arguments.put("alternate-exchange", MESSAGE_BAK_EXCHANGE_NAME);
return new DirectExchange(EXCHANGE_NAME, true, false, arguments);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY).noargs();
}
}
2.3.生产者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String EXCHANGE_NAME = "test_dlx_exchange_name";
private static final String ROUTING_KEY = "user.add";
private static final String UN_ROUTING_KEY = "user.delete";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
// 发送10条能够正确被路由的消息
for (int i = 1; i <= 10; i++) {
String message = "发送第" + i + "条消息.";
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, new CorrelationData(UUID.randomUUID().toString()));
logger.info("【发送了一条能够正确被路由的消息】,exchange:[{}],routingKey:[{}],message:[{}]", EXCHANGE_NAME, ROUTING_KEY, message);
}
// 发送两条不能正确被路由的消息,该消息将会被转发到我们指定的备份交换器中
for (int i = 1; i <= 2; i++) {
String message = "不能正确被路由的消息" + i;
rabbitTemplate.convertAndSend(EXCHANGE_NAME, UN_ROUTING_KEY, message, new CorrelationData(UUID.randomUUID().toString()));
logger.info("【发送了第一条不能正确被路由的消息】,exchange:[{}],routingKey:[{}],message:[{}]", EXCHANGE_NAME, UN_ROUTING_KEY, message);
}
}
}
2.4.回调
自定义消息发送确认的回调
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {
private static final Logger logger = LoggerFactory.getLogger(CustomConfirmCallback.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.
*/
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
logger.info("(start)生产者消息确认=========================");
if (!isSendSuccess) {
logger.info("消息可能未到达rabbitmq服务器");
}
logger.info("(end)生产者消息确认=========================");
}
}
2.5 消费者
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Random;
@Component
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitListener(queues = "test_dlx_queue_name")
public void receiveMessage(String receiveMessage, Message message, Channel channel) {
try {
logger.info("【Consumer】接收到消息:[{}]", receiveMessage);
//这里模拟随机拒绝一些消息到死信队列中
if (new Random().nextInt(10) < 5) {
logger.info("【Consumer】拒绝一条信息:[{}],该消息将会被转发到死信交换器中", receiveMessage);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (Exception e) {
logger.info("【Consumer】接消息后的处理发生异常", e);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e1) {
logger.error("手动确认消息异常.", e1);
}
}
}
}
2.6.配置文件
server:
port: 7777
spring:
application:
name: mq-dead-letter-exchange-producer
rabbitmq:
host: 127.0.0.1
virtual-host: /
username: guest
password: guest
port: 5672
#消息发送确认回调
publisher-confirms: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true
prefetch: 1
auto-startup: true
default-requeue-rejected: false
# publisher-returns: true
template:
#当mandatory设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;
#当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;
#通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;
mandatory: true
connection-timeout: 10000
2.7.测试用例
package com.bruceliu.test;
import com.bruceliu.producer.Producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author bruceliu
* @create 2019-11-03 17:47
* @description
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqDeadLetterExchangeApplicationTests {
@Autowired
private Producer producer;
@Test
public void contextLoads() {
producer.sendMessage();
}
}
2.8.运行结果
生产者:
2019-11-03 17:59:46.026 INFO 33100 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2019-11-03 17:59:46.073 INFO 33100 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#6134ac4a:0/SimpleConnection@255e5e2e [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54334]
2019-11-03 17:59:46.120 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第1条消息.]
2019-11-03 17:59:46.120 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第2条消息.]
2019-11-03 17:59:46.120 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第3条消息.]
2019-11-03 17:59:46.120 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第4条消息.]
2019-11-03 17:59:46.120 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第5条消息.]
2019-11-03 17:59:46.136 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第6条消息.]
2019-11-03 17:59:46.136 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第7条消息.]
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第8条消息.]
2019-11-03 17:59:46.151 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第9条消息.]
2019-11-03 17:59:46.151 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了一条能够正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.add],message:[发送第10条消息.]
2019-11-03 17:59:46.151 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了第一条不能正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.delete],message:[不能正确被路由的消息1]
2019-11-03 17:59:46.151 INFO 33100 --- [ main] com.bruceliu.producer.Producer : 【发送了第一条不能正确被路由的消息】,exchange:[test_dlx_exchange_name],routingKey:[user.delete],message:[不能正确被路由的消息2]
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.151 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.167 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.167 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.167 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (start)生产者消息确认=========================
2019-11-03 17:59:46.167 INFO 33100 --- [ 127.0.0.1:5672] c.b.callback.CustomConfirmCallback : (end)生产者消息确认=========================
2019-11-03 17:59:46.167 INFO 33100 --- [ Thread-2] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
消费者:
2019-11-03 17:59:46.151 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第1条消息.]
2019-11-03 17:59:46.151 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】拒绝一条信息:[发送第1条消息.],该消息将会被转发到死信交换器中
2019-11-03 17:59:46.151 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第2条消息.]
2019-11-03 17:59:46.151 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】拒绝一条信息:[发送第2条消息.],该消息将会被转发到死信交换器中
2019-11-03 17:59:46.151 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第3条消息.]
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第4条消息.]
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】拒绝一条信息:[发送第4条消息.],该消息将会被转发到死信交换器中
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第5条消息.]
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】拒绝一条信息:[发送第5条消息.],该消息将会被转发到死信交换器中
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第6条消息.]
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】拒绝一条信息:[发送第6条消息.],该消息将会被转发到死信交换器中
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第7条消息.]
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】拒绝一条信息:[发送第7条消息.],该消息将会被转发到死信交换器中
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第8条消息.]
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第9条消息.]
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】接收到消息:[发送第10条消息.]
2019-11-03 17:59:46.167 INFO 32904 --- [ntContainer#0-1] com.bruceliu.consumer.Consumer : 【Consumer】拒绝一条信息:[发送第10条消息.],该消息将会被转发到死信交换器中
由上图可见,生产者发送了12条消息,其中有两条消息不能被正确路由到队列的,那么这两条消息应该被转发到备份交换机上所绑定的队列上面;其中也有7条消息被拒绝了,那么将会被转发到死信交换机对应的死信队列中,可以观察MQ管理控制台:
arguments.put("x-dead-letter-exchange", DEAD_LETTERS_EXCHANGE_NAME); //指定死信发送的Exchange
arguments.put("alternate-exchange", MESSAGE_BAK_EXCHANGE_NAME); //声明备份交换机
3.应用场景
订单30分钟自动失效???
实际工作中,死信队列可以应用在许多场景中,例如常见的过期未支付订单自动取消就可以通过 (死信队列 + 过期时间)来实现,就是当有一个队列 queue1,其 对应的死信交换机 为 deadEx1,deadEx1 绑定了一个队列 deadQueue1,
当队列 queue1 中有一条消息因过期(假设30分钟未支付就取消订单)或者其他原因成为死信的消息,消息就会被转发到死信队列上面,然后我们可以通过监听死信队列中的消息,同时可以加上判断订单的状态是否已经支付,如果已经支付那么不处理,如果未支付,那么可以更新订单状态为已取消。(也就相当于消费的是因过期产生的死信订单信息)。
对比未使用消息队列的时候的解决方案:
设置一个定时器,每秒轮询数据库查找超出过期时间且未支付的订单,然后修改状态,但是这种方式会占用很多资源
相比较而言,使用消息队列可以减少对数据库的压力,在高流量的情况下可以提高系统的响应速度。
4.总结
思路总结:
声明队列的时候指定参数 “ x-dead-letter-exchange ” 对应 死信路由器(dlx_exchange);
发送消息的时候指定消息的过期时间为30分钟(或者其他时间),这样消息超过30分钟未消费就变为死信消息;
生产者发送消息到交换机 ,等待30分钟后,会去绑定的死信路由(dlx_exchange),然后被转发到死信队列;
然后我们通过监听死信队列的消息,查询该订单是否支付,如果没有支付,则取消该订单;
RabbitMq综合总结
RabbitMQ基础
1.RabbitMQ简单介绍
RabbitMQ是一个开源消息代理软件(有时称为面向消息的中间件),它实现了高级消息队列协议(AMQP)。RabbitMQ服务器使用Erlang编程语言编写,构建在Open Telecom Platform框架上,用于集群和故障转移。
2.为什么要使用rabbitmq,使用rabbitmq的场景
1.在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;
2.拥有持久化的机制,进程消息,队列中的信息也可以保存下来。
3.实现消费者和生产者之间的解耦。
4.对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。
5.可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。
3.如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?
消息确认的实现方式主要有两种,一种是通过事务的方式(channel.txSelect()、channel.txCommit()、channel.txRollback()),另外一种是confirm确认机制。因为事务模式比较消耗性能,在实际工作中也用的不多,这里主要介绍通过confirm机制来实现消息的确认,保证消息的准确性。
消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
配置文件中配置消息发送确认
spring.rabbitmq.publisher-confirms = true
生产者实现 RabbitTemplate.ConfirmCallback接口,重写方法
confirm(CorrelationData correlationData, boolean isSendSuccess, String error)
当然也可以通过注入的方式自定义confirm listener.
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
//做一些补偿性的措施!人肉补偿!
.........
}
}
第消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。确认模式主要分为下面三种:
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
注意:在springboot项目中通过在配置文件中指定消息确认的模式,如下指定手动确认模式:
手动确认与自动确认的区别:
自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失。
手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况。如果有少量丢失还是需要做补偿的机制!
手动确认主要使用的方法有下面几个:
public void basicAck(long deliveryTag, boolean multiple):
deliveryTag 表示该消息的index(long类型的数字);
multiple 表示是否批量(true:将一次性ack所有小于deliveryTag的消息);
如果成功消费消息,一般调用下面的代码用于确认消息成功处理完
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
手动拒绝消息:
public void basicNack(long deliveryTag, boolean multiple, boolean requeue):告诉服务器这个消息我拒绝接收,basicNack可以一次性拒绝多个消息。
deliveryTag: 表示该消息的index(long类型的数字);
multiple: 是否批量(true:将一次性拒绝所有小于deliveryTag的消息);
requeue:指定被拒绝的消息是否重新回到队列;
示例:
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
public void basicReject(long deliveryTag, boolean requeue):也是用于拒绝消息,但是只能拒绝一条消息,不能同时拒绝多个消息。
deliveryTag: 表示该消息的index(long类型的数字);
requeue:指定被拒绝的消息是否重新回到队列;
4.如何避免消息重复投递或重复消费?
就是首先我们需要根据消息生成一个全局唯一ID,目的就是为了保障操作是绝对唯一的。将消息全局ID作为数据库表主键,因为主键不可能重复。即在消费消息前,先去数据库查询这条消息是否存在消费记录,没有就执行insert操作,如果有就代表已经被消费了,则不进行处理。
5.消息怎么路由?
消息提供方->路由->一至多个队列
消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
通过队列路由键,可以把队列绑定到交换器上。
消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);
常用的交换器主要分为以下三种:
fanout:如果交换器收到消息,将会广播到所有绑定的队列上
direct:如果路由键完全匹配,消息就被投递到相应的队列
topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符
6.如何判断一个消息是死信消息
a. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false;
b. 消息过期; 消息过期时间设置主要有两种方式:
1.设置队列的过期时间,这样该队列中所有的消息都存在相同的过期时间(在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒);
2.单独设置某个消息的过期时间,每条消息的过期时间都不一样;(设置消息属性的 expiration 参数的值,单位为 毫秒);
3.如果同时使用了两种方式设置过期时间,以两者之间较小的那个数值为准;
c. 队列已满(队列满了,无法再添加消息到mq中);
7.消息基于什么传输?
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。
8.消息如何分发?
若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。
通过路由可实现多消费的功能
9.如何确保消息不丢失?
消息持久化,当然前提是队列必须持久化
RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应。
一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前RabbitMQ重启,那么Rabbit会自动重建交换器和队列(以及绑定),并重新发布持久化日志文件中的消息到合适的队列。
10.mq的缺点
系统可用性降低
系统引入的外部依赖越多,越容易挂掉,本来你就是A系统调用BCD三个系统的接口就好了,人ABCD四个系统好好的,没啥问题,你偏加个MQ进来,万一MQ挂了咋整?MQ挂了,整套系统崩溃了,你不就完了么。
系统复杂性提高:
硬生生加个MQ进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已
一致性问题:
A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,咋整?你这数据就不一致了。
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,最好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了10倍。但是关键时刻,用,还是得用的.
11.如何保证RabbitMQ消息的顺序性?
答:单线程消费保证消息的顺序性;对消息进行编号,消费者处理消息是根据编号处理消息;
12.死信队列+消息过期完成未支付订单自动取消
其实这种场景可以使用死信队列来做,就是用户提交订单之后,发送一条消息并且设置消息过期时间为半个小时(或其他时间),如果超过设置的这个时间,那么消息自动变成死信,就会被转发到死信队列中,这时候我们可以监听死信队列中的消息,然后查询一下订单的状态,如果还是未支付的话,那么更新订单的状态为已取消。
13.如何解决丢数据的问题?
生产者丢数据
生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。
然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
②、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
消费者丢数据
启用手动确认模式可以解决这个问题
①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。
②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。
③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
14.死信队列&死信交换器
DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列。
15、vhost 是什么?起什么作用?
vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
16、RabbitMQ 概念里的 channel、exchange 和 queue 是逻辑概念,还是对应着进程实体?分别起什么作用?
queue 具有自己的 erlang 进程;exchange 内部实现为保存 binding 关系的查找表;channel 是实际进行路由工作的实体,即负责按照 routing_key 将 message 投递给 queue 。由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有 AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。
17、RabbitMQ 上的一个 queue 中存放的 message 是否有数量限制?
答:可以认为是无限制,因为限制取决于机器的内存,但是消息过多会导致处理效率的下降。
18、向不存在的 exchange 发 publish 消息会发生什么?向不存在的 queue 执行consume 动作会发生什么?
答:都会收到 Channel.Close 信令告之不存在(内含原因 404 NOT_FOUND)。
19、routing_key 和 binding_key 的最大长度是多少
答:255 字节。
20、“dead letter”queue 的用途?
答:当消息被 RabbitMQ server 投递到 consumer 后,但 consumer 却通过 Basic.Reject
进行了拒绝时(同时设置 requeue=false),那么该消息会被放入“dead letter”queue 中。
该 queue 可用于排查 message 被 reject 或 undeliver 的原因。
21、为什么说保证 message 被可靠持久化的条件是 queue 和 exchange 具有durable 属性,同时 message 具有 persistent 属性才行?
答:binding 关系可以表示为 exchange – binding – queue 。从文档中我们知道,若要求
投递的 message 能够不丢失,要求 message 本身设置 persistent 属性,要求 exchange
和 queue 都设置 durable 属性。其实这问题可以这么想,若 exchange 或 queue 未设置
durable 属性,则在其 crash 之后就会无法恢复,那么即使 message 设置了 persistent 属
性,仍然存在 message 虽然能恢复但却无处容身的问题;同理,若 message 本身未设置
persistent 属性,则 message 的持久化更无从谈起。
22、Basic.Reject 的用法是什么?
答:该信令可用于 consumer 对收到的 message 进行 reject 。若在该信令中设置
requeue=true,则当 RabbitMQ server 收到该拒绝信令后,会将该 message 重新发送到下
一个处于 consume 状态的 consumer 处(理论上仍可能将该消息发送给当前
consumer)。若设置 requeue=false ,则 RabbitMQ server 在收到拒绝信令后,将直接将该
message 从 queue 中移除。
另外一种移除 queue 中 message 的小技巧是,consumer 回复 Basic.Ack 但不对获取到的
message 做任何处理。
而 Basic.Nack 是对 Basic.Reject 的扩展,以支持一次拒绝多条 message 的能力。
23、RabbitMQ 的高可用性如何保证?
RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式
单机模式不存在高可用。
普通集群模式也不存在高可用性,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。但是你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上 拉取数据过来。这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实 例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。
镜像集群模式的策略是高可用策略,指定的时候可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的 节点上去了。
消息幂等性
1.简介
消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将会被 重复发送给其他消费者进行消费,实际上这条消息已经被消费过了,这就是重复消费的问题。
如何避免重复消费的问题?
消费者端实现幂等性,意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息。通常有两种方式来避免消费重复消费:
-
方式1: 消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)
-
方式2: 利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可。
2.总结
以上就是使用全局消息ID避免消息重复消费的问题,这种方式实现起来相对简单,但是缺点也很明显,就是在高并发下,需要频繁读写数据库,无形中增加了数据库的压力。
RabbitMq交换机类型
最新版本的RabbitMQ交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange。
1.Direct Exchange
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
2.Fanout Exchange
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
3.Topic Exchange
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
RabbitMq高可用
单个的 RabbitMQ 肯定无法实现高可用,要想高可用,还得上集群。
在正式的聊聊集群的原理之前,我们先简单了解下RabbitMQ的优缺点,然后为什么要使用RabbitMQ的集群模式,他能为我们解决那些问题,各种不同的集群模式有什么区别,又有什么优缺点;
1.RabbitMQ的优缺点
RabbitMQ的优点,其实已经介绍过了,无外乎三大点:解耦、削峰、异步通讯等,我们这里简单提下,但是相对于优点来说,RabbitMQ同样也有缺点:
- 增加了系统应用的复杂性
- RabbitMQ服务如果出现问题,则整个服务将会瘫痪
- 新增了很多异常情况的处理, 比如消息丢失、消息消费失败, 消息重复消费等
- RabbitMQ服务在高并发情况下很容易出现性能瓶颈,进而影响整个系统的运行
其中1和3的问题都可以通过代码和配置来解决,但是问题2和4就不好解决了,为了解决上面的问题,就出现了RabbitMQ服务的集群
2 .RabbitMQ两种集群
2.1.普通集群
什么是普通集群呢? 就是在多个联通的服务器上安装不同的RabbitMQ的服务,这些服务器上的RabbitMQ服务组成一个个节点,通过RabbitMQ内部提供的命令或者配置来构建集群,形成了RabbitMQ的普通集群模式.
- 当用户向服务注册一个队列,该队列会随机保存到某一个服务节点上,然后将对应的元数据同步到各个不同的服务节点上
- RabbitMQ的普通集群模式中,每个RabbitMQ都保存有相同的元数据
用户只需要链接到任一一个服务节点中,就可以监听消费到对应队列上的消息数据 - 但是RabbitMQ的实际数据却不是保存在每个RabbitMQ的服务节点中,这就意味着用户可能联系的是RabbitMQ服务节点C,但是C上并没有对应的实际数据,也就是说RabbitMQ服务节点C,并不能提供消息供用户来消费,那么RabbitMQ的普通集群模式如何解决这个问题呢?
- RabbitMQ服务节点C发现自己本服务节点并没有对应的实际数据后,因为每个服务节点上都会保存相同的元数据,所以服务节点C会根据元数据,向服务节点B(该服务节点上有实际数据可供消费)请求实际数据,然后提供给用户进行消费
- 这样给用户的感觉就是,在RabbitMQ的普通集群模式中,用户连接任一服务节点都可以消费到消息
- 普通集群模式的优点:提高消费的吞吐量
普通集群模式的原理比较简单,但是并不能真正意义上的实现高可用,他也存在以下的以下缺点:
-
为了请求RabbitMQ的实际数据以提供给用户,可能会在RabbitMQ内部服务节点之间进行频繁的进行数据交互,这样的交互比较耗费资源
-
当其中一个RabbitMQ的服务节点宕机了,那么该节点上的实际数据就会丢失,用户再次请求时,就会请求不到数据,系统的功能就会出现异常
普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。
此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。
这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。
那么该怎么解决上述的问题呢?
2.2.镜像集群
为了解决上面普通模式的两个显著的缺点,RabbitMQ官方提供另外一种集群模式:镜像集群模式
- 生产者向任一服务节点注册队列,该队列相关信息会同步到其他节点上
- 任一消费者向任一节点请求消费,可以直接获取到消费的消息,因为每个节点上都有相同的实际数据
- 任一节点宕机,不影响消息在其他节点上进行消费
镜像集群模式是怎么开启的呢?这里简单说下,在普通集群模式的基础上,我们可以通过web控制端来配置数据的同步策略,可以配置同步所有的节点,也可以配置同步到指定数量的服务节点
虽然镜像集群模式能够解决普通集群模式的缺点,当任一节点宕机了,不能正常提供服务了,也不影响该消息的正常消费,但是其本身也有相应的缺点:
- 性能开销非常大,因为要同步消息到对应的节点,这个会造成网络之间的数据量的频繁交互,对于网络带宽的消耗和压力都是比较重的
- 没有扩展可言,rabbitMQ是集群,不是分布式的,所以当某个Queue负载过重,我们并不能通过新增节点来缓解压力,因为所以节点上的数据都是相同的,这样就没办法进行扩展了.
它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。
总结
订单超时支付图解
RabbitMQ在项目中运用场景
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/94154.html