ActiveMQ集群模式

导读:本篇文章讲解 ActiveMQ集群模式,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

目录

一、面试题

二、多节点集群是什么

三、zookeeper+replicated-leveldb-store的主从集群

四、官网集群原理图

五、部署规划和步骤

六、集群可用性测试


一、面试题

引入消息队列之后该如何保证其高可用性

二、多节点集群是什么

基于ZooKeeper和LevelDB搭建ActiveMQ 集群集群仅提供主备方式的高可用集群功能,避免单点故障

三、zookeeper+replicated-leveldb-store的主从集群

官网:ActiveMQ (apache.org)

ActiveMQ集群模式

 两种集群方式

  1. 基于sharedFileSystem共享文件系统(KahaDB默认)
  2. 基于JDBC

四、官网集群原理图

ActiveMQ集群模式

ActiveMQ集群模式

五、部署规划和步骤

(一)环境和版本

CentOS 7 x86_64

JDK1.8

zookeeper-3.5.7

activemq-5.16.5

(二)关闭防火墙并保证win可以ping通过ActimveMq服务器

(三)要求具备ZK集群并可以成功启动

(四)集群部署规划列表

ActiveMQ集群模式

(五)创建3台集群目录

ActiveMQ集群模式

(六)修改管理控制台端口

1.mq_node01全部不动

2.mq_node02进入conf目录下得jetty.xml文件进行修改。port改为8162

ActiveMQ集群模式

ActiveMQ集群模式

3.mq_node03进入conf目录下得jetty.xml文件进行修改。port改为8163

 ActiveMQ集群模式

ActiveMQ集群模式

这里修改port的同时可以将上面的host那一行注释掉,不然只允许本机访问,Windows打不开前台

将下面这一行注释掉或者将value改为主机ip 

<property name=”host” value=”127.0.0.1″/>

(七)AcitveMQ集群配置

1.三个节点的brokername要全部一致

进入activemq.xml文件进行修改

ActiveMQ集群模式

brokername改为mqcluster 

ActiveMQ集群模式

2.3个节点的持久化配置

node1,2,3分别对应

bind:63631,63632,63633

zkAddress对应三台zookeeper集群所在的服务ip和zookeeper端口号(我这里zookeeper集群分布在三台虚拟机上)

ActiveMQ集群模式

 
(八)修改各节点的消息端口

node1,2,3分别对应

openwire的url对应61616,61617,61618 

ActiveMQ集群模式
(九)按顺序启动3个ActiveMQ节点到这步前提是zk集群已经成功启动运行

首先通过之前编写的zookeeper脚本启动位于3台虚拟机的zookeeper集群

ActiveMQ集群模式

编写一个启动和关闭脚本

ActiveMQ集群模式

  amq_batch.sh

#!/bin/sh

cd /mq_cluster/mq_node01/bin
./activemq start

cd /mq_cluster/mq_node02/bin
./activemq start

cd /mq_cluster/mq_node03/bin
./activemq start

 amq_batch_stop.sh

#!/bin/sh

cd /mq_cluster/mq_node01/bin
./activemq stop

cd /mq_cluster/mq_node02/bin
./activemq stop

cd /mq_cluster/mq_node03/bin
./activemq stop

记得用chmod 777 文件名修改文件权限,使文件可执行

ActiveMQ集群模式

(十)zk集群的节点状态说明

登录另一台虚拟机的zookeeper客户端,查看/节点下是否有activemq节点

./zkCli.sh -server 127.0.0.1:2181

ls /

ActiveMQ集群模式

 可以看到三个结点都配置成功了ActiveMQ集群模式

 再查看节点的具体信息

可以看到63633也就是我们的node03activemq有elected,即被选中作为master,其他两台为slave

ActiveMQ集群模式

 windows下访问8363,成功

ActiveMQ集群模式

六、集群可用性测试

测试队列发送与接收

生产者

public class JMSProduce {

    public static final String ACTIVEMQ_URL = "failover:(tcp://ip:61616,tcp://ip:61617,tcp://ip:61618)?randomize=false";
    public static final String QUEUE_NAME = "queue-cluster";

    public static void main(String[] args) throws JMSException {
        //1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过链接工厂,获得Connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //3.1 两个参数 ①事务 ②签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(queue or topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建生产者
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        //6.通过MessageProducer生成3条消息到MQ队列中
        for(int i = 1 ; i <= 3; i ++ ) {
            TextMessage textMessage = session.createTextMessage("clusterMessage--" + i);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();

        System.out.println("消息发布到完成……");
    }
}

消费者

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "failover:(tcp://ip:61616,tcp://ip:61617,tcp://ip:61618)?randomize=false";
    public static final String QUEUE_NAME = "queue-cluster";

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过链接工厂,获得Connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //3.1 两个参数 ①事务 ②签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(queue or topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到cluster消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //连接AvtiveMQ需要等待时间
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
}

 ActiveMQ集群模式

 ActiveMQ集群模式

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/118466.html

(0)
seven_的头像seven_bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!