|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
消息端点
本章的第一部分涵盖了一些背景理论,并揭示了相当多的关于驱动 Spring Integration 的各种消息传递组件的底层 API。 如果您想真正了解幕后发生的事情,这些信息可能会有所帮助。 但是,如果您想启动并运行各种元素的简化基于命名空间的配置,请随时跳到 Endpoint Namespace Support (终端节点命名空间支持)。
如概述中所述,消息终端节点负责将各种消息收发组件连接到通道。 在接下来的几章中,我们将介绍许多使用消息的不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面的 Message Channels 中所示,您可以向消息通道发送消息。 但是,接收稍微复杂一些。 主要原因是有两种类型的使用者:轮询使用者和事件驱动使用者。
在这两者中,事件驱动型消费者要简单得多。
无需管理和调度单独的 Poller 线程,它们本质上是具有回调方法的侦听器。
当连接到 Spring Integration 的可订阅消息通道之一时,这个简单的选项效果很好。
但是,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。
Spring 集成提供了两种不同的端点实现来容纳这两种类型的消费者。
因此,消费者自己只需要实现回调接口即可。
当需要轮询时,终端节点充当使用者实例的容器。
其好处类似于使用容器来托管消息驱动的 bean,但是,由于这些使用者是在ApplicationContext,它更类似于 Spring 自己的MessageListener器皿。
消息处理程序
Spring 集成的MessageHandlerinterface 由框架中的许多组件实现。
换句话说,这不是公共 API 的一部分,您通常不会实现MessageHandler径直。
尽管如此,消息使用者使用它来实际处理使用的消息,因此了解此策略接口确实有助于理解使用者的整体角色。
接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管它很简单,但这个接口为以下章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。 这些组件各自对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。 Spring 集成提供了两个端点实现,它们托管这些基于回调的处理程序,并让它们连接到消息通道。
事件驱动型消费者
因为它是两者中更简单的,所以我们首先介绍事件驱动的使用者终端节点。
您可能还记得,SubscribableChannel接口提供了一个subscribe()方法,并且该方法接受MessageHandler参数(如SubscribableChannel).
下面的清单显示了subscribe方法:
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者,并且 Spring 集成提供的实现接受SubscribableChannel以及MessageHandler,如下例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring 集成还提供了一个PollingConsumer,并且可以以相同的方式实例化,只是通道必须实现PollableChannel,如下例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询使用者还有许多其他配置选项。 以下示例显示如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
这PeriodicTrigger通常使用简单的区间 (Duration),但也支持initialDelay属性和布尔值fixedRate属性(默认值为false— 即没有固定延迟)。
以下示例设置这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前面示例中的三个设置的结果是一个等待 5 秒,然后每秒触发一次的触发器。
这CronTrigger需要有效的 cron 表达式。
有关详细信息,请参阅 Javadoc。
以下示例将新的CronTrigger:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是星期一到星期五每 10 秒触发一次的触发器。
轮询终端节点的默认触发器是PeriodicTrigger具有 1 秒固定延迟周期的实例。 |
除了触发器之外,您还可以指定其他两个与轮询相关的配置属性:maxMessagesPerPoll和receiveTimeout.
以下示例说明如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
这maxMessagesPerPoll属性指定在给定轮询作中要接收的最大消息数。
这意味着 Poller 继续调用receive()而无需等待,直到null或达到最大值。
例如,如果 Poller 有一个 10 秒的间隔触发器,并且maxMessagesPerPoll设置25,并且它正在轮询队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。
它抓取 25 个,等待 10 秒,抓取下一个 25 个,依此类推。
如果maxMessagesPerPoll配置了负值,则MessageSource.receive()在单个轮询周期内调用,直到返回null.
从版本 5.5 开始,一个0值具有特殊含义 - 跳过MessageSource.receive()调用,这可能被视为此轮询端点的暂停,直到maxMessagesPerPoll稍后更改为非零值,例如通过 Control Bus。
这receiveTimeoutproperty 指定 Poller 在调用 receive作时如果没有可用的消息时应等待的时间。
例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个选项的间隔触发为 5 秒,接收超时为 50 毫秒,而第二个选项的间隔触发为 50 毫秒,接收超时为 5 秒。
第一个接收消息的时间可能比它在通道上接受的晚 4950 毫秒(如果该消息在其 poll 调用之一返回后立即到达)。
另一方面,第二个配置永远不会错过超过 50 毫秒的消息。
区别在于第二个选项需要线程等待。
但是,因此,它可以更快地响应到达的消息。
这种技术称为 “长轮询”,可用于在轮询源上模拟事件驱动的行为。
轮询消费者也可以委托给 SpringTaskExecutor,如下例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer具有一个名为adviceChain.
此属性允许您指定List的 AOP 建议,用于处理其他横切关注点,包括交易。
这些建议围绕doPoll()方法。
有关更深入的信息,请参阅 Endpoint Namespace Support 下的有关 AOP 建议链和事务支持的部分。
另请参阅@Pollerannotation Javadocs 和相应的 Messaging Annotations Support 部分。
Java DSL 还提供了一个.poller()endpoint 配置选项及其各自的Pollers厂。
前面的示例显示了依赖项查找。
但是,请记住,这些使用者通常配置为 Spring bean 定义。
事实上, Spring 集成还提供了一个FactoryBean叫ConsumerEndpointFactoryBean,这将根据 Channel 的类型创建适当的 Consumer 类型。
此外,Spring 集成具有完整的 XML 名称空间支持,以进一步隐藏这些细节。
本指南介绍了基于命名空间的配置,因为介绍了每种组件类型。
许多MessageHandlerimplementations 可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
但是,何时发送回复消息以及发送多少回复消息取决于处理程序类型。
例如,聚合器等待大量消息到达,并且通常配置为拆分器的下游使用者,该拆分器可以为其处理的每条消息生成多个回复。
使用 namespace 配置时,您不需要严格了解所有详细信息。
但是,仍然值得知道的是,这些组件中的几个共享一个公共基类AbstractReplyProducingMessageHandler,并且它提供了一个setOutputChannel(..)方法。 |
终端节点命名空间支持
在本参考手册中,您可以找到终端节点元素的特定配置示例,例如 router、transformer、service-activator 等。
其中大多数都支持input-channel属性,并且许多都支持output-channel属性。
解析后,这些 endpoint 元素会生成PollingConsumer或EventDrivenConsumer,具体取决于input-channel引用:PollableChannel或SubscribableChannel分别。
当通道是可轮询的时,轮询行为基于 endpoint 元素的pollersub-element 及其属性。
下面列出了poller:
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
initial-delay="" (6)
id="" (7)
max-messages-per-poll="" (8)
receive-timeout="" (9)
ref="" (10)
task-executor="" (11)
time-unit="MILLISECONDS" (12)
trigger=""> (13)
<int:advice-chain /> (14)
<int:transactional /> (15)
</int:poller>
| 1 | 提供使用 Cron 表达式配置 Poller 的功能。
底层实现使用org.springframework.scheduling.support.CronTrigger.
如果设置了此属性,则不必指定以下任何属性:fixed-delay,trigger,fixed-rate和ref. |
| 2 | 通过将此属性设置为true,你可以只定义一个全局默认 Poller。
如果在应用程序上下文中定义了多个默认 Poller,则会引发异常。
连接到PollableChannel (PollingConsumer) 或任何SourcePollingChannelAdapter没有显式配置的 Poller 然后使用全局默认 Poller。
它默认为false.
自选。 |
| 3 | 标识如果此 Poller 的调用失败,则向其发送错误消息的通道。
要完全禁止异常,您可以提供对nullChannel.
自选。 |
| 4 | 固定延迟触发器使用PeriodicTrigger在被窝里。
数值为time-unit或者可以是 duration 格式(从版本 6.2 开始),例如PT10S,P1D.
如果设置了此属性,则不必指定以下任何属性:fixed-rate,trigger,cron和ref. |
| 5 | 固定速率触发器使用PeriodicTrigger在被窝里。
数值为time-unit或者可以是 duration 格式(从版本 6.2 开始),例如PT10S,P1D.
如果设置了此属性,则不必指定以下任何属性:fixed-delay,trigger,cron和ref. |
| 6 | 的PeriodicTriggerUnder the covers(从 6.2 版本开始)。
数值为time-unit或者可以是 duration 格式,例如PT10S,P1D. |
| 7 | 引用 poller 的基础 bean 定义的 ID,其类型为org.springframework.integration.scheduling.PollerMetadata.
这id属性是顶级 Poller 元素所必需的,除非它是默认的 Poller (default="true"). |
| 8 | 有关更多信息,请参阅Configuring An Inbound Channel Adapter。
如果未指定,则默认值取决于上下文。
如果您使用PollingConsumer,则此属性默认为-1.
但是,如果您使用SourcePollingChannelAdapter这max-messages-per-poll属性默认为1.
自选。 |
| 9 | 在基础类上设置 ValuePollerMetadata.
如果未指定,则默认为 1000(毫秒)。
自选。 |
| 10 | Bean 引用另一个顶级 Poller 的 Pod 引用。
这ref属性不得出现在顶级poller元素。
但是,如果设置了此属性,则不必指定以下任何属性:fixed-rate,trigger,cron和fixed-delay. |
| 11 | 提供引用自定义任务执行程序的功能。 有关更多信息,请参阅 TaskExecutor 支持。 自选。 |
| 12 | 此属性指定java.util.concurrent.TimeUnitenum 值org.springframework.scheduling.support.PeriodicTrigger.
因此,此属性只能与fixed-delay或fixed-rate属性。
如果与任一cron或triggerreference 属性,则会导致失败。
支持的PeriodicTrigger是毫秒。
因此,唯一可用的选项是毫秒和秒。
如果未提供此值,则任何fixed-delay或fixed-ratevalue 被解释为毫秒。
基本上,此枚举为基于秒的 interval 触发器值提供了便利。
对于每小时、每天和每月设置,我们建议使用crontrigger 来代替。 |
| 13 | 对任何 Spring 配置的 bean 的引用,该 bean 实现了org.springframework.scheduling.Trigger接口。
但是,如果设置了此属性,则不必指定以下任何属性:fixed-delay,fixed-rate,cron和ref.
自选。 |
| 14 | 允许指定额外的 AOP 建议来处理其他横切关注点。 有关更多信息,请参阅 交易 。 自选。 |
| 15 | Poller 可以成为事务性的。 有关更多信息,请参阅 AOP Advice 链。 自选。 |
例子
可以按如下方式配置具有 1 秒间隔的简单基于间隔的 Poller :
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用fixed-rate属性,您还可以使用fixed-delay属性。
对于基于 Cron 表达式的轮询器,请使用cron属性,如下例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是PollableChannel,则需要 Poller 配置。
具体来说,如前所述,trigger是PollingConsumer类。
因此,如果省略poller子元素,则可能会引发异常。
如果你试图在连接到不可轮询通道的元素上配置 Poller,也可能引发异常。
也可以创建顶级 poller,在这种情况下,只有一个refattribute 是必需的,如下例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
这ref属性只允许在内部 Poller 定义上。
在顶级 Poller 上定义此属性会导致在应用程序上下文初始化期间引发配置异常。 |
全局默认轮询器
为了进一步简化配置,你可以定义一个全局默认 Poller。
XML DSL 中的单个顶级 Poller 组件可能具有default属性设置为true.
对于 Java 配置,一个PollerMetadatabean 的PollerMetadata.DEFAULT_POLLER在这种情况下,必须声明 name。
在这种情况下,任何具有PollableChannel对于其 input channel,该通道在同一ApplicationContext,并且没有明确配置poller使用该默认值。
下面的示例展示了这样的 poller 和使用它的转换器:
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事务支持
Spring 集成还为 pollers 提供了事务支持,以便每个接收和转发作都可以作为原子工作单元执行。
要为 Poller 配置事务,请添加<transactional/>sub-元素。
以下示例显示了可用属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关更多信息,请参阅 轮询器事务支持。
AOP 建议链
由于 Spring 事务支持依赖于代理机制,因此TransactionInterceptor(AOP Advice) 处理由 Poller 启动的消息流的事务性行为时,有时必须提供额外的建议来处理与 Poller 关联的其他横切行为。
为此,poller定义了一个advice-chain元素,该元素允许您在实现MethodInterceptor接口。
以下示例演示如何定义advice-chain对于poller:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实施MethodInterceptor接口,请参阅 Spring Framework Reference Guide 的 AOP 部分。
建议链也可以应用于没有任何事务配置的 Poller,从而增强 Poller 启动的消息流的行为。
使用通知链时,<transactional/>不能指定 child 元素。
相反,声明一个<tx:advice/>bean 并将其添加到<advice-chain/>.
有关完整的配置详细信息,请参阅 Poller Transaction Support 。 |
TaskExecutor 支持
轮询线程可以由 Spring 的TaskExecutor抽象化。
这将为一个终端节点或一组终端节点启用并发。
从 Spring 3.0 开始,核心 Spring Framework 有一个tasknamespace 及其<executor/>元素支持创建简单的线程池执行程序。
该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。
配置线程池执行程序可以对终端节点在负载下的性能产生重大影响。
这些设置适用于每个终端节点,因为终端节点的性能是要考虑的主要因素之一(另一个主要因素是终端节点订阅的通道上的预期卷)。
要为配置了 XML 命名空间支持的轮询终端节点启用并发,请提供task-executor引用其<poller/>元素,然后提供以下示例中所示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您不提供 task-executor,则在调用者的线程中调用使用者的处理程序。
请注意,调用方通常是默认的TaskScheduler(请参见配置 Task Scheduler)。
您还应该记住,task-executor属性可以提供对 Spring 的TaskExecutor接口。
这executor元素是为了方便起见而提供的。
如前面在轮询使用者的 background 部分中提到的,您还可以以模拟事件驱动行为的方式配置轮询使用者。
在触发器中使用较长的接收超时和较短的间隔,您可以确保对到达的消息做出非常及时的反应,即使在轮询的消息源上也是如此。
请注意,这仅适用于具有超时的阻塞 wait 调用的源。
例如,文件 poller 不会阻塞。
每receive()call 会立即返回,并且要么包含新文件,要么不包含新文件。
因此,即使 Poller 包含长receive-timeout,该值永远不会在这种情况下使用。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。
以下示例显示了轮询使用者如何几乎即时地接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)颠簸、无限 while 循环那样多的 CPU 资源使用。
在运行时更改轮询率
当使用fixed-delay或fixed-rate属性,则默认实现使用PeriodicTrigger实例。
这PeriodicTrigger是核心 Spring Framework 的一部分。
它仅接受 interval 作为构造函数参数。
因此,无法在运行时更改它。
但是,您可以定义自己的org.springframework.scheduling.Trigger接口。
您甚至可以使用PeriodicTrigger作为起点。
然后,您可以为间隔 (period) 添加 setter,甚至可以在触发器本身中嵌入自己的限制逻辑。
这period属性与每次对nextExecutionTime以安排下一次轮询。
要在 Poller 中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用trigger属性,该属性引用自定义触发器 Bean 实例。
现在,您可以获取对触发器 Bean 的引用并更改轮询之间的轮询间隔。
有关示例,请参阅 Spring Integration Samples 项目。
它包含一个名为dynamic-poller,它使用自定义触发器并演示在运行时更改轮询间隔的功能。
该示例提供了一个自定义触发器,用于实现org.springframework.scheduling.Trigger接口。
该示例的触发器基于 Spring 的PeriodicTrigger实现。
但是,自定义触发器的字段不是最终的,并且属性具有显式 getter 和 setter,允许您在运行时动态更改轮询周期。
不过,请务必注意,由于 Trigger 方法是nextExecutionTime(),则根据现有配置,对动态触发器的任何更改在下次轮询之前不会生效。
无法强制触发器在当前配置的下一次执行时间之前触发。 |
负载类型转换
在本参考手册中,您还可以看到接受消息或任何任意Object作为输入参数。
在Object,这样的参数被映射到消息有效负载或有效负载或 Headers 的一部分(当使用 Spring 表达式语言时)。
但是,端点方法的 input 参数类型有时与有效负载或其部分的类型不匹配。
在此方案中,我们需要执行类型转换。
Spring 集成提供了一种注册类型转换器(通过使用 SpringConversionService) 在其自己的名为integrationConversionService.
一旦使用 Spring 集成基础结构定义了第一个转换器,就会自动创建该 bean。
要注册转换器,您可以实现org.springframework.core.convert.converter.Converter,org.springframework.core.convert.converter.GenericConverter或org.springframework.core.convert.converter.ConverterFactory.
这Converterimplementation 是最简单的,并且从单一类型转换为另一种类型。
对于更复杂的作,例如转换为类层次结构,您可以实现GenericConverter可能还有一个ConditionalConverter.
这些选项允许您完全访问from和to类型描述符,支持复杂的转换。
例如,如果你有一个名为Something即转化的目标(parameter type、channel 数据类型等),您有两个具体的实现,称为Thing1和Thing,并且您希望根据输入类型转换为其中一种,则GenericConverter会很合适。
有关更多信息,请参阅以下接口的 Javadoc:
实现转换器后,可以使用方便的命名空间支持注册它,如下例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,你可以使用内部 Bean,如下例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring 集成 4.0 开始,你可以使用 Comments 来创建前面的配置,如下例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您也可以使用@Configurationannotation 中,如下例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
|
在配置应用程序上下文时,Spring Framework 允许您添加 相比之下, 但是,如果您确实想使用 Spring
在这种情况下,由 |
内容类型转换
从版本 5.0 开始,默认情况下,方法调用机制基于org.springframework.messaging.handler.invocation.InvocableHandlerMethod基础设施。
其HandlerMethodArgumentResolver实现(例如PayloadArgumentResolver和MessageMethodArgumentResolver) 可以使用MessageConverterabstraction 来转换传入的payload设置为目标方法参数类型。
转换可以基于contentType消息标头。
为此, Spring 集成提供了ConfigurableCompositeMessageConverter,它委托给要调用的已注册转换器列表,直到其中一个转换器返回非 null 结果。
默认情况下,此转换器提供(按严格顺序):
-
MappingJackson2MessageConverter如果 Jackson 处理器存在于 Classpath 中
请参阅 Javadoc(在前面的列表中链接)以了解有关其用途和适当性的更多信息contentType值进行转化。
这ConfigurableCompositeMessageConverter是因为它可以与任何其他MessageConverterimplementations,包括或排除前面提到的 default converters。
它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如下例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在 defaults 之前注册在 composite 中。
您也不能使用ConfigurableCompositeMessageConverter但请提供您自己的MessageConverter通过注册一个名称为integrationArgumentResolverMessageConverter(通过设置IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME属性)。
这MessageConverter基于 -based(包括contentTypeheader) 转换不可用。
在这种情况下,只有上面 Payload Type Conversion 中提到的常规类到类转换可用。 |
异步轮询
如果希望轮询是异步的,Poller 可以选择指定task-executor属性,该属性指向任何TaskExecutorbean(Spring 3.0 通过task命名空间)。
但是,在使用TaskExecutor.
问题在于有两个配置,即 Poller 和TaskExecutor.
他们必须彼此协调一致。
否则,您最终可能会造成人为的内存泄漏。
请考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置演示了 Out-of-tune 配置。
默认情况下,任务执行程序具有无界任务队列。 即使所有线程都被阻塞,Poller 也会继续调度新任务,等待新消息到达或超时过期。 假设有 20 个线程执行任务,超时时间为 5 秒,因此它们以每秒 4 个的速率执行。 但是,新任务以每秒 20 个的速率调度,因此任务执行程序中的内部队列以每秒 16 个的速度增长(当进程空闲时),因此我们存在内存泄漏。
处理此问题的方法之一是将queue-capacity任务执行程序的属性。
即使 0 也是一个合理的值。
您还可以通过设置设置rejection-policyTask Executor 的属性(例如,更改为DISCARD).
换句话说,在配置时必须了解某些细节TaskExecutor.
有关该主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”。
端点内部 Bean
许多端点是复合 bean。
这包括所有使用者和所有轮询的入站通道适配器。
使用者(轮询或事件驱动)委托给MessageHandler.
轮询的适配器通过委托给MessageSource.
通常,获取对委托 Bean 的引用很有用,这可能是为了在运行时更改配置或用于测试。
这些 bean 可以从ApplicationContext具有众所周知的名字。MessageHandler实例使用类似于someConsumer.handler(其中 'consumer' 是端点的id属性)。MessageSource实例使用 Bean ID 注册,类似于somePolledAdapter.source,其中 'somePolledAdapter' 是适配器的 ID。
上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如下例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 bean 的处理方式与声明的任何内部 bean 相同,并且未在应用程序上下文中注册。
如果您希望以其他方式访问此 bean,请在顶层使用id并使用ref属性。
有关更多信息,请参阅 Spring 文档。