州商店
当使用高层DSL并进行相应调用时,Kafka Streams会自动创建状态存储。
如果你想实现一个新来的目标KTable(英国可爱的)音乐绑定为命名状态存储,然后你可以用以下策略实现。
假设你有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,即KTable(英国可爱的)音乐数据会被具体化到指定的状态存储中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
你可以在应用中将自定义状态存储定义为 beans,绑定器会检测并添加到 Kafka Streams 构建器中。尤其是使用 Processor API 时,你需要手动注册状态存储。为此,你可以在应用中创建状态存储为 bean。以下是定义此类 beans 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
这些状态存储器随后可以被应用程序直接访问。
在引导过程中,上述豆子会被结合器处理,并传递给Streams构建对象。
进入州商店:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
但这在注册全局状态存储时无法实现。如需注册全局状态存储,请参见下方关于自定义的部分StreamsBuilderFactoryBean.