交互式查询

Kafka Streams binder API 会暴露一个名为互动查询服务用于交互式查询状态存储。 你可以在申请中以春季豆的形式访问这些信息。通过你的申请获取这颗豆子的一个简单方法是自动线豆子。spring-doc.cadn.net.cn

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦你获得了这个豆子的访问权限,就可以查询你感兴趣的特定州商店。见下文。spring-doc.cadn.net.cn

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

启动过程中,上述调用存储库的方法调用可能会失败。 比如,它可能还在初始化状态存储的过程中。 在这种情况下,重试该作会很有用。 Kafka Streams 绑定器提供了简单的重试机制来支持此功能。spring-doc.cadn.net.cn

以下是你可以用来控制这种重试的两个属性。spring-doc.cadn.net.cn

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认为1.spring-doc.cadn.net.cn

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认是1000毫秒。spring-doc.cadn.net.cn

如果有多个 kafka 流应用实例在运行,那么在你可以交互式查询它们之前,你需要确定你查询的具体密钥托管在哪个应用实例。互动查询服务API提供了识别主机信息的方法。spring-doc.cadn.net.cn

为了实现这一点,你必须配置该属性application.server如下:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些代码片段:spring-doc.cadn.net.cn

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有关这些主机查找方法的更多信息,请参见Javadoc上的相关方法。 对于这些方法,启动时如果底层 KafkaStreams 对象尚未准备好,可能会抛出异常。 上述重试特性同样适用于这些方法。spring-doc.cadn.net.cn

通过InteractiveQueryService提供的其他API方法

请使用以下API方法检索KeyQueryMetadata与给定存储和键组合相关的对象。spring-doc.cadn.net.cn

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

请使用以下API方法检索KakfaStreams与给定存储和键组合相关的对象。spring-doc.cadn.net.cn

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

自定义商店查询参数

有时候你需要在通过互动查询服务. 为此,从以下内容开始4.0.1结合剂版本,你可以提供一个豆子StoreQueryParametersCustomizer这是一个功能性接口,具有自定义方法StoreQueryParameter作为论点。 这是它的方法签名。spring-doc.cadn.net.cn

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

通过这种方法,应用程序可以进一步定制StoreQueryParameters(存储查询参数)比如让陈旧商店出现。spring-doc.cadn.net.cn

当这种豆子出现在应用中时,互动查询服务将称之为自定义在查询状态存储之前。spring-doc.cadn.net.cn

请记住,必须有一种独特的豆子StoreQueryParametersCustomizer申请表中提供。