0.前置条件已经搭建好了rocketMQ服务器端服务。
1.添加rocketMQ pom相关依赖
<!--集成rocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
2.配置文件application.yml 中添加rocketMQ相关配置
rocketmq:
# rocketMQ的命名服务器,格式为: host:port;host:port
name-server: ip:9876
producer:
# 生产者的组名
group: hc1
# 发送消息超时时间 默认3秒
send-message-timeout: 30000
3.项目代码
ProducerService 端
@Component
@Slf4j
public class ProducerService {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void send(Object msgDTO) {
log.info("==========>发送消息 消息类型:【{}】 消息内容:{}", "test", msgDTO);
rocketMQTemplate.asyncSend("MQConstant", MessageBuilder.withPayload(msgDTO).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==========>消息发送成功", JSON.toJSONString(msgDTO));
}
@Override
public void onException(Throwable e) {
log.error("==========>消息发送失败", e);
}
});
}
}
ConsumerService端
@Slf4j
@Component
@RocketMQMessageListener(topic = "MQConstant", consumerGroup ="hc1")
public class ConsumerService implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String msgBody = new String(message.getBody());
log.info("==========>消费消息 消息内容:{}", JSON.toJSONString(msgBody));
}
}
Controller使用
@RestController
@RequestMapping("/user")
public class UserController {
@Resource
ProducerService producerService;
@Resource
private IUserService userService;
/**
*根据id查询用户,然后放mq里传送
* @param id
* @return
*/
@GetMapping("/getUserByIdMQ/{id}")
public Result getUserByIdMQ(@PathVariable("id") Integer id) {
User user = userService.getById(id);
if (BeanUtil.isEmpty(user)) return Result.ok();
//发mq
producerService.send(JSONUtil.toJsonStr(user));
return Result.ok(UserCovertBasic.INSTANCE.toConvertUserVO(user));
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/65732.html