1、幂等性
1)概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
如:支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。
在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是在响应客户端的时候也有可能出现网络中断或者异常等等
2)解决思路
-
MQ消费者的幂等性的解决一般使用 全局ID 或者 写个 唯一标识 ;比如:时间戳 或者 UUID 或者 订单消费者消费MQ中的消息,也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。
-
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着消息永远不会被消费多次,即使收到了一样的消息。
业界主流的幂等性有两种操作
-
唯一ID
+指纹码机制
,利用数据库主键去重- 指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中;优势就是实现简单,就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈,当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
-
Redis原子性
- 利用 redis 执行 setnx 命令,天然具有幂等性,从而实现不重复消费
2、优先级队列
1)使用场景
订单催付:客户在天猫下的订单,淘宝会及时将订单推送给客户,如果在用户设定的时间内未付款,那么就会给用户推送一条短信提醒。但是,天猫商家肯定是要分大客户和小客户的,比如像苹果,小米这样大商家的订单必须得到优先处理,而曾经的后端系统是使用 redis 来存放的定时轮询。redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了以后,采用 RabbitMQ 进行改造和优化;如果发现是大客户的订单,给一个相对比较高的优先级,否则就是默认优先级。
2)介绍
优先级的使用:消息在队列中出现积压的情况下,才会使用优先级
- 队列设置优先级参数:
Arguments:x-max-priority = 10
,优先级的取值范围:0 ~ 255
,值越大,优先级越高
Map<String, Object> arguments = new HashMap<>();
// 官方允许是0-255之间,此处设置10,允许优先级范围为0-10,不要设置过大,浪费CPU与内存
arguments.put("x-max-priority",10);
channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
- 消息需要设置优先级
// 设置优先级 .priority(5)
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
// 发送消息
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes(StandardCharsets.UTF_8));
3)代码实现
SpringBoot实现
(1)配置文件/类
- 配置文件
spring:
rabbitmq:
host: 192.168.19.101
port: 5672
username: admin
password: admin
- 配置类
package com.tuwer.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author 土味儿
* Date 2022/3/29
* @version 1.0
* 优先级队列配置类
* 使用默认交换机,不需要配置
*/
@Configuration
public class PriorityQueueConfig {
/**
* 优先级队列
*/
public static final String PRIORITY_QUEUE_NAME = "priority.queue";
/**
* 获取优先级队列
*/
@Bean("priorityQueue")
public Queue getPriorityQueue(){
// 简写
//return QueueBuilder.durable(PRIORITY_QUEUE_NAME).maxPriority(10).build();
// 通用写法
Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-max-priority", 10);
return QueueBuilder.durable(PRIORITY_QUEUE_NAME).withArguments(arguments).build();
}
}
(2)生产者
package com.tuwer.controller;
import com.tuwer.config.PriorityQueueConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
/**
* @author 土味儿
* Date 2022/3/29
* @version 1.0
*/
@RestController
@RequestMapping("/pri")
public class PriorityProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg")
public String sendMsg() {
String msg = "";
for (int i = 0; i < 10; i++) {
msg = "info" + i;
// 设置消息优先级
MessageProperties messageProperties = new MessageProperties();
if (i == 5) {
// 当i=5时,优先级为5,否则优先级为默认:0
messageProperties.setPriority(5);
}
// 生成发送消息
Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
// 发送消息
rabbitTemplate.sendAndReceive(
"",
PriorityQueueConfig.PRIORITY_QUEUE_NAME,
message);
}
return "Ok!";
}
}
(3)消费者
第一次启动项目时,注释掉@Component,等消息全部发送到队列中(让消息在队列中产生积压),第二次启动项目时再开启 @Component 去消费
package com.tuwer.consumer;
import com.tuwer.config.ConfirmConfig;
import com.tuwer.config.PriorityQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 土味儿
* Date 2022/3/29
* @version 1.0
*/
@Slf4j
@Component
public class PriorityQueueConsumer {
/**
* 监听优先级队列
* @param message
*/
@RabbitListener(queues = PriorityQueueConfig.PRIORITY_QUEUE_NAME)
public void receiveConfirmMsg(Message message){
String msg = new String(message.getBody());
log.info("接收到的消息:【{}】", msg);
}
}
(4)运行测试
- 启动项目:注释掉消费者中的@Component,只发送不接收
- 再次启动项目:开启消费者(@Component),接收消费
3、惰性队列
1、使用场景
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。
惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中;它的一个重要的设计目标:能够支持更长的队列,即支持更多的消息存储。
当消费者由于各种各样的原因(比如:消费者下线、宕机,或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时,也会在内存中驻留一份备份。当RabbitMQ 需要释放内存的时候,会将内存中的消息存至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
2、两种模式
队列具备两种模式:
default
和lazy
,默认的为 default 模式,在3.6.0之前的版本无需做任何变更。
lazy模式:即为惰性队列的模式,设置方式:
- 通过调用
channel.queueDecare
方法,在参数中设置
// 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode","lazy");
channel.queueDeclare( "myqueue", false, false, false,args);
- 通过
Policy
的方式设置,具备更高的优先级
3、内存对比
在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/70399.html