Spring Cloud:使用Seata框架来实现分布式事务(三)
Seata-TCC模式
为啥叫TCC模式呢
-
因为TCC模式需要实现三个方法,这三个方法的首字母拼接起来就是TCC: -
Try:资源的检测和预留 -
Confirm:完成资源操作业务,要求Try成功Confirm一定要成功 -
Cancel:预留资源释放,可以理解为try的反向操作
声明TCC接口
-
TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明
package com.study.account.service;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public interface IAccountTCCService {
/**
* Try逻辑
* @TwoPhaseBusinessAction注解中的name属性必须当前方法名一致
* commitMethod属性为确认方法
* rollbackMethod属性为回滚方法
* @param userId
* @param accountMoney
* @return
*/
@TwoPhaseBusinessAction(name = "reduceMoney",commitMethod = "reduceMoneyConfirm",rollbackMethod = "reduceMoneyCancel")
boolean reduceMoney(@BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "accountMoney") double accountMoney);
/**
* confirm确认方法 要对应commitMethod里面的方法名
* @param context 上下文,可以传递try方法的参数
* @return
*/
boolean reduceMoneyConfirm(BusinessActionContext context);
/**
* cancel回滚方法 要对应rollbackMethod里面的方法名
* @param context
* @return
*/
boolean reduceMoneyCancel(BusinessActionContext context);
}
动手实践
-
在上一篇文章(Spring Cloud:使用Seata框架来实现分布式事务(二))创建的项目里修改 -
我们需要一个新建表,用于判断TCC的状态,防止空回滚,业务悬挂,保证幂等性,我的seata版本为1.4.2所以需要建这个表,版本为1.5.0的Seata已经开始解决这些问题了,只需要在@TwoPhaseBusinessAction注解里这只useTCCFence=true就可以开启,当然也是要建表的 -
建立事务控制记录表tcc_fence_log,我是直接去seata1.5版本的github上拿的,链接:https://github.com/seata/seata/tree/1.5.0/script/client/tcc/db
-- -------------------------------- The script use tcc fence --------------------------------
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
`xid` VARCHAR(128) NOT NULL COMMENT 'global id',
`branch_id` BIGINT NOT NULL COMMENT 'branch id',
`action_name` VARCHAR(64) NOT NULL COMMENT 'action name',
`status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time',
PRIMARY KEY (`xid`, `branch_id`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
update tcc_fence_log set status = '4',gmt_modified = now() where xid = '10.112.71.33:8091:901172362555682922' and branch_id = '901172362555682923'

-
第一步 事务的入口,也就是seata-order订单微服务
package com.study.order.service.imp;
import com.study.order.feign.AccountFeignService;
import com.study.order.feign.StockFeignService;
import com.study.order.mapper.OrderMapper;
import com.study.order.pojo.TestOrder;
import com.study.order.service.IOrderService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderServiceImpl implements IOrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private StockFeignService stockFeignService;
@Autowired
private AccountFeignService accountFeignService;
@GlobalTransactional
@Override
public boolean createOrder(TestOrder testOrder) throws Exception {
//采购单价*数量=订单金额
double orderMoney = testOrder.getPurPrice()*testOrder.getOrderNum();
testOrder.setOrderMoney(orderMoney);
//商品编号
String commodityCode = testOrder.getCommodityCode();
//订单数量
int orderNum = testOrder.getOrderNum();
//扣减库存 需要商品编号,数量
String flag = stockFeignService.reduceStock(commodityCode, orderNum);
if("false".equals(flag)){
//扣减失败 回滚
throw new Exception("扣减失败回滚");
}
//扣减用户金额 需要用户Id,订单金额
String flag1 = accountFeignService.reduceMoney(testOrder.getUserId(), orderMoney);
if("false".equals(flag1)){
//扣减失败 回滚
throw new Exception("扣减金额回滚");
}
//创建订单
return orderMapper.createOrder(testOrder);
}
}
-
seata-stock库存微服务的修改 package com.study.stock.controller;
import com.study.stock.service.IStockTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping(value = "/stock")
public class StockController {
// 改为新建的TCCService
@Autowired
private IStockTCCService stockService;
/**
* 扣减库存数量
* @param commodityCode
* @param orderNum
* @return
*/
@PostMapping(value = "/reduceStock")
public String reduceStock(@RequestParam(value = "commodityCode") String commodityCode,@RequestParam(value = "orderNum") int orderNum){
//1.订单处理逻辑
boolean flag = stockService.reduceStock(commodityCode,orderNum);
if(!flag){
return "false";
}
return "true";
}
}package com.study.stock.service;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public interface IStockTCCService {
/**
* Try逻辑
* @TwoPhaseBusinessAction注解中的name属性必须当前方法名一致
* commitMethod属性为确认方法
* rollbackMethod属性为回滚方法
* @param commodityCode
* @param orderNum
* @return
*/
@TwoPhaseBusinessAction(name = "reduceStock",commitMethod = "reduceStockConfirm",rollbackMethod = "reduceStockCancel")
boolean reduceStock(@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode, @BusinessActionContextParameter(paramName = "orderNum") int orderNum);
/**
* confirm确认方法
* @param context
* @return
*/
boolean reduceStockConfirm(BusinessActionContext context);
/**
* cancel回滚方法
* @param context
* @return
*/
boolean reduceStockCancel(BusinessActionContext context);
}package com.study.stock.service.imp;
import com.study.stock.mapper.StockMapper;
import com.study.stock.mapper.TccFenceLogMapper;
import com.study.stock.pojo.TccFenceLog;
import com.study.stock.pojo.TestStock;
import com.study.stock.pojo.constant.TccFenceConstant;
import com.study.stock.service.IStockTCCService;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
public class StockTCCServiceImpl implements IStockTCCService {
@Autowired
private StockMapper stockMapper;
@Autowired
private TccFenceLogMapper tccFenceLogMapper;
/**
* try方法
* @param commodityCode
* @param orderNum
* @return
*/
@Override
public boolean reduceStock(String commodityCode, int orderNum) {
//首先插入TCCFenceLog,因为关联了索引,所以如果插入失败,则证明已存在,直接返回正常
//获取全局事务Id
String xid = RootContext.getXID();
//获取分支事务Id
String branchId = MDC.get(RootContext.MDC_KEY_BRANCH_ID);
TccFenceLog tccFenceLog = new TccFenceLog();
tccFenceLog.setXid(xid);
tccFenceLog.setBranchId(branchId);
tccFenceLog.setActionName("IStockTCCServiceImpl.reduceStock");
//已尝试try
tccFenceLog.setStatus(TccFenceConstant.TRIED);
tccFenceLog.setGmtCreate(new Date());
tccFenceLog.setGmtModified(new Date());
//防止业务悬挂
boolean flag = tccFenceLogMapper.insertTccFenLog(tccFenceLog);
if(!flag){
return true;
}
//先查询有无足够的库存
TestStock testStock = stockMapper.getStockNum(commodityCode);
int stockNum = testStock.getStockNum();
if(stockNum-orderNum>0){
boolean flag1 = stockMapper.updateStockNum(commodityCode,orderNum);
if(flag1){
//把状态改为已提交
tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.COMMITTED);
}
return flag1;
}else{
return false;
}
}
/**
* confirm
* @param context
* @return
*/
@Override
public boolean reduceStockConfirm(BusinessActionContext context) {
//因为我在try阶段都做完了,所以确认阶段直接就返回true就好了
return true;
}
/**
* cancel
* @param context
* @return
*/
@Override
public boolean reduceStockCancel(BusinessActionContext context) {
//如果出现错误 首先查询tcc状态
String xid = context.getXid();
long branchId = context.getBranchId();
TccFenceLog tccFenceLog = tccFenceLogMapper.getTccFenLogById(xid, String.valueOf(branchId));
//防止空回滚
if(tccFenceLog==null){
tccFenceLog = new TccFenceLog();
tccFenceLog.setXid(xid);
tccFenceLog.setBranchId(String.valueOf(branchId));
tccFenceLog.setActionName("AccountTCCServiceImpl.reduceMoneyCancel");
tccFenceLog.setStatus(TccFenceConstant.SUSPENDED);
tccFenceLog.setGmtCreate(new Date());
tccFenceLog.setGmtModified(new Date());
boolean flag = tccFenceLogMapper.insertTccFenLog(tccFenceLog);
return true;
}else{
int status = tccFenceLog.getStatus();
switch (status){
case TccFenceConstant.COMMITTED:
//写回滚逻辑
String commodityCode = context.getActionContext("commodityCode").toString();
int orderNum = Integer.valueOf(context.getActionContext("orderNum").toString());
orderNum = orderNum*-1;
boolean flag1 = stockMapper.updateStockNum(commodityCode,orderNum);
if(flag1){
boolean flag = tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.ROLLBACKED);
}
return flag1;
default:
boolean flag2 = tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.SUSPENDED);
return true;
}
}
}
}package com.study.stock.mapper;
import com.study.stock.pojo.TccFenceLog;
import org.apache.ibatis.annotations.*;
@Mapper
public interface TccFenceLogMapper {
/**
* 查询TCC信息 根据全局事务Id和分支事务Id
* @param xid
* @param branchId
* @return
*/
@Results(id = "getTccFenLogById",value = {
@Result(column = "xid",property = "xid",id=true),
@Result(column = "branch_id",property = "branchId"),
@Result(column = "action_name",property = "actionName"),
@Result(column = "status",property = "status"),
@Result(column = "gmt_create",property = "gmtCreate"),
@Result(column = "gmt_modified",property = "gmtModified")
})
@Select(value = "select xid,branch_id,action_name,status,gmt_create,gmt_modified from tcc_fence_log where xid = #{xid} and branch_id = #{branchId}")
TccFenceLog getTccFenLogById(String xid,String branchId);
/**
* 插入TCC日志信息
* @param tccFenceLog
* @return
*/
@Insert(value = "insert into tcc_fence_log(xid,branch_id,action_name,status,gmt_create,gmt_modified) values(#{xid},#{branchId},#{actionName},#{status},#{gmtCreate},#{gmtModified})")
boolean insertTccFenLog(TccFenceLog tccFenceLog);
/**
* 更新状态
* @param xid
* @param branchId
* @param status
* @return
*/
@Update(value = "update tcc_fence_log set status = #{status},gmt_modified = now() where xid = #{xid} and branch_id = #{branchId}")
boolean updateTccFenLogStatus(String xid,String branchId,int status);
}package com.study.stock.pojo;
import java.util.Date;
/**
* TCC日志 用于解决TCC的幂等性,空回滚,业务悬挂
*/
public class TccFenceLog {
/**
* 全局事务Id
*/
private String xid;
/**
* 分支事务Id
*/
private String branchId;
private String actionName;
/**
* 状态
* tried:1;已尝试
* committed:2;已提交
* rollbacked:3;已回滚
* suspended:4 空悬挂
*/
private int status;
/**
* 创建时间
*/
private Date gmtCreate;
/**
* 修改时间
*/
private Date gmtModified;
public String getXid() {
return xid;
}
public void setXid(String xid) {
this.xid = xid;
}
public String getBranchId() {
return branchId;
}
public void setBranchId(String branchId) {
this.branchId = branchId;
}
public String getActionName() {
return actionName;
}
public void setActionName(String actionName) {
this.actionName = actionName;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getGmtCreate() {
return gmtCreate;
}
public void setGmtCreate(Date gmtCreate) {
this.gmtCreate = gmtCreate;
}
public Date getGmtModified() {
return gmtModified;
}
public void setGmtModified(Date gmtModified) {
this.gmtModified = gmtModified;
}
}package com.study.stock.pojo.constant;
public class TccFenceConstant {
/**
* tried:1;已尝试
*/
public static final int TRIED = 1;
/**
* committed:2;已提交
*/
public static final int COMMITTED = 2;
/**
* rollbacked:3;已回滚
*/
public static final int ROLLBACKED = 3;
/**
* suspended:4 空悬挂
*/
public static final int SUSPENDED = 4;
} -
constant层 -
pojo实体类 -
mapper层 -
service-imp逻辑层 -
service接口层 -
controller层 -
seata-account账户微服务的修改 package com.study.account.controller;
import com.study.account.service.IAccountTCCService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping(value = "/account")
public class AccountController {
@Autowired
private IAccountTCCService accountService;
/**
* 扣减用户账户余额
* @param userId
* @param accountMoney
* @return
*/
@PostMapping(value = "/reduceMoney")
public String reduceMoney(@RequestParam(value = "userId") String userId,@RequestParam(value = "accountMoney") double accountMoney){
//1.订单处理逻辑
boolean flag = accountService.reduceMoney(userId,accountMoney);
if(!flag){
return "false";
}
return "true";
}
}package com.study.account.service;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public interface IAccountTCCService {
/**
* Try逻辑
* @TwoPhaseBusinessAction注解中的name属性必须当前方法名一致
* commitMethod属性为确认方法
* rollbackMethod属性为回滚方法
* @param userId
* @param accountMoney
* @return
*/
@TwoPhaseBusinessAction(name = "reduceMoney",commitMethod = "reduceMoneyConfirm",rollbackMethod = "reduceMoneyCancel")
boolean reduceMoney(@BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "accountMoney") double accountMoney);
/**
* confirm确认方法 要对应commitMethod里面的方法名
* @param context 上下文,可以传递try方法的参数
* @return
*/
boolean reduceMoneyConfirm(BusinessActionContext context);
/**
* cancel回滚方法 要对应rollbackMethod里面的方法名
* @param context
* @return
*/
boolean reduceMoneyCancel(BusinessActionContext context);
}package com.study.account.service.imp;
import com.study.account.mapper.AccountMapper;
import com.study.account.mapper.TccFenceLogMapper;
import com.study.account.pojo.TccFenceLog;
import com.study.account.pojo.TestAccount;
import com.study.account.pojo.constant.TccFenceConstant;
import com.study.account.service.IAccountTCCService;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
public class AccountTCCServiceImpl implements IAccountTCCService {
@Autowired
private AccountMapper accountMapper;
@Autowired
private TccFenceLogMapper tccFenceLogMapper;
/**
* try方法
* @param userId
* @param accountMoney
* @return
*/
@Override
public boolean reduceMoney(String userId, double accountMoney) {
//首先插入TCCFenceLog,因为关联了索引,所以如果插入失败,则证明已存在,直接返回正常
String xid = RootContext.getXID();
String branchId = MDC.get(RootContext.MDC_KEY_BRANCH_ID);
TccFenceLog tccFenceLog = new TccFenceLog();
tccFenceLog.setXid(xid);
tccFenceLog.setBranchId(branchId);
tccFenceLog.setActionName("AccountTCCServiceImpl.reduceMoney");
tccFenceLog.setStatus(TccFenceConstant.TRIED);
tccFenceLog.setGmtCreate(new Date());
tccFenceLog.setGmtModified(new Date());
//防止业务悬挂
boolean flag = tccFenceLogMapper.insertTccFenLog(tccFenceLog);
if(!flag){
return true;
}
//先查询有无足够的库存
TestAccount testAccount = accountMapper.getMoneyByUserId(userId);
double account = testAccount.getAccountMoney();
if(account-accountMoney>0) {
boolean flag1 = accountMapper.updateMoneyByUserId(userId, accountMoney);
if(!flag1){
tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.COMMITTED);
}
return flag1;
}else{
return false;
}
}
/**
* confirm
* @param context
* @return
*/
@Override
public boolean reduceMoneyConfirm(BusinessActionContext context) {
//因为我在try阶段都做完了,所以确认阶段直接就返回true就好了
return true;
}
/**
* cancel
* @param context
* @return
*/
@Override
public boolean reduceMoneyCancel(BusinessActionContext context) {
//如果出现错误 首先查询tcc状态
String xid = context.getXid();
long branchId = context.getBranchId();
TccFenceLog tccFenceLog = tccFenceLogMapper.getTccFenLogById(xid, String.valueOf(branchId));
//防止空回滚
if(tccFenceLog==null){
tccFenceLog = new TccFenceLog();
tccFenceLog.setXid(xid);
tccFenceLog.setBranchId(String.valueOf(branchId));
tccFenceLog.setActionName("AccountTCCServiceImpl.reduceMoneyCancel");
tccFenceLog.setStatus(TccFenceConstant.SUSPENDED);
tccFenceLog.setGmtCreate(new Date());
tccFenceLog.setGmtModified(new Date());
boolean flag = tccFenceLogMapper.insertTccFenLog(tccFenceLog);
return true;
}else{
int status = tccFenceLog.getStatus();
switch (status){
case TccFenceConstant.COMMITTED:
//写回滚逻辑
String userId = context.getActionContext("userId").toString();
double accountMoney = Double.valueOf(context.getActionContext("accountMoney").toString());
accountMoney = accountMoney*-1;
boolean flag1 = accountMapper.updateMoneyByUserId(userId, accountMoney);
if(flag1){
boolean flag = tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.ROLLBACKED);
}
return flag1;
default:
boolean flag2 = tccFenceLogMapper.updateTccFenLogStatus(xid, String.valueOf(branchId), TccFenceConstant.SUSPENDED);
return true;
}
}
}
} -
mapper层,pojo层,constant层新增的跟上面库存微服务一致 -
service-imp逻辑层 -
service接口层 -
controller层
TCC模式运行图

TCC模式优点
-
一阶段Try完成直接提交事务,释放数据库资源,性能好 -
相比AT模式,无需生成快照,无需使用全局锁,性能最强 -
不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库
TCC模式缺点
-
很明显,有代码侵入,很麻烦,失败回滚逻辑都需要自己写,需要根据不同情况做出不同的处理 -
软状态,事务是最终一致
什么叫空回滚
-
就是因为某些原因,没有执行try操作,先执行了cancel操作,这时cancel操作不能做回滚,就是空回滚
什么叫业务悬挂
-
就是已经空回滚的业务,如果继续执行try,就永远不可能confirm或cancel,这就是业务悬挂
总结
-
需要自己写回滚逻辑比较麻烦,而且要考虑幂等性,还要考虑业务逻辑回滚,比如我控制了,先查询账户的余额够不够,如果不够直接都不扣账户余额了,直接返回false,那么订单接到账户微服务返回false的时候,就抛出异常,这时候账户微服务也会执行cancel方法,执行cancel方法的时候,你要知道账户微服务执行try方法的时候导致有没有扣减余额,如果没有扣减,你回滚的时候把钱加了,那不是让用户赚了嘛. -
最后关注一下我的微信公众号【楼梯间的男孩】,我们一起学习成长
原文始发于微信公众号(楼梯间的男孩):Spring Cloud:使用Seata框架来实现分布式事务(三)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/197282.html