1 简介
什么是消息的可靠性投递,保证消息百分百发送到消息队列中去,保证mq节点成功接受消息。消息发送端需要接受到mq服务端接受到消息的确认应答。完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理。
RabbitMQ消息投递路径:⽣产者–>交换机->队列->消费者。通过两个的点控制消息的可靠性投递,
⽣产者到交换机,通过confirmCallback。交换机到队列,通过returnCallback。
建议: 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是⾮常重要的消息真⼼不建议⽤消息确认机制。
2 SpringBoot 集成 RabbitMq
2.1 pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--日志-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--公用包-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.0.RELEASE</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2.2 application.yml
server:
port: 8080
spring:
rabbitmq:
host: 192.168.38.80
port: 5672
username: guest
password: guest
virtual-host: /
2.3 RabbitMqConsts
public final class RabbitMqConsts {
private RabbitMqConsts() {
}
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";
public static final String DIRECT_QUEUE = "direct_queue";
public static final String ROUTING_KEY = "direct_routing_key";
}
2.4 RabbitMqConfig
@Configuration
public class RabbitMqConfig {
/**
* direct交换机
*/
@Bean
public Exchange directExchange() {
return ExchangeBuilder.directExchange(RabbitMqConsts.DIRECT_EXCHANGE_NAME).durable(true).build();
}
/**
* 队列
*/
@Bean
public Queue directQueue() {
return QueueBuilder.durable(RabbitMqConsts.DIRECT_QUEUE).build();
}
/**
* 绑定关系
*/
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(RabbitMqConsts.ROUTING_KEY).noargs();
}
}
2.5 RabbitmqConsumer
@Component
@RabbitListener(queues = RabbitMqConsts.DIRECT_QUEUE)
@Slf4j
public class RabbitmqConsumer {
@RabbitHandler
public void receiveMessage(String body, Message message) {
log.info("receive msgTag: {}", message.getMessageProperties().getDeliveryTag());
log.info("receive body: {}", body);
}
}
2.6 ProviderController
@RestController
@RequestMapping("/provider")
public class ProviderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send() {
String message = "Hello SpringBoot RabbitMq";
rabbitTemplate.convertAndSend(RabbitMqConsts.DIRECT_EXCHANGE_NAME, RabbitMqConsts.ROUTING_KEY, message);
return "发送成功";
}
}
2.7 启动
@SpringBootApplication
public class RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class);
}
}
2.8 测试
3 confirmCallback
描述: ⽣产者到交换机,通过confirmCallback,⽣产者投递消息后,如果Broker收到消息后,会给⽣产者⼀个ACK。⽣产者通过ACK,可以确认这条消息是否正常发送到Broker,这种⽅式是消息可靠性投递的核⼼。
3.1 配置
描述: application.yml
server:
port: 8080
spring:
rabbitmq:
host: 192.168.38.80
port: 5672
username: guest
password: guest
virtual-host: /
#开启消息二次确认,生产者到broker的交换机4
publisher-confirm-type: correlated
3.2 Controller
@RestController
@RequestMapping("/provider")
@Slf4j
public class ProviderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send() {
String message = "Hello SpringBoot RabbitMq";
rabbitTemplate.convertAndSend(RabbitMqConsts.DIRECT_EXCHANGE_NAME, RabbitMqConsts.ROUTING_KEY, message);
return "发送成功";
}
@GetMapping("/send-failed")
public String sendFailed() {
String message = "Hello SpringBoot RabbitMq";
rabbitTemplate.convertAndSend("no-exchange",RabbitMqConsts.ROUTING_KEY,message);
return "发送失败测试";
}
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
log.info("ack:{}", ack);
log.info("cause:{}", cause);
if (ack) {
log.info("发送成功");
} else {
log.info("发送失败,记录数据库");
}
});
}
}
3.3 测试
描述: 调用接口
http://localhost:8080/provider/send
http://localhost:8080/provider/send-failed
4 returnCallback
交换机到队列,通过returnCallback,消息从交换器发送到对应队列失败时触发,两种模式:
交换机到队列不成功,则丢弃消息(默认)
交换机到队列不成功,返回给消息⽣产者,触发
4.1 配置
application.yml
server:
port: 8080
spring:
rabbitmq:
host: 192.168.38.80
port: 5672
username: guest
password: guest
virtual-host: /
#开启消息二次确认,生产者到broker的交换机4
publisher-confirm-type: correlated
#消息从exchange->queue 回调
publisher-returns: true
#抵达queue时,以异步发送优先回调
template:
mandatory: true
4.2 ProviderController
@RestController
@RequestMapping("/provider")
@Slf4j
public class ProviderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send() {
String message = "Hello SpringBoot RabbitMq";
rabbitTemplate.convertAndSend(RabbitMqConsts.DIRECT_EXCHANGE_NAME, RabbitMqConsts.ROUTING_KEY, message);
return "发送成功";
}
@GetMapping("/send-failed")
public String sendFailed() {
String message = "Hello SpringBoot RabbitMq";
rabbitTemplate.convertAndSend(RabbitMqConsts.DIRECT_EXCHANGE_NAME,"failed",message);
return "发送失败测试";
}
@PostConstruct
public void initRabbitTemplate() {
//第一阶段,消息抵达到broker,回调。回调参数:消息id,ack:消息是否成功收到,cause:失败原因
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
if (ack) {
log.info("发送交换机成功");
} else {
log.info("发送交换机失败,记录数据库");
}
});
//第二阶段,如果消息没有抵达到queue,那么回调。
//message:投递失败的消息,replyCode:回复的状态码,replyText:回复的文本内容,exchange:交换机,routingKey:路由键
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
log.info("message: {},replyCode: {},replyText: {},exchange: {}, routingKey: {}",message,replyCode,replyText,exchange,routingKey);
});
}
}
4.3 测试
访问接口:http://localhost:8080/provider/send-failed
5 消费者确认
5.1 简介
消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列
中删除。消费者在处理消息出现了⽹络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放⼊队列中。只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。消息的ACK确认机制默认是打开的,消息如未被进⾏ACK的消息确认机制,这条消息被锁定Unacked。
5.2 开启手动确认
server:
port: 8080
spring:
rabbitmq:
host: 192.168.38.80
port: 5672
username: guest
password: guest
virtual-host: /
#开启消息二次确认,生产者到broker的交换机4
publisher-confirm-type: correlated
#消息从exchange->queue 回调
publisher-returns: true
#抵达queue时,以异步发送优先回调
template:
mandatory: true
listener:
simple:
#手动确认
acknowledge-mode: manual
5.3 RabbitmqConsumer
描述: 统计消费次数,如果次数大于3,那么入库。
@Component
@RabbitListener(queues = RabbitMqConsts.DIRECT_QUEUE)
@Slf4j
public class RabbitmqConsumer {
private Map<String, Integer> map = new HashMap<>();
@RabbitHandler
public void receiveMessage(Message message, String body, Channel channel) throws IOException {
//获取访问次数
int counts = 1;
if (map.containsKey(body)) {
counts = map.get(body) + 1;
}
map.put(body, counts);
//访问超过3次,签收入库
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (counts <= 3) {
log.info("【拒绝】消息:{}", body);
//拒绝接受消息并且重新入队
channel.basicNack(deliveryTag, false, true);
} else {
log.info("记录数据库");
channel.basicAck(deliveryTag, false);
}
}
}
5.4 测试
调用接口:http://localhost:8080/provider/send
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15077.html