RocketMQ 多个实例消息聚集在一个实例的坑

导读:本篇文章讲解 RocketMQ 多个实例消息聚集在一个实例的坑,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

 

RT。上周有个问题一直未解决,先说需求:

有A和B两个实例的RocketMQ,并不是集群关系。A的实例是服务器本地的RocketMQ,B的实例是阿里云上所购买的实例。在JAVA代码中,要做到往A发消息的时候也要往B推送消息。

光看需求,其实so easy。因为自己对RocketMQ不是很了解,就“天真”的认为只是调用两个实例化对象的send方法而已。实际上这里蕴藏着一个深坑。

用过RocketMQ的同学都知道,RocketMQ是基于NameSrv的地址去发送消息的,A和B明显是两个不同的NameSrv地址,但是为什么所有消息都只能发往A或者B而不能同时发送?

解决问题的方法就是跟踪源代码,RocketMQ是基于JAVA的,所以这方面还是相对比较友好的。

不管是基于哪种的操作类(Spring的RocketMQTemplate或者是阿里云openservices下的操作类),最后的方法都基于了 producer.start() 或者是  consume.start() 方法。问题就是出在这个start方法上。

以下是start()方法中的一部分代码,可以看到是基于了单例模式获取了MQ的一个对象实例

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

以下是代码的具体实现部分

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }
 private String unitName;


 public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());

        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

可以看到,所有的实例都是基于clientId,再参照以上的buildMQClientId的方法,如果在同一个服务器中,那么this.getClientIP是更古不变的。

private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = String.valueOf(UtilAll.getPid());
}

而通过以上代码可以看到。因为rocketmq.clinet.name 默认是写死为 DEFAULT 的,所以我们所获取的 instanceName 在每次启动时虽然不一样,但是全局获取的 instanceName是一样的。

通过上述代码可以得出,每次的clientId都是相同的,因此不管你有多少个实例,只要这些实例的instanceName没有被人为改动过,那么这些实例都会共享第一次加载的配置,因此其他的配置都无效,问题就是出在了这个 instanceName

 

解决办法:

在项目启动的时候,使用DefaultMQProducer来构建消费者和创建者,Spring生成相对应的producer以及consumer的时候,分别设置

producer.setInstanceName("****");   // 实例的名称
consumer.setInstanceName("****");

如果使用阿里云的 openservices 可以直接设置

properties.setProperty(PropertyKeyConst.InstanceName,"****");

目前如果单纯使用SpringBoot的RocketMQTemplate,本人目前没有找出其解决办法,如果想着从RocketMQTempalte中getProducer(),然后再设置 InstanceName或者先shutDown再start 都会报错,大概意思就是 这个实例已经启动无法更改值或者这个实例大概已经被启动了一次

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/6474.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!