【手把手】教你Kafka中的应用

导读:本篇文章讲解 【手把手】教你Kafka中的应用,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

前一篇中讲解了Kafka基于Zookeeper的集群搭建,这一篇来着重讲解下Kafka的使用。

1、–create 创建

kafka-topics.sh –zookeeper Kafka2:2181,Kafka3:2181/kafka –create –topic ooxx –partitions 2 –replication-factor 2

【手把手】教你Kafka中的应用

连接Zookeepeer集群,拿到元数据,找到Controller,创建Topic

2、–list 查询

kafka-topics.sh –zookeeper Kafka2:2181,Kafka3:2181/kafka –list

【手把手】教你Kafka中的应用

可以看到已创建的Topic

3、–describe 查看Topic描述

kafka-topics.sh –zookeeper Kafka2:2181,Kafka3:2181/kafka –describe –topic ooxx

【手把手】教你Kafka中的应用

PartitionCount:2  >> ooxx 这个Topic的分区数量有2个,虽然分区有副本,但是它的副本并不参与读写分离之类的,所有的增删改查都压载Leader上,从机只是作为数据可靠性的保障。所以,分区不可能因为数量的增加而提高读写性能。

4、定义消费者

kafka-console-consumer.sh –bootstrap-server Kafka1:9092,Kafka2:9092 –topic ooxx –group feenix  >>  一个从属于feenix组的消费者订阅 ooxx 这个 topic 下的消息

【手把手】教你Kafka中的应用

 从消费者的角度来说,Kafka可以做存储,也可以做消息队列,具备可持久化消息的能力,也支持消费的回溯等,所以它有分组(–group)的概念,每个组按照自身不同的节奏自己去维护

5、定义生产者

kafka-console-producer.sh –broker-list Kafka3:9092 –topic ooxx

此时推送的生产的消息就会被消费者消费

生产者:

【手把手】教你Kafka中的应用

消费者:

【手把手】教你Kafka中的应用

此时再连接一个同组的消费者,当同一个组中有两个消费者的时候,默认轮询消费:

生产者:

【手把手】教你Kafka中的应用

消费者1:

【手把手】教你Kafka中的应用

消费者2:

【手把手】教你Kafka中的应用

【手把手】教你Kafka中的应用

【手把手】教你Kafka中的应用

Kafka API的使用

1、创建一个新的topic来演示api的使用

kafka-topics.sh –zookeeper Kafka3:2181/kafka –create –topic feenix-items –partitions 2 –replication-factor 2

【手把手】教你Kafka中的应用

同样的,topic有两个分区,每个分区有一个副本。

2、引入Kafka依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

3、生产者代码

public void producer() throws ExecutionException, InterruptedException {
    String topic = "feenix-items";

    Properties p = new Properties();
    p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Kafka1:9092,Kafka2:9092,Kafka3:9092");
    p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(p);

    // 假设3种商品,每种有线性的3个id
    for (int i = 0; i < 3; i++) {
        for (int j = 0; j < 3; j++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "val" + i);
            Future<RecordMetadata> send = producer.send(record);

            RecordMetadata rm = send.get();
            int partition = rm.partition();
            long offset = rm.offset();
            System.out.println("key:" + record.key() + ",val:" + record.value() + ",partition:" + partition + ",offset:" + offset);

        }
    }

【手把手】教你Kafka中的应用

4、消费者代码

public void consumer() {
    Properties p = new Properties();
    p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Kafka1:9092,Kafka2:9092,Kafka3:9092");
    p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "OOXX01");
    p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

    // p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");
    // p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ""); // 通过POLL拉取数据,弹性,按需

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
    consumer.subscribe(Arrays.asList("feenix-items"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));

        if (!records.isEmpty()) {
            System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~" + records.count() + "~~~~~~~~~~~~~~~~~~~~~~~~~");
            Iterator<ConsumerRecord<String, String>> iter = records.iterator();
            while (iter.hasNext()) {
                // 因为一个Consumer可消费多个分区,但一个分区只能给一个组中的一个Consumer消费
                ConsumerRecord<String, String> record = iter.next();
                int partition = record.partition();
                long offset = record.offset();

                System.out.println("key:" + record.key() + ",value:" + record.value() + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/111931.html

(0)
Java光头强的头像Java光头强

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!