-
一、什么是延迟消息
-
二、延迟消息实现方法
-
2-1、基于插件实现
-
2-2、基于死信队列实现
-
2-3、基于阿里云的RabbitMQ
-
三、代码实现
-
3-1、插件实现
-
3-2、死信队列实现
-
3-3、阿里云版
之前一直没使用过RabbitMQ,最近有一个需求需要用到延迟消息,就简单的使用了一下,发现还是有蛮多坑的。
此篇文章只是RabbitMQ延迟消息相关内容,至于安装RabbitMQ等其它操作,参考百度。
一、什么是延迟消息
顾问思议所谓延迟消息就是延迟消息!!!
二、延迟消息实现方法
在RabbitMQ中实现延迟消息有下面几种方式。
2-1、基于插件实现
原生的RabbitMQ是不支持延迟消息的,我们可以先在MQ上安装一个插件然后再发送延迟消息。
优点:发送延迟消息比较简单,安装之后就相当于MQ支持了延迟消息
缺点:要安装插件
2-2、基于死信队列实现
可以理解成,给A队列发送一个设置了过期时间的消息(过期分为消息过期和队列过期),但是不给A队列设置消费者,这个A队列就是死信队列(其实就是一个普通队列,不给它设置消费者而已)。
等到了时间还没有被消费,这个消息就会被投递到配置的队列,我们可以给这个队列设置一个消费者,然后去消费这个消息就可以达到延迟的目的。
绿色的线表示正常的消息流程,红色表示投递到死信队列流程
注:
-
在创建A队列的时候需要给它设置死信交换机和死信路由key(如果不设置,消息无法消费它怎么知道投递给哪个队列呢?) -
当消息和队列同时设置了过期时间,当以最小的为准
优点:听起来牛X,不需要安装插件
缺点:写起来复杂,还浪费一个队列
2-3、基于阿里云的RabbitMQ
一般都是在自己的服务器上去安装,但有的公司也会去买阿里云的RabbitMQ
阿里云版的除了上述的两种还额外的支持了延迟消息,你只要设置消息头即可
官方文档 https://help.aliyun.com/document_detail/148083.html#section-rpd-mjh-kee
优点:什么都不用做,直接用
缺点:要钱
三、代码实现
3-1、插件实现
3-1-1、安装插件
自行百度,如果你没有安装插件,使用这个延迟类型的交换机会报错
Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
3-1-2、配置延迟交换机
// 配置延时交换器
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delay_exchange_name", "x-delayed-message",true, false,args);
}
3-1-3、发送延迟消息
消息都是一样的,在下面演示
3-2、死信队列实现
插件和阿里云的实现方式都很简单,一个是在配置交换机的时候设置一个属性,一个是直接发消息的时候设置过期时间,就不把全部的代码写出来了。
但是死信队列相对来说复杂一些,这里就把全部的代码罗列出来帮助你理解。
3-2-1、CityMapQueueConfig
队列、交换机、队列和交换机绑定配置
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CityMapQueueConfig {
// 定义普通队列和死信队列的名字
public static final String CITY_MAP_QUEUE = "city_map_queue";
public static final String CITY_MAP_DEAD_QUEUE = "city_map_dead_queue";
// 定义普通交换机和死信交换机的名字
public static final String CITY_MAP_DEAD_NAME = "city_map_dead_exchange";
public static final String CITY_MAP_NAME = "city_map_exchange";
// 定义普通路由key和死信路由key的名字
public static final String CITY_MAP_ROUTE_KEY = "city_map_route_key";
public static final String CITY_MAP_ROUTE_DEAD_KEY = "city_map_route_dead_key";
// 创建普通队列
@Bean
public Queue cityMapQueue() {
return QueueBuilder
.durable(CITY_MAP_QUEUE)
.build();
}
// 创建死信队列
@Bean
public Queue cityMapDeadQueue() {
return QueueBuilder
.durable(CITY_MAP_DEAD_QUEUE)
// 给死信队列配置 死信交换机 和 死信路由key
.withArgument("x-dead-letter-exchange", CITY_MAP_NAME)
.withArgument("x-dead-letter-routing-key", CITY_MAP_ROUTE_KEY)
// 设置队列过期时间
// .withArgument("x-message-ttl", 1000 * 15)
.build();
}
// 创建普通交换机
@Bean
public Exchange exchange() {
return ExchangeBuilder
.topicExchange(CITY_MAP_NAME)
.durable(true)
.build();
}
// 创建死信交换机
@Bean
public Exchange deadExchange() {
return ExchangeBuilder
.topicExchange(CITY_MAP_DEAD_NAME)
.durable(true)
.build();
}
// 绑定死信交换机和队列
@Bean
public Binding deadBinding() {
return BindingBuilder
.bind(cityMapDeadQueue())
.to(deadExchange())
.with(CITY_MAP_ROUTE_DEAD_KEY)
.and(null);
}
// 绑定普通交换机和队列
@Bean
public Binding binding() {
return BindingBuilder
.bind(cityMapQueue())
.to(exchange())
.with(CITY_MAP_ROUTE_KEY)
.and(null);
}
}
3-2-2、CityMapConsumer
消费者,只是需要消费一个队列就好了,死信队列无需设置消费者
import cn.ideamake.yxproperty.docking.mq.config.CityMapQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 城市地图消费者
*
* @author 陶梓洋
* @date 2022/03/22
*/
@Component
@Slf4j
public class CityMapConsumer {
@RabbitListener(queues = CityMapQueueConfig.CITY_MAP_QUEUE)
@RabbitHandler
public void handler(Message message) {
log.info("接受到MQ异步消息");
// TODO
}
}
3-2-3、RabbitMqProvider
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@Component
@Slf4j
public class RabbitMqProvider {
@Resource
private AmqpTemplate rabbitMqTemplate;
/**
* 发送延迟消息
* @param exchange 死信交换机
* @param routerKey 死信路由key
* @param jsonParams json格式的消息
* @param messageId 唯一消息id
* @param delay 延迟时间 单位秒
*/
public void sendMessage(String exchange, String routerKey, String jsonParams,String messageId, int delay) {
log.info("mq发送异步延迟消息:{}", jsonParams);
if (StringUtils.isNotBlank(exchange) && StringUtils.isNotBlank(routerKey)) {
delay = delay * 1000;
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setMessageId(messageId)
.setExpiration(String.valueOf(delay))
.build();
rabbitMqTemplate.convertAndSend(exchange,
routerKey,
MessageBuilder.withBody(jsonParams.getBytes(StandardCharsets.UTF_8)).andProperties(props).build());
} else {
log.warn("参数不完整,未发送MQ消息的,exchange:{},routerKey:{}", exchange, routerKey);
}
}
}
3-2-4、使用
@Resource
private RabbitMqProvider rabbitMqProvider;
rabbitMqProvider.sendMessage(CityMapQueueConfig.CITY_MAP_DEAD_NAME, CityMapQueueConfig.CITY_MAP_ROUTE_DEAD_KEY,JSONObject.toJSONString(callTmpTaskDTO),callTmpTaskDTO.getCityMapTaskDetail().getTaskItemNo(),60);
3-3、阿里云版
直接在消息里面设置一个请求头就好了
MessagePropertiesBuilder.newInstance()
.setMessageId(messageId)
.setHeader("delay", l + "")
.build();
原文始发于微信公众号(小道仙97):RabbitMQ实现延迟消息【死信队列实现、插件实现】
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41301.html