消息监听
前言
最近大佬给了我一个课外研究话题,在kafka中实现上下文的传递及自定义header和全链路跟踪。使用spring-kafka,生产端拓展比较清晰。生产者发的数据是同源,header和traceId可以很好设置。只需要在发送消息前后做拦截即可实现。但是消费端listener就比较复杂了,首先消息的来源方不一样,消息主体可能是不同的对象,这样不能做到统一的直接类型转换。然后消费端是批量消费,一次性拉取一段时间的数据,那么如何保证单个消息记录的上下文,header和traceId得到有效传递。所以只能先盘源码。
1.自动装配
1.1 注册BeanDefinition
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
一般spring会提供Enablexxx注解,没有发现目标,我们接着看import的配置。在KafkaAnnotationDrivenConfiguration中找到了目标注解EnableKafka
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
@Configuration(proxyBeanMethods = false)
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
//即@ConditionalOnMissingBean("internalKafkaListenerAnnotationProcessor")
static class EnableKafkaConfiguration {
}
}
当我们覆盖了internalKafkaListenerAnnotationProcessor,kafka驱动相关的配置就会被覆盖,默认情况下是空,该注解就会生效。我们接下来看EnableKafka的配置
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}
@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] { KafkaBootstrapConfiguration.class.getName() };
}
}
然后KafkaBootstrapConfiguration里注入了两个比较重要的对象,
KafkaListenerAnnotationBeanPostProcessor(自动装配用),
KafkaListenerEndpointRegistry(注册endpoint用)
public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
}
if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
}
}
}
1.2 创建BeanPostProcessor
registerBeanPostProcessors()方法创建BeanPostProcessor。遵守spring的生命周期。和第一步一起总结。
1.3 激活BeanPostProcessor
我们知道BeanPostProcessor主要在初始化bean后做增强处理,下面是KafkaListenerAnnotationBeanPostProcessor的类图,从图中可以看出,它主要做的就是后置增强处理
我们直接看实现
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
//❶
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
//❷
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
//❸
processKafkaListener(listener, method, bean, beanName);
}
}
this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
if (hasClassLevelListeners) {
//❹
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
}
我们再看KafkaListener注解
即可以用在类上,也可以用在方法上。
所以❶处是寻找所有在类上标记的kafkaListener,
❷处是寻找所有在方法上标记的kafkaListener.
同时看图中蓝色标记的部分,当KafkaListener标记在类上时,必须在方法上标记KafkaHandler注解才能起作用,这和异步注解@Async有点不一样.
❸processKafkaListener ,遍历该类上所有标记了KafkaListener注解的方法
❹processMultiMethodListeners,主要是遍历类上标记了KafkaListener,同时方法上标记了KafkaHandler 的方法。
我们先看❸processKafkaListener
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
//❶找到真实的bean上的方法非代理bean
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
String beanRef = kafkaListener.beanRef();
this.listenerScope.addListener(beanRef, bean);
String[] topics = resolveTopics(kafkaListener);
TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
//❷解析方法上的listener
processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
}
this.listenerScope.removeListener(beanRef);
}
继续跟进processListener方法
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
//❶
processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);
//❷
String containerFactory = resolve(kafkaListener.containerFactory());
//❸
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName);
//❹
this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
//❺
processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
}
首先看❶注册前置处理
private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
endpoint.setTopicPartitions(tps);
endpoint.setTopics(topics);
endpoint.setTopicPattern(resolvePattern(kafkaListener));
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
String group = kafkaListener.containerGroup();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String concurrency = kafkaListener.concurrency();
if (StringUtils.hasText(concurrency)) {
endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
}
String autoStartup = kafkaListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
}
resolveKafkaProperties(endpoint, kafkaListener.properties());
endpoint.setSplitIterables(kafkaListener.splitIterables());
if (StringUtils.hasText(kafkaListener.batch())) {
endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
}
}
主要就是初始化MethodKafkaListenerEndpoint,根据@KafkaListener的配置做方法级别的端点配置。
❷获取@KafkaListener上的containerFactory配置的beanName,
❸resolveContainerFactory,如果注解上有配置containerFactory,则从spring上下文中获取,如果没有,暂时返回为空
❹registerEndpoint,登记Endpoint
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
private final List<KafkaListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();
public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
}
这步主要是将KafkaListenerEndpoint封装成KafkaListenerEndpointDescriptor,是KafkaListenerEndpointRegistrar的内部类,对KafkaListenerEndpoint的进一步包装,主要增加了KafkaListenerContainerFactory,再次说明每个kafkaListener标记的方法的endPoint是可以配置自己单独的ContainerFactory。这点和spring cache有点相似。
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
private static final class KafkaListenerEndpointDescriptor {
private final KafkaListenerEndpoint endpoint;
private final KafkaListenerContainerFactory<?> containerFactory;
private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
@Nullable KafkaListenerContainerFactory<?> containerFactory) {
this.endpoint = endpoint;
this.containerFactory = containerFactory;
}
}
}
❺processKafkaListenerEndpointAfterRegistration登记后置处理
//#KafkaListenerAnnotationBeanPostProcessor.class
private void processKafkaListenerEndpointAfterRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
KafkaListener kafkaListener) {
endpoint.setBeanFactory(this.beanFactory);
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
resolveErrorHandler(endpoint, kafkaListener);
}
String converterBeanName = resolveExpressionAsString(kafkaListener.contentTypeConverter(), "contentTypeConverter");
if (StringUtils.hasText(converterBeanName)) {
resolveContentTypeConverter(endpoint, kafkaListener);
}
}
主要是设置异常处理器和消息转化器,针对kafkaListener注解上的配置,如果有则配置,没有则暂时为空。
总结上面流程
2.注册KafkaListenerContainer
2.1 遍历Endpoint
再看KafkaListenerAnnotationBeanPostProcessor类
//KafkaListenerAnnotationBeanPostProcessor.class
@Override
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
//❶针对listener的单独配置
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, KafkaListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
for (KafkaListenerConfigurer configurer : instances.values()) {
configurer.configureKafkaListeners(this.registrar);
}
}
//❷设置EndpointRegistry
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
KafkaListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
//❸设置ContainerFactoryBeanName
if (this.defaultContainerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
//❹设置message format
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
}
else {
addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
}
// Actually register all listeners
//❺注册所有的listener
this.registrar.afterPropertiesSet();
Map<String, ContainerGroupSequencer> sequencers =
this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
sequencers.values().forEach(seq -> seq.initialize());
}
❶如果listener有指定configure配置,则配置listener
❷设置EndpointRegistry,如果EndpointRegistry为空,则从spring上下文中获取KafkaListenerEndpointRegistry,如果你对自动装配还有印象的话,在第一步注册BeanDefinition对象时,共注入了两个对象,第一个是KafkaListenerAnnotationBeanPostProcessor,第二个就是KafkaListenerEndpointRegistry,所以这里可以拿到。
后面可以看到KafkaListenerEndpointRegistrar注册所有的Endpoints,事实上是委托给了KafkaListenerEndpointRegistry去真正执行。
❸设置ContainerFactoryBeanName,默认的name就是kafkaListenerContainerFactory
❹设置ConversionService,如果没有配置,则获取默认的。
❺注册所有的listener,交给KafkaListenerEndpointRegistrar
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint
&& this.validator != null) {
((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
}
//❶
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
}
endpointDescriptors,如果你还有印象,在上面激活processor的后置处理的最后一步,将所有EndPoint及它的containerFactory封装成KafkaListenerEndpointDescriptor,存入到了集合中。
❶遍历所有的KafkaListenerEndpointDescriptor,委托给KafkaListenerEndpointRegistrar去注册,每个endpoint对应的是一个被KafkaListener注解标识的方法。
继续往下跟进
public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry, DisposableBean, SmartLifecycle,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
boolean startImmediately) {
String id = endpoint.getId();
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
//❶创建ListenerContainer
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
ConfigurableApplicationContext appContext = this.applicationContext;
//❷如果是消费者组,则将container加入消费者组
String groupName = endpoint.getGroup();
if (StringUtils.hasText(groupName) && appContext != null) {
List<MessageListenerContainer> containerGroup;
ContainerGroup group;
if (appContext.containsBean(groupName)) { // NOSONAR - hasText
containerGroup = appContext.getBean(groupName, List.class); // NOSONAR - hasText
group = appContext.getBean(groupName + ".group", ContainerGroup.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
appContext.getBeanFactory().registerSingleton(groupName, containerGroup); // NOSONAR - hasText
group = new ContainerGroup(groupName);
appContext.getBeanFactory().registerSingleton(groupName + ".group", group);
}
containerGroup.add(container);
group.addContainers(container);
}
//❸如果需要立刻启动
if (startImmediately) {
startIfNecessary(container);
}
}
}
}
❶createListenerContainer,
创建ListernerContainer,通过命名我们其实可以把每个Listerner理解成一个容器,类似tomcat,每个独立的容器可以处理自己的请求,即topic类比url,每个容器有自己独立的生命周期,被spring维护。后面再详细的介绍
❷如果listener有配置group,说明它加入了一个消费者组共同消费。
❸startImmediately,又看见这个变量,这个时候还是false,如果是true,则调动startIfNecessary启动容器,后面会详细介绍这个方法。
我们把视线回到❶createListenerContainer这步,go down继续往下。
2.2创建ListenerContainer
//#KafkaListenerEndpointRegistry
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {
...
//❶
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
...
return listenerContainer;
}
主要看factory.createListenerContainer(endpoint)
该factory如果我们自己有配置,就是我们自己配置的containerFactory,会调用父类的createListenerContainer方法
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V>
implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean,
ApplicationContextAware {
@Override
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
//❶创建容器实例
C instance = createContainerInstance(endpoint);
JavaUtils.INSTANCE
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
}
//❷配置容器
endpoint.setupListenerContainer(instance, this.messageConverter);
//❸初始化容器
initializeContainer(instance, endpoint);
//❹个性化配置
customizeContainer(instance);
return instance;
}
}
图中标记的四步,有点类似spring创建bean的方式。
我们首先看❶createContainerInstance(endpoint),默认实现是
ConcurrentKafkaListenerContainerFactory
public class ConcurrentKafkaListenerContainerFactory<K, V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
@Override
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
if (topicPartitions != null && topicPartitions.length > 0) {
ContainerProperties properties = new ContainerProperties(topicPartitions);
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
Collection<String> topics = endpoint.getTopics();
if (!topics.isEmpty()) { // NOSONAR
ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); // NOSONAR
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
}
}
}
可以看到,不管endpoint的分区选择是否配置,最终创建的container都会绑定consumerFactory
再看❷endpoint.setupListenerContainer
public abstract class AbstractKafkaListenerEndpoint<K, V>
implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {
private void setupMessageListener(MessageListenerContainer container,@Nullable MessageConverter messageConverter) {
MessagingMessageListenerAdapter<K, V> adapter = createMessageListener(container, messageConverter);
...
Object messageListener = adapter;
...
container.setupMessageListener(messageListener);
}
}
我们先看createMessageListener,创建消息监听器
public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {
@Override
protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
@Nullable MessageConverter messageConverter) {
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
JavaUtils.INSTANCE
.acceptIfNotNull(getReplyTopic(), replyTopic -> {
Assert.state(getMethod().getReturnType().equals(void.class)
|| getReplyTemplate() != null, "a KafkaTemplate is required to support replies");
messageListener.setReplyTopic(replyTopic);
})
.acceptIfNotNull(getReplyTemplate(), messageListener::setReplyTemplate);
return messageListener;
}
}
这里面有两个比较重要的方法
第一个是真正创建MessageListener实例
public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {
protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
@Nullable MessageConverter messageConverter) {
MessagingMessageListenerAdapter<K, V> listener;
//❶
if (isBatchListener()) {
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>(
this.bean, this.method, this.errorHandler);
BatchToRecordAdapter<K, V> batchToRecordAdapter = getBatchToRecordAdapter();
if (batchToRecordAdapter != null) {
messageListener.setBatchToRecordAdapter(batchToRecordAdapter);
}
if (messageConverter instanceof BatchMessageConverter) {
messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
}
listener = messageListener;
}
else {
//❷
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>(
this.bean, this.method, this.errorHandler);
if (messageConverter instanceof RecordMessageConverter) {
messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
}
listener = messageListener;
}
//❸
if (this.messagingConverter != null) {
listener.setMessagingConverter(this.messagingConverter);
}
BeanResolver resolver = getBeanResolver();
if (resolver != null) {
listener.setBeanResolver(resolver);
}
return listener;
}
}
❶isBatchListener(),如果我们的listener配置了batchListener,那就是batchListener,默认是false
❷默认是单个消息处理listener,通过new的方式创建的RecordMessagingMessageListenerAdapter
这里会调父类构造方法
public MessagingMessageListenerAdapter(Object bean, Method method) {
this.bean = bean;
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
}
其中 determineInferredType 会根据我们的Listener method上的对象类型设置payLoad类型,这个在后面对象转换时会用到
❸messagingConverter
MessageListener的类图如下
messageListener.setHandlerMethod(configureListenerAdapter(messageListener))
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
InvocableHandlerMethod invocableHandlerMethod =
this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
return new HandlerAdapter(invocableHandlerMethod);
}
@Nullable
protected Object doInvoke(Object... args) throws Exception {
try {
return getBridgedMethod().invoke(getBean(), args);
}
...
}
我们再回到创建容器的第❸步initializeContainer(instance, endpoint)
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
ContainerProperties properties = instance.getContainerProperties();
...
instance.setRecordInterceptor(this.recordInterceptor);
instance.setBatchInterceptor(this.batchInterceptor);
...
}
3 kakfa listener启动
3.1启动生命周期
去执行的,而KafkaListenerEndpointRegistry是kafka配置类自动装配时引入的。我们再仔细看看KafkaListenerEndpointRegistry的类图
public interface Lifecycle {
void start();
void stop();
boolean isRunning();
}
protected void finishRefresh() {
// Clear context-level resource caches (such as ASM metadata from scanning).
clearResourceCaches();
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// Publish the final event.
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
if (!NativeDetector.inNativeImage()) {
LiveBeansView.registerApplicationContext(this);
}
}
拿到LifeCycleProcessor后调用onRefresh方法,再拿到所有实现了Lifecycle的bean,调用start方法
for (LifecycleGroupMember member : this.members) {
doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
}
....
private void doStart(){
try {
bean.start();
}
}
然后KafkaListenerEndpointRegistry实现了Lifecycle
public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry, DisposableBean, SmartLifecycle,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
this.running = true;
}
}
#KafkaListenerEndpointRegistry.class
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
事实上MessageListenerContainer也实现了Lifecycle
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
@Override
protected void doStart() {
//❶
if (!isRunning()) {
checkTopics();
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null && this.concurrency > topicPartitions.length) {
this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
//❷
setRunning(true);
//❸
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container =
constructContainer(containerProperties, topicPartitions, i);
configureChildContainer(i, container);
if (isPaused()) {
container.pause();
}
//❹
container.start();
this.containers.add(container);
}
}
}
}
❸处concurrency可以看到如果你topic有多个分区配置,并发量concurrency就是分区的数量。这样保证了不会有多余的消费者启动浪费资源。我们知道kafka同一个分区只允许消费者 组里的一个消费者消费,如果消费者数量大于分区数量,那么就会有消费者消费不到消息。
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
String beanName = getBeanName();
beanName = (beanName == null ? "consumer" : beanName) + "-" + index;
container.setBeanName(beanName);
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
container.setApplicationContext(applicationContext);
}
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
container.setApplicationEventPublisher(publisher);
}
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
container.setGenericErrorHandler(getGenericErrorHandler());
container.setCommonErrorHandler(getCommonErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.setRecordInterceptor(getRecordInterceptor());
container.setBatchInterceptor(getBatchInterceptor());
container.setInterceptBeforeTx(isInterceptBeforeTx());
container.setEmergencyStop(() -> {
stopAbnormally(() -> {
});
});
AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
if (exec == null) {
if ((this.executors.size() > index)) {
exec = this.executors.get(index);
}
else {
exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
this.executors.add(exec);
}
container.getContainerProperties().setConsumerTaskExecutor(exec);
}
}
}
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
@Override
protected void doStart() {
//❶
if (isRunning()) {
return;
}
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
//❷
Object messageListener = containerProperties.getMessageListener();
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
//❸
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor
.submitListenable(this.listenerConsumer);
}
}
❶和之前一样,如果容器已启动,则不再启动。
❷getMessageListener,如果你还有印象,在第二章节遍历endPoint,创建container时通过endpoint.setupListenerContainer方法创建。如果是单个消费则创建的是RecordMessagingMessageListenerAdapter,如果是批量消费则是BatchMessagingMessageListenerAdapter。
❸就是最核心的地方了。通过new的方式创建了ListenerConsumer,首先
看ListenerConsumer的类图
再看构造方法
可以看到ListenerConsumer事实上是一个定时task.
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Properties consumerProperties = propertiesFromProperties();
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
this.autoCommit = determineAutoCommit(consumerProperties);
this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
this.clientId = determineClientId();
this.transactionTemplate = determineTransactionTemplate();
this.genericListener = listener;
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
subscribeOrAssignTopics(this.consumer);
if (listener instanceof BatchMessageListener) {
this.listener = null;
this.batchListener = (BatchMessageListener<K, V>) listener;
this.isBatchListener = true;
this.wantsFullRecords = this.batchListener.wantsPollResult();
this.pollThreadStateProcessor = setUpPollProcessor(true);
}
else if (listener instanceof MessageListener) {
this.listener = (MessageListener<K, V>) listener;
this.batchListener = null;
this.isBatchListener = false;
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
+ "'BatchMessageListener', or the variants that are consumer aware and/or "
+ "Acknowledging"
+ " not " + listener.getClass().getName());
}
this.listenerType = listenerType;
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
this.commonErrorHandler = determineCommonErrorHandler();
Assert.state(!this.isBatchListener || !this.isRecordAck,
"Cannot use AckMode.RECORD with a batch listener");
if (this.containerProperties.getScheduler() != null) {
this.taskScheduler = this.containerProperties.getScheduler();
this.taskSchedulerExplicitlySet = true;
}
else {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.initialize();
this.taskScheduler = threadPoolTaskScheduler;
}
this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, // NOSONAR
Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
if (this.containerProperties.isLogContainerConfig()) {
this.logger.info(toString());
}
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| checkDeserializer(findDeserializerClass(props, consumerProperties, false));
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
|| checkDeserializer(findDeserializerClass(props, consumerProperties, true));
this.syncCommitTimeout = determineSyncCommitTimeout();
if (this.containerProperties.getSyncCommitTimeout() == null) {
// update the property so we can use it directly from code elsewhere
this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
KafkaMessageListenerContainer.this.thisOrParentContainer
.getContainerProperties()
.setSyncCommitTimeout(this.syncCommitTimeout);
}
}
this.maxPollInterval = obtainMaxPollInterval(consumerProperties);
this.micrometerHolder = obtainMicrometerHolder();
this.deliveryAttemptAware = setupDeliveryAttemptAware();
this.subBatchPerPartition = setupSubBatchPerPartition();
this.lastReceivePartition = new HashMap<>();
this.lastAlertPartition = new HashMap<>();
this.wasIdlePartition = new HashMap<>();
this.pausedPartitions = new HashSet<>();
}
到这里整个生命周期的启动流程就结束了。
总结一下整个过程
4消费过程
4.1拉取消息
public void run() {
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
setupSeeks();
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
publishConsumerStartedEvent();
Throwable exitThrowable = null;
while (isRunning()) {
try {
pollAndInvoke();
}
....
finally {
clearThreadState();
}
}
wrapUp(exitThrowable);
}
protected void pollAndInvoke() {
...
//❶
ConsumerRecords<K, V> records = doPoll();
...
debugRecords(records);
resumeConsumerIfNeccessary();
...
//❷
invokeIfHaveRecords(records);
}
先分析❶doPoll(),拉取消息
private ConsumerRecords<K, V> doPoll() {
ConsumerRecords<K, V> records;
if (this.isBatchListener && this.subBatchPerPartition) {
if (this.batchIterator == null) {
this.lastBatch = pollConsumer();
captureOffsets(this.lastBatch);
if (this.lastBatch.count() == 0) {
return this.lastBatch;
}
else {
this.batchIterator = this.lastBatch.partitions().iterator();
}
}
TopicPartition next = this.batchIterator.next();
List<ConsumerRecord<K, V>> subBatch = this.lastBatch.records(next);
records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch));
if (!this.batchIterator.hasNext()) {
this.batchIterator = null;
}
}
else {
records = pollConsumer();
captureOffsets(records);
checkRebalanceCommits();
}
return records;
}
不管是批量监听还是单个监听都会调pollConsumer拉取消息
private ConsumerRecords<K, V> pollConsumer() {
beforePoll();
try {
return this.consumer.poll(this.pollTimeout);
}
catch (WakeupException ex) {
return ConsumerRecords.empty();
}
}
这个就是调用kafka底层拉取消息的逻辑了。
4.2消息监听
private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
if (records != null && records.count() > 0) {
this.receivedSome = true;
savePositionsIfNeeded(records);
notIdle();
notIdlePartitions(records.partitions());
invokeListener(records);
}
else {
checkIdle();
}
if (records == null || records.count() == 0
|| records.partitions().size() < this.consumer.assignment().size()) {
checkIdlePartitions();
}
}
当records不为空时,调用invokeListener,知名见义
private void invokeListener(final ConsumerRecords<K, V> records) {
if (this.isBatchListener) {
invokeBatchListener(records);
}
else {
invokeRecordListener(records);
}
}
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
if (this.stopImmediate && !isRunning()) {
break;
}
final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
if (record == null) {
continue;
}
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
doInvokeRecordListener(record, iterator);
if (this.commonRecordInterceptor != null) {
this.commonRecordInterceptor.afterRecord(record, this.consumer);
}
if (this.nackSleep >= 0) {
handleNack(records, record);
break;
}
}
}
如果我们listener是配置处理单个record,遍历records,
@Nullable
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record, // NOSONAR
Iterator<ConsumerRecord<K, V>> iterator) {
Object sample = startMicrometerSample();
try {
invokeOnMessage(record);
successTimer(sample);
recordInterceptAfter(record, null);
}
return null;
}
RecordMessagingMessageListenerAdapter的onMessage方法。
@Nullable
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record, // NOSONAR
Iterator<ConsumerRecord<K, V>> iterator) {
Object sample = startMicrometerSample();
try {
invokeOnMessage(record);
successTimer(sample);
recordInterceptAfter(record, null);
}
return null;
}
继续跟进
public class RecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
implements AcknowledgingConsumerAwareMessageListener<K, V> {
public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
Message<?> message;
if (isConversionNeeded()) {
//❶
message = toMessagingMessage(record, acknowledgment, consumer);
}
else {
message = NULL_MESSAGE;
}
if (logger.isDebugEnabled() && !(getMessageConverter() instanceof ProjectingMessageConverter)) {
this.logger.debug("Processing [" + message + "]");
}
try {
//❷
Object result = invokeHandler(record, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
...
}
}
isConversionNeeded条件默认ture,spring会将kafka原生的ConsumerRecord转换成message对象
❶toMessagingMessage
private RecordMessageConverter messageConverter = new MessagingMessageConverter();
protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
return getMessageConverter().toMessage(record, acknowledgment, consumer, getType());
}
其中getMessageConverter(),默认是MessagingMessageConverter
接着看toMessage 方法
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Type type) {
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
this.generateTimestamp);
//原生header
Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
if (record.headers() != null) {
mapOrAddHeaders(record, rawHeaders);
}
String ttName = record.timestampType() != null ? record.timestampType().name() : null;
//添加一些common header
commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(),
record.offset(), ttName, record.timestamp());
if (this.rawRecordHeader) {
rawHeaders.put(KafkaHeaders.RAW_DATA, record);
}
//将record转成message
Message<?> message = MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders);
if (this.messagingConverter != null && !message.getPayload().equals(KafkaNull.INSTANCE)) {
Class<?> clazz = type instanceof Class ? (Class<?>) type : type instanceof ParameterizedType
? (Class<?>) ((ParameterizedType) type).getRawType() : Object.class;
Object payload = this.messagingConverter.fromMessage(message, clazz, type);
if (payload != null) {
message = new GenericMessage<>(payload, message.getHeaders());
}
}
return message;
}
继续跟进createMessage
public static <T> Message<T> createMessage(@Nullable T payload, MessageHeaders messageHeaders) {
Assert.notNull(payload, "Payload must not be null");
Assert.notNull(messageHeaders, "MessageHeaders must not be null");
if (payload instanceof Throwable) {
return (Message<T>) new ErrorMessage((Throwable) payload, messageHeaders);
}
else {
return new GenericMessage<>(payload, messageHeaders);
}
}
最终返回GenericMessage对象
接下来就是调用listener方法了,回到前面第❷步invokeHandler
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
Consumer<?, ?> consumer) {
try {
if (data instanceof List && !this.isConsumerRecordList) {
return this.handlerMethod.invoke(message, acknowledgment, consumer);
}
else {
if (this.hasMetadataParameter) {
return this.handlerMethod.invoke(message, data, acknowledgment, consumer,
AdapterUtils.buildConsumerRecordMetadata(data));
}
else {
return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
}
}
}
...
}
例
1.通过payload方式用具体的消息类型获取
2. 通过ConsumerRecord获取
到这里整个监听的步骤完成了。

因为你的分享、点赞、在看


原文始发于微信公众号(小李的源码图):@KafkaListener底层是怎样实现的?
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/145485.html