此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
连接到 Kafka
从 2.5 版开始,其中每一个都扩展了KafkaResourceFactory
.
这允许在运行时通过添加Supplier<String>
到他们的配置:setBootstrapServersSupplier(() -> …)
.
将为所有新连接调用此命令以获取服务器列表。
消费者和生产者通常是长寿的。
要关闭现有生产者,请调用reset()
在DefaultKafkaProducerFactory
.
要关闭现有使用者,请调用stop()
(然后start()
) 在KafkaListenerEndpointRegistry
和/或stop()
和start()
在任何其他侦听器容器 Bean 上。
为方便起见,该框架还提供了一个ABSwitchCluster
支持两组引导服务器;其中一个随时处于活动状态。
配置ABSwitchCluster
并将其添加到生产者和消费者工厂,然后KafkaAdmin
,通过调用setBootstrapServersSupplier()
.
当你想切换时,请致电primary()
或secondary()
并调用reset()
在生产工厂建立新的连接;对于消费者来说,stop()
和start()
所有侦听器容器。
使用时@KafkaListener
s,stop()
和start()
这KafkaListenerEndpointRegistry
豆。
有关更多信息,请参阅 Javadocs。
工厂监听器
从 2.5 版开始,DefaultKafkaProducerFactory
和DefaultKafkaConsumerFactory
可以配置为Listener
在创建或关闭生产者或使用者时接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id
是通过将client-id
属性(从metrics()
创建后)到工厂beanName
属性,分隔为.
.
例如,这些侦听器可用于创建和绑定千分尺KafkaClientMetrics
实例,当创建新客户端时(并在客户端关闭时关闭它)。
该框架提供的侦听器正是这样做的;请参阅 Micrometer Native Metrics。
默认客户端 ID 前缀
从版本 3.2 开始,对于使用spring.application.name
属性,现在使用该名称
作为以下客户端类型的自动生成客户端 ID 的默认前缀:
-
不使用消费者组的消费者客户端
-
生产者客户
-
管理员客户端
这样可以更轻松地在服务器端识别这些客户端,以进行故障排除或应用配额。
客户端类型 | 没有应用程序名称 | 使用应用程序名称 |
---|---|---|
没有消费者组的消费者 |
消费者空 1 |
我的应用消费者-1 |
消费者组“mygroup” |
消费者-我的组-1 |
消费者-我的组-1 |
制作人 |
生产者-1 |
我的应用程序生产者-1 |
管理 |
管理客户端-1 |
我的应用程序管理员-1 |