分布式事务:
事务是数据库概念,数据库事务ACID(Atomicity 原子性、Consistency 一致性、Isolation 隔离性和Durability 持久性);
分布式事务的产生是由于数据库的拆分和分布式架构(微服务)带来的,在常规情况下,我们在一个进程中操作一个数据库,这属于本地事务,如果在一个进程中操作多个数据库,或者在多个进程中操作一个或多个数据库,就产生了分布式事务;
(1)数据库分库分表产生了分布式事务:
(2)项目拆分服务化也产生了分布式事务:
什么是seata?
官网:http://seata.io/zh-cn/
Seata是一款开源的分布式解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务。Seata将为用户提供AT,TCC,SAGA和XA交易模型,以为用户创建一站式分布式解决方案。
Seata架构中,一共有三个角色:
TC (Transaction Coordinator) – 事务协调者
定义全局事务范围:开始全局事务、提交或回滚全局事务;
TM (Transaction Manager) – 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务;
RM (Resource Manager) – 资源管理器
管理分支事务处理的资源,与TC交互以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚;
其中TC为单独部署的 Server 服务端,TM和RM为嵌入到应用中的 Client 客户端;
Seata中,一个分布式事务的生命周期如下:
TM(事务管理器)请求TC(事务协调者)开启一个全局事务,TC会生成一个XID作为该全局事务的编号,XID会在微服务的调用链路当中传播,保证多个微服务的子事务关联在一起(即同一个XID);
RM(资源管理器)请求TC将本地事务注册为全局事务的分支,通过全局事务的XID进行关联;
TM请求TC告诉XID对应的全局事务是进行提交还是回滚;
TC驱动RM对应自己的本地事务进行提交还是回滚(与全局事务保持一致);
TC Server运行环境部署:
由于TC需要进行全局事务和分支事务的记录,所以需要对应的存储,目前TC有三种存储模式(store.model):
file模式:适合单机模式,全局事务会话信息在内存中读取,并持久化本地文件root.data,性能较高;
db模式:集群模式,全局事务绘画信息通过db共享,相对性能较差;
redis模式:解决db存储的性能问题;
Seata运行环境部署:
下载地址:http://seata.io/zh-cn/blog/download.html
解压:tar -zxvf seata-server-1.3.0.tar.gz -C /usr/local
在bin目录下启动:./seata-server.sh
默认配置下,Seata TC Server 启动在 8091 端口;
由于seata默认情况下使用的是file模式进行数据持久化,没有修改过配置文件,可以看到/bin/sessionStore目录下用于持久化的本地文件rooe.data。
Seata下的AT模型事务案例:
单体应用多数据源分布式事务:
在Spring Boot单体项目当中,如果使用了多数据源,就需要考虑多个数据源的一致性,即产生了分布式事务的问题,我们采用Seata的AT事务模式来解决该分布式事务问题。
已电商下单为例了解AT事务
1、准备数据库和表数据
其中每个库中的undo_log表,是 Seata AT模式必须创建的表,主要用于分支事务的回滚;
-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2、开发SpringBoot单体应用(仅提供controller和service代码)
(1)产品、订单和用户实体类
/**
* @Description: 产品实体类
* @date 2021/1/17 18:17
*/
@Data
public class Product {
//产品ID
private Integer id;
//产品名称
private String name;
//产品价格
private BigDecimal price;
//产品总数量
private Integer stock;
//入库时间
private Date addTime;
//更新时间
private Date updateTime;
}
/**
* @Description: 订单实体类
* @date 2021/1/17 18:16
*/
@Data
public class Orders {
//订单ID
private Integer id;
//用户ID
private Integer userId;
//产品ID
private Integer productId;
//支付价格
private BigDecimal payAmount;
//添加时间
private Date addTime;
//更新时间
private Date updateTime;
}
/**
* @Description: 用户实体类
* @date 2021/1/17 18:15
*/
@Data
public class Account {
//主键ID唯一
private Integer id;
//用户ID
private Integer userId;
//用户余额
private BigDecimal balance;
//更新时间
private Date updateTime;
}
(2)OrderController代码:
/**
* @Description: 下订单入口
* @date 2021/1/17 18:43
*/
@Slf4j
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@RequestMapping("/order")
public Integer createOrder(@RequestParam("userId") Integer userId,
@RequestParam("productId") Integer productId) throws Exception {
log.info("请求下单, 用户:{}, 商品:{}", userId, productId);
return orderService.createOrder(userId, productId);
}
}
(3)OrderServiceImpl下订单的实现:
/**
* @Description: 下订单的实现
* @date 2021/1/17 18:46
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrdersMapper ordersMapper;
@Autowired
private AccountService accountService;
@Autowired
private ProductService productService;
@DS("order-ds") //用来指定连接哪个数据库
@GlobalTransactional //开启seata全局事务注解
public Integer createOrder(Integer userId, Integer productId) throws Exception {
//手动设置购买数量暂时设置为 1
Integer amount = 1;
//全局事务获取的唯一标识XID
log.info("当前 XID: {}", RootContext.getXID());
//产品减库存操作
Product product = productService.reduceStock(productId, amount);
//用户减余额操作
accountService.reduceBalance(userId, product.getPrice());
//生成订单
Orders order = new Orders();
order.setUserId(userId);
order.setProductId(productId);
order.setPayAmount(product.getPrice().multiply(new BigDecimal(amount)));
ordersMapper.insertSelective(order);
log.info("下订单: {}", order.getId());
//出异常就会回滚,如果try catch抓取异常就不会回滚
//int a= 10/0;
// 返回订单编号
return order.getId();
}
}
(4)减产品库存操作:
/**
* @Description: 减产品库存操作
* @date 2021/1/17 18:58
*/
@Slf4j
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@DS("product-ds") //要连接的哪个库
//@Transactional(propagation = Propagation.REQUIRES_NEW) //开启新事务(Spring的事务注解)
public Product reduceStock(Integer productId, Integer amount) throws Exception {
log.info("当前 XID: {}", RootContext.getXID());
//检查产品库存对比熬购买的库存
Product product = productMapper.selectByPrimaryKey(productId);
if (product.getStock() < amount) {
throw new Exception("库存不足");
}
// 扣减库存
amount = product.getStock() - amount;
int updateCount = productMapper.reduceStock(productId, amount);
// 扣除成功
if (updateCount == 0) {
throw new Exception("库存不足");
}
// 扣除成功
log.info("扣除 {} 库存成功", productId);
return product;
}
}
(5)更新用户余额操作:
/**
* @Description: 更新账户余额
* @date 2021/1/17 18:57
*/
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;
@DS("account-ds") //要连接的哪个库
//@Transactional(propagation = Propagation.REQUIRES_NEW) //开启新事务(Spring事务)
public void reduceBalance(Integer userId, BigDecimal money) throws Exception {
log.info("当前 XID: {}", RootContext.getXID());
//检查用户余额和购买金额对比
Account account = accountMapper.selectAccountByUserId(userId);
if (account.getBalance().doubleValue() < money.doubleValue()){
throw new Exception("余额不足");
}
// 扣除余额
int updateCount = accountMapper.reduceBalance(userId, money);
if (updateCount == 0){
throw new Exception("余额不足");
}
log.info("扣除用户 {} 余额成功", userId);
}
}
(6)application.properties配置文件:
#开启对Seata的集成,默认是false
spring.datasource.dynamic.seata=true
#Seate默认编号,默认为${spring.application.name}
seata.application-id=${spring.application.name}
#Seata事务组编号,用于TC集群名
seata.tx-service-group=29-seata-distributed-transaction-group
#虚拟组和分组的映射
seata.service.vgroup-mapping.29-seata-distributed-transaction-group=default
#分组和Seata服务的映射
seata.service.grouplist.default=47.110.237.194:8091
seata.config.type=file
seata.registry.type=file
测试地址:http://localhost:8089/order?userId=1&productId=1
微服务分布式事务:
AT事务模式分布式事务工作机制:
前提是基于支持本地ACID事务的关系型数据库(mysql,Oracle)
整体机制:
就是根据两阶段提交协议的演变:
第一阶段:
业务数据和回滚日志记录在同一个本地事务当中提交,释放本地锁和连接资源;
第二阶段:
如果没异常异步化提交则非常快速完成
如果有异常回滚通过一阶段的回滚日志进行反向补偿;
写隔离:
一阶段本地事务提交前,需要确保先拿到全局锁(global lock)。
拿不到全局锁,不能提交本地事务。
拿全局锁的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁
案例如下:
两个或多个全局事务tx1和tx2等,分别并发对product表的字段进行更新操作,m的初始值为1000;
tx1先开始,开启本地事务,拿到本地锁,更新操作m=1000-100=900,本地事务提交前,先拿到该记录的全局锁,拿到了全局锁本地提交释放本地锁;
tx2后开始,开启本地事务,拿到本地锁。更新操作m=900-100=800,本地事务提交前,尝试获取该记录的全局锁,tx1全局提交之前,该记录的全局锁会一直被tx1持有,tx2需要重试等待获取全局锁。
tx1二阶段全局提交,释放全局锁,tx2拿到全局锁提交本地事务。
如果tx1的二阶段全局回滚,则tx1需要重新获取该数据记录的本地锁,进行反向补偿操作实现分支的回滚。
如果此时tx2在等待该数据全局锁的同时还持有本地锁,则tx1的分支回滚会失败。分支的回滚会一直重试,直到tx2的全局锁等待超时,放弃全局锁回滚本地事务释放本地锁之后,tx1的分支回滚获取到本地锁才会分支回滚成功。
因为整个过程全局锁在tx1结束前一直是被tx1持有所以不会出现发生脏写问题
读隔离:
数据库本地事务隔离级别读已提交(Read Commited)或以上的基础,Seata(AT模式)的默认全局隔离级别是读未提交(Read UnCommited)。
如果在应用特定场景下,必须要求全局读已提交,目前Seata的方式是通过SELECT FOR UPDATE语句的代理。(避免脏读)
SELECT FOR UPDATE语句的执行会申请全局锁,如果全局锁被其他事物持有,则释放本地锁(回滚SELECT FOR UPDATE语句的本地执行)并重试,这个过程中,查询是被block住的,直到全局锁拿到,即读取相关已提交数据才返回;
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
AT分支事务工作机制:
update product set name = ‘GTS’ where name = ‘TXC’;
一阶段过程:
1、解析SQL,得到SQL的类型等相关信息;
2、查询前置镜像:根据解析得到的条件信息,生成查询语句,定位数据select id, name, since from product where name = ‘TXC’;得到前置镜像;
3、执行SQL更新product这条记录的name为GTS
4、查询后置镜像:根据前置镜像的结果通过主键定位数据,select id, name, since from product where id = 1;得到后置镜像;
5、插入回滚日志:把前后镜像数据以及业务SQL相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中;
undo_log当中数据:
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "172.17.16.69:8091:94220248190046208",
"branchId": 94220253042855936,
"sqlUndoLogs": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "UPDATE",
"tableName": "product",
//前置镜像
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "product",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 1
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "stock",
"keyType": "NULL",
"type": 4,
"value": 100
}]]
}]]
},
//后置镜像
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "product",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 1
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "stock",
"keyType": "NULL",
"type": 4,
"value": 99
}]]
}]]
}
}]]
}
6、分支事务提交前向TC注册分支,申请product表中主键值为1的记录的全局锁(在当前的同一个全局事务id范围内是可以申请到全局锁的,不同的全局事务id才会排斥);
7、本地事务提交:业务数据的更新和步骤中生成的undo_log的数据一并提交;
8、将本地事务提交的结果上交给TC;
二阶段-回滚:
1、收到TC的分支回滚请求,开启本地事务,执行如下操作;
2、通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录;
3、数据校验:拿UNDO_LOG中的后置镜像数据与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改,这种情况,需要人工来处理;
4、根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
5、提交本地事务,并把本地事务的执行结果(分支回滚结果)上报给TC(事务协调者)
二阶段-提交:
1、收到TC的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的给TC;
2、异步任务阶段的分支提交将异步和批量的删除UNDO_LOG记录。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/77241.html