RabbitMQ高级——分布式事务

导读:本篇文章讲解 RabbitMQ高级——分布式事务,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

简述

分布式事务指事务的操作位于不同的节点上,需要保证事务的ACID特性。
例如在下单场景下,库存和订单如果不在同一个节点上就会设计到分布式事务处理。

案例

在这里插入图片描述
我们可以手动模拟一个下单操作,用户下单之后开始派送。通过以下工程演示分布式事务问题。
当我们创建完两个项目之后,可以先将派送工程中的模拟错误去除,然后运行派送工程。
在订单服务中创建测试类,当程序运行之后,我们可以发现两个数据库中都会新增相应的数据,这就代表我们的流程走通了。
但是当我们在派送工程中模拟错误,使订单服务调用派送服务失败的时候,我们可以发现dispatch表中有派送信息,但是Order表中并没有数据,两者的数据不一致了,这就是分布式事务问题。

工程创建

数据库创建

首先创建订单服务数据库以及派送服务数据库。
在这里插入图片描述
订单表
在这里插入图片描述
派送表
在这里插入图片描述

工程创建

依赖
首先将我们之后需要的依赖全部加上,消息队列、数据库以及json相关的依赖。

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.9</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配送工程
在这里插入图片描述

server:
  port: 9000

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/rabbitmq-dispatcher?useTimezone=true&serverTimezone=GMT%2B8&characterEncoding=utf-8
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    # 单机版本
    # port: 5672
    # host: localhost
    username: admin
    password: admin
    virtual-host: /
    # 这里是开启手动ack,让程序去控制MQ的消息重发
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启重试
          max-attempts: 10 # 最大重试次数
          initial-interval: 2000ms # 间隔时间
    # 集群连接方式
    addresses: 127.0.0.1:5672


@Service
@Transactional(rollbackFor = Exception.class)
public class DispatchService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void dispatch(String orderId) throws Exception {
        //定义sql
        String sql = "insert into dispatcher(order_id,dispatch_id,status,order_content,user_id,create_time) values (?,?,?,?,?,?)";
        //添加记录
        int count = jdbcTemplate.update(sql, orderId, UUID.randomUUID().toString(), 0, "测试数据", 1, new Date());

        if (count != 1) {
            throw new Exception("订单创建失败");
        }
    }
}
@RestController
public class DispatchController {

    @Autowired
    private DispatchService dispatchService;

    @GetMapping("/dispatch")
    public String lock(String orderId) throws Exception {
        dispatchService.dispatch(orderId);

        //模拟错误 导致调用方请求失败
        int a = 1 / 0;

        return "success";
    }
}

订单工程
在这里插入图片描述

server:
  port: 8089

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/rabbitmq-order?useTimezone=true&serverTimezone=GMT%2B8&characterEncoding=utf-8
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    # 单机版本
    # port: 5672
    # host: localhost
    username: admin
    password: admin
    virtual-host: /
    # 集群连接方式
    addresses: localhost:5672
    publisher-confirm-type: correlated # 确认机制 必须要
logging:
  level:
    root: debug
@Data
public class Order {

    private String orderId;

    private Integer userId;

    private String orderContent;

    private Date createTime;

}

@Transactional(rollbackFor = Exception.class)
@Service
public class OrderDataService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void saveOrder(Order order) throws Exception {
        String sql = "insert into orders(order_id,user_id,order_content) values (?,?,?)";
        int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());

        if (count != 1) {
            throw new Exception("订单创建失败。");
        }


    }
}

@Service
public class OrderService {

    @Autowired
    private OrderDataService orderDataService;

    @Transactional(rollbackFor = Exception.class)
    public void createOrder(Order order) throws Exception {
        orderDataService.saveOrder(order);

        String result = dispatcherHttpApi(order.getOrderId());
        if (!result.equals("success")) {
            throw new Exception("远程调用失败。");
        }
    }

    private String dispatcherHttpApi(String orderId) {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        //连接超时>3
        factory.setConnectTimeout(3000);
        //处理超时>2
        factory.setReadTimeout(2000);

        String url = "http://localhost:9000/dispatch?orderId=" + orderId;
        RestTemplate restTemplate = new RestTemplate();
        return restTemplate.getForObject(url, String.class);
    }
}

测试

@SpringBootTest
class SpringbootOrderRabbitmqOrderApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void createOrder() throws Exception {
        String orderId = "10005";
        Order order = new Order();
        order.setOrderId(orderId);
        order.setUserId(1);
        order.setOrderContent("脉动");

        orderService.createOrder(order);
        System.out.println("创建成功!");

    }

}

基于MQ的分布式事务整体设计思路

为了处理分布式事务问题,所以我们引入消息队列来实现相关功能。
下图便是我们处理问题的整个流程,可以将其分为两个部分:可靠生产以及可靠消费。
在这里插入图片描述

可靠生产可靠

我们在使用消息队列处理分布式问题的时候,是通过对数据进行一个冗余从而来确保消息的可靠性。所以我们首先创建相关表。(代码基于上方代码)

数据库
在这里插入图片描述
在这里插入图片描述
业务代码
首先将信息存入订单表中,同时对消息做一次冗余处理,然后将消息推送到队列中。
需要注意的是这个时候需要在配置文件中打开消息队列的确认机制。

publisher-confirm-type: correlated # 确认机制 必须要

rabbitmq配置文件,声明交换机,队列,及绑定关系;可以在web界面自行配置。

@Configuration
public class RabbitMQConfiguration {

    @Bean
    public FanoutExchange deadExchange() {
        return new FanoutExchange("dead_order_fanout_exchange", true, false);
    }

    @Bean
    public Queue deadOrderQueue() {
        return new Queue("dead.order.queue", true);
    }

    @Bean
    public Binding bindDeadOrder() {
        return BindingBuilder.bind(deadOrderQueue()).to(deadExchange());
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("order_fanout_exchange", true, false);
    }

    @Bean
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dead_order_fanout_exchange");
        return new Queue("order.queue", true, false, false, args);
    }

    @Bean
    public Binding bindOrder() {
        return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
    }
}

相关业务代码

@Service
public class MQOrderService {

    @Autowired
    private OrderDataService orderDataService;

    @Autowired
    private OrderMQService orderMQService;

    public void createOrder(Order order) throws Exception {

        orderDataService.saveOrder(order);

        orderMQService.sendMessage(order);
    }
}

@Transactional(rollbackFor = Exception.class)
@Service
public class OrderMQService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //java 自带注解
    //被他注解的方法会在服务器加载servlet的时候运行,并且只执行一次,在init()方法前执行
    @PostConstruct
    public void regCallBack() {
        //消息发送成功后,给与生产者的消息回执,确保生产者的可靠性
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("cause:" + s);

                //如果ack为true,则代表消息收到
                String orderId = correlationData.getId();

                if (!b) {
                    // 这里可能要进行其他的方式进行存储
                    System.out.println("应答失败,orderid:" + orderId);
                    return;
                }

                try {
                    String updateSQL = "update orders_message set status = 1 where order_id = ?";
                    int count = jdbcTemplate.update(updateSQL, orderId);
                    if (count == 1) {
                        System.out.println("修改成功,进入消息队列。");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        });
    }

    public void sendMessage(Order order) {
        //通过MQ发送消息
        rabbitTemplate.convertAndSend("order_fanout_exchange", "", JSON.toJSONString(order), new CorrelationData(order.getOrderId()));
    }
}

测试
可以通过打断点观察备份表中数据的变化。消息发送成功后,信息状态会改变。

@SpringBootTest
class SpringbootOrderRabbitmqOrderApplicationTests {

    @Autowired
    private OrderService orderService;

    @Autowired
    private MQOrderService mqOrderService;

    @Test
    void createOrder() throws Exception {
        String orderId = "10005";
        Order order = new Order();
        order.setOrderId(orderId);
        order.setUserId(1);
        order.setOrderContent("脉动");

        mqOrderService.createOrder(order);
        System.out.println("创建成功!");
    }
}

可靠消费

在这里插入图片描述
实现消费可靠性,在不进行处理的情况下,如果消息错误,会造成死循环,可以通过三种方式来解决这种情况。

  • 控制重试次数 + 死信队列
  • try/catch + 手动ack (需要注意的是他与第一种方式是互斥的)
  • try/catch + 手动ack + 死信队列 + 人工干预

重试次数
重试次数我们只需要在applicaition文件直接配置即可。

    # 这里是开启手动ack,让程序去控制MQ的消息重发
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启重试
          max-attempts: 10 # 最大重试次数
          initial-interval: 2000ms # 间隔时间

try/catch + 手动 ack + 死信队列
首先也需要打开手动ack模式,然后在监听队列的时候对异常进行相关的处理。如果出现异常,则手动控制消息是否重发,不建议重发,否则在try/catch中会出现死循环;由于我们设置了死信队列(在订单服务中的消息队列配置文件中声明了),所以该队列会将消息转交给死信队列。
我们在监听死信队列的时候,如果还出现异常就进行相对应的处理,可以将其存储到其他数据库中或者短信提示让人工进行干预,然后将消息移除即可。

@Service
public class OrderMQConsumer {

    @Autowired
    private DispatchService dispatchService;

    private int count = 1;

    @RabbitListener(queues = {"order.queue"})
    public void messageConsumer(String ordermsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            System.out.println("消息:" + ordermsg + ",count=" + count++);
            JSONObject order = JSONObject.parseObject(ordermsg);
            String orderId = order.getString("orderId");
            //派单处理
            dispatchService.dispatch(orderId);

            System.out.println(1 / 0);//出现异常
            channel.basicAck(tag, false);
        } catch (Exception e) {
            //如果出现异常的情况下 根据实际情况重发
            //重发一次后,丢失
            //参数1:消息的tag 参数2:多条处理
            //参数3:重发
            //false 不会重发,会把消息打入到死信队列
            //true 重发,建议不使用try/catch 否则会死循环
            channel.basicNack(tag, false, false); //false会将数据给死信队
        }
    }
}

@Service
public class DeadOrderMQConsumer {

    @Autowired
    private DispatchService dispatchService;

    private int count = 1;

    @RabbitListener(queues = {"dead.order.queue"})
    public void messageConsumer(String ordermsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

        try {
            System.out.println("消息:" + ordermsg + ",count=" + count++);
            JSONObject order = JSONObject.parseObject(ordermsg);
            String orderId = order.getString("orderId");
            //派单处理
            dispatchService.dispatch(orderId);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            System.out.println("人工干预");
            System.out.println("同时把消息转移到别的存储db");

            channel.basicNack(tag, false, false);
        }


    }
}

在我们的工程中没有根据备份表中信息的状态来定时处理,我们可以自己创建一个定时任务从数据库order_message表中把消息为0的信息查询出来重新传给消息队列。

总结

基于MQ的分布式事务解决方案优点

  • 通用性强
  • 拓展方便
  • 耦合度低,方案比较成熟

缺点

  • 基于消息中间件,只适合异步场景
  • 消息会延迟处理

建议

  • 尽量去避免分布式事务
  • 尽量将非核心业务做成异步

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16862.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!