A Deep Dive into the Source Code of @KafkaListener in spring-kafka

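Preface

This article reads through the source code to trace the full path from Spring startup to the point where Kafka messages are actually being listened for.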

I. Overall Flow

[Figure: overall flow, from Spring processing @KafkaListener at startup through to starting the message listeners]

II. Source Code Walkthrough

1. postProcessAfterInitialization

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);

        // Scan for @KafkaListener annotations
        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);

        ......

        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(bean.getClass());
            this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
        }
        else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                // Iterate over every @KafkaListener found on the method and process it
                for (KafkaListener listener : entry.getValue()) {
                    processKafkaListener(listener, method, bean, beanName);
                }
            }
            this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                    + beanName + "': " + annotatedMethods);
        }
        // Handle @KafkaListener annotations declared on the class
        if (hasClassLevelListeners) {
            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
        }
    }
    return bean;
}
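
To make the scanning step above more concrete, here is a minimal plain-Java sketch of the same idea: look at each bean once, collect methods that carry a listener annotation, and cache types that have none so they are skipped next time. Everything in it is an assumption made for illustration: DemoListener stands in for @KafkaListener, OrderHandler is a made-up bean, and the sketch uses bare reflection rather than the AOP-aware lookup the real post-processor performs.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ListenerScanSketch {

    // Hypothetical stand-in for @KafkaListener (not the real annotation).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoListener {
        String topic();
    }

    // Hypothetical bean with one annotated method and one plain method.
    static class OrderHandler {
        @DemoListener(topic = "orders")
        public void onOrder(String payload) {
        }

        public void helper() {
        }
    }

    // Plays the role of nonAnnotatedClasses: types known to declare no listeners.
    private final Set<Class<?>> nonAnnotatedClasses = ConcurrentHashMap.newKeySet();

    // Records "method -> topic"; stands in for the call to processKafkaListener.
    final List<String> processed = new ArrayList<>();

    Object postProcessAfterInitialization(Object bean) {
        Class<?> type = bean.getClass();
        if (!nonAnnotatedClasses.contains(type)) {
            boolean found = false;
            for (Method method : type.getDeclaredMethods()) {
                DemoListener listener = method.getAnnotation(DemoListener.class);
                if (listener != null) {
                    found = true;
                    processed.add(method.getName() + " -> " + listener.topic());
                }
            }
            if (!found) {
                // Remember the type so later beans of this type are skipped.
                nonAnnotatedClasses.add(type);
            }
        }
        return bean;
    }

    boolean isCachedAsNonAnnotated(Class<?> type) {
        return nonAnnotatedClasses.contains(type);
    }

    public static void main(String[] args) {
        ListenerScanSketch postProcessor = new ListenerScanSketch();
        postProcessor.postProcessAfterInitialization(new OrderHandler());
        postProcessor.postProcessAfterInitialization("a bean without listeners");
        System.out.println(postProcessor.processed); // prints [onOrder -> orders]
    }
}
```

The cache only matters for performance: a type that was scanned once and found to have no listener methods is not reflected over again.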

1.1. processKafkaListener

KafkaListenerAnnotationBeanPostProcessor#processKafkaListener

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
    endpoint.setMethod(methodToUse);
    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}

1.2. processListener

KafkaListenerAnnotationBeanPostProcessor#processListener

Each @KafkaListener is converted into a MethodKafkaListenerEndpoint and registered with the KafkaListenerEndpointRegistrar, so that all listeners can be started together later.

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, Object adminTarget, String beanName) {

    String beanRef = kafkaListener.beanRef();
    if (StringUtils.hasText(beanRef)) {
        this.listenerScope.addListener(beanRef, bean);
    }
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    endpoint.setTopics(resolveTopics(kafkaListener));
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
    String group = kafkaListener.containerGroup();

    ......

    // Register the fully populated consumer endpoint
    this.registrar.registerEndpoint(endpoint, factory);

    if (StringUtils.hasText(beanRef)) {
        this.listenerScope.removeListener(beanRef);
    }
}

1.3. registerEndpoint

KafkaListenerEndpointRegistrar#registerEndpoint

public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {

    ......

    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        // Once the immediate-start phase has been reached, register and start right away
        // (that is, create the message listener container and start it)
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            // The usual path during startup: add the descriptor to this list so that
            // everything is registered and started together later in the bean lifecycle
            this.endpointDescriptors.add(descriptor);
        }
    }
}

// KafkaListenerEndpointRegistry#registerListenerContainer
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
        boolean startImmediately) {

    ......

    synchronized (this.listenerContainers) {

        ......

        // 1. Create the message listener container
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }
            else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }

        // 2. Start listening immediately if requested
        if (startImmediately) {
            startIfNecessary(container);
        }
    }
}
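
The registry method above does three things: create a container, file it under its id (and under its group, if it has one), and optionally start it. The following plain-Java sketch models just that bookkeeping. It is a simplified illustration, not the library's code: DemoContainer and RegistrySketch are hypothetical names, a map replaces the List bean that the real registry registers as a singleton, and the start step skips the startIfNecessary check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RegistrySketch {

    // Hypothetical stand-in for a MessageListenerContainer.
    static class DemoContainer {
        final String id;
        boolean running;

        DemoContainer(String id) {
            this.id = id;
        }

        void start() {
            running = true;
        }
    }

    // id -> container, like listenerContainers in the registry.
    private final Map<String, DemoContainer> containers = new LinkedHashMap<>();

    // group name -> containers; replaces the List bean looked up by group name.
    private final Map<String, List<DemoContainer>> groups = new LinkedHashMap<>();

    synchronized DemoContainer register(String id, String group, boolean startImmediately) {
        // 1. Create the container and file it under its id.
        DemoContainer container = new DemoContainer(id);
        containers.put(id, container);
        if (group != null && !group.isEmpty()) {
            // Reuse the existing group list or create it on first use.
            groups.computeIfAbsent(group, name -> new ArrayList<>()).add(container);
        }
        // 2. Start right away if requested (simplified: no autoStartup check here).
        if (startImmediately) {
            container.start();
        }
        return container;
    }

    List<DemoContainer> group(String name) {
        return groups.getOrDefault(name, Collections.emptyList());
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("orders", "billing", false);
        registry.register("invoices", "billing", true);
        System.out.println(registry.group("billing").size()); // prints 2
    }
}
```

Grouping is what lets an application fetch every container of a group in one lookup, for example to stop or start them as a set.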

1.4. startIfNecessary

KafkaListenerEndpointRegistry#startIfNecessary
Starts the message listener.

private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        // Start the message listener.
        // From here on, listening for and handling messages is the job of KafkaMessageListenerContainer.
        // This completes the path from @KafkaListener to the MessageListenerContainer.
        listenerContainer.start();
    }
}
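
The condition in startIfNecessary has one consequence that is easy to miss: once contextRefreshed is true, a container is started even when its autoStartup flag is false. The sketch below models only that decision. It assumes the flag is flipped when the application context finishes refreshing, as its name suggests; DemoContainer and StartDecisionSketch are hypothetical names.

```java
class StartDecisionSketch {

    // Hypothetical stand-in for a MessageListenerContainer.
    static class DemoContainer {
        final boolean autoStartup;
        boolean running;

        DemoContainer(boolean autoStartup) {
            this.autoStartup = autoStartup;
        }

        void start() {
            running = true;
        }
    }

    // Assumed to become true once the application context has been refreshed.
    boolean contextRefreshed;

    void startIfNecessary(DemoContainer container) {
        // Same condition as in the method above.
        if (contextRefreshed || container.autoStartup) {
            container.start();
        }
    }

    public static void main(String[] args) {
        StartDecisionSketch registry = new StartDecisionSketch();

        DemoContainer early = new DemoContainer(false);
        registry.startIfNecessary(early);
        System.out.println(early.running); // prints false

        registry.contextRefreshed = true;
        DemoContainer late = new DemoContainer(false);
        registry.startIfNecessary(late);
        System.out.println(late.running); // prints true
    }
}
```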

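2. afterSingletonsInstantiated

This callback runs right after the singleton beans have been instantiated. "Instantiated" here means the objects have been created and have also completed their initialization.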

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

public void afterSingletonsInstantiated() {
    this.registrar.setBeanFactory(this.beanFactory);

    // Complete the registrar's configuration
    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, KafkaListenerConfigurer> instances =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
        for (KafkaListenerConfigurer configurer : instances.values()) {
            configurer.configureKafkaListeners(this.registrar);
        }
    }

    if (this.registrar.getEndpointRegistry() == null) {
        if (this.endpointRegistry == null) {
            Assert.state(this.beanFactory != null,
                    "BeanFactory must be set to find endpoint registry by bean name");
            this.endpointRegistry = this.beanFactory.getBean(
                    KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    KafkaListenerEndpointRegistry.class);
        }
        this.registrar.setEndpointRegistry(this.endpointRegistry);
    }

    ......

    // Actually register all listeners
    // This is the key step of the whole method:
    // create the MessageListenerContainer instances and register them
    this.registrar.afterPropertiesSet();
}
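
The order of operations in this method matters: user-supplied configurers get to adjust the registrar first, a default registry is filled in only if nobody supplied one, and registration happens last. A minimal plain-Java sketch of that ordering follows. All names in it (DemoRegistrar, DemoConfigurer, the registry names) are hypothetical stand-ins chosen for illustration, and a plain string replaces the real registry object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SingletonsCallbackSketch {

    // Hypothetical stand-in for KafkaListenerEndpointRegistrar.
    static class DemoRegistrar {
        String registryName; // stays null until someone sets it
        final List<String> log = new ArrayList<>();

        void afterPropertiesSet() {
            log.add("register all endpoints via " + registryName);
        }
    }

    // Hypothetical stand-in for KafkaListenerConfigurer.
    interface DemoConfigurer {
        void configure(DemoRegistrar registrar);
    }

    static DemoRegistrar afterSingletonsInstantiated(List<DemoConfigurer> configurers,
            String defaultRegistryName) {
        DemoRegistrar registrar = new DemoRegistrar();
        // 1. Let every configurer customize the registrar.
        for (DemoConfigurer configurer : configurers) {
            configurer.configure(registrar);
        }
        // 2. Fall back to the default registry only if none was set.
        if (registrar.registryName == null) {
            registrar.registryName = defaultRegistryName;
        }
        // 3. Only now register the collected endpoints.
        registrar.afterPropertiesSet();
        return registrar;
    }

    public static void main(String[] args) {
        DemoConfigurer custom = registrar -> registrar.registryName = "customRegistry";
        DemoRegistrar result = afterSingletonsInstantiated(
                Collections.singletonList(custom), "defaultRegistry");
        System.out.println(result.log); // prints [register all endpoints via customRegistry]
    }
}
```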

2.1. afterPropertiesSet

KafkaListenerEndpointRegistrar#afterPropertiesSet

public void afterPropertiesSet() {
    registerAllEndpoints();
}

2.2. registerAllEndpoints

KafkaListenerEndpointRegistrar#registerAllEndpoints

protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            // This is where the listener containers are actually created and registered
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        // Once every listener has been registered during startup, set the flag to true
        this.startImmediately = true;  // trigger immediate startup
    }
}
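
Taken together, registerEndpoint and registerAllEndpoints implement a "queue first, then switch to immediate mode" pattern. The sketch below reproduces that pattern in plain Java so the two phases are easy to see. It is only a model: DeferredRegistrationSketch is a hypothetical name, endpoint ids are plain strings, and a list of labels replaces the real registry.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredRegistrationSketch {

    // Endpoints queued during startup, like endpointDescriptors.
    private final List<String> pending = new ArrayList<>();

    // What has been handed to the registry so far, with a label for the path taken.
    final List<String> registered = new ArrayList<>();

    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        synchronized (pending) {
            if (startImmediately) {
                // Late registration: goes straight to the registry.
                registered.add(endpointId + " (immediate)");
            }
            else {
                // Startup phase: just remember the endpoint for later.
                pending.add(endpointId);
            }
        }
    }

    void registerAllEndpoints() {
        synchronized (pending) {
            for (String endpointId : pending) {
                registered.add(endpointId + " (deferred)");
            }
            // From now on, new endpoints are registered as soon as they arrive.
            startImmediately = true;
        }
    }

    public static void main(String[] args) {
        DeferredRegistrationSketch registrar = new DeferredRegistrationSketch();
        registrar.registerEndpoint("a");
        registrar.registerEndpoint("b");
        registrar.registerAllEndpoints();
        registrar.registerEndpoint("c");
        System.out.println(registrar.registered); // prints [a (deferred), b (deferred), c (immediate)]
    }
}
```

Deferring registration until all singletons exist means every container factory and configurer is available before the first container is created, while the flag keeps listeners registered later at runtime working.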

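Summary

That is the whole flow. Overall, spring-kafka weaves Kafka message listening into the Spring bean lifecycle, and the two fit together cleanly.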

Versions used for debugging and source reading:

org.springframework.boot: 2.3.3.RELEASE
spring-kafka: 2.5.4.RELEASE

 

 



Original article: https://blog.csdn.net/ldw201510803006/article/details/115578280
