@KafkaListener 生命周期管理

带有@KafkaListener个注解创建的监听器容器并非应用程序上下文中的 bean。相反,它们是通过类型为 KafkaListenerEndpointRegistry 的基础设施 bean 注册的。该bean由框架自动声明,负责管理容器的生命周期;它会自动启动所有将autoStartup设置为true的容器。所有由所有容器工厂创建的容器必须在同一 phase。查看 Listener Container 自动启动 以获取更多信息。您可以通过使用注册表程序化地管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,你可以通过使用其 id 属性来获取到单个容器的引用。您可以将注解设置为autoStartup,这将覆盖容器工厂中配置的默认设置。您可以从应用程序上下文获取到 bean 的引用,例如通过自动装配,来管理其已注册的容器。以下示例展示了如何操作:spring-doc.cadn.net.cn

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表仅维护其管理的容器的生命周期;声明为 beans 的容器不由注册表管理,可以从应用上下文中获取。 可以通过调用注册表的 getListenerContainers() 方法获取一组受管理的容器。 版本 2.2.5 增加了方便的方法 getAllListenerContainers(),返回所有容器的集合,包括注册表管理的和声明为 beans 的。 返回的集合将包含任何已初始化的 prototype beans,但不会初始化任何 lazy bean 声明。spring-doc.cadn.net.cn

在应用程序上下文刷新后注册的端点将立即启动,无论其 autoStartup 属性,以遵守 SmartLifecycle 合同,其中 autoStartup 仅在应用程序上下文初始化期间考虑。 一个晚注册的示例是一个在原型作用域中的 bean,其实例在上下文初始化后创建。 从 2.8.7 版本开始,您可以将注册表的 alwaysStartAfterRefresh 属性设置为 false,然后容器的 autoStartup 属性将定义容器是否启动。

检索KafkaListenerEndpointRegistry中的MessageListener Containers

The KafkaListenerEndpointRegistry 提供了用于检索 MessageListenerContainer 实例的方法,以适应一系列管理场景:spring-doc.cadn.net.cn

所有容器: 对于涵盖所有监听器容器的操作,使用 getListenerContainers() 来获取一个全面的集合。spring-doc.cadn.net.cn

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();

按ID特定容器: 为了管理单个容器,getListenerContainer(String id) 允许通过其 id 进行检索。spring-doc.cadn.net.cn

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");

动态容器过滤:在版本 3.2 中引入,两个重载的 getListenerContainersMatching 方法可以实现对容器的精确定义选择。 一个方法以 Predicate<String>(基于 ID 的筛选)作为参数,而另一个方法则以 BiPredicate<String, MessageListenerContainer> (包含容器属性或状态等更高级别的标准)作为参数。spring-doc.cadn.net.cn

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
    registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
    registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
    registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
    registry.getListenerContainersMatching(
        (id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
    );

使用这些方法可以有效地管理并查询应用程序中的MessageListenerContainer实例。spring-doc.cadn.net.cn