在项目中我们常常遇到这样的场景:后台在进行业务处理的过程中,当遇到一些特殊的场景,会及时的把异常信息及数据在前台页面给予提示;当遇到这样的场景,我们经常考虑使用
kafka
进行实现。
安装Kafka
一、下载Kafka
安装包
下载地址:https://kafka.apache.org/downloads.html

二、下载完成后,进行解压
解压目录:D:kafka

注:kafka
依赖Java
环境,首先需要安装Java 8
或以上版本的Java
,关于jdk
的安装这里不再详细介绍。
三、启动zookeeper
cmd
窗口进入kafka
安装目录,执行
.binwindowszookeeper-server-start.bat .configzookeeper.properties
启动成功后,发现zookeeper
默认端口为2181

如果需要更改端口的话,可以编辑配置文件zookeeper.properties
,找到clientPort
进行修改即可。


四、zookeeper
启动完成后,接下来启动kafka
cmd窗口进入kafka
安装目录,执行下面命令,等待kafka启动成功。
.binwindowskafka-server-start.bat .configserver.properties
注意:若修改了zookeeper默认的启动端口2181,我们需要找到config
文件夹下的server.properties
配置文件,并编辑修改zookeeper的连接端口并做同步修改:zookeeper.connect=localhost:2181

项目创建
创建父项目xxkfz-kafka-master
,并创建两个子模块分别为:消息生产端模块spring-kafka-producer
、消息消费端模块spring-kafka-consumer
整体的项目结构如下:

项目创建完成后分别在两个子模块pom.xml
添加所需依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.13</version>
</dependency>
消息生产端
一、新建消息传输实体Message.java
package com.xxkfz.simplememory.entity;
import lombok.*;
import java.io.Serializable;
@Data
@ToString
public class Message implements Serializable {
private static final long serialVersionUID = -118L;
/**
* 发送人
*/
private String sendUserName;
/**
* 发送时间
*/
private String sendTime;
/**
* 发送内容
*/
private String sendContent;
}
二、新建Kafka
简单工具类KafkaUtils.java
package com.xxkfz.simplememory.util;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author 公众号: SimpleMemory
* @version 1.0.0
* @ClassName KafkaUtils.java
* @Description TODO
* @createTime 2022年05月22日 19:53:00
*/
@Component
@Slf4j
public class KafkaUtils {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void send(String topic, Object message) {
String messageJson = "";
if (null != message) {
messageJson = JSON.toJSONString(message);
}
// 发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, messageJson);
// 监听回调
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.info("发送消息失败......");
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.error("成功发送信息:{}......", result);
}
});
}
}
三、新建MessageController.java
import com.xxkfz.simplememory.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 公众号: SimpleMemory
* @version 1.0.0
* @ClassName MessageController.java
* @Description TODO
* @createTime 2022年05月22日 19:53:00
*/
@RestController
public class MessageController {
@Autowired
private MessageService messageService;
@RequestMapping(path = "/send/{content}")
public void sendMessage(@PathVariable String content) {
messageService.sendMsg(content);
}
}
四、新建MessageServiceImpl.java
package com.xxkfz.simplememory.service.impl;
import cn.hutool.core.date.DateUtil;
import com.xxkfz.simplememory.entity.Message;
import com.xxkfz.simplememory.service.MessageService;
import com.xxkfz.simplememory.util.KafkaUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author 公众号: SimpleMemory
* @version 1.0.0
* @ClassName MessageServiceImpl.java
* @Description TODO
* @createTime 2022年05月22日 19:54:00
*/
@Service
public class MessageServiceImpl implements MessageService {
@Resource
private KafkaUtils kafkaUtils;
@Value("${spring.kafka.topic}")
private String topicName;
@Override
public void sendMsg(Object msg) {
Message message = new Message();
message.setSendUserName("xk_admin");
message.setSendTime(DateUtil.now());
message.setSendContent((String) msg);
kafkaUtils.send(topicName, message);
}
}
五、新建消息生产者启动类KafkaProducerMainApplication.java
package com.xxkfz.simplememory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaProducerMainApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerMainApplication.class, args);
}
}
六、新建kafka
生产者拦截器CustomProducerInterceptor.java
package com.xxkfz.simplememory.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.*;
/**
* 自定义生产者拦截器
*/
@Slf4j
public class CustomProducerInterceptor implements ProducerInterceptor {
/**
* 在发送前做一些处理
*
* @param producerRecord
* @return
*/
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
log.info("正在发送消息: {}", producerRecord.value().toString());
return producerRecord;
}
/**
* 在消息应答前,或者消息发送失败时被调用
*
* @param recordMetadata
* @param e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
/**
* 关闭interceptor,主要用于执行一些资源清理工作
*/
@Override
public void close() {
}
/**
* 获取配置信息和初始化数据时调用
*
* @param map
*/
@Override
public void configure(Map<String, ?> map) {
}
}
七、新建kafka
自定义分区器CustomPartitioner.java
package com.xxkfz.simplememory.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
/**
* xxkfz
* 20211023
* 自定义分区器
*/
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
Integer integer = cluster.partitionCountForTopic(topic);
if (null == integer) {
return Utils.toPositive(ThreadLocalRandom.current().nextInt()) % integer;
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % integer;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
八、新建生产端配置文件application.yml
server:
port: 8083
spring:
kafka:
# 配置消息的主题
topic: xxkfz
producer:
client-id: 1
# kafka server的地址,如果有多个,使用逗号分割
bootstrap-servers: 127.0.0.1:9092
# 生产者消息key和消息value的序列化处理类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 生产者发送失败时,重试次数
retries: 3
properties:
# 自定义消费者拦截器
interceptor.classes: com.xxkfz.simplememory.interceptor.CustomProducerInterceptor
#自定义分区器
partitioner.classes: com.xxkfz.simplememory.partitioner.CustomPartitioner
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
# 批量发送的消息数量
batch-size: 1000
# 32MB的批处理缓冲区
buffer-memory: 33554432
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
消息消费端
一、新建消费端消息监听类MessageListener.java
package com.xxkfz.simplememory.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* 消息接收监听器
*/
@Component
@Slf4j
public class MessageListener {
@KafkaListener(id = "messageGroup", topics = "${spring.kafka.topic}")
public void listenerMessage(ConsumerRecord<?, ?> record) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.error("消费者接收到消息:{} ", msg);
}
}
}
二、新建消费端消息拦截器CustomConsumerInterceptor.java
package com.xxkfz.simplememory.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.*;
/**
* 消费者拦截器
*/
@Slf4j
public class CustomConsumerInterceptor implements ConsumerInterceptor {
/**
* 拉取消息时,被调用
*
* @param consumerRecords
* @return
*/
@Override
public ConsumerRecords onConsume(ConsumerRecords consumerRecords) {
log.error("消息{}已被拉取", consumerRecords.toString());
return consumerRecords;
}
@Override
public void close() {
}
/**
* 提交请求响应成功时被调用
*
* @param map
*/
@Override
public void onCommit(Map map) {
}
@Override
public void configure(Map<String, ?> map) {
}
}
三、新建消费端启动类KafkaConsumerMainApplication.java
package com.xxkfz.simplememory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaConsumerMainApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerMainApplication.class, args);
}
}
四、新建消费端配置文件application.yml
server:
port: 8082
spring:
kafka:
topic: xxkfz
consumer:
client-id: 1
bootstrap-servers: 127.0.0.1:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
interceptor.classes: com.xxkfz.simplememory.interceptor.CustomConsumerInterceptor
# 默认消费者组
group-id: crm-user-service
# 最早未被消费的offset
auto-offset-reset: earliest
# 批量一次最大拉取数据量
max-poll-records: 4000
# 是否自动提交
enable-auto-commit: false
# 自动提交时间间隔,单位ms
auto-commit-interval: 1000
batch:
# 批消费并发量,小于或等于Topic的分区数
concurrency: 3
验证并测试
-
分别启动消息生产端、消费端启动类。 -
启动成功后,在浏览器地址栏输入: http://127.0.0.1:8083/send/xxkfz
,分别查看控制台日志情况


到此,生产者成功发送消息,消费者成功进行消息的接收消费。
源码获取
关于本项目完整代码获取方式:关注公众号并回复文章标题
即可获取;同时为了便于大家快速学习,也包含kafka安装包
哦~


原文始发于微信公众号(SimpleMemory):SpringBoot 整合Kafka实现消息的发送和接收
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137789.html