|
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
连接到 Kafka
从 2.5 版开始,其中每一个都扩展了KafkaResourceFactory.
这允许在运行时通过添加Supplier<String>到他们的配置:setBootstrapServersSupplier(() -> …).
将为所有新连接调用此命令以获取服务器列表。
消费者和生产者通常是长寿的。
要关闭现有生产者,请调用reset()在DefaultKafkaProducerFactory.
要关闭现有使用者,请调用stop()(然后start()) 在KafkaListenerEndpointRegistry和/或stop()和start()在任何其他侦听器容器 Bean 上。
为方便起见,该框架还提供了一个ABSwitchCluster支持两组引导服务器;其中一个随时处于活动状态。
配置ABSwitchCluster并将其添加到生产者和消费者工厂,然后KafkaAdmin,通过调用setBootstrapServersSupplier().
当你想切换时,请致电primary()或secondary()并调用reset()在生产工厂建立新的连接;对于消费者来说,stop()和start()所有侦听器容器。
使用时@KafkaListeners,stop()和start()这KafkaListenerEndpointRegistry豆。
有关更多信息,请参阅 Javadocs。
工厂监听器
从 2.5 版开始,DefaultKafkaProducerFactory和DefaultKafkaConsumerFactory可以配置为Listener在创建或关闭生产者或使用者时接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id是通过将client-id属性(从metrics()创建后)到工厂beanName属性,分隔为..
例如,这些侦听器可用于创建和绑定千分尺KafkaClientMetrics实例,当创建新客户端时(并在客户端关闭时关闭它)。
该框架提供的侦听器正是这样做的;请参阅 Micrometer Native Metrics。