Apache Kafka 支持
Apache Kafka 通过提供自动配置支持春-卡夫卡项目。
卡夫卡配置由外部配置属性控制,在Spring。卡夫卡。*.
例如,你可以声明以下部分application.properties:
-
Properties
-
YAML
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
要创建启动主题,添加一个类型的豆子新主题.
如果话题已经存在,则忽略该“豆子”。 |
看卡夫卡属性更多支持的选项。
发送消息
斯普林斯卡夫卡模板是自动配置的,你可以直接在自己的豆子中自动接线,如下示例所示:
-
Java
-
Kotlin
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
public void someMethod() {
this.kafkaTemplate.send("someTopic", "Hello");
}
}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
// ...
fun someMethod() {
kafkaTemplate.send("someTopic", "Hello")
}
}
如果财产spring.kafka.producer.transaction-id-prefix定义为KafkaTransactionManager是自动配置的。
另外,如果记录消息转换器BEAN 是定义好的,它会自动关联到 Auto-配置卡夫卡模板. |
接收消息
当 Apache Kafka 基础设施存在时,任何豆子都可以被注释为@KafkaListener创建监听端点。
如果没有KafkaListenerContainerFactory定义后,默认配置会自动配置,密钥定义于Spring.卡夫卡.倾听者。*.
以下组件在某个话题主题:
-
Java
-
Kotlin
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果KafkaTransactionManager豆子是定义的,自动关联到集装箱工厂。
同样,如果RecordFilterStrategy,通用错误处理器,AfterRollback处理器或消费者意识RebalanceListener豆子是定义好的,它会自动关联到默认工厂。
一个习俗ChainedKafkaTransactionManager必须标记@Primary因为它通常引用自动配置KafkaTransactionManager豆。 |
卡夫卡流
Spring for Apache Kafka 提供了一个工厂豆子,用于创建StreamsBuilder对其流的生命周期进行对象管理和管理。
Spring Boot会自动配置所需的功能KafkaStreamsConfiguration豆子,只要卡夫卡流位于类路径上,Kafka Streams 由@EnableKafkaStreams注解。
启用Kafka Streams意味着必须设置应用ID和引导服务器。
前者可以通过以下方式配置spring.kafka.streams.application-id,默认为spring.application.name如果没有设置。
后者可以全局设置,或专门覆盖流。
还有若干额外的房产是专用房产;其他任意的卡夫卡性质可以通过以下方式设置spring.kafka.streams.properties(春卡夫卡.streams.properties)Namespace。
另请参见“其他卡夫卡属性”以获取更多信息。
使用原厂豆子时,先用线StreamsBuilder进入你的@Bean如下例所示:
-
Java
-
Kotlin
import java.util.Locale;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue)
.to("ks1Out",
Produced.with(Serdes.Integer(), new org.springframework.kafka.support.serializer.JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
}
}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(),
org.springframework.kafka.support.serializer.JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int, String> {
return KeyValue(key, value.uppercase())
}
}
默认情况下,由StreamsBuilder对象会自动启动。
你可以用以下方式自定义这种行为Spring.kafka.streams.auto-startup财产。
你也可以注册任意数量的豆子,实现它们StreamsBuilderFactoryBeanConfigurer为了更高级的自定义。 |
其他卡夫卡作品
自动配置支持的属性见附录中的集成属性部分。 请注意,这些属性(连字符或骆驼壳)大多直接映射到Apache Kafka点状属性。 详情请参见Apache Kafka文档。
不包含客户端类型的属性 (制作人,消费者,管理或流)在他们名下被认为是通用的,适用于所有客户。
这些常见属性大多可以针对一种或多种客户端类型进行覆盖,如有需要。
Apache Kafka 指定属性的重要性为 HIGH、MEDIUM 或 LOW。 Spring Boot 自动配置支持所有高重要性属性、部分选定的中等和低重要性,以及没有默认值的属性。
只有卡夫卡支持的部分属性能直接通过卡夫卡属性类。
如果你想用不直接支持的额外属性配置各个客户端类型,可以使用以下属性:
-
Properties
-
YAML
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
这设定了公共同点第一号提案卡夫卡属性到第一(适用于制作人、消费者、管理员和流媒体),提案二管理属性变为第二这三号提案消费者财产第三这第四号提案生产者属性第四以及第五号提案流属性为第五.
你也可以配置Spring Kafka。JsonDeserializer如下:
-
Properties
-
YAML
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同样,你可以禁用JsonSerializer以报头发送类型信息的默认行为:
-
Properties
-
YAML
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
| 以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。 |
使用嵌入式Kafka进行测试
Spring for Apache Kafka 提供了一种方便地测试嵌入 Apache Kafka 代理项目的方法。
要使用此功能,请在测试类中注释@EmbeddedKafka来自春季卡夫卡测试模块。
更多信息请参阅《Spring for Apache Kafka》参考手册。
为了让 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起工作,你需要重新映射嵌入式代理地址的系统属性(由EmbeddedKafkaBroker)映射到Apache Kafka的Spring Boot配置属性中。
有几种方法可以实现:
-
提供系统属性以映射嵌入的代理地址
spring.kafka.bootstrap-servers考试课内容:
-
Java
-
Kotlin
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
-
在
@EmbeddedKafka注解:
-
Java
-
Kotlin
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
-
在配置属性中使用占位符:
-
Properties
-
YAML
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"