此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
消息通道
除了IntegrationFlowBuilder
使用 EIP 方法时,Java DSL 提供了一个流畅的 API 来配置MessageChannel
实例。
为此,该MessageChannels
提供架构商工厂。
以下示例演示如何使用它:
@Bean
public PriorityChannelSpec priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap());
}
一样MessageChannels
builder factory 可用于channel()
EIP 方法IntegrationFlowBuilder
连接端点,类似于连接input-channel
/output-channel
配对。
默认情况下,端点使用DirectChannel
bean 名称基于以下模式的实例:[IntegrationFlow.beanName].channel#[channelNameIndex]
.
此规则也适用于内联生成的未命名通道MessageChannels
架构商工厂使用。
但是,所有MessageChannels
方法有一个变体,该变体知道channelId
您可以使用它来设置MessageChannel
实例。
这MessageChannel
参考资料和beanName
可以用作 bean 方法调用。
以下示例显示了使用channel()
EIP方式:
@Bean
public QueueChannelSpec queueChannel() {
return MessageChannels.queue();
}
@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
return MessageChannels.publishSubscribe();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")
意思是“'查找并使用MessageChannel
使用“输入”ID,或创建一个“”。 -
fixedSubscriberChannel()
生成一个FixedSubscriberChannel
并使用channelFlow.channel#0
. -
channel("queueChannel")
工作方式相同,但使用现有的queueChannel
豆。 -
channel(publishSubscribe())
是 bean-method 引用。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))
是IntegrationFlowBuilder
这暴露了IntegrationComponentSpec
到ExecutorChannel
并将其注册为executorChannel
. -
channel("output")
注册DirectChannel
bean 与output
作为其名称,只要没有具有此名称的 bean 已经存在。
注意:前面的IntegrationFlow
定义有效,并且其所有通道都应用于具有BridgeHandler
实例。
小心通过MessageChannels 来自不同工厂IntegrationFlow 实例。
即使 DSL 解析器将不存在的对象注册为 bean,它也无法确定相同的对象 (MessageChannel ) 来自不同的IntegrationFlow 器皿。
以下示例是错误的: |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
这个坏例子的结果是以下异常:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要使其正常工作,您需要声明@Bean
并为该通道使用其来自不同IntegrationFlow
实例。