连接到Kafka
-
KafkaAdmin- see 配置主题 -
ProducerFactory- see Sending Messages -
ConsumerFactory- see Receiving Messages
从 2.5 版本开始,每个类都扩展了 KafkaResourceFactory。
这允许在运行时通过向其配置添加 Supplier<String> 来更改引导服务器: setBootstrapServersSupplier(() -> …)。
这将对所有新连接调用以获取服务器列表。
生产者和消费者通常具有长生命周期。
要关闭现有生产者,请在 DefaultKafkaProducerFactory 上调用 reset()。
要关闭现有消费者,请在 KafkaListenerEndpointRegistry 上调用 stop()(然后 start()),并在/或在任何其他监听器容器 bean 上的 stop() 和 start() 调用。
为了方便,框架还提供了一个ABSwitchCluster,它支持两组引导服务器;任何时候其中一组处于活动状态。
配置ABSwitchCluster并将它添加到生产者和消费者工厂中,以及调用setBootstrapServersSupplier()的KafkaAdmin。
当您想切换时,请调用primary()或secondary()并调用reset()以建立新的连接(对于生产者);对于消费者,请调用stop()和start()所有监听器容器。
在使用@KafkaListener时,请将stop()和start()作为KafkaListenerEndpointRegistry bean。
查看更多Java文档信息。
工厂监听器
从版本 2.5 开始,DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 可以通过配置 Listener 来接收生产者或消费者创建或关闭的通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id都是通过将创建后从metrics()获得的client-id属性附加到工厂beanName属性来创建的,并用.分隔。
这些侦听器可以用于创建和绑定一个 Micrometer KafkaClientMetrics 实例,当新客户端被创建时(并在关闭客户端时将其关闭)。
框架提供了监听器来实现完全相同的功能;参见Micrometer原生指标。
默认客户端ID前缀
从版本 3.2 开始,对于使用 spring.application.name 属性定义应用程序名称的 Spring Boot 应用程序,该名称现在被用作这些客户端类型的自动生成的客户端 ID 的默认前缀:
-
不使用消费者组的消费者客户端
-
生产者客户端
-
管理员 客户端
这使得在服务器端更容易识别这些客户端,以便于故障排除或应用配额。
| 客户端类型 | 没有应用程序名称 | 带有应用程序名称 |
|---|---|---|
没有消费者组的消费者 |
consumer-null-1 |
myapp-consumer-1 |
带有消费者组 "mygroup" 的消费者 |
consumer-mygroup-1 |
consumer-mygroup-1 |
生产者 |
producer-1 |
myapp-producer-1 |
管理员 |
adminclient-1 |
myapp-admin-1 |