kafka简介
kafka是一个分布式流处理平台,是三大MQ中间件之一。是一种高吞吐量的发布订阅消息系统。
快速认知概念
Broker: kafka的服务端程序,可以认为一个mq节点就是一个broker。
Topic: 每条发布到mq的消息都有一个类别,称为topic,主题的意思。
Producer: 生产者,创建消息发送给mq的topic
Consumer: 消费者,消费队列中的消息
Partition: 是Topic的实际存储空间,一个Topic有一个或多个Partition。Partition是一个有序队列
Replication 副本:也就是partition,副本分为leader和follower,learder挂了后,follower会自动升级为leader,只有leader才能和producer和consumer交互
ConsumerGroup:消费者组,同一个消费者组里同时只能有一个消费者能从相同的partition消费消息
MQ模型
点对点:所有消费者在同一个组里,每条消息只会被一个消费者消费
发布订阅:比如每个消费者都属于不同组,则kafka消息可以广播到每个消费者
springboot 中对topic的操作
springboot依赖版本
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.0</version>
</dependency>
创建和展示topic详情
public class KafkaAdminTest {
public static final String TOPIC_NAME = "default_topic";
/**
* 初始化KafkaAdmin
* @return
*/
public KafkaAdmin kafkaAdmin(){
Map<String, Object> config = new HashMap<>();
//填上自己的IP和端口
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port");
return new KafkaAdmin(config);
}
/**
* 创建topic
*/
@Test
public void createTopic(){
KafkaAdmin kafkaAdmin = kafkaAdmin();
//设置topic参数 名称 partition数量 备份数量(1代表只有leader,没有follower) 备份数
量不能大于集群节点数量,否则报错
NewTopic newTopic = new NewTopic(TOPIC_NAME, 6, (short)1);
kafkaAdmin.createOrModifyTopics(newTopic);
}
/**
* 展示topic的详情
*/
@Test
public void describeTopics(){
KafkaAdmin kafkaAdmin = kafkaAdmin();
Map<String, TopicDescription> describeTopics = kafkaAdmin.describeTopics(TOPIC_NAME);
Set<Map.Entry<String, TopicDescription>> entries = describeTopics.entrySet();
entries.stream().forEach((entry)-> System.err.println("name :"+entry.getKey()+" , desc: "+ entry.getValue()));
}
对于更高级的功能,您可以AdminClient
直接使用。KafkaAdmin内部也是使用AdminClient
public class KafkaAdminTest {
private static final String TOPIC_NAME = "default_topic";
/**
* 设置admin 客户端
* @return
*/
public static AdminClient initAdminClient(){
Properties properties = new Properties();
//填上自己的IP和端口
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port");
AdminClient adminClient = AdminClient.create(properties);
return adminClient;
}
/**
* 创建topic
*/
@Test
public void createTopicTest(){
AdminClient adminClient = initAdminClient();
//指定分区数量,副本数量不能大于集群节点数量
NewTopic newTopic = new NewTopic(TOPIC_NAME,6,(short) 1);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
try {
//future等待创建,成功则不会有任何报错
createTopicsResult.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
/**
* 列举topic列表
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void listTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
//是否查看内部的topic,可以不用
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
Set<String> topics = listTopicsResult.names().get();
for(String name : topics){
System.err.println(name);
}
}
/**
* 删除topic
*/
@Test
public void delTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
deleteTopicsResult.all().get();
}
/**
* 查看某个topic详情
*/
@Test
public void detailTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue()));
}
/**
* 增加topic分区数量
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void incrPartitionTopicTest() throws ExecutionException, InterruptedException {
Map<String,NewPartitions> infoMap = new HashMap<>(1);
AdminClient adminClient = initAdminClient();
//分区数量不能比原有的数量小
NewPartitions newPartitions = NewPartitions.increaseTo(8);
infoMap.put(TOPIC_NAME,newPartitions);
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
createPartitionsResult.all().get();
}
}
上述代码对topic的操作,在公司中可能并不需要开发人员操作,下一期会讲解spingboot中生产者和消费者的代码开发。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/1278.html