重试队列和死信队列,都是在触发时给当前消费者组自动创建一个对应的队列,不需要手动创建
死信队列
什么是死信队列
当一条消息初次消费失败,消息队列会自动进行消费重试;达到大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中;这个队列就是死信队列(Dead-Letter Queue,DLQ),而其中的消息则称为死信消息(Dead-Letter Message,DLM)。
死信队列的特征
1、死信队列中的消息不会再被消费者正常消费,即DLQ对于消费者是不可见的
2、死信存储有效期与正常消息相同,均为 3 天(commitlog文件的过期时间),3 天后会被自动删除
3、死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,即每个消费者组都有一个死信队列
4、如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列
死信队列的处理
1、实际上,当⼀条消息进入死信队列,就意味着系统中某些地方出现了问题,从而导致消费者无法正常消费该消息,比如代码中原本就存在Bug。
2、因此,对于死信消息,通常需要开发人员进行特殊处理;关键的步骤是要排查可疑因素,解决代码中可能存在的Bug,然后再将原来的死信消息再次进行投递消费。
重试队列
消费端一直不回传消费结果,MQ认为消息没有收到,Consumer下一次拉取,Broker依然会发送该消息,所以,任何异常都要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,这样MQ会将消息放到重试队列;
重试队列名称为:%RETRY%+consumergroup
Broker配置如下:
messageDelayLevel=5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s(测试使用)
默认配置为:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
测试demo
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RetryConsumer");
consumer.setNamesrvAddr(ExampleConstant.NAMESERVER);
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("TopicTest", "");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.printf(df.format(new Date()) + ", %s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
测试结果
重试队列、死信队列的自动创建
消息在重试队列消费16次(默认次数)后,进入死信队列,名为:%DLQ%+consumergroup
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/105997.html