文章目录
前言
- 版本 : kafka_2.12-2.4.0
- Kafka 是由 Scala 和 Java 编写的。
- Scala 2.12 指的是编译 Kafka 源代码的 Scala 编译器版本号。
- 2.4.0 是指 Kafka 的版本号。
- JDK 1.8
- spring-boot 2.6.3
一、准备工作 :
1、确认配置
在项目中连接 Kafka 时,由于是通过外网访问,首先要在 Kafka 的配置文件中开放如下配置(其中 IP 为 Kafka 服务器的公网 IP)。
vim config/server.properties
advertised.listeners=PLAINTEXT://123.57.16.92:9092
2、maven依赖
<!-- Spring for Apache Kafka: supplies the KafkaTemplate and @KafkaListener APIs used by the code in this article -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
二、项目结构
在实际开发中,生产者一般是在某些接口的服务处理完业务逻辑之后,再往 Kafka 里写入数据;同时有一个消费者持续监听该 Topic 并处理数据。注意:监听类上必须加 @Component 注解,否则 Spring 扫描不到其中的 @KafkaListener。
三、具体代码
1、SpringBoot配置文件
application.properties(以下内容为 properties 格式,而非 YAML 格式)
# Consumer settings (read by KafkaConsumerConfig through @Value)
#spring.kafka.bootstrap-servers=47.242.104.192:9092
# Broker address the consumer connects to
spring.kafka.bootstrap-servers=123.57.16.92:9092
# Passed to the consumer as enable.auto.commit
spring.kafka.consumer.enable-auto-commit=true
# Passed to the consumer as auto.commit.interval.ms (milliseconds)
spring.kafka.consumer.auto-commit-interval=100
#spring.crawl.kafka.consumer.group-id=test1
# Passed to the consumer as auto.offset.reset
spring.kafka.consumer.auto-offset-reset=earliest
# Concurrency applied to the listener container factory
spring.kafka.listener.concurrency=1
# Producer settings (read by KafkaProducerConfig through @Value)
#kafka.producer.servers=47.242.104.192:9092
# Broker address the producer connects to
kafka.producer.servers=123.57.16.92:9092
# Passed to the producer as retries
kafka.producer.retries=0
# Passed to the producer as batch.size
kafka.producer.batch.size=4096
# Passed to the producer as linger.ms (milliseconds)
kafka.producer.linger=1
# Passed to the producer as buffer.memory
kafka.producer.buffer.memory=4096000
# Passed to the producer as max.request.size
kafka.producer.max.size=3000000
# NOTE(review): newsTopic is not referenced by any class shown in this article - confirm it is still needed
newsTopic=news
# Consumer group id, resolved by the "${groupId}" placeholder on the @KafkaListener
groupId=1
2、pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Kafka demo project described in this article. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<name>kafka</name>
<properties>
<!-- Compile with Java 8 source and target levels -->
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spring Boot core starter; no parent POM or BOM is declared, so every version is pinned explicitly -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.6.3</version>
</dependency>
<!-- Lombok: provides the @Slf4j / @Data / constructor annotations used by the classes below -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<!-- Utility library (SpringUtil from hutool is used to look up beans statically) -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.0.M3</version>
</dependency>
<!-- Spring for Apache Kafka: KafkaTemplate, @KafkaListener and the listener container infrastructure -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
</project>
3、KafkaProducerConfig 类
package com.youyue.config.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration for the producer side of Kafka.
 *
 * <p>Reads the {@code kafka.producer.*} properties and exposes a single
 * {@link KafkaTemplate} bean that sends {@code String} keys and {@code String} values.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Broker address list, used as {@code bootstrap.servers}. */
    @Value("${kafka.producer.servers}")
    private String servers;

    /** Value for the producer {@code retries} setting. */
    @Value("${kafka.producer.retries}")
    private int retries;

    /** Value for the producer {@code batch.size} setting. */
    @Value("${kafka.producer.batch.size}")
    private int batchSize;

    /** Value for the producer {@code linger.ms} setting. */
    @Value("${kafka.producer.linger}")
    private int linger;

    /** Value for the producer {@code buffer.memory} setting. */
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    /** Value for the producer {@code max.request.size} setting. */
    @Value("${kafka.producer.max.size}")
    private int maxRequestSizeConfig;

    /**
     * Builds the raw producer property map from the injected values.
     *
     * <p>{@code acks} is not set here, so the Kafka client default applies.
     *
     * @return a new mutable map of producer configuration entries
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSizeConfig);
        return props;
    }

    /**
     * Creates a producer factory backed by {@link #producerConfigs()}.
     *
     * <p>This method is not a {@code @Bean}, so every call returns a new factory.
     *
     * @return a new producer factory for String/String records
     */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Exposes the template used to publish messages.
     *
     * @return a {@code KafkaTemplate} for String keys and String values
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // Diamond instead of the raw type, so the compiler checks the key/value types.
        return new KafkaTemplate<>(producerFactory());
    }
}
4、KafkaConsumerConfig 类
说明:下面通过 setBatchListener(true) 开启了批量消费,因此监听方法的参数需要声明为 List<ConsumerRecord<String, String>>。
package com.youyue.config.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration for the consumer side of Kafka.
 *
 * <p>Registers the {@code kafkaListenerContainerFactory} bean: a batch-mode factory
 * whose containers do not start automatically.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Broker address list, used as {@code bootstrap.servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    /** Value for {@code enable.auto.commit}; falls back to {@code true}. */
    @Value("${spring.kafka.consumer.enable-auto-commit:true}")
    private boolean enableAutoCommit;

    /** Value for {@code auto.commit.interval.ms}; falls back to {@code 100}. */
    @Value("${spring.kafka.consumer.auto-commit-interval:100}")
    private String autoCommitInterval;

    /** Value for {@code auto.offset.reset}; falls back to {@code earliest}. */
    @Value("${spring.kafka.consumer.auto-offset-reset:earliest}")
    private String autoOffsetReset;

    /** Concurrency handed to the container factory; falls back to {@code 1}. */
    @Value("${spring.kafka.listener.concurrency:1}")
    private int concurrency;

    /**
     * Assembles the consumer property map from the injected values.
     *
     * <p>No {@code group.id} is put into the map here.
     *
     * @return a new mutable map of consumer configuration entries
     */
    public Map<String, Object> consumerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        return settings;
    }

    /**
     * Creates a consumer factory backed by {@link #consumerConfigs()}.
     *
     * @return a new consumer factory for String/String records
     */
    public ConsumerFactory<String, String> consumerFactory() {
        final Map<String, Object> settings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * Builds the listener container factory registered as {@code kafkaListenerContainerFactory}.
     *
     * <p>Batch listening is enabled and auto-startup is disabled; the poll timeout is 1500.
     *
     * @return the configured container factory
     */
    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaCrawlListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        final ConsumerFactory<String, String> consumers = consumerFactory();
        containerFactory.setConsumerFactory(consumers);
        containerFactory.setConcurrency(concurrency);
        // Listener methods receive a list of records per invocation.
        containerFactory.setBatchListener(true);
        // Containers stay stopped until something starts them explicitly.
        containerFactory.setAutoStartup(false);
        containerFactory.getContainerProperties().setPollTimeout(1500);
        return containerFactory;
    }
}
5、KafkaUtils 类
package com.youyue.utils;
import cn.hutool.extra.spring.SpringUtil;
import com.youyue.vo.KafkaReEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
@Slf4j
public class KafkaUtils {
private static KafkaTemplate kafkaTemplate = SpringUtil.getBean(KafkaTemplate.class);
public static KafkaReEntity send(String topic, String str) {
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, str);
try {
SendResult<Integer, String> integerStringSendResult = future.get();
int partition = integerStringSendResult.getRecordMetadata().partition();
long offset = integerStringSendResult.getRecordMetadata().offset();
KafkaReEntity kafkaReEntity = new KafkaReEntity(partition, offset);
return kafkaReEntity;
} catch (Exception e) {
e.printStackTrace();
}
return new KafkaReEntity();
}
}
6、KafkaReEntity 类
package com.youyue.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Result holder for a Kafka send: the partition and offset of the written record.
 *
 * <p>Lombok generates the accessors, {@code toString}, a no-argument constructor and an
 * all-arguments constructor taking {@code (partition, offset)}.
 */
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class KafkaReEntity {

    /** Partition of the record; 0 when created with the no-argument constructor. */
    private int partition = 0;

    /** Offset of the record; 0 when created with the no-argument constructor. */
    private long offset = 0L;
}
7、KafkaLintener类
package com.youyue.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
/**
 * Kafka consumer component that prints every record value it receives from topic {@code test}.
 */
@Slf4j
@Component
public class KafkaLintener {

    /**
     * Handles one batch of records and writes each value to standard output.
     *
     * @param records the records delivered in this batch
     */
    @KafkaListener(id = "test", topics = "test", groupId = "${groupId}")
    public void text(List<ConsumerRecord<String, String>> records) {
        records.forEach(consumed -> {
            // The value is wrapped in an Optional so a null payload prints as Optional.empty.
            Optional<?> payload = Optional.ofNullable(consumed.value());
            System.out.println("已消费" + payload);
        });
    }
}
8、ApiApplication启动类
package com.youyue;
import com.youyue.utils.KafkaUtils;
import com.youyue.vo.KafkaReEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
@Slf4j
@SpringBootApplication
@Import(cn.hutool.extra.spring.SpringUtil.class)
public class ApiApplication implements CommandLineRunner {
@Resource
private KafkaListenerEndpointRegistry registry;
public static void main(String[] args) {
SpringApplication.run(ApiApplication.class, args);
}
@Override
public void run(String... args) {
KafkaReEntity send = KafkaUtils.send("test","hello java17");
log.info("数据发送成功 partition {},offsetId {}", send.getPartition(),send.getOffset());
if (args.length == 0) {
log.info("没有启动类型");
return;
}
List<String> list = Arrays.asList("news","test");
String topic = args[0];
if(!list.contains(topic)){
log.info("启动参数不正确");
return;
}
try {
if (!registry.getListenerContainer(topic).isRunning()) {
registry.getListenerContainer(topic).start();
}
registry.getListenerContainer(topic).resume();
}catch (Exception e){
log.error("启动监听报错:[{}]",e);
}
}
}
四、测试
1、运行项目 ApiApplication 启动类
2、查看结果
总结
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/88175.html