背景
延时消息目前的实现目前有两种方式
-
旧版本的延时消息,仅支持固定时间的延时消息
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
-
5.0基于时间轮重新设计支持任意时间的延时消息
这里我们要讨论的还第一种延时消息
延时消息投递
延时消息投递后比如通过如下方式
message.setDelayTimeLevel(3);
仅需要对消息设置DelayTimeLevel
即可。本质上仅仅是加了一个DELAY
这个属性标识
延时消息查询
延时消息默认不是直接存储在我们发送的topic
中
举个简单例子
如果我们对xiaozou-topic
进行发送延时消息。
实际投递到的topic
是SCHEDULE_TOPIC_XXXX
然后后台会有一个定时线程池去扫描,如果发现到期了后就从SCHEDULE_TOPIC_XXXX
这个topic
取出消息重新投递到xiaozou-topic
这个topic
所以我们要查询延时消息需要去SCHEDULE_TOPIC_XXXX
这个topic
查询
如何知道延时消息是哪个topic的延时消息
如果我们通过SCHEDULE_TOPIC_XXXX
查询出来的消息实际只有body
并不能看出来这条消息属于哪个topic
可能比如线上延时消息太多了。我们想要抽查一些延时消息看看哪个topic
发送了大量延时消息,这时候该如何处理呢
实际我们知道延时消息的原始topic
保存在properties
中
所以我们可以通过构造DefaultMQAdminExt
去查询
比如我们使用如下代码
System.setProperty("rocketmq.namesrv.domain", "xiaozoujishu-nameserver.com:80");
System.setProperty("rocketmq.namesrv.domain.subgroup", "nsaddr-1");
DefaultMQAdminExt mqAdmin = new DefaultMQAdminExt(10000);
mqAdmin.setAdminExtGroup("xiaozou-jishu");
mqAdmin.start();
MessageExt messageExt = mqAdmin.viewMessage("SCHEDULE_TOPIC_XXXX", "AC00960B000A09B4740022AA7BD20331");
这时候我们查询出来的消息我们看看properties
属性
这里可以看到REAL_TOPIC
能看到topic
是%RETRY%
开头的
说明这条延时消息的原始topic
就是这个,说明这是一条重试消息。
还记得我们之前聊过MQ的重试机制就是依赖于延时消息吗
上面其实还能看到RETRY_TOPIC
可以看到重试前的topic
如果我们找一条正常的延时消息看看properties
这里可以看到相应的属性就少了几个
总结
老版本的延时消息默认是存储在SCHEDULE_TOPIC_XXXX
这个topic
,直接去原始topic
查询消息是查询不到的
然后查询到延时消息想通过官方dashboard
直接查看原始的topic
也是不支持的
如果我们想查看延时消息具体是哪个topic
的。我们可以通过viewMessage
方法获取MessageExt
对象,然后通过MessageExt
对象获取properties
,properties
中就有原始的topic
信息
原文始发于微信公众号(小奏技术):RocketMQ如何查询延时消息原始Topic
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/294406.html