分布式事务–手写TCC

导读:本篇文章讲解 分布式事务–手写TCC,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

前言

基于之前的RPC服务手写一个简易的分布式事务TCC
实现细节有很多种做法 可以提出来一起讨论

实现思路

  1. 创建全局唯一ID能够让RPC中各个服务感知是同一个TCC
  2. 通过注册事务同步器在事务提交成功之后 先将confirm和cancel方法注册至zk服务
  3. 当所有的try方法都提交成功之后 获取zk中所有的confirm方法名称等信息 通过PRC调用各自写好的confirm方法(最终都是在主方法中执行)
  4. 当有try方法执行失败的时候 获取zk中所有的cancel方法名称等信息 通过PRC调用各自写好的cancel方法(最终都是在主方法中执行 所以调用RPC接口时需要返回该方法是否回滚 然后再主方法中抛出)
  5. 如果有confirm和cancel方法失败则需要存储到各个本地数据库中(后续的操作还没想好)

TCC注解

按照本地事务提交成功之后再注册confirm和cancel 方法的话 需要确定是谁开启TCC为了所有try方法执行完之后执行confirm方法

@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
public @interface TccTry {
    //对应的消费接口
    Class<?> interfaceClass();
    //对应的提交方法
    String confirmableKey() default "";
    //对应的取消方法
    String cancellableKey() default "";
    //是否为主方法
    boolean isMain() default false;
}

AOP注册事务同步器

@Aspect
@Component
public class TccTryAspect {

    @Pointcut("execution(* com.ljl.server.service.Impl.*.*(..))")
    public void pointcutName(){}

    @Autowired
    private ZkTccHander zkTccHander;

    @Before("pointcutName())")
    public void permissionCheck(JoinPoint point) {
        String className = point.getSignature().getDeclaringTypeName();
        String uuid = point.getArgs()[0].toString();
        try {
            TccTry tccTry = Class.forName(className).getAnnotation(TccTry.class);
            String comfirm = tccTry.confirmableKey();
            String cancel = tccTry.confirmableKey();
            MethodSignature signature = (MethodSignature) point.getSignature();
            Method method = signature.getMethod();
            if(method.getName().equals(comfirm)|| method.getName().equals(cancel)){
                //如果是comfirm或者cancel提交有成功或者失败则需要进行处理 这里先不考虑
            }else{
                // 注册事务同步器,在事务提交后进行回调 当所有的try都执行成功
                TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                    //在事务提交/回滚后调用
                    @Override
                    public void afterCompletion(int status) {
                        switch (status) {
                            case 0:
                                //transaction status is commit
                                zkTccHander.invokeForPRC(uuid, className,"0");
                                //twoPhaseProcess(TransactionStatusEnum.STATUS_COMMITTED.getCode());
                                break;
                            case 1:
                                //transaction status is rollback
                                zkTccHander.invokeForPRC(uuid,className,"1");
                                break;
                            default:
                                //logger.error("tcc transaction status is unknown");
                                throw new RuntimeException("tcc transaction status is unknown");
                        }
                    }
                });
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

    }

}

ZkTccHander

处理try-confirm-cancel 执行成功或者失败的逻辑

@Component
public class ZkTccHander {

    @Autowired
    public ZookeeperTcc zk;
    @Autowired
    private static ApplicationContextProviderForTcc applicationContex;


    public void invokeForPRC(String uuid,String className,String code){
        if(code.equals("0")){
            //同时插入本地数据库 失败之后继续调用
            //将tcc中的comfirm 和 cancel 存储到zk中用于后续的本地事务服务
            try {
                TccTry tccTry = Class.forName(className).getAnnotation(TccTry.class);
                String comfirm = tccTry.confirmableKey();
                String cancel = tccTry.cancellableKey();
                //通过唯一id进行创建 存在就不创建 并存储comfirm和cancel  如果方法有参数应该还要存储参数
                zk.createNodeNoWatch("/TCC/"+uuid,uuid);
                zk.createNodeNoWatch("/TCC/"+uuid+"/comfirm","comfirm");
                zk.createNodeNoWatch("/TCC/"+uuid+"/comfirm/"+comfirm,tccTry.interfaceClass().getName());
                zk.createNodeNoWatch("/TCC/"+uuid+"/cancel","cancel");
                zk.createNodeNoWatch("/TCC/"+uuid+"/cancel/"+cancel,tccTry.interfaceClass().getName());
                //通过uuid获取存储在zk上面的信息之后再通过反射调用@RPCClient
                //当所有方法try成功之后调用comfirm
                if(tccTry.isMain()){
                    List<String> childrens = zk.getChildsNoWatch("/TCC/"+uuid+"/comfirm");
                    System.out.println(childrens);
                    if(childrens.size() !=0){
                        for (String child : childrens) {
                            String data = zk.getData("/TCC/"+uuid+"/comfirm/"+child);
                            System.out.println(data);
                            Class tcc = Class.forName(data);
                            Object implObj= applicationContex.getBean(StringUtil.toLowerCaseFirstOne(tcc.getSimpleName()));
                            Method method = tcc.getDeclaredMethod(child,String.class);
                            method.invoke(implObj,uuid);
                        }
                    }
                }
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }else{
            //当tcc中的comfirm 和 cancel中有出现失败的时候需要记录下来 并进行重试
            try {
                TccTry tccTry = Class.forName(className).getAnnotation(TccTry.class);
                //当有方法try失败之后在Main中调用cancel
                if(tccTry.isMain()){
                    List<String> childrens = zk.getChildsNoWatch("/TCC/"+uuid+"/cancel");
                    if(childrens.size() !=0){
                        for (String child : childrens) {
                            String data = zk.getData("/TCC/"+uuid+"/cancel/"+child);
                            Class tcc = Class.forName(data);
                            System.out.println(data);
                            System.out.println(child);
                            Object implObj= applicationContex.getBean(StringUtil.toLowerCaseFirstOne(tcc.getSimpleName()));
                            Method method = tcc.getDeclaredMethod(child,String.class);
                            method.invoke(implObj,uuid);
                        }
                    }
                }
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }


    }
}

结果展示

原始状态

在这里插入图片描述

调用成功

在这里插入图片描述

调用失败

这里可以看到调用了cancel方法
在这里插入图片描述

示例下单为一个RPC项目

@Service
@TccTry(interfaceClass= TccTestClient.class,confirmableKey="placeAnOrderForComfirm",cancellableKey="placeAnOrderForCancel",isMain=true)
@RPCServer
public class TccTestServiceImpl extends ServiceImpl<BaseOrderMapper, BaseOrder>  implements TccTestService {

    @Autowired
    private BalanceManageClient balanceManageClient;

    @Autowired
    private CouponManageClient couponManageClient;
    @Autowired
    private BaseOrderMapper baseOrderMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void placeAnOrderForTry(String uuid) throws Exception {
        //开始分布式事务  示例就调用下单、库存、积分

        //step1.锁定下单 将订单修改成中间状态
        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(1);
        baseOrder.setStatus("锁定状态");
        baseOrderMapper.updateById(baseOrder);
        //step2.现金扣减
        String bcode = balanceManageClient.balanceReduceForTry(uuid);
        //0代表RPC方法回滚了
        if("0".equals(bcode)){
            throw new Exception();

        }//step3.积分
        String ccode = couponManageClient.couponUseForTry(uuid);
        //0代表RPC方法回滚了
        if("0".equals(ccode)){
            throw new Exception();

        }

    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void placeAnOrderForComfirm(String uuid) {
        //step1.将订单修改成下单完成状态
        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(1);
        baseOrder.setStatus("完成状态");
        baseOrderMapper.updateById(baseOrder);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void placeAnOrderForCancel(String uuid) {
        //step1.将订单修改成未支付状态
        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(1);
        baseOrder.setStatus("支付异常");
        baseOrderMapper.updateById(baseOrder);
    }
}

示例积分和会员为第二个RPC项目

@Service
@TccTry(interfaceClass= BalanceManageClient.class,confirmableKey="balanceReduceForComfirm",cancellableKey="balanceReduceForCancel")
@RPCServer
public class BalanceManageServiceImpl extends ServiceImpl<BaseOrderMapper, BaseOrder>  implements BalanceManageService {

    @Autowired
    private BalanceManageService balanceManageService;


    @Autowired
    private BaseOrderMapper baseOrderMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String balanceReduceForTry(String uuid)  {
        try {
            balanceManageService.balanceReduceForTry2(uuid);
            System.out.println("1");
            return "1";
        }catch (Exception e){
            System.out.println("失败");
            return "0";
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String balanceReduceForTry2(String uuid) {
        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(2);
        baseOrder.setStatus("库存100");
        baseOrderMapper.updateById(baseOrder);
        return "1";
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String balanceReduceForComfirm(String uuid) {
        //step1.将订单修改成下单完成状态
        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(2);
        baseOrder.setStatus("库存99");
        baseOrderMapper.updateById(baseOrder);
        return "1";
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String balanceReduceForCancel(String uuid) {
        //step1.将订单修改成未支付状态
        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(2);
        baseOrder.setStatus("库存100");
        baseOrderMapper.updateById(baseOrder);
        return "1";
    }
}


@Service
@TccTry(interfaceClass= CouponManageClient.class,confirmableKey="couponUseForComfirm",cancellableKey="couponUseForCancel")
@RPCServer
public class CouponManageServiceImpl extends ServiceImpl<BaseOrderMapper, BaseOrder>  implements CouponManageService {



    @Autowired
    private CouponManageService couponManageService;
    @Autowired
    private BaseOrderMapper baseOrderMapper;

    @Override
    public String couponUseForTry(String uuid) {
        try {
            couponManageService.couponUseForTry2(uuid);
            System.out.println("1");
            return "1";
        }catch (Exception e){
            return "0";
        }


    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String couponUseForTry2(String uuid) {

        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(3);
        baseOrder.setStatus("会员积分100");
        baseOrderMapper.updateById(baseOrder);
        //int i = 1/0;
        return "1";
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String couponUseForComfirm(String uuid) {
        //step1.将订单修改成下单完成状态
        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(3);
        baseOrder.setStatus("会员积分120");
        baseOrderMapper.updateById(baseOrder);
        return "1";
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String couponUseForCancel(String uuid) {
        //step1.将订单修改成未支付状态
        BaseOrder baseOrder = new BaseOrder();
        baseOrder.setId(3);
        baseOrder.setStatus("会员积分100");
        baseOrderMapper.updateById(baseOrder);
        return "1";
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15303.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!