1、概念
-
延时队列(延迟队列):队列内部是
有序
的,最重要的特性就体现在它的延时
属性上 -
延时队列中的元素是希望在指定时间到了以后或之前取出和处理
-
延时队列就是用来存放需要在指定时间被处理的元素的队列
2、使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 用户注册成功后,如果三天内没有登陆则进行短信提醒
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务。
如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于”如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
3、整合Springboot
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.tuwer</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
<!-- Swagger 3.0.0 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
- application.yml
spring:
rabbitmq:
host: 192.168.19.101
port: 5672
username: admin
password: admin
- 添加Swagger配置类
package com.tuwer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import java.util.ArrayList;
/**
* @author 土味儿
* Date 2022/3/26
* @version 1.0
*/
@Configuration
@EnableWebMvc
public class SwaggerConfig {
/**
* 配置Swagger具体参数
* @return
*/
@Bean
public Docket docket(){
return new Docket(DocumentationType.OAS_30)
.groupName("webApi")
.apiInfo(apiInfo())
.select()
.build();
}
/**
* 自定义API文档信息
* @return
*/
private ApiInfo apiInfo(){
// 作者信息
Contact contact = new Contact("土味儿", "http://localhost:8080/", "2141421414@qq.com");
return new ApiInfo(
"Hello Swagger API 文档",
"大道无垠 行者无疆",
"v1.0",
"http://localhost:8080/",
contact,
"Apache 2.0",
"http://www.apache.org/licenses/LICENSE-2.0",
new ArrayList());
}
}
4、队列实现
1)代码架构
创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后再创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:
2)代码实现
-
配置类 RabbitMqConfig.java
声明交换机、队列及绑定关系
package com.tuwer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author 土味儿
* Date 2022/3/27
* @version 1.0
* 配置交换机、队列及绑定关系
*/
@Configuration
public class RabbitMqConfig {
/**
* 交换机:普通、死信
*/
public static final String EXCHANGE_X = "X";
public static final String DEAD_LETTER_EXCHANGE_Y = "Y";
/**
* 队列:普通、死信
*/
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String DEAD_LETTER_QUEUE_D = "QD";
/**
* 获取普通交换机
* @return
*/
@Bean("exchangeX")
public DirectExchange getExchangeX(){
return new DirectExchange(EXCHANGE_X);
}
/**
* 获取死信交换机
* @return
*/
@Bean("deadLetterExchangeY")
public DirectExchange getDeadLetterExchangeY(){
return new DirectExchange(DEAD_LETTER_EXCHANGE_Y);
}
/**
* 获取普通队列A
* @return
*/
@Bean("queueA")
public Queue getQueueA(){
Map<String, Object> arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
// 设置死信交换机绑定的RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间TTL:单位毫秒
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
/**
* 获取普通队列B
* @return
*/
@Bean("queueB")
public Queue getQueueB(){
Map<String, Object> arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
// 设置死信交换机绑定的RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间TTL:单位毫秒
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
/**
* 获取死信队列D
* @return
*/
@Bean("queueD")
public Queue getQueueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
}
/**
* 绑定
* 队列QA 绑定 交换机X
* @return
*/
@Bean
public Binding queueABindExchangeX(
@Qualifier("queueA") Queue queue,
@Qualifier("exchangeX") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("XA");
}
/**
* 绑定
* 队列QB 绑定 交换机X
* @return
*/
@Bean
public Binding queueBBindExchangeX(
@Qualifier("queueB") Queue queue,
@Qualifier("exchangeX") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("XB");
}
/**
* 绑定
* 队列QD 绑定 交换机Y
* @return
*/
@Bean
public Binding queueDBindExchangeY(
@Qualifier("queueD") Queue queue,
@Qualifier("deadLetterExchangeY") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("YD");
}
}
-
生产者 SendMsgController.java
发送消息;由
RabbitTemplate
来发送消息
package com.tuwer.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.time.LocalTime;
/**
* @author 土味儿
* Date 2022/3/27
* @version 1.0
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*
* @param message
* @return
*/
@RequestMapping("/sendMsg/{message}")
public String sendMsg(@PathVariable("message") String message) {
rabbitTemplate.convertAndSend("X","XA",message);
rabbitTemplate.convertAndSend("X","XB",message);
log.info("当前时间:{},发送给两个TTL队列:{}", LocalTime.now(),message);
return "消息【" + message + "】已发送!";
}
}
-
消费者 DeadLetterQueueConsumer.java
- 一直监听队列;
@RabbitListener(queues = "QD")
- 官方教程中加入了Channel,但方法中没有使用,实测去掉Channel,也不用抛异常,可以运行;
public void receiveD(Message message, Channel channel) throws Exception {
- 一直监听队列;
package com.tuwer.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
/**
* @author 土味儿
* Date 2022/3/27
* @version 1.0
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
/**
* 监听队列QD
* @param message
*/
@RabbitListener(queues = "QD")
//public void receiveD(Message message, Channel channel) throws Exception {
public void receiveQueueD(Message message){
String msg = new String(message.getBody());
log.info("时间:{},接收到的消息:{}", LocalTime.now(),msg);
}
}
3)运行测试
5、队列优化
1)问题
第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
队列的过期时间固定,不够灵活。可以建立一个通用队列,过期时间灵活设置。
2)代码架构图
3)实现
-
修改配置类 RabbitMqConfig.java
增加QC队列,并绑定交换机;QC中不设置过期时间
@Configuration
public class RabbitMqConfig {
// 省略...
public static final String QUEUE_C = "QC";
// 省略...
/**
* 获取普通通用队列C
* 不设置过期时间
* @return
*/
@Bean("queueC")
public Queue getQueueC(){
Map<String, Object> arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
// 设置死信交换机绑定的RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间TTL:单位毫秒
//arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
// 省略...
/**
* 绑定
* 队列QC 绑定 交换机X
* @return
*/
@Bean
public Binding queueCBindExchangeX(
@Qualifier("queueC") Queue queue,
@Qualifier("exchangeX") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("XC");
}
// 省略...
}
-
修改生产者 SendMsgController.java
增加可以发送过期时间消息的方法
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 省略...
/**
* 发送有过期时间的消息
*
* @param message
* @param ttlTime
* @return
*/
@RequestMapping("/sendExpirationMsg/{message}/{ttlTime}")
public String sendMsg(
@PathVariable("message") String message,
@PathVariable("ttlTime") String ttlTime
) {
// 消息处理器:设置过期时间
MessagePostProcessor messagePostProcessor = msg ->{
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
};
rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);
log.info("当前时间:{},发送给QC队列:{},过期时间:{}ms", LocalTime.now(),message,ttlTime);
return "消息【" + message + "】已发送!";
}
}
4)运行测试
因为RabbitMQ 只会检查第一个消息是否过期
,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行!
6、插件实现延迟队列
1)docker中安装插件
- 下载插件
插件下载地址:https://www.rabbitmq.com/community-plugins.html
找到rabbitmq_delayed_message_exchange下载
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 将插件复制到容器内,进行安装
# 拷贝至docker容器内
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq容器ID:/plugins
# 进入docker容器内
docker exec -it rabbitmq bash
# 赋予权限
chmod 777 /plugins/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
# 启动延时插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2)代码架构图
3)实现
- 配置类 DelayedQueueConfig.java
package com.tuwer.config;
import com.rabbitmq.client.BuiltinExchangeType;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author 土味儿
* Date 2022/3/28
* @version 1.0
*/
@Configuration
public class DelayedQueueConfig {
/**
* 交换机、队列、RoutingKey
*/
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
/**
* 自定义交换机
*
* @return
*/
@Bean("delayedExchange")
public CustomExchange getDelayedExchange() {
// 参数
Map<String, Object> arguments = new HashMap<>(1);
// 交换机类型:直接
arguments.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());
//arguments.put("x-delayed-type","direct");
// 自定义交换机
return new CustomExchange(
DELAYED_EXCHANGE_NAME,
"x-delayed-message",
true,
false,
arguments);
}
/**
* 延迟队列
* 和普通队列一样
*
* @return
*/
@Bean("delayedQueue")
public Queue getDelayedQueue() {
//return new Queue(DELAYED_QUEUE_NAME);
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
/**
* 绑定
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayedQueueBindDelayedExchange(
@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange exchange
) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with(DELAYED_ROUTING_KEY)
.noargs();
}
}
-
生产者
因为 setDelay() 接收 Integer 类型参数,所以 delayTime 类型为 Integer;
/**
* 发送有过期时间的消息(基于插件)
*
* @param message
* @param delayTime
* @return
*/
@RequestMapping("/sendDelayMsg/{message}/{delayTime}")
public String sendMsg(
@PathVariable("message") String message,
@PathVariable("delayTime") Integer delayTime
) {
System.out.println(message + delayTime);
// 消息处理器:设置延迟时间
MessagePostProcessor messagePostProcessor = msg ->{
msg.getMessageProperties().setDelay(delayTime);
return msg;
};
rabbitTemplate.convertAndSend(
DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DelayedQueueConfig.DELAYED_ROUTING_KEY,
message,
messagePostProcessor
);
log.info("当前时间:{},发送给延迟队列:{},延迟时间:{}ms", LocalTime.now(),message,delayTime);
return "消息【" + message + "】已发送!";
}
- 消费者(监听器)DelayedLetterCustomer.java
package com.tuwer.consumer;
import com.tuwer.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
/**
* @author 土味儿
* Date 2022/3/28
* @version 1.0
*/
@Slf4j
@Component
public class DelayedLetterCustomer {
/**
* 监听延迟队列消息
* @param message
*/
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("时间:{},接收到的消息:{}", LocalTime.now(),msg);
}
}
4)测试
5)总结
-
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次,以及未被正确处理的消息不会被丢弃。
-
通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
-
延时队列还有很多其它选择,比如:利用Java的DelayQueue,利用Redis的Zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/70401.html