该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

混合高级DSL和低级处理器API

Kafka Streams 提供了两种 API 变体。 它有更高级的类似DSL的API,可以串联各种作,很多函数式程序员可能都很熟悉。 Kafka Streams 还提供一个低级别的处理器 API。 处理器API虽然非常强大,并且能够在更低层次上控制事物,但本质上是必不可少的。 Kafka Streams 的 Spring Cloud Stream 绑定器允许你使用高级 DSL 或混合使用 DSL 和处理器 API。 混合这两种变体可以让你在应用中控制各种用例。 应用程序可以使用变换过程方法API调用以访问处理器API。spring-doc.cadn.net.cn

以下是如何结合 DSL 和处理器 API 在 Spring Cloud Stream 应用中的应用,使用过程应用程序接口。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

这里有一个使用以下条件的例子变换应用程序接口。spring-doc.cadn.net.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

过程API 方法调用是终端作,而变换API 是非终端的,可能会给你一个变换后的版本KStream你可以通过DSL或处理器API继续进行进一步处理。spring-doc.cadn.net.cn