分布式事务
随着系统的不断壮大,可能由很多的服务模块组成,每一个模块都是一个jar或者war包分别部署在不同的服务器上面,访问不同的数据库。
当单个服务访问多个库,一个方法执行时需要同时操作多个库,此时本地事务是无法保证数据一致性的。
再如多个服务多个库,一个服务调用另一个服务的接口,若此时有网络波动,调用接口超时异常,调用方收到了异常回滚数据;但是被调用方已经接收到了请求,可能就会把数据保存成功了,造成数据的不一致。
分布式事务问题演示
来个简单的例子,订单模块和配送模块,可以简单理解为一个外卖系统,下单后调用配送服务确定配送的外卖员。
两个服务,订单服务和配送服务,服务调用采用OpenFeign,数据保存暂时用JdbcTemplate对象的方法,要先整合OpenFeign和Nacos
配送服务
数据库表
create database 'distribution';
use distribution;
CREATE TABLE `order_distribution` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '表格主键',
`order_id` varchar(200) DEFAULT NULL COMMENT '订单id',
`distributor` varchar(255) DEFAULT NULL COMMENT '配送人',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
配送服务
依赖和数据源
<dependencies>
<!-- 服务注册 服务发现需要引入的 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--健康监控-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.11</version>
</dependency>
</dependencies>
yml配置,配置nacos和数据源
server:
port: 9001
spring:
application:
name: distribution # 应用名
cloud:
nacos:
discovery:
server-addr: localhost:8848 # nacos服务地址
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql:///distribution?useUnicode=true&characterEncoding=utf-8
username: root
password: root
启动类注意事务需要注解:@EnableTransactionManagement
service业务层
public interface DistributionService {
Integer distribution(String orderId); // 分配配送员
}
实现
@Service
public class DistributionServiceImpl implements DistributionService {
@Resource
JdbcTemplate jdbcTemplate;
// 模拟配送员
private final List<Integer> distributorList = new ArrayList<Integer>() {{
add(1);
add(2);
add(3);
}};
@Transactional
@Override
public Integer distribution(String orderId) {
try {
Thread.sleep(6000); // 模拟网络波动
} catch (InterruptedException e) {
e.printStackTrace();
}
// 随机一个配送员
Integer distributor = distributorList.get(RandomUtils.nextInt(0, distributorList.size() - 1));
// 添加配送的数据
int update = jdbcTemplate.update("insert into order_distribution(order_id, distributor) values(?,?)",
new Object[]{orderId, distributor});
return update;
}
}
接口
@RestController
public class DistributionController {
@Autowired
DistributionService distributionService;
@RequestMapping("/distribution")
public Integer distribution(@RequestParam("orderId") String orderId) {
return distributionService.distribution(orderId);
}
}
订单服务
建库和表
create database 'order';
use 'order';
CREATE TABLE `t_order` (
`id` int(8) NOT NULL AUTO_INCREMENT COMMENT '主键',
`order_id` varchar(200) DEFAULT NULL COMMENT '订单id',
`shop_id` int(8) DEFAULT NULL COMMENT '商品id',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
订单的yml配置和数据源不再赘述,和配送服务大同小异,但是需要集成openfeign
<!--导入openfeign的依赖坐标-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
启动类再用@EnableFeignClients、@EnableTransactionManagement
,将openfeign和事务开启。
openfeign业务实现:
@FeignClient(value = "${distribution.name}", fallback = DistributionServiceFallback.class)
public interface DistributionService {
@PostMapping("/distribution")
Integer distribution(@RequestParam("orderId") String orderId); // 分配配送员
}
@Component
public class DistributionServiceFallback implements DistributionService{
// openfeign的fallback
@Override
public Integer distribution(String orderId) {
return 0;
}
}
service业务实现
public interface OrderService {
Integer createOrder(Integer id); // 创建订单
}
@Service
public class OrderServiceImpl implements OrderService {
@Resource
JdbcTemplate jdbcTemplate; // 操作数据库
@Resource
DistributionService distributionService; // 远程调用
// 模拟菜品数据
private final Map<Integer, String> shopMap = new HashMap<Integer, String>(){{
put(1,"菜品1");
put(2,"菜品2");
put(3,"菜品3");
}};
@Override
@Transactional // 加上事务
public Integer createOrder(Integer id) {
if (shopMap.containsKey(id)) {
String orderId = UUID.randomUUID().toString().replace("-","");
// 增加订单
int update = jdbcTemplate.update("insert into t_order(order_id, shop_id) values(?,?)",
new Object[]{orderId, id});
// 调用配送服务
Integer result = distributionService.distribution(orderId);
if (result <= 0) {
throw new RuntimeException("分配配送员失败");
}
return update;
}
return null;
}
}
controller接口
@RestController
public class OrderController {
@Autowired
OrderService orderService;
@RequestMapping("/createOrder")
public Integer createOrder(@RequestParam("id") Integer id) {
return orderService.createOrder(id);
}
}
模拟测试一下,记得要把openfeign的超时时间设置少一点,模拟网络报错。
,设置为两秒就行
feign:
client:
config:
default:
connectTimeout: 2000 # 建立连接超时时间
readTimeout: 2000 # 读取资源超时时间
curl localhost:9002/createOrder?id=1
{"timestamp":"2023-09-09T02:26:28.517+00:00","status":500,"error":"Internal Server Error","path":"/createOrder"}
看到抛出异常,读取超时了,那么去库里看看
可看到,两个库中,一个有数据,一无数据,造成数据的不一致性,这就是分布式事务问题的体现。
分布式事务的解决方案
在分布式系统中看,很容易产生分布式事务的问题。本来在单体应用、单个数据库的系统中,依靠像MySql这种数据库强大的事务机制是很容易保证一个操作单元中的操作保持一致。
但是在分布式系统中,要完成一项业务功能,一般会调用多个服务并且还可能操作多个数据库,在这样的情况下要保证本次操作的行为全部成功或失败,显得有些困难。
随着不断探索,那么就产生了一些关于分布式事务保证数据一致性的解决方案。
-
二阶段提交(2PC) -
三阶段提交(3PC)
二阶段提交
顾名思义就是把整个事务单元分为两个阶段处理。
正常二阶段提交
二阶段提交异常,数据回滚
总体分为两阶段:
-
第一阶段:表决阶段,协调者将事务信息发送给参与者,参与者收到后,向协调者反馈自身是否有能力执行事务提交记录并记录undo和redo日志 -
第二阶段:执行阶段,协调者收到反馈后,再通知各参与者提交事务或回滚事务,若收到回滚信息,则会根据undo日志执行回滚操作
那么二阶段提交的,优缺点有哪些呢?
优点:提高了达到数据一致性的可能性,原理简单,实现成本低
缺点:
-
单点问题:若协调者宕机,整个流程则不可用 -
性能问题:第一阶段需要所有参与者反馈后,才能进入第二阶段 -
数据不一致:在执行阶段,若协调者发送崩溃,导致只有部分参与者收到提交的信息,那么会导致数据不一致
三阶段提交
是二阶段提交的改良版。
-
引入了一个超时机制 -
插入一个准备阶段,由此三阶段提交分为CanCommit、PreCommit、DoCommit三个阶段
-
CanCommit:协调者向各参与者发送CanCommit请求,询问是否可执行事务提交操作,各参与者响应yes/no,若全部是yes,进入下一个阶段 -
PreCommit:协调者向参与者发送PreCommit请求,进入Prepared阶段。各参与者收到后,执行事务操作,若都执行成功返回ack,继续等待进入下一次提交;若有参与者在此阶段失败或 协调者等待ack超时
,那么协调者向各参与者发送abort请求回滚数据。 -
DoCommit:是真正提交事务的阶段,各参与者开始执行正式事务提交,提交后,反馈给协调者。若有一个反馈提交失败或 协调者等待反馈超时
,那么全体回滚数据事务中断。
优点:引入超时后,能在一定程度上解决单点问题,减少阻塞。因为一旦参与者无法与协调者通信,它默认就是执行commit,不会一直阻塞。
缺点:可能在极端情况下导致数据不一致。若网络原因,协调者发送的abort指令部分参与者没有收到,那么这部分参与者就会去执行commit,导致这部分和其他接收到abort的参与者数据不一致。
保证最终一致性
所谓保证最终一致性,就是两个系统数据副本同步或是系统间的数据有关联,在一定时间内,最终保证数据的一致性,而不是实时保持数据的强一致性。也就是说这个可以是异步的。
那么一说异步,理想当然就是使用消息中间件,RocketMQ、RabbitMQ等具有支持消息确认回调功能
的中间件。
按照上面的例子讲,使用中间件的话,是否是这样?
订单系统收到订单后,订单表插入一条数据,然后给中间件发一条消息(带着订单id);配送服务作为消费者,会收到这条消息,拿到id后选择配送人,将数据插入到订单配送表。
看着没啥问题,实则若网络波动,这条消息发送给中间件再到中间件发送给消费者,两部分都可能存在消息数据丢失的可能性,一样造成数据不一致。
那么一些中间件也考虑到这种情况,有提供中间件收到消息和接收消费者反馈的回调,我们可以使用这些来保证自己系统的数据一致性问题
以RocketMQ为例:分为两阶段:Prepare
(消息预发送)、Confimr
(确认发送)
-
创建订单时,先调用Prepare,发送预备消息,消息到中间件后,暂时不给配送系统消费。 -
发送预备消息成功后,订单表插入数据 -
调用Confirm,将消息发送给消费者(配送服务) -
配送服务收到后消费且反馈ACK,再去新增配送数据即可。 -
若ACK反馈失败,那么订单服务中那条数据就需要回滚掉即可。
若重复消费,可在数据表中设置联合主键保证数据唯一性或其他方式解决都可。
原文始发于微信公众号(小路同学ovo):分布式事务问题及解决方案
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/267784.html