文章目录
1. 基本介绍
Spring Cloud Stream 用一个用来为微服务应用构建消息驱动能力的框架。Spring Cloud Stream 中,提供了一个微服务和消息中间件之间的一个粘合剂,这个粘合剂叫做Binder,Binder 负责与消息中间件进行交互。而我们开发者则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。
Spring Cloud Stream 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动,为流行的消息中间件产品(Spring Cloud Stream 原生默认支持RabbitMQ,Kafka。阿里在官方基础上提供了RocketMQ的支持)提供了个性化的自动化配置实现,引用了发布-订阅模式,消费组,分区的三大核心概念。
2. 设计思想
那么Spring Cloud Stream是怎么屏蔽底层差异的呢?它通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种消息中间件的实现
绑定器Binder的说明:
在没有绑定器这个概念的情况下,Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,可以完美地实现应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,实现微服务和具体消息中间件的解耦,使得微服务可以关注更多自己的业务流程。一个集成Spring Cloud Stream 程序的框架示意图,如下图所示:
Binder中的INPUT和OUTPUT针对Binder本身而言,INPUT对应于消费者,OUTPUT对应于生产者。 。INPUT接收消息生产者发送的消息,OUTPUT发送消息给到消息消费者消费。
Spring Cloud Stream处理消息的业务流程图如下:
-
binder: 目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
-
Source和Sink:可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接收消息就是输入。Source用于获取数据(要发送到MQ的数据),Sink用于提供数据(要接收MQ发送的数据,提供数据给消息消费者)
-
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。用于存放source接收到的数据,或者是存放binder拉取的数据。
-
Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。
3. 常用注解
@StreamListener注解需要通过配置属性实现JSON字符串到对象的转换,这是因为在Spring Cloud Stream中实现了一套可扩展的消息转换机制。在消息消费逻辑执行之前,消息转换机制会根据消息头信息中声明的消息类型,找到对应的消息转换器并实现对消息的自动转换。配置示例:
spring.cloud.stream.bindings.<通道名>.content-type=application/json
4. 简单入门
创建一个Maven项目,在pom.xml添加三个依赖:Web、RabbitMQ、Spring Cloud Stream。具体要引入的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
</dependencies>
项目创建成功后,配置文件中添加RabbitMQ的配置信息。
spring.rabbitmq.host=101.43.30.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=
spring.rabbitmq.password=
spring.rabbitmq.virtual-host=/learn
创建一个简单的消息接收器MsgReceiver 类,用于接收MQ消息。
其中@StreamListener注解的作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。
//@EnableBinding 表示绑定 Sink 消息通道
@EnableBinding(Sink.class)
public class MsgReceiver {
public final static Logger LOGGER = LoggerFactory.getLogger(MsgReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
LOGGER.info(" MsgReceiver Received:" + payload.toString());
}
}
@EnableBinding注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定。通过@EnableBinding(Sink.class)绑定了Sink接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义。Sink接口的源码:
除了Sink接口外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,Source接口的源码如下:
还有提供了结合Sink和Source的Processor接口。
下面启动项目,然后在RabbitMQ 后台管理页面会创建一个匿名的队列,并且应用通过RabbitMessageChannelBinder将MsgReceiver绑定为该队列的消费者。下面尝试在RabbitMQ 后台管理页面去发送一条消息。
点击Publish message按钮,查看后台输出可以看到消息能够正常接收到,并且被成功消费。
5. 自定义消息通道
我们可以通过 @Input注解和@Output注解来定义绑定消息通道的接口。如果使用了@Input注解和@Output注解没有指定具体的value值的话,将默认使用方法名作为消息通道的名称。首先创建一个名为MyChannel的接口,定义通道channel:
public interface MyChannel {
String HELLO_INPUT = "hello-input";
String HELLO_OUTPUT = "hello-output";
@Output(HELLO_OUTPUT)
MessageChannel output();
@Input(HELLO_INPUT)
SubscribableChannel input();
}
- 注意,两个消息通道的名字是不一样的
- 从 F 版开始,默认使用通道的名称作为实例命令,所以这里的通道名字不可以相同(早期版本可
以相同),这样的话,为了能够正常收发消息,需要我们在 application.properties 中做一些额外
配置。
#消息绑定
spring.cloud.stream.bindings.hello-input.destination=hello-topic
spring.cloud.stream.bindings.hello-output.destination=hello-topic
接下来,自定义一个消息接收器,用来接收自己的消息通道里的消息:
@EnableBinding(MyChannel.class)
public class MsgReceiver2 {
public final static Logger LOGGER = LoggerFactory.getLogger(MsgReceiver2.class);
@StreamListener(MyChannel.HELLO_INPUT)
public void receive(Object payload) {
LOGGER.info("MsgReceiver receive2:" + payload);
}
}
创建一个接口用于测试发送消息
@RestController
public class HelloController {
@Autowired
MyChannel myChannel;
@GetMapping("/hello")
public void hello(){
myChannel.output().send(MessageBuilder.withPayload("hello spring cloud stream!").build());
}
}
6. 消息分组-处理消息重复消费
在微服务架构下,通常集群部署微服务以实现服务的高可用和负载均衡。但是默认情况下使用Spring Cloud Stream,消费者程序默认分配一个匿名且独立的单成员消费者组,如果消费者是一个集群,因为消费者属于不同的组不存在竞争关系,一条消息会被多次消费,所有部署的应用都会进行消费。举例如图所示:
我们可以通过消息分组解决这个问题。每个消费者绑定都可以使用spring.cloud.stream.bindings.<bindingName>.group
属性指定组名。在Spring Boot 项目中添加如下配置:
#消息分组
spring.cloud.stream.bindings.hello-input.group=g1
spring.cloud.stream.bindings.hello-output.group=g1
#指定通道对应的的主题(对应RabbitMQ的交换机)
spring.cloud.stream.bindings.hello-input.destination=hello-exchange
spring.cloud.stream.bindings.hello-output.destination=hello-exchange
通过以上配置那么在同一个组中的成员只会有一个成员真正收到消息并进行处理。
7. 消息分区
7.1 概念
Spring Cloud Stream 支持在给定应用程序的多个实例之间对数据进行分区。通过消息分区可以实现相同特征的消息总是被同一个消费实例处理。即一个或多个生产者应用程序实例将消息数据发送到多个消费者应用程序实例,确保有共同特征标识的消息数据由同一消费者实例接收和处理。
消息分区的应用场景比如在一些监控服务上,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身聚合这些数据,那么消息生产者可以为消息增加一个固定的特征ID来进行分区,使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。
Spring Cloud Stream为分区提供了通用的抽象实现,用来在消息中间价的上层实现消息分区处理,为不具备分区功能的消息中间件增加了分区功能拓展。
7.2 使用示例
添加配置示例:
#开启消息分区(消费者上配置)
spring.cloud.stream.bindings.hello-input.consumer.partitioned=true
#指定消费者实例个数(消费者上配置)
spring.cloud.stream.instance-count=2
# 当前实例的索引号(消费者上配置),从0开始,最大值为instance-count的值-1
spring.cloud.stream.instance-index=0
# 指定分区键的表达式规则,我们可以根据实际的输出消息规则配置SpEL来生成合适的分区键(生产者上配置)
spring.cloud.stream.bindings.hello-output.producer.partition-key-expression=1
# 指定消息分区的数量(生产者上配置)
spring.cloud.stream.bindings.hello-output.producer.partition-count=2
接下来使用Maven打包项目为jar包
在控制台启动两个实例,注意启动时,spring.cloud.stream.instance-index 要动态修改。
java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8080 --
spring.cloud.stream.instance-index=0
java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081 --
spring.cloud.stream.instance-index=1
调用接口/hello测试,可以看到多次发送同一个消息,消息只被一个消费者处理。
8. 延时消息
RabbitMQ实现发送延时消息需要安装插件rabbitmq_delayed_message_exchange,下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
8.1 安装插件
以Docker方式安装为例:
下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
将文件拷贝到Docker容器中
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 900822f303cd:/opt/rabbitmq/plugins
进入RabbitMQ容器
docker exec -it 900822f303cd /bin/sh
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看插件是否启动成功
rabbitmq-plugins list
重新启动RabbitMQ容器
8.2 具体实现
1.配置文件配置
在配置文件中配置开启通道的消息延迟功能
##开启消息延迟功能
spring.cloud.stream.rabbit.bindings.hello-input.consumer.delayed-exchange=true
spring.cloud.stream.rabbit.bindings.hello-output.producer.delayed-exchange=true
修改一下消息输入输出通道的destination定义:
spring.cloud.stream.bindings.hello-input.destination=delay_msg
spring.cloud.stream.bindings.hello-output.destination=delay_msg
2.创建接口测试
@RestController
public class HelloController {
public final static Logger LOGGER = LoggerFactory.getLogger(HelloController.class);
@Autowired
MyChannel myChannel;
@GetMapping("/hello")
public void hello(){
myChannel.output().send(MessageBuilder.withPayload("hello spring cloud stream!").build());
}
@GetMapping("/delay-hello")
public void delayHello(){
LOGGER.info("send msg:" + new Date());
myChannel.output().send(MessageBuilder.withPayload("hello spring cloud stream!").setHeader("x-delay", 5000).build());
}
}
测试可以发现该消息延迟5秒才消费。
参考:
1.《Spring Cloud微服务实战》
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/44276.html