我们知道RabbitMQ的几个主要作用分别是
- 解耦
- 异步通讯
- 削峰
下面对这三个作用展开讲解
解耦
我们不难发现在不断地迭代过程中,服务之间的调用网络逐渐变成了一个网状结构。借助RabbitMQ可以将网状结构梳理成星型结构。
举个解耦的例子,假设我们有服务A负责向其他服务下发一些通知,下游服务分别是B、C。传统做法是我们的A中调用B、C的Api。后续要增加通知D那么我们就需要更改代码才可以,并且在调用的过程中还可能调用失败,我们还得处理失败的逻辑
我们借助RabbitMQ将会是什么样子呢?对于例子的逻辑我们选用Fanout
交换机用来广播消息我们只需要保证消息成功发出即可,下游服务监听不同的队列(需绑定到服务A发送消息的交换机)即可。
原始结构
借助RabbitMQ
异步通讯
我们在工作中会有一些场景,主流程处理完成后可以直接返回结果,其他的后续操作我们可以异步去完成。
例如:订票通知、注册结果短信之类的
假设我们是同步调用,我们接口的响应时间增加了后置操作的时间,还可能因为调用超时而导致异常,我们需要做更多的处理才可以解决这一系列的问题。但其实我们并不关心后置操作的处理结果,我们借助RabbitMQ可以避免这个问题,主流程操作完成后只需要发送操作完成的消息,后置操作的服务监听操作完成的消息即可,而主流程在发送完操作完成的消息后就可以直接返回结果了
我们看到在这个例子中执行主流程只需要100ms而我们加上一系列的后置操作后整个接口变成了300ms
借助RabbitMQ优化后响应时间变成了105ms,可以看到效果还是非常显著的
削峰
假设我们已经借助RabbitMQ做了解耦以及异步通讯,那么我们在面对瞬时流量激增的情况下如果不做控制很可能导致数据库被打死亦或者其他情况从而导致服务宕机,往往一个服务的宕机会产生一系列连锁反应,甚至可能导致大面积的宕机而导致全线产品不可用,毫无疑问这种意外时灾难性的。
RabbitMQ为我们提供了限流的处理方法:对于每一个实例,可以设置最少消费者数量、最大消费者数量和消费者每次从队列取出的消息数量
我们根据服务的性能进行合理的设置即可,如此设置就像是一个水库一样根据水渠所能承受的水流量来放水,多余的水流量存储在水库之中(RabbitMQ为此类比中的水库),就像下面这张图一样
代码实现
方式有很多,我们这里讲自定义RabbitListenerContainerFactory
的方式
@Bean
public SimpleRabbitListenerContainerFactory testSimpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 设置连接工厂,若是项目中有多套RabbitMQ连接配置则注入的需要注意区分
factory.setConnectionFactory(connectionFactory);
// 每个消费者每次从队列中取出的消息数量
factory.setPrefetchCount(1);
// 最少消费者数
factory.setConcurrentConsumers(10);
// 最大消费者数
factory.setMaxConcurrentConsumers(10);
// 消息转换器
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
随后我们在监听的地方配置上该容器工厂即可
@RabbitListener(queues = "test.queue", ackMode = "MANUAL", containerFactory = "testSimpleRabbitListenerContainerFactory")
public void listen(String in, Channel channel, Message message) throws IOException, InterruptedException {
// 模拟复杂操作
TimeUnit.SECONDS.sleep(1);
log.info("-------------test queue listen---------------: {}", in);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
关键是这一段
containerFactory = "testSimpleRabbitListenerContainerFactory"
,值是bean名称
核心代码就这么些,下面是完整的测试代码
@Bean
public DirectExchange testExchange() {
return new DirectExchange("test.direct.exchange");
}
@Bean
public Queue testQueue() {
return new Queue("test.queue");
}
@Bean
public Binding testBinding(@Qualifier("testExchange") DirectExchange exchange, @Qualifier("testQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("test.routing.key");
}
@Bean
SimpleRabbitListenerContainerFactory testSimpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 每个消费者每次从队列中取出的消息数量
factory.setPrefetchCount(1);
// 最少消费者数
factory.setConcurrentConsumers(2);
// 最大消费者数
factory.setMaxConcurrentConsumers(2);
// 消息转换器
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@RabbitListener(queues = "test.queue", ackMode = "MANUAL", containerFactory = "testSimpleRabbitListenerContainerFactory")
public void listen(String in, Channel channel, Message message) throws IOException, InterruptedException {
// 模拟复杂操作
TimeUnit.SECONDS.sleep(1);
log.info("-------------test queue listen---------------: {}", in);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@Resource
RabbitTemplate rabbitTemplate;
@GetMapping("test")
void testSend() {
rabbitTemplate.convertAndSend("test.direct.exchange", "test.routing.key", "test");
}
测试打印结果
我们可以看到同一时间处理的只有两个线程并且是处理完成之后才消费下一条消息
因为我们在容器工厂中配置的最大消费者数量=2
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/195203.html