一、Rocket 安装
1.1 下载
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
1.2 环境
1.jdk1.8
2.linux
1.3 安装
# 1.上传服务器
rocketmq-all-4.4.0-bin-release.zip
# 2.解压
unzip rocketmq-all-4.4.0-bin-release.zip
# 3.移动到/usr/local
mv rocketmq-all-4.4.0-bin-release /usr/local/rocketmq
1.4 修改配置文件
编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
1.5 测试启动
# 进入bin目录下
# 1.启动nameserver
nohup ./bin/mqnamesrv &
# 2.启动broker
nohup ./mqbroker -n localhost:9876 &
1.6 测试消息发送
export NAMESRV_ADDR=localhost:9876
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
1.7 测试消息接受
export NAMESRV_ADDR=localhost:9876
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
1.8 关闭
bin/mqshutdown broker
bin/mqshutdown namesrv
二、Rocket 控制台安装
2.1 下载
在 git 上 下 载 下 面 的 工 程 rocketmq-console-1.0.0 https://github.com/apache/rocketmq-externals/releasess
2.2 修改配置
修 改 配 置 文 件
rocketmq-consolesrcmainresourcesapplication.properties
server.port=7777 #项目启动后的端口号
rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址,注意防火墙要开启9876端口
2.3 打包启动
# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true # 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar
2.4 测试访问
localhost:7777
三、springboot集成rocketmq
3.1 pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
3.2 发送消息
public class RocketMQSendTest {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("192.168.109.131:9876");
//3. 启动生产者
producer.start();
//4. 创建消息对象,指定主题、标签和消息体
Message msg = new Message("myTopic", "myTag",
("RocketMQ Message").getBytes());
//5. 发送消息
SendResult sendResult = producer.send(msg,10000); System.out.println(sendResult);
//6. 关闭生产者
producer.shutdown();
}
}
3.3 接受消息
//接收消息
public class RocketMQReceiveTest {
public static void main(String[] args) throws MQClientException {
//1. 创建消息消费者, 指定消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer- group");
//2. 指定Nameserver地址
consumer.setNamesrvAddr("192.168.109.131:9876");
//3. 指定消费者订阅的主题和标签
consumer.subscribe("myTopic", "*");
//4. 设置回调函数,编写处理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs,
ConsumeConcurrentlyContext
context) {
System.out.println("Receive New Messages: " + msgs);
//返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5. 启动消息消费者
consumer.start(); System.out.println("Consumer Started.");
}
}
原文始发于微信公众号(Coding路人王):阿里RocketMQ安装服务器及客户端
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/41701.html