Seata分布式事务AT模式以及工作机制

导读:本篇文章讲解 Seata分布式事务AT模式以及工作机制,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

分布式事务:

  事务是数据库概念,数据库事务ACID(Atomicity 原子性、Consistency 一致性、Isolation 隔离性和Durability 持久性);
  分布式事务的产生是由于数据库的拆分和分布式架构(微服务)带来的,在常规情况下,我们在一个进程中操作一个数据库,这属于本地事务,如果在一个进程中操作多个数据库,或者在多个进程中操作一个或多个数据库,就产生了分布式事务;
(1)数据库分库分表产生了分布式事务:
分库分表
(2)项目拆分服务化也产生了分布式事务:
项目拆分服务器化

什么是seata?

官网:http://seata.io/zh-cn/
  Seata是一款开源的分布式解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务。Seata将为用户提供AT,TCC,SAGA和XA交易模型,以为用户创建一站式分布式解决方案。

Seata架构中,一共有三个角色:

事务角色
TC (Transaction Coordinator) – 事务协调者
  定义全局事务范围:开始全局事务、提交或回滚全局事务;
TM (Transaction Manager) – 事务管理器
  定义全局事务的范围:开始全局事务、提交或回滚全局事务;
RM (Resource Manager) – 资源管理器
  管理分支事务处理的资源,与TC交互以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚;
其中TC为单独部署的 Server 服务端,TM和RM为嵌入到应用中的 Client 客户端;

Seata中,一个分布式事务的生命周期如下:

分布式事务生命周期
  TM(事务管理器)请求TC(事务协调者)开启一个全局事务,TC会生成一个XID作为该全局事务的编号,XID会在微服务的调用链路当中传播,保证多个微服务的子事务关联在一起(即同一个XID);
  RM(资源管理器)请求TC将本地事务注册为全局事务的分支,通过全局事务的XID进行关联;
  TM请求TC告诉XID对应的全局事务是进行提交还是回滚;
  TC驱动RM对应自己的本地事务进行提交还是回滚(与全局事务保持一致);

TC Server运行环境部署:

  由于TC需要进行全局事务和分支事务的记录,所以需要对应的存储,目前TC有三种存储模式(store.model):
file模式:适合单机模式,全局事务会话信息在内存中读取,并持久化本地文件root.data,性能较高;
db模式:集群模式,全局事务绘画信息通过db共享,相对性能较差;
redis模式:解决db存储的性能问题;

Seata运行环境部署:

下载地址:http://seata.io/zh-cn/blog/download.html
解压:tar -zxvf seata-server-1.3.0.tar.gz -C /usr/local
/usr/local/seata
在bin目录下启动:./seata-server.sh
默认配置下,Seata TC Server 启动在 8091 端口;
seata启动默认端口8091
  由于seata默认情况下使用的是file模式进行数据持久化,没有修改过配置文件,可以看到/bin/sessionStore目录下用于持久化的本地文件rooe.data。
本地持久化文件

Seata下的AT模型事务案例:

单体应用多数据源分布式事务:
分库分表
  在Spring Boot单体项目当中,如果使用了多数据源,就需要考虑多个数据源的一致性,即产生了分布式事务的问题,我们采用Seata的AT事务模式来解决该分布式事务问题。
已电商下单为例了解AT事务
电商下单
1、准备数据库和表数据
  其中每个库中的undo_log表,是 Seata AT模式必须创建的表,主要用于分支事务的回滚;
分库分表

-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2、开发SpringBoot单体应用(仅提供controller和service代码)
(1)产品、订单和用户实体类

/**
 * @Description: 产品实体类
 * @date 2021/1/17 18:17
 */
@Data
public class Product {
    //产品ID
    private Integer id;
    //产品名称
    private String name;
    //产品价格
    private BigDecimal price;
    //产品总数量
    private Integer stock;
    //入库时间
    private Date addTime;
    //更新时间
    private Date updateTime;
}

/**
 * @Description: 订单实体类
 * @date 2021/1/17 18:16
 */
@Data
public class Orders {
    //订单ID
    private Integer id;
    //用户ID
    private Integer userId;
    //产品ID
    private Integer productId;
    //支付价格
    private BigDecimal payAmount;
    //添加时间
    private Date addTime;
    //更新时间
    private Date updateTime;
}

/**
 * @Description: 用户实体类
 * @date 2021/1/17 18:15
 */
@Data
public class Account {
    //主键ID唯一
    private Integer id;
    //用户ID
    private Integer userId;
    //用户余额
    private BigDecimal balance;
    //更新时间
    private Date updateTime;
}

(2)OrderController代码:

/**
 * @Description: 下订单入口
 * @date 2021/1/17 18:43
 */
@Slf4j
@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;

    @RequestMapping("/order")
    public Integer createOrder(@RequestParam("userId") Integer userId,
                               @RequestParam("productId") Integer productId) throws Exception {

        log.info("请求下单, 用户:{}, 商品:{}", userId, productId);
        return orderService.createOrder(userId, productId);
    }
}

(3)OrderServiceImpl下订单的实现:

/**
 * @Description: 下订单的实现
 * @date 2021/1/17 18:46
 */
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrdersMapper ordersMapper;

    @Autowired
    private AccountService accountService;

    @Autowired
    private ProductService productService;

    @DS("order-ds")			//用来指定连接哪个数据库
    @GlobalTransactional    //开启seata全局事务注解
    public Integer createOrder(Integer userId, Integer productId) throws Exception {
        //手动设置购买数量暂时设置为 1
        Integer amount = 1;
		//全局事务获取的唯一标识XID
        log.info("当前 XID: {}", RootContext.getXID());

        //产品减库存操作
        Product product = productService.reduceStock(productId, amount);

        //用户减余额操作
        accountService.reduceBalance(userId, product.getPrice());

        //生成订单
        Orders order = new Orders();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setPayAmount(product.getPrice().multiply(new BigDecimal(amount)));

        ordersMapper.insertSelective(order);

        log.info("下订单: {}", order.getId());
        //出异常就会回滚,如果try catch抓取异常就不会回滚
		//int a= 10/0;

        // 返回订单编号
        return order.getId();
    }
}

(4)减产品库存操作:

/**
 * @Description: 减产品库存操作
 * @date 2021/1/17 18:58
 */
@Slf4j
@Service
public class ProductServiceImpl implements ProductService {

    @Autowired
    private ProductMapper productMapper;

    @DS("product-ds")	//要连接的哪个库
	//@Transactional(propagation = Propagation.REQUIRES_NEW)  //开启新事务(Spring的事务注解)
    public Product reduceStock(Integer productId, Integer amount) throws Exception {
    
        log.info("当前 XID: {}", RootContext.getXID());

        //检查产品库存对比熬购买的库存
        Product product = productMapper.selectByPrimaryKey(productId);
        if (product.getStock() < amount) {
            throw new Exception("库存不足");
        }

        // 扣减库存
        amount = product.getStock() - amount;
        int updateCount = productMapper.reduceStock(productId, amount);
        // 扣除成功
        if (updateCount == 0) {
            throw new Exception("库存不足");
        }

        // 扣除成功
        log.info("扣除 {} 库存成功", productId);
        return product;
    }
}

(5)更新用户余额操作:

/**
 * @Description: 更新账户余额
 * @date 2021/1/17 18:57
 */
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {

    @Autowired
    private AccountMapper accountMapper;

    @DS("account-ds")	//要连接的哪个库
//@Transactional(propagation = Propagation.REQUIRES_NEW)  //开启新事务(Spring事务)
    public void reduceBalance(Integer userId, BigDecimal money) throws Exception {
    
        log.info("当前 XID: {}", RootContext.getXID());
        
        //检查用户余额和购买金额对比
        Account account = accountMapper.selectAccountByUserId(userId);

        if (account.getBalance().doubleValue() < money.doubleValue()){
            throw new Exception("余额不足");
        }

        // 扣除余额
        int updateCount = accountMapper.reduceBalance(userId, money);

        if (updateCount == 0){
            throw new Exception("余额不足");
        }
        log.info("扣除用户 {} 余额成功", userId);
    }
}

(6)application.properties配置文件:

#开启对Seata的集成,默认是false
spring.datasource.dynamic.seata=true


#Seate默认编号,默认为${spring.application.name}
seata.application-id=${spring.application.name}
#Seata事务组编号,用于TC集群名
seata.tx-service-group=29-seata-distributed-transaction-group

#虚拟组和分组的映射
seata.service.vgroup-mapping.29-seata-distributed-transaction-group=default
#分组和Seata服务的映射
seata.service.grouplist.default=47.110.237.194:8091
seata.config.type=file
seata.registry.type=file

测试地址:http://localhost:8089/order?userId=1&productId=1

微服务分布式事务:

分布式事务

AT事务模式分布式事务工作机制:

  前提是基于支持本地ACID事务的关系型数据库(mysql,Oracle)

整体机制:

就是根据两阶段提交协议的演变:
第一阶段:
  业务数据和回滚日志记录在同一个本地事务当中提交,释放本地锁和连接资源;
第二阶段:
  如果没异常异步化提交则非常快速完成
  如果有异常回滚通过一阶段的回滚日志进行反向补偿;

写隔离:

  一阶段本地事务提交前,需要确保先拿到全局锁(global lock)。
  拿不到全局锁,不能提交本地事务。
  拿全局锁的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁
案例如下:
  两个或多个全局事务tx1和tx2等,分别并发对product表的字段进行更新操作,m的初始值为1000;
  tx1先开始,开启本地事务,拿到本地锁,更新操作m=1000-100=900,本地事务提交前,先拿到该记录的全局锁,拿到了全局锁本地提交释放本地锁;
  tx2后开始,开启本地事务,拿到本地锁。更新操作m=900-100=800,本地事务提交前,尝试获取该记录的全局锁,tx1全局提交之前,该记录的全局锁会一直被tx1持有,tx2需要重试等待获取全局锁。
tx1二阶段全局提交正常
tx1二阶段全局提交,释放全局锁,tx2拿到全局锁提交本地事务。
tx1的二阶段需要回滚
如果tx1的二阶段全局回滚,则tx1需要重新获取该数据记录的本地锁,进行反向补偿操作实现分支的回滚。
如果此时tx2在等待该数据全局锁的同时还持有本地锁,则tx1的分支回滚会失败。分支的回滚会一直重试,直到tx2的全局锁等待超时,放弃全局锁回滚本地事务释放本地锁之后,tx1的分支回滚获取到本地锁才会分支回滚成功。
因为整个过程全局锁在tx1结束前一直是被tx1持有所以不会出现发生脏写问题

读隔离:

  数据库本地事务隔离级别读已提交(Read Commited)或以上的基础,Seata(AT模式)的默认全局隔离级别是读未提交(Read UnCommited)。
  如果在应用特定场景下,必须要求全局读已提交,目前Seata的方式是通过SELECT FOR UPDATE语句的代理。(避免脏读)
读隔离
  SELECT FOR UPDATE语句的执行会申请全局锁,如果全局锁被其他事物持有,则释放本地锁(回滚SELECT FOR UPDATE语句的本地执行)并重试,这个过程中,查询是被block住的,直到全局锁拿到,即读取相关已提交数据才返回;
  出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

AT分支事务工作机制:

update product set name = ‘GTS’ where name = ‘TXC’;

一阶段过程:

1、解析SQL,得到SQL的类型等相关信息;
2、查询前置镜像:根据解析得到的条件信息,生成查询语句,定位数据select id, name, since from product where name = ‘TXC’;得到前置镜像;
3、执行SQL更新product这条记录的name为GTS
4、查询后置镜像:根据前置镜像的结果通过主键定位数据,select id, name, since from product where id = 1;得到后置镜像;
5、插入回滚日志:把前后镜像数据以及业务SQL相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中;
undo_log当中数据:

{
	"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
	"xid": "172.17.16.69:8091:94220248190046208",
	"branchId": 94220253042855936,
	"sqlUndoLogs": ["java.util.ArrayList", [{
		"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
		"sqlType": "UPDATE",
		"tableName": "product",
		//前置镜像
		"beforeImage": {
			"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
			"tableName": "product",
			"rows": ["java.util.ArrayList", [{
				"@class": "io.seata.rm.datasource.sql.struct.Row",
				"fields": ["java.util.ArrayList", [{
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "id",
					"keyType": "PRIMARY_KEY",
					"type": 4,
					"value": 1
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "stock",
					"keyType": "NULL",
					"type": 4,
					"value": 100
				}]]
			}]]
		},
		//后置镜像
		"afterImage": {
			"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
			"tableName": "product",
			"rows": ["java.util.ArrayList", [{
				"@class": "io.seata.rm.datasource.sql.struct.Row",
				"fields": ["java.util.ArrayList", [{
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "id",
					"keyType": "PRIMARY_KEY",
					"type": 4,
					"value": 1
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "stock",
					"keyType": "NULL",
					"type": 4,
					"value": 99
				}]]
			}]]
		}
	}]]
}

6、分支事务提交前向TC注册分支,申请product表中主键值为1的记录的全局锁(在当前的同一个全局事务id范围内是可以申请到全局锁的,不同的全局事务id才会排斥);
7、本地事务提交:业务数据的更新和步骤中生成的undo_log的数据一并提交;
8、将本地事务提交的结果上交给TC;

二阶段-回滚:

1、收到TC的分支回滚请求,开启本地事务,执行如下操作;
2、通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录;
3、数据校验:拿UNDO_LOG中的后置镜像数据与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改,这种情况,需要人工来处理;
4、根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
5、提交本地事务,并把本地事务的执行结果(分支回滚结果)上报给TC(事务协调者)

二阶段-提交:

1、收到TC的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的给TC;
2、异步任务阶段的分支提交将异步和批量的删除UNDO_LOG记录。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/77241.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!