延迟队列
延迟队列,对列内部是有序的,最重要的特性就是体现在他的延时属性上,延时队列中的元素时希望在指定时间到了之后或者之前取出和处理,简单的来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列的使用场景
- 订单在十分钟内未支付则自动取消
- 新创建的店铺,如果在十天内没有上传商品,则自动发送消息提醒
- 用户注册成功后,如果没有在三天内登录则发送短信提醒消息
- 用户发起退款,如果在三天内没有得到处理则通知相关运营人员。
- 预定会员后,需要在预定的时间点前十分钟通知各个与会员人员参加会议。
延迟队列整合SpringBoot
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xh</groupId>
<artifactId>SpringBoot_RabbitMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>SpringBoot_RabbitMQ</name>
<description>SpringBoot_RabbitMQ</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--rabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ测试依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.xh.springBoot.SpringBootRabbitMqApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig(){
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo(){
return new ApiInfoBuilder()
.title("rabbitMQ接口文档")
.description("描述Rabbit微服务接口定义")
.version("1.0")
.contact(new Contact("我喜欢","127.0.0.1","2844****670@qq.com"))
.build();
}
}
队列TTL
创建两个队列QA和QB,两者队列TTL分被是10s,40s,然后在创建一个交换机X和死信交换机Y,他们的类型都是direct,创建一个死信队列QD,他们的绑定关系如下:
代码文件
/*
TTL 队列 配置文件类代码
*/
@Configuration
public class TtlQueueConfig {
//普通的交换机名称
public static final String X_EXCHANGE = "X";
//死信交换机的名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列的名称
public static final String A_QUEUE = "QA";
public static final String B_QUEUE = "QB";
//死信队列的名称
public static final String Y_DEAD_LETTER_QUEUE = "QD";
//声明XCHANGE
@Bean("xExchange") //起别名
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
//声明yExchange 别名
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明队列
@Bean("queueA")
public Queue queueA(){
Map<String,Object> map =new HashMap<>();
//设置死信交换机
map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
map.put("x-dead-letter-routing-key","YD");
//设置TTL单位是ms 什么时候消息成为死信,10秒钟后
map.put("x-message-ttl",10000);
return QueueBuilder.durable(A_QUEUE).withArguments(map).build();
}
//声明队列
@Bean("queueB")
public Queue queueB(){
Map<String,Object> map =new HashMap<>();
//设置死信交换机
map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
map.put("x-dead-letter-routing-key","YD");
//设置TTL单位是ms 什么时候消息成为死信,40秒钟后
map.put("x-message-ttl",40000);
return QueueBuilder.durable(B_QUEUE).withArguments(map).build();
}
//死信队列
@Bean("queueD")
public Queue QueueD(){
return QueueBuilder.durable(Y_DEAD_LETTER_QUEUE).build();
}
//绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//绑定
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
//绑定
@Bean
public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
/*
发送延迟消息
http://地址/ttl/sendMessage/子非吾喵
*/
@Slf4j //打印日志
@RestController
@RequestMapping("/ttl")
public class SellMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
//开始发送消息
@GetMapping("/sendMessage/{message}")
//注意@PathVariable 可以自定义传值到url中,前提示变量名必须和url中的一样
public void sendMessage(@PathVariable String message){
//后边的语句参数会替换{},这由程序员控制
log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒的消息队列:" + message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40秒的消息队列:" + message);
}
}
http://127.0.0.1:8080/ttl/sendMessage/hello1
http://127.0.0.1:8080/ttl/sendMessage/hello2
上述存在一个不足之处,就是我们每次使用的时候,都需要增加一个新的时间需求,就需要新增一个队列,这里只是用了10s和40s两个时间选项,但万一我要一个小时呢,难不成还又要添加,这样太麻烦了。
延迟队列优化
使用QC来设置时间,用户自定义时间。
具体代码如下
TtlQueueConfig.java中添加
public static final String C_QUEUE = "QC";
//声明QC
@Bean("queueC")
public Queue queueC(){
Map<String,Object> map = new HashMap<>(3);
//设置死信交换机
map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
map.put("x-dead-letter-routing-key","YD");
//TTL设置时长 ms (这里不写,又用户自己定义)
return QueueBuilder.durable(C_QUEUE).withArguments(map).build();
}
//绑定
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
SellMessageController.java
//开始发送消息 消息 TTL
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",
new Date().toString(),ttlTime,message);
rabbitTemplate.convertAndSend("X","XC",message,msg -> {
//发送消息的时候,延迟时长
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
} );
}
注意我在写这个代码的时候,刚刚开始是敲错了一个字母,导致运行时生成的队列,然后调错的时候,需要生成的队列删除,然后重新创建,可以解决问题。。
http://127.0.0.1:8080/ttl/sendExpirationMsg/hello1/2000
http://127.0.0.1:8080/ttl/sendExpirationMsg/hello2/20000
死信在做延迟的一个巨大缺陷,消息不会按时“死亡”,由于RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延迟时长很短,则第二个消息并不会优先得到执行。
这里出现一个问题,当出现两条消息的时候,我们发现,他居然是按照谁先来的,哪怕你时间短,但是你比另一个队列慢一步,你就只能老老实实排队了。
RabbitMQ插件实现延迟队列
这个就是解决上边的问题的,我们使用插件解决。
在官网上下载: https://www/rabbitmq.com/community-plugins.html,下载
rabbitmq_delayed_message_exchange-3.8.0
拷贝到
/usr/lib/rabbitmq/lib/rabbitmq_server-版本号/plugs
安装指令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后在linux系统下解压
然后安装rabbitmq-plugins enable rabbitmq_delayed_message_exchange
之后重启
systemctl restart rabbitmq-server(这个是你安装时候的名字)
这个时候不在是队列,而是交换机了。
没插件的时候
基于插件的
解决办法:
https://blog.csdn.net/DZP_dream/article/details/118391439
docker search rabbitmq
安装拉取容器:
docker run -dit --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e
启动rabbitmq
RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:management
https://www.rabbitmq.com/community-plugins.html 下载插件
负责插件到容器中
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins
[root@local rabbitmq]# docker exec -it rabbitmq /bin/bash
root@3bb56f68570b:/# rabbitmq-plugins list
启动插件
root@3bb56f68570b:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
设置延迟队列配置类
DelayedConfig
/*
延迟队列插件
*/
@Configuration //实例化
public class DelayedQueueConfig {
//队列 //
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routing";
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
//声明交换机 基于插件的交换机就是这么定义的
@Bean
//自定义交换机 由于是不存在rabbit里边的交换机,也就是我们使用的延迟队列插件
public CustomExchange delayedExchange(){
Map<String,Object> map = new HashMap<>();
map.put("x-delayed-type","direct");
/*
1.交换机名称
2.交换机类型
3.是否需要持久化
4.是否需要自动删除
5.其他参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
true,false,map);
}
//绑定
@Bean
public Binding delayedQueueBingdingDelayedExchange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange
){
//绑定 将队列和交换机直接进行绑定
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
生产者代码
/*
* @Title 发送消息 基于插件的 消息以及 延迟的时间
* @Description 延迟队列的插件
* @author 罗小黑
* @date 2022/11/6 11:40
*/
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendDelayedMsg(@PathVariable String message,@PathVariable Integer delayedTime){
log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayedTime:{}",
new Date().toString(),delayedTime,message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
//发送消息的时候,延迟时长 单位ms
msg.getMessageProperties().setDelay(delayedTime);
return msg;
} );
}
消费者代码
/*
延迟队列消费者
*/
@Component
@Slf4j
public class DelayedQueueConsumer {
/*
基于插件的延迟队列
*/
//监听消息
@RabbitListener(queues= DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
}
}
http://127.0.0.1:8080/ttl/sendDelayMsg/come on bady1/20000
http://127.0.0.1:8080/ttl/sendDelayMsg/come on bady1/2000
第二个消息被先消费掉了,符合预期。
总结:
延迟队列在需要延时的处理的场景下非常有用,使用rabbitmq来实现延迟队列可以很好的利用rabbitmq的特性,如消息的可靠发送,消息可靠投递,死信队列来保证消息至少被消费一次已经未被正常处理的消息不会被丢弃,另外,通过rabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者信息丢失
当然,延迟队列还有很多其他选择,比如利用java的delayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有各的特点,看需要的适用的场景。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/197983.html