RT。上周有个问题一直未解决,先说需求:
有A和B两个实例的RocketMQ,并不是集群关系。A的实例是服务器本地的RocketMQ,B的实例是阿里云上所购买的实例。在JAVA代码中,要做到往A发消息的时候也要往B推送消息。
光看需求,其实so easy。因为自己对RocketMQ不是很了解,就“天真”的认为只是调用两个实例化对象的send方法而已。实际上这里蕴藏着一个深坑。
用过RocketMQ的同学都知道,RocketMQ是基于NameSrv的地址去发送消息的,A和B明显是两个不同的NameSrv地址,但是为什么所有消息都只能发往A或者B而不能同时发送?
解决问题的方法就是跟踪源代码,RocketMQ是基于JAVA的,所以这方面还是相对比较友好的。
不管是基于哪种的操作类(Spring的RocketMQTemplate或者是阿里云openservices下的操作类),最后的方法都基于了 producer.start() 或者是 consume.start() 方法。问题就是出在这个start方法上。
以下是start()方法中的一部分代码,可以看到是基于了单例模式获取了MQ的一个对象实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
以下是代码的具体实现部分
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
private String unitName;
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
可以看到,所有的实例都是基于clientId,再参照以上的buildMQClientId的方法,如果在同一个服务器中,那么this.getClientIP是更古不变的。
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
而通过以上代码可以看到。因为rocketmq.clinet.name 默认是写死为 DEFAULT 的,所以我们所获取的 instanceName 在每次启动时虽然不一样,但是全局获取的 instanceName是一样的。
通过上述代码可以得出,每次的clientId都是相同的,因此不管你有多少个实例,只要这些实例的instanceName没有被人为改动过,那么这些实例都会共享第一次加载的配置,因此其他的配置都无效,问题就是出在了这个 instanceName 上
解决办法:
在项目启动的时候,使用DefaultMQProducer来构建消费者和创建者,Spring生成相对应的producer以及consumer的时候,分别设置
producer.setInstanceName("****"); // 实例的名称
consumer.setInstanceName("****");
如果使用阿里云的 openservices 可以直接设置
properties.setProperty(PropertyKeyConst.InstanceName,"****");
目前如果单纯使用SpringBoot的RocketMQTemplate,本人目前没有找出其解决办法,如果想着从RocketMQTempalte中getProducer(),然后再设置 InstanceName或者先shutDown再start 都会报错,大概意思就是 这个实例已经启动无法更改值或者这个实例大概已经被启动了一次
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/6474.html