|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
配置主题
如果你在应用程序上下文中定义了一个KafkaAdmin类型的bean,它会自动将主题添加到经纪人。
要做到这一点,你可以在应用程序上下文中为每个主题添加一个NewTopic @Bean。
2.3版引入了一个新的类TopicBuilder,以更方便地创建此类bean。
以下示例展示了如何实现:
-
Java
-
Kotlin
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
从 2.6 版本开始,可以省略 partitions() 和/或 replicas(),那些属性将应用 broker 的默认值。
支持此功能的 broker 版本必须至少为 2.4.0 - 请参见 KIP-464。
-
Java
-
Kotlin
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
从 2.7 版本开始,你可以在一个 KafkaAdmin.NewTopics 组件定义中声明多个 NewTopic:
-
Java
-
Kotlin
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
当使用 Spring Boot 时,会自动注册一个 KafkaAdmin bean,因此只需要 NewTopic(和/或 NewTopics)@Beans。 |
默认情况下,如果经纪人不可用,会记录一条消息,但上下文会继续加载。
你可以通过编程方式调用管理员的 initialize() 方法稍后重试。
如果你希望将此条件视为致命,请将管理员的 fatalIfBrokerNotAvailable 属性设置为 true。
上下文将无法初始化。
如果 broker 支持(1.0.0 或更高),当发现某个现有主题的分区数少于 NewTopic.numPartitions 时,管理员可以增加分区数量。 |
从 2.7 版本开始,KafkaAdmin 提供了在运行时创建和检查主题的方法。
-
createOrModifyTopics -
describeTopics
对于更高级的功能,您可以直接使用AdminClient。
以下示例展示了如何操作:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
从2.9.10、3.0.9版本开始,您可以提供一个Predicate<NewTopic>,该值可用于确定特定NewTopic bean是否应考虑创建或修改。
例如,如果您有多个KafkaAdmin实例指向不同的集群,并希望每个admin选择应由其创建或修改的主题,这将很有用。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));