|
此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
消息端点
本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 的各种消息传递组件的底层 API 的相当多的信息。 如果您想真正了解幕后发生的事情,此信息会很有帮助。 但是,如果您想启动并运行各种元素的简化的基于命名空间的配置,请随时跳到端点命名空间支持。
如概述中所述,消息端点负责将各种消息传递组件连接到通道。 在接下来的几章中,我们将介绍使用消息的许多不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面的消息通道所示,您可以向消息通道发送消息。 但是,接收要复杂一些。 主要原因是有两种类型的消费者:轮询消费者和事件驱动消费者。
在两者中,事件驱动的消费者要简单得多。
无需管理和调度单独的轮询器线程,它们本质上是具有回调方法的侦听器。
当连接到 Spring Integration 的可订阅消息通道之一时,这个简单的选项效果很好。
但是,当连接到缓冲、可轮询的消息通道时,某些组件必须调度和管理轮询线程。
Spring Integration提供了两种不同的端点实现来适应这两种类型的消费者。
因此,消费者自己只需要实现回调接口。
当需要轮询时,端点充当使用者实例的容器。
其好处类似于使用容器托管消息驱动的 bean,但是,由于这些消费者是在ApplicationContext,它更类似于 Spring 自己的MessageListener器皿。
消息处理程序
Spring Integration 的MessageHandler接口由框架内的许多组件实现。
换句话说,这不是公共 API 的一部分,您通常不会实现MessageHandler径直。
尽管如此,消息使用者使用它来实际处理消耗的消息,因此了解此策略接口确实有助于理解使用者的整体角色。
接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管它很简单,但该接口为以下章节中介绍的大多数组件(路由器、转换器、分离器、聚合器、服务激活器等)提供了基础。 这些组件各自通过它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。 Spring Integration提供了两个端点实现,它们托管这些基于回调的处理程序,并允许它们连接到消息通道。
事件驱动的消费者
因为它是两者中更简单的一个,所以我们首先介绍事件驱动的消费者端点。
您可能还记得SubscribableChannel接口提供了一个subscribe()方法,并且该方法接受MessageHandler参数(如SubscribableChannel).
以下列表显示了subscribe方法:
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者,并且 Spring Integration 提供的实现接受SubscribableChannel和MessageHandler,如以下示例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring Integration 还提供了一个PollingConsumer,并且可以用相同的方式实例化,只是通道必须实现PollableChannel,如以下示例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询使用者还有许多其他配置选项。 以下示例演示如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
这PeriodicTrigger通常用简单的间隔 (Duration),但也支持initialDelay属性和布尔值fixedRate属性(默认值为false——也就是说,没有固定的延迟)。
以下示例设置这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前面示例中三个设置的结果是一个触发器,它等待五秒钟,然后每秒触发一次。
这CronTrigger需要有效的 cron 表达式。
有关详细信息,请参阅 Javadoc。
以下示例将新的CronTrigger:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是,在周一到周五每十秒触发一次触发器。
轮询终结点的默认触发器是PeriodicTrigger具有 1 秒固定延迟期的实例。 |
除了触发器之外,还可以指定其他两个与轮询相关的配置属性:maxMessagesPerPoll和receiveTimeout.
以下示例演示如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
这maxMessagesPerPoll属性指定在给定轮询作中要接收的最大消息数。
这意味着轮询器继续调用receive()无需等待,直到null返回或达到最大值。
例如,如果轮询器有一个十秒间隔触发器和maxMessagesPerPoll设置25,并且它正在轮询队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。
它抓取 25 秒,等待 10 秒,抓取接下来的 25 秒,依此类推。
如果maxMessagesPerPoll配置为负值,则MessageSource.receive()在单个轮询周期内调用,直到返回null.
从 5.5 版开始,一个0value 具有特殊含义 - 跳过MessageSource.receive()调用,这可能被视为暂停此轮询端点,直到maxMessagesPerPoll稍后更改为非零值,例如通过控制总线。
这receiveTimeout属性指定轮询程序在调用接收作时没有可用消息时应等待的时间量。
例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个具有 5 秒的间隔触发和 50 毫秒的接收超时,而第二个具有 50 毫秒的间隔触发和 5 秒的接收超时。
第一个消息可能会比它在通道上接受的时间晚 4950 毫秒(如果该消息在其一个轮询调用返回后立即到达)。
另一方面,第二种配置不会错过超过 50 毫秒的消息。
不同之处在于,第二个选项需要一个线程来等待。
但是,因此,它可以更快地响应到达的消息。
这种技术称为“长轮询”,可用于模拟轮询源上的事件驱动行为。
轮询使用者还可以委托给 SpringTaskExecutor,如以下示例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,一个PollingConsumer有一个名为adviceChain.
此属性允许您指定List的 AOP 建议,用于处理包括交易在内的其他跨领域问题。
这些建议适用于doPoll()方法。
有关更深入的信息,请参阅端点命名空间支持下有关 AOP 通知链和事务支持的部分。
另请参阅@Poller注释 Javadocs 和相应的消息传递注释支持部分。
Java DSL 还提供了一个.poller()endpoint 配置选项及其各自的Pollers厂。
前面的示例显示了依赖项查找。
但是,请记住,这些消费者通常配置为 Spring bean 定义。
事实上,Spring Integration 还提供了一个FactoryBean叫ConsumerEndpointFactoryBean根据通道类型创建适当的使用者类型。
此外,Spring Integration 具有完整的 XML 命名空间支持,甚至可以进一步隐藏这些细节。
本指南中介绍了基于命名空间的配置,因为介绍了每种组件类型。
许多MessageHandler实现可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
不过,发送回复消息的时间和数量取决于处理程序类型。
例如,聚合器等待多条消息到达,并且通常被配置为拆分器的下游使用者,拆分器可以为它处理的每条消息生成多个回复。
使用命名空间配置时,您不需要严格了解所有详细信息。
但是,仍然值得知道其中几个组件共享一个公共基类,即AbstractReplyProducingMessageHandler,并且它提供了一个setOutputChannel(..)方法。 |
端点命名空间支持
在本参考手册中,您可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等。
其中大多数支持input-channel属性,并且许多支持output-channel属性。
解析后,这些端点元素会生成PollingConsumer或EventDrivenConsumer,具体取决于input-channel引用的:PollableChannel或SubscribableChannel分别。
当通道可轮询时,轮询行为基于端点元素的poller子元素及其属性。
下面列出了poller:
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
initial-delay="" (6)
id="" (7)
max-messages-per-poll="" (8)
receive-timeout="" (9)
ref="" (10)
task-executor="" (11)
time-unit="MILLISECONDS" (12)
trigger=""> (13)
<int:advice-chain /> (14)
<int:transactional /> (15)
</int:poller>
| 1 | 提供使用 Cron 表达式配置轮询器的功能。
底层实现使用org.springframework.scheduling.support.CronTrigger.
如果设置了此属性,则不得指定以下属性:fixed-delay,trigger,fixed-rate和ref. |
| 2 | 通过将此属性设置为true,您可以只定义一个全局默认轮询器。
如果在应用程序上下文中定义了多个缺省轮询器,则会引发异常。
连接到PollableChannel (PollingConsumer) 或任何SourcePollingChannelAdapter没有显式配置的轮询器,则使用全局默认轮询器。
它默认为false.
自选。 |
| 3 | 标识在此轮询程序的调用中发生故障时将错误消息发送到的通道。
要完全禁止异常,您可以提供对nullChannel.
自选。 |
| 4 | 固定延迟触发器使用PeriodicTrigger在被窝里。
数值位于time-unit或者可以作为持续时间格式(从 6.2 版开始),例如PT10S,P1D.
如果设置了此属性,则不得指定以下属性:fixed-rate,trigger,cron和ref. |
| 5 | 固定利率触发器使用PeriodicTrigger在被窝里。
数值位于time-unit或者可以作为持续时间格式(从 6.2 版开始),例如PT10S,P1D.
如果设置了此属性,则不得指定以下属性:fixed-delay,trigger,cron和ref. |
| 6 | 初始延迟PeriodicTrigger幕后(从 6.2 版开始)。
数值位于time-unit或者可以作为持续时间格式,例如PT10S,P1D. |
| 7 | 引用轮询器的基础 bean 定义的 ID,其类型为org.springframework.integration.scheduling.PollerMetadata.
这id属性是顶级轮询器元素所必需的,除非它是默认轮询器(default="true"). |
| 8 | 有关更多信息,请参阅配置入站通道适配器。
如果未指定,则默认值取决于上下文。
如果您使用PollingConsumer,则此属性默认为-1.
但是,如果您使用SourcePollingChannelAdapter这max-messages-per-poll属性默认为1.
自选。 |
| 9 | 在基础类上设置值PollerMetadata.
如果未指定,则默认为 1000(毫秒)。
自选。 |
| 10 | Bean 对另一个顶级轮询器的引用。
这ref属性不得存在于顶级poller元素。
但是,如果设置了此属性,则不得指定以下属性:fixed-rate,trigger,cron和fixed-delay. |
| 11 | 提供引用自定义任务执行器的功能。 有关详细信息,请参阅 TaskExecutor 支持。 自选。 |
| 12 | 此属性指定java.util.concurrent.TimeUnit枚举值org.springframework.scheduling.support.PeriodicTrigger.
因此,此属性只能与fixed-delay或fixed-rate属性。
如果与任一cron或triggerreference 属性,则会导致失败。
支持的最小粒度PeriodicTrigger是毫秒。
因此,唯一可用的选项是毫秒和秒。
如果未提供此值,则任何fixed-delay或fixed-rate值被解释为毫秒。
基本上,此枚举为基于秒的间隔触发器值提供了便利。
对于每小时、每天和每月的设置,我们建议使用cron触发器。 |
| 13 | 引用任何 Spring 配置的 bean 实现org.springframework.scheduling.Trigger接口。
但是,如果设置了此属性,则不得指定以下属性:fixed-delay,fixed-rate,cron和ref.
自选。 |
| 14 | 允许指定额外的 AOP 建议来处理其他横切问题。 有关详细信息,请参阅事务。 自选。 |
| 15 | 轮询器可以成为事务性的。 有关更多信息,请参阅 AOP 通知链。 自选。 |
例子
可以按如下方式配置具有 1 秒间隔的简单基于间隔的轮询器:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用fixed-rate属性,也可以使用fixed-delay属性。
对于基于 Cron 表达式的轮询器,请使用cron属性,如以下示例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是PollableChannel,则需要轮询器配置。
具体来说,如前所述,trigger是PollingConsumer类。
因此,如果您省略pollersub元素,则可能会抛出异常。
如果您尝试在连接到不可轮询通道的元素上配置轮询器,也可能会抛出异常。
也可以创建顶级轮询器,在这种情况下,只有一个ref属性是必需的,如以下示例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
这ref属性仅允许用于内部轮询器定义。
在顶级轮询器上定义此属性会导致在应用程序上下文初始化期间抛出配置异常。 |
全局默认轮询器
为了进一步简化配置,您可以定义全局默认轮询器。
XML DSL 中的单个顶级轮询器组件可能具有default属性设置为true.
对于 Java 配置PollerMetadatabean 与PollerMetadata.DEFAULT_POLLER在这种情况下,必须声明名称。
在这种情况下,任何具有PollableChannel对于其输入通道,该通道在同一ApplicationContext,并且没有显式配置poller使用该默认值。
以下示例显示了这样的轮询器和使用它的转换器:
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
交易支持
Spring Integration还为轮询器提供了事务支持,以便每个接收和转发作都可以作为原子工作单元执行。
要为轮询器配置事务,请将<transactional/>子元素。
以下示例显示了可用的属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关详细信息,请参阅轮询器事务支持。
AOP 建议链
由于 Spring 事务支持依赖于TransactionInterceptor(AOP 通知)处理轮询程序启动的消息流的事务行为时,有时必须提供额外的建议来处理与轮询程序关联的其他横切行为。
为此,poller定义一个advice-chain元素,允许您在实现MethodInterceptor接口。
以下示例显示了如何定义advice-chain对于一个poller:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现MethodInterceptor接口,请参阅 Spring Framework 参考指南的 AOP 部分。
通知链也可以应用于没有任何事务配置的轮询器,从而增强轮询器启动的消息流的行为。
使用通知链时,<transactional/>不能指定子元素。
相反,请声明<tx:advice/>bean 并将其添加到<advice-chain/>.
有关完整的配置详细信息,请参阅轮询器事务支持。 |
TaskExecutor 支持
轮询线程可以由 Spring 的任何实例执行TaskExecutor抽象化。
这为终结点或终结点组启用并发性。
从 Spring 3.0 开始,核心 Spring Framework 有一个task命名空间及其<executor/>元素支持创建简单的线程池执行器。
该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。
配置线程池执行器可以对端点在负载下的执行方式产生重大影响。
这些设置可用于每个端点,因为端点的性能是要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期卷)。
要为配置了 XML 命名空间支持的轮询端点启用并发性,请提供task-executor在其上的引用<poller/>元素,然后提供以下示例中显示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果未提供任务执行器,则在调用方的线程中调用使用者的处理程序。
请注意,调用方通常是默认的TaskScheduler(请参阅配置任务计划程序)。
您还应该记住,task-executor属性可以提供对 Spring 的TaskExecutor接口,通过指定 bean 名称。 这executor为方便起见,提供了前面显示的元素。
如前文轮询使用者的后台部分所述,您还可以以模拟事件驱动行为的方式配置轮询使用者。
通过较长的接收超时和触发器中的较短间隔,即使在轮询的消息源上,您也可以确保对到达的消息做出非常及时的响应。
请注意,这仅适用于具有超时的阻塞等待调用的源。
例如,文件轮询器不会阻塞。
每receive()调用立即返回,并且包含新文件或不包含新文件。
因此,即使轮询器包含 longreceive-timeout,则该值永远不会在这种情况下使用。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。
以下示例显示轮询使用者如何几乎立即接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会产生太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像抖动、无限 while 循环那样多的 CPU 资源使用。
在运行时更改轮询速率
使用fixed-delay或fixed-rate属性时,默认实现使用PeriodicTrigger实例。
这PeriodicTrigger是核心 Spring 框架的一部分。
它仅接受间隔作为构造函数参数。
因此,它不能在运行时更改。
但是,您可以定义自己的org.springframework.scheduling.Trigger接口。
您甚至可以使用PeriodicTrigger作为起点。
然后,您可以为间隔(周期)添加一个 setter,或者您甚至可以在触发器本身中嵌入自己的限制逻辑。
这period属性用于每次调用nextExecutionTime以安排下一次投票。
要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用trigger属性,它引用自定义触发器 bean 实例。
现在,您可以获取对触发器 bean 的引用,并更改轮询之间的轮询间隔。
有关示例,请参阅 Spring Integration Samples 项目。
它包含一个名为dynamic-poller,它使用自定义触发器并演示在运行时更改轮询间隔的能力。
该示例提供了一个自定义触发器,用于实现org.springframework.scheduling.Trigger接口。
示例的触发器基于 Spring 的PeriodicTrigger实现。
但是,自定义触发器的字段不是最终的,并且属性具有显式的 getter 和 setter,允许您在运行时动态更改轮询周期。
但需要注意的是,由于 Trigger 方法是nextExecutionTime(),对动态触发器的任何更改都不会生效,直到下一次轮询时才会生效,具体取决于现有配置。
无法强制触发器在其当前配置的下一次执行时间之前触发。 |
有效负载类型转换
在本参考手册中,您还可以看到接受消息或任何任意消息的各种端点的特定配置和实现示例Object作为输入参数。
在Object,此类参数将映射到消息有效负载或有效负载或标头的一部分(使用 Spring 表达式语言时)。
但是,端点方法的输入参数类型有时与有效负载或其部分的类型不匹配。
在这种情况下,我们需要进行类型转换。
Spring Integration 提供了一种注册类型转换器的便捷方法(通过使用 SpringConversionService) 在其自己的转换服务 Bean 实例中,名为integrationConversionService.
一旦使用 Spring Integration 基础设施定义了第一个转换器,就会自动创建该 bean。
要注册转换器,您可以实现org.springframework.core.convert.converter.Converter,org.springframework.core.convert.converter.GenericConverter或org.springframework.core.convert.converter.ConverterFactory.
这Converter实现是最简单的,从单一类型转换为另一种类型。
为了更复杂,例如转换为类层次结构,您可以实现GenericConverter并且可能是一个ConditionalConverter.
这些使您可以完全访问from和to类型描述符,支持复杂的转换。
例如,如果您有一个名为Something即转化的目标(参数类型、通道数据类型等),您有两个名为Thing1和Thing,并且您希望根据输入类型转换为其中一种,则GenericConverter会很合适。
有关更多信息,请参阅以下接口的 Javadoc:
实现转换器后,可以使用方便的命名空间支持对其进行注册,如以下示例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用内部 Bean,如以下示例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,您可以使用注释来创建前面的配置,如以下示例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用@Configuration注释,如以下示例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
|
配置应用程序上下文时,Spring Framework 允许您将 相比之下, 但是,如果您确实想使用 Spring
在这种情况下,由 |
内容类型转换
从 5.0 版本开始,默认情况下,方法调用机制基于org.springframework.messaging.handler.invocation.InvocableHandlerMethod基础设施。
其HandlerMethodArgumentResolver实现(例如PayloadArgumentResolver和MessageMethodArgumentResolver) 可以使用MessageConverter抽象以转换传入的payload设置为目标方法参数类型。
转换可以基于contentType消息头。
为此,Spring Integration 提供了ConfigurableCompositeMessageConverter,它委托给要调用的已注册转换器列表,直到其中一个转换器返回非空结果。
默认情况下,此转换器提供(按严格顺序):
有关其用途和适当性的更多信息,请参阅 Javadoc(在前面列表中链接)contentType转换值。
这ConfigurableCompositeMessageConverter之所以使用,是因为它可以与任何其他MessageConverter实现,包括或排除前面提到的默认转换器。
它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如以下示例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在默认值之前在复合中注册。
您也不能使用ConfigurableCompositeMessageConverter但提供您自己的MessageConverter通过注册一个名称为 bean,integrationArgumentResolverMessageConverter(通过将IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME属性)。
这MessageConverter-基于(包括contentTypeheader) 转换在使用 SpEL 方法调用时不可用。
在这种情况下,只有上面在有效负载类型转换中提到的常规类到类转换可用。 |
异步轮询
如果您希望轮询是异步的,轮询器可以选择指定task-executor属性,该属性指向任何TaskExecutorbean(Spring 3.0 通过task命名空间)。
但是,在配置轮询器时,您必须了解某些事项TaskExecutor.
问题是有两种配置,轮询器和TaskExecutor.
他们必须彼此协调一致。
否则,您最终可能会造成人为内存泄漏。
请考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
上述配置演示了失调配置。
默认情况下,任务执行器具有无界任务队列。 轮询器会继续调度新任务,即使所有线程都被阻塞,等待新消息到达或超时到期。 鉴于有 20 个线程执行任务,超时时间为 5 秒,它们以每秒 4 个的速度执行。 但是,新任务以每秒 20 个的速度调度,因此任务执行器中的内部队列以每秒 16 个的速度增长(当进程空闲时),因此我们存在内存泄漏。
处理此问题的方法之一是将queue-capacity任务执行器的属性。
即使 0 也是一个合理的值。
您还可以通过设置rejection-policy属性,例如,设置为DISCARD.
换句话说,在配置时必须了解某些细节TaskExecutor.
有关该主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”。
端点内部 Bean
许多端点都是复合 Bean。
这包括所有使用者和所有轮询的入站通道适配器。
使用者(轮询或事件驱动)委托给MessageHandler.
轮询适配器通过委托给MessageSource.
通常,获取对委托 Bean 的引用很有用,也许是在运行时更改配置或用于测试。
这些豆子可以从ApplicationContext有众所周知的名字。MessageHandler实例在应用程序上下文中注册,其 bean ID 类似于someConsumer.handler(其中 'consumer' 是端点的id属性)。MessageSource实例使用 Bean ID 注册,类似于somePolledAdapter.source,其中 'somePolledAdapter' 是适配器的 ID。
上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如以下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
Bean 被视为声明的任何内部 Bean,并且不会在应用程序上下文中注册。
如果您希望以其他方式访问此 bean,请在顶层使用id并使用ref属性。
有关更多信息,请参阅 Spring 文档。