一、添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.10.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
</dependencies>
二、消息的可靠投递
RabbitMQ 提供了两种模式用来控制消息投递的可靠性。保证消息发送方杜绝任何消息丢失或者投递失败。
rabbitmq 的消息投递路径:producer--->rabbitmq broker--->exchange--->queue--->consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
利用这两个 callback 可以控制消息的可靠性投递。
1、confirm
确认模式
confirm确认模式,消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。
生产者接收应答,用来确定这条消息是否正常的发送到Broker。
使用步骤:
1.开启确认模式:设置ConnectionFactory的publisher-confirms="true"
2.rabbitTemplate.setConfirmCallback设置回调函数。
3.当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
配置spirng-rabbitMq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定义rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 定义rabbitmq connectionFactory ,publisher-confirms="true"开启confirm 确认模式-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义队列-->
<rabbit:queue id="queue_confirm" name="queue_confirm"></rabbit:queue>
<!--定义交换机及与队列绑定-->
<rabbit:direct-exchange name="exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="queue_confirm" key="confirm" ></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
执行测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
//注入 RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 1.开启confirm 确认模式, <rabbit:connection-factory></rabbit:connection-factory>该标签配置publisher-confirms="true"
*
* 2.rabbitTemplate中定义ConfirmCallback回调函数
*
* 3.发送消息
*/
@Test
public void TestConfirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange是否成功接受消息 true:成功 false:失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm回调执行。。。");
if (ack){
System.out.println("成功接受消息: "+cause);
}else{
System.out.println("接受消息失败: "+cause);
//发送失败做出相应失败处理
}
}
});
//发送消息
rabbitTemplate.convertAndSend("exchange_confirm","confirm","test_confirm_callback...");
try {
//刚刚发送消息出去,避免异步的ConfirmCallback由于资源关闭而出现clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)异常
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1.当交换机接收到消息时
2.当发送消息时指定错误的交换机,交换机接受消息失败时。
2、return
退回模式
return回退模式,用于处理一些不可路由的消息。
消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作。
若exchange不存在或者指定的路由key路由不到队列,此时就需要监听处理这种不可达的消息。
使用步骤:
1.开启退回模式:设置ConnectionFactory的publisher-returns="true"
2.rabbitTemplate.setReturnCallback设置退回函数。
3.当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则将消息退回给producer。并执行回调函数returnedMessage。
配置spirng-rabbitMq.xml
配置publisher-returns=”true”开启Return模式
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定义rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 定义rabbitmq connectionFactory ,publisher-returns="true"开启return 确认模式-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-returns="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义队列-->
<rabbit:queue id="queue_return" name="queue_return"></rabbit:queue>
<!--定义交换机及与队列绑定-->
<rabbit:direct-exchange name="exchange_return">
<rabbit:bindings>
<rabbit:binding queue="queue_return" key="return" ></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
执行测试
/**
* 消息发送给Exchange,Exchange路由到Queue失败时,执行ReturnCallback
*
* 1.开启回退模式:<rabbit:connection-factory></rabbit:connection-factory>该标签配置publisher-returns="true"
* 2.设置执行ReturnCallback
* 3.设置Exchange处理消息的模式:1.消息没有路由到queue,则丢弃消息(默认)2.消息没有路由到queue,则ReturnCallback返回消息到发送方
*
*/
@Test
public void testReturn(){
//设置Exchange处理失败消息的模式
// rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback.......");
System.out.println("message = " + message);
System.out.println("replyCode = " + replyCode);
System.out.println("replyText = " + replyText);
System.out.println("exchange = " + exchange);
System.out.println("routingKey = " + routingKey);
}
});
rabbitTemplate.convertAndSend("exchange_return","return111","test_return_callback...");
try {
//刚刚发送消息出去,避免异步的ReturnCallback由于资源关闭导致接收回调消息失败!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
1.设置错误的路由key,消息不会路由到queue,丢弃消息,ReturnCallback不会执行
查看控制台,消息路由队列失败,消息被丢弃。
2.正确的路由key,消息成功路由到Queue。queue接受到消息,ReturnCallback不会执行
3.开启return回退模式,设置错误的路由key,消息路由队列失败,ReturnCallback执行
//设置Exchange处理失败消息的模式
rabbitTemplate.setMandatory(true);
三、Ack确认机制
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto"
确认方式 | 描述 |
---|---|
自动确认 | 自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。实际业务处理中,很可能消息接收到,业务处理出现异常,该消息就会丢失。 |
手动确认 | 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。 |
根据异常情况确认 | 该方式很麻烦,不使用 |
ack使用步骤:
1.rabbit:listener-container标签中设置acknowledge属性, none:自动确认,manual:手动确认,auto: 根据异常情况确认
2.如果消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
3.如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息
配置spring-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定义rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
<!--扫描消息监听器-->
<context:component-scan base-package="cn.ybzy.rabbitmq"></context:component-scan>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义队列-->
<rabbit:queue id="ack_queue" name="ack_queue" auto-declare="true"></rabbit:queue>
<!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" >
<rabbit:listener ref="customerListener" queue-names="ack_queue" ></rabbit:listener>
</rabbit:listener-container>
</beans>
创建消息监听器
/**
* Consumer Ack机制
*
* 1.ack机制默认自动签收,设置手动签收,acknowledge="manual"
* 2.让监听器实现MessageListener的子类接口ChannelAwareMessageListener的onMessage()方法
* 3.如果消息调用成功,则调用channel的basicAck()方法
* 如果消息调用失败,则调用channel的basicNack()方法拒绝签收,borker重新发送消息给customer
*
*/
@Component
public class CustomerListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//设置sleep,延迟borker发送消息给消费者
Thread.sleep(2000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收消息: "+ new String(message.getBody()));
int i=1/0;
System.out.println("处理业务成功....");
//deliveryTag:收到消息的Tag标签
//multiple:签收多条消息
channel.basicAck(deliveryTag,true);
}catch (Exception e){
System.out.println("业务处理失败...");
//requeue:设置true,消息重合队列,borker会重新发送该消息给消费端
channel.basicNack(deliveryTag,true,true);
//basicReject单条处理消息
//channel.basicReject(deliveryTag,true);
}
}
}
MessageListener的子类实现
执行测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
//1.注入 RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 开启消费者监听
*/
@Test
public void customerListener(){
while (true){
}
}
@Test
public void testHelloWorld(){
rabbitTemplate.convertAndSend("ack_queue","hello world ack....");
}
}
1.正常情况下,发送消息,无异常出现,执行手动签收.
2.自造 int i=1/0;
异常,basicNack()拒收消息,borker从新发送消息给消费者.
队列中的消息此时是未签收状态.
关闭消费者监听,此时消息回到最初状态
四、消费端限流
如果Rabbitmq服务器有上万条未处理的消息,当打开一个消费者客户端系统,就会出现巨量的消息瞬间全部推送过来,但是单个客户端无法同时处理这么多数据,这个时候很容易导致服务器崩溃,出现故障。
RabbitMQ提供了一种qos(服务质量保证)功能,在非自动确认消息的前提下,如果一定数量的消息未被确认前,不会进行消费新的消息。
使用步骤:
1.消费端的确认模式一定为手动确认。acknowledge="manual"
2.在<rabbit:listener-container> 中配置 prefetch属性设置消费端一次拉取多少消息
配置spring-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定义rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--扫描消息监听器-->
<context:component-scan base-package="cn.ybzy.rabbitmq"></context:component-scan>
<!--定义队列-->
<rabbit:queue id="ack_queue" name="ack_queue" auto-declare="true"></rabbit:queue>
<!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="confineListenner" queue-names="ack_queue" ></rabbit:listener>
</rabbit:listener-container>
</beans>
创建监听器
/**
* Consumer 限流机制
* 1. 确保ack机制为手动确认。
* 2. listener-container配置属性
* perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
*/
@Component
public class ConfineListenner implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//限制消费过快
Thread.sleep(2000);
//获取消息
System.out.println("接收消息: "+new String(message.getBody()));
//处理业务逻辑
System.out.println("处理业务....");
//手动签收
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
执行测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 开启消费者监听
*/
@Test
public void customerListener(){
while (true){
}
}
@Test
public void sendTest(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("ack_queue","发送消息: 消费端限流....");
}
}
}
先往队列发送10条消息
1.perfetch = 1,关闭手动签收.开启消费者监听
此时,由于配置消费端每次从mq拉去一条消息来消费且每次确认消费后在继续拉取消息消费,所以此时只有一条数据未被确认签收
2.取消perfetch = 1,关闭手动签收.开启消费者监听
服务端一次性拉去mq中消息来进行消费,但关闭了手动签收,所以消息都未被签收
停止消费者监听
3.设置perfetch = 1,开启手动签收.开启消费者监听
消费者每隔2秒拉取消息进行消费确认签收。此次消费6条。
五、TTL队列/消息
TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除,是对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),在消息发送时可以进行指定,当该消息在队列头部时(消费时),会单独判断这一消息是否过期。如果两者都进行了设置,以时间短的为准。
TTL过期时间分类
1.队列统一过期
2.消息单独过期
如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
队列过期后,会将队列所有消息全部移除。
消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
配置spring-rabbitmq.mxl
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定义rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 定义rabbitmq connectionFactory ,publisher-returns="true"开启return 确认模式-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--ttl 队列-->
<rabbit:queue name="queue_ttl" id="queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!--队列与交换机绑定-->
<rabbit:topic-exchange name="exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.*" queue="queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
执行测试
1.队列统一过期,队列过期后,会将队列所有消息全部移除。
@Test
public void TtlTest() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("exchange_ttl", "ttl.rabbit", "ttl test....");
}
}
2.消息单独过期,如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
@Test
public void TtlTest() {
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息,5秒后过期
message.getMessageProperties().setExpiration("5000");
//2.返回该消息
return message;
}
};
//消息单独过期
rabbitTemplate.convertAndSend("exchange_ttl", "ttl.rabbit", "ttl test....",messagePostProcessor);
}
3.当消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
@Test
public void TtlTest() {
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息,5秒后过期
message.getMessageProperties().setExpiration("5000");
//2.返回该消息
return message;
}
};
//不过期的消息
rabbitTemplate.convertAndSend("exchange_ttl", "ttl.rabbit", "ttl Test....");
//消息单独过期
rabbitTemplate.convertAndSend("exchange_ttl", "ttl.rabbit", "ttl Test....",messagePostProcessor);
}
消息超过5秒后也不会过期,超过队列过期时间,全部消息清除。
六、死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
死信交换机和死信队列和普通的没有区别 , 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。
消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
使用步骤:
队列绑定死信交换机
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
配置spring-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定义rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 定义rabbitmq connectionFactory ,publisher-returns="true"开启return 确认模式-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--
死信队列:
1. 声明正常的队列(queue1_dlx)和交换机(exchange1_dlx)
2. 声明死信队列(queue2_dlx)和死信交换机(exchange2_dlx)
3. 正常队列绑定死信交换机
设置两个参数:
* x-dead-letter-exchange:死信交换机名称
* x-dead-letter-routing-key:发送给死信交换机的routingkey
-->
<!-- 1.声明正常的队列(queue1_dlx)和交换机(exchange1_dlx)-->
<rabbit:queue name="queue1_dlx" id="queue1_dlx">
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange2_dlx" />
<!-- x-dead-letter-routing-key:发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.rabbit" />
<!-- 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!--设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="exchange1_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="queue1_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2. 声明死信队列(queue2_dlx)和死信交换机(exchange2_dlx)-->
<rabbit:queue name="queue2_dlx" id="queue2_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange2_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue2_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
执行测试
- 测试过期时间,死信消息
@Test
public void testDlx(){
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend("exchange1_dlx","test.dlx.rabbit","dlx test....");
}
2. 测试长度限制后,消息死信
@Test
public void testDlx(){
//测试长度限制后,消息死信
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("exchange1_dlx","test.dlx.rabbit","dlx test....");
}
}
发送消息20条,由于配置队列长度为10,超出消息进入死信队列
正常队列中的10条消息过期后同样进入死信队列。
3. 测试消息拒收
上述spring-rabbit.xml中添加消费者监听器配置
<!--扫描包-->
<context:component-scan base-package="cn.ybzy.rabbitmq" />
<!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" >
<!--定义监听器,监听正常队列-->
<rabbit:listener ref="dlxListener" queue-names="queue1_dlx"></rabbit:listener>
</rabbit:listener-container>
创建消费者监听器
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//接收消息
System.out.println(new String(message.getBody()));
//处理业务
System.out.println("处理业务...");
//出现错误
int i = 1/0;
//手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
e.printStackTrace();
System.out.println("出现异常,拒绝签收");
//拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}
@Test
public void testDlx(){
//测试消息拒收
rabbitTemplate.convertAndSend("exchange1_dlx","test.dlx.rabbit","dlx test....");
}
发送消息后,由于异常,拒绝签收,消息进入死信队列。
七、延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
在RabbitMQ中并未提供延迟队列功能。 但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
模拟下单超时未支付,则取消订单过程
配置spring-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!--定义rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!-- 定义rabbitmq connectionFactory ,publisher-returns="true"开启return 确认模式-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--
延迟队列:
1. 定义正常交换机(order_exchange)和队列(order_queue)
2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
3. 绑定,设置正常队列过期时间为10秒
-->
<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id="order_queue" name="order_queue">
<!-- 3. 绑定,设置正常队列过期时间为10秒-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
创建消息监听器
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//接收消息
System.out.println(new String(message.getBody()));
//处理业务逻辑
System.out.println("处理业务逻辑...");
System.out.println("根据订单id查询状态...");
System.out.println("判断状态是否为支付成功");
System.out.println("取消订单,回滚库存....");
//手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
e.printStackTrace();
System.out.println("出现异常,拒绝签收");
//4.拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}
配置监听器
<!--扫描包-->
<context:component-scan base-package="cn.ybzy.rabbitmq" />
<!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" >
<!--延迟队列效果实现: 一定要监听的是 死信队列-->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>
执行测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 开启消费者监听
*/
@Test
public void customerListener(){
while (true){
}
}
@Test
public void testDelay() {
//发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息。。。。");
}
}
发送消息10秒后,消费端接受到消息。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137123.html