【尚硅谷 RabbitMQ】6、图文详解 延时队列:整合Springboot 队列优化 Docker中安装插件及代码实现

导读:本篇文章讲解 【尚硅谷 RabbitMQ】6、图文详解 延时队列:整合Springboot 队列优化 Docker中安装插件及代码实现,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1、概念

  • 延时队列(延迟队列):队列内部是 有序 的,最重要的特性就体现在它的 延时 属性上

  • 延时队列中的元素是希望在指定时间到了以后或之前取出和处理

  • 延时队列就是用来存放需要在指定时间被处理的元素的队列

2、使用场景

在这里插入图片描述

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务。

如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于”如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

在这里插入图片描述

在这里插入图片描述

3、整合Springboot

  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tuwer</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.79</version>
        </dependency>
        <!-- Swagger 3.0.0 -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-boot-starter</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
  • application.yml
spring:
  rabbitmq:
    host: 192.168.19.101
    port: 5672
    username: admin
    password: admin
  • 添加Swagger配置类
package com.tuwer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;

import java.util.ArrayList;

/**
 * @author 土味儿
 * Date 2022/3/26
 * @version 1.0
 */
@Configuration
@EnableWebMvc
public class SwaggerConfig {
    /**
     * 配置Swagger具体参数
     * @return
     */
    @Bean
    public Docket docket(){
        return new Docket(DocumentationType.OAS_30)
                .groupName("webApi")
                .apiInfo(apiInfo())
                .select()
                .build();
    }

    /**
     * 自定义API文档信息
     * @return
     */
    private ApiInfo apiInfo(){
        // 作者信息
        Contact contact = new Contact("土味儿", "http://localhost:8080/", "2141421414@qq.com");

        return new ApiInfo(
                "Hello Swagger API 文档",
                "大道无垠 行者无疆",
                "v1.0",
                "http://localhost:8080/",
                contact,
                "Apache 2.0",
                "http://www.apache.org/licenses/LICENSE-2.0",
                new ArrayList());
    }
}

【SpringBoot】18、整合Swagger 3.0【狂神篇】_土味儿~的博客-CSDN博客

在这里插入图片描述

4、队列实现

1)代码架构

创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后再创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:

在这里插入图片描述

在这里插入图片描述

2)代码实现

  • 配置类 RabbitMqConfig.java

    声明交换机、队列及绑定关系

package com.tuwer.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 土味儿
 * Date 2022/3/27
 * @version 1.0
 * 配置交换机、队列及绑定关系
 */
@Configuration
public class RabbitMqConfig {
    /**
     * 交换机:普通、死信
     */
    public static final String EXCHANGE_X = "X";
    public static final String DEAD_LETTER_EXCHANGE_Y = "Y";
    /**
     * 队列:普通、死信
     */
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String DEAD_LETTER_QUEUE_D = "QD";

    /**
     * 获取普通交换机
     * @return
     */
    @Bean("exchangeX")
    public DirectExchange getExchangeX(){
        return new DirectExchange(EXCHANGE_X);
    }

    /**
     * 获取死信交换机
     * @return
     */
    @Bean("deadLetterExchangeY")
    public DirectExchange getDeadLetterExchangeY(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE_Y);
    }

    /**
     * 获取普通队列A
     * @return
     */
    @Bean("queueA")
    public Queue getQueueA(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
        // 设置死信交换机绑定的RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置过期时间TTL:单位毫秒
        arguments.put("x-message-ttl",10000);

        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    /**
     * 获取普通队列B
     * @return
     */
    @Bean("queueB")
    public Queue getQueueB(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
        // 设置死信交换机绑定的RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置过期时间TTL:单位毫秒
        arguments.put("x-message-ttl",40000);

        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    /**
     * 获取死信队列D
     * @return
     */
    @Bean("queueD")
    public Queue getQueueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
    }

    /**
     * 绑定
     * 队列QA 绑定 交换机X
     * @return
     */
    @Bean
    public Binding queueABindExchangeX(
            @Qualifier("queueA") Queue queue,
            @Qualifier("exchangeX") DirectExchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("XA");
    }

    /**
     * 绑定
     * 队列QB 绑定 交换机X
     * @return
     */
    @Bean
    public Binding queueBBindExchangeX(
            @Qualifier("queueB") Queue queue,
            @Qualifier("exchangeX") DirectExchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("XB");
    }

    /**
     * 绑定
     * 队列QD 绑定 交换机Y
     * @return
     */
    @Bean
    public Binding queueDBindExchangeY(
            @Qualifier("queueD") Queue queue,
            @Qualifier("deadLetterExchangeY") DirectExchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("YD");
    }

}
  • 生产者 SendMsgController.java

    发送消息;由 RabbitTemplate 来发送消息

package com.tuwer.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.time.LocalTime;

/**
 * @author 土味儿
 * Date 2022/3/27
 * @version 1.0
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     *
     * @param message
     * @return
     */
    @RequestMapping("/sendMsg/{message}")
    public String sendMsg(@PathVariable("message") String message) {
        rabbitTemplate.convertAndSend("X","XA",message);
        rabbitTemplate.convertAndSend("X","XB",message);
        log.info("当前时间:{},发送给两个TTL队列:{}", LocalTime.now(),message);
        return "消息【" + message + "】已发送!";
    }
}
  • 消费者 DeadLetterQueueConsumer.java

    • 一直监听队列;@RabbitListener(queues = "QD")
    • 官方教程中加入了Channel,但方法中没有使用,实测去掉Channel,也不用抛异常,可以运行;public void receiveD(Message message, Channel channel) throws Exception {
package com.tuwer.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/**
 * @author 土味儿
 * Date 2022/3/27
 * @version 1.0
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    /**
     * 监听队列QD
     * @param message
     */
    @RabbitListener(queues = "QD")
    //public void receiveD(Message message, Channel channel) throws Exception {
    public void receiveQueueD(Message message){
        String msg = new String(message.getBody());
        log.info("时间:{},接收到的消息:{}", LocalTime.now(),msg);
    }
}

3)运行测试

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

5、队列优化

1)问题

第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

队列的过期时间固定,不够灵活。可以建立一个通用队列,过期时间灵活设置。

2)代码架构图

在这里插入图片描述

3)实现

  • 修改配置类 RabbitMqConfig.java

    增加QC队列,并绑定交换机;QC中不设置过期时间

@Configuration
public class RabbitMqConfig {
    // 省略...
    public static final String QUEUE_C = "QC";
    
    // 省略...


    /**
     * 获取普通通用队列C
     * 不设置过期时间
     * @return
     */
    @Bean("queueC")
    public Queue getQueueC(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
        // 设置死信交换机绑定的RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置过期时间TTL:单位毫秒
        //arguments.put("x-message-ttl",40000);

        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    
    // 省略...

    /**
     * 绑定
     * 队列QC 绑定 交换机X
     * @return
     */
    @Bean
    public Binding queueCBindExchangeX(
            @Qualifier("queueC") Queue queue,
            @Qualifier("exchangeX") DirectExchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("XC");
    }
    
    // 省略...

}
  • 修改生产者 SendMsgController.java

    增加可以发送过期时间消息的方法

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 省略...

    /**
     * 发送有过期时间的消息
     *
     * @param message
     * @param ttlTime
     * @return
     */
    @RequestMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public String sendMsg(
            @PathVariable("message") String message,
            @PathVariable("ttlTime") String ttlTime
    ) {
        // 消息处理器:设置过期时间
        MessagePostProcessor messagePostProcessor = msg ->{
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        };
        rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);

        log.info("当前时间:{},发送给QC队列:{},过期时间:{}ms", LocalTime.now(),message,ttlTime);
        return "消息【" + message + "】已发送!";
    }
}

在这里插入图片描述

在这里插入图片描述

4)运行测试

在这里插入图片描述

在这里插入图片描述

因为RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

6、插件实现延迟队列

在这里插入图片描述

1)docker中安装插件

  • 下载插件
插件下载地址:https://www.rabbitmq.com/community-plugins.html

找到rabbitmq_delayed_message_exchange下载
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
  • 将插件复制到容器内,进行安装
# 拷贝至docker容器内
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq容器ID:/plugins
# 进入docker容器内
docker exec  -it rabbitmq  bash
# 赋予权限
chmod 777 /plugins/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
# 启动延时插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2)代码架构图

在这里插入图片描述

3)实现

  • 配置类 DelayedQueueConfig.java
package com.tuwer.config;

import com.rabbitmq.client.BuiltinExchangeType;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 土味儿
 * Date 2022/3/28
 * @version 1.0
 */
@Configuration
public class DelayedQueueConfig {
    /**
     * 交换机、队列、RoutingKey
     */
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    /**
     * 自定义交换机
     *
     * @return
     */
    @Bean("delayedExchange")
    public CustomExchange getDelayedExchange() {
        // 参数
        Map<String, Object> arguments = new HashMap<>(1);
        // 交换机类型:直接
        arguments.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());
        //arguments.put("x-delayed-type","direct");

        // 自定义交换机
        return new CustomExchange(
                DELAYED_EXCHANGE_NAME,
                "x-delayed-message",
                true,
                false,
                arguments);
    }

    /**
     * 延迟队列
     * 和普通队列一样
     *
     * @return
     */
    @Bean("delayedQueue")
    public Queue getDelayedQueue() {
        //return new Queue(DELAYED_QUEUE_NAME);
        return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
    }

    /**
     * 绑定
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayedQueueBindDelayedExchange(
            @Qualifier("delayedQueue") Queue queue,
            @Qualifier("delayedExchange") CustomExchange exchange
    ) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DELAYED_ROUTING_KEY)
                .noargs();
    }
}

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • 生产者

    因为 setDelay() 接收 Integer 类型参数,所以 delayTime 类型为 Integer;

/**
* 发送有过期时间的消息(基于插件)
*
* @param message
* @param delayTime
* @return
*/
@RequestMapping("/sendDelayMsg/{message}/{delayTime}")
public String sendMsg(
    @PathVariable("message") String message,
    @PathVariable("delayTime") Integer delayTime
) {
    System.out.println(message + delayTime);
    // 消息处理器:设置延迟时间
    MessagePostProcessor messagePostProcessor = msg ->{
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    };

    rabbitTemplate.convertAndSend(
        DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
        DelayedQueueConfig.DELAYED_ROUTING_KEY,
        message,
        messagePostProcessor
    );

    log.info("当前时间:{},发送给延迟队列:{},延迟时间:{}ms", LocalTime.now(),message,delayTime);
    return "消息【" + message + "】已发送!";
}
  • 消费者(监听器)DelayedLetterCustomer.java
package com.tuwer.consumer;

import com.tuwer.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/**
 * @author 土味儿
 * Date 2022/3/28
 * @version 1.0
 */
@Slf4j
@Component
public class DelayedLetterCustomer {
    /**
     * 监听延迟队列消息
     * @param message
     */
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message){
        String msg = new String(message.getBody());
        log.info("时间:{},接收到的消息:{}", LocalTime.now(),msg);
    }
}

4)测试

在这里插入图片描述
在这里插入图片描述

5)总结

  • 延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次,以及未被正确处理的消息不会被丢弃。

  • 通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

  • 延时队列还有很多其它选择,比如:利用Java的DelayQueue,利用Redis的Zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/70401.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!