前言
如今我们用的软件都有消息延迟推送,应用比较广泛,比如:
-
淘宝签收后7天自动确认收货,在签收购买的商品后,物流系统会在7天后延时发送一个消息给支付系统,告诉支付系统将钱打给商家,这个过程持续7天,就是使用了消息中间件的延迟推送。 -
12306购票支付确认页面,在选好票点击确定跳转的页面中一般都有倒计时,代表30分钟订单不确认的话将自动取消订单。其实系统在下订单那一刻就发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果在30分钟内完成订单了,则可以通过逻辑代码判断忽略收到的消息。
使用延时消息我们有这样几种解决方案:
-
使用Redis给订单设置过期时间,最后通过判断Redis中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息延迟推送性能较低,因为Redis存于内存中,遇到恶意下单的或者刷单的话,无疑会给内存带来巨大压力。 -
使用传统数据库轮询来判断数据库表订单的状态,这无疑增加了IO次数,数据库压力大,性能极低。 -
使用JVM原生的DelayQueue,这也是大量占用内存,并且还没有持久化策略,系统宕机或者重启的话会丢失订单信息。
消息延迟推送的实现
插件
在 RabbitMQ 3.6.x 之前一般采用死信队列 + TTL过期时间来实现延迟队列,这里不做过多介绍。
在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。
去官网下载延时队列插件:https://www.rabbitmq.com/community-plugins.html ,然后将其放置到RabbitMQ 根目录下的 plugins 下。
使用命令启用插件:
# linux
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# windows 进入sbin目录
rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange
配置类
首先创建配置类,创建交换机和消息队列,并绑定关系。
package com.itjing.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author lijing
* @date 2022年06月02日 19:12
* @description 延迟队列配置
* 去官网下载延时队列插件:https://www.rabbitmq.com/community-plugins.html
* 使用命令启用插件:
* linux:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
* windows:进入sbin目录,执行 rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange
*/
@Configuration
public class DelayMQConfig {
public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
public static final String LAZY_QUEUE = "MQ.LazyQueue";
public static final String LAZY_KEY = "lazy.#";
/**
* 在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,
* 也可以设置参数内容传入交换机声明的方法中,
* 因为第一种方式的底层就是通过这种方式来实现的。
* @return
*/
@Bean
public TopicExchange lazyExchange() {
Map<String, Object> pros = new HashMap<>();
// 设置交换机支持延迟消息推送
// pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Queue lazyQueue() {
return new Queue(LAZY_QUEUE, true);
}
@Bean
public Binding lazyBinding() {
return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
}
/**
* 创建初始化 RabbitAdmin 对象,这样才能创建(必须)
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareExchange(lazyExchange());
rabbitAdmin.declareQueue(lazyQueue());
rabbitAdmin.declareBinding(lazyBinding());
return rabbitAdmin;
}
}
在 Exchange 的声明中可以设置exchange.setDelayed(true)
来开启延迟队列,也可以设置参数内容传入交换机声明的方法中,因为exchange.setDelayed(true)
的底层就是设置参数内容来实现的。
消息生产者
package com.itjing.rabbitmq.provider;
import com.itjing.rabbitmq.config.DelayMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author lijing
* @date 2022年06月02日 19:31
* @description 延迟队列消息发送者
*/
@Component
@Slf4j
public class DelayMQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendLazy(Object message, Integer delayTime) {
rabbitTemplate.setMandatory(true);
// rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
log.info("CorrelationData content : " + correlationData);
log.info("Ack status : " + ack);
log.info("Cause content : " + cause);
if (ack) {
log.info("消息成功发送");
} else {
log.info("消息发送失败:" + correlationData + ", 出现异常:" + cause);
}
}));
// rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routingKey) -> {
log.info("被退回的消息为:{}", msg);
log.info("replyCode:{}", replyCode);
log.info("replyText:{}", replyText);
log.info("exchange:{}", exchange);
log.info("routingKey:{}", routingKey);
});
// id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("12345678909" + new Date());
// 发送消息时指定 header 延迟时间
rabbitTemplate.convertAndSend(DelayMQConfig.LAZY_EXCHANGE, "lazy.boot", message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// message.getMessageProperties().setHeader("x-delay", "6000");
// 可以观察 setDelay(Integer i) 底层代码
// 也是在 header 中设置 x-delay,等同于手动设置 header
message.getMessageProperties().setDelay(delayTime);
return message;
}
}, correlationData);
}
}
发送消息时需要指定延迟推送的时间,这里在发送消息的方法中传入参数 new MessagePostProcessor()
是为了获得 Message
对象,因为需要借助 Message
对象的api 来设置延迟时间。
可以观察 setDelay(Integer i)
底层代码,也是在 header 中设置 x-delay。等同于手动设置 header
message.getMessageProperties().setHeader("x-delay", "6000");
public void setDelay(Integer delay) {
if (delay != null && delay >= 0) {
this.headers.put("x-delay", delay);
} else {
this.headers.remove("x-delay");
}
消息消费者
package com.itjing.rabbitmq.consumer;
import com.itjing.rabbitmq.config.DelayMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author lijing
* @date 2022年06月02日 19:40
* @description 延时队列消费者
*/
@Component
public class DelayMQReceiver {
@RabbitListener(queues = DelayMQConfig.LAZY_QUEUE)
@RabbitHandler
public void onLazyMessage(Message msg, Channel channel) throws IOException {
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, true);
System.out.println("lazy receive " + new String(msg.getBody()));
}
}
测试
package com.itjing.rabbitmq.controller;
import com.itjing.rabbitmq.provider.DelayMQSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author lijing
* @date 2022年06月02日 19:46
* @description 延迟队列控制器
*/
@RestController
@Slf4j
@RequestMapping("/delay")
public class DelayMQController {
@Autowired
private DelayMQSender mqSender;
/**
* 6 秒后收到了消息
*/
@GetMapping("/send")
public void sendLazy() {
String msg = "hello spring boot";
mqSender.sendLazy(msg, 6 * 1000);
}
}
最终在 6 秒后收到了消息。
原文始发于微信公众号(程序员阿晶):SpringBoot+RabbitMQ实现消息延迟推送
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/19586.html