-
源码版本
-
消息发送源码入口
-
消息发送核心源码
-
Producer发送消息的重试机制
-
Broker故障转移机制
-
选择MessageQueue源码
-
总结
源码版本
4.8.0
消息发送源码入口
最简单的消息发送代码
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
由于我们今天重点分析的是RocketMQ 消息发送MessageQueue选择及高可用机制
所以
producer.start();
这段代码我们不看,我们重点进入producer.send(msg);
这段源码。深入进入后我们会看到发送消息的核心源码
消息发送核心源码
核心方法是org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
/**
 * Sends a message: picks a MessageQueue for each attempt and, for SYNC sends, retries on failure.
 *
 * @param msg message to send
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback callback handed through to sendKernelImpl
 * @param timeout overall time budget; it is compared against System.currentTimeMillis() deltas, i.e. milliseconds
 * @return the SendResult for a SYNC send; null for ASYNC and ONEWAY
 * @throws MQClientException when the attempts are exhausted without a result, or when no usable route info exists for the topic
 * @throws RemotingException when the time budget is used up before an attempt (thrown as RemotingTooMuchRequestException, presumably a RemotingException subtype)
 * @throws MQBrokerException when the broker answers with a response code outside the retry list and no result is available
 * @throws InterruptedException rethrown as-is after being logged
 */
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// Check the producer state before doing anything else
this.makeSureStateOK();
// Validate the message against the producer settings.
// NOTE(review): the original annotation questions whether the caller already validated once - confirm.
Validators.checkMessage(msg, this.defaultMQProducer);
// Random id, only used to correlate the warning logs of this invocation
final long invokeID = random.nextLong();
// Timestamps: start of the whole call, start of the current attempt, end of the current attempt
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// Resolve the publish (route) info for the topic.
// Per the original note this is served from the cached route table and falls back to a NameServer lookup;
// tryToFindTopicPublishInfo is not shown here - verify.
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// Total attempts: SYNC gets 1 + retryTimesWhenSendFailed, every other mode gets exactly one attempt
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
// Broker name used by each attempt, reported in the final failure message
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
// Broker of the previous attempt (null on the first attempt)
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// Choose which MessageQueue this attempt is sent to
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// Time already consumed since the call started; stop retrying once it exceeds the budget
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// The actual send, given only the remaining part of the time budget
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// Success path: report the measured latency with isolation=false
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
// Only SYNC inspects the result; ASYNC and ONEWAY return null immediately
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
// Not SEND_OK: move on to the next attempt only when isRetryAnotherBrokerWhenNotStoreOK() is true,
// otherwise hand the result back as it is
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
// Report the broker with isolation=true, remember the exception and retry
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
// Same handling as RemotingException: isolation=true, then retry
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
// isolation=true; whether to retry depends on the broker's response code
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
// These response codes lead to another attempt
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
// Any other code: return a result from an earlier attempt if there is one, else rethrow
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
// Interruption is reported with isolation=false and is never retried
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
// No queue could be selected: stop trying
break;
}
}
// A result left over from a SYNC attempt that was not SEND_OK is still returned to the caller
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
// Running out of time takes precedence over the aggregated failure built above
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
// Derive the response code from the type of the last recorded exception
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
// No usable route info for the topic: check the name server setting, then fail
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
整个核心代码流程很长,我们今天的重点是这行代码。这行代码就是去选择消息发送到Broker
的哪个MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
Producer发送消息的重试机制
在发送消息中我们可以看到如下代码
// DefaultMQProducer.retryTimesWhenSendFailed
private int retryTimesWhenSendFailed = 2;
// 获取总发送次数 如果同步发送 则 发送消息失败会重试2次 其他发送方式不会重新发送
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
for (; times < timesTotal; times++) {
// 消息发送
}
可以看到生产者发送消息如果是同步发送消息默认会重试发送两次消息(由参数retryTimesWhenSendFailed
设置),这里是基于重试去解决异常消息的发送
Broker故障转移机制
在上面的发送消息源码中我们会发现,异常在catch中被捕获后都会调用如下方法(其中InterruptedException分支传入的isolation为false,其余异常分支传入true)
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
// -----------------------------------------------------
// org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
// 故障
if (this.sendLatencyFaultEnable) {
// 如果不可用隔离时间 30s
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
// ------------------------------------------------------
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
// org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem
/**
 * Creates or refreshes the fault record stored under {@code name}.
 *
 * <p>The record keeps the latest latency and a start timestamp of
 * {@code now + notAvailableDuration}.
 *
 * @param name                 key of the record in {@code faultItemTable}
 * @param currentLatency       latency to store on the record
 * @param notAvailableDuration offset added to the current time to form the start timestamp
 */
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem tracked = this.faultItemTable.get(name);
    if (tracked == null) {
        final FaultItem fresh = new FaultItem(name);
        fresh.setCurrentLatency(currentLatency);
        fresh.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        // putIfAbsent yields null when our record was stored, or the record that got there first.
        tracked = this.faultItemTable.putIfAbsent(name, fresh);
        if (tracked == null) {
            return;
        }
    }
    // A record already existed (or appeared concurrently): refresh it in place.
    tracked.setCurrentLatency(currentLatency);
    tracked.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
这里就是将消息发送失败的Broker保存在一个ConcurrentHashMap
中,后续再检测Broker是否可用的时候会用到
选择MessageQueue源码
在上面发送消息的一些Producer重试和Broker问题规避方法分析完后我们来分析选择MessageQueue
的核心方法
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
从selectOneMessageQueue
方法深入后看到selectOneMessageQueue
的如下源码
/**
*
* @param tpInfo topic相关信息
* @param lastBrokerName 上一次选择的执行发送消息失败的Broke
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 是否启动Broker故障延迟
/**
* 如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。
* 所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。
* 例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;
* 如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,
* latencyFaultTolerance机制是实现消息发送高可用的核心关键所在
*/
if (this.sendLatencyFaultEnable) {
try {
// 自增取值 简单的轮询 通过 ThreadLocal 维护
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// index与当前路由表中的对列总个数取模 简单轮询
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
// 获取到当前对应的待发送队列
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 校验队列是否可用 实际是检测Broker是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 尝试从规避的Broker中选择一个可用的Broker,如果没有找到,将返回null。
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 取broker中的可写队列数
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 如果可写队列数>0,则选取一个队列
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
// // 可写队列数 <= 0 移除该broker
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
// 选择 messageQueue
return tpInfo.selectOneMessageQueue();
}
其中有一个核心判断
private boolean sendLatencyFaultEnable = false;
if (this.sendLatencyFaultEnable)
sendLatencyFaultEnable
这个参数是RocketMQ消息发送高可用的核心关键参数,默认不开启,开启后就会自动规避故障的Broker。
我们先看不开启sendLatencyFaultEnable
最简单的方式
return tpInfo.selectOneMessageQueue(lastBrokerName);
// org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String)
/**
 * Selects a queue while trying to avoid the broker used by the previous attempt.
 *
 * @param lastBrokerName broker to avoid; null means no preference
 * @return the first round-robin queue whose broker name differs from {@code lastBrokerName},
 *         or the result of the no-argument selection when every probed queue is on that broker
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    }

    int attempt = 0;
    while (attempt < this.messageQueueList.size()) {
        final int counter = this.sendWhichQueue.getAndIncrement();
        // Math.abs(Integer.MIN_VALUE) stays negative, so clamp the slot to 0.
        final int slot = Math.max(Math.abs(counter) % this.messageQueueList.size(), 0);
        final MessageQueue candidate = this.messageQueueList.get(slot);
        final boolean sameBroker = candidate.getBrokerName().equals(lastBrokerName);
        if (!sameBroker) {
            return candidate;
        }
        attempt++;
    }
    // Every probed queue sits on the broker to avoid: fall back to plain selection.
    return selectOneMessageQueue();
}
可以看到这种规避算法很简单,就是只要broker不等于上次异常的Broker就可以了
我们重点来分析下if (this.sendLatencyFaultEnable) {}
里面的代码
-
获取 messageQueueList
列表 -
通过 ThreadLocal
使用简单的轮询算法去获取MessageQueue
-
通过 faultItemTable
去检测MessageQueue
所在的Broker是否可用,可用则返回,不可用则继续轮询下一个MessageQueue
-
如果所有 Broker
都不可用 则通过不是最差随机法 选取一个Broker,如果这个Broker有可写MessageQueue
则返回,一个都没有则移除该Broker
至此,RocketMQ 消息发送MessageQueue选择及高可用机制的源码分析就结束了
总结
RocketMQ 消息发送MessageQueue选择及高可用机制
-
Producer 发送消息重试 -
对异常Broker进行内存规避
NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次,NameServer不会检测到Broker宕机后马上推送消息给消息生产者,而是消息生产者每隔30s更新一次路由信息,所以消息生产者最快感知Broker最新的路由信息也需要30s。如果能引入一种机制,在Broker宕机期间,一旦某次消息发送失败,就将该Broker暂时排除在消息队列的选择范围之外,就可以避免后续消息继续发往故障Broker,从而提高发送成功率——这正是上文sendLatencyFaultEnable故障规避机制所做的事情
原文始发于微信公众号(小奏技术):和面试官吹吹RocketMQ消息发送MessgQueue选择及高可用机制源码
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30196.html