1.RabbitMQ安装
2.RabbitMQ可视化页面翻译
3.消息推送接收
3.1.流程图
3.2.常用三种交换机
1.Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列, routing key必须相等。
直连交换机是一对一,如果配置多台监听绑定到同一个直连交换的同一个队列会轮询的方式对消息进行消费,而且不存在重复消费。
1.Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。
这个交换机在接收到消息后,会直接转发到绑定到它上面的 所有队列。
2.Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
topic 的模式匹配包括两个通配符:#和* ,其中 # 匹配 0 或多个单词, * 匹配一个单词 (必须出现的)。
4.Springboot+RabbitMQ
4.1.项目初始化
我们可以先注释已写的消费者(不然瞬间消费控制台看不到效果),使用同一个项目模拟,发送完消息后,再重新启动放开进行消费,这样就不用再跑一个项目进行模拟调用了,完美。
4.1.1.pom引入
测试Springboot版本:3.0.1
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.1.2.yml参数配置
# Spring
spring:
# rabbitmq 配置
rabbitmq:
addresses: 139.x.xxx.xxx
port: 5672
username: admin
password: 123456
listener:
simple:
#最小消费者数量
concurrency: 10
#最大的消费者数量
max-concurrency: 10
4.2.项目实战
4.2.1.Direct Exchange
4.2.1.1.DirectRabbitController
package com.cn.controller.test.rabbitmq;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.cn.common.AjaxResult;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @date 2023/9/20
* @description 发送消息
*/
@Tag(name = "RabbitMQ")
@RestController
@Slf4j
@RequestMapping("test/rabbitmq")
public class DirectRabbitController{
@Autowired
RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendDirectMsg")
public AjaxResult<?> sendDirectMsg() {
String msgId = String.valueOf(UUID.randomUUID());
String msmData = "test msg:" + Math.random();
String createTime = DateUtil.now();
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("msmData", msmData);
map.put("createTime", createTime);
String payload = JSON.toJSONString(map);
// 如果写的是默认交换机,可以不传交换机名称
// rabbitTemplate.convertAndSend("TestDirectRouting", payload);
// 将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", payload);
return AjaxResult.success();
}
@GetMapping("/sendDirectMsg2")
public AjaxResult<?> sendDirectMsg2() {
System.out.println("sendDirectMsg2准备发送");
String msgId = String.valueOf(UUID.randomUUID());
String msmData = "test msg:" + Math.random();
String createTime = DateUtil.now();
Map<String, Object> map = new HashMap<>();
map.put("msgId2", msgId);
map.put("msmData2", msmData);
map.put("createTime2", createTime);
String payload = JSON.toJSONString(map);
// 1.由于bindingDirect2绑定交换机使用的默认交换机,使用withQueueName表示用和队列名称相同的键值对
// rabbitTemplate.convertAndSend("TestDirectQueue2", payload);
// 2.将消息携带绑定键值:TestDirectRouting2 发送到交换机TestDirectExchange2
// rabbitTemplate.convertAndSend("TestDirectExchange2", "TestDirectRouting2", payload);
rabbitTemplate.convertAndSend("TestDirectExchange2", "TestDirectRouting2", payload);
return AjaxResult.success();
}
}
4.2.1.2.DirectRabbitConfig
package com.cn.controller.test.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @date 2023/9/20
* @description 配置类
*/
@Configuration
public class DirectRabbitConfig {
// 队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
// 一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue", true);
}
// Direct交换机 name:TestDirectExchange,durable:是否持久化,autoDelete:是否自动删除
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange", true, false);
}
// 绑定,将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
// 队列2 起名:TestDirectQueue2
@Bean
public Queue TestDirectQueue2() {
return new Queue("TestDirectQueue2");
}
// 交换机2
@Bean
DirectExchange TestDirectExchange2() {
return new DirectExchange("TestDirectExchange2");
}
// 绑定交换机2(使用默认交换机配置)
@Bean
Binding bindingDirect2() {
// 1.默认配置,routingKey:使用withQueueName表示用和队列名称相同的键值对(这种写法是错误的“默认交换机不允许设置routingKey。默认交换机是自动的,不需要声明交换机也不需要指定路由key。在发送消息时,交换机给空值,路由key给声明的队列名就可以了”)
// return BindingBuilder.bind(TestDirectQueue2()).to(DirectExchange.DEFAULT).with("TestDirectRouting2"); 错误的写法
// 2.使用默认交换机不允许设置routingKey,故使用默认配置“withQueueName”
// return BindingBuilder.bind(TestDirectQueue2()).to(DirectExchange.DEFAULT).withQueueName();
// 3.使用交换机2
return BindingBuilder.bind(TestDirectQueue2()).to(TestDirectExchange2()).with("TestDirectRouting2");
}
}
4.2.1.3.DirectReceiverConsumer
package com.cn.controller.test.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @date 2023/9/20
* @description 消费者
*/
@Component
// 监听也可以放下面,写多个监听
// @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiverConsumer {
@RabbitListener(queues = "TestDirectQueue")// 监听的队列名称 TestDirectQueue
@RabbitHandler
public void process1_1(String msg) {
System.out.println("DirectReceiverConsumer:1_1消费者收到(TestDirectQueue)消息 : " + msg);
}
@RabbitListener(queues = "TestDirectQueue")// 监听的队列名称 TestDirectQueue
@RabbitHandler
public void process1_2(String msg) {
System.out.println("DirectReceiverConsumer:1_2消费者收到(TestDirectQueue)消息 : " + msg);
}
@RabbitListener(queues = "TestDirectQueue2")// 监听的队列名称 TestDirectQueue2
@RabbitHandler
public void process2(String msg) {
System.out.println("DirectReceiverConsumer消费者收到(TestDirectQueue2)消息 : " + msg);
}
}
4.2.1.4.发送和接受消息
4.2.1.4.1.查询MQ控制台已有交换机和队列
4.2.1.4.2.使用Apifox模拟发送请求
两个消息同时发送100条消息。 消息1有两个消费者,消息2一个消费者。
4.2.1.4.3.消费者进行消费
4.2.2.Fanout Exchange
4.2.2.1.FanoutRabbitController
package com.cn.controller.test.rabbitmq;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.cn.common.AjaxResult;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @date 2023/9/20
* @description Fanout交换机 发送消息
*/
@Tag(name = "RabbitMQ")
@RestController
@Slf4j
@RequestMapping("test/rabbitmq")
public class FanoutRabbitController {
@Autowired
RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendFanoutMsg")
public AjaxResult<?> sendFanoutMsg() {
String msgId = String.valueOf(UUID.randomUUID());
String msmData = "test msg:" + Math.random();
String createTime = DateUtil.now();
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("msmData", msmData);
map.put("createTime", createTime);
String payload = JSON.toJSONString(map);
// 因为是扇型交换机, 路由键无需配置,配置也不起作用
rabbitTemplate.convertAndSend("fanoutExchange", null, payload);
return AjaxResult.success();
}
}
4.2.2.2.FanoutRabbitConfig
package com.cn.controller.test.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @date 2023/9/21
* @description Fanout交换机配置类
*/
@Configuration
public class FanoutRabbitConfig {
/**
* 创建三个队列 :fanout1 fanout2 fanout3
* 将三个队列都绑定在交换机 fanoutExchange 上
* 因为是扇型交换机, 路由键无需配置,配置也不起作用
*/
@Bean
public Queue queue1() {
return new Queue("fanout1");
}
@Bean
public Queue queue2() {
return new Queue("fanout2");
}
@Bean
public Queue queue3() {
return new Queue("fanout3");
}
// 交换机
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchange1() {
return BindingBuilder.bind(queue1()).to(fanoutExchange());
}
@Bean
Binding bindingExchange2() {
return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
@Bean
Binding bindingExchange3() {
return BindingBuilder.bind(queue3()).to(fanoutExchange());
}
}
4.2.2.3.FanoutReceiverConsumer
package com.cn.controller.test.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
* @date 2023/9/20
* @description 消费者
*/
@Component
public class FanoutReceiverConsumer {
@RabbitListener(queues = "fanout1")
@RabbitHandler
public void process1(@Payload String msg) {
System.out.println("FanoutReceiverConsumer1:fanout1消息 : " + msg);
}
@RabbitListener(queues = "fanout2")
@RabbitHandler
public void process2(@Payload String msg) {
System.out.println("FanoutReceiverConsumer2:fanout2消息 : " + msg);
}
@RabbitListener(queues = "fanout3")
@RabbitHandler
public void process3(@Payload String msg) {
System.out.println("FanoutReceiverConsumer3:fanout3消息 : " + msg);
}
}
4.2.2.4.发送和接受消息
4.2.2.4.1.查询MQ控制台已有交换机和队列
4.2.2.4.2.使用Apifox模拟发送请求
4.2.2.4.3.消费者进行消费
Fanout123都消费完毕
4.2.3.Topic Exchange
4.2.3.1.TopicRabbitController
package com.cn.controller.test.rabbitmq;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.cn.common.AjaxResult;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @date 2023/9/20
* @description Topic交换机 发送消息
*/
@Tag(name = "RabbitMQ")
@RestController
@Slf4j
@RequestMapping("test/rabbitmq")
public class TopicRabbitController {
@Autowired
RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendTopicMsg1")
public AjaxResult<?> sendTopicMsg1() {
String msgId = String.valueOf(UUID.randomUUID());
String msmData = "test msg:" + Math.random();
String createTime = DateUtil.now();
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("msmData", msmData);
map.put("createTime", createTime);
String payload = JSON.toJSONString(map);
rabbitTemplate.convertAndSend("topicExchange", "topic.key1", payload);
return AjaxResult.success();
}
@GetMapping("/sendTopicMsg2")
public AjaxResult<?> sendTopicMsg2() {
String msgId = String.valueOf(UUID.randomUUID());
String msmData = "test msg:" + Math.random();
String createTime = DateUtil.now();
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("msmData", msmData);
map.put("createTime", createTime);
String payload = JSON.toJSONString(map);
rabbitTemplate.convertAndSend("topicExchange", "topic.key2", payload);
return AjaxResult.success();
}
}
4.2.3.2.TopicRabbitConfig
package com.cn.controller.test.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @date 2023/9/21
* @description Topic交换机配置类
*/
@Configuration
public class TopicRabbitConfig {
// 绑定键
public final static String key1 = "topic.key1";
public final static String key2 = "topic.key2";
@Bean
public Queue Queue1() {
return new Queue(key1);
}
@Bean
public Queue Queue2() {
return new Queue(key2);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
// 将Key1Queue和topicExchange绑定,而且绑定的键值为topic.key1
// 这样只要是消息携带的路由键是topic.key1,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(Queue1()).to(topicExchange()).with(key1);
}
// 将Key2Queue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(Queue2()).to(topicExchange()).with("topic.#");
}
}
4.2.3.3.TopicRabbitConfig
package com.cn.controller.test.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
* @author Yph
* @date 2023/9/20
* @description 消费者
*/
@Component
public class TopicReceiverConsumer {
@RabbitListener(queues = "topic.key1")
@RabbitHandler
public void process1(@Payload String msg) {
System.out.println("TopicReceiverConsumer1:topic.key1消息 : " + msg);
}
@RabbitListener(queues = "topic.key2")
@RabbitHandler
public void process2(@Payload String msg) {
System.out.println("TopicReceiverConsumer2:topic.key2消息 : " + msg);
}
}
4.2.3.4.发送和接受消息
4.2.3.4.1.查询MQ控制台已有交换机和队列
4.2.3.4.2.使用Apifox模拟发送请求
1.先发送消息1
2.运行消费者进行消费
key1和key2都消费完毕。
3.送消息2
key2才接收到消息,key2绑定的键是“topic.#”,key1绑定键是“topic.key1”。
故而key1接收不到键为“topic.key2”的消息,key2可以匹配接收到。
4.2.4.生产者消息确认回调
其实就是消息确认(生产者推送消息成功,消费者接收消息成功)
4.2.4.1.CallBackRabbitConfig配置类
package com.cn.controller.test.rabbitmq;
import org.jetbrains.annotations.NotNull;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @date 2023/9/21
* @description 消息回调
* 生产者消息确认回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)
*/
@Configuration
public class CallBackRabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);
System.out.println("ConfirmCallback: " + "确认情况:" + b);
System.out.println("ConfirmCallback: " + "原因:" + s);
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(@NotNull ReturnedMessage returnedMessage) {
System.out.println("ReturnCallback: " + "消息:" + returnedMessage.getMessage());
System.out.println("ReturnCallback: " + "回应码:" + returnedMessage.getReplyCode());
System.out.println("ReturnCallback: " + "回应信息:" + returnedMessage.getReplyText());
System.out.println("ReturnCallback: " + "交换机:" + returnedMessage.getExchange());
System.out.println("ReturnCallback: " + "路由键:" + returnedMessage.getRoutingKey());
}
});
return rabbitTemplate;
}
}
4.2.4.1.发送消息确认消息
出现下面这种情况,表示消费者正常消费消费
4.2.4.1.其他情况
4.2.5.消费者消息确认机制
消费者消息确认机制参考文章:https://blog.csdn.net/qq_35387940/article/details/100514134
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/192717.html