SpringBoot+RabbitMQ实现消息延迟推送


前言

如今我们用的软件都有消息延迟推送,应用比较广泛,比如:

  • 淘宝签收后7天自动确认收货,在签收购买的商品后,物流系统会在7天后延时发送一个消息给支付系统,告诉支付系统将钱打给商家,这个过程持续7天,就是使用了消息中间件的延迟推送。
  • 12306购票支付确认页面,在选好票点击确定跳转的页面中一般都有倒计时,代表30分钟订单不确认的话将自动取消订单。其实系统在下订单那一刻就发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果在30分钟内完成订单了,则可以通过逻辑代码判断忽略收到的消息。

使用延时消息我们有这样几种解决方案:

  • 使用Redis给订单设置过期时间,最后通过判断Redis中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息延迟推送性能较低,因为Redis存于内存中,遇到恶意下单的或者刷单的话,无疑会给内存带来巨大压力。
  • 使用传统数据库轮询来判断数据库表订单的状态,这无疑增加了IO次数,数据库压力大,性能极低。
  • 使用JVM原生的DelayQueue,这也是大量占用内存,并且还没有持久化策略,系统宕机或者重启的话会丢失订单信息。

消息延迟推送的实现

插件

在 RabbitMQ 3.6.x 之前一般采用死信队列 + TTL过期时间来实现延迟队列,这里不做过多介绍。

在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。

去官网下载延时队列插件:https://www.rabbitmq.com/community-plugins.html ,然后将其放置到RabbitMQ 根目录下的 plugins 下。

使用命令启用插件:

# linux
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# windows 进入sbin目录
rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange

配置类

首先创建配置类,创建交换机和消息队列,并绑定关系。

package com.itjing.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lijing
 * @date 2022年06月02日 19:12
 * @description 延迟队列配置
 * 去官网下载延时队列插件:https://www.rabbitmq.com/community-plugins.html
 * 使用命令启用插件:
 * linux:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 * windows:进入sbin目录,执行 rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange
 */

@Configuration
public class DelayMQConfig {

    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "MQ.LazyQueue";
    public static final String LAZY_KEY = "lazy.#";

    /**
     * 在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,
     * 也可以设置参数内容传入交换机声明的方法中,
     * 因为第一种方式的底层就是通过这种方式来实现的。
     * @return
     */

    @Bean
    public TopicExchange lazyExchange() {
        Map<String, Object> pros = new HashMap<>();
        // 设置交换机支持延迟消息推送
//        pros.put("x-delayed-message", "topic");
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, truefalse, pros);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue() {
        return new Queue(LAZY_QUEUE, true);
    }

    @Bean
    public Binding lazyBinding() {
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }

    /**
     * 创建初始化 RabbitAdmin 对象,这样才能创建(必须)
     * @param connectionFactory
     * @return
     */

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(lazyExchange());
        rabbitAdmin.declareQueue(lazyQueue());
        rabbitAdmin.declareBinding(lazyBinding());
        return rabbitAdmin;
    }
}

在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置参数内容传入交换机声明的方法中,因为exchange.setDelayed(true)的底层就是设置参数内容来实现的。

消息生产者

package com.itjing.rabbitmq.provider;

import com.itjing.rabbitmq.config.DelayMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @author lijing
 * @date 2022年06月02日 19:31
 * @description 延迟队列消息发送者
 */

@Component
@Slf4j
public class DelayMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendLazy(Object message, Integer delayTime) {

        rabbitTemplate.setMandatory(true);

//        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            log.info("CorrelationData content : " + correlationData);
            log.info("Ack status : " + ack);
            log.info("Cause content : " + cause);
            if (ack) {
                log.info("消息成功发送");
            } else {
                log.info("消息发送失败:" + correlationData + ", 出现异常:" + cause);
            }
        }));

//        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routingKey) -> {
            log.info("被退回的消息为:{}", msg);
            log.info("replyCode:{}", replyCode);
            log.info("replyText:{}", replyText);
            log.info("exchange:{}", exchange);
            log.info("routingKey:{}", routingKey);
        });

        // id + 时间戳 全局唯一
        CorrelationData correlationData = new CorrelationData("12345678909" + new Date());

        // 发送消息时指定 header 延迟时间
        rabbitTemplate.convertAndSend(DelayMQConfig.LAZY_EXCHANGE, "lazy.boot", message,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // 设置消息持久化
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        // message.getMessageProperties().setHeader("x-delay", "6000");
                        // 可以观察 setDelay(Integer i) 底层代码
                        // 也是在 header 中设置 x-delay,等同于手动设置 header
                        message.getMessageProperties().setDelay(delayTime);
                        return message;
                    }
                }, correlationData);
    }

}

发送消息时需要指定延迟推送的时间,这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。

可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。等同于手动设置 header

message.getMessageProperties().setHeader("x-delay", "6000");

public void setDelay(Integer delay) {
    if (delay != null && delay >= 0) {
        this.headers.put("x-delay", delay);
    } else {
        this.headers.remove("x-delay");
    }

消息消费者

package com.itjing.rabbitmq.consumer;

import com.itjing.rabbitmq.config.DelayMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author lijing
 * @date 2022年06月02日 19:40
 * @description 延时队列消费者
 */

@Component
public class DelayMQReceiver {

    @RabbitListener(queues = DelayMQConfig.LAZY_QUEUE)
    @RabbitHandler
    public void onLazyMessage(Message msg, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("lazy receive " + new String(msg.getBody()));
    }
}

测试

package com.itjing.rabbitmq.controller;

import com.itjing.rabbitmq.provider.DelayMQSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author lijing
 * @date 2022年06月02日 19:46
 * @description 延迟队列控制器
 */

@RestController
@Slf4j
@RequestMapping("/delay")
public class DelayMQController {

    @Autowired
    private DelayMQSender mqSender;

    /**
     * 6 秒后收到了消息
     */

    @GetMapping("/send")
    public void sendLazy() {
        String msg = "hello spring boot";
        mqSender.sendLazy(msg, 6 * 1000);
    }
}

最终在 6 秒后收到了消息。



原文始发于微信公众号(程序员阿晶):SpringBoot+RabbitMQ实现消息延迟推送

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/19586.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!