初探RocketMQ

初探RocketMQ

1、引言

Message Queue(消息队列),从字面上理解:首先它是一个队列。FIFO先进先出的数据结构-队列。消息队列就是所谓的存放消息的队列。

消息队列解决的不是消息的队列的目的,解决的是通信问题。

初探RocketMQ

比如以电商订单系统为例,如果各服务之间使用同步通信,不仅耗时较久,且过程中受到网络波动的影响,不能保证高成功率。因此,使用异步的通信方式对架构进行改造。

初探RocketMQ

使用异步的通信方式对模块间的调用进行解耦,可以快速的提升系统的吞吐量。上游执行完消息的发送业务后立即获得结果,下游多个服务订阅到消息后各自消费。通过消息队列,屏蔽底层的通信协议,使得解耦和并行消费得以实现。

2、RocketMQ介绍

随着使⽤中队列和虚拟主题的增加,阿⾥巴巴团队使⽤的ActiveMQ IO 模块达到了瓶颈。为了尽⼒通过节流、断路器或降级来解决这个问题,但效果不佳。所以开始关注当时流⾏的消息传递解决⽅案Kafka。不幸的是, Kafk⽆法满⾜要求,尤其是在低延迟和⾼可靠性⽅⾯。在这种情况下,决定发明⼀种新的消息传递引擎来处理更⼴泛的⽤例,从传统的发布/订阅场景到⼤容量实时零丢失交易系统。⽬前RocketMQ已经开源给Apache基⾦会。如今,已有 100 多家公司在其业务中使⽤开源版本的 RocketMQ。

消息产品 客户端SDK 协议和规范 订购 信息 预定消息 批量消息 广播消息 消息过滤器 服务器触发的重新交付 消息存储 消息追溯 消息优先级 高可用性和故障转移 消息跟踪 配置 管理和运
ActiveMQ Java、.Net、C++等 推送模型,支持OpenWire、STOMP、AMQP、MQTT、JMS Exclusive Consumer或Exclusive Queues可以保证排序 支持 不支持 支持 支持 不支持 使用 JDBC可高性能日志支持非常快速的持久化,例如levelDB、kahaDB 支持 支持 支持,取决于存储,如果使用levelDB则需要ZooKeeper服务器 不支持 默认配置为低级,用户需优化配置参数 支持
kafka Java、Scala等 拉取模型,支持TCP 确保分区内消息的排序 不支持 支持,带有异步生产者 不支持 支持,可以使用kafka Streams过滤消息 不支持 高性能文件存储 支持,偏移量指示 不支持 支持,需要ZooKeeper服务器 不支持 kafka使用键值对格式进行配置。这些值可以从文件或以编程方式提供。 支持,使用终端命令公开核心指标
RocketMQ Java、C++、Go 拉取模型,支持TCP、JMS、OpenMessaging 确保消息的严格排序,并且可以优雅地横向扩展 支持 支持,具有同步模式以避免消息丢失 支持 支持基于SQL92的属性过滤器表达式 支持 高性能和低延迟的文件存储 支持,时间戳和偏移量两种表示 不支持 支持的主从模型,无需其他套件 支持 开箱即用,用户只需注意一些配置 支持,丰富的Web和终端命令以公开核心指标

3、RocketMQ的基本概念

3.1 技术架构

初探RocketMQ

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的ZooKeeper,支持Broker的动态注册于发现。主要包括两个功能:Broker管理,NameServer接收Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息。Producer,Consumer仍然可以动态感知Broker的路由信息。

  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块:

    初探RocketMQ
    • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
    • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    • HA Service:高可用服务,提供Master Broker和Slave Broker之间的数据同步功能。
    • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

3.2 部署架构

初探RocketMQ

RocketMQ网络部署特点:

  • NameServer是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步。
  • Broker部署相对复杂, Broker分为Master与Slave,⼀个Master可以对应多个Slave,但是⼀个Slave只能对应⼀个Master, Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义, BrokerId为0表示Master,⾮0表示Slave。 Master也可以部署多个。每个Broker与NameServer集群中的所有节点建⽴⻓连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上⽀持⼀Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建⽴⻓连接,且定时向Master发送⼼跳。 Producer完全⽆状态,可集群部署。
  • Consumer与NameServer集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、 Slave建⽴⻓连接,且定时向Master、 Slave发送⼼跳。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时, Master服务器会根据拉取偏移量与最⼤偏移量的距离(判断是否读⽼消息,产⽣读I/O),以及从服务器是否可读等因素建议下⼀次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer, NameServer起来后监听端⼝,等待Broker、 Producer、  Consumer连上来,相当于⼀个路由控制中⼼。
  • Broker启动,跟所有的NameServer保持⻓连接,定时发送⼼跳包。⼼跳包中包含当前Broker信息(IP+端⼝等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时⾃动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中⼀台建⽴⻓连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择⼀个队列,然后与队列所在的Broker建⽴⻓连接从⽽向Broker发消息。
  • Consumer跟Producer类似,跟其中⼀台NameServer建⽴⻓连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建⽴连接通道,开始消费消息。

4、快速开始

4.1 下载RocketMQ

这里使用的是RocketMQ4.7.1版本,去官方下载即可:

初探RocketMQ

4.2 安装RocketMQ

  • 准备一台装有Linux系统的虚拟机,这里我用的是Centos7版本
  • 安装JDK,并配置环境变量
  • 安装RocketMQ,上传RocketMQ安装包并使用unzip命令解压缩再/usr/local/rocketmq目录下。
  • 配置JDK和RocketMQ的环境变量。
初探RocketMQ

注意,RocketMQ的环境变量用来加载ROCKETMQ_HOME/conf下的配置文件,如果不配置则无法启动NameServer和Broker。

完成后执行命令,让环境变量生效:

source /etc/profile

修改bin/runserver.sh文件,由于RocketMQ默认设置的JVM内存为4G,但虚拟机一般没有这么大内存,因此调整为512MB。

vim runserver.sh
初探RocketMQ

4.3 启动NameServer

启动RocketMQ服务需要先启动NameServer。

在bin目录内使用静默方式启动:

nohup ./mqnamesrv -n 192.168.159.33:9876 &
初探RocketMQ

4.4 启动Broker

  • 修改broker的JVM参数配置,将默认8G内存修改为512MB。
vim runbroker.sh
初探RocketMQ
  • conf/broker.conf文件中加入如下配置,开启自动创建Topic功能。
autoCreateTopicEnable=true
初探RocketMQ
  • 以静默方法启动Broker

    nohup ./mqbroker -n 192.168.159.33:9876 &
初探RocketMQ

4.5 使用发送和接收消息验证MQ

  • 配置NameServer的环境变量

在发送/接收消息之前,需要告诉客户端NameServer的位置。配置环境变量NAMESRV_ADDR

export NAMESRV_ADDR=192.168.159.33:9876
  • 使用bin/tools.sh工具验证消息的发送,默认会发1000条消息
 ./tools.sh org.apache.rocketmq.example.quickstart.Producer

执行上述命令后,发送消息的日志如下:

初探RocketMQ
  • 使用bin/tools.sh工具验证消息的接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

执行上述命令后,可以看到接收的消息

初探RocketMQ

4.6 关闭服务器

  • 关闭Broker:

    ./mqshutdown broker
    初探RocketMQ
  • 关闭NameServer:

    ./mqshutdown namesrv
    初探RocketMQ


原文始发于微信公众号(全栈开发那些事):初探RocketMQ

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/286092.html

(0)
码上实战的头像码上实战

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!