阿里RocketMQ安装服务器及客户端

一、Rocket 安装

1.1 下载

http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

1.2 环境

1.jdk1.8

2.linux

1.3 安装

1.上传服务器
rocketmq-all-4.4.0-bin-release.zip
2.解压
unzip rocketmq-all-4.4.0-bin-release.zip
3.移动到/usr/local
mv rocketmq-all-4.4.0-bin-release /usr/local/rocketmq

1.4 修改配置文件

编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的 
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" 

1.5 测试启动

# 进入bin目录下
1.启动nameserver
nohup ./bin/mqnamesrv & 

2.启动broker
nohup ./mqbroker -n localhost:9876 &

1.6 测试消息发送

export NAMESRV_ADDR=localhost:9876
bin/tools.sh  org.apache.rocketmq.example.quickstart.Producer

1.7 测试消息接受

export NAMESRV_ADDR=localhost:9876
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

1.8 关闭

bin/mqshutdown broker
bin/mqshutdown namesrv

二、Rocket 控制台安装

2.1 下载

在 git 上 下 载 下 面 的 工 程 rocketmq-console-1.0.0 https://github.com/apache/rocketmq-externals/releasess

2.2 修改配置

修 改 配 置 文 件

rocketmq-consolesrcmainresourcesapplication.properties

server.port=7777 #项目启动后的端口号

rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址,注意防火墙要开启9876端口

2.3 打包启动

# 进入控制台项目,将工程打成jar包

mvn clean package -Dmaven.test.skip=true # 启动控制台

java -jar target/rocketmq-console-ng-1.0.0.jar

2.4 测试访问

localhost:7777

三、springboot集成rocketmq

3.1 pom依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>

3.2 发送消息

public class RocketMQSendTest {
    public static void main(String[] args) throws Exception {
        //1. 创建消息生产者, 指定生产者所属的组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        //2. 指定Nameserver地址
        producer.setNamesrvAddr("192.168.109.131:9876");
        //3. 启动生产者
        producer.start();
        //4. 创建消息对象,指定主题、标签和消息体
        Message msg = new Message("myTopic""myTag",
        ("RocketMQ Message").getBytes());
        //5. 发送消息
        SendResult sendResult = producer.send(msg,10000); System.out.println(sendResult);
        //6. 关闭生产者
        producer.shutdown();
 }
}

3.3 接受消息

//接收消息
public class RocketMQReceiveTest {
    public static void main(String[] args) throws MQClientException {
        //1. 创建消息消费者, 指定消费者所属的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer- group");
        //2. 指定Nameserver地址
        consumer.setNamesrvAddr("192.168.109.131:9876");
        //3. 指定消费者订阅的主题和标签
        consumer.subscribe("myTopic""*");
        //4. 设置回调函数,编写处理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() { @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
        msgs,
        ConsumeConcurrentlyContext
        context)
 
{
        System.out.println("Receive New Messages: " + msgs);
        //返回消费状态
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        });
        //5. 启动消息消费者
        consumer.start(); System.out.println("Consumer Started.");
 }
}


原文始发于微信公众号(Coding路人王):阿里RocketMQ安装服务器及客户端

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41701.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!