@KafkaListener底层是怎样实现的?

前言
自动装配
    注册BeanDefinition
     创建BeanPostProcessor
    激活BeanPostProcessor
注册KafkaListenerContainer
     遍历Endpoint
     创建ListenerContainer
kafkaListener启动
    启动生命周期
消费过程
    拉取消息
    消息监听


前言

    

       最近大佬给了我一个课外研究话题,在kafka中实现上下文的传递及自定义header和全链路跟踪。使用spring-kafka,生产端拓展比较清晰。生产者发的数据是同源,header和traceId可以很好设置。只需要在发送消息前后做拦截即可实现。但是消费端listener就比较复杂了,首先消息的来源方不一样,消息主体可能是不同的对象,这样不能做到统一的直接类型转换。然后消费端是批量消费,一次性拉取一段时间的数据,那么如何保证单个消息记录的上下文,header和traceId得到有效传递。所以只能先盘源码。


 

1.自动装配


1.1 注册BeanDefinition

springboot为我们提供了很多自动化配置,所以一般我们可以直接从xxxAutoConfiguration入手,所以我们直接打开KafkaAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {



    一般spring会提供Enablexxx注解,没有发现目标,我们接着看import的配置。在KafkaAnnotationDrivenConfiguration中找到了目标注解EnableKafka


@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    
    @Configuration(proxyBeanMethods = false)
    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    //即@ConditionalOnMissingBean("internalKafkaListenerAnnotationProcessor")
    static class EnableKafkaConfiguration {

    }

}


  当我们覆盖了internalKafkaListenerAnnotationProcessor,kafka驱动相关的配置就会被覆盖,默认情况下是空,该注解就会生效。我们接下来看EnableKafka的配置


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}


看到这个selector就很清晰了,前面几篇文章已经介绍过调用时机,这里就简单带过。当spring refreshContext()刷新上下文时,首先会先找到所有import注解,解析配置内容,注入beanDefinition对象。


@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {

    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[] { KafkaBootstrapConfiguration.class.getName() };
    }

}

        然后KafkaBootstrapConfiguration里注入了两个比较重要的对象,
KafkaListenerAnnotationBeanPostProcessor(自动装配用),
KafkaListenerEndpointRegistry(注册endpoint用)

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        if (!registry.containsBeanDefinition(
                KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

            registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
                    new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
        }

        if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
            registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
        }
    }

}

1.2  创建BeanPostProcessor


既然注册了BeanPostProcessor,在创建bean之前会先创建BeanPostProcessor。同样也是在Spring refreshContext()中,通过调用
registerBeanPostProcessors()方法创建BeanPostProcessor。遵守spring的生命周期。和第一步一起总结。


@KafkaListener底层是怎样实现的?


1.3 激活BeanPostProcessor


    我们知道BeanPostProcessor主要在初始化bean后做增强处理,下面是KafkaListenerAnnotationBeanPostProcessor的类图,从图中可以看出,它主要做的就是后置增强处理

@KafkaListener底层是怎样实现的?

    

我们直接看实现




public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            //
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            //
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);

            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        //
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
            }
            if (hasClassLevelListeners) {
                //
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }    
        
        
}


我们再看KafkaListener注解


@KafkaListener底层是怎样实现的?


即可以用在类上,也可以用在方法上。

所以处是寻找所有在类上标记的kafkaListener,

处是寻找所有在方法上标记的kafkaListener.

同时看图中蓝色标记的部分,当KafkaListener标记在类上时,必须在方法上标记KafkaHandler注解才能起作用,这和异步注解@Async有点不一样.


❸processKafkaListener ,遍历该类上所有标记了KafkaListener注解的方法

❹processMultiMethodListeners,主要是遍历类上标记了KafkaListener,同时方法上标记了KafkaHandler 的方法。


我们先看❸processKafkaListener



    protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
        //找到真实的bean上的方法非代理bean
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setMethod(methodToUse);

        String beanRef = kafkaListener.beanRef();
        this.listenerScope.addListener(beanRef, bean);
        String[] topics = resolveTopics(kafkaListener);
        TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
        if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
            //解析方法上的listener
            processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
        }
        this.listenerScope.removeListener(beanRef);
    }


继续跟进processListener方法


      protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
                                Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
        //
        processKafkaListenerAnnotationBeforeRegistration(endpoint, kafkaListener, bean, topics, tps);

        //
        String containerFactory = resolve(kafkaListener.containerFactory());
        
        //
        KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName);

        //
        this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
        //
        processKafkaListenerEndpointAfterRegistration(endpoint, kafkaListener);
    }


首先看注册前置处理



    private void processKafkaListenerAnnotationBeforeRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
            KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {

        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        endpoint.setTopicPartitions(tps);
        endpoint.setTopics(topics);
        endpoint.setTopicPattern(resolvePattern(kafkaListener));
        endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
        String group = kafkaListener.containerGroup();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }
        String concurrency = kafkaListener.concurrency();
        if (StringUtils.hasText(concurrency)) {
            endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
        }
        String autoStartup = kafkaListener.autoStartup();
        if (StringUtils.hasText(autoStartup)) {
            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
        }
        resolveKafkaProperties(endpoint, kafkaListener.properties());
        endpoint.setSplitIterables(kafkaListener.splitIterables());
        if (StringUtils.hasText(kafkaListener.batch())) {
            endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
        }
    }


主要就是初始化MethodKafkaListenerEndpoint,根据@KafkaListener的配置做方法级别的端点配置。


❷获取@KafkaListener上的containerFactory配置的beanName,


❸resolveContainerFactory,如果注解上有配置containerFactory,则从spring上下文中获取,如果没有,暂时返回为空


❹registerEndpoint,登记Endpoint



public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    private final List<KafkaListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();
    
    public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must be set");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");
        // Factory may be null, we defer the resolution right before actually creating the container
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) { // Register and start immediately
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);

            }
            else {
                this.endpointDescriptors.add(descriptor);
            }
        }
    }


}


这步主要是将KafkaListenerEndpoint封装成KafkaListenerEndpointDescriptor,是KafkaListenerEndpointRegistrar的内部类,对KafkaListenerEndpoint的进一步包装,主要增加了KafkaListenerContainerFactory,再次说明每个kafkaListener标记的方法的endPoint是可以配置自己单独的ContainerFactory。这点和spring cache有点相似。


public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    private static final class KafkaListenerEndpointDescriptor {

            private final KafkaListenerEndpoint endpoint;

            private final KafkaListenerContainerFactory<?> containerFactory;

            private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
                    @Nullable KafkaListenerContainerFactory<?> containerFactory) {

                this.endpoint = endpoint;
                this.containerFactory = containerFactory;
            }

        }

}


如果startImmediately为true(默认为false),则委托给endpointRegistry,注册ListenerContainer,如果false,则先存入集合中,待后续统一处理。


❺processKafkaListenerEndpointAfterRegistration登记后置处理




//#KafkaListenerAnnotationBeanPostProcessor.class
private void processKafkaListenerEndpointAfterRegistration(MethodKafkaListenerEndpoint<?, ?> endpoint,
            KafkaListener kafkaListener) {

        endpoint.setBeanFactory(this.beanFactory);
        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            resolveErrorHandler(endpoint, kafkaListener);
        }
        String converterBeanName = resolveExpressionAsString(kafkaListener.contentTypeConverter(), "contentTypeConverter");
        if (StringUtils.hasText(converterBeanName)) {
            resolveContentTypeConverter(endpoint, kafkaListener);
        }
    }
 

主要是设置异常处理器和消息转化器,针对kafkaListener注解上的配置,如果有则配置,没有则暂时为空。


总结上面流程


@KafkaListener底层是怎样实现的?


2.注册KafkaListenerContainer


2.1 遍历Endpoint

再看KafkaListenerAnnotationBeanPostProcessor类

@KafkaListener底层是怎样实现的?


它实现了SmartInitializingSingleton,并且它的order最大,也就是它会在所有bean创建完成之后,调用afterSingletonsInstantiated




//KafkaListenerAnnotationBeanPostProcessor.class
@Override
    public void afterSingletonsInstantiated() {
        this.registrar.setBeanFactory(this.beanFactory);

        //针对listener的单独配置
        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, KafkaListenerConfigurer> instances =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
            for (KafkaListenerConfigurer configurer : instances.values()) {
                configurer.configureKafkaListeners(this.registrar);
            }
        }
        //设置EndpointRegistry
        if (this.registrar.getEndpointRegistry() == null) {
            if (this.endpointRegistry == null) {
                Assert.state(this.beanFactory != null,
                        "BeanFactory must be set to find endpoint registry by bean name");
                this.endpointRegistry = this.beanFactory.getBean(
                        KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                        KafkaListenerEndpointRegistry.class);
            }
            this.registrar.setEndpointRegistry(this.endpointRegistry);
        }

        //设置ContainerFactoryBeanName
        if (this.defaultContainerFactoryBeanName != null) {
            this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
        }

        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        //设置message format
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
        }
        else {
            addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
        }

        // Actually register all listeners
        //注册所有的listener
        this.registrar.afterPropertiesSet();
        Map<String, ContainerGroupSequencer> sequencers =
                this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
        sequencers.values().forEach(seq -> seq.initialize());
    }

❶如果listener有指定configure配置,则配置listener


设置EndpointRegistry,如果EndpointRegistry为空,则从spring上下文中获取KafkaListenerEndpointRegistry,如果你对自动装配还有印象的话,在第一步注册BeanDefinition对象时,共注入了两个对象,第一个是KafkaListenerAnnotationBeanPostProcessor,第二个就是KafkaListenerEndpointRegistry,所以这里可以拿到。

后面可以看到KafkaListenerEndpointRegistrar注册所有的Endpoints,事实上是委托给了KafkaListenerEndpointRegistry去真正执行。


❸设置ContainerFactoryBeanName,默认的name就是kafkaListenerContainerFactory


❹设置ConversionService,如果没有配置,则获取默认的。



❺注册所有的listener,交给KafkaListenerEndpointRegistrar



public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    protected void registerAllEndpoints() {
            synchronized (this.endpointDescriptors) {
                for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                    if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint
                            && this.validator != null) {
                        ((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
                    }
                    //
                    this.endpointRegistry.registerListenerContainer(
                            descriptor.endpoint, resolveContainerFactory(descriptor));

                }
            
                this.startImmediately = true;  // trigger immediate startup
            }
        }

}


endpointDescriptors,如果你还有印象,在上面激活processor的后置处理的最后一步,将所有EndPoint及它的containerFactory封装成KafkaListenerEndpointDescriptor,存入到了集合中。

遍历所有的KafkaListenerEndpointDescriptor,委托给KafkaListenerEndpointRegistrar去注册,每个endpoint对应的是一个被KafkaListener注解标识的方法。

继续往下跟进

public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry, DisposableBean, SmartLifecycle,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {

    private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
                boolean startImmediately) {

            String id = endpoint.getId();
            synchronized (this.listenerContainers) {
                Assert.state(!this.listenerContainers.containsKey(id),
                        "Another endpoint is already registered with id '" + id + "'");
                //创建ListenerContainer
                MessageListenerContainer container = createListenerContainer(endpoint, factory);
                this.listenerContainers.put(id, container);
                ConfigurableApplicationContext appContext = this.applicationContext;
                //如果是消费者组,则将container加入消费者组
                String groupName = endpoint.getGroup();
                if (StringUtils.hasText(groupName) && appContext != null) {
                    List<MessageListenerContainer> containerGroup;
                    ContainerGroup group;
                    if (appContext.containsBean(groupName)) { // NOSONAR - hasText
                        containerGroup = appContext.getBean(groupName, List.class); // NOSONAR - hasText
                        group = appContext.getBean(groupName + ".group", ContainerGroup.class);
                    }
                    else {
                        containerGroup = new ArrayList<MessageListenerContainer>();
                        appContext.getBeanFactory().registerSingleton(groupName, containerGroup); // NOSONAR - hasText
                        group = new ContainerGroup(groupName);
                        appContext.getBeanFactory().registerSingleton(groupName + ".group", group);
                    }
                    containerGroup.add(container);
                    group.addContainers(container);
                }
                //如果需要立刻启动
                if (startImmediately) {
                    startIfNecessary(container);
                }
            }
        }

}       


❶createListenerContainer,

创建ListernerContainer,通过命名我们其实可以把每个Listerner理解成一个容器,类似tomcat,每个独立的容器可以处理自己的请求,即topic类比url,每个容器有自己独立的生命周期,被spring维护。后面再详细的介绍


❷如果listener有配置group,说明它加入了一个消费者组共同消费。


❸startImmediately,又看见这个变量,这个时候还是false,如果是true,则调动startIfNecessary启动容器,后面会详细介绍这个方法。


我们把视线回到❶createListenerContainer这步,go down继续往下。

2.2创建ListenerContainer



//#KafkaListenerEndpointRegistry
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory) {

        ...
        //
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
        ...

        return listenerContainer;
    }


主要看factory.createListenerContainer(endpoint)

该factory如果我们自己有配置,就是我们自己配置的containerFactory,会调用父类的createListenerContainer方法


public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V>
        implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean,
            ApplicationContextAware {
            
                    
            @Override
            public C createListenerContainer(KafkaListenerEndpoint endpoint) {
                //创建容器实例
                C instance = createContainerInstance(endpoint);
                JavaUtils.INSTANCE
                        .acceptIfNotNull(endpoint.getId(), instance::setBeanName);
                if (endpoint instanceof AbstractKafkaListenerEndpoint) {
                    configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
                }
                //配置容器
                endpoint.setupListenerContainer(instance, this.messageConverter);
                //初始化容器
                initializeContainer(instance, endpoint);
                //个性化配置
                customizeContainer(instance);
                return instance;
            }
    
            
        }


图中标记的四步,有点类似spring创建bean的方式。


我们首先看❶createContainerInstance(endpoint),默认实现是

ConcurrentKafkaListenerContainerFactory



public class ConcurrentKafkaListenerContainerFactory<K, V>
        extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
        
    @Override
    protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
        TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
        if (topicPartitions != null && topicPartitions.length > 0) {
            ContainerProperties properties = new ContainerProperties(topicPartitions);
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
        else {
            Collection<String> topics = endpoint.getTopics();
            if (!topics.isEmpty()) { // NOSONAR
                ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
                return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
            }
            else {
                ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); // NOSONAR
                return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
            }
        }
    }
                
}


可以看到,不管endpoint的分区选择是否配置,最终创建的container都会绑定consumerFactory


再看❷endpoint.setupListenerContainer



public abstract class AbstractKafkaListenerEndpoint<K, V>
        implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {
        
    private void setupMessageListener(MessageListenerContainer container,@Nullable MessageConverter messageConverter) {
        
        MessagingMessageListenerAdapter<K, V> adapter = createMessageListener(container, messageConverter);
        ...
        Object messageListener = adapter;
        ...
        container.setupMessageListener(messageListener);
    }
        
}               


我们先看createMessageListener,创建消息监听器


public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {

    @Override
    protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
            @Nullable MessageConverter messageConverter) {

        Assert.state(this.messageHandlerMethodFactory != null,
                "Could not create message listener - MessageHandlerMethodFactory not set");
        MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
        messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
        JavaUtils.INSTANCE
            .acceptIfNotNull(getReplyTopic(), replyTopic -> {
                Assert.state(getMethod().getReturnType().equals(void.class)
                        || getReplyTemplate() != null, "a KafkaTemplate is required to support replies");
                messageListener.setReplyTopic(replyTopic);
            })
            .acceptIfNotNull(getReplyTemplate(), messageListener::setReplyTemplate);

        return messageListener;
    }


}

这里面有两个比较重要的方法

第一个是真正创建MessageListener实例


public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {

    protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
            @Nullable MessageConverter messageConverter) {

        MessagingMessageListenerAdapter<K, V> listener;
        //
        if (isBatchListener()) {
            BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>(
                    this.bean, this.method, this.errorHandler);
            BatchToRecordAdapter<K, V> batchToRecordAdapter = getBatchToRecordAdapter();
            if (batchToRecordAdapter != null) {
                messageListener.setBatchToRecordAdapter(batchToRecordAdapter);
            }
            if (messageConverter instanceof BatchMessageConverter) {
                messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
            }
            listener = messageListener;
        }
        else {
            //
            RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>(
                    this.bean, this.method, this.errorHandler);
            if (messageConverter instanceof RecordMessageConverter) {
                messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
            }
            listener = messageListener;
        }
        //
        if (this.messagingConverter != null) {
            listener.setMessagingConverter(this.messagingConverter);
        }
        BeanResolver resolver = getBeanResolver();
        if (resolver != null) {
            listener.setBeanResolver(resolver);
        }
        return listener;
    }



}


❶isBatchListener(),如果我们的listener配置了batchListener,那就是batchListener,默认是false


默认是单个消息处理listener,通过new的方式创建的RecordMessagingMessageListenerAdapter

这里会调父类构造方法


    public MessagingMessageListenerAdapter(Object bean, Method method) {
        this.bean = bean;
        this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
    }


其中 determineInferredType 会根据我们的Listener method上的对象类型设置payLoad类型,这个在后面对象转换时会用到


❸messagingConverter


如果我们在创建KafkaListenerContainerFactory时没有传入,那默认就是null


MessageListener的类图如下


@KafkaListener底层是怎样实现的?


MessageListener是后面将kafkaConsumer的comsumerRecord对象转成Spring message对象的类,后面还会继续讲到。


回到createMessageListener那里,创建好MessageListener后,会设置HandleMethod,

messageListener.setHandlerMethod(configureListenerAdapter(messageListener))

这步主要是将MessageListener绑定HanderMethod,通过反射方法,最终会调用我们实际的Listener方法
    protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
        InvocableHandlerMethod invocableHandlerMethod =
                this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
        return new HandlerAdapter(invocableHandlerMethod);
    }
我们看InvocableHandlerMethod的invoke方法,java的反射逻辑了。
@Nullable
    protected Object doInvoke(Object... args) throws Exception {
        try {
            return getBridgedMethod().invoke(getBean(), args);
        }
        ...
    }


这样整个创建MessageListener的阶段主要是针对每个endpoint创建消息的转化器,并通过反射的方式最后调用我们真正的listener方法。


    我们再回到创建容器的第initializeContainer(instance, endpoint)


protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
        ContainerProperties properties = instance.getContainerProperties();
        ...
        instance.setRecordInterceptor(this.recordInterceptor);
        instance.setBatchInterceptor(this.batchInterceptor);

        ...
    }
这一步主要是设置record的拦截,这个也是预留对消息的拦截处理的扩展点


然后最后customizeContainer也是预留对containner的配置。

最后整个容器的创建注册也就完成了。

@KafkaListener底层是怎样实现的?



3 kakfa listener启动


3.1启动生命周期

在上面注册Endpoint,ListenerContainer的时候,我们可以看到主要是通过KafkaListenerEndpointRegistrar委托给KafkaListenerEndpointRegistry

去执行的,而KafkaListenerEndpointRegistry是kafka配置类自动装配时引入的。我们再仔细看看KafkaListenerEndpointRegistry的类图


@KafkaListener底层是怎样实现的?


可以看到KafkaListenerEndpointRegistry实现了Lifecycle,这是spring提供生命周期维护的重要类,可以通过扩展该接口来管理一些bean的生命周期。
当ApplicationContext接口启动和关闭时,它会调用本容器内所有的Lifecycle实现。


public interface Lifecycle {
    void start();

    void stop();

    boolean isRunning();
}
在一些开源框架中,我们也可以看到很多基于spring生命周期来扩展的例子
如eureka服务端的启动就是基于import的类实现了Lifecycle

 

@KafkaListener底层是怎样实现的?


Lifecycle是什么时候触发的?这里再简单提一下,如果想看详细过程的可以看我之前写的springboot源码。,当完成了bean的创建等过程后,spring就会调用finishRefresh()
protected void finishRefresh() {
        // Clear context-level resource caches (such as ASM metadata from scanning).
        clearResourceCaches();

        // Initialize lifecycle processor for this context.
        initLifecycleProcessor();

        // Propagate refresh to lifecycle processor first.
        getLifecycleProcessor().onRefresh();

        // Publish the final event.
        publishEvent(new ContextRefreshedEvent(this));

        // Participate in LiveBeansView MBean, if active.
        if (!NativeDetector.inNativeImage()) {
            LiveBeansView.registerApplicationContext(this);
        }
    }

拿到LifeCycleProcessor后调用onRefresh方法,再拿到所有实现了Lifecycle的bean,调用start方法


            for (LifecycleGroupMember member : this.members) {
                doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
            }
            ....
            
            private void doStart(){
                try {
                    bean.start();
                }
            }       


然后KafkaListenerEndpointRegistry实现了Lifecycle


public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry, DisposableBean, SmartLifecycle,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
            
        @Override
        public void start() {
            for (MessageListenerContainer listenerContainer : getListenerContainers()) {
                startIfNecessary(listenerContainer);
            }
            this.running = true;
        }
        
}     


可以看到,当调用start方法时会拿到前面注册好的所有listenerContainers,,再一次调用startIfNecessary方法启动该方法在前面注册endpoint出现过几次,条件是配置了立即启动。


     #KafkaListenerEndpointRegistry.class
    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }


事实上MessageListenerContainer也实现了Lifecycle

@KafkaListener底层是怎样实现的?


那这里为什么不直接调MessageListenerContainer方法呢?会不会启动两次?我们接着往下看


public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    @Override
    protected void doStart() {
        //
        if (!isRunning()) {
            checkTopics();
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            //
            setRunning(true);
            //
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container =
                        constructContainer(containerProperties, topicPartitions, i);

                configureChildContainer(i, container);
                if (isPaused()) {
                    container.pause();
                }
                //
                container.start();
                this.containers.add(container);
            }
        }
    }
}


第一个问题,之前在注册endpoint的时候,如果判断该endpoint需要立即启动startIfNecessary,然后调用MessageListenerContainer的start方法。很显然,这个是单个启动,而通过KafkaListenerEndpointRegistry的批量启动。
第二个问题,我们可以看到❶❷出标记的地方,当MessageListenerContainer没有启动时才会启动,启动后会标记成已启动的状态,这样就不能重复启动了。

❸处concurrency可以看到如果你topic有多个分区配置,并发量concurrency就是分区的数量。这样保证了不会有多余的消费者启动浪费资源。我们知道kafka同一个分区只允许消费者 组里的一个消费者消费,如果消费者数量大于分区数量,那么就会有消费者消费不到消息。

当我们没有配置时,默认情况下concurrency的值就是1,代表只会启动一个container即KafkaMessageListenerContainer,即启动一个消费者消费。
当我们有配置大于1,他会根据配置启动container,然后加入一个消费者group.


public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
        
    private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
        String beanName = getBeanName();
        beanName = (beanName == null ? "consumer" : beanName) + "-" + index;
        container.setBeanName(beanName);
        ApplicationContext applicationContext = getApplicationContext();
        if (applicationContext != null) {
            container.setApplicationContext(applicationContext);
        }
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher != null) {
            container.setApplicationEventPublisher(publisher);
        }
        container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
        container.setGenericErrorHandler(getGenericErrorHandler());
        container.setCommonErrorHandler(getCommonErrorHandler());
        container.setAfterRollbackProcessor(getAfterRollbackProcessor());
        container.setRecordInterceptor(getRecordInterceptor());
        container.setBatchInterceptor(getBatchInterceptor());
        container.setInterceptBeforeTx(isInterceptBeforeTx());
        container.setEmergencyStop(() -> {
            stopAbnormally(() -> {
            });
        });
        AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
        if (exec == null) {
            if ((this.executors.size() > index)) {
                exec = this.executors.get(index);
            }
            else {
                exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
                this.executors.add(exec);
            }
            container.getContainerProperties().setConsumerTaskExecutor(exec);
        }
    }
}
当我们没有配置线程池时,默认的线程池是SimpleAsyncTaskExecutor,看名字就知道它只有一个线程。


然后再调用KafkaMessageListenerContainer的start方法启动容器。


public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    @Override
    protected void doStart() {
        //
        if (isRunning()) {
            return;
        }
        
        ContainerProperties containerProperties = getContainerProperties();
        checkAckMode(containerProperties);

        //
        Object messageListener = containerProperties.getMessageListener();
        AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
        if (consumerExecutor == null) {
            consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
        ListenerType listenerType = determineListenerType(listener);
        //
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
        setRunning(true);
        this.startLatch = new CountDownLatch(1);
        this.listenerConsumerFuture = consumerExecutor
                .submitListenable(this.listenerConsumer);

        
    }
}


❶和之前一样,如果容器已启动,则不再启动。


❷getMessageListener,如果你还有印象,在第二章节遍历endPoint,创建container时通过endpoint.setupListenerContainer方法创建。如果是单个消费则创建的是RecordMessagingMessageListenerAdapter,如果是批量消费则是BatchMessagingMessageListenerAdapter。


❸就是最核心的地方了。通过new的方式创建了ListenerConsumer,首先

看ListenerConsumer的类图


@KafkaListener底层是怎样实现的?


再看构造方法


可以看到ListenerConsumer事实上是一个定时task.


ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
            Properties consumerProperties = propertiesFromProperties();
            checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
            this.autoCommit = determineAutoCommit(consumerProperties);
            this.consumer =
                    KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                            this.consumerGroupId,
                            this.containerProperties.getClientId(),
                            KafkaMessageListenerContainer.this.clientIdSuffix,
                            consumerProperties);


            this.clientId = determineClientId();
            this.transactionTemplate = determineTransactionTemplate();
            this.genericListener = listener;
            this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
            this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
                    KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
            subscribeOrAssignTopics(this.consumer);
            if (listener instanceof BatchMessageListener) {
                this.listener = null;
                this.batchListener = (BatchMessageListener<K, V>) listener;
                this.isBatchListener = true;
                this.wantsFullRecords = this.batchListener.wantsPollResult();
                this.pollThreadStateProcessor = setUpPollProcessor(true);
            }
            else if (listener instanceof MessageListener) {
                this.listener = (MessageListener<K, V>) listener;
                this.batchListener = null;
                this.isBatchListener = false;
                this.wantsFullRecords = false;
                this.pollThreadStateProcessor = setUpPollProcessor(false);
            }
            else {
                throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
                        + "'BatchMessageListener', or the variants that are consumer aware and/or "
                        + "Acknowledging"
                        + " not " + listener.getClass().getName());
            }
            this.listenerType = listenerType;
            this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
                    || listenerType.equals(ListenerType.CONSUMER_AWARE);
            this.commonErrorHandler = determineCommonErrorHandler();
            Assert.state(!this.isBatchListener || !this.isRecordAck,
                    "Cannot use AckMode.RECORD with a batch listener");
            if (this.containerProperties.getScheduler() != null) {
                this.taskScheduler = this.containerProperties.getScheduler();
                this.taskSchedulerExplicitlySet = true;
            }
            else {
                ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
                threadPoolTaskScheduler.initialize();
                this.taskScheduler = threadPoolTaskScheduler;
            }
            this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, // NOSONAR
                    Duration.ofSeconds(this.containerProperties.getMonitorInterval()));

            if (this.containerProperties.isLogContainerConfig()) {
                this.logger.info(toString());
            }
            Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
            this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
                    || checkDeserializer(findDeserializerClass(props, consumerProperties, false));
            this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
                    || checkDeserializer(findDeserializerClass(props, consumerProperties, true));
            this.syncCommitTimeout = determineSyncCommitTimeout();
            if (this.containerProperties.getSyncCommitTimeout() == null) {
                // update the property so we can use it directly from code elsewhere
                this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
                if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
                    KafkaMessageListenerContainer.this.thisOrParentContainer
                            .getContainerProperties()
                            .setSyncCommitTimeout(this.syncCommitTimeout);
                }
            }
            this.maxPollInterval = obtainMaxPollInterval(consumerProperties);
            this.micrometerHolder = obtainMicrometerHolder();
            this.deliveryAttemptAware = setupDeliveryAttemptAware();
            this.subBatchPerPartition = setupSubBatchPerPartition();
            this.lastReceivePartition = new HashMap<>();
            this.lastAlertPartition = new HashMap<>();
            this.wasIdlePartition = new HashMap<>();
            this.pausedPartitions = new HashSet<>();
        }

可以看到,红色标记的部分创建真正的kafkaConsumer,事实上ListenerConsumer只是包裹了kafkaConsumer,针对Listener的一些差异配置做了一些封装,可以理解成一个task,绿色标记的部分正好说了它是一个定时schedule,默认值是30s,实际上不是,它是可以自己调整的。

到这里整个生命周期的启动流程就结束了。


总结一下整个过程

@KafkaListener底层是怎样实现的?


4消费过程



前面理解了整个创建过程,到这里就轻松多了。在创建kafkaConsumer,将kafkaConsumer提交到线程池后,定时调用run方法。


4.1拉取消息

        public void run() {
            ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
            publishConsumerStartingEvent();
            this.consumerThread = Thread.currentThread();
            setupSeeks();
            KafkaUtils.setConsumerGroupId(this.consumerGroupId);
            this.count = 0;
            this.last = System.currentTimeMillis();
            initAssignedPartitions();
            publishConsumerStartedEvent();
            Throwable exitThrowable = null;
            while (isRunning()) {
                try {
                    pollAndInvoke();
                }
                ....
                finally {
                    clearThreadState();
                }
            }
            wrapUp(exitThrowable);
        }



while (isRunning())只要容器是正常运行中,并且拿到cpu调度时间片段,就会执行pollAndInvoke


protected void pollAndInvoke() {
            ...
            //
            ConsumerRecords<K, V> records = doPoll();
            ...
            debugRecords(records);
            resumeConsumerIfNeccessary();
            ...
            //
            invokeIfHaveRecords(records);
        }

先分析❶doPoll(),拉取消息


private ConsumerRecords<K, V> doPoll() {
            ConsumerRecords<K, V> records;
            if (this.isBatchListener && this.subBatchPerPartition) {
                if (this.batchIterator == null) {
                    this.lastBatch = pollConsumer();
                    captureOffsets(this.lastBatch);
                    if (this.lastBatch.count() == 0) {
                        return this.lastBatch;
                    }
                    else {
                        this.batchIterator = this.lastBatch.partitions().iterator();
                    }
                }
                TopicPartition next = this.batchIterator.next();
                List<ConsumerRecord<K, V>> subBatch = this.lastBatch.records(next);
                records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch));
                if (!this.batchIterator.hasNext()) {
                    this.batchIterator = null;
                }
            }
            else {
                records = pollConsumer();
                captureOffsets(records);
                checkRebalanceCommits();
            }
            return records;
        }


不管是批量监听还是单个监听都会调pollConsumer拉取消息


    private ConsumerRecords<K, V> pollConsumer() {
            beforePoll();
            try {
                return this.consumer.poll(this.pollTimeout);
            }
            catch (WakeupException ex) {
                return ConsumerRecords.empty();
            }
        }

这个就是调用kafka底层拉取消息的逻辑了。

4.2消息监听


在4.1拉取消息后,调用invokeIfHaveRecords(records),进而调用listener代码。



private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
            if (records != null && records.count() > 0) {
                this.receivedSome = true;
                savePositionsIfNeeded(records);
                notIdle();
                notIdlePartitions(records.partitions());
                invokeListener(records);
            }
            else {
                checkIdle();
            }
            if (records == null || records.count() == 0
                    || records.partitions().size() < this.consumer.assignment().size()) {
                checkIdlePartitions();
            }
        }


当records不为空时,调用invokeListener,知名见义


    private void invokeListener(final ConsumerRecords<K, V> records) {
            if (this.isBatchListener) {
                invokeBatchListener(records);
            }
            else {
                invokeRecordListener(records);
            }
        }

可以看到这里根据我们listener的配置来分两种情况处理。


 private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
            Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
            while (iterator.hasNext()) {
                if (this.stopImmediate && !isRunning()) {
                    break;
                }
                final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
                if (record == null) {
                    continue;
                }
                this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
                doInvokeRecordListener(record, iterator);
                if (this.commonRecordInterceptor !=  null) {
                    this.commonRecordInterceptor.afterRecord(record, this.consumer);
                }
                if (this.nackSleep >= 0) {
                    handleNack(records, record);
                    break;
                }
            }
        }


如果我们listener是配置处理单个record,遍历records,


 @Nullable
        private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record, // NOSONAR
                Iterator<ConsumerRecord<K, V>> iterator) {

            Object sample = startMicrometerSample();

            try {
                invokeOnMessage(record);
                successTimer(sample);
                recordInterceptAfter(record, null);
            }
          
            return null;
        }


重点看invokeOnMessage,最终会调到我们前面设置的messageListener,即

RecordMessagingMessageListenerAdapteronMessage方法。


 @Nullable
        private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record, // NOSONAR
                Iterator<ConsumerRecord<K, V>> iterator) {

            Object sample = startMicrometerSample();

            try {
                invokeOnMessage(record);
                successTimer(sample);
                recordInterceptAfter(record, null);
            }
          
            return null;
        }


继续跟进


public class RecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
        implements AcknowledgingConsumerAwareMessageListener<K, V> {
        
        public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
            Consumer<?, ?> consumer) {

        Message<?> message;
        if (isConversionNeeded()) {
            //
            message = toMessagingMessage(record, acknowledgment, consumer);
        }
        else {
            message = NULL_MESSAGE;
        }
        if (logger.isDebugEnabled() && !(getMessageConverter() instanceof ProjectingMessageConverter)) {
            this.logger.debug("Processing [" + message + "]");
        }
        try {
            //
            Object result = invokeHandler(record, acknowledgment, message, consumer);
            if (result != null) {
                handleResult(result, record, message);
            }
        }
        ...
        }
    }
        
       


 isConversionNeeded条件默认ture,spring会将kafka原生的ConsumerRecord转换成message对象


❶toMessagingMessage



    
    private RecordMessageConverter messageConverter = new MessagingMessageConverter();
    
    protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
            Consumer<?, ?> consumer) {
        return getMessageConverter().toMessage(record, acknowledgment, consumer, getType());
    }


其中getMessageConverter(),默认是MessagingMessageConverter

@KafkaListener底层是怎样实现的?


接着看toMessage 方法



@Override
    public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
            Type type) {

        KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
                this.generateTimestamp);
        //原生header
        Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
        if (record.headers() != null) {
            mapOrAddHeaders(record, rawHeaders);
        }
        String ttName = record.timestampType() != null ? record.timestampType().name() : null;
        //添加一些common header
        commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(),
                record.offset(), ttName, record.timestamp());
        if (this.rawRecordHeader) {
            rawHeaders.put(KafkaHeaders.RAW_DATA, record);
        }
        //将record转成message
        Message<?> message = MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders);
        if (this.messagingConverter != null && !message.getPayload().equals(KafkaNull.INSTANCE)) {
            Class<?> clazz = type instanceof Class ? (Class<?>) type : type instanceof ParameterizedType
                    ? (Class<?>) ((ParameterizedType) type).getRawType() : Object.class;
            Object payload = this.messagingConverter.fromMessage(message, clazz, type);
            if (payload != null) {
                message = new GenericMessage<>(payload, message.getHeaders());
            }
        }
        return message;
    }


继续跟进createMessage

其中extractAndConvertValue(record, type)会根据type类型将record转成对应的类型,这个type在解析endpoint的时候已经解析出来了,前面有做分析,可以倒回到endpoint解析那里。


    public static <T> Message<T> createMessage(@Nullable T payload, MessageHeaders messageHeaders) {
        Assert.notNull(payload, "Payload must not be null");
        Assert.notNull(messageHeaders, "MessageHeaders must not be null");
        if (payload instanceof Throwable) {
            return (Message<T>) new ErrorMessage((Throwable) payload, messageHeaders);
        }
        else {
            return new GenericMessage<>(payload, messageHeaders);
        }
    }

最终返回GenericMessage对象


这样listener端既可以拿到原生的consumerRecord,也可以GenericMessage中的具体类型payload.

接下来就是调用listener方法了,回到前面第invokeHandler



protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
            Consumer<?, ?> consumer) {

        try {
            if (data instanceof List && !this.isConsumerRecordList) {
                return this.handlerMethod.invoke(message, acknowledgment, consumer);
            }
            else {
                if (this.hasMetadataParameter) {
                    return this.handlerMethod.invoke(message, data, acknowledgment, consumer,
                            AdapterUtils.buildConsumerRecordMetadata(data));
                }
                else {
                    return this.handlerMethod.invoke(message, data, acknowledgment, consumer);
                }
            }
        }
        ...
    }


其中handlerMethod在前面解析endpoint,创建MessageListner那里有赋值。最终会通过反射的方式调用@KafkaListener监听的方法。

1.通过payload方式用具体的消息类型获取

@KafkaListener底层是怎样实现的?


2. 通过ConsumerRecord获取


@KafkaListener底层是怎样实现的?


到这里整个监听的步骤完成了。


@KafkaListener底层是怎样实现的?




@KafkaListener底层是怎样实现的?

因为你的分享、点赞、在看

我足足的精气神儿!
@KafkaListener底层是怎样实现的?


@KafkaListener底层是怎样实现的?
关注我的你,是最香哒!













原文始发于微信公众号(小李的源码图):@KafkaListener底层是怎样实现的?

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/145485.html

(0)
青莲明月的头像青莲明月

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!