配置
配置
Spring Integration 提供了多种配置选项。 您选择哪种选项取决于您的具体需求以及您偏好的工作层级。 与 Spring 框架整体一样,您可以混合搭配各种技术以适应手头的问题。 例如,您可以为大多数配置选择基于 XSD 的命名空间,并将其与您使用注解配置的少量对象结合使用。 尽可能保持一致的命名风格。 由 XSD 模式定义的 XML 元素名称与注解名称相匹配,而这些 XML 元素的属性名称也与注解的属性名称相匹配。 您也可以直接使用 API,但我们期望大多数开发者会选择其中一种更高级的选项,或者采用基于命名空间和驱动于注解的配置组合方式。
命名空间支持
您可以使用与企业集成术语和概念直接映射的 XML 元素来配置 Spring Integration 组件。 在许多情况下,元素名称与《Enterprise Integration Patterns》一书中的名称相匹配。
要在您的 Spring 配置文件中启用 Spring Integration 的核心命名空间支持,请在顶层 'beans' 元素中添加以下命名空间引用和模式映射:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd">
(我们已突出显示了Spring Integration特有的行。)
您可以在"xmlns:"后选择任意名称。
为了清晰起见,我们使用int(Integration 的缩写),但您可能更喜欢其他缩写。
另一方面,如果您使用支持自动补全的 XML 编辑器或 IDE,其可用性可能会说服您保留较长的名称以增强清晰度。
或者,您可以创建将 Spring Integration 模式作为主要命名空间的配置文件,如下例所示:
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd">
(我们已突出显示了Spring Integration特有的行。)
使用此替代方案时,Spring Integration 元素不需要前缀。
另一方面,如果在同一配置文件中定义通用的 Spring bean,则 bean 元素需要前缀(<beans:bean …/>)。
由于通常将配置文件本身模块化(基于职责或架构层)是一个好主意,因此您可能会发现在以集成为重点的配置文件中采用后一种方法是合适的,因为这些文件中很少需要通用 bean。
出于本文档的目的,我们假设集成命名空间是主要的。
Spring Integration 提供了许多其他命名空间。
事实上,每个提供命名空间支持的适配器类型(如 JMS、文件等)都在单独的架构中定义其元素。
为了使用这些元素,请添加必要的命名空间,并在 xmlns 条目中添加相应的 schemaLocation 映射。
例如,以下根元素展示了其中几个命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
xmlns:int-ws="http://www.springframework.org/schema/integration/ws"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms
https://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration/mail
https://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
http://www.springframework.org/schema/integration/ws
https://www.springframework.org/schema/integration/ws/spring-integration-ws.xsd">
...
</beans>
本参考手册提供了各章节中各种元素的具体示例。 在此,需要认识到的主要一点是每个命名空间 URI 和架构位置命名的一致性。
配置任务调度器
在 Spring Integration 中,ApplicationContext扮演着消息总线的核心角色,您只需考虑少数几个配置选项。
首先,您可能希望控制中央TaskScheduler实例。
您可以通过提供一个名为taskScheduler的单个 Bean 来实现这一点。
这也定义为一个常量,如下所示:
IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME
默认情况下,Spring Integration 依赖于一个 ThreadPoolTaskScheduler 实例,如 Spring Framework 参考手册的任务执行与调度部分所述。
该默认的 TaskScheduler 会自动启动,并包含十个线程的线程池,详情请参见全局属性。
如果您提供自己的 TaskScheduler 实例,则可以将 'autoStartup' 属性设置为 false,或者提供您自己的线程池大小值。
当轮询消费者在配置中提供显式的任务执行器引用时,处理器方法的调用会在该执行器的线程池中进行,而不是在主调度器池中。 然而,当为端点的轮询器未提供任务执行器时,它将由主调度器的一个线程进行调用。
不要在轮询线程上运行长时间运行的任务。
请使用任务执行器代替。
如果您有很多轮询端点,可能会导致线程饥饿,除非您增加线程池大小。
此外,轮询消费者默认有一个 receiveTimeout 为一秒的间隔。
由于轮询线程在此期间会被阻塞,因此我们建议当存在许多此类端点时使用任务执行器,再次以避免线程饥饿。
或者,您可以减少 receiveTimeout 的值。 |
| 如果端点的输入通道是基于队列的(即可轮询)通道之一,则该端点为轮询消费者。 事件驱动型消费者是指那些具有分发器而非队列的输入通道的消费者(换句话说,它们是可订阅的)。 此类端点没有轮询器配置,因为它们的处理器是直接调用的。 |
|
当在 JEE 容器中运行时,您可能需要使用 Spring 的
|
当在应用上下文中配置了自定义 TaskScheduler(如上述提到的 DefaultManagedTaskScheduler)时,建议为其提供一个 MessagePublishingErrorHandler(integrationMessagePublishingErrorHandler Bean),以便能够像框架提供的 ErrorMessage`s sent to the error channel, as is done with the default `TaskScheduler Bean 那样处理异常。 |
另请参阅 错误处理 以获取更多信息。
全局属性
某些全局框架属性可以通过在类路径上提供属性文件来覆盖。
默认属性可以在 org.springframework.integration.context.IntegrationProperties 类中找到。
以下列表显示了默认值:
spring.integration.channels.autoCreate=true (1)
spring.integration.channels.maxUnicastSubscribers=0x7fffffff (2)
spring.integration.channels.maxBroadcastSubscribers=0x7fffffff (3)
spring.integration.taskScheduler.poolSize=10 (4)
spring.integration.messagingTemplate.throwExceptionOnLateReply=false (5)
spring.integration.readOnly.headers= (6)
spring.integration.endpoints.noAutoStartup= (7)
spring.integration.channels.error.requireSubscribers=true (8)
spring.integration.channels.error.ignoreFailures=true (9)
| 1 | 当为 true 时,如果未在应用程序上下文中显式找到,则 input-channel 个实例将自动声明为 DirectChannel 个实例。 |
| 2 | 设置允许订阅的默认数量,例如 DirectChannel。
它可用于避免意外地将多个端点订阅到同一通道。
您可以通过设置 max-subscribers 属性在单个通道上覆盖此设置。 |
| 3 | 此属性提供了默认允许订阅的数量,例如在 PublishSubscribeChannel 上。
它可用于避免意外地将超过预期数量的端点订阅到同一通道。
您可以通过设置 max-subscribers 属性来为各个通道覆盖此设置。 |
| 4 | 默认 taskScheduler bean 中可用的线程数。
请参阅 配置任务调度器。 |
| 5 | 当值为true时,如果网关在发送线程超时或已收到回复的情况下不期望收到回复,则到达网关回复通道的消息将抛出异常。 |
| 6 | 在通过复制操作将消息头填充到Message实例时,不应被填充的消息头名称的逗号分隔列表。
该列表由DefaultMessageBuilderFactory bean使用,并传播到用于构建消息的IntegrationMessageHeaderAccessor实例(参见MessageHeaderAccessor API),这些实例通过MessageBuilder进行构建(参见The MessageBuilder Helper Class)。
默认情况下,仅在消息构建过程中不复制MessageHeaders.ID和MessageHeaders.TIMESTAMP。
自版本4.3.2起。 |
| 7 | 一个以逗号分隔的AbstractEndpoint个 Bean 名称模式列表(xxx*、xxx或*xxx和xxx*yyy),这些模式对应的端点不应在应用程序启动时自动启动。
您可以通过Control Bus(参见控制总线)、通过角色使用SmartLifecycleRoleController(参见端点角色)或通过Lifecycle Bean 注入,稍后手动按 Bean 名称启动这些端点。
您可以通过指定auto-startup XML 注解或autoStartup注解属性,或在 Bean 定义中调用AbstractEndpoint.setAutoStartup(),显式覆盖此全局属性的效果。
自版本 4.3.12 起。 |
| 8 | 一个布尔标志,用于指示默认的全局 errorChannel 必须使用 requireSubscribers 选项进行配置。
自版本 5.4.3 起。
有关更多信息,请参阅 错误处理。 |
| 9 | 一个布尔标志,用于指示默认全局 errorChannel 必须忽略分发错误并将消息传递给下一个处理器。
自版本 5.5 起。 |
这些属性可以通过将 /META-INF/spring.integration.properties 文件添加到类路径中,或为 org.springframework.integration.context.IntegrationProperties 实例创建一个 IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME Bean 来覆盖。
您无需提供所有属性,只需提供您想要覆盖的那些即可。
从 5.1 版本开始,当为 org.springframework.integration 类别开启 DEBUG 逻辑级别时,所有合并的全局属性都会在应用上下文启动后打印到日志中。
输出如下所示:
Spring Integration global properties:
spring.integration.endpoints.noAutoStartup=fooService*
spring.integration.taskScheduler.poolSize=20
spring.integration.channels.maxUnicastSubscribers=0x7fffffff
spring.integration.channels.autoCreate=true
spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
spring.integration.readOnly.headers=
spring.integration.messagingTemplate.throwExceptionOnLateReply=true
注解支持
除了支持配置消息端点的 XML 命名空间外,您还可以使用注解。
首先,Spring Integration 提供了类级别的 @MessageEndpoint 作为 stereotypes 注解,这意味着它自身被 Spring 的 @Component 注解所标注,因此会被 Spring 的组件扫描自动识别为 bean 定义。
更为重要的是各种方法级注解。 它们表明被注解的方法能够处理消息。 以下示例展示了类级和方法级注解:
@MessageEndpoint
public class FooService {
@ServiceActivator
public void processMessage(Message message) {
...
}
}
方法“处理”Message 的确切含义取决于具体的注解。 Spring Integration 中可用的注解包括:
-
@Aggregator(参见 聚合器) -
@Filter(参见 过滤器) -
@Router(参见 路由器) -
@ServiceActivator(参见 服务激活器) -
@Splitter(参见 拆分器) -
@Transformer(参见 转换器) -
@InboundChannelAdapter(参见 通道适配器) -
@BridgeFrom(参见 使用 Java 配置配置桥接) -
@BridgeTo(参见 使用 Java 配置配置桥接) -
@MessagingGateway(参见 消息网关) -
@IntegrationComponentScan(参见 配置和@EnableIntegration)
如果您将 XML 配置与注解结合使用,则不需要 @MessageEndpoint 注解。
如果您想从 <service-activator/> 元素的 ref 属性配置 POJO 引用,您可以仅提供方法级注解。
在这种情况下,即使 <service-activator/> 元素上没有方法级属性,该注解也能防止歧义。 |
在大多数情况下,注解处理器方法不需要将 Message 类型作为其参数。
相反,方法参数类型可以匹配消息的有效负载类型,如下例所示:
public class ThingService {
@ServiceActivator
public void bar(Thing thing) {
...
}
}
当方法参数需要从MessageHeaders中的值进行映射时,另一个选项是使用参数级别的@Header注解。
通常,使用Spring Integration注解的方法可以接受Message本身、消息负载或一个头值(配合@Header)作为参数。
事实上,该方法可以接受组合形式,如下面的示例所示:
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
...
}
}
您还可以使用 @Headers 注解将所有消息头作为 Map 提供,如下例所示:
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
注解的值也可以是一个 SpEL 表达式(例如,someHeader.toUpperCase()),这在您希望在注入之前操作头值时非常有用。
它还提供可选的 required 属性,用于指定该属性值是否必须在请求头中可用。
required 属性的默认值为 true。 |
对于其中一些注解,当消息处理方法返回非空值时,端点会尝试发送回复。
这在两种配置选项(命名空间注解)中是一致的:使用此类端点的输出通道(如果可用),并将REPLY_CHANNEL消息头值作为后备。
| 端点上的输出通道组合与回复通道消息头启用了流水线方法,其中多个组件具有输出通道,且最终组件允许将回复消息转发到回复通道(如原始请求消息中所指定)。 换句话说,最终组件依赖于原始发送者提供的信息,并因此可以动态支持任意数量的客户端。 这是返回地址模式的一个示例。 |
除了这里展示的示例外,这些注解还支持 inputChannel 和 outputChannel 属性,如下例所示:
@Service
public class ThingService {
@ServiceActivator(inputChannel="input", outputChannel="output")
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
这些注解的处理会创建与对应的 XML 组件相同的 bean:—— AbstractEndpoint 个实例和 MessageHandler 个实例(对于入站通道适配器则为 MessageSource 个实例)。
请参见 方法上的注解。
bean 的名称遵循以下模式生成:[componentName].[methodName].[decapitalizedAnnotationClassShortName]。
在前面的示例中,AbstractEndpoint 的 bean 名称为 thingService.otherThing.serviceActivator,而 MessageHandler(即 MessageSource)bean 的名称则是在此基础上附加了 .handler(.source)后缀。
可以通过在消息注解旁使用 @EndpointId 注解来自定义此类名称。
MessageHandler 实例(MessageSource 实例)也可由 消息历史 进行跟踪。
从 4.0 版本开始,所有消息传递注解都提供了 SmartLifecycle 选项(autoStartup 和 phase),以便在应用上下文初始化时控制端点的生命周期。
它们默认分别为 true 和 0。
要更改端点的状态(例如 start() 或 stop()),您可以通过使用 BeanFactory(或自动装配)获取对端点 bean 的引用并调用相应方法。
或者,您也可以向 Control Bus 发送命令消息(参见 控制总线)。
出于这些目的,您应使用上一段中提到的 beanName。
|
在解析提到的注解后(当未配置特定通道 bean 时),自动创建的通道及其对应的消费者端点,将在上下文初始化的末尾被声明为 beans。
这些 beans 可以在其他服务中通过自动注入使用,但必须标记
|
从版本 6.0 开始,所有消息传递注解现在都是 @Repeatable,因此可以在同一个服务方法上声明多个相同类型的注解,其含义是创建与这些注解重复次数一样多的端点:
@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
return input.toUpperCase();
}
使用@Poller注解
在 Spring Integration 4.0 之前,消息注解要求 inputChannel 必须是对 SubscribableChannel 的引用。
对于 PollableChannel 实例,需要一个 <int:bridge/> 元素来配置 <int:poller/>,并使复合端点成为 PollingConsumer。
版本 4.0 引入了 @Poller 注解,允许直接在消息注解上配置 poller 属性,如下例所示:
public class AnnotationService {
@Transformer(inputChannel = "input", outputChannel = "output",
poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
public String handle(String payload) {
...
}
}
@Poller注解仅提供简单的PollerMetadata选项。
您可以使用属性占位符配置@Poller注解的属性(maxMessagesPerPoll、fixedDelay、fixedRate和cron)。
此外,从版本5.1开始,也提供了用于PollingConsumer的receiveTimeout选项。
如果有必要提供更多轮询选项(例如transaction、advice-chain、error-handler等),您应将PollerMetadata配置为通用bean,并将其bean名称用作@Poller的value属性。
在这种情况下,不允许设置其他属性(它们必须在PollerMetadata bean上指定)。
注意,如果inputChannel是一个PollableChannel且未配置@Poller,则将使用默认PollerMetadata(如果它存在于应用上下文中)。
要使用@Configuration注解声明默认轮询器,请使用如下代码示例:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
下面的示例展示了如何使用默认轮询器:
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
public String handle(String payload) {
...
}
}
以下示例展示了如何使用命名轮询器:
@Bean
public PollerMetadata myPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(1000));
return pollerMetadata;
}
以下示例展示了一个使用默认轮询器的端点:
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
poller = @Poller("myPoller"))
public String handle(String payload) {
...
}
}
从 4.3.3 版本开始,@Poller 注解拥有 errorChannel 属性,以便更轻松地配置底层的 MessagePublishingErrorHandler。
该属性在 <poller> XML 组件中扮演的角色与 error-channel 相同。
有关更多信息,请参阅 端点命名空间支持。
消息注解上的 poller() 属性与 reactive() 属性互斥。
更多信息请参见下一节。
使用@Reactive注解
ReactiveStreamsConsumer自版本5.0以来就已存在,但仅当端点的输入通道是FluxMessageChannel(或任何org.reactivestreams.Publisher实现)时才应用。
从版本5.3开始,当目标消息处理器为ReactiveMessageHandler时,框架也会创建其实例,与输入通道类型无关。
@Reactive子注解(类似于上述提到的@Poller)自版本5.5起已引入所有消息注解中。
它接受一个可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> Bean引用,并且无论输入通道类型和消息处理器如何,都将目标端点转换为ReactiveStreamsConsumer实例。
该函数由Flux.transform()操作符使用,以对来自输入通道的响应式流源应用某些自定义(publishOn()、doOnNext()、log()、retry()等)。
以下示例演示了如何独立于最终订阅者和生产者,将输入通道的发布线程更改为 DirectChannel:
@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
return flux -> flux.publishOn(Schedulers.parallel());
}
@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
...
}
消息注解上的 reactive() 属性与 poller() 属性互斥。
有关更多信息,请参阅 使用 @Poller 注解 和 响应式流支持。
使用@InboundChannelAdapter注解
版本 4.0 引入了 @InboundChannelAdapter 方法级注解。
它基于 MethodInvokingMessageSource 为一个被注解的方法生成一个 SourcePollingChannelAdapter 集成组件。
该注解是 <int:inbound-channel-adapter> XML 组件的等价物,具有相同的限制:方法不能有参数,且返回类型不能为 void。
它有两个属性:value(必需的 MessageChannel Bean 名称)和 poller(可选的 @Poller 注解,如前文所述)。
如果您需要提供某些 MessageHeaders,请使用 Message<?> 返回类型并使用 MessageBuilder 来构建 Message<?>。
使用 MessageBuilder 允许您配置 MessageHeaders。
以下示例展示了如何使用 @InboundChannelAdapter 注解:
@InboundChannelAdapter("counterChannel")
public Integer count() {
return this.counter.incrementAndGet();
}
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
return "foo";
}
版本 4.3 为 value 注解属性引入了 channel 别名,以提供更好的源代码可读性。
此外,目标 MessageChannel Bean 是在第一次 receive() 调用时通过提供的名称(由 outputChannelName 选项设置)在 SourcePollingChannelAdapter 中解析的,而不是在初始化阶段。这使得“延迟绑定”逻辑成为可能:从消费者角度来看,目标 MessageChannel Bean 的创建和注册比 @InboundChannelAdapter 解析阶段稍晚一些。
第一个示例要求默认轮询器已在应用程序上下文中其他地方声明。
使用 @MessagingGateway 注解
使用@IntegrationComponentScan注解
标准的 Spring Framework @ComponentScan 注解不会扫描接口上的 @Component 类型注解。
为了克服这一限制并允许配置 @MessagingGateway(参见 @MessagingGateway 注解),我们引入了 @IntegrationComponentScan 机制。
该注解必须与 @Configuration 注解一起使用,并进行自定义以定义其扫描选项,
例如 basePackages 和 basePackageClasses。
在此情况下,所有带有 @MessagingGateway 注解的发现接口都会被解析并注册为 GatewayProxyFactoryBean 实例。
所有其他基于类的组件则由标准的 @ComponentScan 进行解析。
消息元注解
从版本 4.0 开始,所有消息注解都可以配置为元注解,并且所有用户定义的消息注解都可以定义相同的属性以覆盖其默认值。 此外,元注解可以分层配置,如下例所示:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@ServiceActivator(inputChannel = "annInput", outputChannel = "annOutput")
public @interface MyServiceActivator {
String[] adviceChain = { "annAdvice" };
}
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MyServiceActivator
public @interface MyServiceActivator1 {
String inputChannel();
String outputChannel();
}
...
@MyServiceActivator1(inputChannel = "inputChannel", outputChannel = "outputChannel")
public Object service(Object payload) {
...
}
通过分层配置元注解,用户可以设置各种属性的默认值,并将框架的 Java 依赖隔离到用户注解中,避免在用户类中使用这些依赖。 如果框架发现某个方法上使用了带有框架元注解的用户注解,则该方法将被视为直接标注了该框架注解。
注解在@Bean方法
从 4.0 版本开始,您可以在 @Bean 类的方法定义上配置消息注解,从而基于 Bean 而非方法生成消息端点。
这在以下场景非常有用:@Bean 定义是开箱即用的 MessageHandler 实例(AggregatingMessageHandler、DefaultMessageSplitter 等)、Transformer 实例(JsonToObjectTransformer、ClaimCheckOutTransformer 等)以及 MessageSource 实例(FileReadingMessageSource、RedisStoreMessageSource 等)。
以下示例展示了如何将消息注解与 @Bean 注解配合使用:
@Configuration
@EnableIntegration
public class MyFlowConfiguration {
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<String> consoleSource() {
return CharacterStreamReadingMessageSource.stdin();
}
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "httpChannel")
public ObjectToMapTransformer toMapTransformer() {
return new ObjectToMapTransformer();
}
@Bean
@ServiceActivator(inputChannel = "httpChannel")
public HttpRequestExecutingMessageHandler httpHandler() {
HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler("https://foo/service");
handler.setExpectedResponseType(String.class);
handler.setOutputChannelName("outputChannel");
return handler;
}
@Bean
@ServiceActivator(inputChannel = "outputChannel")
public LoggingHandler loggingHandler() {
return new LoggingHandler("info");
}
}
版本 5.0 引入了一种支持带有 @InboundChannelAdapter 注解的 @Bean 的方法,该方法返回 java.util.function.Supplier,既可以生成 POJO,也可以生成 Message。
以下示例展示了如何使用这种组合:
@Configuration
@EnableIntegration
public class MyFlowConfiguration {
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
return () -> "foo";
}
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<Message<String>> messageSupplier() {
return () -> new GenericMessage<>("foo");
}
}
元注解规则同样适用于 @Bean 方法(之前描述的 @MyServiceActivator 注解可以应用于 @Bean 定义)。
当您在消费者 @Bean 定义上使用这些注解时,如果 bean 定义返回了适当的 MessageHandler(取决于注解类型),则必须在 MessageHandler @Bean 定义本身上设置属性(例如 outputChannel、requiresReply、order 等)。
仅使用以下注解属性:adviceChain、autoStartup、inputChannel、phase 和 poller。
所有其他属性均用于处理器。 |
| Bean 名称按照以下算法生成: |
-
The
MessageHandler(MessageSource)@Bean从其方法名或@Bean上的name属性获取其自身的标准名称。 此行为相当于在@Bean方法上没有任何消息注解。 -
AbstractEndpointBean 的名称使用以下模式生成:[@Bean name].[decapitalizedAnnotationClassShortName]。 例如,前面所示的consoleSource()定义中的SourcePollingChannelAdapter端点获得的 Bean 名称为consoleSource.inboundChannelAdapter。 与 POJO 方法不同,Bean 方法名称不会包含在端点 Bean 名称中。 另见端点 Bean 名称。 -
如果
@Bean不能直接用于目标端点(即它不是MessageSource、AbstractReplyProducingMessageHandler或AbstractMessageRouter的实例),则会注册一个相应的AbstractStandardMessageHandlerFactoryBean来委托调用此@Bean。 该包装器的 Bean 名称按以下模式生成:[@Bean name].[decapitalizedAnnotationClassShortName].[handler (or source)]。
当在 @Bean 定义上使用这些注解时,inputChannel 必须引用一个已声明的 Bean。
如果应用上下文中尚未存在通道,则会自动声明它们。 |
|
使用 Java 配置,您可以在
与现有的 Spring 容器逻辑一起,消息端点 bean(基于 |
使用注解创建桥接
从版本 4.0 开始,Java 配置提供了 @BridgeFrom 和 @BridgeTo @Bean 方法注解,用于在 MessageChannel 类中标记 @Configuration Bean。
这些注解确实存在以完善功能,提供了一种便捷的机制来声明 BridgeHandler 及其消息端点配置:
@Bean
public PollableChannel bridgeFromInput() {
return new QueueChannel();
}
@Bean
@BridgeFrom(value = "bridgeFromInput", poller = @Poller(fixedDelay = "1000"))
public MessageChannel bridgeFromOutput() {
return new DirectChannel();
}
@Bean
public QueueChannel bridgeToOutput() {
return new QueueChannel();
}
@Bean
@BridgeTo("bridgeToOutput")
public MessageChannel bridgeToInput() {
return new DirectChannel();
}
您也可以将这些注解用作元注解。
为注解端点提供建议
参见 使用注解进行端点建议。
消息映射规则与约定
Spring Integration 实现了一个灵活的设施,用于将消息映射到方法及其参数,而无需提供额外配置,这依赖于一些默认规则并定义某些约定。 以下各节中的示例阐述了这些规则。
场景示例
以下示例展示了一个未加注解的参数(对象或基本类型),它不是 Map 或 Properties 类型的对象,且具有非 void 的返回类型:
public String doSomething(Object o);
输入参数是一个消息负载。 如果参数类型与消息负载不兼容,则会尝试使用 Spring 3.0 提供的转换服务对其进行转换。 返回值将作为返回消息的负载被包含在内。
以下示例展示了一个未加注解的参数(对象或基本类型),它不是 Map 也不是 Properties,且返回类型为 Message:
public Message doSomething(Object o);
输入参数是一个消息负载。 如果参数类型与消息负载不兼容,则尝试使用 Spring 3.0 提供的转换服务对其进行转换。 返回值是构造的新消息,该消息将被发送到下一个目的地。
以下示例展示了一个参数,该参数为消息(或其子类之一),并具有任意对象或基本类型的返回类型:
public int doSomething(Message msg);
输入参数本身是一个 Message。
返回值成为发送给下一个目的地的 Message 的有效负载。
以下示例展示了一个参数为 Message(或其子类之一),返回类型为 Message(或其子类之一)的情况:
public Message doSomething(Message msg);
输入参数本身是一个 Message。
返回值是一个新构造的 Message,该值将被发送到下一个目的地。
下面的示例展示了一个类型为 Map 或 Properties 的单个参数,其返回类型为 Message:
public Message doSomething(Map m);
这一点有些有趣。
尽管乍一看,它似乎可以直接映射到消息头,但始终优先处理 Message 有效负载。
这意味着如果 Message 有效负载的类型为 Map,则该输入参数代表一个 Message 有效负载。
然而,如果 Message 有效负载不是 Map 类型,则转换服务不会尝试转换该有效负载,此时输入参数将映射到消息头。
以下示例显示了两个参数,其中一个是任意类型(对象或基本类型),且不是 Map 或 Properties 类型的对象,另一个是 Map 或 Properties 类型的对象(无论返回值如何):
public Message doSomething(Map h, <T> t);
此组合包含两个输入参数,其中一个是类型为 Map 的类型。
非 Map 的参数(无论顺序如何)被映射到 Message 负载,而 Map 或 Properties(无论顺序如何)被映射到消息头,从而为您提供一种以 POJO 方式与 Message 结构交互的优雅方法。
以下示例显示无参数(无论返回值如何):
public String doSomething();
该消息处理器方法是根据发送到与该处理器连接的消息输入通道的消息来调用的。
然而,没有Message数据被映射,因此Message作为事件或触发器来调用该处理器。
输出根据前面描述的规则进行映射。
以下示例显示没有参数且返回 void:
public void soSomething();
此示例与上一个示例相同,但不会产生任何输出。
基于注解的映射
基于注解的映射是将消息映射到方法的最安全且歧义最少的方法。 以下示例展示了如何显式地将方法映射到头部:
public String doSomething(@Payload String s, @Header("someheader") String b)
如您稍后将看到的,如果没有注解,此签名将导致歧义条件。
然而,通过将第一个参数显式映射到 Message 负载,并将第二个参数映射到 someheader 消息头的值,我们可以避免任何歧义。
以下示例与前面的示例几乎相同:
public String doSomething(@Payload String s, @RequestParam("something") String b)
@RequestMapping 或任何其他非 Spring Integration 映射注解均无关紧要,因此会被忽略,导致第二个参数未被映射。
尽管第二个参数很容易映射到负载(payload),但只能有一个负载。
因此,这些注解避免了该方法的歧义性。
以下示例展示了另一种类似的方法,如果没有注解来澄清意图,该方法将会产生歧义:
public String foo(String s, @Header("foo") String b)
唯一的区别是第一个参数隐式映射到消息负载。
以下示例展示了另一个签名,如果没有注解,它肯定会被视为歧义的,因为它有两个以上的参数:
public String soSomething(@Headers Map m, @Header("something") Map f, @Header("someotherthing") String bar)
此示例会特别成问题,因为其两个参数都是 Map 实例。
然而,基于注解的映射可以轻松避免这种歧义。
在此示例中,第一个参数被映射到所有消息头,而第二个和第三个参数则分别映射到名为 'something' 和 'someotherthing' 的消息头的值。
负载(payload)未被映射到任何参数。
复杂场景
以下示例使用了多个参数:
多个参数在确定适当的映射时可能会产生大量歧义。
一般建议是使用方法参数注解 @Payload、@Header 和 @Headers。
本节中的示例展示了会导致抛出异常的歧义条件。
public String doSomething(String s, int i)
这两个参数权重相等。 因此,无法确定哪一个是有效载荷。
下面的示例展示了一个类似的问题,只是有三个参数:
public String foo(String s, Map m, String b)
虽然 Map 可以轻松映射到消息头,但无法确定如何处理这两个 String 参数。
以下示例展示了另一个歧义方法:
public String foo(Map m, Map f)
尽管有人可能会争辩说一个 Map 可以映射到消息负载,另一个映射到消息头,但我们不能依赖顺序。
| 任何具有多个方法参数且参数未加注解的方法签名,如果参数数量不是 (0, 1),将导致条件歧义并触发异常。 |
下一组示例分别展示了会导致歧义的多种方法。
具有多个方法的消息处理器基于前面描述(在示例中)的相同规则进行映射。 然而,某些场景可能仍然令人困惑。
以下示例展示了多个具有合法(可映射且无歧义)签名的方法:
public class Something {
public String doSomething(String str, Map m);
public String doSomething(Map m);
}
(方法名称是否相同并无影响)。
Message可以映射到任意一个方法。
当消息负载可映射为str且消息头可映射为m时,将调用第一个方法。
第二个方法也可能成为候选,只需将消息头映射为m即可。
更糟糕的是,这两个方法具有相同的名称。
起初,由于以下配置,这看起来可能具有歧义性:
<int:service-activator input-channel="input" output-channel="output" method="doSomething">
<bean class="org.things.Something"/>
</int:service-activator>
这是因为映射首先基于负载,其次才是其他内容。 换句话说,其第一个参数可以映射到负载的方法优先于所有其他方法。
现在考虑一个替代示例,它会产生一个真正的歧义条件:
public class Something {
public String doSomething(String str, Map m);
public String doSomething(String str);
}
这两种方法的签名都可以映射到消息负载。
它们还具有相同的名称。
此类处理程序方法将触发异常。
但是,如果方法名称不同,则可以通过 method 属性影响映射(如下一个示例所示)。
以下示例展示了具有两个不同方法名称的相同示例:
public class Something {
public String doSomething(String str, Map m);
public String doSomethingElse(String str);
}
下面的示例展示了如何使用 method 属性来规定映射关系:
<int:service-activator input-channel="input" output-channel="output" method="doSomethingElse">
<bean class="org.bar.Foo"/>
</int:service-activator>
由于配置显式映射了 doSomethingElse 方法,我们已消除了歧义。