|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
Kafka Streams 绑定器中的绑定可视化与控制
从3.1.2版本开始,Kafka Streams 绑定器支持绑定可视化和控制。仅支持两个生命周期阶段停止和开始. 生命周期阶段暂停和恢复在Kafka Streams的活页夹中没有。
为了激活绑定可视化和控制,应用程序需要包含以下两个依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
如果你更喜欢用 webflux,也可以包含Spring BootStarters网流而不是标准的网络依赖。
此外,你还需要设置以下属性:
management.endpoints.web.exposure.include=bindings
为了进一步说明这一特性,我们以以下应用为参考:
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
如我们所见,该应用有两个 Kafka Streams 函数——一个是消费者,另一个是函数。消费者绑定默认命名为消费者-0. 同样,对于函数,输入绑定为函数在0中输出绑定为函数输出-0.
应用程序启动后,我们可以通过以下绑定端点查找绑定的详细信息。
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
关于这三种装帧的详细信息可在上文找到。
让我们现在停止消费者对零的约束。
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
此时,不会通过这种装订收到任何记录。
重新绑定。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
当单个函数存在多个绑定时,调用这些作对任意绑定都有效。这是因为单个函数的所有绑定都由相同的绑定支持。StreamsBuilderFactoryBean. 因此,对于上述函数,以下函数在0中或函数输出-0会有问题的。