在进行SpringBoot和RocketMQ整合之前需要启动RocketMQ的name server服务和Broker服务
pom依赖文件
<dependencies>
<!-- 因为需要通过访问URL实现发送消息,所以需要web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
</dependencies>
yml配置文件
rocketmq:
name-server: localhost:9876 #配置RocketMQ的name server
producer:
group: producer # 配置消息生产者的group,必须配置
定义消息实体类
public class MqMsg {
private String topic;
private String tag;
private Object content;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public Object getContent() {
return content;
}
public void setContent(Object content) {
this.content = content;
}
}
消息生产者接口
public interface RocketMqService {
/**
* 同步发送消息
*/
void send(MqMsg mqMsg);
/**
* 异步发送消息,异步返回消息结果
*/
void asyncSend(MqMsg mqMsg);
/**
* 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
*/
void syncSendOrderly(MqMsg mqMsg);
}
消息生产者实现类
进行SpringBoot和RocketMQ整合时,关键使用的是RocketMQTemplate类来进行消息发送,其中包含有send()、asyncSend()、sendOneWay()、sendMessageInTransaction()等方法,每个方法都至少包含有参数destination和message。
destination:消息发送到哪个topic和tag,在SpringBoot中topic和tag使用一个参数发送,其中用英文的冒号(:)进行连接。
message:消息体,需要使用 MessageBuilder.withPayload方法对消息进行封装。
@Service
public class RocketMqServiceImpl implements RocketMqService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final Logger log = LoggerFactory.getLogger(RocketMqServiceImpl.class);
@Override
public void send(MqMsg msg) {
log.info("send发送消息:{}", msg);
rocketMQTemplate.send(msg.getTopic() + ":" + msg.getTag(),
MessageBuilder.withPayload(msg.getContent()).build());
}
@Override
public void asyncSend(MqMsg msg) {
log.info("asyncSend发送消息:{}", msg);
rocketMQTemplate.asyncSend(msg.getTopic() + ":" + msg.getTag(), msg.getContent(),
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("事物消息发送成功:{}", sendResult.getTransactionId());
}
@Override
public void onException(Throwable throwable) {
log.info("mqMsg={}消息发送失败", msg);
}
});
}
@Override
public void syncSendOrderly(MqMsg msg) {
log.info("syncSendOrderly发送消息:{}", msg);
rocketMQTemplate.sendOneWay(msg.getTopic() + ":" + msg.getTag(), msg.getContent());
}
}
消息消费者
消息生产者中的每一个方法都需要对应一个消息消费者
@Service
@RocketMQMessageListener(topic = "topicA", consumerGroup = "consumer1", selectorExpression = "send")
public class SendConsumer implements RocketMQListener<String> {
private static final Logger log = LoggerFactory.getLogger(SendConsumer.class);
@Override
public void onMessage(String s) {
log.info("消费者成功消费消息:{}", s);
}
}
@Service
@RocketMQMessageListener(topic = "topicA", consumerGroup = "consumer2", selectorExpression = "asyncSend")
public class AsyncSendConsumer implements RocketMQListener<String> {
private static final Logger log = LoggerFactory.getLogger(AsyncSendConsumer.class);
@Override
public void onMessage(String s) {
log.info("消费者成功消费消息:{}", s);
}
}
1.消息消费者需要实现接口RocketMQListener并重写onMessage消费方法 2.@RocketMQMessageListener注解参数解释
-
topic:表示需要监听哪个topic的消息 -
consumerGroup:表示消费者组 -
selectorExpression:表示需要监听的tag
controller
@RestController
public class MqController {
@Autowired
private RocketMqServiceImpl rocketMqService;
@GetMapping("/send")
public void send(){
MqMsg msg = new MqMsg();
msg.setTopic("topicA");
msg.setTag("send");
Map<String, String> map = new HashMap<>();
map.put("name", "jack");
map.put("age", "23");
map.put("tel", "10086");
msg.setContent(map.toString());
rocketMqService.send(msg);
}
@GetMapping("/asyncSend")
public void asyncSend(){
MqMsg msg = new MqMsg();
msg.setTopic("topicA");
msg.setTag("asyncSend");
Map<String, String> userinfo = new HashMap<>();
userinfo.put("name", "lee");
userinfo.put("age", "24");
userinfo.put("tel", "10010");
msg.setContent(userinfo.toString());
rocketMqService.asyncSend(msg);
}
}
原文始发于微信公众号(良猿):SprinBoot整合RocketMQ
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/215764.html