在数据库修改单条数据时,常用的方式是select for update的悲观锁机制,如果锁竞争比较大,没有获得锁的操作会阻塞。使用CAS乐观锁的方式,可以大大提高并发性。例如,在分布式服务中,多个用户并发下单操作前会先扣减库存时,网上盗图,服务1,服务2和服务3为不同机器上的库存服务。库存扣减操作流程如下:
使用cas方式的乐观锁,当库存还剩3个,3个用户同时下单,服务同时扣减库存,可以并发地扣减成功,提高了并发性。如果库存还剩1个,3个用户同时下单,同时扣减库存,这时只有1个用户会操作成功,其余2个会失败,避免了超卖。
在库存的CAS操作中,先进行库存查询操作,然后根据value+version方式修改库存,如果操作失败,则幂等地重复操作。通常会从操作次数和执行时间两个条件限制CAS的操作,如果两个条件中有某个条件触发,可以抛出乐观锁异常。
在Java项目中,通过AOP+注解的方式实现数据库操作的CAS幂等操作的统一处理。定义OptimisticRetry注解,标识CAS重试Spring AOP的切点,并且设置属性value(最大重试次数条件),以及属性maxExecuteTime(最大执行时间条件)。OptimisticRetryAOP定义CAS重试的切面,乐观锁的实现。OptimisticRetry注解定义代码如下:
/**
* 乐观锁的重试
**/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface OptimisticRetry {
/**
* 最大重试次数
*/
int value() default 200;
/**
* 最大执行时间
*/
int maxExecuteTime() default 2 * 60 * 60 * 1000;
}
OptimisticRetryAOP切面,定义标注了OptimisticRetry注解的方法则进行,CAS幂等重试,如果达到最大重试次数限制或者最大执行时间限制,则抛出乐观锁异常OptimisticLockingFailureException。代码如下:
/**
* 乐观锁的重试
**/
@Aspect
@Component
@Order(10000)
@Slf4j
public class OptimisticRetryAOP {
@Pointcut("@annotation(com.smcx.winemall.common.lock.OptimisticRetry)|| @within(com.smcx.winemall.common.lock.OptimisticRetry)")
public void optimisticRetryPointcut() {
}
@Around("optimisticRetryPointcut()")
public Object doConcurrentOperation(ProceedingJoinPoint joinPoint) throws Throwable {
Signature signature = joinPoint.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
// 代理目标对象Class
Class targetClazz = joinPoint.getTarget().getClass();
Method method = targetClazz.getMethod(methodSignature.getName(), methodSignature.getParameterTypes());
Optional<OptimisticRetry> optimisticRetryOption = getOptimisticRetry(method.getAnnotations());
if (!optimisticRetryOption.isPresent()) {
log.warn("no OptimisticRetry Annotation to execute!");
return joinPoint.proceed();
}
OptimisticRetryConfig optimisticRetryConfig = obtainOptimisticRetryConfig(optimisticRetryOption.get());
int numAttempts = 0;
OptimisticLockingFailureException lockFailureException;
long startTime = System.currentTimeMillis();
do {
numAttempts++;
try {
return joinPoint.proceed();
} catch (OptimisticLockingFailureException ex) {
lockFailureException = ex;
long executeTime = System.currentTimeMillis() - startTime;
long maxExecuteTime = optimisticRetryConfig.getMaxExecuteTime();
if (isLargerThanMaxExecuteTime(executeTime, maxExecuteTime)) {
log.warn("throw optimistic locking failure exception!num attempts [{}],start time [{}]," +
"actual execute time [{}] ms, max execute time [{}] ms",
numAttempts, startTime, executeTime, maxExecuteTime);
throw lockFailureException;
}
}
}
while (numAttempts <= optimisticRetryConfig.getMaxTryCount());
log.warn("throw optimistic locking failure exception!num attempts {} ", numAttempts);
throw lockFailureException;
}
/**
* 大于限制的重试执行时间
*
* @param executeTime
* @return
*/
private boolean isLargerThanMaxExecuteTime(long executeTime, long maxExecuteTime) {
if (executeTime <= 0) {
return false;
}
if (maxExecuteTime > executeTime) {
return true;
}
return false;
}
private OptimisticRetryConfig obtainOptimisticRetryConfig(OptimisticRetry optimisticRetry) {
// 从注解中获取配的值
OptimisticRetryConfig optimisticRetryConfig = new OptimisticRetryConfig();
optimisticRetryConfig.setMaxTryCount(optimisticRetry.value());
optimisticRetryConfig.setMaxExecuteTime(optimisticRetry.maxExecuteTime());
return optimisticRetryConfig;
}
private Optional<OptimisticRetry> getOptimisticRetry(Annotation[] annotations) {
if (ArrayUtils.isEmpty(annotations)) {
return Optional.empty();
}
for (Annotation anno : annotations) {
if (anno.annotationType().getName().equals(OptimisticRetry.class.getName())) {
return Optional.of((OptimisticRetry) anno);
}
}
return Optional.empty();
}
/**
* 重试配置
*/
@Data
private static class OptimisticRetryConfig implements Serializable {
private static final long serialVersionUID = -182211651320526367L;
/**
* 最大重试次数
*/
private int maxTryCount;
/**
* 最大执行时间
*/
private long maxExecuteTime;
}
}
业务代码中操作,代码如下:
@Override
@Transactional(rollbackFor = Exception.class)
@OptimisticRetry
public void decreaseStockRemainingAmount(OperateStockDto operateStock) {
// 获取存储
WinemallStock existStock = winemallStockMapper.selectByOperateStock(operateStock);
// TODO 库存是否充足判断等条件判断
// 修改库存数据
if (0 == winemallStockMapper.updateStockRemainingAmount(existStock, 1)) {
throw new OptimisticLockingFailureException("decrease stock remaining amount optimistic locking failure!");
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13647.html