rocketMq下载安装
选择版本进行下载:http://rocketmq.apache.org/release_notes/
我选择4.9版本的进行下载
有点让人疑惑的是这个地址在谷歌浏览器里打开下载不了,需要在ie浏览器下载。
下载完成后的目录结构如下:
添加环境变量 ROCKETMQ_HOME
这里我是手动把文件名字改了,不让那么长。这个自己随意改就好。
rocketMq启动
在bin目录打开cmd命令窗口
启动NAMESERVER: start mqnamesrv.cmd
启动BROKER: start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
注意:如果启动时提示请设置ROCKETMQ_HOME,电脑重启一下就好了
插件部署
下载地址:点此下载
下载完成解压后,打开此文件
返回根目录
进入rocketMq-console目录,打开cmd,
在此处执行:mvn clean package -Dmaven.test.skip=true,进入编译阶段
显示success,打包成功。
执行命令:
java -jar rocketmq-console-ng-2.0.0.jar
启动完成,访问localhost:8888
至此,mq环境搭建完成,可以通过控制层查看topic信息等。
快速使用
这里创建两个工程,一个生产者,负责生产消息;一个消费者,负责消费。
添加依赖
完整依赖如下,可按自己需求进行调整。我这里使用的是log4j2日志框架,springboot 2.1.7版本。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.ssw</groupId>
<artifactId>rocketmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--日志相关log4j2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<!--去除logback依赖包,去掉springboot默认配置-->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 引入log4j2依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
生产者配置
application配置
server.port=5001
rocketMq.address=127.0.0.1:9876
rocketMq.group=mq_producer
生产者配置类
@Component
public class MessageProducer {
@Value("${rocketMq.address}")
private String rocketMqAddress;
@Value("${rocketMq.group}")
private String producerGroup;
private DefaultMQProducer producer;
//对象在用之前必须要调用一次,只能初始化一次
public void start() {
try {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(rocketMqAddress);
this.producer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 异步发送mq
* @param topic
* @param tags
* @param body
*/
@Async
public void sendMessage(String topic, String tags, String body) {
try {
Message message = new Message(topic, tags, body.getBytes("UTF-8"));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息生产成功, sendResult:{}" + sendResult);
}
@Override
public void onException(Throwable throwable) {
Integer retryTimes = producer.getRetryTimesWhenSendAsyncFailed();
System.out.println("消息生产失败, retryTimes:{}" + retryTimes);
}
});
} catch (Exception e) {
System.out.println("消息生产失败,topic:{}, tags:{}, body:{}" + topic + "-----" + tags + "-----" + body);
}
}
}
自启动配置
@Component
public class Start implements CommandLineRunner {
@Autowired
private MessageProducer messageProducer;
@Override
public void run(String... args) throws Exception {
messageProducer.start();
}
}
创建控制层
@RestController
@RequestMapping("production")
public class ProductionController {
@Autowired
private MessageProducer messageProducer;
/**
* 生产数据
* @param topic
* @param tags
* @param text
* @return
*/
@RequestMapping("push")
public boolean callback(String topic, String tags, String text) {
try {
messageProducer.sendMessage(topic, tags, text);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
}
测试生产
访问测试生产消息的地址:http://localhost:5001/production/push?topic=test_topic
消费者配置
application配置
server.port=5002
rocketMq.address=127.0.0.1:9876
rocketMq.group=mq_consumer
#监听topic
rocketMq.Listener.topic=test_topic
消费者配置类
@Log4j2
@Component
public class ListenerService {
@Value("${rocketMq.group}")
private String consumerGroup;
@Value("${rocketMq.address}")
private String address;
@Value("${rocketMq.Listener.topic}")
private String listenerTopic;
private DefaultMQPushConsumer consumer;
public void start() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(address);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(listenerTopic, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, content) -> {
Message msg = msgs.get(0);
String topic = msg.getTopic();
try {
String body = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
log.error(topic + "---消费成功---" + topic + ",tags=" + tags + ",keys=" + keys + ",msg=" + body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (UnsupportedEncodingException e) {
log.error(topic + "---消费失败");
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
}
自启动配置
@Component
public class Start implements CommandLineRunner {
@Autowired
private ListenerService listenerService;
@Override
public void run(String... args) throws Exception {
listenerService.start();
}
}
启动
当生产者生产topic为test_topic,消费者就可以接收到。
上述所有示例代码,已上传gitee,可自行下载。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/143373.html