Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
- Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
- Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
- Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
- Segment:partition物理上由多个segment组成,每个Segment存着message信息
- Producer : 生产message发送到topic
- Consumer : 订阅topic消费message, consumer作为一个线程来消费
- Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。
springboot项目中使用kafka的配置如下:
首先引入必须的依赖jar:
<!-- springboot整合kafka -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.3.6.RELEASE</version>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.7.RELEASE</version>
</dependency>
配置文件如下:
#============== kafka ===================
#kafka相关配置
spring.kafka.bootstrap-servers=39.105.104.132:9092
#设置一个默认组
spring.kafka.consumer.group-id=alarmTopic
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
消息生产者:
/**
* @author Shuyu.Wang
* @package:com.ganinfo.kafka
* @className:
* @description:生产者
* @date 2018-07-05 16:30
**/
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/** * 发送消息到kafka */
public void sendChannelMess(String channel, String message) {
kafkaTemplate.send(channel,message);
}
消息消费者:
/**
* @author Shuyu.Wang
* @package:com.ganinfo.kafka
* @className:
* @description:消费者
* @date 2018-07-05 16:31
**/
@Component
public class KafkaConsumer {
@Autowired
private SendMessageUtil sendMessageUtil;
/**
* 监听alarmTopic主题,有消息就读取 *
* @param message
*/
@KafkaListener(topics = {"alarmTopic"})
public void receiveMessage(String message) {
//收到通道的消息之后执行秒杀操作
System.out.println("KafkaConsumer的订阅消息:"+message);
sendMessageUtil.send("h1","1",message);
}
}
测试类:
@Autowired
private KafkaSender kafkaSender;
private static final String ALRAM_TOPIC = "alarmTopic";
@Autowired
private AlarmService alarmService;
@ResponseBody
@RequestMapping(value = "/kafka", method = RequestMethod.GET)
public ApiResult getCarInout(@RequestParam(value = "refId") String refId, @RequestParam(value = "passType") Integer passType, @RequestParam(value = "type") Integer type) {
ApiResult apiResult = new ApiResult();
try {
Map<String, Object> map = new HashMap<>();
map.put("refId", refId);
map.put("platformCode", "650106");
map.put("passType", passType);
map.put("type", type);
map.put("level", 2);
map.put("Gettime", 1000);
kafkaSender.sendChannelMess(ALRAM_TOPIC, GsonUtil.GsonString(map));
System.out.println("发送数据:" + GsonUtil.GsonString(map));
// kafkaTemplate.send(ALRAM_TOPIC, "alarm", GsonUtil.GsonString(map));
} catch (Exception e) {
e.printStackTrace();
}
return apiResult;
}
kafka的相关代码就完成了,请求测试方法,就会在消费者的类中打印相关的参数了,另外可以通过插件查看kafka的相关主题和消费者情况。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15846.html