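Pitfalls in a Kafka demo
I've been working on a web project that uses Kafka, so I started with a small demo to warm up. It got off to a bad start and threw an error on the very first run, haha. Fine, an error it is, so let's go fix it!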
1. Create the Order class
public class Order {
    private Long orderId;
    private int count;

    public Order(Long orderId, int count) {
        this.orderId = orderId;
        this.count = count;
    }

    // No-arg constructor, needed when the JSON library deserializes an Order
    public Order() {
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}
2. Add the dependencies
<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>
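3. Producer demo
package com.hao.kafka;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducerDemo01 {
    // 1. Topic name (a field must be declared inside the class)
    private static final String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) {
        // 2. Configuration
        Properties properties = new Properties();
        // Kafka broker addresses (host:port); they must match what the brokers actually listen on
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.108:9002,192.168.100.108:9003,192.168.100.108:9004");
        // Serializers for the record key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=0: the producer does not wait for any acknowledgement from the broker
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        // 3. Create the producer with the configuration above
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        // 4. Build the record to send
        // Create the Order object
        Order order = new Order(1001L, 2);
        // Wrap it: key = order id, value = the order serialized as JSON
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME,
                String.valueOf(order.getOrderId()),
                JSON.toJSONString(order));
        try {
            // send() returns a Future; get() blocks until the send completes or fails
            RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("Synchronous send result: " + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Release the producer's network resources
            producer.close();
        }
    }
}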
4. Run it
5. It fails (lol…)
"C:\Program Files\Java\jdk1.8.0_301\bin\java.exe" "-javaagent:D:\IntelliJ IDEA 2019.3.1\lib\idea_rt.jar=55245:D:\IntelliJ IDEA 2019.3.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_301\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_301\jre\lib\rt.jar;D:\IdeaProjects\kafka-demo\target\classes;D:\maven\RepMaven\com\alibaba\fastjson\1.2.47\fastjson-1.2.47.jar;D:\maven\RepMaven\org\apache\kafka\kafka-clients\2.4.1\kafka-clients-2.4.1.jar;D:\maven\RepMaven\com\github\luben\zstd-jni\1.4.3-1\zstd-jni-1.4.3-1.jar;D:\maven\RepMaven\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;D:\maven\RepMaven\org\xerial\snappy\snappy-java\1.1.7.3\snappy-java-1.1.7.3.jar;D:\maven\RepMaven\org\slf4j\slf4j-api\1.7.28\slf4j-api-1.7.28.jar" com.hao.kafka.MyProducerDemo01
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic my-replicated-topic not present in metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1299)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:963)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at com.hao.kafka.MyProducerDemo01.main(MyProducerDemo01.java:37)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic my-replicated-topic not present in metadata after 60000 ms.
Process finished with exit code 0
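The 60000 ms in the message matches the default of the producer's max.block.ms setting, which bounds how long send() waits for topic metadata before giving up. While debugging a connection problem like this one, it can help to lower that value so each attempt fails quickly. The following is only a sketch: the class name FastFailProps and the 5000 ms value are made up for illustration, and plain string keys are used (the same strings behind the ProducerConfig constants) so that it compiles without kafka-clients on the classpath.

```java
import java.util.Properties;

final class FastFailProps {
    // Builds producer properties like the demo's, plus a shorter metadata wait.
    // maxBlockMs is a hypothetical debugging value, not a recommended setting.
    static Properties build(String bootstrapServers, long maxBlockMs) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // How long send() may block waiting for metadata (Kafka's default is 60000)
        p.put("max.block.ms", String.valueOf(maxBlockMs));
        return p;
    }

    public static void main(String[] args) {
        Properties p = build("192.168.100.108:9002", 5000L);
        System.out.println(p.getProperty("max.block.ms"));
    }
}
```

The resulting Properties object can be passed to new KafkaProducer<>(...) in place of the one built in the demo.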
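Finally solved it!!!
Here is what it took:
step1. Start the Kafka cluster
step2. Look up the Kafka broker registration in ZooKeeper
step3. Update the broker address configured in the code to match
step4. Success!!! That's a wrap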
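Before re-running the producer after step3, a quick way to sanity-check the configured addresses is to try a plain TCP connection to each host:port in the bootstrap list. This is a minimal sketch using only the JDK; the class name BootstrapCheck and the 2000 ms timeout are assumptions for illustration. A successful connect only shows that the port is open. The producer additionally needs the address each broker advertises to be reachable from the client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

final class BootstrapCheck {
    // Splits "host1:port1,host2:port2" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port, got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection to the address succeeds within timeoutMs.
    static boolean reachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolve the host here; an unknown host surfaces as an IOException on connect
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the addresses used in the demo; pass your own list as the first argument
        String servers = args.length > 0
                ? args[0]
                : "192.168.100.108:9002,192.168.100.108:9003,192.168.100.108:9004";
        for (InetSocketAddress a : parse(servers)) {
            System.out.println(a.getHostString() + ":" + a.getPort()
                    + " reachable=" + reachable(a, 2000));
        }
    }
}
```

If an entry prints reachable=false, the address or port in the code most likely does not match what the broker is listening on, which is the situation step2 and step3 resolve.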