1.1 添加依赖
SpringCloudStream集成RocketMQ,Alibaba按照SpringCloudStream的规范实现了将信息推送给发送和接收通道的绑定器
在消息发送方和订阅方都添加依赖
<!-- SpringCloudStream 集成RocketMQ -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
1.2 消息生产服务配置
在消息生产端yaml
配置相关参数
spring:
cloud:
stream:
# Binding配置项,对应Map<String,BindingProperties>,order-output为Key,是Binding的名称
# 自定义且可以定义多个,分为Input和Output两种配置,但在配置项中并不会体现出来,在@Input和@Output注解中体现
bindings:
order-output:
# rocketmq 的topic
destination: ORDER-TOPIC-01
# 消息内容格式
content-type: application/json
product-output:
destination: PRODUCT-TOPIC-01
content-type: application/json
# SpringCloudStream RocketMQ配置项
rocketmq:
binder:
# rocketmq 的nameserver地址
name-server: xxx.com:9876
bindings:
order-output:
# producer配置项,对应RocketMQProducerProperties 类
producer:
# 生产者分组
group: dev
# 是否同步发送消息,默认为false异步
sync: true
product-output:
producer:
group: product-dev
# 发送事务消息 默认为false
transactional: true
创建接口,通过@Output
注解绑定消息发送的Binding
,接口方法的返回类型为MessageChannel
,可以用来发送消息内容,需要注意的是@Output
注解的内容必须与配置文件中bingdings
中配置的名称一致
public interface OrderMessage {
@Output("order-output")
MessageChannel orderOutput();
@Output("product-output")
MessageChannel productOutput();
}
创建发送消息接口
@RequestMapping
@RestController
public class PlaceOrderController {
@Resource
OrderMessage orderMessage;
@PostMapping("placeOrder")
public boolean placeOrder(){
// 创建订单对象
Order order = new Order(1, 1, "lizhi", 2, new BigDecimal(88), 2);
//创建Spring Message对象
Message<Order> orderMessage = MessageBuilder.withPayload(order).build();
return this.orderMessage.orderOutput().send(orderMessage);
}
}
此时还并不能将MessageSource
对象注入进来,需要指定某个接口开启Bingding
功能后,才回去扫描@Output
注解。
在启动类添加EnableBinding
注解,并指定绑定的接口
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@RestController
@EnableBinding(OrderMessage.class)
@RefreshScope
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class,args);
}
@Resource
UserToOrderFeignClient userToOrderFeignClient;
@GetMapping("test")
@SentinelResource(value = "test")
List<Order> test(){
return userToOrderFeignClient.details(1);
}
}
1.3 消息消费服务配置
在消息消费端配置相关参数
spring:
cloud:
stream:
bindings:
order-input:
# rocketmq 的topic
destination: ORDER-TOPIC-01
# 消息内容格式
content-type: application/json
# 消费者组 命名规则 组名+topic名
group: order-consumer-ORDER-TOPIC-01
rocketmq:
binder:
name-server: xxx.com:9876
bindings:
order-input:
consumer:
# 是否开启消费,默认为true
enabled: true
# 是否使用广播消费,默认为false使用集群消费
broadcasting: false
创建接口,通过@Input
注解绑定消息接收的Binding
,接口方法的返回类型为SubscribableChannel
,可以用来订阅消息,需要注意的是@Input
注解的内容必须与配置文件中bingdings
中配置的名称一致
public interface MessageSource {
@Input("order-input")
SubscribableChannel orderInput();
}
订阅消息
@Component
@Log4j2
public class MessageConsumer {
@StreamListener("order-input")
public void onMessage(@Payload Order order){
log.info("订单内容:[{}]",order);
}
}
注:因为我们的消息内容为Order
实体,所以需要@Payload
注解来进行反序列化
指定接口开启Binding
功能
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableBinding(MessageSource.class)
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class,args);
}
}
然后调用接口测试
http://localhost:9527/user/placeOrder
日志可以看到接收到订单信息
[ConsumeMessageThread_1] com.lizhi.config.MessageConsumer : 订单内容:[Order(oid=1, uid=1, userName=lizhi, pid=2, productPrice=88, num=2)]
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/153615.html