文章目录
1. 引言
面对日益复杂的信息平台,消息队列使用是解决什么问题呢?初步总结一下可以解决如下场景问题:
-
业务解耦:
A系统
需要耦合B、C、D系统
,在消息队列之前可以通过共享数据、接口调用等方式来实现业务,现在可以通过消息中间件进行解耦。 -
削峰填谷:在互联网经常会出现流量突然飙升的情况,以前很多时候就是通过性能优化、加服务器等方式,可以通过消息中间件缓存相关任务,然后按计划的进行处理。
-
异步:可以通过消息推送及短信发送进行说明,业务平台并不关注具体消息的发送细则,完全可以通过消息队列的方式,直接下发任务,由任务消费者进行处理。
本文将通过Rabbit MQ
、Spring Boot
集成使用来进行分享。
2. 安装rabbitMq
如何安装rabbitMq以及解决在安装的过程中出现的问题,可以参考我的这篇博客:一文详解Windows安装RabbitMQ教程
- 启动
RabbitMq
,如下图所示:
- 输入初始用户名(
guest
)和初始密码(guest
),登录rabbitmq
,如下图所示:
现在队列是空的,因为没有往队列发送数据,
3. Spring Boot接入
maven
引用Spring Boot AMQP
插件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在
application.properties
文件配置如下信息
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=study
如果你是application.yml
配置文件,可以如下配置
spring:
rabbitmq:
host: 127.0.0.1
password: guest
port: 5672
username: guest
virtual-host: study
- 创建
RabbitFanoutExchangeConfig
类
该类是用来配置rabbitmq
的交换机和消息队列(queue
),如下代码所示:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 念兮为美
* @datetime 2022/9/1 15:41
* @desc rabbitmq 配置类
*/
@Configuration
public class RabbitFanoutExchangeConfig {
/**
* 交换机初始化
*
* @author 念兮为美
* @datetime 2022/9/1:15:43
* @return
*/
public static final String DEMO_EXCHANGE = "demo.exchange";
@Bean(name = DEMO_EXCHANGE)
public FanoutExchange demoExchange() {
return new FanoutExchange(DEMO_EXCHANGE, true, false);
}
/**
* 队列初始化
*
* @author 念兮为美
* @datetime 2022/9/1:15:43
* @return
*/
public static final String DEMO_QUEUE = "demo.queue";
@Bean(name = DEMO_QUEUE)
public Queue demoQueue() {
return new Queue(DEMO_QUEUE, true, false, false);
}
/**
* 交换机队列绑定
*
* @author 念兮为美
* @datetime 2022/9/1:15:45
* @param demoQueue 消息队列
* @param fanoutExchange 扇形交换机
* @return 返回绑定的对象
*/
@Bean
public Binding bindingSimpleQueue1(
@Qualifier(DEMO_QUEUE) Queue demoQueue,
@Qualifier(DEMO_EXCHANGE) FanoutExchange fanoutExchange) {
return BindingBuilder.bind(demoQueue).to(fanoutExchange);
}
}
- 创建消息生产者
RabbitMqSenderService
类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author 念兮为美
* @datetime 2022/9/1 15:59
* @desc rabbitmq发送者服务
*/
@Component
public class RabbitMqSenderService {
@Autowired private RabbitTemplate rabbitTemplate;
/**
* 消息发送者
*
* @author 念兮为美
* @datetime 2022/9/1:16:06
* @param exchange 交换机
* @param routingKey 路由键值
* @param message 消息信息
* @return
*/
public void send(String exchange, String routingKey, Message message) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("开始发送消息 : " + message);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationId);
System.out.println("结束发送消息 : " + message);
}
}
- 创建消息接收者
RabbitMqReceiver
类
import org.springframework.amqp.rabbit.annotation.RabbitListener;
/**
* @author 念兮为美
* @datetime 2022/9/1 16:08
* @desc 消息接收者
*/
public class RabbitMqReceiver {
/**
* 消息接收者
*
* @author 念兮为美
* @datetime 2022/9/1:16:11
* @param msg 消息体
* @return
*/
@RabbitListener(queues = RabbitFanoutExchangeConfig.DEMO_QUEUE)
public void receiverLogAll(String msg) {
System.out.println("log.all:" + msg);
}
}
- 在
DemoApplicationTests
编写测试代码
import com.alibaba.fastjson.JSONObject;
import com.slowcode.config.rabbitmq.RabbitFanoutExchangeConfig;
import com.slowcode.config.rabbitmq.RabbitMqSenderService;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@ActiveProfiles("application.properties")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class DemoApplicationTests {
@Autowired
private RabbitMqSenderService rabbitMqSenderService;
@Test
public void testRabbitMq() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("name", "念兮为美");
jsonObject.put("age", 18);
jsonObject.put("address", "江苏省无锡市新吴区");
Message message = new Message(jsonObject.toJSONString().getBytes());
rabbitMqSenderService.send(RabbitFanoutExchangeConfig.DEMO_EXCHANGE, "demo", message);
}
}
启动DemoApplicationTests
类,输出日志如下:
开始发送消息 : (Body:'[B@233f52f8(byte[72])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2022-09-02 17:00:37.763 INFO 39020 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2022-09-02 17:00:37.809 INFO 39020 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#10c47c79:0/SimpleConnection@75882261 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55639]
结束发送消息 : (Body:'[B@233f52f8(byte[72])' MessageProperties [headers={spring_listener_return_correlation=5542ab7b-282e-4344-b86f-b90891ecf44c}, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
- 登录
rabbitmq
后台消息是否写入到队列中
rabbitmq
后台登录地址为:http://localhost:15672
由图可知,消息写入成功。
- 查看消息是否被成功消费
我们在 RabbitMqReceiver
类上加上@Component
注解,启动spring boot
项目,如下图所示:
由图可知,RabbitMqSenderService
发送的消息成功被RabbitMqReceiver
消费。
rabbitmq
后台管理为空,如下图所示:
4. 消息队列常用5种模式
Simple Work Queue
简单工作队列
该模式在Spring boot
中是很少用到的一个场景,一般都会通过Exchange
进行消息分配到队列从而为以后扩展预留一个入口。
Publish/Subscribe
发布订阅模式
该模式性能最好,拿到消息直接放入队列
Routing
路由模式
该模式通过routing key
进行全字匹配,匹配上将相关消息放入相关队列。
Topics
主题模式
该模式通过routng key
进行模糊匹配,匹配上将相关信息放入相关队列,具体匹配规则如下:
-
字符:匹配单个单词,比如
log.
可以匹配log.all
、log.error
等 -
字符#:可以匹配
0
个或多个标识符,如log.#
可以匹配log.business.all
,http://log.service.info
等
Header
模式
通过message header
头部信息进行比对,可以根据定义全匹配、部分匹配等规则。
5. 消息持久化
消息持久化需要对交换机持久化、队列持久化、消息持久化,代码如下:
//初始化·Exchange,name是交换机名,durable是否持久化,autoDelete当前会话结束删除
public AbstractExchange(String name, boolean durable, boolean autoDelete)
//初始化Queue, name是队列名,durable是否持久化,exclusive排他性,autodelete当前会话结束删除
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
//发送消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
6. 延迟发送
6.1 TTL机制实现
针对消息延迟发送可通过死信队列进行处理:
- 消息超时
弊端第一条消息超时设置一分钟,第二条消息超时设置2秒
,那么第二条消息消费也要等第一条消息消费玩才能生效,时间不累加。
MessageProperties messageProperties=new MessageProperties();
messageProperties.setExpiration("2000"); //设置超时时间为2S
Message message=new Message(message_01.getBytes(),messageProperties);
- 队列超时
队列超时就是任何一个消息放入都会在指定时间过期,如果消息自身设置了超时时间,那就看消息超时和队列超时那个时间短,以短时间为准。
@Bean(name = DELAY_QUEUE)
public Queue delayQueue() {
Map<String, Object> queueProperties = new HashMap<>();
queueProperties.put("x-dead-letter-exchange", DEAD_EXCHANGE);
queueProperties.put("x-dead-letter-routing-key", "lsf.dead.letter");
queueProperties.put("x-message-ttl",10000);#设置超时时间为10s
return new Queue(DELAY_QUEUE, true, false, false, queueProperties);
}
6.2 rabbitmq_delay_message_exchange插件实现
rabbitmq
官网插件下载页面:https://www.rabbitmq.com/community-plugins.html
安装步骤如下:
-
选择
rabbitmq_delayed_message_exchange
进入具体插件下载页,选择相关版本号:rabbitmq_delayed_message_exchange-3.8.0.ez
-
copy
文件到路径/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.7/plugins
-
启用插件:
rabbitmq-plugins enable rabbitmq_delay_message_exchange
-
重启
Rabbitmq
服务
在~SpringBoot·中实现,交换机声明为·x-delayed-message·,然后消息发送设置·delay·属性,代码如下:
//声明交换机为x-delayed-message
public static final String CUSTOM_DELAY_EXCHANGE = "lsf.custom.delay.exhange";
@Bean(name = CUSTOM_DELAY_EXCHANGE)
public Exchange customDelayExchange() {
Map<String, Object> exchangeArgs = new HashMap<>();
exchangeArgs.put("x-delayed-type","direct");
return new CustomExchange(CUSTOM_DELAY_EXCHANGE,
"x-delayed-message",
true,
false,
exchangeArgs);
}
//消息发送
MessageProperties messageProperties=new MessageProperties();
messageProperties.setDelay(150000);
String message_01="第一条消息延迟:15s "+sdf.format(date);
Message message=new Message(message_01.getBytes(),messageProperties);
rabbitmqService.send(RabbitCustomDeleyConfig.CUSTOM_DELAY_EXCHANGE,
RabbitCustomDeleyConfig.CUSTOM_DELAY_MESSAGE,
message);
消息会在交换机中等待,设置时间到了就会被消费,比消息过期机制好,并不会出现第一条消息过期时间长,第二条消息过期时间短还需要等第一条消费后才消费第二条。
7. 可靠性发送与接收
- 生产端可靠性发送–传递到
Exchange
相关处理方法
- 通过消息发送判断传递到
Exchange
的回调,开启回调,如下代码所示:
//连接开启回调
connectionFactory.setPublisherConfirms(true);
//绑定回调触发方法
rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
- 实现
RabbitTemplate.ConfirmCallback
封装实现方法,如下代码所示:
package lsf.rabbit.config;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功:" + correlationData);
} else {
System.out.println("消息发送失败:" + cause);
}
}
}
- 生产端可靠性发送–消息未传递到队列回调:
//开启消息没有传递到队列回调
connectionFactory.setPublisherReturns(true);
//开启回调和绑定回调方法
rabbitTemplate.setMandatory(true);//开启强制委托模式
rabbitTemplate.setReturnCallback(new RabbitReturnsCallback());
继承接口RabbitTemplate.ReturnCallback
实现消息未到队列业务处理
package lsf.rabbit.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class RabbitReturnsCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String result = String.format("消息发送ReturnCallback-未到达消息队列:{%s}--{%s}--{%s}--{%s}--{%s}",
message, replyCode, replyText, exchange, routingKey);
System.out.println(result);
}
}
- 消费者可靠性接收 一般在队列中比如消息消费异常,不做处理就会将消息放回对头,很容易陷入死循环,所以可以通过死信队列的方式将消息序列化到另外的队列设置超时重试, 或入库补偿机制等方式。
//配置确认模式为手动的
@RabbitListener(queues = RabbitRetryConfig.RETRY_NORMAL_QUEUE, ackMode = "MANUAL")
#手动拒绝消息,其中requeue为否再次入队
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
#手动否定确认,其中requeue为否再次入队
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
8. 解决Spring Boot集成RabbitMQ的常见问题
8.1 ACCESS_REFUSED – Login was refused using authentication mechanism PLAIN
请参考我的这篇博文解决该问题:https://blog.csdn.net/lvoelife/article/details/126657171
8.2 Not management user(Login failed)
请参考我的这篇博文解决该问题:https://blog.csdn.net/lvoelife/article/details/126659721
8.3 Error: unable to perform an operation on node ‘rabbit@DESKTOP-xxx‘
请参考我的这篇博客解决该问题:https://blog.csdn.net/lvoelife/article/details/126658695
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/99135.html