最近有点浮躁,玩了一个周, 没有写文章了。今天继续搞RocketMQ
的事务实现。
往期文章
分布式事务
分布式事务,是指事务的发起者、资源及资源管理器和事务协调者分别位于分布式系统的不同节点之上。当然,分布式事务也是一个老生常谈的话题,尤其是面试中。常用的分布式事务解决方案有很多,比如:两阶段提交,TCC
,SAGA
等等。
本篇文章,我主要看看 RocketMQ
的事务实现。
从一个例子开始
// 事务消息的监听器。
TransactionListener transactionListener = new TransactionListenerImpl();
// 创建一个 事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setTransactionListener(transactionListener);
producer.start();
try {
Message msg =
new Message("TopicTest1234", "tag", "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
Thread.sleep(1000L * 60 * 5);
producer.shutdown();
这里我们介绍下事务消息的常用的API
吧。
-
producer.setTransactionListener()
.设置检测事务消息的回调。当定时任务执行检测到有事务消息时就会调用checkLocalTransaction
方法。 -
producer.setExecutorService(ExecutorService executorService)
: 设置检查事务的线程池。默认使用:ThreadPoolExecutor
, 每分钟执行一次。 -
producer.sendMessageInTransaction(final Message msg, final Object arg)
: 发送事务消息。
从上面的这个案例来看, 发送事务消息 和 普通消息的发送并没有什么太大的不同,只是需要创建 TransactionMQProducer
和 使用 sendMessageInTransaction
发送消息就可以了。
其中TransactionMQProducer
继承了DefaultMQProducer
, 这个并不陌生了,我们在RocketMQ架构设计之启动这篇文章中,已经看过很多次了。而在启动时,案例中的producer.start();
就是直接调用父类DefaultMQProducer
的 start()
方法,DefaultMQProducer
会通过 defaultMQProducerImpl.start()
完成整个生产者部分的启动。这里就不多赘述了。
我们来看最精彩的发送消息的过程.
producer.sendMessageInTransaction(msg, null)
我们一路追踪 sendMessageInTransaction
方法,很容易就看到了:DefaultMQProducerImpl#sendMessageInTransaction
这个方法主要有两个部分,如下图:
第一部分:发送带有 事务标识(TRAN_MSG
)的消息. 第二部分: 结束事务。主要有提交 offset
, 和 同步给 Broker
是 commit
还是 rollback
.
所以,RocketMQ
使用的分布式事务方案是: 二阶段提交(XA)
事务消息生产过程
发送消息的过程,在上篇文章中我们也已经说过了: RocketMQ系列-架构设计之消息。可以总结为3
步,根据 topic
获取路由元数据, 选择适合的 MessageQueue
进行存储, 将 Message
通过 Netty
发送给 Broker
.
特别需要注意的时候,在发送事务消息的时候, 会给消息添加事务标志。MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
这里是干什么用的呢? 接着往下看
TransactionalMessageService
和 MessageStore
这两种存储方式有什么不同呢?
其实存储并没有什么不同,而是存储的消息是不一样的,我们继续往下看:
TransactionalMessageService
使用的存储和 普通消息都是一样的,都是在启动时候创建的 MessageStore
. 不同的是,事务消息存储的 经过 parseHalfMessageInner
处理之后的 消息.
可以看到,处理之后消息,topic
和 queueId
都被设置成了默认的 RMQ_SYS_TRANS_OP_HALF_TOPIC
和 0。
所以: 事务消息会被存储到 默认的 RMQ_SYS_TRANS_OP_HALF_TOPIC
中,并根据是否为延迟消息和延迟的等级存储到不同的 MessageQueue
中。
开源RocketMQ
支持延迟消息,但是不支持秒级精度。默认支持18
个level
的延迟消息,这是通过broker
端的messageDelayLevel
配置项确定的.
具体延迟队列的实现,我们会在下一篇文章中详细的分析一下 RocketMQ
延时消息队列的实现。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
看完了,事务的第一个阶段,发送事务消息。接下来,我们继续看第二个阶段。
结束事务: commit or callback.
结束事务主要是做两件事情:
-
提交消息的偏移量 -
发送提交事务还是回滚事务。
实现逻辑如下图:
主要的流程就是:封装RequestHeader
, 会设置 事务id
, 事务的状态。然后设置本次请求的RequestCode
为 END_TRANSACTION(37)
.
在 Broker
端 接收到对应的请求之后,会根据 事务的状态执行提交事务或者回滚操作。如右上图。
我们从两方面来分析一下:
提交事务
可以看到 根据 offset
查到了消息,然后根据消息的状态封装成 MessageExtBrokerInner
对象。通过通过sendFianlMessage
方法完成将消息写入到 消息原来的队列中(不是默认的事务队列了). 最后删除 prepare
消息。如上图。
回滚事务
回滚事务就比较简单了,检查消息的状态,查看直接删除掉 prepare
消息就OK了。
以上就是 整个事务消息的生产过程了。
事务回查
我们在介绍 TranscationMQProducer
的api
时候,有一个 setExecutorService(ExecutorService executorService)
方法。方法的作用是:设置检查事务的线程池。这个线程池的作用就是 事务回查。
我们追踪这个方法的调用方,可以发现,调用是 NettyServer
调用的,那是谁请求的呢? 我们跟踪这个请求码发现在 Broker2Client 的类中发现了这个 请求码。跟踪这个方法,查看其调用方,最终终于在 TransactionalMessageCheckService
发现了新大陆。当我看到 public class TransactionalMessageCheckService extends ServiceThread
的时候,我就不禁想起了, Broker在启动的时候会初始化事务,这是会创建一个 TransactionalMessageCheckService
对象.
我们从 Broker
端开始还原一下这个事务回查的场景。
在Broker端进行启动时会初始化
/**
* 初始化事务逻辑
*/
private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
/**
* Broker 检查事务状态的listner.。Producer 写入 half 消息,但是没有收到 finalMessage .回调。
*/
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
/**
* Broker 检查事务状态的线程。Producer 写入 half 消息,但是没有收到 finalMessage.
*/
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
这里初始化了 transactionalMessageCheckService
和 transactionalMessageCheckListener
. 然后在 start
方法中通过 startProcessorByHa
启动了 transactionalMessageCheckService
线程
// Broker的容灾处理
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
这样 transactionalMessageCheckService
就启动起来了。运行代码如下:
@Override
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
// 限制只有一个本类可以运行,最终会调用 onWaitEnd()
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// 回查的最大次数
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
// 开始回查
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
在回查的过程中, RocketMQ
会获取 事务Topic下的所有MessageQueue.
// 获取 事务topic中下的所有MessageQueue.
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
然后遍历所有的 MessageQueue
,挨个处理所有队列里的待回查的消息。怎么判断消息需要回查呢?前面说过了,通过Op队列
判断,因此还需要定位到HalfQueue
对应的OpQueue
,以及它们的ConsumeQueue
偏移量, 获取到 halfMessage
, 然后判断是否需要检测事务状态,如果需要检测则会调用 AbstractTransactionalMessageCheckListener
的 resolveHalfMsg
方法,即会发送 检测的请求 给 Producer
.
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
checkProducerTransactionState
方法的实现则是:向 Producer 端发送 请求码为39(RequestCode.CHECK_TRANSACTION_STATE
)的请求.
如下。
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
group, messageExt.getMsgId(), e.toString());
}
这样就触发了 Producer
的 事务回查。我们接着看、
在 ClientRemotingProcessor
中可以看到 CHECK_TRANSACTION_STATE
的处理方法。
// 检查事务状态
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
然后获取到 Producer
实例,通过 producer
开始回查事务.
// 查询到对应的 Producer实例
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 开始回查。
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
继续跟踪这个 checkTransactionState
便看到了我们一开始说的这个事务回查了。
其中 checkExecutor
就是一个线程池了,我们重点来看 回查的任务, 即 图中的 Runnable
的实现。
首先会 获取消息的事务状态,然后将消息的事务状态发送给Broker。这样就完成了 事务回查。
如下:
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
// 1、查看 消息事务的状态
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
}
// 2、处理消息事务, 把事务的状态发送给 Broker。
this.processTransactionState(localTransactionState, group, exception);
}
/**
* 处理消息事务。
*
* 将消息事务的状态发送给 Broker
*
* @param localTransactionState
* @param producerGroup
* @param exception
*/
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
// 封装RequestHeader
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.swetFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
// 设置 事务的状态.
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);
// 向Broker端发送请求。
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
}
};
以上就是整个事务回查的过程了。这里我再来梳理一下执行的时序图。
最后我们想一下,为什么 事务回查呢?Half
消息写入成功,可能因为网络,服务重启等等原因没有收到Producer
的事务状态请求,这是,Broker
就会主动放弃事务回查给Producer
. 来决定该事务消息是提交还是回滚。为了避免消息被无限次的回查,RocketMQ通过transactionCheckMax属性设置消息回查的最大次数,默认是15次。
总结
-
RocketMQ
的事务消息是使用 二阶段提交(XA
) 的这种分布式事务解决方案。 -
RocketMQ
第一阶段会发送带有 事务标志的消息给Broker
.Broker
会把消息存储到固定的Topic中,并根据延迟级别存储到不同的queue
中。 -
RocketMQ
第二阶段是结束事务,会提交offset
,根据消息的事务状态提交或者回滚事务。
RocketMQ
通过改写 Topic
和 queueId
,将消息暂时存储到的一个对Consumer
不可见的队列中,然后等待Producer
执行本地事务,提交事务装填后再决定将Half
消息commit
或者 rollback
.
最后
希望和你一起遇到更好的自己
原文始发于微信公众号(方家小白):RocketMQ事务实现原理
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/37650.html