分布式事务之可靠消息最终一致性RocketMQ实现落地


  • 说明

  • 适用场景

  • 实现方案

    • 本地消息表实现方案

    • 基于RocketMQ事务消息实现

    • 原理

    • 异常情况说明

    • 实现伪代码

  • 参考


说明

可靠性消息最终一致性是分布式事务解决方案中的一种典型的柔性事务解决方案,不保证数据的强一致性,只保证数据的最终一致性

适用场景

  1. 及时性要求不是很高的
  2. 能接受数据暂时的不一致情况

实现方案

  1. 本地消息表 + MQ普通消息
  2. 基于RocketMQ事务消息实现

本地消息表实现方案

原理

  1. 事务发起方向业务数据表写入数据,同时向本地消息表写入一条消息数据,由于业务数据表和本地消息表在同一个事务中,所以数据不会不一致
  2. 添加一个定时器,定时去拿本地消息表未消费的数据向消息中间件(如RocketMQ)发送一条消息,如果发送成功,则更新本地消息表为成功,否则则一直重试直到消息发送到消息中间件成功。这里可能存在消息重新投递的情况,所以消息消费者的接口一定要保证幂等
  3. 消息投递到MQ后,事务参与方(消息消费者)则订阅消息进行消费,完成本地事务
  4. 如果事务参与方(消息消费者)消费失败,根据一定的业务规则消费重试一直失败达到最大消费次数,则给事务发起方(消息生产者)发送一条消息执行失败的消息,通知事务发起方进行事务回滚(注意事务参与方也需要本地消息表或使用死信队列人工重试解决)

优点:

  1. 业界常用的实现方案,比较成熟。实现方式也简单

缺点:

  1. 无法保证各个服务的数据强一致性
  2. 本地消息表会耦合到业务数据库中,需要手动额外处理很多消息发送逻辑。需要注意控制本地事务表的数据量,过大会影响正常的业务性能
  3. 消息服务与业务服务偶尔,不便于消息服务的扩展和维护
  4. 消息服务不能公用,每次需要实现分布式事务时,都需要独立开发消息服务的逻辑

这种方式网上实现方式很多也比较简单,这里就不做详细实现,重点是后面的RocketMQ事务消息实现

基于RocketMQ事务消息实现

原理

  1. 事务发送方向RocketMQ发送Half消息(此时的Half消息对消息消费者不可见)
  2. RocketMQ向事务发起方发起相应Half消息成功并回调事务发起方的回调接口
  3. 事务发起方在回调接口中执行本地事务,然后提交Half消息供消费者消费
  4. 事务参与方(消息消费者)消费消息,如果事务参与方(消息消费者)多次消费失败则将消息放入死信队列,后续人工处理

优点:

  1. 实现相对简单,不依赖本地事务表

缺点:

  1. 强依赖RocketMQ事务消息

异常情况说明

  1. Half消息因为各种原因发送失败,对整体业务没有任何影响,因为其他事务还没开始执行。但是Half发送失败又分两种情况
    1. Half发送到MQ:这种MQ会主动回调我们的接口去查询事务状态Half是需要Commit还是Rollback
    2. Half未发送到MQ:这种情况无需处理
  2. Half发送成功,本地事务各种情况执行失败,同理也会调用回调接口查询Half需要Commit还是Rollback
  3. RocketMQ消息可靠性保证,在消息发送到RocketMQ后,我们需要保证消息不丢失,RocketMQ消息的不丢失我们首先需要保证RocketMQ作集群Broker作主从,即使单台Broker挂了保证还有从Broker保存消息,其次Broker的消息刷盘策略可能要改为同步刷盘,因为如果是异步刷盘会有3s的延迟由os cache刷入磁盘,可能造成消息丢失(注意开启同步刷盘严重影响MQ性能,谨慎考虑)
  4. 消费者消费超过最大次数仍然失败,消息会进入死信队列,需要人工接入处理

实现伪代码

事务发起方

public void submitOrder(Long productId, Integer payCount) {
        //生成全局分布式序列号 
        String txNo = UUID.randomUUID().toString();
        TxMessage txMessage = new TxMessage(productId, payCount, txNo);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("txMessage", txMessage);
        Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();
        //发送一条事务消息
        rocketMQTemplate.sendMessageInTransaction("tx_order_group""topic_txmsg", message, null);
    }


// 上面的消息发送是使用的 rocketmq-spring-boot 中的 RocketMQTemplate 去发送的

//如果只是使用简单的MQ客户端也可以使用这种原始的方式 原生方式就没有构造完整的消息,仅供参考
public static void main(String[] args){
        // 1. MQ生产者
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // 设置回调线程池
        producer.setExecutorService(Executors.newSingleThreadScheduledExecutor());
        // 设置回调函数
        producer.setTransactionListener(new TransactionListenerImpl());
        // 启动生产者
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        // 构造下单成功消息
        Message msg = new Message();
        try{
            // 2. 发送 half消息
            SendResult sendResult = producer.sendMessageInTransaction(msg,null);
        } catch(Exception e) {
            System.out.println("消息发送异常");
        }

    }

注意上面的txNo 生产推荐使用业务唯一ID,txNo主要是用于解决消息幂等

在发送玩Half消息后,我们实现RocketMQ的回调接口RocketMQLocalTransactionListener

即当Half消息发送成功后,就会回调我们的 OrderTxMessageListenerexecuteLocalTransaction 方法

注意在我们因为各种异常原因导致的Half消息没有提交,就会每隔1分钟回调1次我们的checkLocalTransaction方法,总共回调5次

具体的时间和回调次数在BrokerConfig源码中可见

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderService orderService;
    @Autowired
    private OrderMapper orderMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message msgObject obj
{
        try{
            log.info("订单微服务执行本地事务");
            TxMessage txMessage = this.getTxMessage(msg);
            //执行本地事务
            orderService.submitOrderAndSaveTxNo(txMessage);
            //提交事务
            log.info("订单微服务提交事务");
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            e.printStackTrace();
            //异常回滚事务
            log.info("订单微服务回滚事务");
            return RocketMQLocalTransactionState.ROLLBACK;
        }

    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("订单微服务查询本地事务");
        TxMessage txMessage = this.getTxMessage(msg);
        Integer exists = orderMapper.isExistsTx(txMessage.getTxNo());
        if(exists != null){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    private TxMessage getTxMessage(Message msg){
        String messageString = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr, TxMessage.class);
    }
}

事务参与方(消息消费者) 代码实现,消息消费方的代码就比较简单了,只需要正常的消费保证幂等就行

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements RocketMQListener<String{
    @Autowired
    private StockService stockService;

    @Override
    public void onMessage(String message) {
        log.info("库存微服务开始消费事务消息:{}", message);
        TxMessage txMessage = this.getTxMessage(message);
        stockService.decreaseStock(txMessage);
    }

    private TxMessage getTxMessage(String msg){
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr, TxMessage.class);
    }
}
  • StockService
@Service
@Slf4j
public class StockServiceImpl implements StockService {
    @Autowired
    private StockMapper stockMapper;

    @Override
    public Stock getStockById(Long id) {
        return stockMapper.getStockById(id);
    }

    @Override
    public void decreaseStock(TxMessage txMessage) {
        log.info("库存微服务执行本地事务,商品id:{}, 购买数量:{}", txMessage.getProductId(), txMessage.getPayCount());
        //检查是否执行过事务
        Integer exists = stockMapper.isExistsTx(txMessage.getTxNo());
        if(exists != null){
            log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo());
        }
        Stock stock = stockMapper.getStockByProductId(txMessage.getProductId());
        if(stock.getTotalCount() < txMessage.getPayCount()){
            throw  new RuntimeException("库存不足");
        }
        stockMapper.updateTotalCountById(txMessage.getPayCount(), stock.getId());
        //记录事务日志
        stockMapper.saveTxLog(txMessage.getTxNo());
    }
}

参考

《深入理解分布式事务原理与实战》


原文始发于微信公众号(小奏技术):分布式事务之可靠消息最终一致性RocketMQ实现落地

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30233.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!