1、项目结构:
2、Provider
2.1、Provider pom
<!--spring boot 版本依赖-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
</dependencies>
2.2、application.yml
server:
port: 8080
rocketmq:
producer:
groupName: hello_group_provider
nameServer: 192.168.92.39
port: 9876
2.3、RocketMQProducer
@Component
public class RocketMQProducer {
@Value("${rocketmq.producer.groupName}")
private String groupName;
@Value("${rocketmq.producer.nameServer}")
private String nameServer;
@Value("${rocketmq.producer.port}")
private String port;
private DefaultMQProducer producer;
/**
* 功能描述 获取当前producer
*/
@Bean
public DefaultMQProducer getProducer() {
//设置producer 并且开启producer
producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(nameServer + ":" + port);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
}
2.4、Service
@Service
public class ProducerService {
@Autowired
private DefaultMQProducer producer;
private static final String topic = "helloTopic";
private static final String tag = "helloTag";
public void sendMsg() {
Message msg = new Message(topic, tag, "hello RocketMQ".getBytes());
try {
SendResult send = producer.send(msg);
System.out.println(send);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.5、测试方法
@RunWith(SpringRunner.class)
//主application方法
@SpringBootTest(classes = RocketMQProducerApplication.class)
public class ProducerTest {
@Autowired
private ProducerService producerService;
@Test
public void testSend() {
producerService.sendMsg();
}
}
2.6、运行
No route info of this topic, xxxxxxxxxx
解决思路:maven版本必须和RocketMQ b版本一致
发送超时
rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker。
#修改文件
vim conf/broker.conf
brokerIP1=192.168.92.39
#重启broker
sh ./mqshutdown broker
nohup sh ./mqbroker -n localhost:9876 -c ../conf/broker.conf autoCreateTopicEnable=true >/dev/null 2>&1 &
**2,7、重新运行
3、Customer
3.1、pom文件同Customer
3.2、application.yml
server:
port: 8081
rocketmq:
consumer:
groupName: hello_group_consumer
nameServer: 192.168.92.39
port: 9876
3.3、消费者
@Component
public class RocketMQCustomer {
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.nameServer}")
private String nameServer;
@Value("${rocketmq.consumer.port}")
private String port;
private static final String topic = "helloTopic";
@Bean
public DefaultMQPushConsumer getConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
//配置组名 IP
consumer.setNamesrvAddr(nameServer + ":" + port);
consumer.setConsumerGroup(groupName);
//最后偏移量读取消息
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
try {
//订阅主题,监听主题下哪些标签
consumer.subscribe(topic, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
//默认一条一条消息消费
Message msg = msgs.get(0);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
String topic = msg.getTopic();
String body = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//返回成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
//返回稍后进行投递
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
//开始
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return consumer;
}
}
3.4、启动消费者
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15177.html