对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
连接到 Kafka
从 2.5 版开始,其中每一个都扩展了KafkaResourceFactory
.
这允许在运行时通过添加Supplier<String>
到他们的配置:setBootstrapServersSupplier(() -> …)
.
将为所有新连接调用此命令以获取服务器列表。
消费者和生产者通常是长寿的。
要关闭现有生产者,请调用reset()
在DefaultKafkaProducerFactory
.
要关闭现有使用者,请调用stop()
(然后start()
) 在KafkaListenerEndpointRegistry
和/或stop()
和start()
在任何其他侦听器容器 Bean 上。
为方便起见,该框架还提供了一个ABSwitchCluster
支持两组引导服务器;其中一个随时处于活动状态。
配置ABSwitchCluster
并将其添加到生产者和消费者工厂,然后KafkaAdmin
,通过调用setBootstrapServersSupplier()
.
当你想切换时,请致电primary()
或secondary()
并调用reset()
在生产工厂建立新的连接;对于消费者来说,stop()
和start()
所有侦听器容器。
使用时@KafkaListener
s,stop()
和start()
这KafkaListenerEndpointRegistry
豆。
有关更多信息,请参阅 Javadocs。
工厂监听器
从 2.5 版开始,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
可以配置为Listener
在创建或关闭生产者或使用者时接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id
是通过将client-id
属性(从metrics()
创建后)到工厂beanName
属性,分隔为.
.
例如,这些侦听器可用于创建和绑定千分尺KafkaClientMetrics
实例,当创建新客户端时(并在客户端关闭时关闭它)。
该框架提供的侦听器正是这样做的;请参阅 Micrometer Native Metrics。