一、Stream是什么?
SpringCloud Stream是一个构建消息驱动微服务的框架。
应用程序通过inputs 或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
Stream目前仅支持RabbitMQ、Kafka。
Stream中的消息通信方式遵循了发布-订阅模式,在RabbitMQ就是Exchange,在kafka中就是Topic。
官网:https://spring.io/projects/spring-cloud-stream#overview
中文指导手册:https://m.wang1314.com/doc/webapp/topic/20971999.html
二、为什么使用Stream
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
常规的MQ结构。
假如系统用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange, kafka有Topic和Partitions分区。
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了—种解耦合的方式。
Stream通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Stream包含input(对应于消费者)和output(对应于生产者)。
Spring Cloud Stream流程套路
Binder:很方便的连接中间件,屏蔽MQ之间的差异。
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置。
Source:用于发布消息,通过在serverImpl添加注解@EnableBinding(Source.class)。
Sink:用于接受消息,通过在serverImpl添加注解@EnableBinding(Sink.class)。
Spring Cloud Stream编码API和常用注解
三、Stream的使用
注意:以下案例使用的RabbitMQ,所以在使用前要先安装RabbitMQ环境。
3.1 Stream消息提供者
在pom.xml添加依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
application.yml编写配置。
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
主启动类
/**
* 消息生产者
*/
@SpringBootApplication
public class StreamRabbitmqProvider8801Application {
public static void main(String[] args) {
SpringApplication.run(StreamRabbitmqProvider8801Application.class, args);
}
}
IMessageProviderService类
public interface IMessageProviderService {
/**
* 定义消息的推送管道
*
* @return
*/
String send();
}
MessageProviderServiceImpl 类
/**
* @EnableBinding(Source.class) 定义消息的推送管道 将Channel和Exchanges绑定在一起
*/
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
/**
* 消息发送管道/信道
*/
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
Message<String> stringMessage = MessageBuilder.withPayload(serial).build();
output.send(stringMessage);
System.out.println("*****serial: " + serial);
return serial;
}
}
SendMessageController 类
@RestController
public class SendMessageController {
@Resource
private IMessageProviderService messageProviderService;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProviderService.send();
}
}
启动RabbitMQ,然后启动Stream消息提供者8801,访问 http://localhost:8801/sendMessage 出现值,则成功。
3.2 Stream消息消费者。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
application.yml编写配置。
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: atguiguA
主启动类。
/**
* 消息消费者8802
*/
@SpringBootApplication
public class StreamRabbitmqConsumer8802Application {
public static void main(String[] args) {
SpringApplication.run(StreamRabbitmqConsumer8802Application.class, args);
}
}
ReceiveMessageListener 类
/**
* 8802 接收消息
*
*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
//消息监听
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消息监听 port:" + serverPort + "\t接受:" + message.getPayload());
}
}
先启动RabbitMQ,再启动消息提供者(8801),然后启动消息消费者(8802),访问http://localhost:8801/sendMessage,查看消息消费者控制台是否打印有内容。有则成功。
四、Stream分组消费与持久化。
在使用stream时,会存在消息重复消费和持久化的问题。
先说消息重复消费问题,为什么会存在消息重复消费?当我们在微服务架构中,存在多个消息消费者时,当一个消费者消费了一个消息,其余的消费者也会去消费,这就是我们说的消息重复消费,怎么解决,在application.yml添加group: atguiguA(atguiguA可以为任何值) 属性,这个属性是分组的意思,每个消息消费者都可以单独设置,但要注意,只有在同一个组中才不会存在重复消费,并且这个属性自带持久化操作。
如果当前案例要测试重复消费,就将消息消费者(8802)复制一份为8803,测试重复消费就将group属性注释,然后分别启动RabbitMQ,8801(消息提供者),8802和8803,访问http://localhost:8801/sendMessage 看8802和8803的控制台会打印对应的流水号。
测试持久化,将8802配置中的group属性注释,8803添加group属性,启动RabbitMQ,8801(消息提供者),这里注意,先访问http://localhost:8801/sendMessage 多刷新几次后,启动8802控制台是看不见流水号的,然后启动8803,会在控制台打印对应流水号,说明group具有持久化。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/77187.html