|
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
Apache Kafka Streams 支持
从 1.1.4 版开始,Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。
要从 Spring 应用程序使用它,请kafka-streamsjar 必须存在于类路径上。
它是 Spring for Apache Kafka 项目的可选依赖项,不会传递下载。
基本
参考 Apache Kafka Streams 文档建议使用该 API 的以下方式:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
因此,我们有两个主要组件:
-
StreamsBuilder:使用要构建的 APIKStream(或KTable) 实例。 -
KafkaStreams:管理这些实例的生命周期。
都KStream暴露给KafkaStreams实例由单个StreamsBuilder同时启动和停止,即使它们具有不同的逻辑。
换句话说,由StreamsBuilder与单个生命周期控制相关联。
一次KafkaStreams实例已被streams.close(),则无法重新启动。
相反,新的KafkaStreams必须创建实例以重新启动流处理。 |
弹簧管理
为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了StreamsBuilderFactoryBean.
这是一个AbstractFactoryBean实现以公开StreamsBuildersingleton 实例作为 bean。
以下示例创建这样的 bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration对象而不是StreamsConfig. |
这StreamsBuilderFactoryBean还实现SmartLifecycle管理内部KafkaStreams实例。
与 Kafka Streams API 类似,您必须定义KStream实例,然后启动KafkaStreams.
这也适用于 Kafka Streams 的 Spring API。
因此,当您使用 defaultautoStartup = true在StreamsBuilderFactoryBean,您必须声明KStream实例StreamsBuilder在刷新应用程序上下文之前。
例如KStream可以是常规的 bean 定义,而使用 Kafka Streams API 时不会受到任何影响。
以下示例显示了如何执行此作:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果您想手动控制生命周期(例如,通过某些条件停止和启动),您可以引用StreamsBuilderFactoryBean直接使用工厂 bean () 前缀进行 bean。
因为&StreamsBuilderFactoryBean使用其内部KafkaStreams例如,停止并重新启动它是安全的。
一个新的KafkaStreams在每个start().
您还可以考虑使用不同的StreamsBuilderFactoryBean实例,如果您想控制KStream实例。
您还可以指定KafkaStreams.StateListener,Thread.UncaughtExceptionHandler和StateRestoreListener选项StreamsBuilderFactoryBean,这些KafkaStreams实例。
此外,除了间接将这些选项设置为StreamsBuilderFactoryBean,从 2.1.5 版开始,您可以使用KafkaStreamsCustomizercallback 接口来配置内部KafkaStreams实例。
请注意KafkaStreamsCustomizer覆盖StreamsBuilderFactoryBean.
如果您需要执行一些KafkaStreams作,您可以直接访问该内部KafkaStreams实例,使用StreamsBuilderFactoryBean.getKafkaStreams().
您可以自动布线StreamsBuilderFactoryBeanbean 的定义,但您应该确保在 bean 定义中使用完整类型,如以下示例所示:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,您可以添加@Qualifier如果使用接口 Bean 定义,则按名称注入。
以下示例显示了如何执行此作:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从 2.4.1 版开始,工厂 bean 有一个新属性infrastructureCustomizer带类型KafkaStreamsInfrastructureCustomizer;这允许自定义StreamsBuilder(例如,添加状态存储)和/或Topology在创建流之前。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供了默认的无作实现,以避免在不需要时必须实现这两种方法。
一个CompositeKafkaStreamsInfrastructureCustomizer,用于需要应用多个定制器时。
KafkaStreams 千分尺支持
在 2.5.3 版本中引入,您可以配置KafkaStreamsMicrometerListener自动注册千分尺KafkaStreams由工厂 Bean 管理的对象:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
流 JSON 序列化和反序列化
为了在以 JSON 格式读取或写入主题或状态存储时序列化和反序列化数据,Spring for Apache Kafka 提供了一个JsonSerde使用 JSON 的实现,委托给JsonSerializer和JsonDeserializer序列化、反序列化和消息转换中所述。
这JsonSerde实现通过其构造函数(目标类型或ObjectMapper).
在下面的示例中,我们使用JsonSerde序列化和反序列化CatKafka 流的有效负载(JsonSerde可以在需要实例的地方以类似的方式使用):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/消费者工厂中使用时,您可以使用 fluent API,这简化了配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
用KafkaStreamBrancher
这KafkaStreamBrancherclass 引入了一种更方便的方法来构建条件分支KStream.
考虑以下不使用KafkaStreamBrancher:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用KafkaStreamBrancher:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置 Kafka Streams 环境,请StreamsBuilderFactoryBean需要一个KafkaStreamsConfiguration实例。
有关所有可能的选项,请参阅 Apache Kafka 文档。
从版本 2.2 开始,流配置现在作为KafkaStreamsConfiguration对象,而不是作为StreamsConfig. |
为了避免在大多数情况下使用样板代码,尤其是在开发微服务时,Spring for Apache Kafka 提供了@EnableKafkaStreams注释,您应该将其放在@Configuration类。
您只需要声明一个KafkaStreamsConfiguration名为 beandefaultKafkaStreamsConfig.
一个StreamsBuilderFactoryBeanbean,命名为defaultKafkaStreamsBuilder,在应用程序上下文中自动声明。
您可以声明和使用任何额外的StreamsBuilderFactoryBeanBeans也是如此。
您可以通过提供实现StreamsBuilderFactoryBeanConfigurer.
如果有多个这样的豆子,它们将根据它们的Ordered.order财产。
清理和停止配置
当工厂停止时,KafkaStreams.close()用 2 个参数调用:
-
closeTimeout :等待线程关闭的时间(默认为
DEFAULT_CLOSE_TIMEOUT设置为 10 秒)。可以使用StreamsBuilderFactoryBean.setCloseTimeout(). -
leaveGroupOnClose :触发来自组的消费者请假调用(默认为
false).可以使用StreamsBuilderFactoryBean.setLeaveGroupOnClose().
默认情况下,当工厂 Bean 停止时,KafkaStreams.cleanUp()方法被调用。
从 2.1.2 版开始,工厂 bean 具有额外的构造函数,将CleanupConfig对象,该对象具有属性,可让您控制cleanUp()方法在start()或stop()或者两者都不是。
从 2.7 版开始,默认值是从不清理本地状态。
标题丰富器
3.0 版添加了HeaderEnricherProcessor扩展ContextualProcessor;提供与已弃用的相同的功能HeaderEnricher实现了已弃用的Transformer接口。
这可用于在流处理中添加标头;标头值是 SpEL 表达式;表达式求值的根对象有 3 个属性:
-
record-这org.apache.kafka.streams.processor.api.Record(key,value,timestamp,headers) -
key- 当前记录的键 -
value- 当前记录的值 -
context-这ProcessorContext,允许访问当前记录元数据
表达式必须返回byte[]或String(将转换为byte[]用UTF-8).
要在流中使用扩充器,请执行以下作:
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改key或value;它只是添加标题。
| 每条记录都需要一个新实例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
这是一个简单的示例,添加一个文字标头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
3.0 版添加了MessagingProcessor扩展ContextualProcessor,提供与已弃用的MessagingTransformer实现了已弃用的Transformer接口。
这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)交互。
转换器需要实现MessagingFunction.
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 使用其GatewayProxyFactoryBean.
它还需要一个MessagingMessageConverter将键、值和元数据(包括标头)转换为 Spring Messaging 或从 Spring Messaging 转换Message<?>.
看[从KStream] 以获取更多信息。
从反序列化异常中恢复
2.3 版引入了RecoveringDeserializationExceptionHandler当发生反序列化异常时,它可以采取一些作。
请参阅 Kafka 文档DeserializationExceptionHandler,其中RecoveringDeserializationExceptionHandler是一个实现。
这RecoveringDeserializationExceptionHandler配置了ConsumerRecordRecoverer实现。
该框架提供了DeadLetterPublishingRecoverer将失败的记录发送到死信主题。
有关此恢复器的更多信息,请参阅发布死信记录。
要配置恢复器,请将以下属性添加到流配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer()bean 可以是你自己的实现ConsumerRecordRecoverer.
交互式查询支持
从 3.2 版开始,Spring for Apache Kafka 提供了 Kafka Streams 中交互式查询所需的基本功能。
交互式查询在有状态 Kafka Streams 应用程序中非常有用,因为它们提供了一种持续查询应用程序中有状态存储的方法。
因此,如果应用程序想要实现所考虑系统的当前视图,交互式查询提供了一种方法。
若要了解有关交互式查询的详细信息,请参阅本文。
Spring 中对 Apache Kafka 的支持以一个名为KafkaStreamsInteractiveQueryService这是围绕 Kafka Streams 库中交互式查询 API 的外观。
应用程序可以将此服务的实例创建为 bean,然后稍后使用它按其名称检索状态存储。
以下代码片段显示了一个示例。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
假设 Kafka Streams 应用程序有一个名为app-store,则可以通过KafkStreamsInteractiveQueryAPI 如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
一旦应用程序获得对状态存储的访问权限,它就可以从中查询键值信息。
在这种情况下,应用程序使用的状态存储是只读键值存储。
Kafka Streams 应用程序可以使用其他类型的状态存储。
例如,如果应用程序更喜欢查询基于窗口的存储,它可以在 Kafka Streams 应用程序业务逻辑中构建该存储,然后稍后检索它。
因此,用于检索KafkaStreamsInteractiveQueryService具有通用存储类型签名,以便最终用户可以分配正确的类型。
这是来自 API 的类型签名。
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
调用此方法时,用户可以专门请求正确的状态存储类型,就像我们在上面的示例中所做的那样。
重试状态存储检索
尝试使用KafkaStreamsInteractiveQueryService,由于各种原因,有可能找不到状态存储。
如果这些原因是暂时的,KafkaStreamsInteractiveQueryService提供了一个选项,通过允许注入自定义RetryTemplate.
默认情况下,RetryTemmplate用于KafkaStreamsInteractiveQueryService使用最多 3 次尝试,固定回退为 1 秒。
以下是如何注入自定义RetryTemmplate到KafkaStreamsInteractiveQueryService最多尝试十次。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
查询远程状态存储
上面显示的用于检索状态存储的 API -retrieveQueryableStore适用于本地可用的键值状态存储。
在生产环境中,Kafka Streams 应用程序很可能根据分区数量进行分发。
如果一个主题有四个分区,并且有四个同一 Kafka Streams 处理器的实例在运行,那么每个实例可能负责处理该主题中的单个分区。
在此方案中,调用retrieveQueryableStore可能无法给出实例正在寻找的正确结果,尽管它可能会返回有效的存储。
假设具有四个分区的主题具有有关各种键的数据,并且单个分区始终负责特定键。
如果调用的实例retrieveQueryableStore正在查找有关此实例未托管的密钥的信息,则它不会接收任何数据。
这是因为当前的 Kafka Streams 实例对此密钥一无所知。
要解决此问题,调用实例首先需要确保它们具有托管特定密钥的 Kafka Streams 处理器实例的主机信息。
这可以从同一application.id如下。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
在上面的示例代码中,调用实例正在查询特定键12345从名为app-store.
API 还需要相应的密钥序列化器,在本例中是IntegerSerializer.
Kafka Streams 会查看同一application.id并尝试查找哪个实例托管了这个特定的密钥,
找到后,它会将该主机信息作为HostInfo对象。
API 如下所示:
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
当使用同一 Kafka Streams 处理器的多个实例时application.id以这样的分布式方式,应用程序应该提供一个 RPC 层,其中可以通过 RPC 端点(如 REST 端点)查询状态存储。
有关这方面的更多详细信息,请参阅本文。
使用 Spring for Apache Kafka 时,使用 spring-web 技术添加基于 Spring 的 REST 端点非常容易。
一旦有了 REST 端点,就可以使用它来从任何 Kafka Streams 实例查询状态存储,给定HostInfo实例知道托管密钥的位置。
如果托管实例的密钥是当前实例,则应用程序不需要调用 RPC 机制,而是进行 JVM 内调用。
但是,问题在于应用程序可能不知道进行调用的实例是托管密钥的位置,因为特定服务器可能会由于使用者重新平衡而丢失分区。
要解决此问题,KafkaStreamsInteractiveQueryService提供了一个方便的 API,用于通过 API 方法查询当前主机信息getCurrentKafkaStreamsApplicationHostInfo()返回当前HostInfo.
这个想法是,应用程序可以首先获取有关密钥持有位置的信息,然后比较HostInfo与关于当前实例的那个。
如果HostInfo数据匹配,则可以通过retrieveQueryableStore,否则使用 RPC 选项。
Kafka Streams 示例
以下示例结合了本章中介绍的各种主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}