消息的可靠性传递+消费
–>
交换机
->
队列
->
消费者
开发实战
springboot依赖版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.3</version>
</dependency>
配置文件
spring:
rabbitmq:
host: IP地址
port: 5672
#自己创建的虚拟主机
virtual-host: dev
username: admin
password: password
#开启二次确认 生产者到broker的交换机
publisher-confirm-type: correlated
#开启二次确认 交换机到队列的可靠性投递
publisher-returns: true
#为true,如果交换机处理消息路由到队列失败,则会返回给生产者
template:
mandatory: true
#消息者消费消息确认 手动ACK
listener:
simple:
acknowledge-mode: manual
创建交换机,队列,以及交换机与队列的绑定关系
@Configuration
public class TopicRabbitMqConfig {
public static final String EXCHANGE_NAME = "order-exchange";
public static final String QUEUE_NAME = "order-queue";
/**
* TOPIC交换机
* @return
*/
@Bean("exchange")
public Exchange orderExchange(){
//是否持久化 是
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 队列
* @return
*/
@Bean("queue")
public Queue orderQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 交换机与队列绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding orderBinding(Queue queue, Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}
confirmCallback与returnCallback测试
@Test
void testConfirmCallback(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置
* @param b 交换机是否接收到消息 true是成功 false是失败
* @param s 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("correlationData============"+correlationData);
System.out.println("b============"+b);
System.out.println("s============"+s);
if (b){
System.out.println("发送成功");
//成功 更新数据库 消息的状态为发送成功 TODO
}else {
System.out.println("发送失败,记录日志或插入数据库");
//失败 更新数据库 消息的状态为发送失败 TODO
}
}
});
rabbitTemplate.convertAndSend(TopicRabbitMqConfig.EXCHANGE_NAME,"order.new","新订单");
//不存在的交互机,模拟消息不能发送到交换机
//rabbitTemplate.convertAndSend(TopicRabbitMqConfig.EXCHANGE_NAME+"iii","order.new","新订单");
}
@Test
void testReturnCallback(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//交换机处理消息路由到队列失败才会触发此步骤 可记录日志或插入数据库
System.out.println("交换机处理消息路由到队列失败");
int code = returnedMessage.getReplyCode();
System.out.println("code======="+code);
System.out.println("message========"+returnedMessage);
}
});
//rabbitTemplate.convertAndSend(TopicRabbitMqConfig.EXCHANGE_NAME,"order.new","新订单");
//无效的routingkey 测试失败情况
rabbitTemplate.convertAndSend(TopicRabbitMqConfig.EXCHANGE_NAME,"xxx.order.new","新订单");
}
消费者确认消费消息(rabbitMQ的ACK机制)
消费者从rabbtiMQ收到消息并处理完成后,反馈给rabbitMQ,rabbitMQ收到反馈后才将消息从队列中删除。消费者在处理消息时出现网络不稳定,服务器异常等情况,那么不会有ACK反馈,rabbitMQ认为这个消息没有被正常消费,会将消息重新放入队列,只有当消费者正确的发送ACK反馈,rabbitMQ确认收到后,消息才会从队列中删除,消息的ACK机制是默认打开的。
确认方式:自动(默认),手动manual
开发实战
@Component
@RabbitListener(queues = "order-queue")
public class OrderMqListener {
@RabbitHandler
public void messageHandle(Message message, String body, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
System.out.println("tag=" + tag);
System.out.println("message=" + message.toString());
System.out.println("body=" + body);
//告诉broker消息已经被确认
channel.basicAck(tag, false);
//true表示重新入队列,会被重复消费,可以设置一个重复消费的最大次数,超过可插入数据库,业务排查原因
//channel.basicNack(tag,false,true);
}
}
死信队列+延迟队列
死信队列:没有被及时消费的信息存放的队列
死信交换机 dead letter exchange:当消息成为死信后,会被重新放送到另一个交换机,这个交换机就是死信交换机
消息怎么成为死信消息 ?
basic.reject/ basic.nack
)
,并且没有重新⼊队 requeue=false
延迟队列:⼀种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望
这条消息⽴⻢投递,⽽是推迟到在当前时间点之后的某⼀个时间投递到Consumer 进⾏消费,
该消息即定时消息
使用场景:电商交易中提交订单后30分钟未支付,需关闭订单
实战开发
要求商家注册成功后10秒内发布商品
springboot依赖版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.3</version>
</dependency>
配置文件
spring:
rabbitmq:
host: IP地址
port: 5672
#自己创建的虚拟主机
virtual-host: dev
username: admin
password: password
#消息者消费消息确认 手动ACK
listener:
simple:
acknowledge-mode: manual
创建交换机、队列、交换机与队列的绑定关系
/**
* 普通队列->死信交换机->死信队列
*/
@Configuration
public class RabbitMqConfig {
/**
* 死信队列
*/
public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
/**
* 死信交换机
*/
public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
/**
* 进入死信队列路由健
*/
public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
/**
* 创建死信交换机
*
* @return
*/
@Bean
public Exchange lockMerchantDeadExchange() {
return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false);
}
/**
* 创建死信队列
*
* @return
*/
@Bean
public Queue lockMerchantDeadQueue() {
return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
}
/**
* 绑定死信队列和死信交换机
*
* @return
*/
@Bean
public Binding lockMerchantBinding() {
return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE,
LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null);
}
/**
* 普通队列,绑定死信交换机
*/
public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
/**
* 普通topic交换机
*/
public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
/**
* 路由可以
*/
public static final String NEW_MERCHANT_ROUTING_KEY = "new_merchant_routing_key";
/**
* 创建普通交换机
*
* @return
*/
@Bean
public Exchange newMerchantExchange() {
return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false);
}
/**
* 创建普通队列
*
* @return
*/
@Bean
public Queue newMerchantQueue() {
HashMap<String, Object> args = new HashMap<>(3);
//过期后,进入到死信交换机
args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
//过期后,进入死信交换机的路由key
args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
//过期时间 10秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
}
/**
* 绑定普通队列和普通交换机
*
* @return
*/
@Bean
public Binding newMerchantBinding() {
return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTING_KEY, null);
}
}
发送消息
@RestController
@RequestMapping("/api/admin/merchant")
public class MerchantAccountController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/check")
public Object check() {
rabbitTemplate.convertAndSend(RabbitMqConfig.NEW_MERCHANT_EXCHANGE, RabbitMqConfig.NEW_MERCHANT_ROUTING_KEY, "商家通过验证");
Map<String, Object> map = new HashMap<>();
map.put("code", "0");
map.put("msg", "请于10秒内上传商品");
return map;
}
}
监听消息,只需监听死信队列
@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class OrderMqListener {
@RabbitHandler
public void messageHandle(Message message, String body, Channel channel) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
System.out.println("tag=" + tag);
System.out.println("message=" + message.toString());
System.out.println("body=" + body);
//告诉broker消息已经被确认
channel.basicAck(tag, false);
//true表示重新入队列,会被重复消费,可以设置一个重复消费的最大次数,超过可插入数据库,业务排查原因
//channel.basicNack(tag,false,true);
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/1274.html