JAVA-kafka-生产者(二)

导读:本篇文章讲解 JAVA-kafka-生产者(二),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1、概述(摘自Kafka权威指南)

在这里插入图片描述

2、kafka生产者的创建(摘自Kafka权威指南)

在这里插入图片描述

3、入门程序

3.1、pom文件

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.1</version>
    </dependency>
</dependencies>

3.2、KafkaConsts

public class KafkaConsts {

   /*==================入门程序=========================*/
    public final static String HELLO_TOPIC = "hello.topic";
    public final static String HELLO_KEY = "helloKey";
    /*=====================线程========================*/
    public final static String THREAD_POLL_TOPIC = "thread.pool.topic";
    public final static String THREAD_POLL_KEY = "threadPoolKey";
    /*=====================vo========================*/
    public final static String VO_TOPIC = "vo.topic";
    public final static String VO_KEY = "voKey";
}

3.3、HelloKafkaProducer

package com.kafka.hellokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class HelloKafkaProducer {
    public static void main(String[] args) {
        //1、kafka启动配置,配置ip:端口,key value序列化配置
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.92.39:9092");
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        //2、生产者配置
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        try {
            //3、构建消息,注意,钥匙,值
            ProducerRecord<String, String> record = new ProducerRecord<>(KafkaConsts.HELLO_TOPIC, KafkaConsts.HELLO_KEY, "Hello Topic");
            //4、发送
            producer.send(record);
            System.out.println("message is sent");
        } finally {
            //5、关闭
            producer.close();
        }
    }
}

3.4、HelloKafkaConsumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class HelloKafkaConsumer {
    public static void main(String[] args) {
        //1、配置消费者ip,返序列化,消费者群组
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.92.39:9092");
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test");
        //2、定义消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            //3、订阅主题
            consumer.subscribe(Collections.singletonList(KafkaConsts.HELLO_TOPIC));
            //4、拉取消息
            while (true) {
                //4.1、500ms拉取一次
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                //4.2、读取消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("主题:" + record.topic());
                    System.out.println("分区:" + record.partition());
                    System.out.println("偏移量:" + record.offset());
                    System.out.println("key:" + record.key());
                    System.out.println("value:" + record.value());
                    System.out.println("####################################");
                }
            }
        } finally {
            consumer.close();
        }
    }
}

3.5、运行消费者运行生产者
在这里插入图片描述

4、生产者发送消息基本流程

消息格式:每个消息是一个ProducerRecord对象,必须指定消息所属的Topic和消息值Value,此外还可以指定消息所属的Partition以及消息的Key。
1:序列化ProducerRecord
2:如果ProducerRecord中指定了Partition,则Partitioner不做任何事情;否则,Partitioner根据消息的key得到一个Partition。这是生产者就知道向哪个Topic下的哪个Partition发送这条消息。
3:消息被添加到相应的batch中,独立的线程将这些batch发送到Broker上
4:broker收到消息会返回一个响应。如果消息成功写入Kafka,则返回RecordMetaData对象,该对象包含了Topic信息、Patition信息、消息在Partition中的Offset信息;若失败,返回一个错误
在这里插入图片描述
*

5、设置公用配置文件

这样的代码写起来不整洁,也容易出错。其实,Kafka帮我们封装好了常用的配置信息,我们提取出来就行了。
在这里插入图片描述
KafkaCommonConfig:

package com.kafka.common;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class KafkaCommonConfig {
    /*生产者消费者共用配置常量*/
    private static final String KAFKA_HOST = "192.168.92.39:9092";
    /**
     * 功能描述  获取kafka生产者
     *
     * @param keySerializeClazz   key的序列化类
     * @param valueSerializeClazz value的序列化类
     * @author Zrs
     * @date 2019/8/4
     */
    public static Properties getProducerProperties(Class keySerializeClazz, Class valueSerializeClazz) {
        Properties properties = new Properties();
        //配置IP
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
        // 配置key的序列化和value的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializeClazz);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializeClazz);
        return properties;
    }
    /**
     * 功能描述  得到消费者
     *
     * @param groupid               组ID
     * @param keyDeserializeClazz   key的反序列化 class
     * @param valueDeserializeClazz value的反序列化 class
     * @author Zrs
     * @date 2019/8/4
     */
    public static Properties getCustomerProperties(String groupid, Class keyDeserializeClazz, Class valueDeserializeClazz) {
        Properties properties = new Properties();
        //配置IP
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
        //配置组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        //配置反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializeClazz);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializeClazz);
        return properties;
    }
}

6、发送者有3种发送方式

在这里插入图片描述
6.1、HelloKafkaProducer
忽略send方法的返回值,不做任何处理,如果不关心发送结果,那么可以使用这种发送方式。比如:记录消息日志,或记录不太重要的应用程序日志。

6.2、FutureProducer 同步发送
在这里插入图片描述
获得send方法返回的Future对象,在合适的时候调用Future的get方法。

import com.kafka.common.KafkaCommonConfig;
import com.kafka.common.KafkaConsts;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.Future;
public class FutureProducer {
    public static void main(String[] args) {
        //1、获取生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(KafkaCommonConfig.getProducerProperties(StringSerializer.class, StringSerializer.class));
        try {
            //2、建立消息
            String msg = "hello kafka future";
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(KafkaConsts.HELLO_TOPIC, KafkaConsts.HELLO_KEY, msg);
            //3、发送并获取future
            Future<RecordMetadata> sendFuture = producer.send(record);
            //4、拿到发送结果
            RecordMetadata recordMetadata = sendFuture.get();
            if (recordMetadata != null) {
                System.out.println("发送的消息为:" + msg + ",偏移量为:" + recordMetadata.offset()
                        + ",分区为:" + recordMetadata.partition());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

在这里插入图片描述
6.3、AsyncProducer异步发送:
实现接口org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递给send方法。
在这里插入图片描述

import com.kafka.common.KafkaCommonConfig;
import com.kafka.common.KafkaConsts;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.Future;
public class AsyncProducer {
    public static void main(String[] args) {
        //1、获取生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(KafkaCommonConfig.getProducerProperties(StringSerializer.class, StringSerializer.class));
        try {
            //2、建立消息
            String msg = "hello kafka future";
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(KafkaConsts.HELLO_TOPIC, KafkaConsts.HELLO_KEY, msg);
            //3、发送并获取future
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("记录错误");
                    }
                    if (metadata != null) {
                        System.out.println("发送的消息为:" + msg + ",偏移量为:" + metadata.offset()
                                + ",分区为:" + metadata.partition());
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

在这里插入图片描述

7、更多的消息配置

生产者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。可以参考org.apache.kafka.clients.producer包下的ProducerConfig类。
acks:
指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
acks=1:只要集群的首领节点(第一个加入集群的broker)收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。
acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。

buffer.memory
设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向broker发送的速度,导致生产者空间不足,producer会阻塞或者抛出异常。缺省33554432 (32M)

max.block.ms
指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。缺省60000ms

retries
发送失败时,指定生产者可以重发消息的次数。默认情况下,生产者在每次重试之间等待100ms,可以通过参数retry.backoff.ms参数来改变这个时间间隔。缺省0

receive.buffer.bytes和send.buffer.bytes
指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。缺省102400

batch.size
当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送。缺省16384(16k)

linger.ms
指定了生产者在发送批次前等待更多消息加入批次的时间。它和batch.size以先到者为先。也就是说,一旦我们获得消息的数量够batch.size的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比
batch.size设置要小的多,我们需要“linger”特定的时间以获取更多的消息。这个设置默认为0,即没有延迟。设定linger.ms=5,例如,将会减少请求数目,但是同时会增加5ms的延迟,但也会提升消息的吞吐量。

compression.type
producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。snappy占用cpu少,提供较好的性能和可观的压缩比,如果比较关注性能和网络带宽,用这个。如果带宽紧张,用gzip,会占用较多的cpu,但提供更高的压缩比。

client.id
当向server发出请求时,这个字符串会发送给server。目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪。

max.in.flight.requests.per.connection
指定了生产者在接收到服务器响应之前可以发送多个消息,值越高,占用的内存越大,当然也可以提升吞吐量。发生错误时,可能会造成数据的发送顺序改变,默认是5 (修改)。
如果需要保证消息在一个分区上的严格顺序,这个值应该设为1。不过这样会严重影响生产者的吞吐量。

request.timeout.ms
客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常

metadata.fetch.timeout.ms
是指我们所获取的一些元数据的第一个时间数据。元数据包含:topic,host,partitions。此项配置是指当等待元数据fetch成功完成所需要的时间,否则会跑出异常给客户端

timeout.ms
此配置选项控制broker等待副本确认的最大时间。如果确认的请求数目在此时间内没有实现,则会返回一个错误。这个超时限制是以server端度量的,没有包含请求的网络延迟。这个参数和acks的配置相匹配。

max.request.size
控制生产者发送请求最大大小。假设这个值为1M,如果一个请求里只有一个消息,那这个消息不能大于1M,如果一次请求是一个批次,该批次包含了1000条消息,那么每个消息不能大于1KB。注意:broker具有自己对消息记录尺寸的覆盖,如果这个尺寸小于生产者的这个设置,会导致消息被拒绝。

8、顺序的保证

在这里插入图片描述

9、发送自定义pojo(Kafka权威指南 发送者 序列化)

在这里插入图片描述
kafka为我们提供了这些序列化实现:
在这里插入图片描述

发送自定义VO:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private Integer id;
    private String name;
    private String address;
}

KafkaVoProducer

public class KafkaVoProducer {
    public static void main(String[] args) {
        //1、获取生产者
        KafkaProducer<String, String> producer =
                new KafkaProducer<>(KafkaCommonConfig.getProducerProperties(StringSerializer.class, StringSerializer.class));
        try {
            //2、建立消息
            User user = new User(1, "张三", "北京市软件园中关村");
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(KafkaConsts.VO_TOPIC, KafkaConsts.VO_KEY, JSON.toJSONString(user));
            //3、发送并获取future
            Future<RecordMetadata> sendFuture = producer.send(record);
            //4、拿到发送结果
            RecordMetadata recordMetadata = sendFuture.get();
            if (recordMetadata != null) {
                System.out.println("发送的消息为:" + JSON.toJSONString(user) + ",偏移量为:" + recordMetadata.offset()
                        + ",分区为:" + recordMetadata.partition());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

KafkaVoCustomer

public class KafkaVoCustomer {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaCommonConfig.getCustomerProperties
                ("threadPool", StringDeserializer.class, StringDeserializer.class));
        try {
            consumer.subscribe(Collections.singletonList(KafkaConsts.VO_TOPIC));
            //4、拉取消息
            while (true) {
                //4.1、500ms拉取一次
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                //4.2、读取消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.print("主题:" + record.topic());
                    System.out.print(",分区:" + record.partition());
                    System.out.print(",偏移量:" + record.offset());
                    System.out.print(",key:" + record.key());
                    User user = JSON.parseObject(record.value(), User.class);
                    System.out.print(",value:" + user.toString());
                    System.out.println();
                }
            }
        } finally {
            consumer.close();
        }
    }
}

在这里插入图片描述

10、多线程下使用生产者
kafka生产者在多线程情况下是线程安全的
ThreadPoolProducer:

 import com.kafka.common.KafkaCommonConfig;
import com.kafka.common.KafkaConsts;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolProducer {
    //发送消息的个数
    private static final int MSG_SIZE = 1000;
    //负责发送消息的线程池,线程为当前电脑的核数
    private static ExecutorService executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors());
    //用countDownLatch当做发令枪,让所用线程同时运行
    private static CountDownLatch countDownLatch = new CountDownLatch(MSG_SIZE);
  
    //自定义类用于发送消息
    private static class ProducerWorker implements Runnable {
        //消息
        private ProducerRecord<String, String> record;
        //消息发送者
        private KafkaProducer<String, String> producer;
        public ProducerWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {
            this.record = record;
            this.producer = producer;
        }
        @Override
        public void run() {
            try {
                producer.send(record, (metadata, exception) -> {
                    if (null != exception) {
                        exception.printStackTrace();
                    }
                    if (null != metadata) {
                        if (exception != null) {
                            System.out.println("记录错误");
                        }
                        if (metadata != null) {
                            System.out.println("偏移量为:" + metadata.offset() + ",分区为:" + metadata.partition());
                        }
                    }
                });
                System.out.println(++id + ":数据[" + record + "]已发送。");
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        //1、定义消息生产者
        KafkaProducer<String, String> producer =
                new KafkaProducer<>(KafkaCommonConfig.getProducerProperties(StringSerializer.class, StringSerializer.class));
        try {
            //2、发送
            for (int i = 1; i <= MSG_SIZE; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        KafkaConsts.THREAD_POLL_TOPIC, KafkaConsts.THREAD_POLL_KEY, "Hello ThreadPool");
                executorService.submit(new ProducerWorker(record, producer));
            }
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            producer.close();
            executorService.shutdown();
        }
    }
}

修改上文消费者监听当前topic:

在这里插入图片描述
在这里插入图片描述

ThreadPoolCustomer:
KafkaConsumer,线程不安全,几个Runnable就建几个KafkaConsumer,多测了几次。

public class ThreadPoolCustomer {
    private static ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static class ConsumerWorker implements Runnable {
        KafkaConsumer<String, String> consumer;
        public ConsumerWorker( Properties properties) {
            this.consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singletonList(KafkaConsts.THREAD_POLL_TOPIC));
        }
        public void run() {
            try {
                //4、拉取消息
                while (true) {
                    //4.1、500ms拉取一次
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    //4.2、读取消息
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.print("主题:" + record.topic());
                        System.out.print(",分区:" + record.partition());
                        System.out.print(",偏移量:" + record.offset());
                        System.out.print(",key:" + record.key());
                        System.out.print(",value:" + record.value());
                        System.out.println();
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }
    public static void main(String[] args) {
        Properties kafkaConfig = KafkaCommonConfig.getCustomerProperties
                ("threadPool", StringDeserializer.class, StringDeserializer.class);
        for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
            executorService.submit(new ConsumerWorker(kafkaConfig));
        }
    }
}

在这里插入图片描述

11、分区
在这里插入图片描述
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15183.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!