前一篇中讲解了Kafka基于Zookeeper的集群搭建,这一篇来着重讲解下Kafka的使用。
1、–create 创建
kafka-topics.sh –zookeeper Kafka2:2181,Kafka3:2181/kafka –create –topic ooxx –partitions 2 –replication-factor 2
连接Zookeepeer集群,拿到元数据,找到Controller,创建Topic
2、–list 查询
kafka-topics.sh –zookeeper Kafka2:2181,Kafka3:2181/kafka –list
可以看到已创建的Topic
3、–describe 查看Topic描述
kafka-topics.sh –zookeeper Kafka2:2181,Kafka3:2181/kafka –describe –topic ooxx
PartitionCount:2 >> ooxx 这个Topic的分区数量有2个,虽然分区有副本,但是它的副本并不参与读写分离之类的,所有的增删改查都压载Leader上,从机只是作为数据可靠性的保障。所以,分区不可能因为数量的增加而提高读写性能。
4、定义消费者
kafka-console-consumer.sh –bootstrap-server Kafka1:9092,Kafka2:9092 –topic ooxx –group feenix >> 一个从属于feenix组的消费者订阅 ooxx 这个 topic 下的消息
从消费者的角度来说,Kafka可以做存储,也可以做消息队列,具备可持久化消息的能力,也支持消费的回溯等,所以它有分组(–group)的概念,每个组按照自身不同的节奏自己去维护
5、定义生产者
kafka-console-producer.sh –broker-list Kafka3:9092 –topic ooxx
此时推送的生产的消息就会被消费者消费
生产者:
消费者:
此时再连接一个同组的消费者,当同一个组中有两个消费者的时候,默认轮询消费:
生产者:
消费者1:
消费者2:
Kafka API的使用
1、创建一个新的topic来演示api的使用
kafka-topics.sh –zookeeper Kafka3:2181/kafka –create –topic feenix-items –partitions 2 –replication-factor 2
同样的,topic有两个分区,每个分区有一个副本。
2、引入Kafka依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.1.0</version> </dependency>
3、生产者代码
public void producer() throws ExecutionException, InterruptedException { String topic = "feenix-items"; Properties p = new Properties(); p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Kafka1:9092,Kafka2:9092,Kafka3:9092"); p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(p); // 假设3种商品,每种有线性的3个id for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "val" + i); Future<RecordMetadata> send = producer.send(record); RecordMetadata rm = send.get(); int partition = rm.partition(); long offset = rm.offset(); System.out.println("key:" + record.key() + ",val:" + record.value() + ",partition:" + partition + ",offset:" + offset); } }
4、消费者代码
public void consumer() { Properties p = new Properties(); p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Kafka1:9092,Kafka2:9092,Kafka3:9092"); p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "OOXX01"); p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, ""); // p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ""); // 通过POLL拉取数据,弹性,按需 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p); consumer.subscribe(Arrays.asList("feenix-items")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0)); if (!records.isEmpty()) { System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~" + records.count() + "~~~~~~~~~~~~~~~~~~~~~~~~~"); Iterator<ConsumerRecord<String, String>> iter = records.iterator(); while (iter.hasNext()) { // 因为一个Consumer可消费多个分区,但一个分区只能给一个组中的一个Consumer消费 ConsumerRecord<String, String> record = iter.next(); int partition = record.partition(); long offset = record.offset(); System.out.println("key:" + record.key() + ",value:" + record.value() + ",partition:" + partition + ",offset:" + offset); } } } }
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/111931.html