此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
@KafkaListener
生命周期管理
为@KafkaListener
注解不是应用程序上下文中的 bean。
相反,它们使用KafkaListenerEndpointRegistry
.
该 bean 由框架自动声明并管理容器的生命周期;它将自动启动任何具有autoStartup
设置为true
.
所有容器工厂创建的所有容器必须位于同一容器中phase
.
有关更多信息,请参阅侦听器容器自动启动。
可以使用注册表以编程方式管理生命周期。
启动或停止注册表将启动或停止所有已注册的容器。
或者,您可以使用其id
属性。
您可以设置autoStartup
,这将覆盖配置到容器工厂中的默认设置。
您可以从应用程序上下文(例如自动连接)获取对 bean 的引用,以管理其已注册的容器。
以下示例显示了如何执行此作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
注册表仅维护其管理的容器的生命周期;声明为 Bean 的容器不受注册表管理,可以从应用程序上下文中获取。
可以通过调用注册表的getListenerContainers()
方法。
2.2.5 版本添加了一个方便的方法getAllListenerContainers()
,它返回所有容器的集合,包括由注册表管理的容器和声明为 bean 的容器。
返回的集合将包括任何已初始化的原型 Bean,但它不会初始化任何惰性 Bean 声明。
刷新应用程序上下文后注册的终结点将立即启动,无论其autoStartup 属性,以符合SmartLifecycle contract,其中autoStartup 仅在应用程序上下文初始化期间考虑。
延迟注册的一个示例是具有@KafkaListener 在原型范围内,在初始化上下文后创建实例。
从 2.8.7 版开始,您可以将注册表的alwaysStartAfterRefresh 属性设置为false 然后是容器的autoStartup 属性将定义容器是否启动。 |
从 KafkaListenerEndpointRegistry 检索 MessageListenerContainers
这KafkaListenerEndpointRegistry
提供检索方法MessageListenerContainer
实例以适应一系列管理方案:
所有容器:对于涵盖所有侦听器容器的作,请使用getListenerContainers()
检索全面的集合。
Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();
按 ID 划分的特定容器:要管理单个容器,getListenerContainer(String id)
启用按其 ID 检索。
MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");
动态容器过滤:在 3.2 版本中引入,两个重载getListenerContainersMatching
方法可以对容器进行精细选择。
一种方法采用Predicate<String>
用于基于 ID 的过滤作为参数,而另一个则采用BiPredicate<String, MessageListenerContainer>
获取可能包含容器属性或状态作为参数的更高级条件。
// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));
// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
registry.getListenerContainersMatching(myPattern::matches);
// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
registry.getListenerContainersMatching(myIdSet::contains);
// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
registry.getListenerContainersMatching(
(id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
);
利用这些方法高效管理和查询MessageListenerContainer
实例。