目录
1、DelayExchange插件配置
1.1、下载DelayExchange插件
地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
选择对应版本进行下载,比如:rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
1.2、安装DelayExchange插件
将下载的文件拷贝到服务器,比如root
目录下;
docker环境操作:
docker搭建时,拷贝到容器的/opt/rabbitmq/plugins/中
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/opt/rabbitmq/plugins/
linux环境操作:
# 如果不是docker搭建,则拷贝到/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.27/plugins/目录下
cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.27/plugins
1.3、启动DelayExchange插件
docker环境操作:
docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
linux环境操作:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2、使用原理
DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
- 接收消息
- 判断消息是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
3、使用说明
声明一个交换机,交换机的类型可以是任意类型,通过@RabbitListener注解绑定交换机并且设定delayed属性为true即可,如下:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "queue_delay"),
exchange = @Exchange(name = "exchange_delay", delayed = "true"), key = "routing_delay"))
发送消息时,通过MessageBuilder组织消息,并且设置设置header的key为x-delay
value为TTL
,TTL单位为毫秒,如下:
Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).setHeader("x-delay", 1000).build();
4、完整代码
Controller:
/**
* 测试延时消息
* 由于使用了延时队列,所有再回调方法中的setReturnCallback打印异常,可以通过if判断过虑延时队列使用的outingKey即可
*/
@PostMapping("/sendDelay")
public String sendDelay() {
String msg = "发送时间:" + LocalDateTime.now() + ",延时:";
for (int i = 1; i <= 10; i++) {
String sendMsg = msg + i + "秒";
long time = i * 1000;
rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_DELAY, RabbitmqContext.ROUTING_DELAY, sendMsg, message -> {
//注意这里时间可以使long,而且是设置header
message.getMessageProperties().setHeader("x-delay", time);
return message;
});
}
return "发送成功...";
}
Config:
@Configuration
public class DelayConfig {
/**
* 配置队列名称
*
* @return
*/
@Bean
public Queue delayQueue() {
return new Queue(RabbitmqContext.QUEUE_DELAY);
}
/**
* 延时队列可以配置任意类型的交换机
*
* @return
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
// x-delayed-message 固定写法
return new CustomExchange(RabbitmqContext.EXCHANGE_DELAY, "x-delayed-message", true, false, args);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(RabbitmqContext.ROUTING_DELAY).noargs();
}
}
Listener:
@Component
@Slf4j
public class DelayReceiveListener {
/**
* 设置延时队列
*
* @param msg 接收的文本消息
* @param channel 通道信息
* @param message 附加的参数信息
*/
@RabbitListener(queues = "delay_queue")
public void receiveQueue(String msg, Channel channel, Message message) {
System.out.println("这个是延时消息:" + msg + ",当前时间:" + LocalDateTime.now());
}
}
5、测试效果
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/18138.html