混合高级DSL和低级处理器API
Kafka Streams 提供了两种 API 变体。
它有更高级的类似DSL的API,可以串联各种作,很多函数式程序员可能都很熟悉。
Kafka Streams 还提供一个低级别的处理器 API。
处理器API虽然非常强大,并且能够在更低层次上控制事物,但本质上是必不可少的。
Kafka Streams 的 Spring Cloud Stream 绑定器允许你使用高级 DSL 或混合使用 DSL 和处理器 API。
混合这两种变体可以让你在应用中控制各种用例。
应用程序可以使用变换或过程方法API调用以访问处理器API。
以下是如何结合 DSL 和处理器 API 在 Spring Cloud Stream 应用中的应用,使用过程应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
这里有一个使用以下条件的例子变换应用程序接口。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
这过程API 方法调用是终端作,而变换API 是非终端的,可能会给你一个变换后的版本KStream你可以通过DSL或处理器API继续进行进一步处理。