教程说明
- 本系列教程目录大纲:《RabbitMQ系列教程-目录大纲》
- 本系列教程配套代码:https://gitee.com/Horizon1024/rabbitmt.git(码云地址)
RabbitMQ的Consumer Ack
当Consumer
端接受到一个消息进行消费时,如果消费失败了我们希望此次消息就应该定义为”消费失败”,也就是这条消息我们希望拒绝签收,Consumer
的Ack
机制可以根据Consumer
的消费情况来决定此次消息时签收还是拒绝签收;
RabbitMQ中提供有3种消息签收方式:
-
1)
none
:自动签收(消息被消费就自动签收) -
2)
manual
:手动签收(需要调用basicNack或 basicReject方法手动签收) -
3)
auto
:由rabbitmq来决定是否签收(根据异常进行处理)
搭建消息消费者;采用RabbitMQ整合Spring环境(参考前面工程)
spring.xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 扫描包 -->
<context:component-scan base-package="com.lscl" />
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--
定义监听器容器
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
acknowledge:消息签收方式
none: 自动签收(消息被消费就自动签收)
manual: 手动签收(需要调用basicNack或 basicReject方法手动签收)
auto: 由rabbitmq来决定是否签收(根据异常进行处理)
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" >
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>
</beans>
监听器:
我们之前实现的是MessageListener
,该接口的抽象方法只有一个Message参数,我们待会拒绝签收消息需要借助channel对象,因此我们实现ChannelAwareMessageListener
接口
package com.lscl.rabbitmq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* 实现ChannelAwareMessageListener接口
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 接受消息的标签
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3 / 0;//出现错误
//3. 手动签收
/*
参数1:接受消息的标签
参数2:允许多条消息同时被处理
*/
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//e.printStackTrace();
//4.拒绝签收
/*
参数1:接受消息的标签
参数2:允许多条消息同时被处理
参数3:重回队列,如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag, true, true);
// 也可以拒绝签收 参数二: 是否重回队列
// channel.basicReject(deliveryTag, true);
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/131793.html