RocketMQ 延时消息的使用和延时级别的配置

导读:本篇文章讲解 RocketMQ 延时消息的使用和延时级别的配置,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1. 延时消息的使用场景

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

2. 延时消息的使用限制

// org/apache/rocketmq/store/config/MessageStoreConfig.java

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码 SendMessageProcessor.java

3. 配置延时级别

在服务器端(rocketmq-broker端)的属性配置文件中加入如下行:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

描述了各级别与延时时间的对应映射关系。

  1. 这个配置项配置了从1级开始,各级延时的时间,能够修改这个指定级别的延时时间;
  2. 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
  3. 默认值就是上面声明的,可手工调整;
  4. 默认值已够用,不建议修改这个值。

4. 延时消息样例

4.1 启动消费者等待传入订阅消息


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class ScheduledMessageConsumer {
   public static void main(String[] args) throws Exception {
      // 实例化消费者
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
      // 订阅Topics
      consumer.subscribe("TestTopic", "*");
      // 注册消息监听者
      consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
              for (MessageExt message : messages) {
                  // Print approximate delay time period
                  System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      });
      // 启动消费者
      consumer.start();
  }
}

4.2 发送延时消息


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
   public static void main(String[] args) throws Exception {
      // 实例化一个生产者来产生延时消息
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // 启动生产者
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
          message.setDelayTimeLevel(3);
          // 发送消息
          producer.send(message);
      }
       // 关闭生产者
      producer.shutdown();
  }
}

4.3 Springboot 延时消息样例

  1. 发送延时消息
rocketmq.producer.topic.test=topic_test
rocketmq.producer.tag.simple=simple
rocketmq.producer.tag.delay=delay
/**
 * 发送延时消息
 *
 * @param topic
 * @param orderAutoClose
 */
public void sendDelayMsg(String topic, OrderAutoClose orderAutoClose) {
    Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(orderAutoClose)).build();

    // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    // 指定发送超时时间(毫秒)和延迟等级
    this.rocketMQTemplate.syncSend(topic, message, 1000, 3);
}
@Value("${rocketmq.producer.topic.test}")
private String topic;
@Value("${rocketmq.producer.tag.delay}")
private String delayTag;
    
@Test
void sendDelayMsg() {
    OrderAutoClose orderAutoClose = new OrderAutoClose();
    orderAutoClose.setOrderNo("D202111231023141054334");
    orderAutoClose.setCreateTime(LocalDateTime.now());

    this.rocketmqSimpleProducer.sendDelayMsg(topic + ":" + delayTag, orderAutoClose);

    log.info("发送延时消息, end...");
}
  1. 消费者
rocketmq.name-server=192.168.0.24:9876
rocketmq.consumer.topic.test=topic_test
rocketmq.consumer.tag.delay=delay
rocketmq.consumer.group.delay=group_${spring.profiles.active}_delay
/**
 * 延时消息监听
 *
 * @author gaoyang
 * @date 2021-12-08 10:04
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic.test}",
        selectorExpression = "${rocketmq.consumer.tag.delay}",
        consumerGroup = "${rocketmq.consumer.group.delay}")
public class RocketmqDelayListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        OrderAutoClose orderAutoClose = JSON.parseObject(msg, OrderAutoClose.class);

        log.info("RocketmqDelayListener - orderNo: {}, createTime: {}", orderAutoClose.getOrderNo(), orderAutoClose.getCreateTime());
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5414.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!