SpringBoot集成Kafka(生产)

导读:本篇文章讲解 SpringBoot集成Kafka(生产),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

前言

  • 版本 : kafka_2.12-2.4.0

  • Kafka是由Scala和Java编写。

  • Scala 2.12 指的是编译Kafka源代码的Scala编译器版本号。

  • 2.4.0是指Kafka的版本号。

  • JDK 1.8

  • spring-boot 2.6.3


一、准备工作 :

1、确认配置

在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中IP为公网IP)。

vim config/server.properties
advertised.listeners=PLAINTEXT://123.57.16.92:9092

2、maven依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.1</version>
        </dependency>

二、项目结构

实际开发需求中,一般生产者都是在某些接口的服务处理完逻辑之后往kafka里写入数据;然后由一个消费者持续监听这个Topic并处理数据。注意消费者类要加@Component注解,否则组件扫描不到其中的@KafkaListener。
项目结构截图

三、具体代码

1、SpringBoot配置文件

application.properties(注意:下面的配置采用 properties 格式;若使用 application.yml,需改写为 yml 层级写法)

# Consumer settings (read by KafkaConsumerConfig via @Value)
#spring.kafka.bootstrap-servers=47.242.104.192:9092
spring.kafka.bootstrap-servers=123.57.16.92:9092
# Commit offsets automatically every 100 ms
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
#spring.crawl.kafka.consumer.group-id=test1
# Start from the earliest offset when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
# Number of consumer threads per listener container
spring.kafka.listener.concurrency=1

# Producer settings (read by KafkaProducerConfig via @Value)
#kafka.producer.servers=47.242.104.192:9092
kafka.producer.servers=123.57.16.92:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=4096000
kafka.producer.max.size=3000000

# Topic name and the consumer group id referenced by @KafkaListener(groupId = "${groupId}")
newsTopic=news
groupId=1

2、pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka</name>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <!-- NOTE(review): dependency versions are hard-coded individually; consider
         inheriting spring-boot-starter-parent (or importing the spring-boot
         BOM) so Boot manages compatible versions - TODO confirm this fits the
         project's build setup. -->
    <dependencies>

        <!-- Spring Boot core starter: auto-configuration, logging -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.6.3</version>
        </dependency>

        <!-- Lombok: compile-time generation of @Slf4j loggers, @Data accessors, etc. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>

        <!-- Hutool utility library: SpringUtil is used to fetch beans statically (KafkaUtils) -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.0.M3</version>
        </dependency>

        <!-- Spring for Apache Kafka: KafkaTemplate, @KafkaListener -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.1</version>
        </dependency>

    </dependencies>



</project>

3、KafkaProducerConfig 类

package com.youyue.config.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer configuration: exposes the {@link KafkaTemplate} bean used to
 * send String messages. All values come from the "kafka.producer.*" properties.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Comma-separated broker list, e.g. "host:9092". */
    @Value("${kafka.producer.servers}")
    private String servers;
    /** Number of automatic retries on transient send failures (0 = fail fast). */
    @Value("${kafka.producer.retries}")
    private int retries;
    /** Per-partition batch size in bytes. */
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    /** Linger time in ms before flushing a partial batch. */
    @Value("${kafka.producer.linger}")
    private int linger;
    /** Total buffer memory (bytes) available to the producer. */
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
    /** Maximum size (bytes) of a single request to the broker. */
    @Value("${kafka.producer.max.size}")
    private int maxRequestSizeConfig;

    /** Raw producer properties passed to the Kafka client. */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSizeConfig);
        // acks intentionally left at the client default:
        //props.put(ProducerConfig.ACKS_CONFIG, 1);

        return props;
    }

    /**
     * Producer factory bean; registered with @Bean so the container manages a
     * single instance (the original called it as a plain method).
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // Diamond operator instead of the original raw-typed "new KafkaTemplate(...)".
        return new KafkaTemplate<>(producerFactory());
    }
}

4、KafkaConsumerConfig 类

本类的关键点是 setBatchListener(true):开启批量监听后,@KafkaListener 方法将以 List&lt;ConsumerRecord&gt; 形式接收消息。

package com.youyue.config.kafka;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer configuration: builds the listener container factory used by
 * every @KafkaListener. No group.id is configured here; each listener supplies
 * its own via the annotation's groupId attribute.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Broker list, e.g. "host:9092". */
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    /** Whether the client commits offsets automatically. */
    @Value("${spring.kafka.consumer.enable-auto-commit:true}")
    private boolean enableAutoCommit;
    /** Auto-commit interval in milliseconds (was a String in the original; int for consistency). */
    @Value("${spring.kafka.consumer.auto-commit-interval:100}")
    private int autoCommitInterval;
    /** Where to start when the group has no committed offset. */
    @Value("${spring.kafka.consumer.auto-offset-reset:earliest}")
    private String autoOffsetReset;
    /** Number of concurrent consumer threads per container. */
    @Value("${spring.kafka.listener.concurrency:1}")
    private int concurrency;

    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaCrawlListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        // Deliver records to listeners in batches (List<ConsumerRecord>).
        factory.setBatchListener(true);
        // Containers are NOT started on context refresh: they are started
        // manually through KafkaListenerEndpointRegistry (see the startup class).
        factory.setAutoStartup(false);
        // How long poll() blocks waiting for records, in ms.
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    /** Raw consumer properties passed to the Kafka client (no group.id; see class doc). */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        return propsMap;
    }

}

5、KafkaUtils 类

package com.youyue.utils;


import cn.hutool.extra.spring.SpringUtil;
import com.youyue.vo.KafkaReEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.TimeUnit;


/**
 * Static helper for sending String messages to Kafka.
 *
 * <p>The KafkaTemplate is resolved lazily from the Spring context on first use,
 * so loading this class before the context is refreshed no longer fails (the
 * original resolved it eagerly in a static initializer).
 */
@Slf4j
public class KafkaUtils {

    /** Maximum time to wait for the broker acknowledgement. */
    private static final long SEND_TIMEOUT_SECONDS = 30L;

    /** Cached template; populated on first call to {@link #template()}. */
    private static KafkaTemplate<String, String> kafkaTemplate;

    @SuppressWarnings("unchecked")
    private static synchronized KafkaTemplate<String, String> template() {
        if (kafkaTemplate == null) {
            kafkaTemplate = SpringUtil.getBean(KafkaTemplate.class);
        }
        return kafkaTemplate;
    }

    /**
     * Sends {@code str} to {@code topic} and blocks until the broker acks
     * (or the timeout elapses).
     *
     * @param topic destination topic
     * @param str   message payload
     * @return partition/offset of the written record, or a zero-valued
     *         {@link KafkaReEntity} when the send fails
     */
    public static KafkaReEntity send(String topic, String str) {
        // Generics fixed: this is a String/String template, not Integer/String
        // as the original's raw types silently allowed.
        ListenableFuture<SendResult<String, String>> future = template().send(topic, str);
        try {
            // Bounded wait instead of the original's unbounded future.get().
            SendResult<String, String> result = future.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            int partition = result.getRecordMetadata().partition();
            long offset = result.getRecordMetadata().offset();
            return new KafkaReEntity(partition, offset);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            log.error("Interrupted while sending to topic {}", topic, e);
        } catch (Exception e) {
            // Log via slf4j (class already had @Slf4j) instead of printStackTrace().
            log.error("Failed to send message to topic {}", topic, e);
        }
        return new KafkaReEntity();
    }

}

6、KafkaReEntity 类

package com.youyue.vo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Result of a Kafka send: the partition the record landed on and its offset.
 * A default-constructed instance (0/0) is returned by KafkaUtils on failure.
 * {@code @Data} already generates toString(), so the redundant {@code @ToString}
 * was removed.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KafkaReEntity {

    // Partition the record was written to.
    private int partition = 0;
    // Offset within that partition; uppercase L suffix (the original's
    // lowercase "0l" is easily misread as "01").
    private long offset = 0L;

}

7、KafkaLintener类

package com.youyue.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Batch consumer for the "test" topic. Must be a @Component so component
 * scanning registers the @KafkaListener endpoint.
 *
 * <p>NOTE(review): the class name keeps the original spelling ("Lintener")
 * because it is the public type name; consider renaming project-wide.
 */
@Slf4j
@Component
public class KafkaLintener {

    /**
     * Receives a batch of records (the container factory enables batch
     * listening) and logs each non-null value. Uses the slf4j logger instead
     * of the original System.out.println + Optional wrapper.
     */
    @KafkaListener(id = "test", topics = "test", groupId = "${groupId}")
    public void text(List<ConsumerRecord<String, String>> records) {

        for (ConsumerRecord<String, String> record : records) {
            String value = record.value();
            if (value != null) { // records may carry a null value (tombstones)
                log.info("已消费{}", value);
            }
        }
    }
}

8、ApiApplication启动类

package com.youyue;

import com.youyue.utils.KafkaUtils;
import com.youyue.vo.KafkaReEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;

/**
 * Application entry point. On startup it sends a demo message, then manually
 * starts the listener container whose id matches the first program argument
 * (the container factory is configured with autoStartup=false).
 */
@Slf4j
@SpringBootApplication
@Import(cn.hutool.extra.spring.SpringUtil.class)
public class ApiApplication implements CommandLineRunner {

    /** Listener-container ids accepted as the first program argument. */
    private static final List<String> ALLOWED_TOPICS = Arrays.asList("news", "test");

    @Resource
    private KafkaListenerEndpointRegistry registry;

    public static void main(String[] args) {
        SpringApplication.run(ApiApplication.class, args);
    }

    @Override
    public void run(String... args) {

        // Demo send so the consumer started below has something to read.
        KafkaReEntity send = KafkaUtils.send("test","hello java17");
        log.info("数据发送成功 partition {},offsetId {}", send.getPartition(),send.getOffset());

        if (args.length == 0) {
            log.info("没有启动类型");
            return;
        }

        String topic = args[0];
        if (!ALLOWED_TOPICS.contains(topic)) {
            log.info("启动参数不正确");
            return;
        }

        // getListenerContainer returns null when no @KafkaListener declares this
        // id (e.g. "news" has no matching listener) - check explicitly instead
        // of letting the catch block swallow a NullPointerException.
        MessageListenerContainer container = registry.getListenerContainer(topic);
        if (container == null) {
            log.error("未找到 id 为 {} 的监听容器", topic);
            return;
        }
        try {
            if (!container.isRunning()) {
                container.start();
            }
            container.resume();
        } catch (Exception e) {
            // Pass the throwable as the final argument so slf4j logs the stack
            // trace (the original "[{}]" placeholder only printed e.toString()).
            log.error("启动监听报错", e);
        }
    }
}

四、测试

1、运行项目 ApiApplication 启动类

需要在启动类里添加启动参数
在这里插入图片描述

2、查看结果

可以看到发送成功日志,和消费成功日志
数据发送成功到 kafka
监听到对应的topic,已做消费。


总结

参考

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/88175.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!