1. 延时消息的使用场景
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
2. 延时消息的使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码 SendMessageProcessor.java
3. 配置延时级别
在服务器端(rocketmq-broker端)的属性配置文件中加入如下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各级别与延时时间的对应映射关系。
- 这个配置项配置了从1级开始,各级延时的时间,能够修改这个指定级别的延时时间;
- 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
- 默认值就是上面声明的,可手工调整;
- 默认值已够用,不建议修改这个值。
4. 延时消息样例
4.1 启动消费者等待传入订阅消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// 订阅Topics
consumer.subscribe("TestTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
4.2 发送延时消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
4.3 Springboot 延时消息样例
- 发送延时消息
rocketmq.producer.topic.test=topic_test
rocketmq.producer.tag.simple=simple
rocketmq.producer.tag.delay=delay
/**
* 发送延时消息
*
* @param topic
* @param orderAutoClose
*/
public void sendDelayMsg(String topic, OrderAutoClose orderAutoClose) {
Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(orderAutoClose)).build();
// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
// 指定发送超时时间(毫秒)和延迟等级
this.rocketMQTemplate.syncSend(topic, message, 1000, 3);
}
@Value("${rocketmq.producer.topic.test}")
private String topic;
@Value("${rocketmq.producer.tag.delay}")
private String delayTag;
@Test
void sendDelayMsg() {
OrderAutoClose orderAutoClose = new OrderAutoClose();
orderAutoClose.setOrderNo("D202111231023141054334");
orderAutoClose.setCreateTime(LocalDateTime.now());
this.rocketmqSimpleProducer.sendDelayMsg(topic + ":" + delayTag, orderAutoClose);
log.info("发送延时消息, end...");
}
- 消费者
rocketmq.name-server=192.168.0.24:9876
rocketmq.consumer.topic.test=topic_test
rocketmq.consumer.tag.delay=delay
rocketmq.consumer.group.delay=group_${spring.profiles.active}_delay
/**
* 延时消息监听
*
* @author gaoyang
* @date 2021-12-08 10:04
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic.test}",
selectorExpression = "${rocketmq.consumer.tag.delay}",
consumerGroup = "${rocketmq.consumer.group.delay}")
public class RocketmqDelayListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
OrderAutoClose orderAutoClose = JSON.parseObject(msg, OrderAutoClose.class);
log.info("RocketmqDelayListener - orderNo: {}, createTime: {}", orderAutoClose.getOrderNo(), orderAutoClose.getCreateTime());
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5414.html