此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
配置消息通道
要创建消息通道实例,您可以使用<channel/>
元素或 XML 的元素DirectChannel
实例,如下所示:
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当您使用<channel/>
元素,它会创建一个DirectChannel
实例 (一个SubscribableChannel
).
要创建发布-订阅通道,请使用<publish-subscribe-channel/>
元素(PublishSubscribeChannel
在 Java 中),如下所示:
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
或者,您可以提供各种<queue/>
子元素来创建任何可轮询的通道类型(如消息通道实现中所述)。
以下部分显示了每种通道类型的示例。
DirectChannel
配置
如前所述,DirectChannel
是默认类型。
以下列表显示了要定义的人员:
-
Java
-
XML
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认通道具有循环负载均衡器,并且还启用了故障转移(请参阅DirectChannel
了解更多详情)。
要禁用其中一个或两个,请添加一个<dispatcher/>
子元素(一个LoadBalancingStrategy
构造函数DirectChannel
)并按如下方式配置属性:
-
Java
-
XML
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
从 6.3 版开始,所有MessageChannel
基于UnicastingDispatcher
可以配置为Predicate<Exception> failoverStrategy
而不是普通failover
选择。
此谓词决定是否故障转移到下一个MessageHandler
基于从当前引发的异常。
更复杂的错误分析应使用ErrorMessageExceptionTypeRouter
.
数据类型通道配置
有时,使用者只能处理特定类型的有效负载,这迫使您确保输入消息的有效负载类型。 首先想到的可能是使用消息过滤器。 但是,该消息筛选器所能做的就是过滤掉不符合使用者要求的消息。 另一种方法是使用基于内容的路由器,并将具有不合规数据类型的消息路由到特定的转换器,以强制转换和转换为所需的数据类型。 这将起作用,但完成相同事情的更简单方法是应用数据类型通道模式。 您可以为每个特定的有效负载数据类型使用单独的数据类型通道。
要创建仅接受包含特定有效负载类型的消息的数据类型通道,请在通道元素的datatype
属性,如以下示例所示:
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
请注意,类型检查会针对可分配给通道数据类型的任何类型进行检查。
换句话说,numberChannel
在前面的示例中,将接受有效负载为java.lang.Integer
或java.lang.Double
.
可以将多种类型作为逗号分隔的列表提供,如以下示例所示:
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前面示例中的“numberChannel”仅接受数据类型为java.lang.Number
.
但是,如果消息的有效负载不是所需的类型,会发生什么情况?
这取决于您是否定义了一个名为integrationConversionService
这是 Spring 转换服务的一个实例。
如果没有,则Exception
会立即被抛出。
但是,如果您定义了integrationConversionService
bean,它用于尝试将消息的有效负载转换为可接受的类型。
您甚至可以注册自定义转换器。
例如,假设您发送的消息带有String
有效负载到我们上面配置的 'numberChannel'。
您可以按如下方式处理该消息:
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,这将是一项完全合法的作。 但是,由于我们使用数据类型通道,因此此类作的结果将生成类似于以下内容的异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
发生异常是因为我们要求有效负载类型是Number
,但我们发送了一个String
.
所以我们需要一些东西来转换String
设置为Number
.
为此,我们可以实现类似于以下示例的转换器:
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后我们可以将其注册为集成转换服务的转换器,如以下示例所示:
-
Java
-
XML
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者在StringToIntegerConverter
类,当它标有@Component
用于自动扫描的注释。
当解析 'converter' 元素时,它会创建integrationConversionService
bean 如果尚未定义。
有了该转换器,就会有send
作现在将成功,因为数据类型通道使用该转换器将String
有效负载到Integer
.
有关有效负载类型转换的更多信息,请参阅有效负载类型转换。
从 4.0 版开始,integrationConversionService
由DefaultDatatypeChannelMessageConverter
,在应用程序上下文中查找转换服务。
要使用不同的转换技术,您可以指定message-converter
属性。
这必须是对MessageConverter
实现。
只有fromMessage
方法。
它为转换器提供了对消息标头的访问权限(以防转换可能需要来自标头的信息,例如content-type
).
该方法只能返回转换后的有效负载或完整的Message
对象。
如果是后者,转换器必须小心地从入站消息中复制所有标头。
或者,您可以声明<bean/>
类型MessageConverter
ID 为datatypeChannelMessageConverter
,并且该转换器被所有通道使用datatype
.
QueueChannel
配置
要创建QueueChannel
,使用<queue/>
子元素。
您可以按如下方式指定通道的容量:
-
Java
-
XML
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果您没有为此提供“capacity”属性的值<queue/> 子元素,则生成的队列是无界的。
为避免内存不足等问题,强烈建议为有界队列设置显式值。 |
持续QueueChannel
配置
由于QueueChannel
提供了缓冲消息的功能,但默认情况下仅在内存中这样做,它还引入了在系统故障时消息可能丢失的可能性。
为了降低这种风险,一个QueueChannel
可能由MessageGroupStore
策略接口。
有关MessageGroupStore
和MessageStore
,请参阅消息存储。
这capacity 属性在message-store 属性。 |
当QueueChannel
收到一个Message
,则将消息添加到消息存储中。
当Message
从QueueChannel
,则从消息存储中删除它。
默认情况下,一个QueueChannel
将其消息存储在内存队列中,这可能导致前面提到的消息丢失情况。
但是,Spring Integration 提供了持久化存储,例如JdbcChannelMessageStore
.
您可以为任何QueueChannel
通过添加message-store
属性,如以下示例所示:
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(有关 Java/Kotlin 配置选项,请参阅下面的示例。
Spring Integration JDBC 模块还为许多流行的数据库提供了模式数据定义语言 (DDL)。
这些模式位于该模块的 org.springframework.integration.jdbc.store.channel 包中(spring-integration-jdbc
).
一个重要的特性是,对于任何事务持久性存储(例如JdbcChannelMessageStore ),只要轮询器配置了事务,则只有当事务成功完成时,才能永久删除从存储中删除的消息。
否则,事务将回滚,并且Message 没有丢失。 |
随着越来越多的与“NoSQL”数据存储相关的 Spring 项目开始为这些存储提供底层支持,消息存储的许多其他实现都可用。
您还可以提供自己的MessageGroupStore
接口,如果您找不到满足您特定需求的接口。
从 4.0 版本开始,我们建议QueueChannel
实例配置为使用ChannelMessageStore
,如果可能的话。与一般消息存储相比,这些通常针对这种用途进行了优化。如果ChannelMessageStore
是一个ChannelPriorityMessageStore
,消息按优先级顺序以 FIFO 接收。优先级的概念由消息存储实现决定。例如,以下示例显示了 MongoDB 通道消息存储的 Java 配置:
-
Java
-
Java DSL
-
Kotlin DSL
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意MessageGroupQueue 类。
那是一个BlockingQueue 实现以使用MessageGroupStore 操作。 |
自定义QueueChannel
environment 由ref
属性的<int:queue>
子元素或其特定构造函数。
此属性提供对任何java.util.Queue
实现。
例如,分布式 HazelcastIQueue
可以配置如下:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel
配置
要创建PublishSubscribeChannel
,请使用 <publish-subscribe-channel/> 元素。
使用此元素时,您还可以指定task-executor
用于发布消息(如果未指定,则在发送方的线程中发布),如下所示:
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果您提供重排序器或聚合器,则PublishSubscribeChannel
,您可以将通道上的“apply-sequence”属性设置为true
.
这样做表示通道应将sequence-size
和sequence-number
消息标头以及传递消息之前的相关 ID。
例如,如果有五个订阅者,则sequence-size
将设置为5
,并且消息将具有sequence-number
标头值范围为1
自5
.
与Executor
,您还可以配置ErrorHandler
.
默认情况下,PublishSubscribeChannel
使用MessagePublishingErrorHandler
实现将错误发送到MessageChannel
从errorChannel
标头或进入全局errorChannel
实例。
如果Executor
未配置,则ErrorHandler
被忽略,异常将直接抛向调用方的线程。
如果您提供Resequencer
或Aggregator
从PublishSubscribeChannel
,您可以将通道上的“apply-sequence”属性设置为true
.
这样做指示通道应在传递消息之前设置序列大小和序列号消息头以及相关标识。
例如,如果有五个订阅者,则序列大小将设置为5
,并且消息的序列号标头值范围为1
自5
.
以下示例演示如何设置apply-sequence
header 到true
:
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
这apply-sequence 值为false 默认情况下,发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。
由于 Spring Integration 强制有效负载和标头引用的不变性,因此当标志设置为true ,通道会创建新的Message 具有相同有效负载引用但标头值不同的实例。 |
从 5.4.3 版本开始,PublishSubscribeChannel
也可以配置requireSubscribers
其BroadcastingDispatcher
以指示此通道在没有订阅者时不会静默忽略消息。
一个MessageDispatchingException
使用Dispatcher has no subscribers
当没有订阅者时,将抛出消息,并且此选项设置为true
.
ExecutorChannel
要创建ExecutorChannel
,将<dispatcher>
子元素,并带有task-executor
属性。
属性的值可以引用任何TaskExecutor
在上下文中。
例如,这样做可以配置线程池,以便将消息分派到订阅的处理程序。
如前所述,这样做会破坏发送方和接收方之间的单线程执行上下文,以便调用处理程序时不会共享任何活动事务上下文(即,处理程序可能会抛出Exception
,但send
调用已成功返回)。
以下示例演示如何使用dispatcher
元素并在task-executor
属性:
-
Java
-
XML
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
这
|
PriorityChannel
配置
要创建PriorityChannel
,使用<priority-queue/>
子元素,如以下示例所示:
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,通道会查阅priority
报头。
但是,您可以改为提供自定义Comparator
参考。
另外,请注意PriorityChannel
(与其他类型一样)确实支持datatype
属性。
与QueueChannel
,它还支持capacity
属性。
以下示例演示了所有这些:
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
从 4.0 版本开始,priority-channel
child 元素支持message-store
选项 (comparator
和capacity
在这种情况下是不允许的)。
消息存储必须是PriorityCapableChannelMessageStore
.
的实现PriorityCapableChannelMessageStore
目前为Redis
,JDBC
和MongoDB
.
看QueueChannel
配置和消息存储以获取更多信息。
可以在支持消息通道中找到示例配置。
RendezvousChannel
配置
一个RendezvousChannel
当队列子元素是<rendezvous-queue>
.
它不为前面描述的配置选项提供任何其他配置选项,并且其队列不接受任何容量值,因为它是零容量直接切换队列。
以下示例演示如何声明RendezvousChannel
:
-
Java
-
XML
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
信道拦截器配置
消息通道也可能具有拦截器,如通道拦截器中所述。
这<interceptors/>
子元素可以添加到<channel/>
(或更具体的元素类型)。
您可以提供ref
属性来引用任何实现ChannelInterceptor
接口,如以下示例所示:
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
通常,我们建议在单独的位置定义拦截器实现,因为它们通常提供可以在多个通道中重用的通用行为。
全局信道拦截器配置
通道拦截器提供了一种干净简洁的方式来应用每个通道的横切行为。 如果应该在多个通道上应用相同的行为,那么为每个通道配置同一组拦截器并不是最有效的方法。 为了避免重复配置,同时使拦截器能够应用于多个通道,Spring Integration 提供了全局拦截器。 考虑以下一对示例:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每<channel-interceptor/>
元素允许您定义一个全局拦截器,该拦截器应用于与pattern
属性。
在前面的案例中,全局拦截器应用于“thing1”通道和以“thing2”或“input”开头的所有其他通道,但不应用于以“thing3”开头的通道(自 5.0 版起)。
将此语法添加到模式中会导致一个可能(尽管可能不太可能)的问题。
如果你有一个名为!thing1 并且您包含了!thing1 在频道拦截器的pattern 模式,它不再匹配。
该模式现在匹配所有未命名的 beanthing1 .
在这种情况下,您可以转义! 在带有 的模式中。
图案\ \!thing1 匹配名为!thing1 . |
order 属性允许您管理当给定通道上有多个拦截器时,此拦截器的注入位置。 例如,通道 'inputChannel' 可以在本地配置单个拦截器(见下文),如以下示例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是“如何相对于本地或通过其他全局拦截器定义配置的其他拦截器注入全局拦截器?
当前实现提供了一种简单的机制来定义拦截器执行的顺序。
中的正数order
属性确保在任何现有拦截器之后注入拦截器,而负数则确保在现有拦截器之前注入拦截器。
这意味着,在前面的示例中,全局拦截器是在(因为它的order
大于0
) 本地配置的“窃听”拦截器。
如果有另一个全局拦截器具有匹配的pattern
,其顺序将通过比较两个拦截器的值来确定。order
属性。
要在现有拦截器之前注入全局拦截器,请对order
属性。
请注意,两个order 和pattern 属性是可选的。
的默认值order 将为 0,对于pattern ,默认值为“*”(匹配所有通道)。 |
窃听
如前所述,Spring Integration 提供了一个简单的窃线拦截器。
您可以在<interceptors/>
元素。
这样做对于调试特别有用,可以与 Spring Integration 的日志记录通道适配器结合使用,如下所示:
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 还接受 'expression' 属性,以便您可以根据 'payload' 和 'headers' 变量评估 SpEL 表达式。
或者,要记录完整消息toString() result,则提供true 用于 'log-full-message' 属性。
默认情况下,它是false 以便仅记录有效负载。
将其设置为true 除了有效负载之外,还可以记录所有标头。
'expression' 选项提供了最大的灵活性(例如,expression="payload.user.name" ). |
关于窃听器和其他类似组件(消息发布配置)的常见误解之一是它们本质上是自动异步的。 默认情况下,作为组件的窃线器不会异步调用。 相反,Spring Integration 专注于配置异步行为的单一统一方法:消息通道。 使消息流的某些部分同步或异步的原因是在该流中配置的消息通道的类型。 这是消息通道抽象的主要好处之一。 从框架诞生之初,我们就一直强调消息通道作为框架一等公民的必要性和价值。 它不仅仅是对 EIP 模式的内部隐式实现。 它作为可配置组件完全公开给最终用户。 因此,窃线器组件仅负责执行以下任务:
-
通过分接通道(例如,
channelA
) -
抓取每条消息
-
将消息发送到另一个通道(例如,
channelB
)
它本质上是桥接模式的变体,但它封装在通道定义中(因此更容易在不中断流的情况下启用和禁用)。 此外,与桥不同,它基本上分叉了另一个消息流。 该流是同步的还是异步的?答案取决于 'channelB' 的消息通道类型。 我们有以下选项:直接通道、可轮询通道和执行器通道。 最后两个破坏了线程边界,使通过此类通道进行异步通信,因为将消息从该通道分派到其订阅的处理程序发生在与用于将消息发送到该通道的线程不同的线程上。 这就是使您的窃听流同步或异步的原因。 它与框架内的其他组件(例如消息发布器)一致,并且通过让您无需提前担心(除了编写线程安全代码)是否应将特定代码段实现为同步或异步,从而增加了一定程度的一致性和简单性。 通过消息通道实际连接两段代码(例如,组件 A 和组件 B)是使它们的协作同步或异步的原因。 您甚至可能希望将来从同步更改为异步,消息通道可让您快速完成此作,而无需接触代码。
关于窃听器的最后一点是,尽管上面提供了默认情况下不异步的基本原理,但您应该记住,通常希望尽快移交消息。 因此,使用异步通道选项作为窃听器的出站通道是很常见的。 但是,默认情况下不会强制执行异步行为。 如果我们这样做,有许多用例会中断,包括您可能不想破坏事务边界。 也许您使用窃听模式进行审计,并且您确实希望在原始事务中发送审计消息。 例如,您可以将分接器连接到 JMS 出站通道适配器。 这样,您就可以两全其美:1) JMS 消息的发送可以在事务中发生,而 2) 它仍然是一个“触发后忘记”的作,从而防止主消息流中出现任何明显的延迟。
从 4.0 版开始,当拦截器(例如WireTap 类) 引用通道。
您需要从当前拦截器拦截的通道中排除此类通道。
这可以通过适当的模式或以编程方式完成。
如果您有自定义ChannelInterceptor 引用channel ,请考虑实现VetoCapableInterceptor .
这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。
您还可以在拦截器方法中添加运行时保护,以确保该通道不是拦截器引用的通道。
这WireTap 使用这两种技术。 |
从 4.3 版开始,WireTap
具有采用channelName
而不是MessageChannel
实例。
这对于 Java 配置和使用通道自动创建逻辑时来说很方便。
目标MessageChannel
bean 是从提供的channelName
稍后,在与
拦截 器。
通道解析需要BeanFactory ,因此 wire tap 实例必须是 Spring 管理的 bean。 |
这种后期绑定方法还允许使用 Java DSL 配置简化典型的窃听模式,如以下示例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
有条件的窃线器
可以使用selector
或selector-expression
属性。
这selector
引用 aMessageSelector
bean,它可以在运行时确定消息是否应该转到 tap 通道。
同样,selector-expression
是一个布尔 SpEL 表达式,它执行相同的目的:如果表达式的计算结果为true
,消息将发送到分流通道。
全局窃线器配置
可以将全局窃听器配置为全局信道拦截器配置的特例。
为此,请配置顶级wire-tap
元素。
现在,除了正常的wire-tap
命名空间支持,则pattern
和order
属性受支持,其工作方式与channel-interceptor
.
以下示例显示如何配置全局窃听器:
-
Java
-
XML
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局分接器提供了一种在外部配置单通道分接器而无需修改现有通道配置的便捷方法。
为此,请将pattern 属性添加到目标通道名称。
例如,您可以使用此技术配置测试用例以验证通道上的消息。 |