1. RocketMQ集群
1.1 各角色介绍
-
Producer:消息的发送者。 -
Consumer:消息的接收者。 -
Broker:暂存和传输消息。 -
NameServer:管理Broker。 -
Topic:区分消息的种类,一个发送者可以发送消息给一个或者多个Topic,一个消息接收者可以订阅一个或者多个Topic消息。 -
Mssage Queue:相当于是Topic的分区。
1.2 集群的特点
-
NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步 -
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示为Slave。Master可以部署多个,每个broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 -
Producer与NameServer集群中其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。 -
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer中取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
1.3 集群模式
-
单Master模式
这种方式风险很大,一旦Broker宕机,会导致整个服务不可用
-
多Master模式
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
-
优点:配置简单,单个Master宕机或者重启维护对应用无影响 -
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性受到影响
-
多Master多Slave模式(异步)
每个Master配置一个Slave,有多对Master-Slave,HA(高可用)采用异步复制方式,主备有短暂的消息延迟(毫秒级),这种模式的优缺点:
-
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,Master宕机后,消费仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样 -
缺点:Master宕机,磁盘损坏的情况下会丢失少量消息
-
多Master多Slave模式(同步)
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功了,才会向应用返回成功,这种模式的优缺点:
-
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性和消息可用性都非常高 -
缺点:性能比异步复制模式稍微低些(大约低10%),发送单个消息的RT会高一些,当前版本,主节点宕机后备节点不能切换为主机
1.4 双主双从集群
-
启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心 -
Broker启动,跟所有NameServer保持长连接,定时发送心跳包。心跳包中包含了当前Broker消息(IP+端口等)以及存储所有Topic消息。注册成功后,NameServer集群就有Topic跟Broker的映射关系。 -
收发消息时,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic -
Producer发送消息,启动时先跟NameServer集群中的一台机器建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发送消息。 -
Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟broker建立连接通道,开始消费消息。
1.5 双主双从搭建过程
接下来我将会演示如何在Mac系统本地搭建双主双从的RocketMQ集群!
1. 下载RocketMQ二进制包
下载地址:https://rocketmq.apache.org/download
下载版本:rocketmq-all-4.7.0-bin-release
我们下载完之后在一个选定目录(我的是:/Users/davisyou/software/)下复制两份,分别取名为:rocketmq-all-4.7.0-bin-release-nameserver和rocketmq-all-4.7.0-bin-release-sync
2. 搭建NameServer集群
进入刚刚创建的 rocketmq-all-4.7.0-bin-release-nameserver 包,在其conf包下面创建如下两个文件:
-
namesrv1.properties -
namesrv2.properties
分别在这两个properties文件里面输入 listenPort=19876和listenPort=29876。
接下来我们编写一下start.sh脚本
返回conf包上一层,创建启动脚本文件start.sh。在这个脚本里面编写如下启动语句:
echo "starting nameserver1"
nohup sh bin/mqnamesrv -c conf/namesrv1.properties &
echo "started nameserver1"
echo "starting nameserver2"
nohup sh bin/mqnamesrv -c conf/namesrv2.properties &
echo "started nameserver2"
接下来运行sh start.sh 就可以启动两个运行在端口号是19876和29876的两个NameServer了。
3. 搭建Broker集群
我们打开rocketmq-all-4.7.0-bin-release-sync/conf/2m-2s-sync目录会发现存在几个配置文件:
-
broker-a-s.properties (broker-a的从节点配置文件) -
broker-a.properties (broker-a的主节点配置文件) -
broker-b-s.properties (broker-b的从节点配置文件) -
broker-b.properties (broker-b的主节点配置文件)
下面我们修改一下这四个配置文件:
brokerClusterName=rocketmq-cluster_2m-2s-sync
brokerName=broker-a
brokerId=0
#两个nameserver的对应的ip地址。
namesrvAddr=127.0.0.1:19876;127.0.0.1:29876
#Broker 对外服务的监听端口
listenPort=10211
#删除文件时间点,默认凌晨 4点
deleteWhen=04
fileReservedTime=48
#- Broker 的角色;SLAVE;SYNC_MASTER 同步双写Master;ASYNC_MASTER 异步复制Master
brokerRole=SYNC_MASTER
#- 刷盘方式:SYNC_FLUSH 同步刷盘;ASYNC_FLUSH 异步刷盘
flushDiskType=SYNC_FLUSH
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a/store
#commitLog 存储路径
storePathCommitLog=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a/store/consumequeue
#消息索引存储路径
storePathIndex=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a/store/index
#checkpoint 文件存储路径
storeCheckpoint=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a/store/checkpoint
#abort 文件存储路径
abortFile=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a/store/abort
#限制的消息大小
maxMessageSize=65536
brokerClusterName=rocketmq-cluster_2m-2s-sync
brokerName=broker-a
brokerId=1
namesrvAddr=127.0.0.1:19876;127.0.0.1:29876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=20111
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a-s/store
storePathCommitLog=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a-s/store/commitlog
storePathConsumeQueue=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a-s/store/consumequeue
storePathIndex=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a-s/store/index
storeCheckpoint=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a-s/store/checkpoint
abortFile=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-a-s/store/abort
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=SYNC_FLUSH
brokerClusterName=rocketmq-cluster_2m-2s-sync
brokerName=broker-b
brokerId=0
namesrvAddr=127.0.0.1:19876;127.0.0.1:29876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10311
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b/store
storePathCommitLog=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b/store/commitlog
storePathConsumeQueue=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b/store/consumequeue
storePathIndex=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b/store/index
storeCheckpoint=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b/store/checkpoint
abortFile=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b/store/abort
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
brokerClusterName=rocketmq-cluster_2m-2s-sync
brokerName=broker-b
brokerId=1
namesrvAddr=127.0.0.1:19876;127.0.0.1:29876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10411
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b-s/store
storePathCommitLog=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b-s/store/commitlog
storePathConsumeQueue=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b-s/store/consumequeue
storePathIndex=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b-s/store/index
storeCheckpoint=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b-s/store/checkpoint
abortFile=/Users/davisyou/software/rocketmq-all-4.7.0-bin-release-sync_broker-b-s/store/abort
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=SYNC_FLUSH
接下来我们编写一下start.sh脚本
返回conf目录的上一层,创建启动脚本文件start.sh。在这个脚本里面编写如下启动语句:
echo "starting broker-a"
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
echo "started broker-a"
echo "starting broker-a-s"
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &
echo "started broker-a-s"
echo "starting broker-b"
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties &
echo "started broker-b"
echo "starting broker-b-s"
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
echo "started broker-b-s"
接下来运行sh start.sh 就可以启动这些broker了,运行结果如下所示:
4. RocketMQ-dashboard可视化
第一步克隆这个项目:https://github.com/apache/rocketmq-dashboard
第二步修改这个项目的配置文件:application.yml
rocketmq:
config:
# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876
# configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs:
- 127.0.0.1:19876
- 127.0.0.1:29876
第三步:我们使用Maven打包,得到可执行的jar包
-
首先将test模块忽略掉,点击上面的蓝底闪电圆形按钮即可。 -
按照先后 选中clean+package打包,然后等待其输出构建成功标识即可
在target包下面可以得到一个jar包,然后输入如下命令即可运行此jar包:
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
原文始发于微信公众号(Java之禅):01-RocketMQ集群初探
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/161315.html