1、概述(摘自Kafka权威指南)
2、kafka生产者的创建(摘自Kafka权威指南)
3、入门程序
3.1、pom文件
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1</version>
</dependency>
</dependencies>
3.2、KafkaConsts
public class KafkaConsts {
/*==================入门程序=========================*/
public final static String HELLO_TOPIC = "hello.topic";
public final static String HELLO_KEY = "helloKey";
/*=====================线程========================*/
public final static String THREAD_POLL_TOPIC = "thread.pool.topic";
public final static String THREAD_POLL_KEY = "threadPoolKey";
/*=====================vo========================*/
public final static String VO_TOPIC = "vo.topic";
public final static String VO_KEY = "voKey";
}
3.3、HelloKafkaProducer
package com.kafka.hellokafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class HelloKafkaProducer {
public static void main(String[] args) {
//1、kafka启动配置,配置ip:端口,key value序列化配置
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.92.39:9092");
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//2、生产者配置
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
try {
//3、构建消息,注意,钥匙,值
ProducerRecord<String, String> record = new ProducerRecord<>(KafkaConsts.HELLO_TOPIC, KafkaConsts.HELLO_KEY, "Hello Topic");
//4、发送
producer.send(record);
System.out.println("message is sent");
} finally {
//5、关闭
producer.close();
}
}
}
3.4、HelloKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class HelloKafkaConsumer {
public static void main(String[] args) {
//1、配置消费者ip,返序列化,消费者群组
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.92.39:9092");
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "test");
//2、定义消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
try {
//3、订阅主题
consumer.subscribe(Collections.singletonList(KafkaConsts.HELLO_TOPIC));
//4、拉取消息
while (true) {
//4.1、500ms拉取一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
//4.2、读取消息
for (ConsumerRecord<String, String> record : records) {
System.out.println("主题:" + record.topic());
System.out.println("分区:" + record.partition());
System.out.println("偏移量:" + record.offset());
System.out.println("key:" + record.key());
System.out.println("value:" + record.value());
System.out.println("####################################");
}
}
} finally {
consumer.close();
}
}
}
3.5、运行消费者运行生产者
4、生产者发送消息基本流程
消息格式:每个消息是一个ProducerRecord对象,必须指定消息所属的Topic和消息值Value,此外还可以指定消息所属的Partition以及消息的Key。
1:序列化ProducerRecord
2:如果ProducerRecord中指定了Partition,则Partitioner不做任何事情;否则,Partitioner根据消息的key得到一个Partition。这是生产者就知道向哪个Topic下的哪个Partition发送这条消息。
3:消息被添加到相应的batch中,独立的线程将这些batch发送到Broker上
4:broker收到消息会返回一个响应。如果消息成功写入Kafka,则返回RecordMetaData对象,该对象包含了Topic信息、Patition信息、消息在Partition中的Offset信息;若失败,返回一个错误
*
5、设置公用配置文件
这样的代码写起来不整洁,也容易出错。其实,Kafka帮我们封装好了常用的配置信息,我们提取出来就行了。
KafkaCommonConfig:
package com.kafka.common;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class KafkaCommonConfig {
/*生产者消费者共用配置常量*/
private static final String KAFKA_HOST = "192.168.92.39:9092";
/**
* 功能描述 获取kafka生产者
*
* @param keySerializeClazz key的序列化类
* @param valueSerializeClazz value的序列化类
* @author Zrs
* @date 2019/8/4
*/
public static Properties getProducerProperties(Class keySerializeClazz, Class valueSerializeClazz) {
Properties properties = new Properties();
//配置IP
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
// 配置key的序列化和value的序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializeClazz);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializeClazz);
return properties;
}
/**
* 功能描述 得到消费者
*
* @param groupid 组ID
* @param keyDeserializeClazz key的反序列化 class
* @param valueDeserializeClazz value的反序列化 class
* @author Zrs
* @date 2019/8/4
*/
public static Properties getCustomerProperties(String groupid, Class keyDeserializeClazz, Class valueDeserializeClazz) {
Properties properties = new Properties();
//配置IP
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
//配置组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
//配置反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializeClazz);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializeClazz);
return properties;
}
}
6、发送者有3种发送方式
6.1、HelloKafkaProducer
忽略send方法的返回值,不做任何处理,如果不关心发送结果,那么可以使用这种发送方式。比如:记录消息日志,或记录不太重要的应用程序日志。
6.2、FutureProducer 同步发送
获得send方法返回的Future对象,在合适的时候调用Future的get方法。
import com.kafka.common.KafkaCommonConfig;
import com.kafka.common.KafkaConsts;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.Future;
public class FutureProducer {
public static void main(String[] args) {
//1、获取生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(KafkaCommonConfig.getProducerProperties(StringSerializer.class, StringSerializer.class));
try {
//2、建立消息
String msg = "hello kafka future";
ProducerRecord<String, String> record =
new ProducerRecord<>(KafkaConsts.HELLO_TOPIC, KafkaConsts.HELLO_KEY, msg);
//3、发送并获取future
Future<RecordMetadata> sendFuture = producer.send(record);
//4、拿到发送结果
RecordMetadata recordMetadata = sendFuture.get();
if (recordMetadata != null) {
System.out.println("发送的消息为:" + msg + ",偏移量为:" + recordMetadata.offset()
+ ",分区为:" + recordMetadata.partition());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
6.3、AsyncProducer异步发送:
实现接口org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递给send方法。
import com.kafka.common.KafkaCommonConfig;
import com.kafka.common.KafkaConsts;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.Future;
public class AsyncProducer {
public static void main(String[] args) {
//1、获取生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(KafkaCommonConfig.getProducerProperties(StringSerializer.class, StringSerializer.class));
try {
//2、建立消息
String msg = "hello kafka future";
ProducerRecord<String, String> record =
new ProducerRecord<>(KafkaConsts.HELLO_TOPIC, KafkaConsts.HELLO_KEY, msg);
//3、发送并获取future
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("记录错误");
}
if (metadata != null) {
System.out.println("发送的消息为:" + msg + ",偏移量为:" + metadata.offset()
+ ",分区为:" + metadata.partition());
}
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
7、更多的消息配置
生产者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。可以参考org.apache.kafka.clients.producer包下的ProducerConfig类。
acks:
指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
acks=1:只要集群的首领节点(第一个加入集群的broker)收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。
acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。
buffer.memory
设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向broker发送的速度,导致生产者空间不足,producer会阻塞或者抛出异常。缺省33554432 (32M)
max.block.ms
指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。缺省60000ms
retries
发送失败时,指定生产者可以重发消息的次数。默认情况下,生产者在每次重试之间等待100ms,可以通过参数retry.backoff.ms参数来改变这个时间间隔。缺省0
receive.buffer.bytes和send.buffer.bytes
指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。缺省102400
batch.size
当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送。缺省16384(16k)
linger.ms
指定了生产者在发送批次前等待更多消息加入批次的时间。它和batch.size以先到者为先。也就是说,一旦我们获得消息的数量够batch.size的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比
batch.size设置要小的多,我们需要“linger”特定的时间以获取更多的消息。这个设置默认为0,即没有延迟。设定linger.ms=5,例如,将会减少请求数目,但是同时会增加5ms的延迟,但也会提升消息的吞吐量。
compression.type
producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好。snappy占用cpu少,提供较好的性能和可观的压缩比,如果比较关注性能和网络带宽,用这个。如果带宽紧张,用gzip,会占用较多的cpu,但提供更高的压缩比。
client.id
当向server发出请求时,这个字符串会发送给server。目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪。
max.in.flight.requests.per.connection
指定了生产者在接收到服务器响应之前可以发送多个消息,值越高,占用的内存越大,当然也可以提升吞吐量。发生错误时,可能会造成数据的发送顺序改变,默认是5 (修改)。
如果需要保证消息在一个分区上的严格顺序,这个值应该设为1。不过这样会严重影响生产者的吞吐量。
request.timeout.ms
客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常
metadata.fetch.timeout.ms
是指我们所获取的一些元数据的第一个时间数据。元数据包含:topic,host,partitions。此项配置是指当等待元数据fetch成功完成所需要的时间,否则会跑出异常给客户端
timeout.ms
此配置选项控制broker等待副本确认的最大时间。如果确认的请求数目在此时间内没有实现,则会返回一个错误。这个超时限制是以server端度量的,没有包含请求的网络延迟。这个参数和acks的配置相匹配。
max.request.size
控制生产者发送请求最大大小。假设这个值为1M,如果一个请求里只有一个消息,那这个消息不能大于1M,如果一次请求是一个批次,该批次包含了1000条消息,那么每个消息不能大于1KB。注意:broker具有自己对消息记录尺寸的覆盖,如果这个尺寸小于生产者的这个设置,会导致消息被拒绝。
8、顺序的保证
9、发送自定义pojo(Kafka权威指南 发送者 序列化)
kafka为我们提供了这些序列化实现:
发送自定义VO:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private Integer id;
private String name;
private String address;
}
KafkaVoProducer
public class KafkaVoProducer {
public static void main(String[] args) {
//1、获取生产者
KafkaProducer<String, String> producer =
new KafkaProducer<>(KafkaCommonConfig.getProducerProperties(StringSerializer.class, StringSerializer.class));
try {
//2、建立消息
User user = new User(1, "张三", "北京市软件园中关村");
ProducerRecord<String, String> record =
new ProducerRecord<>(KafkaConsts.VO_TOPIC, KafkaConsts.VO_KEY, JSON.toJSONString(user));
//3、发送并获取future
Future<RecordMetadata> sendFuture = producer.send(record);
//4、拿到发送结果
RecordMetadata recordMetadata = sendFuture.get();
if (recordMetadata != null) {
System.out.println("发送的消息为:" + JSON.toJSONString(user) + ",偏移量为:" + recordMetadata.offset()
+ ",分区为:" + recordMetadata.partition());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
KafkaVoCustomer
public class KafkaVoCustomer {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaCommonConfig.getCustomerProperties
("threadPool", StringDeserializer.class, StringDeserializer.class));
try {
consumer.subscribe(Collections.singletonList(KafkaConsts.VO_TOPIC));
//4、拉取消息
while (true) {
//4.1、500ms拉取一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
//4.2、读取消息
for (ConsumerRecord<String, String> record : records) {
System.out.print("主题:" + record.topic());
System.out.print(",分区:" + record.partition());
System.out.print(",偏移量:" + record.offset());
System.out.print(",key:" + record.key());
User user = JSON.parseObject(record.value(), User.class);
System.out.print(",value:" + user.toString());
System.out.println();
}
}
} finally {
consumer.close();
}
}
}
10、多线程下使用生产者
kafka生产者在多线程情况下是线程安全的
ThreadPoolProducer:
import com.kafka.common.KafkaCommonConfig;
import com.kafka.common.KafkaConsts;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolProducer {
//发送消息的个数
private static final int MSG_SIZE = 1000;
//负责发送消息的线程池,线程为当前电脑的核数
private static ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
//用countDownLatch当做发令枪,让所用线程同时运行
private static CountDownLatch countDownLatch = new CountDownLatch(MSG_SIZE);
//自定义类用于发送消息
private static class ProducerWorker implements Runnable {
//消息
private ProducerRecord<String, String> record;
//消息发送者
private KafkaProducer<String, String> producer;
public ProducerWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {
this.record = record;
this.producer = producer;
}
@Override
public void run() {
try {
producer.send(record, (metadata, exception) -> {
if (null != exception) {
exception.printStackTrace();
}
if (null != metadata) {
if (exception != null) {
System.out.println("记录错误");
}
if (metadata != null) {
System.out.println("偏移量为:" + metadata.offset() + ",分区为:" + metadata.partition());
}
}
});
System.out.println(++id + ":数据[" + record + "]已发送。");
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
//1、定义消息生产者
KafkaProducer<String, String> producer =
new KafkaProducer<>(KafkaCommonConfig.getProducerProperties(StringSerializer.class, StringSerializer.class));
try {
//2、发送
for (int i = 1; i <= MSG_SIZE; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
KafkaConsts.THREAD_POLL_TOPIC, KafkaConsts.THREAD_POLL_KEY, "Hello ThreadPool");
executorService.submit(new ProducerWorker(record, producer));
}
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
producer.close();
executorService.shutdown();
}
}
}
修改上文消费者监听当前topic:
ThreadPoolCustomer:
KafkaConsumer,线程不安全,几个Runnable就建几个KafkaConsumer,多测了几次。
public class ThreadPoolCustomer {
private static ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static class ConsumerWorker implements Runnable {
KafkaConsumer<String, String> consumer;
public ConsumerWorker( Properties properties) {
this.consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(KafkaConsts.THREAD_POLL_TOPIC));
}
public void run() {
try {
//4、拉取消息
while (true) {
//4.1、500ms拉取一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
//4.2、读取消息
for (ConsumerRecord<String, String> record : records) {
System.out.print("主题:" + record.topic());
System.out.print(",分区:" + record.partition());
System.out.print(",偏移量:" + record.offset());
System.out.print(",key:" + record.key());
System.out.print(",value:" + record.value());
System.out.println();
}
}
} finally {
consumer.close();
}
}
}
public static void main(String[] args) {
Properties kafkaConfig = KafkaCommonConfig.getCustomerProperties
("threadPool", StringDeserializer.class, StringDeserializer.class);
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
executorService.submit(new ConsumerWorker(kafkaConfig));
}
}
}
11、分区
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15183.html