Seata TC Server集群部署:
生产环境下需要集群部署Seata TC Server,实现高可用,在集群时多个Seata TC Server通过db数据库或者redis实现全局事务会话信息的共享;
每个Seata TC Server注册自己到注册中心上,应用从注册中心获得Seata TC Server实例,这就是Seata TC Server的集群;
Seata TC Server 对Nacos注册中心的集成;
Seata TC Server集群搭建具体步骤:
1、准备两个seata-server
2、初始化Seata TC Server的db数据库,在MySQL当中创建seata数据库,并执行如下SQL脚本文件:
3、修改seata-01/seata-02下/conf/file.conf配置文件修改使用db数据库,实现Seata TC Server的全局事务会话信息的共享;
如果使用的是mysql8.0版本则driverClassName = “com.mysql.cj.jdbc.Driver”
4、设置使用Nacos注册中心:
修改/seata-01/conf/registry.conf配置文件,设置使用Nacos注册中心
5、启动数据库和Nacos,然后再启动两个TC Server
-p:Seata TC Server 监听的端口;
-n:Server node,在多个 TC Server 时,需区分各自节点,用于生成不同区间的 transactionId 事务编号,以免冲突;
./seata-server.sh -p 18091 -n 1 #启动第一个TC Server
./seata-server.sh -p 28091 -n 2 #启动第二个TC Server
6、打开Nacos注册中心控制台,可以看到有两个Seata TC Server 实例;
解决办法:https://blog.csdn.net/nan1996jiang/article/details/112914186
#启动sentinel的时候 指定端口 ip这个内网IP问题自然迎刃而解
#-p:Seata TC Server 监听的端口;
#-h:指定ip
#-n:Server node,在多个 TC Server 时,需区分各自节点,用于生成不同区间的 transactionId 事务编号,以免冲突;
./seata-server.sh -p 18091 -h 47.110.237.194 -n 1
7、应用测试配置文件application.properties:
# Seata应用编号,默认为${spring.application.name}
seata.application-id=springboot-seata
# Seata事务组编号,用于TC集群名
seata.tx-service-group=springboot-seata-group
# 虚拟组和分组的映射
seata.service.vgroup-mapping.springboot-seata-group=default
#
#seata-spring-boot-starter 1.1版本
#默认是true可不设置
seata.enabled=true
seata.registry.type=nacos
seata.registry.nacos.cluster=default
seata.registry.nacos.server-addr=47.110.237.194:80
seata.registry.nacos.group=SEATA_GROUP
seata.registry.nacos.application=seata-server
TCC事务模式执行机制:
分布式全局事务,整体式一个两阶段提交模型,全局事务由几个分支事务组成。分支事务必须满足两阶段提交模型的要求,且每个分支事务必须有字的:
涉及非关系型数据库与中间件的操作、跨公司服务的调用、跨语言的应用调用就需要结合TCC模式
TCC 模式,需要我们手动编写代码实现提交和回滚:
1、一阶段 prepare 行为:调用自定义的 prepare 逻辑;(真正要做的事情,比如插入订单,更新库存,更新余额)
2、二阶段 commit 行为:调用自定义的 commit 逻辑;(自己写代码实现)
3、二阶段 rollback 行为:调用自定义的 rollback 逻辑;(自己写代码实现)
所以TCC模式,就是把自定义的分支事务的提交和回滚并纳入到全局事务管理中;
通俗来说,Seata的TCC模式就是手工版本的AT模式,它允许你自定义两阶段的处理逻辑而不需要依赖AT模式的undo_log回滚表;
TCC事务模式应用实践:
/**
* @Description: 下单Controller
* @date 2021/1/17 18:43
*/
@Slf4j
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@RequestMapping("/order")
public Integer createOrder(@RequestParam("userId") Integer userId,
@RequestParam("productId") Integer productId) throws Exception {
log.info("请求下单, 用户:{}, 商品:{}", userId, productId);
return orderService.createOrder(userId, productId);
}
}
OrderServiceImpl代码:
/**
* @Description:
* @date 2021/1/17 18:46
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrdersMapper ordersMapper;
@Autowired
private AccountService accountService;
@Autowired
private ProductService productService;
@DS("order-ds")
@GlobalTransactional //seata全局事务注解,默认超时时间60000毫秒
public Integer createOrder(Integer userId, Integer productId) throws Exception {
// 购买数量暂时设置为 1
Integer amount = 1;
log.info("当前 XID: {}", RootContext.getXID());
// 减库存
Product product = productService.reduceStock(productId, amount);
// 减余额
accountService.reduceBalance(userId, product.getPrice());
// 下订单
Orders order = new Orders();
order.setUserId(userId);
order.setProductId(productId);
order.setPayAmount(product.getPrice().multiply(new BigDecimal(amount)));
//插入订单信息
ordersMapper.insertSelective(order);
log.info("下订单: {}", order.getId());
//出异常就会回滚,如果try catch抓取异常就不会回滚
//int a= 10/0;
// 返回订单编号
return order.getId();
}
}
ProductService 接口代码:
/**
* @Description: 减库存操作
* @date 2021/1/17 18:55
*/
@LocalTCC //注解标识此TCC为本地模式,即该事务是本地调用
public interface ProductService {
/**
* 减库存
*
* @param productId 商品 ID
* @param amount 扣减数量
* @throws Exception 扣减失败时抛出异常
*/
@TwoPhaseBusinessAction(name = "reduceStock",commitMethod = "commitTcc",rollbackMethod = "rollbackTcc")
Product reduceStock(@BusinessActionContextParameter(paramName = "productId") Integer productId,
@BusinessActionContextParameter(paramName = "amount") Integer amount);
/**
* 二阶段提交方法
*
* 确认方法,命名必须与commitMethod = "commitTcc"保持一致
* ontext可以传递try方法的参数
* @param context 上下文
* @return
*/
boolean commitTcc(BusinessActionContext context);
/**
* 二阶段回滚方法
* @param context 上下文
* @return
*/
boolean rollbackTcc(BusinessActionContext context);
}
ProductServiceImpl 实现类:
/**
* @Description: 减库存实现类,里面手动实现提交和回滚操作
* @date 2021/1/17 18:58
*/
@Slf4j
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@DS("product-ds")
public Product reduceStock(Integer productId, Integer amount){
log.info("当前 XID: {}", RootContext.getXID());
// 检查库存
Product product = productMapper.selectByPrimaryKey(productId);
if (product.getStock() < amount) {
throw new RuntimeException("库存不足");
}
// 扣减库存
amount = product.getStock() - amount;
int updateCount = productMapper.reduceStock(productId, amount);
// 扣除成功
if (updateCount == 0) {
throw new RuntimeException("库存不足");
}
// 扣除成功
log.info("扣除 {} 库存成功", productId);
return product;
}
/**
* tcc模式的commit方法
* 可以空确认
* @param context 上下文
* @return
*/
@DS("product-ds")
public boolean commitTcc(BusinessActionContext context) {
log.info("Confirm阶段,ProductServiceImpl, commitTcc --> xid = " + context.getXid() + ", commitTcc提交成功");
return true;
}
/**
* tcc模式rollback方法
* @param context 上下文
* @return
*/
@DS("product-ds")
public boolean rollbackTcc(BusinessActionContext context) {
log.info("Cancel阶段,ProductServiceImpl, cancelTcc --> xid = " + context.getXid() + ", cancelTcc提交失败");
//TODO 这里可以实现中间件、非关系型数据库的回滚操作
log.info("Cancel阶段,ProductServiceImpl, cancelTcc this data: {}, {}", context.getActionContext("productId"), context.getActionContext("amount"));
//进行数据库回滚操作
Integer productId = (Integer) context.getActionContext("productId");
Integer amount = (Integer) context.getActionContext("amount");
//把库存再加回去 (避免数据出问题,加个锁,分布式环境下就需要分布式锁)
productMapper.rollbackTcc(productId,amount);
return true;
}
}
AccountService 接口代码:
/**
* @Description: 减余额
* @date 2021/1/17 18:55
*/
@LocalTCC
public interface AccountService {
/**
* 减余额
*
* 定义两段提交
* name = reduceStock为一阶段try方法
* commitMethod = commitTcc 为二阶段确认方法
* rollbackMethod = cancel 为二阶段取消方法
* BusinessActionContextParameter注解 可传递参数到二阶段方法
*
* @param userId 用户id
* @param money 扣减金额
* @throws Exception 失败时抛出异常
*/
@TwoPhaseBusinessAction(name = "reduceBalance",commitMethod = "commitBalance",rollbackMethod = "rollbackBalance")
void reduceBalance(@BusinessActionContextParameter(paramName = "userId") Integer userId,
@BusinessActionContextParameter(paramName = "money") BigDecimal money) throws Exception;
/**
确认方法、可以另命名,但要保证与commitMethod一致
* context可以传递try方法的参数
* @param context 上下文
* @return
*/
boolean commitBalance(BusinessActionContext context);
/**
* 二阶段取消方法
* @param context 上下文
* @return
*/
boolean rollbackBalance(BusinessActionContext context);
}
注解解释:
@LocalTCC注解标识此TCC为本地模式,即该事务是本地调用,非RPC调用,@LocalTCC一定需要注解在接口上,
此接口可以是寻常的业务接口,只要实现了TCC的两阶段提交对应方法即可;
@TwoPhaseBusinessAction,该注解标识为TCC模式,注解try方法,其中name为当前tcc方法的bean名称,写方法名便可(全局唯一),commitMethod指提交方法,rollbackMethod指事务回滚方法,指定好三个方法之后,Seata会根据事务的成功或失败,通过动态代理去帮我们自动调用提交或者回滚;
@BusinessActionContextParameter 注解可以将参数传递到二阶段(commitMethod/rollbackMethod)的方法;
BusinessActionContext 是指TCC事务上下文,携带了业务方法的参数;
AccountServiceImpl 实现类代码:
/**
* @Description: 减库存
* @date 2021/1/17 18:57
*/
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;
@DS("account-ds")
public void reduceBalance(Integer userId, BigDecimal money) throws Exception {
log.info("当前 XID: {}", RootContext.getXID());
//检查余额
Account account = accountMapper.selectAccountByUserId(userId);
if (account.getBalance().doubleValue() < money.doubleValue()){
throw new Exception("余额不足");
}
// 扣除余额
int updateCount = accountMapper.reduceBalance(userId, money);
if (updateCount == 0){
throw new Exception("余额不足");
}
log.info("扣除用户 {} 余额成功", userId);
}
/**
* TCC服务(confirm)方法
* 可以空确认
* @param context 上下文
* @return
*/
@DS("account-ds")
public boolean commitBalance(BusinessActionContext context) {
log.info("Confirm阶段,AccountServiceImpl, commitTcc --> xid = {}", context.getXid() + ", commitTcc提交成功");
return true;
}
/**
* TCC服务(rollback)方法
* @param context 上下文
* @return
*/
@DS("account-ds")
public boolean rollbackBalance(BusinessActionContext context) {
log.info("Cancel阶段,AccountServiceImpl, cancelTcc --> xid = " + context.getXid() + ", cancelTcc提交失败");
//TODO 这里可以实现中间件、非关系型数据库的回滚操作
log.info("Cancel阶段,AccountServiceImpl, cancelTcc this data: userId= {}, money = {}", context.getActionContext("userId"), context.getActionContext("money"));
//进行数据库回滚处理
Integer userId =(Integer) context.getActionContext("userId");
BigDecimal money =(BigDecimal) context.getActionContext("money");
//余额在加回库里
accountMapper.rollbackBalance(userId,money);
return true;
}
}
以上避免数据回滚的时候出现问题, (避免数据出问题加个锁,分布式环境下就需要分布式锁)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/77240.html