1.分布式事务问题
1.1.本地事务
1.2.分布式事务
2.理论基础
2.1.CAP定理
Consistency(一致性): 用户访问分布式系统中的任意节点,得到的数据必须一致
Availability(可用性): 用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝。
Partition(分区): 因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区。
tolerance(容错): 在集群出现分区时,整个系统也要持续对外提供服务
2.1.1.一致性
2.1.2.可用性
2.1.3.分区容错
2.1.4.矛盾
2.2.BASE理论
2.3.解决分布式事务的思路
3.Alibaba分布式事务组件Seata
3.1.Seata的架构
4.部署Seata
4.1.下载解压
4.2.修改seata配置文件
registry {
# tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等
type = "nacos"
发布
nacos {
# seata tc 服务注册到 nacos的服务名称,可以自定义 spring.application.name
application = "seata-tc-server"
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP"
namespace = ""
cluster = "SH"
username = "nacos"
password = "nacos"
}
}
config {
# 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置
type = "nacos"
# 配置nacos地址等信息
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}
4.3.在nacos添加配置
# 数据存储方式,db代表数据库
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
# 事务、日志等配置
server.recovery.committingRetryPeriod=60000
server.recovery.asynCommittingRetryPeriod=60000
server.recovery.rollbackingRetryPeriod=60000
server.recovery.timeoutRetryPeriod=60000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
# 客户端与服务端传输方式
transport.serialization=seata
transport.compressor=none
# 关闭metrics功能,提高性能
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
4.4.创建数据库表
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
4.5.启动nacos注册中心
4.6.启动TC服务
5.微服务集成seata
https://pan.baidu.com/s/1ODzKZb-9wfPGfzFFsE5kJg?pwd=wspt
提取m:wspt
package cn.qingzhou.account.service.impl;
import cn.qingzhou.account.mapper.AccountMapper;
import cn.qingzhou.account.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author qingzhuo
*/
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;
@Override
public void deduct(String userId, int money) {
log.info("开始扣款");
try {
accountMapper.deduct(userId, money);
} catch (Exception e) {
throw new RuntimeException("扣款失败,可能是余额不足!", e);
}
log.info("扣款成功");
}
}
package cn.qingzhou.order.service.impl;
import cn.qingzhou.order.client.AccountClient;
import cn.qingzhou.order.client.StorageClient;
import cn.qingzhou.order.entity.Order;
import cn.qingzhou.order.mapper.OrderMapper;
import cn.qingzhou.order.service.OrderService;
import feign.FeignException;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author qingzhou
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
private final AccountClient accountClient;
private final StorageClient storageClient;
private final OrderMapper orderMapper;
public OrderServiceImpl(AccountClient accountClient, StorageClient storageClient, OrderMapper orderMapper) {
this.accountClient = accountClient;
this.storageClient = storageClient;
this.orderMapper = orderMapper;
}
@GlobalTransactional(rollbackFor = Exception.class, timeoutMills = 60000)
@Override
public Long create(Order order) {
// 创建订单
orderMapper.insert(order);
try {
// 扣用户余额
accountClient.deduct(order.getUserId(), order.getMoney());
// 扣库存
storageClient.deduct(order.getCommodityCode(), order.getCount());
} catch (FeignException e) {
log.error("下单失败,原因:{}", e.contentUTF8(), e);
throw new RuntimeException(e.contentUTF8(), e);
}
return order.getId();
}
}
package cn.qingzhou.storage.service.impl;
import cn.qingzhou.storage.mapper.StorageMapper;
import cn.qingzhou.storage.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author qingzhou
*/
@Slf4j
@Service
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageMapper storageMapper;
@Override
public void deduct(String commodityCode, int count) {
log.info("开始扣减库存");
try {
storageMapper.deduct(commodityCode, count);
} catch (Exception e) {
throw new RuntimeException("扣减库存失败,可能是库存不足!", e);
}
log.info("扣减库存成功");
}
}
5.1.seata依赖
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<!--版本较低,1.3.0,因此排除-->
<exclusion>
<artifactId>seata-spring-boot-starter</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<!--seata starter 采用1.4.2版本-->
<version>${seata.version}</version>
</dependency>
5.2.配置TC地址
seata:
registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
type: nacos # 注册中心类型 nacos
nacos:
server-addr: 127.0.0.1:8848 # nacos地址
namespace: "" # namespace,默认为空
group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP
application: seata-tc-server # seata服务名称
username: nacos
password: nacos
tx-service-group: seata-demo # 事务组名称,根据这个获取tc服务的cluster名称
service:
vgroup-mapping: # 事务组与cluster的映射关系
seata-demo: SH
data-source-proxy-mode: XA # 开启XA模式,强一致性,此配置只需加在orderservice里
5.3.数据库表
------------db_account--------------
DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
------------db_order---------------
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
------------db_storage---------------
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) UNSIGNED DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;
5.4.测试全局事务是否生效
原文始发于微信公众号(轻洲技术):Seata 分布式事务入门篇之 XA 模式
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/242014.html