序言
浅谈一下消息队列,随着分布式架构的不断发展,消息队列也逐渐走进到程序员的“生活“中来,在一些特定的业务场景下消息队列是个相当不错的选择。
消息队列的作用(它究竟能做什么)
服务解耦
何为服务解耦?下面我说一个例子大家体会一下,比如说我现在有一个订单系统,订单创建成功之后会做一系列的事情,比如创建物流信息啊,增加用户的积分之类的操作,我们可以使用传统的方式通过RPC来远程调用一下这些服务,这种方式看起来没啥问题,但随着业务的增加就会发现它的不足了。
问题的产生
此时如果业务发生改变,需要在订单创建成功后向用户发送一条短信通知,那么我们需要做什么呢?除了增加短信通知的业务之外,我们还需要在订单代码处增加对于短信的调用,这样的话其实就违背了设计模式中的开闭原则,即业务的提供方修改代码而业务的使用方不修改代码。
如何解决这种问题呢?
消息队列的出现完美化解了这种问题,引入消息队列,在订单创建成功之后,仅需要向消息队列当中发送一条消息,对于其他的业务让他们来监听到这条消息即可,这样无论业务代码发生怎样的改变,让它们监听消息队列就可以了,我们订单的业务代码是不需要做出改变的,这样就解决了多个服务之间耦合性的问题了。
异步处理
大多数的时候我们所采用的处理方式都是同步处理的,但异步处理相较于同步处理会有很大的性能提升。
举个例子:在之前的年代,学生共用公寓中的洗衣机,假如我要去洗衣服,此时洗衣机里面有衣服存在,那么我最好就在洗衣机旁边等着,否则可能会被其他人给占用了,但现在不一样了,我们可以直接通过手机预约洗衣机的方式,不需要再像之前一样在洗衣机旁边等着害怕被别人占用了,我们在app预约之后就可以做一些其他的事情,等洗衣机空闲了再去即可。
这其实就是一种异步的处理方式,对于消息队列而言,消息的消费者和生产者之间也是异步的,消息生产者生产出一条消息,消息的消费者可以对消息进行异步的消费。
流量削峰
对于一些大流量的业务来讲,如果我们的服务器无法承载那么大的流量该怎么办呢?
可以通过限流的方式,来保证服务器不被大流量给打垮,但这种方式必然是会让一部分客户流失掉的。
此时我们可以采用消息队列来对这些大流量请求进行一些缓冲,将这些请求存放到消息队列当中,然后让我们消息的消费方来根据服务器的性能对其慢慢的消费。
延迟队列
延迟队列的产生:延迟队列是消息队列当中的一种队列形式,即消息在发送到消息队列后,经过指定的时间后消息的接收方才能接收到该消息。
延迟队列一般常用于一些特定的业务场景当中,比如前段时间我在写一个关于订单的业务时就需要引入消息队列,业务场景是这样的:用户创建订单成功,需在30分钟内进行付款,如果不付款的话会将该订单设置成失效的订单,然后对一些库存之类的数据进行补偿回退。
最开始我想到的方法就是定时任务轮询的方式,查询当前时间减去指定过期时间之前未支付的订单,然后对其 进行补偿,但这样的操作其实是比较耗费性能的,因为每过一段时间就需要对数据库中的订单数据做查询,数据量不大还好,一旦数据量增多所带来的性能消耗是巨大的。
于是对其进行了一些优化方案,可以在订单创建成功之后发送一条延时消息,经过指定的时间之后接收到该条消息,然后对其进行判断是否支付即可。
如何创建延迟队列
RabbitMQ不像RocketMQ一样自带延迟队列,RabbitMQ需要我们手动配置一个过期消息+死信队列的这么一个组合。
过期消息+死信队列的配置:
先配置一下死信队列(当过期队列中的消息,过期后就会转发到死信队列当中) `
//配置死信队列
@Bean(QUEUE2)
public Queue QUEUE_QINGCHENG1_POSTPAGE() {
Queue queue = new Queue(QUEUE2);
return queue;
}
然后就是配置过期队列了(订单创建成功后,发送消息到过期队列中,经过指定的时间过期后将消息转发到死信队列)
//配置过期队列
@Bean(QUEUE1)
public Queue QUEUE_QINGCHENG_POSTPAGE() {
Map<String,Object> arguments=new HashMap<>(2);
arguments.put("x-dead-letter-exchange",EX_ROUTING_CMS_POSTPAGE);//消息过期后转发到的交换机
arguments.put("x-dead-letter-routing-key","2");//消息过期后要转发到的队列的routingKey
return new Queue(QUEUE1,true,false,false,arguments);
}
业务方订单创建成功,发送消息到过期队列
、
rabbitTemplate.convertAndSend("exchange_qingcheng_1","1", JSON.toJSONString(seckillStatus),
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//消息有效期10秒
long time=1000*60*30;
message.getMessageProperties().setExpiration(String.valueOf(time));
return message;
}
});
总结
以上就是消息队列的一些用途以及死信队列的使用方式!
原文始发于微信公众号(GuoCoding):RabbitMQ实现延迟队列
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/43017.html