该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

州商店

当使用高层DSL并进行相应调用时,Kafka Streams会自动创建状态存储。spring-doc.cadn.net.cn

如果你想实现一个新来的目标KTable(英国可爱的)音乐绑定为命名状态存储,然后你可以用以下策略实现。spring-doc.cadn.net.cn

假设你有以下函数。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然后通过设置以下属性,即KTable(英国可爱的)音乐数据会被具体化到指定的状态存储中。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

你可以在应用中将自定义状态存储定义为豆子,这些存储会被绑定器检测并添加到Kafka Streams构建器中。 尤其是使用处理器 API 时,你需要手动注册状态存储。 为此,你可以在应用程序中创建一个 StateStore 作为 bean。 以下是定义此类Beans的示例。spring-doc.cadn.net.cn

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

这些状态存储器随后可以被应用程序直接访问。spring-doc.cadn.net.cn

在引导过程中,上述豆子会被结合器处理,并传递给Streams构建对象。spring-doc.cadn.net.cn

进入州商店:spring-doc.cadn.net.cn

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

但这在注册全球州商店时行不通。 如需注册全球状态商店,请参见下方关于定制的部分StreamsBuilderFactoryBean.spring-doc.cadn.net.cn