此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
动态和运行时集成流
IntegrationFlow
并且它的所有依赖组件都可以在运行时注册。
在 5.0 版本之前,我们使用BeanFactory.registerSingleton()
钩。
从 Spring 框架开始5.0
,我们使用instanceSupplier
用于程序化的钩子BeanDefinition
注册。
以下示例显示了如何以编程方式注册 bean:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
请注意,在前面的示例中,instanceSupplier
hook 是genericBeanDefinition
方法,在本例中由 lambda 提供。
所有必要的 Bean 初始化和生命周期都是自动完成的,就像标准上下文配置 Bean 定义一样。
为了简化开发体验,Spring Integration 引入了IntegrationFlowContext
注册和管理IntegrationFlow
实例,如以下示例所示:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
当我们有多个配置选项并且必须创建多个类似流的实例时,这很有用。
为此,我们可以迭代我们的选项并创建和注册IntegrationFlow
循环中的实例。
另一种变体是当我们的数据源不是基于 Spring 时,因此我们必须动态创建它。
此类示例是响应式流事件源,如以下示例所示:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
这IntegrationFlowRegistrationBuilder
(由于IntegrationFlowContext.registration()
) 可用于为IntegrationFlow
注册,控制其autoStartup
,并注册非 Spring Integration Bean。
通常,这些附加 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和解序列化器,或任何其他所需的支持组件。
您可以使用IntegrationFlowRegistration.destroy()
回调以删除动态注册的IntegrationFlow
以及它的所有依赖 bean(当您不再需要它们时)。
请参阅IntegrationFlowContext
Javadoc了解更多信息。
从 5.0.6 版开始,所有生成的 bean 名称在IntegrationFlow 定义的前缀是流 ID 作为前缀。
我们建议始终指定显式流 ID。
否则,同步屏障将在IntegrationFlowContext ,以生成IntegrationFlow 并注册其 bean。
我们在这两个作上同步,以避免当生成的相同 bean 名称可用于不同的 bean 名称时出现竞争条件IntegrationFlow 实例。 |
此外,从版本 5.0.6 开始,注册生成器 API 有一个新方法:useFlowIdAsPrefix()
.
如果您希望声明同一流的多个实例并避免在流中的组件具有相同 ID 时发生 Bean 名称冲突,这很有用,如以下示例所示:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,可以使用 bean 名称tcp1.client.handler
.
一id 属性是必需的。useFlowIdAsPrefix() . |