RabbitMQ 消息可靠性传递

导读:本篇文章讲解 RabbitMQ 消息可靠性传递,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1 简介

什么是消息的可靠性投递,保证消息百分百发送到消息队列中去,保证mq节点成功接受消息。消息发送端需要接受到mq服务端接受到消息的确认应答。完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理。
RabbitMQ消息投递路径:⽣产者–>交换机->队列->消费者。通过两个的点控制消息的可靠性投递,
⽣产者到交换机,通过confirmCallback。交换机到队列,通过returnCallback。

建议: 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是⾮常重要的消息真⼼不建议⽤消息确认机制。

2 SpringBoot 集成 RabbitMq

在这里插入图片描述

2.1 pom

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <java.version>1.8</java.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--公用包-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.0.RELEASE</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.2 application.yml

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.38.80
    port: 5672
    username: guest
    password: guest
    virtual-host: /

2.3 RabbitMqConsts

public final class RabbitMqConsts {

    private RabbitMqConsts() {

    }


    public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";

    public static final String DIRECT_QUEUE = "direct_queue";

    public static final String ROUTING_KEY = "direct_routing_key";


}

2.4 RabbitMqConfig

@Configuration
public class RabbitMqConfig {



    /**
     *  direct交换机
     */
    @Bean
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitMqConsts.DIRECT_EXCHANGE_NAME).durable(true).build();
    }

    /**
     *  队列
     */
    @Bean
    public Queue directQueue() {
        return QueueBuilder.durable(RabbitMqConsts.DIRECT_QUEUE).build();
    }


    /**
     *  绑定关系
     */
    @Bean
    public Binding directBinding() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(RabbitMqConsts.ROUTING_KEY).noargs();
    }

}

2.5 RabbitmqConsumer

@Component
@RabbitListener(queues = RabbitMqConsts.DIRECT_QUEUE)
@Slf4j
public class RabbitmqConsumer {


    @RabbitHandler
    public void receiveMessage(String body, Message message) {
        log.info("receive msgTag: {}", message.getMessageProperties().getDeliveryTag());
        log.info("receive body: {}", body);
    }

}

2.6 ProviderController

@RestController
@RequestMapping("/provider")
public class ProviderController {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @GetMapping("/send")
    public String send() {
        String message = "Hello SpringBoot RabbitMq";
        rabbitTemplate.convertAndSend(RabbitMqConsts.DIRECT_EXCHANGE_NAME, RabbitMqConsts.ROUTING_KEY, message);
        return "发送成功";
    }

}

2.7 启动

@SpringBootApplication
public class RabbitmqApplication {

    public static void main(String[] args) {

        SpringApplication.run(RabbitmqApplication.class);

    }

}

2.8 测试

在这里插入图片描述
在这里插入图片描述

3 confirmCallback

描述: ⽣产者到交换机,通过confirmCallback,⽣产者投递消息后,如果Broker收到消息后,会给⽣产者⼀个ACK。⽣产者通过ACK,可以确认这条消息是否正常发送到Broker,这种⽅式是消息可靠性投递的核⼼。

3.1 配置

描述: application.yml

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.38.80
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #开启消息二次确认,生产者到broker的交换机4
    publisher-confirm-type: correlated

3.2 Controller

@RestController
@RequestMapping("/provider")
@Slf4j
public class ProviderController {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @GetMapping("/send")
    public String send() {
        String message = "Hello SpringBoot RabbitMq";
        rabbitTemplate.convertAndSend(RabbitMqConsts.DIRECT_EXCHANGE_NAME, RabbitMqConsts.ROUTING_KEY, message);
        return "发送成功";
    }


    @GetMapping("/send-failed")
    public String sendFailed() {
        String message = "Hello SpringBoot RabbitMq";
        rabbitTemplate.convertAndSend("no-exchange",RabbitMqConsts.ROUTING_KEY,message);
        return "发送失败测试";
    }

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
            log.info("ack:{}", ack);
            log.info("cause:{}", cause);
            if (ack) {
                log.info("发送成功");
            } else {
                log.info("发送失败,记录数据库");
            }
        });
    }

}

3.3 测试

描述: 调用接口
http://localhost:8080/provider/send
http://localhost:8080/provider/send-failed
在这里插入图片描述

4 returnCallback

交换机到队列,通过returnCallback,消息从交换器发送到对应队列失败时触发,两种模式:

交换机到队列不成功,则丢弃消息(默认)
交换机到队列不成功,返回给消息⽣产者,触发

4.1 配置

application.yml

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.38.80
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #开启消息二次确认,生产者到broker的交换机4
    publisher-confirm-type: correlated
    #消息从exchange->queue 回调
    publisher-returns: true
    #抵达queue时,以异步发送优先回调
    template:
      mandatory: true

4.2 ProviderController

@RestController
@RequestMapping("/provider")
@Slf4j
public class ProviderController {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @GetMapping("/send")
    public String send() {
        String message = "Hello SpringBoot RabbitMq";
        rabbitTemplate.convertAndSend(RabbitMqConsts.DIRECT_EXCHANGE_NAME, RabbitMqConsts.ROUTING_KEY, message);
        return "发送成功";
    }


    @GetMapping("/send-failed")
    public String sendFailed() {
        String message = "Hello SpringBoot RabbitMq";
        rabbitTemplate.convertAndSend(RabbitMqConsts.DIRECT_EXCHANGE_NAME,"failed",message);
        return "发送失败测试";
    }



    @PostConstruct
    public void initRabbitTemplate() {

        //第一阶段,消息抵达到broker,回调。回调参数:消息id,ack:消息是否成功收到,cause:失败原因
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
            if (ack) {
                log.info("发送交换机成功");
            } else {
                log.info("发送交换机失败,记录数据库");
            }
        });

        //第二阶段,如果消息没有抵达到queue,那么回调。
        //message:投递失败的消息,replyCode:回复的状态码,replyText:回复的文本内容,exchange:交换机,routingKey:路由键
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
            log.info("message: {},replyCode: {},replyText: {},exchange: {}, routingKey: {}",message,replyCode,replyText,exchange,routingKey);
        });

    }

}

4.3 测试

访问接口:http://localhost:8080/provider/send-failed
在这里插入图片描述
在这里插入图片描述

5 消费者确认

5.1 简介

消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列
中删除。消费者在处理消息出现了⽹络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放⼊队列中。只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。消息的ACK确认机制默认是打开的,消息如未被进⾏ACK的消息确认机制,这条消息被锁定Unacked。

5.2 开启手动确认

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.38.80
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #开启消息二次确认,生产者到broker的交换机4
    publisher-confirm-type: correlated
    #消息从exchange->queue 回调
    publisher-returns: true
    #抵达queue时,以异步发送优先回调
    template:
      mandatory: true

    listener:
      simple:
        #手动确认
        acknowledge-mode: manual


5.3 RabbitmqConsumer

描述: 统计消费次数,如果次数大于3,那么入库。

@Component
@RabbitListener(queues = RabbitMqConsts.DIRECT_QUEUE)
@Slf4j
public class RabbitmqConsumer {


    private Map<String, Integer> map = new HashMap<>();


    @RabbitHandler
    public void receiveMessage(Message message, String body, Channel channel) throws IOException {


        //获取访问次数
        int counts = 1;
        if (map.containsKey(body)) {
            counts = map.get(body) + 1;
        }
        map.put(body, counts);

        //访问超过3次,签收入库
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (counts <= 3) {
            log.info("【拒绝】消息:{}", body);
            //拒绝接受消息并且重新入队
            channel.basicNack(deliveryTag, false, true);
        } else {
            log.info("记录数据库");
            channel.basicAck(deliveryTag, false);
        }


    }

}

5.4 测试

调用接口:http://localhost:8080/provider/send
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15077.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!