绑定可视化与控制

Spring Cloud Stream 支持通过执行器端点以及编程方式对绑定进行可视化和控制。spring-doc.cadn.net.cn

程序化方式

自3.1版本起,我们暴露了org.springframework.cloud.stream.binding.BindingsLifecycleController注册时为Bean和Once 注入可以用来控制单个绑定的生命周期spring-doc.cadn.net.cn

例如,查看某个测试用例中的片段。如你所见,我们正在取回BindingsLifecycleController从Spring应用上下文中执行个别方法以控制 的生命周期回声-0捆绑。。spring-doc.cadn.net.cn

BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();

定义新绑定并管理现有绑定

此外,从4.2版本开始使用BindingsLifecycleController你可以通过访问 来定义新的绑定,也可以修改现有的绑定配置 其消费者和生产者配置属性,以实现更动态的值管理。spring-doc.cadn.net.cn

要定义新的输入绑定,你可以调用BindingsLifecycleController.defineInputBinding(..)方法(见下文)。存在defineOutputBinding(..)方法。spring-doc.cadn.net.cn

BindingsLifecycleController controller = context.getBean(BindingsLifecycleController.class);
KafkaConsumerProperties consumerProperties = controller.defineInputBinding("test-input-binding");

然后你可以通过调用来管理其属性getExtensionProperties(..)方法。spring-doc.cadn.net.cn

KafkaConsumerProperties properties = controller.getExtensionProperties("test-input-binding”);
与由函数定义衍生的绑定名称不同,显式定义的绑定不包含入0/出0后缀是因为它们没有实际函数支持。

getExtensionProperties(..)作定义是为了确保你获得配置属性类的正确类型,因此根据所使用的绑定器和绑定,你可以安全地将扩展属性转换为合适的类型。在我们这里是卡夫卡消费者属性性能。spring-doc.cadn.net.cn

根据你更改的属性类型,可能需要重新启动绑定才能生效(前面提到的)。

驱动器

由于执行器和网页是可选的,你必须先添加一个网络依赖,并手动添加执行器依赖。 以下示例展示了如何为Web框架添加依赖:spring-doc.cadn.net.cn

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下示例展示了如何为 WebFlux 框架添加依赖:spring-doc.cadn.net.cn

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

你可以按以下方式添加执行器依赖:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在Cloud Foundry中运行Spring Cloud Stream 2.0应用,您必须添加Spring Boot启动网Spring-启动-执行器去上课路径。否则, 由于健康检查失败,应用无法启动。

你还必须启用绑定执行器端点通过设置以下属性实现:--management.endpoints.web.exposure.include=bindings.spring-doc.cadn.net.cn

一旦满足了这些前提条件。申请开始时,你应该会在日志中看到以下内容:spring-doc.cadn.net.cn

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要可视化当前绑定,请访问以下URL:<host>:<port>/执行器/bindingsspring-doc.cadn.net.cn

另外,要查看单个绑定,可以访问以下类似的URL之一:<host>:<port>/actuator/bindings/<bindingName>;spring-doc.cadn.net.cn

你也可以通过发布到相同网址并提供参数为JSON,如下示例所示:spring-doc.cadn.net.cn

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
暂停恢复只有当对应的活页夹及其底层技术支持时才有效。否则,你会在日志中看到警告信息。 目前,只有 Kafka 和 [Solace](github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) 活页夹支持暂停恢复国家。

净化敏感数据

使用绑定执行器端点时,有时必须对敏感数据进行净化,如用户凭证、SSL密钥信息等。 为此,终端用户应用程序可以提供消毒功能在应用中,Spring Boot 中的豆子。 这里有一个示例,用于在为Apache Kafka提供值时对数据进行扰乱sasl.jaas.config财产。spring-doc.cadn.net.cn

@Bean
public SanitizingFunction sanitizingFunction() {
	return sanitizableData -> {
		if (sanitizableData.getKey().equals("sasl.jaas.config")) {
			return sanitizableData.withValue("data-scrambled!!");
		}
		else {
			return sanitizableData;
		}
	};
}