RabbitMQ学习二消息的可靠性传递与延迟队列

导读:本篇文章讲解 RabbitMQ学习二消息的可靠性传递与延迟队列,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

消息的可靠性传递+消费

⽣产者
–>
交换机
->
队列
->
消费者
确保消息在这几个过程中正常传递
生产者到交换机:通过confirmCallback
交换机到队列:通过returnCallback
建议:开启了消息确认机制,保证了消息的准确送达,但是由于频繁的确认交互,整体效率变低,
           因此实际生产者并不建议开启用消息确认机制

开发实战

 springboot依赖版本

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>2.4.3</version>
</dependency>

配置文件

spring:
  rabbitmq:
    host: IP地址
    port: 5672
    #自己创建的虚拟主机
    virtual-host: dev
    username: admin
    password: password
    #开启二次确认  生产者到broker的交换机
    publisher-confirm-type: correlated
    #开启二次确认  交换机到队列的可靠性投递
    publisher-returns: true
    #为true,如果交换机处理消息路由到队列失败,则会返回给生产者
    template:
      mandatory: true
    #消息者消费消息确认  手动ACK
    listener:
      simple:
        acknowledge-mode: manual

创建交换机,队列,以及交换机与队列的绑定关系

@Configuration
public class TopicRabbitMqConfig {

    public static final String EXCHANGE_NAME = "order-exchange";

    public static final String QUEUE_NAME = "order-queue";

    /**
     * TOPIC交换机
     * @return
     */
    @Bean("exchange")
    public Exchange orderExchange(){
        //是否持久化 是
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    
    /**
     * 队列
     * @return
     */
    @Bean("queue")
    public Queue orderQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 交换机与队列绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding orderBinding(Queue queue, Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

confirmCallback与returnCallback测试

@Test
    void testConfirmCallback(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData  配置
             * @param b  交换机是否接收到消息 true是成功   false是失败
             * @param s   失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("correlationData============"+correlationData);
                System.out.println("b============"+b);
                System.out.println("s============"+s);
                if (b){
                    System.out.println("发送成功");
                    //成功 更新数据库 消息的状态为发送成功  TODO
                }else {
                    System.out.println("发送失败,记录日志或插入数据库");
                    //失败 更新数据库 消息的状态为发送失败  TODO
                }
            }
        });

        rabbitTemplate.convertAndSend(TopicRabbitMqConfig.EXCHANGE_NAME,"order.new","新订单");
        //不存在的交互机,模拟消息不能发送到交换机
        //rabbitTemplate.convertAndSend(TopicRabbitMqConfig.EXCHANGE_NAME+"iii","order.new","新订单");

    }
    @Test
    void testReturnCallback(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                //交换机处理消息路由到队列失败才会触发此步骤  可记录日志或插入数据库
                System.out.println("交换机处理消息路由到队列失败");
                int code = returnedMessage.getReplyCode();
                System.out.println("code======="+code);
                System.out.println("message========"+returnedMessage);
            }
        });
        //rabbitTemplate.convertAndSend(TopicRabbitMqConfig.EXCHANGE_NAME,"order.new","新订单");
        //无效的routingkey 测试失败情况
        rabbitTemplate.convertAndSend(TopicRabbitMqConfig.EXCHANGE_NAME,"xxx.order.new","新订单");
    }

消费者确认消费消息(rabbitMQ的ACK机制)

消费者从rabbtiMQ收到消息并处理完成后,反馈给rabbitMQ,rabbitMQ收到反馈后才将消息从队列中删除。消费者在处理消息时出现网络不稳定,服务器异常等情况,那么不会有ACK反馈,rabbitMQ认为这个消息没有被正常消费,会将消息重新放入队列,只有当消费者正确的发送ACK反馈,rabbitMQ确认收到后,消息才会从队列中删除,消息的ACK机制是默认打开的。

确认方式:自动(默认),手动manual

RabbitMQ学习二消息的可靠性传递与延迟队列

 开发实战

@Component
@RabbitListener(queues = "order-queue")
public class OrderMqListener {
    @RabbitHandler
    public void messageHandle(Message message, String body, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        System.out.println("tag=" + tag);
        System.out.println("message=" + message.toString());
        System.out.println("body=" + body);
        //告诉broker消息已经被确认
        channel.basicAck(tag, false);
        //true表示重新入队列,会被重复消费,可以设置一个重复消费的最大次数,超过可插入数据库,业务排查原因
        //channel.basicNack(tag,false,true);
    }
}

死信队列+延迟队列

死信队列:没有被及时消费的信息存放的队列

死信交换机  dead letter exchange:当消息成为死信后,会被重新放送到另一个交换机,这个交换机就是死信交换机

RabbitMQ学习二消息的可靠性传递与延迟队列

消息怎么成为死信消息 ?

        消费者拒收消息
basic.reject/ basic.nack

,并且没有重新⼊队 requeue=false
        消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
        队列的消息⻓度达到极限

延迟队列:⼀种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望

        这条消息⽴⻢投递,⽽是推迟到在当前时间点之后的某⼀个时间投递到Consumer 进⾏消费,

        该消息即定时消息

使用场景:电商交易中提交订单后30分钟未支付,需关闭订单

实战开发

要求商家注册成功后10秒内发布商品

 springboot依赖版本

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>2.4.3</version>
</dependency>

配置文件

spring:
  rabbitmq:
    host: IP地址
    port: 5672
    #自己创建的虚拟主机
    virtual-host: dev
    username: admin
    password: password
    #消息者消费消息确认  手动ACK
    listener:
      simple:
        acknowledge-mode: manual

创建交换机、队列、交换机与队列的绑定关系

/**
 * 普通队列->死信交换机->死信队列
 */
@Configuration
public class RabbitMqConfig {
    /**
     * 死信队列
     */
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
    /**
     * 死信交换机
     */
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
    /**
     * 进入死信队列路由健
     */
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";

    /**
     * 创建死信交换机
     *
     * @return
     */
    @Bean
    public Exchange lockMerchantDeadExchange() {
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false);
    }

    /**
     * 创建死信队列
     *
     * @return
     */
    @Bean
    public Queue lockMerchantDeadQueue() {
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }

    /**
     * 绑定死信队列和死信交换机
     *
     * @return
     */
    @Bean
    public Binding lockMerchantBinding() {
        return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null);
    }

    /**
     * 普通队列,绑定死信交换机
     */
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
    /**
     * 普通topic交换机
     */
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
    /**
     * 路由可以
     */
    public static final String NEW_MERCHANT_ROUTING_KEY = "new_merchant_routing_key";

    /**
     * 创建普通交换机
     *
     * @return
     */
    @Bean
    public Exchange newMerchantExchange() {
        return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false);
    }

    /**
     * 创建普通队列
     *
     * @return
     */
    @Bean
    public Queue newMerchantQueue() {
        HashMap<String, Object> args = new HashMap<>(3);
        //过期后,进入到死信交换机
        args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
        //过期后,进入死信交换机的路由key
        args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
        //过期时间 10秒
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }

    /**
     * 绑定普通队列和普通交换机
     *
     * @return
     */
    @Bean
    public Binding newMerchantBinding() {
        return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTING_KEY, null);
    }
}

发送消息

@RestController
@RequestMapping("/api/admin/merchant")
public class MerchantAccountController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/check")
    public Object check() {
        rabbitTemplate.convertAndSend(RabbitMqConfig.NEW_MERCHANT_EXCHANGE, RabbitMqConfig.NEW_MERCHANT_ROUTING_KEY, "商家通过验证");
        Map<String, Object> map = new HashMap<>();
        map.put("code", "0");
        map.put("msg", "请于10秒内上传商品");
        return map;
    }
}

监听消息,只需监听死信队列

@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class OrderMqListener {
    @RabbitHandler
    public void messageHandle(Message message, String body, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        System.out.println("tag=" + tag);
        System.out.println("message=" + message.toString());
        System.out.println("body=" + body);
        //告诉broker消息已经被确认
        channel.basicAck(tag, false);
        //true表示重新入队列,会被重复消费,可以设置一个重复消费的最大次数,超过可插入数据库,业务排查原因
        //channel.basicNack(tag,false,true);
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/1274.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!