Spring Cloud:使用Seata框架来实现分布式事务(三)

Spring Cloud:使用Seata框架来实现分布式事务(三)

Seata-TCC模式

为啥叫TCC模式呢

  • 因为TCC模式需要实现三个方法,这三个方法的首字母拼接起来就是TCC:
    • Try:资源的检测和预留
    • Confirm:完成资源操作业务,要求Try成功Confirm一定要成功
    • Cancel:预留资源释放,可以理解为try的反向操作

声明TCC接口

  • TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明
package com.study.account.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface IAccountTCCService {
    /**
     * Try逻辑
     * @TwoPhaseBusinessAction注解中的name属性必须当前方法名一致
     * commitMethod属性为确认方法
     * rollbackMethod属性为回滚方法
     * @param userId
     * @param accountMoney
     * @return
     */

    @TwoPhaseBusinessAction(name = "reduceMoney",commitMethod = "reduceMoneyConfirm",rollbackMethod = "reduceMoneyCancel")
    boolean reduceMoney(@BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "accountMoney") double accountMoney);

    /**
     * confirm确认方法 要对应commitMethod里面的方法名
     * @param context 上下文,可以传递try方法的参数
     * @return
     */

    boolean reduceMoneyConfirm(BusinessActionContext context);

    /**
     * cancel回滚方法 要对应rollbackMethod里面的方法名
     * @param context
     * @return
     */

    boolean reduceMoneyCancel(BusinessActionContext context);
}

动手实践

  • 在上一篇文章(Spring Cloud:使用Seata框架来实现分布式事务(二))创建的项目里修改
  • 我们需要一个新建表,用于判断TCC的状态,防止空回滚,业务悬挂,保证幂等性,我的seata版本为1.4.2所以需要建这个表,版本为1.5.0的Seata已经开始解决这些问题了,只需要在@TwoPhaseBusinessAction注解里这只useTCCFence=true就可以开启,当然也是要建表的
  • 建立事务控制记录表tcc_fence_log,我是直接去seata1.5版本的github上拿的,链接:https://github.com/seata/seata/tree/1.5.0/script/client/tcc/db
-- -------------------------------- The script use tcc fence  --------------------------------
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
    `xid`           VARCHAR(128)  NOT NULL COMMENT 'global id',
    `branch_id`     BIGINT        NOT NULL COMMENT 'branch id',
    `action_name`   VARCHAR(64)   NOT NULL COMMENT 'action name',
    `status`        TINYINT       NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
    `gmt_create`    DATETIME(3)   NOT NULL COMMENT 'create time',
    `gmt_modified`  DATETIME(3)   NOT NULL COMMENT 'update time',
    PRIMARY KEY (`xid``branch_id`),
    KEY `idx_gmt_modified` (`gmt_modified`),
    KEY `idx_status` (`status`)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

update tcc_fence_log set status = '4',gmt_modified = now() where xid = '10.112.71.33:8091:901172362555682922' and branch_id = '901172362555682923'
Spring Cloud:使用Seata框架来实现分布式事务(三)
  • 第一步 事务的入口,也就是seata-order订单微服务
package com.study.order.service.imp;

import com.study.order.feign.AccountFeignService;
import com.study.order.feign.StockFeignService;
import com.study.order.mapper.OrderMapper;
import com.study.order.pojo.TestOrder;
import com.study.order.service.IOrderService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements IOrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private StockFeignService stockFeignService;
    @Autowired
    private AccountFeignService accountFeignService;


    @GlobalTransactional
    @Override
    public boolean createOrder(TestOrder testOrder) throws Exception {
        //采购单价*数量=订单金额
        double orderMoney = testOrder.getPurPrice()*testOrder.getOrderNum();
        testOrder.setOrderMoney(orderMoney);
        //商品编号
        String commodityCode = testOrder.getCommodityCode();
        //订单数量
        int orderNum = testOrder.getOrderNum();
        //扣减库存 需要商品编号,数量
        String flag = stockFeignService.reduceStock(commodityCode, orderNum);
        if("false".equals(flag)){
            //扣减失败 回滚
            throw new Exception("扣减失败回滚");
        }
        //扣减用户金额 需要用户Id,订单金额
        String flag1 = accountFeignService.reduceMoney(testOrder.getUserId(), orderMoney);
        if("false".equals(flag1)){
            //扣减失败 回滚
            throw new Exception("扣减金额回滚");
        }
        //创建订单
        return orderMapper.createOrder(testOrder);
    }
}

  • seata-stock库存微服务的修改

    package com.study.stock.controller;

    import com.study.stock.service.IStockTCCService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;

    @RestController
    @RequestMapping(value = "/stock")
    public class StockController {
        // 改为新建的TCCService
        @Autowired
        private IStockTCCService stockService;

        /**
         * 扣减库存数量
         * @param commodityCode
         * @param orderNum
         * @return
         */

        @PostMapping(value = "/reduceStock")
        public String reduceStock(@RequestParam(value = "commodityCode") String commodityCode,@RequestParam(value = "orderNum") int orderNum){
            //1.订单处理逻辑
            boolean flag = stockService.reduceStock(commodityCode,orderNum);
            if(!flag){
                return "false";
            }
            return "true";
        }
    }

    package com.study.stock.service;

    import io.seata.rm.tcc.api.BusinessActionContext;
    import io.seata.rm.tcc.api.BusinessActionContextParameter;
    import io.seata.rm.tcc.api.LocalTCC;
    import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

    @LocalTCC
    public interface IStockTCCService {
        /**
         * Try逻辑
         * @TwoPhaseBusinessAction注解中的name属性必须当前方法名一致
         * commitMethod属性为确认方法
         * rollbackMethod属性为回滚方法
         * @param commodityCode
         * @param orderNum
         * @return
         */

        @TwoPhaseBusinessAction(name = "reduceStock",commitMethod = "reduceStockConfirm",rollbackMethod = "reduceStockCancel")
        boolean reduceStock(@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode, @BusinessActionContextParameter(paramName = "orderNum") int orderNum);

        /**
         * confirm确认方法
         * @param context
         * @return
         */

        boolean reduceStockConfirm(BusinessActionContext context);

        /**
         * cancel回滚方法
         * @param context
         * @return
         */

        boolean reduceStockCancel(BusinessActionContext context);
    }

    package com.study.stock.service.imp;

    import com.study.stock.mapper.StockMapper;
    import com.study.stock.mapper.TccFenceLogMapper;
    import com.study.stock.pojo.TccFenceLog;
    import com.study.stock.pojo.TestStock;
    import com.study.stock.pojo.constant.TccFenceConstant;
    import com.study.stock.service.IStockTCCService;
    import io.seata.core.context.RootContext;
    import io.seata.rm.tcc.api.BusinessActionContext;
    import org.slf4j.MDC;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.util.Date;

    @Service
    public class StockTCCServiceImpl implements IStockTCCService {
        @Autowired
        private StockMapper stockMapper;
        @Autowired
        private TccFenceLogMapper tccFenceLogMapper;

        /**
         * try方法
         * @param commodityCode
         * @param orderNum
         * @return
         */

        @Override
        public boolean reduceStock(String commodityCode, int orderNum) {
            //首先插入TCCFenceLog,因为关联了索引,所以如果插入失败,则证明已存在,直接返回正常
            //获取全局事务Id
            String xid = RootContext.getXID();
            //获取分支事务Id
            String branchId = MDC.get(RootContext.MDC_KEY_BRANCH_ID);
            TccFenceLog tccFenceLog = new TccFenceLog();
            tccFenceLog.setXid(xid);
            tccFenceLog.setBranchId(branchId);
            tccFenceLog.setActionName("IStockTCCServiceImpl.reduceStock");
            //已尝试try
            tccFenceLog.setStatus(TccFenceConstant.TRIED);
            tccFenceLog.setGmtCreate(new Date());
            tccFenceLog.setGmtModified(new Date());
            //防止业务悬挂
            boolean flag = tccFenceLogMapper.insertTccFenLog(tccFenceLog);
            if(!flag){
                return true;
            }
            //先查询有无足够的库存
            TestStock testStock = stockMapper.getStockNum(commodityCode);
            int stockNum = testStock.getStockNum();
            if(stockNum-orderNum>0){
                boolean flag1 = stockMapper.updateStockNum(commodityCode,orderNum);
                if(flag1){
                    //把状态改为已提交
                    tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.COMMITTED);
                }
                return flag1;
            }else{
                return false;
            }
        }

        /**
         * confirm
         * @param context
         * @return
         */

        @Override
        public boolean reduceStockConfirm(BusinessActionContext context) {
            //因为我在try阶段都做完了,所以确认阶段直接就返回true就好了
            return true;
        }

        /**
         * cancel
         * @param context
         * @return
         */

        @Override
        public boolean reduceStockCancel(BusinessActionContext context) {
            //如果出现错误 首先查询tcc状态
            String xid = context.getXid();
            long branchId = context.getBranchId();
            TccFenceLog tccFenceLog = tccFenceLogMapper.getTccFenLogById(xid, String.valueOf(branchId));
            //防止空回滚
            if(tccFenceLog==null){
                tccFenceLog = new TccFenceLog();
                tccFenceLog.setXid(xid);
                tccFenceLog.setBranchId(String.valueOf(branchId));
                tccFenceLog.setActionName("AccountTCCServiceImpl.reduceMoneyCancel");
                tccFenceLog.setStatus(TccFenceConstant.SUSPENDED);
                tccFenceLog.setGmtCreate(new Date());
                tccFenceLog.setGmtModified(new Date());
                boolean flag = tccFenceLogMapper.insertTccFenLog(tccFenceLog);
                return true;
            }else{
                int status = tccFenceLog.getStatus();
                switch (status){
                    case TccFenceConstant.COMMITTED:
                        //写回滚逻辑
                        String commodityCode = context.getActionContext("commodityCode").toString();
                        int orderNum = Integer.valueOf(context.getActionContext("orderNum").toString());
                        orderNum = orderNum*-1;
                        boolean flag1 = stockMapper.updateStockNum(commodityCode,orderNum);
                        if(flag1){
                            boolean flag = tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.ROLLBACKED);
                        }
                        return flag1;
                    default:
                        boolean flag2 = tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.SUSPENDED);
                        return true;
                }
            }
        }
    }

    package com.study.stock.mapper;

    import com.study.stock.pojo.TccFenceLog;
    import org.apache.ibatis.annotations.*;

    @Mapper
    public interface TccFenceLogMapper {
        /**
         * 查询TCC信息 根据全局事务Id和分支事务Id
         * @param xid
         * @param branchId
         * @return
         */

        @Results(id = "getTccFenLogById",value = {
                @Result(column = "xid",property = "xid",id=true),
                @Result(column = "branch_id",property = "branchId"),
                @Result(column = "action_name",property = "actionName"),
                @Result(column = "status",property = "status"),
                @Result(column = "gmt_create",property = "gmtCreate"),
                @Result(column = "gmt_modified",property = "gmtModified")
        })
        @Select(value = "select xid,branch_id,action_name,status,gmt_create,gmt_modified from tcc_fence_log where xid = #{xid} and branch_id = #{branchId}")
        TccFenceLog getTccFenLogById(String xid,String branchId);

        /**
         * 插入TCC日志信息
         * @param tccFenceLog
         * @return
         */

        @Insert(value = "insert into tcc_fence_log(xid,branch_id,action_name,status,gmt_create,gmt_modified) values(#{xid},#{branchId},#{actionName},#{status},#{gmtCreate},#{gmtModified})")
        boolean insertTccFenLog(TccFenceLog tccFenceLog);

        /**
         * 更新状态
         * @param xid
         * @param branchId
         * @param status
         * @return
         */

        @Update(value = "update tcc_fence_log set status = #{status},gmt_modified = now() where xid = #{xid} and branch_id = #{branchId}")
        boolean updateTccFenLogStatus(String xid,String branchId,int status);
    }

    package com.study.stock.pojo;

    import java.util.Date;

    /**
     * TCC日志 用于解决TCC的幂等性,空回滚,业务悬挂
     */

    public class TccFenceLog {
        /**
         * 全局事务Id
         */

        private String xid;
        /**
         * 分支事务Id
         */

        private String branchId;
        private String actionName;
        /**
         * 状态
         * tried:1;已尝试
         * committed:2;已提交
         * rollbacked:3;已回滚
         * suspended:4 空悬挂
         */

        private int status;
        /**
         * 创建时间
         */

        private Date gmtCreate;
        /**
         * 修改时间
         */

        private Date gmtModified;

        public String getXid() {
            return xid;
        }

        public void setXid(String xid) {
            this.xid = xid;
        }

        public String getBranchId() {
            return branchId;
        }

        public void setBranchId(String branchId) {
            this.branchId = branchId;
        }

        public String getActionName() {
            return actionName;
        }

        public void setActionName(String actionName) {
            this.actionName = actionName;
        }

        public int getStatus() {
            return status;
        }

        public void setStatus(int status) {
            this.status = status;
        }

        public Date getGmtCreate() {
            return gmtCreate;
        }

        public void setGmtCreate(Date gmtCreate) {
            this.gmtCreate = gmtCreate;
        }

        public Date getGmtModified() {
            return gmtModified;
        }

        public void setGmtModified(Date gmtModified) {
            this.gmtModified = gmtModified;
        }
    }

    package com.study.stock.pojo.constant;

    public class TccFenceConstant {
        /**
         * tried:1;已尝试
         */

        public static final int TRIED = 1;
        /**
         * committed:2;已提交
         */

        public static final int COMMITTED = 2;
        /**
         * rollbacked:3;已回滚
         */

        public static final int ROLLBACKED = 3;
        /**
         * suspended:4 空悬挂
         */

        public static final int SUSPENDED = 4;

    }

    • constant层
    • pojo实体类
    • mapper层
    • service-imp逻辑层
    • service接口层
    • controller层
  • seata-account账户微服务的修改

    package com.study.account.controller;

    import com.study.account.service.IAccountTCCService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;

    @RestController
    @RequestMapping(value = "/account")
    public class AccountController {

        @Autowired
        private IAccountTCCService accountService;

        /**
         * 扣减用户账户余额
         * @param userId
         * @param accountMoney
         * @return
         */

        @PostMapping(value = "/reduceMoney")
        public String reduceMoney(@RequestParam(value = "userId") String userId,@RequestParam(value = "accountMoney") double accountMoney){
            //1.订单处理逻辑
            boolean flag = accountService.reduceMoney(userId,accountMoney);
            if(!flag){
                return "false";
            }
            return "true";
        }
    }

      package com.study.account.service;

      import io.seata.rm.tcc.api.BusinessActionContext;
      import io.seata.rm.tcc.api.BusinessActionContextParameter;
      import io.seata.rm.tcc.api.LocalTCC;
      import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

      @LocalTCC
      public interface IAccountTCCService {
          /**
           * Try逻辑
           * @TwoPhaseBusinessAction注解中的name属性必须当前方法名一致
           * commitMethod属性为确认方法
           * rollbackMethod属性为回滚方法
           * @param userId
           * @param accountMoney
           * @return
           */

          @TwoPhaseBusinessAction(name = "reduceMoney",commitMethod = "reduceMoneyConfirm",rollbackMethod = "reduceMoneyCancel")
          boolean reduceMoney(@BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "accountMoney") double accountMoney);

          /**
           * confirm确认方法 要对应commitMethod里面的方法名
           * @param context 上下文,可以传递try方法的参数
           * @return
           */

          boolean reduceMoneyConfirm(BusinessActionContext context);

          /**
           * cancel回滚方法 要对应rollbackMethod里面的方法名
           * @param context
           * @return
           */

          boolean reduceMoneyCancel(BusinessActionContext context);
      }

    package com.study.account.service.imp;


    import com.study.account.mapper.AccountMapper;
    import com.study.account.mapper.TccFenceLogMapper;
    import com.study.account.pojo.TccFenceLog;
    import com.study.account.pojo.TestAccount;
    import com.study.account.pojo.constant.TccFenceConstant;
    import com.study.account.service.IAccountTCCService;
    import io.seata.core.context.RootContext;
    import io.seata.rm.tcc.api.BusinessActionContext;
    import org.slf4j.MDC;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.util.Date;

    @Service
    public class AccountTCCServiceImpl implements IAccountTCCService {

        @Autowired
        private AccountMapper accountMapper;
        @Autowired
        private TccFenceLogMapper tccFenceLogMapper;


        /**
         * try方法
         * @param userId
         * @param accountMoney
         * @return
         */

        @Override
        public boolean reduceMoney(String userId, double accountMoney) {
            //首先插入TCCFenceLog,因为关联了索引,所以如果插入失败,则证明已存在,直接返回正常
            String xid = RootContext.getXID();
            String branchId = MDC.get(RootContext.MDC_KEY_BRANCH_ID);
            TccFenceLog tccFenceLog = new TccFenceLog();
            tccFenceLog.setXid(xid);
            tccFenceLog.setBranchId(branchId);
            tccFenceLog.setActionName("AccountTCCServiceImpl.reduceMoney");
            tccFenceLog.setStatus(TccFenceConstant.TRIED);
            tccFenceLog.setGmtCreate(new Date());
            tccFenceLog.setGmtModified(new Date());
            //防止业务悬挂
            boolean flag = tccFenceLogMapper.insertTccFenLog(tccFenceLog);
            if(!flag){
                return true;
            }
            //先查询有无足够的库存
            TestAccount testAccount = accountMapper.getMoneyByUserId(userId);
            double account = testAccount.getAccountMoney();
            if(account-accountMoney>0) {
                boolean flag1 = accountMapper.updateMoneyByUserId(userId, accountMoney);
                if(!flag1){
                    tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.COMMITTED);
                }
                return flag1;
            }else{
                return false;
            }
        }
        /**
         * confirm
         * @param context
         * @return
         */

        @Override
        public boolean reduceMoneyConfirm(BusinessActionContext context) {
            //因为我在try阶段都做完了,所以确认阶段直接就返回true就好了
            return true;
        }
        /**
         * cancel
         * @param context
         * @return
         */

        @Override
        public boolean reduceMoneyCancel(BusinessActionContext context) {
            //如果出现错误 首先查询tcc状态
            String xid = context.getXid();
            long branchId = context.getBranchId();
            TccFenceLog tccFenceLog = tccFenceLogMapper.getTccFenLogById(xid, String.valueOf(branchId));
            //防止空回滚
            if(tccFenceLog==null){
                tccFenceLog = new TccFenceLog();
                tccFenceLog.setXid(xid);
                tccFenceLog.setBranchId(String.valueOf(branchId));
                tccFenceLog.setActionName("AccountTCCServiceImpl.reduceMoneyCancel");
                tccFenceLog.setStatus(TccFenceConstant.SUSPENDED);
                tccFenceLog.setGmtCreate(new Date());
                tccFenceLog.setGmtModified(new Date());
                boolean flag = tccFenceLogMapper.insertTccFenLog(tccFenceLog);
                return true;
            }else{
                int status = tccFenceLog.getStatus();
                switch (status){
                    case TccFenceConstant.COMMITTED:
                        //写回滚逻辑
                        String userId = context.getActionContext("userId").toString();
                        double accountMoney = Double.valueOf(context.getActionContext("accountMoney").toString());
                        accountMoney = accountMoney*-1;
                        boolean flag1 = accountMapper.updateMoneyByUserId(userId, accountMoney);
                        if(flag1){
                            boolean flag = tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.ROLLBACKED);
                        }
                        return flag1;
                    default:
                        boolean flag2 = tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.SUSPENDED);
                        return true;
                }
            }
        }
    }

    • mapper层,pojo层,constant层新增的跟上面库存微服务一致
    • service-imp逻辑层
    • service接口层
    • controller层

TCC模式运行图

Spring Cloud:使用Seata框架来实现分布式事务(三)

TCC模式优点

  • 一阶段Try完成直接提交事务,释放数据库资源,性能好
  • 相比AT模式,无需生成快照,无需使用全局锁,性能最强
  • 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库

TCC模式缺点

  • 很明显,有代码侵入,很麻烦,失败回滚逻辑都需要自己写,需要根据不同情况做出不同的处理
  • 软状态,事务是最终一致

什么叫空回滚

  • 就是因为某些原因,没有执行try操作,先执行了cancel操作,这时cancel操作不能做回滚,就是空回滚

什么叫业务悬挂

  • 就是已经空回滚的业务,如果继续执行try,就永远不可能confirm或cancel,这就是业务悬挂

总结

  • 需要自己写回滚逻辑比较麻烦,而且要考虑幂等性,还要考虑业务逻辑回滚,比如我控制了,先查询账户的余额够不够,如果不够直接都不扣账户余额了,直接返回false,那么订单接到账户微服务返回false的时候,就抛出异常,这时候账户微服务也会执行cancel方法,执行cancel方法的时候,你要知道账户微服务执行try方法的时候导致有没有扣减余额,如果没有扣减,你回滚的时候把钱加了,那不是让用户赚了嘛.
  • 最后关注一下我的微信公众号【楼梯间的男孩】,我们一起学习成长


原文始发于微信公众号(楼梯间的男孩):Spring Cloud:使用Seata框架来实现分布式事务(三)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/197282.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!