简述
分布式事务指事务的操作位于不同的节点上,需要保证事务的ACID特性。
例如在下单场景下,库存和订单如果不在同一个节点上就会设计到分布式事务处理。
案例
我们可以手动模拟一个下单操作,用户下单之后开始派送。通过以下工程演示分布式事务问题。
当我们创建完两个项目之后,可以先将派送工程中的模拟错误去除,然后运行派送工程。
在订单服务中创建测试类,当程序运行之后,我们可以发现两个数据库中都会新增相应的数据,这就代表我们的流程走通了。
但是当我们在派送工程中模拟错误,使订单服务调用派送服务失败的时候,我们可以发现dispatch表中有派送信息,但是Order表中并没有数据,两者的数据不一致了,这就是分布式事务问题。
工程创建
数据库创建
首先创建订单服务数据库以及派送服务数据库。
订单表
派送表
工程创建
依赖
首先将我们之后需要的依赖全部加上,消息队列、数据库以及json相关的依赖。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配送工程
server:
port: 9000
spring:
datasource:
url: jdbc:mysql://localhost:3306/rabbitmq-dispatcher?useTimezone=true&serverTimezone=GMT%2B8&characterEncoding=utf-8
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
# 单机版本
# port: 5672
# host: localhost
username: admin
password: admin
virtual-host: /
# 这里是开启手动ack,让程序去控制MQ的消息重发
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true # 开启重试
max-attempts: 10 # 最大重试次数
initial-interval: 2000ms # 间隔时间
# 集群连接方式
addresses: 127.0.0.1:5672
@Service
@Transactional(rollbackFor = Exception.class)
public class DispatchService {
@Autowired
private JdbcTemplate jdbcTemplate;
public void dispatch(String orderId) throws Exception {
//定义sql
String sql = "insert into dispatcher(order_id,dispatch_id,status,order_content,user_id,create_time) values (?,?,?,?,?,?)";
//添加记录
int count = jdbcTemplate.update(sql, orderId, UUID.randomUUID().toString(), 0, "测试数据", 1, new Date());
if (count != 1) {
throw new Exception("订单创建失败");
}
}
}
@RestController
public class DispatchController {
@Autowired
private DispatchService dispatchService;
@GetMapping("/dispatch")
public String lock(String orderId) throws Exception {
dispatchService.dispatch(orderId);
//模拟错误 导致调用方请求失败
int a = 1 / 0;
return "success";
}
}
订单工程
server:
port: 8089
spring:
datasource:
url: jdbc:mysql://localhost:3306/rabbitmq-order?useTimezone=true&serverTimezone=GMT%2B8&characterEncoding=utf-8
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
# 单机版本
# port: 5672
# host: localhost
username: admin
password: admin
virtual-host: /
# 集群连接方式
addresses: localhost:5672
publisher-confirm-type: correlated # 确认机制 必须要
logging:
level:
root: debug
@Data
public class Order {
private String orderId;
private Integer userId;
private String orderContent;
private Date createTime;
}
@Transactional(rollbackFor = Exception.class)
@Service
public class OrderDataService {
@Autowired
private JdbcTemplate jdbcTemplate;
public void saveOrder(Order order) throws Exception {
String sql = "insert into orders(order_id,user_id,order_content) values (?,?,?)";
int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());
if (count != 1) {
throw new Exception("订单创建失败。");
}
}
}
@Service
public class OrderService {
@Autowired
private OrderDataService orderDataService;
@Transactional(rollbackFor = Exception.class)
public void createOrder(Order order) throws Exception {
orderDataService.saveOrder(order);
String result = dispatcherHttpApi(order.getOrderId());
if (!result.equals("success")) {
throw new Exception("远程调用失败。");
}
}
private String dispatcherHttpApi(String orderId) {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
//连接超时>3
factory.setConnectTimeout(3000);
//处理超时>2
factory.setReadTimeout(2000);
String url = "http://localhost:9000/dispatch?orderId=" + orderId;
RestTemplate restTemplate = new RestTemplate();
return restTemplate.getForObject(url, String.class);
}
}
测试
@SpringBootTest
class SpringbootOrderRabbitmqOrderApplicationTests {
@Autowired
private OrderService orderService;
@Test
void createOrder() throws Exception {
String orderId = "10005";
Order order = new Order();
order.setOrderId(orderId);
order.setUserId(1);
order.setOrderContent("脉动");
orderService.createOrder(order);
System.out.println("创建成功!");
}
}
基于MQ的分布式事务整体设计思路
为了处理分布式事务问题,所以我们引入消息队列来实现相关功能。
下图便是我们处理问题的整个流程,可以将其分为两个部分:可靠生产以及可靠消费。
可靠生产
我们在使用消息队列处理分布式问题的时候,是通过对数据进行一个冗余从而来确保消息的可靠性。所以我们首先创建相关表。(代码基于上方代码)
数据库
业务代码
首先将信息存入订单表中,同时对消息做一次冗余处理,然后将消息推送到队列中。
需要注意的是这个时候需要在配置文件中打开消息队列的确认机制。
publisher-confirm-type: correlated # 确认机制 必须要
rabbitmq配置文件,声明交换机,队列,及绑定关系;可以在web界面自行配置。
@Configuration
public class RabbitMQConfiguration {
@Bean
public FanoutExchange deadExchange() {
return new FanoutExchange("dead_order_fanout_exchange", true, false);
}
@Bean
public Queue deadOrderQueue() {
return new Queue("dead.order.queue", true);
}
@Bean
public Binding bindDeadOrder() {
return BindingBuilder.bind(deadOrderQueue()).to(deadExchange());
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("order_fanout_exchange", true, false);
}
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dead_order_fanout_exchange");
return new Queue("order.queue", true, false, false, args);
}
@Bean
public Binding bindOrder() {
return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
}
}
相关业务代码
@Service
public class MQOrderService {
@Autowired
private OrderDataService orderDataService;
@Autowired
private OrderMQService orderMQService;
public void createOrder(Order order) throws Exception {
orderDataService.saveOrder(order);
orderMQService.sendMessage(order);
}
}
@Transactional(rollbackFor = Exception.class)
@Service
public class OrderMQService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
//java 自带注解
//被他注解的方法会在服务器加载servlet的时候运行,并且只执行一次,在init()方法前执行
@PostConstruct
public void regCallBack() {
//消息发送成功后,给与生产者的消息回执,确保生产者的可靠性
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("cause:" + s);
//如果ack为true,则代表消息收到
String orderId = correlationData.getId();
if (!b) {
// 这里可能要进行其他的方式进行存储
System.out.println("应答失败,orderid:" + orderId);
return;
}
try {
String updateSQL = "update orders_message set status = 1 where order_id = ?";
int count = jdbcTemplate.update(updateSQL, orderId);
if (count == 1) {
System.out.println("修改成功,进入消息队列。");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void sendMessage(Order order) {
//通过MQ发送消息
rabbitTemplate.convertAndSend("order_fanout_exchange", "", JSON.toJSONString(order), new CorrelationData(order.getOrderId()));
}
}
测试
可以通过打断点观察备份表中数据的变化。消息发送成功后,信息状态会改变。
@SpringBootTest
class SpringbootOrderRabbitmqOrderApplicationTests {
@Autowired
private OrderService orderService;
@Autowired
private MQOrderService mqOrderService;
@Test
void createOrder() throws Exception {
String orderId = "10005";
Order order = new Order();
order.setOrderId(orderId);
order.setUserId(1);
order.setOrderContent("脉动");
mqOrderService.createOrder(order);
System.out.println("创建成功!");
}
}
可靠消费
实现消费可靠性,在不进行处理的情况下,如果消息错误,会造成死循环,可以通过三种方式来解决这种情况。
- 控制重试次数 + 死信队列
- try/catch + 手动ack (需要注意的是他与第一种方式是互斥的)
- try/catch + 手动ack + 死信队列 + 人工干预
重试次数
重试次数我们只需要在applicaition文件直接配置即可。
# 这里是开启手动ack,让程序去控制MQ的消息重发
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true # 开启重试
max-attempts: 10 # 最大重试次数
initial-interval: 2000ms # 间隔时间
try/catch + 手动 ack + 死信队列
首先也需要打开手动ack模式,然后在监听队列的时候对异常进行相关的处理。如果出现异常,则手动控制消息是否重发,不建议重发,否则在try/catch中会出现死循环;由于我们设置了死信队列(在订单服务中的消息队列配置文件中声明了),所以该队列会将消息转交给死信队列。
我们在监听死信队列的时候,如果还出现异常就进行相对应的处理,可以将其存储到其他数据库中或者短信提示让人工进行干预,然后将消息移除即可。
@Service
public class OrderMQConsumer {
@Autowired
private DispatchService dispatchService;
private int count = 1;
@RabbitListener(queues = {"order.queue"})
public void messageConsumer(String ordermsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
System.out.println("消息:" + ordermsg + ",count=" + count++);
JSONObject order = JSONObject.parseObject(ordermsg);
String orderId = order.getString("orderId");
//派单处理
dispatchService.dispatch(orderId);
System.out.println(1 / 0);//出现异常
channel.basicAck(tag, false);
} catch (Exception e) {
//如果出现异常的情况下 根据实际情况重发
//重发一次后,丢失
//参数1:消息的tag 参数2:多条处理
//参数3:重发
//false 不会重发,会把消息打入到死信队列
//true 重发,建议不使用try/catch 否则会死循环
channel.basicNack(tag, false, false); //false会将数据给死信队
}
}
}
@Service
public class DeadOrderMQConsumer {
@Autowired
private DispatchService dispatchService;
private int count = 1;
@RabbitListener(queues = {"dead.order.queue"})
public void messageConsumer(String ordermsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
System.out.println("消息:" + ordermsg + ",count=" + count++);
JSONObject order = JSONObject.parseObject(ordermsg);
String orderId = order.getString("orderId");
//派单处理
dispatchService.dispatch(orderId);
channel.basicAck(tag, false);
} catch (Exception e) {
System.out.println("人工干预");
System.out.println("同时把消息转移到别的存储db");
channel.basicNack(tag, false, false);
}
}
}
在我们的工程中没有根据备份表中信息的状态来定时处理,我们可以自己创建一个定时任务从数据库order_message表中把消息为0的信息查询出来重新传给消息队列。
总结
基于MQ的分布式事务解决方案优点
- 通用性强
- 拓展方便
- 耦合度低,方案比较成熟
缺点
- 基于消息中间件,只适合异步场景
- 消息会延迟处理
建议
- 尽量去避免分布式事务
- 尽量将非核心业务做成异步
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16862.html