前言
基于之前的RPC服务手写一个简易的分布式事务TCC
实现细节有很多种做法 可以提出来一起讨论
实现思路
- 创建全局唯一ID能够让RPC中各个服务感知是同一个TCC
- 通过注册事务同步器在事务提交成功之后 先将confirm和cancel方法注册至zk服务
- 当所有的try方法都提交成功之后 获取zk中所有的confirm方法名称等信息 通过PRC调用各自写好的confirm方法(最终都是在主方法中执行)
- 当有try方法执行失败的时候 获取zk中所有的cancel方法名称等信息 通过PRC调用各自写好的cancel方法(最终都是在主方法中执行 所以调用RPC接口时需要返回该方法是否回滚 然后再主方法中抛出)
- 如果有confirm和cancel方法失败则需要存储到各个本地数据库中(后续的操作还没想好)
TCC注解
按照本地事务提交成功之后再注册confirm和cancel 方法的话 需要确定是谁开启TCC为了所有try方法执行完之后执行confirm方法
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
public @interface TccTry {
//对应的消费接口
Class<?> interfaceClass();
//对应的提交方法
String confirmableKey() default "";
//对应的取消方法
String cancellableKey() default "";
//是否为主方法
boolean isMain() default false;
}
AOP注册事务同步器
@Aspect
@Component
public class TccTryAspect {
@Pointcut("execution(* com.ljl.server.service.Impl.*.*(..))")
public void pointcutName(){}
@Autowired
private ZkTccHander zkTccHander;
@Before("pointcutName())")
public void permissionCheck(JoinPoint point) {
String className = point.getSignature().getDeclaringTypeName();
String uuid = point.getArgs()[0].toString();
try {
TccTry tccTry = Class.forName(className).getAnnotation(TccTry.class);
String comfirm = tccTry.confirmableKey();
String cancel = tccTry.confirmableKey();
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
if(method.getName().equals(comfirm)|| method.getName().equals(cancel)){
//如果是comfirm或者cancel提交有成功或者失败则需要进行处理 这里先不考虑
}else{
// 注册事务同步器,在事务提交后进行回调 当所有的try都执行成功
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
//在事务提交/回滚后调用
@Override
public void afterCompletion(int status) {
switch (status) {
case 0:
//transaction status is commit
zkTccHander.invokeForPRC(uuid, className,"0");
//twoPhaseProcess(TransactionStatusEnum.STATUS_COMMITTED.getCode());
break;
case 1:
//transaction status is rollback
zkTccHander.invokeForPRC(uuid,className,"1");
break;
default:
//logger.error("tcc transaction status is unknown");
throw new RuntimeException("tcc transaction status is unknown");
}
}
});
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
ZkTccHander
处理try-confirm-cancel 执行成功或者失败的逻辑
@Component
public class ZkTccHander {
@Autowired
public ZookeeperTcc zk;
@Autowired
private static ApplicationContextProviderForTcc applicationContex;
public void invokeForPRC(String uuid,String className,String code){
if(code.equals("0")){
//同时插入本地数据库 失败之后继续调用
//将tcc中的comfirm 和 cancel 存储到zk中用于后续的本地事务服务
try {
TccTry tccTry = Class.forName(className).getAnnotation(TccTry.class);
String comfirm = tccTry.confirmableKey();
String cancel = tccTry.cancellableKey();
//通过唯一id进行创建 存在就不创建 并存储comfirm和cancel 如果方法有参数应该还要存储参数
zk.createNodeNoWatch("/TCC/"+uuid,uuid);
zk.createNodeNoWatch("/TCC/"+uuid+"/comfirm","comfirm");
zk.createNodeNoWatch("/TCC/"+uuid+"/comfirm/"+comfirm,tccTry.interfaceClass().getName());
zk.createNodeNoWatch("/TCC/"+uuid+"/cancel","cancel");
zk.createNodeNoWatch("/TCC/"+uuid+"/cancel/"+cancel,tccTry.interfaceClass().getName());
//通过uuid获取存储在zk上面的信息之后再通过反射调用@RPCClient
//当所有方法try成功之后调用comfirm
if(tccTry.isMain()){
List<String> childrens = zk.getChildsNoWatch("/TCC/"+uuid+"/comfirm");
System.out.println(childrens);
if(childrens.size() !=0){
for (String child : childrens) {
String data = zk.getData("/TCC/"+uuid+"/comfirm/"+child);
System.out.println(data);
Class tcc = Class.forName(data);
Object implObj= applicationContex.getBean(StringUtil.toLowerCaseFirstOne(tcc.getSimpleName()));
Method method = tcc.getDeclaredMethod(child,String.class);
method.invoke(implObj,uuid);
}
}
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}else{
//当tcc中的comfirm 和 cancel中有出现失败的时候需要记录下来 并进行重试
try {
TccTry tccTry = Class.forName(className).getAnnotation(TccTry.class);
//当有方法try失败之后在Main中调用cancel
if(tccTry.isMain()){
List<String> childrens = zk.getChildsNoWatch("/TCC/"+uuid+"/cancel");
if(childrens.size() !=0){
for (String child : childrens) {
String data = zk.getData("/TCC/"+uuid+"/cancel/"+child);
Class tcc = Class.forName(data);
System.out.println(data);
System.out.println(child);
Object implObj= applicationContex.getBean(StringUtil.toLowerCaseFirstOne(tcc.getSimpleName()));
Method method = tcc.getDeclaredMethod(child,String.class);
method.invoke(implObj,uuid);
}
}
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
}
结果展示
原始状态
调用成功
调用失败
这里可以看到调用了cancel方法
示例下单为一个RPC项目
@Service
@TccTry(interfaceClass= TccTestClient.class,confirmableKey="placeAnOrderForComfirm",cancellableKey="placeAnOrderForCancel",isMain=true)
@RPCServer
public class TccTestServiceImpl extends ServiceImpl<BaseOrderMapper, BaseOrder> implements TccTestService {
@Autowired
private BalanceManageClient balanceManageClient;
@Autowired
private CouponManageClient couponManageClient;
@Autowired
private BaseOrderMapper baseOrderMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void placeAnOrderForTry(String uuid) throws Exception {
//开始分布式事务 示例就调用下单、库存、积分
//step1.锁定下单 将订单修改成中间状态
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(1);
baseOrder.setStatus("锁定状态");
baseOrderMapper.updateById(baseOrder);
//step2.现金扣减
String bcode = balanceManageClient.balanceReduceForTry(uuid);
//0代表RPC方法回滚了
if("0".equals(bcode)){
throw new Exception();
}//step3.积分
String ccode = couponManageClient.couponUseForTry(uuid);
//0代表RPC方法回滚了
if("0".equals(ccode)){
throw new Exception();
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void placeAnOrderForComfirm(String uuid) {
//step1.将订单修改成下单完成状态
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(1);
baseOrder.setStatus("完成状态");
baseOrderMapper.updateById(baseOrder);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void placeAnOrderForCancel(String uuid) {
//step1.将订单修改成未支付状态
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(1);
baseOrder.setStatus("支付异常");
baseOrderMapper.updateById(baseOrder);
}
}
示例积分和会员为第二个RPC项目
@Service
@TccTry(interfaceClass= BalanceManageClient.class,confirmableKey="balanceReduceForComfirm",cancellableKey="balanceReduceForCancel")
@RPCServer
public class BalanceManageServiceImpl extends ServiceImpl<BaseOrderMapper, BaseOrder> implements BalanceManageService {
@Autowired
private BalanceManageService balanceManageService;
@Autowired
private BaseOrderMapper baseOrderMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public String balanceReduceForTry(String uuid) {
try {
balanceManageService.balanceReduceForTry2(uuid);
System.out.println("1");
return "1";
}catch (Exception e){
System.out.println("失败");
return "0";
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public String balanceReduceForTry2(String uuid) {
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(2);
baseOrder.setStatus("库存100");
baseOrderMapper.updateById(baseOrder);
return "1";
}
@Override
@Transactional(rollbackFor = Exception.class)
public String balanceReduceForComfirm(String uuid) {
//step1.将订单修改成下单完成状态
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(2);
baseOrder.setStatus("库存99");
baseOrderMapper.updateById(baseOrder);
return "1";
}
@Override
@Transactional(rollbackFor = Exception.class)
public String balanceReduceForCancel(String uuid) {
//step1.将订单修改成未支付状态
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(2);
baseOrder.setStatus("库存100");
baseOrderMapper.updateById(baseOrder);
return "1";
}
}
@Service
@TccTry(interfaceClass= CouponManageClient.class,confirmableKey="couponUseForComfirm",cancellableKey="couponUseForCancel")
@RPCServer
public class CouponManageServiceImpl extends ServiceImpl<BaseOrderMapper, BaseOrder> implements CouponManageService {
@Autowired
private CouponManageService couponManageService;
@Autowired
private BaseOrderMapper baseOrderMapper;
@Override
public String couponUseForTry(String uuid) {
try {
couponManageService.couponUseForTry2(uuid);
System.out.println("1");
return "1";
}catch (Exception e){
return "0";
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public String couponUseForTry2(String uuid) {
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(3);
baseOrder.setStatus("会员积分100");
baseOrderMapper.updateById(baseOrder);
//int i = 1/0;
return "1";
}
@Override
@Transactional(rollbackFor = Exception.class)
public String couponUseForComfirm(String uuid) {
//step1.将订单修改成下单完成状态
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(3);
baseOrder.setStatus("会员积分120");
baseOrderMapper.updateById(baseOrder);
return "1";
}
@Override
@Transactional(rollbackFor = Exception.class)
public String couponUseForCancel(String uuid) {
//step1.将订单修改成未支付状态
BaseOrder baseOrder = new BaseOrder();
baseOrder.setId(3);
baseOrder.setStatus("会员积分100");
baseOrderMapper.updateById(baseOrder);
return "1";
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15303.html