使用 Spring Boot 作为一个优秀的 Java 框架提供了一种快速构建应用程序的方法。我还使用 kafka 作为一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成。所以这个片段已经准备好进行批量生产了。我就不多说了,大家看看吧
首先,我们创建一个 Spring Boot 项目。在这篇文章中,我创建了一个名为 wuzzup 的聊天应用程序,听起来很熟悉吧??您可以使用 spring boot 项目生成器,例如 start.spring.io 或您使用的其他插件。只是为了确保添加这些依赖项
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.1</version>
</dependency>
</dependencies>
除了 Spring Boot 依赖项之外,我们还使用 spring-kafka,它将核心 Spring 概念应用于基于 Kafka 的消息传递解决方案的开发。 Spring-kafka为我们提供了发送消息的高级抽象,因此可以帮助我们集成spring和kafka。 Lombok 依赖项以避免样板创建 setter 和 getter(如果您更喜欢使用原始方式,则此选项是可选的)。另一个重要的依赖项是 jackson-databind,该库用于序列化和反序列化我们的消息,因为我们使用对象作为正文消息(是的!,spring-kafka 的库中不包含此强制项🫢)。
接下来,我们注册3个配置,如下图
在深入之前,我将在这三个项目中介绍一下kafka概念。一旦我们发送了消息,它将被存储在主题中。 消息本身称为事件。一个主题可以有多个分区和多个副本。分区是主题中最小的存储桶 存储。您可以将主题想象为一个具有多个级别的鞋架,而分区本身就是一个级别。所以一场活动就是你想买一双漂亮的鞋子 🙂
Kafka 可以分布在位于不同 Kafka Broker 上的多个分区。 Broker指的是你kafka所在的服务器。这种数据的分布式放置对于可扩展性非常重要,因为它允许客户端应用程序同时从多个代理读取数据或向多个代理写入数据。然后就像名字一样,副本通过将数据复制到不同的经纪人(即使在不同的大陆)来使您的数据具有容错性和高可用性
一个事件有一个key,如果您没有声明该key,那么kafka会为您随机生成它。新事件将占用已具有相同事件键的分区。如果在所有分区中找不到相同的键,则该事件将通过循环法附加到分区中。我们甚至可以提前指定生产者将发布哪个分区,但在本节中我将仅使用默认分区
最后但并非最不重要的一点是,我们有生产者和消费者。这个名字很能描述自己。 生产者是生产事件的人,消费者是生产事件的人消耗事件:D。生产者只能一次性发布特定主题的事件,但消费者可以同时阅读某些主题。当然,生产者可以在发送一个事件后发送到不同的主题。当您发布事件时,您应该定义要发送的主题。 见图1
如果同一消费者组中的一些消费者读取同一主题,则每个分区将仅由一个消费者读取。该机制用于分配消费者读取并确保消费者之间不存在重复读取。 见图2
如果消费者的数量超过分区数量,则其余消费者将处于空闲状态。根据我们需要读取的分区数量来确定同一组中有多少个消费者非常重要。它可以少一些,也可以多一些,但随后就会变得低效。 见图3
好的,我们回到代码。首先,我们创建一个主题配置。
我们通过创建主题 bean 来注册主题。使用TopicBuilder,我们可以定义主题名称以及多少个分区和副本。还有很多其他的配置,但我这里不使用它。注册 kafka admin bean 是可选的,因为 Spring boot 的 KafkaAutoConfiguration 将默认通过在 application.properties 中定义引导 URL spring.kafka.bootstrap-servers 来创建它。除非您需要自定义其他配置,否则您应该注册它。
这里我把主题分成两个;私人和团体。因为我们在读取数据时这两个主题之间会有不同的行为。当然,根据具体情况,策略也会有所不同。
PS:在本节中,我只使用私有主题。
在生产者配置类中,我们有 ProducerFactory bean和kafkaTemplate bean。
ProducerFactory是创建 Producer 实例的策略。我们在这里定义生产者的配置。您可以在上面的图 5 中看到,我们定义了引导 URL、键序列化器和值序列化器。对于键序列化器,我只使用简单的StringSerializer。然后对于值序列化器,我使用 JsonSerializer 因为我们将存储 POJO 对象作为值。
KafkaTemplate 是一个模板用于执行高级操作,例如发布/发送事件。我们稍后会看到用法。该 bean 将返回我们上面注册的 ProducerFactory 的 KafkaTemplate 。
ConsumerFactory 与 ProducerFactory 类似,但用于创建用于创建消费者实例的模板。像往常一样,我们定义要连接的引导 URL。
之后,添加一个group-id,该消费者将被分组到其中。该组 ID 是从其下面的 bean 参数化的。
这里我们使用 keyDe序列化器和值De序列化器.在 valueDeserializer 中,我们定义要从 JSON 值解析的模型。在本例中,我创建了一个名为 Message 的模型,如图 7 所示。
最后一件事,因为我们想要将 JSON 解组到模型中,所以需要 jackson-databind 中的 JSONDeserializer 并且必须信任配置中的包。您可以通过填写“*”来注册您的模型包或所有包。
然后在最底部,我们创建mobileContainerFactory 它将注册kafkaListernerContainerFactory 见图6下次如果我们想要添加具有其他行为(可能是 Web、移动 Web 或其他行为)的另一个通道,我们可以添加另一个容器工厂 bean。稍后我将向您展示如何实现这个容器工厂。 “移动”。 带有 groupId
创建用于发送和侦听消息的服务。我们有一个用于消费消息的监听器。使用带有两个参数的 @KafkaListener 注释。 topic 参数是我们想要听的主题数组。如果您只需要一个主题,则可以只定义一个主题。然后写入我们之前注册的containerFactorybean名称。请记住,在 containerFactory 中,我们定义了消费者的组以及如何反序列化事件。因此,在这个监听器中,我们能够读取“消息”对象作为参数。 该实现假设将生产者和消费者分离到不同的微服务中。
在 sendMessage 方法,我们有四个参数。该键是一个唯一值,指示该消息将发送到同一分区。如果我们有很多分区,这个键就很重要。请参阅下面的插图
应用程序不确保每个消费者都会按顺序向客户端发送消息。例如,consumer1 的网络速度较慢,因此消息将无法按正确的顺序发送。你可以看下图
要从客户端接收消息,请创建一个控制器。注入服务 bean,然后调用 sendMessage 方法。根据您的策略,我在这里将传入消息分为每个客户端(例如移动设备、网络等)。
参见图 11
好的,现在我们测试一下。参见下图 12。在现实世界中,密钥应该是唯一的值。我们可以将密钥分配为对话的 ID,因此每个人之间或组内的每个对话都应该有一个唯一的 ID。
重新看一下图8,稍后我们可以改进这个简单的消费者。阅读消息后,然后通过 firebase 发送通知。现在参见图 13,这是一个简单的消息显示。稍后在客户端中,我们可以装饰这条消息,然后显示出漂亮的外观。例如,在右侧的气泡中显示发件人的消息,在左侧的气泡中显示对话者的消息。
原文始发于微信公众号(编译):使用springboot + kafka构建一个简单的聊天应用
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/199027.html