此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
Spring Integration Framework 概述
Spring Integration提供了Spring编程模型的扩展,以支持众所周知的企业集成模式。 它支持在基于 Spring 的应用程序中进行轻量级消息传递,并支持通过声明性适配器与外部系统集成。 这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。
Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点的分离,这对于生成可维护、可测试的代码至关重要。
Spring 集成概述
本章对 Spring Integration 的核心概念和组件进行了高级介绍。 它包括一些编程技巧,可帮助您充分利用 Spring Integration。
背景
Spring Framework 的关键主题之一是控制反转 (IoC)。 从最广泛的意义上讲,这意味着框架代表在其上下文中管理的组件处理职责。 组件本身被简化,因为它们被免除了这些责任。 例如,依赖项注入减轻了组件查找或创建其依赖项的责任。 同样,面向方面的编程通过将业务组件模块化为可重用的方面来减轻业务组件的通用跨领域问题。 在每种情况下,最终结果都是一个更易于测试、理解、维护和扩展的系统。
此外,Spring 框架和产品组合为构建企业应用程序提供了全面的编程模型。 开发人员受益于该模型的一致性,尤其是它基于完善的最佳实践这一事实,例如对接口进行编程和更倾向于组合而不是继承。 Spring 的简化抽象和强大的支持库提高了开发人员的工作效率,同时提高了可测试性和可移植性。
Spring Integration 的动机是这些相同的目标和原则。 它将 Spring 编程模型扩展到消息传递领域,并建立在 Spring 现有的企业集成支持之上,以提供更高级别的抽象。 它支持消息驱动的体系结构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应从何处发送响应。 它支持消息的路由和转换,以便可以在不影响可测试性的情况下集成不同的传输和不同的数据格式。 换句话说,消息传递和集成问题由框架处理。 业务组件进一步与基础设施隔离,开发人员从复杂的集成责任中解脱出来。
作为 Spring 编程模型的扩展,Spring Integration 提供了多种配置选项,包括注释、支持命名空间的 XML、具有通用“bean”元素的 XML 以及底层 API 的直接使用。 该 API 基于定义明确的策略接口和非侵入式委托适配器。 Spring Integration 的设计灵感来自于 Spring 中的常见模式与 Gregor Hohpe 和 Bobby Woolf (Addison Wesley, 2004) 中描述的 Enterprise Integration Patterns 中描述的众所周知的模式之间的强烈亲和力。 阅读过该书的开发人员应该立即熟悉 Spring Integration 的概念和术语。
目标和原则
Spring Integration 的动机是以下目标:
-
为实现复杂的企业集成解决方案提供简单的模型。
-
在基于 Spring 的应用程序中促进异步、消息驱动的行为。
-
促进现有 Spring 用户直观、增量的采用。
Spring Integration 遵循以下原则:
-
组件应松散耦合,以实现模块化和可测试性。
-
该框架应强制分离业务逻辑和集成逻辑之间的关注点。
-
扩展点本质上应该是抽象的(但在明确定义的边界内),以促进重用和可移植性。
主要组件
从纵向角度来看,分层架构有利于关注点分离,层间基于接口的合约促进松耦合。 基于 Spring 的应用程序通常以这种方式设计,而 Spring 框架和产品组合为企业应用程序的完整堆栈遵循此最佳实践提供了坚实的基础。 消息驱动的架构增加了横向视角,但这些相同的目标仍然具有相关性。 正如“分层架构”是一种极其通用和抽象的范式一样,消息传递系统通常遵循类似的抽象“管道和过滤器”模型。 “过滤器”表示能够生成或使用消息的任何组件,“管道”在过滤器之间传输消息,以便组件本身保持松散耦合。 值得注意的是,这两种高级范式并不相互排斥。 支持“管道”的底层消息传递基础设施仍应封装在契约定义为接口的层中。 同样,“过滤器”本身应该在逻辑上高于应用程序服务层的层中进行管理,通过接口与这些服务进行交互,其方式与 Web 层大致相同。
消息
在 Spring Integration 中,消息是任何 Java 对象的通用包装器,与框架在处理该对象时使用的元数据相结合。 它由有效负载和标头组成。 有效负载可以是任何类型,标头包含常见所需的信息,例如 ID、时间戳、相关 ID 和返回地址。 标头还用于向连接的传输传递值或从连接的传输传递值。 例如,从接收的文件创建消息时,文件名可以存储在标头中,以便下游组件访问。 同样,如果邮件的内容最终将由出站邮件适配器发送,则各种属性(收件人、发件人、抄送、主题等)可以由上游组件配置为邮件头值。 开发人员还可以在标头中存储任何任意键值对。

消息通道
消息通道表示管道和过滤器体系结构的“管道”。 生产者向通道发送消息,使用者从通道接收消息。 因此,消息通道解耦了消息传递组件,并为拦截和监视消息提供了一个方便的点。

消息通道可以遵循点对点或发布-订阅语义。 使用点对点通道时,发送到该通道的每条消息只能有一个使用者。 另一方面,发布-订阅频道尝试将每条消息广播给频道上的所有订阅者。 Spring Integration 支持这两种模型。
虽然“点对点”和“发布-订阅”定义了最终接收每条消息的消费者数量的两个选项,但还有另一个重要的考虑因素:通道是否应该缓冲消息? 在 Spring Integration 中,可轮询通道能够缓冲队列中的消息。 缓冲的优点是它允许限制入站消息,从而防止使用者过载。 然而,顾名思义,这也增加了一些复杂性,因为如果配置了轮询器,消费者只能从这样的通道接收消息。 另一方面,连接到可订阅通道的消费者只是消息驱动的。消息通道实现 详细讨论了 Spring Integration 中可用的各种通道实现。
消息端点
Spring Integration 的主要目标之一是通过控制反转来简化企业集成解决方案的开发。 这意味着您不必直接实现使用者和生产者,甚至不必在消息通道上构建消息和调用发送或接收作。 相反,您应该能够通过基于普通对象的实现来专注于您的特定领域模型。 然后,通过提供声明性配置,您可以将特定于域的代码“连接”到 Spring Integration 提供的消息传递基础设施。 负责这些连接的组件是消息端点。 这并不意味着您必须直接连接现有的应用程序代码。 任何实际的企业集成解决方案都需要一些专注于集成问题(例如路由和转换)的代码。 重要的是实现集成逻辑和业务逻辑之间的关注点分离。 换句话说,与 Web 应用程序的模型-视图-控制器 (MVC) 范例一样,目标应该是提供一个精简但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。 下一节概述了处理这些职责的消息端点类型,在接下来的章节中,您可以看到 Spring Integration 的声明式配置选项如何提供一种非侵入式方式来使用这些职责中的每一个。
消息端点
消息端点表示管道和过滤器体系结构的“过滤器”。 如前所述,端点的主要作用是将应用程序代码连接到消息传递框架,并以非侵入性方式执行此作。 换句话说,理想情况下,应用程序代码应该不了解消息对象或消息通道。 这类似于 MVC 范式中控制器的角色。 就像控制器处理 HTTP 请求一样,消息端点处理消息。 正如控制器映射到 URL 模式一样,消息端点也映射到消息通道。 在这两种情况下,目标都是相同的:将应用程序代码与基础结构隔离开来。 这些概念和后续所有模式在 Enterprise Integration Patterns 一书中进行了详细讨论。 在这里,我们仅提供 Spring Integration 支持的主要端点类型以及与这些类型关联的角色的高级描述。 以下章节详细阐述并提供了示例代码和配置示例。
消息转换器
消息转换器负责转换消息的内容或结构并返回修改后的消息。
最常见的转换器类型可能是将消息的有效负载从一种格式转换为另一种格式(例如从 XML 转换为java.lang.String
).
同样,转换器可以添加、删除或修改邮件的标头值。
消息过滤器
消息过滤器确定是否应将消息传递到输出通道。
这只需要一个布尔测试方法,该方法可以检查特定的有效负载内容类型、属性值、标头的存在或其他条件。
如果消息被接受,则将其发送到输出通道。
如果没有,则将其删除(或者,对于更严格的实现,将Exception
可以扔掉)。
消息过滤器通常与发布-订阅通道结合使用,其中多个使用者可能会收到相同的消息,并使用过滤器的条件来缩小要处理的消息集。
请注意,不要将管道和过滤器体系结构模式中“过滤器”的通用用法与这种特定终结点类型混淆,该终结点类型有选择地缩小两个通道之间流动的消息范围。 “过滤器”的管道和过滤器概念与 Spring Integration 的消息端点更接近:任何可以连接到消息通道以发送或接收消息的组件。 |
消息路由器
消息路由器负责决定接下来应该接收消息的一个或多个通道(如果有)。 通常,决策基于消息的内容或消息标头中可用的元数据。 消息路由器通常用作服务激活器或其他能够发送回复消息的端点上静态配置的输出通道的动态替代方案。 同样,如前所述,消息路由器为多个订阅者使用的响应式消息过滤器提供了主动替代方案。

聚合
聚合器基本上是拆分器的镜像,是一种消息端点,它接收多条消息并将它们组合成一条消息。
事实上,聚合商通常是包含拆分器的管道中的下游消费者。
从技术上讲,聚合器比拆分器更复杂,因为它需要维护状态(要聚合的消息),确定何时完整的消息组可用,并在必要时超时。
此外,在超时的情况下,聚合器需要知道是发送部分结果、丢弃它们还是将它们发送到单独的通道。
Spring Integration 提供了一个CorrelationStrategy
一个ReleaseStrategy
,以及超时的可配置设置,无论
在超时时发送部分结果,并丢弃通道。
服务激活器
服务激活器是用于将服务实例连接到消息传递系统的通用端点。 必须配置输入消息通道,并且,如果要调用的服务方法能够返回值,则还可以提供输出消息通道。
输出通道是可选的,因为每条消息还可以提供自己的“返回地址”标头。 这同样的规则适用于所有使用者端点。 |
服务激活器对某些服务对象调用作来处理请求消息,提取请求消息的有效负载并进行转换(如果该方法不需要消息类型参数)。 每当服务对象的方法返回值时,该返回值也会在必要时转换为回复消息(如果它还不是消息类型)。 该回复消息将发送到输出通道。 如果未配置输出通道,则应答将发送到消息的“返回地址”中指定的通道(如果可用)。
请求-回复服务激活器终结点将目标对象的方法连接到输入和输出消息通道。

如前所述,在消息通道中,通道可以是可轮询的,也可以是可订阅的。 在上图中,这由“时钟”符号、实心箭头(轮询)和虚线箭头(订阅)表示。 |
通道适配器
通道适配器是将消息通道连接到其他系统或传输的端点。 通道适配器可以是入站的,也可以是出站的。 通常,通道适配器在消息与从其他系统接收或发送到其他系统的任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。 根据传输,通道适配器还可以填充或提取消息头值。 Spring Integration提供了许多通道适配器,这些适配器将在后续章节中进行描述。

MessageChannel
.消息源可以是可轮询的(例如,POP3)或消息驱动的(例如,IMAP 空闲)。 在上图中,这由“时钟”符号、实心箭头(轮询)和虚线箭头(消息驱动)表示。 |

MessageChannel
到目标系统。如前面在消息通道中讨论的,通道可以是可轮询的,也可以是可订阅的。 在上图中,这由“时钟”符号、实心箭头(轮询)和虚线箭头(订阅)表示。 |
端点 Bean 名称
使用端点(任何具有inputChannel
)由两个 bean 组成,即消费者和消息处理程序。
使用者具有对消息处理程序的引用,并在消息到达时调用它。
考虑以下 XML 示例:
<int:service-activator id = "someService" ... />
给定前面的示例,Bean 名称如下:
-
消费者:
someService
(这id
) -
处理器:
someService.handler
使用企业集成模式 (EIP) 注释时,名称取决于几个因素。 考虑以下带注释的 POJO 示例:
@Component
public class SomeComponent {
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
给定前面的示例,Bean 名称如下:
-
消费者:
someComponent.someMethod.serviceActivator
-
处理器:
someComponent.someMethod.serviceActivator.handler
从 5.0.4 版开始,您可以使用@EndpointId
注释,如以下示例所示:
@Component
public class SomeComponent {
@EndpointId("someService")
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
给定前面的示例,Bean 名称如下:
-
消费者:
someService
-
处理器:
someService.handler
这@EndpointId
创建由id
属性。
考虑以下带注释的 bean 示例:
@Configuration
public class SomeConfiguration {
@Bean
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
给定前面的示例,Bean 名称如下:
-
消费者:
someConfiguration.someHandler.serviceActivator
-
处理器:
someHandler
(这@Bean
名称)
从 5.0.4 版开始,您可以使用@EndpointId
注释,如以下示例所示:
@Configuration
public class SomeConfiguration {
@Bean("someService.handler") (1)
@EndpointId("someService") (2)
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
1 | 处理器:someService.handler (豆名) |
2 | 消费者:someService (端点 ID) |
这@EndpointId
注释创建由id
属性,只要您使用附加.handler
到@Bean
名字。
有一个特殊情况是创建第三个 bean:出于架构原因,如果MessageHandler
@Bean
不定义AbstractReplyProducingMessageHandler
,框架将提供的 bean 包装在ReplyProducingMessageHandlerWrapper
.
此包装器支持请求处理程序通知处理,并发出正常的“未生成回复”调试日志消息。
它的 bean 名称是处理程序 bean name 加上.wrapper
(当有@EndpointId
— 否则,它是正常生成的处理程序名称)。
类似地,可轮询消息源创建两个 bean,一个SourcePollingChannelAdapter
(爱护动物协会)和MessageSource
.
考虑以下 XML 配置:
<int:inbound-channel-adapter id = "someAdapter" ... />
给定前面的 XML 配置,Bean 名称如下所示:
-
爱护动物协会:
someAdapter
(这id
) -
处理器:
someAdapter.source
考虑以下 POJO 的 Java 配置来定义@EndpointId
:
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
...
}
给定前面的 Java 配置示例,Bean 名称如下:
-
爱护动物协会:
someAdapter
-
处理器:
someAdapter.source
考虑以下 Bean 的 Java 配置,以定义@EndpointID
:
@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
return () -> {
...
};
}
给定前面的示例,Bean 名称如下:
-
爱护动物协会:
someAdapter
-
处理器:
someAdapter.source
(只要你使用附加.source
到@Bean
名称)
配置和@EnableIntegration
在本文档中,您可以看到对 XML 命名空间支持的引用,用于在 Spring Integration 流中声明元素。
这种支持由一系列命名空间解析器提供,这些解析器生成适当的 bean 定义来实现特定组件。
例如,许多端点由一个MessageHandler
bean 和ConsumerEndpointFactoryBean
将处理程序和输入通道名称注入其中。
第一次遇到 Spring Integration 命名空间元素时,框架会自动声明许多用于支持运行时环境的 bean(任务调度程序、隐式通道创建器等)。
4.0 版引入了@EnableIntegration 注释,以允许注册 Spring Integration 基础架构 bean(请参阅 Javadoc)。
当仅使用 Java 配置时,需要此 Comments — 例如,支持 Spring Boot 或 Spring Integration Messaging Annotation 以及没有 XML 集成配置的 Spring Integration Java DSL。 |
这@EnableIntegration
当您有一个没有 Spring Integration 组件的父上下文和两个或多个使用 Spring Integration 的子上下文时,注释也很有用。
它允许这些常见组件在父上下文中仅声明一次。
这@EnableIntegration
注释向应用程序上下文注册许多基础结构组件。
特别是,它:
-
注册一些内置 bean,例如
errorChannel
及其LoggingHandler
,taskScheduler
对于民意调查员,jsonPath
SpEL 函数等。 -
添加几个
BeanFactoryPostProcessor
实例来增强BeanFactory
适用于全局和默认集成环境。 -
添加几个
BeanPostProcessor
实例来增强或转换和包装特定的 bean 以进行集成。 -
添加注释处理器以解析消息传递注释,并向应用程序上下文注册它们的组件。
这@IntegrationComponentScan
注释还允许类路径扫描。
此注释起着与标准 Spring Framework 类似的作用@ComponentScan
注释,但它仅限于特定于 Spring Integration 的组件和注释,这是标准 Spring Framework 组件扫描机制无法达到的。
有关示例,请参阅@MessagingGateway
注解.
这@EnablePublisher
注释寄存器PublisherAnnotationBeanPostProcessor
bean 并配置default-publisher-channel
对于那些@Publisher
不带channel
属性。
如果多个@EnablePublisher
注解,则它们的默认通道必须具有相同的值。
看注释驱动的配置,其中包含@Publisher
注解了解更多信息。
这@GlobalChannelInterceptor
已引入注释来标记ChannelInterceptor
用于全局通道拦截的 bean。
此注释类似于<int:channel-interceptor>
XML 元素(请参阅全局通道拦截器配置)。@GlobalChannelInterceptor
注释可以放置在类级别(使用@Component
stereotype 注释)或@Bean
方法@Configuration
类。
无论哪种情况,bean 都必须实现ChannelInterceptor
.
从 V5.1 开始,全局通道拦截器适用于动态注册的通道,例如使用beanFactory.initializeBean()
或通过IntegrationFlowContext
使用 Java DSL 时。
以前,在刷新应用程序上下文后创建 Bean 时,不会应用拦截器。
这@IntegrationConverter
注释标记Converter
,GenericConverter
或ConverterFactory
bean 作为候选转换器integrationConversionService
.
此注释类似于<int:converter>
XML 元素(请参阅有效负载类型转换)。
您可以放置@IntegrationConverter
类级别的注释(使用@Component
stereotype 注释)或@Bean
方法@Configuration
类。
有关消息传递注释的更多信息,请参阅注释支持。
编程注意事项
除非另有说明,否则 Spring Integration 中的大多数类必须在应用程序上下文中声明为 bean 和单例。
这意味着这些类的实例是线程安全的,它们的生命周期以及与其他组件的连接由 Spring 依赖注入容器管理。
实用程序和构建器类 (JacksonMessagingUtils
,MessageBuilder
,ExpressionEvalMap
,IntegrationReactiveUtils
等等)可以直接在 Java 代码中使用。
但是,Java DSL 工厂和IntegrationComponentSpec
实现结果仍然必须作为 bean 注册到应用程序上下文中。
这Session
抽象,存在于许多模块中,不是线程安全的,通常由Factory
模式实现,并从线程安全Template
模式。
例如,请参阅SftpRemoteFileTemplate
及其与DefaultSftpSessionFactory
.
您应该尽可能使用普通的旧 Java 对象 (POJO)(用于目标逻辑中的消息处理),并且仅在绝对必要时在代码中公开框架。 有关更多信息,请参阅 POJO 方法调用。
如果您确实将框架公开给您的类,则需要考虑一些注意事项,尤其是在应用程序启动期间:
-
如果您的组件是
ApplicationContextAware
,通常不应使用ApplicationContext
在setApplicationContext()
方法。 相反,请存储引用并将此类使用推迟到上下文生命周期的后期。 -
如果您的组件是
InitializingBean
或用途@PostConstruct
方法,则不要从这些初始化方法发送任何消息。 调用这些方法时,应用程序上下文尚未初始化,发送此类消息可能会失败。 如果需要在启动期间发送消息,请实现ApplicationListener
并等待ContextRefreshedEvent
. 或者,实现SmartLifecycle
,将您的 Bean 置于后期阶段,并从start()
方法。
使用打包(例如,着色)罐时的注意事项
Spring Integration 通过使用 Spring Framework 的SpringFactories
加载多个的机制IntegrationConfigurationInitializer
类。
这包括-core
jar 以及某些其他,包括-http
和-jmx
.
此过程的信息存储在META-INF/spring.factories
文件。
一些开发人员更喜欢使用众所周知的工具(例如 Apache Maven Shade Plugin)将他们的应用程序和所有依赖项重新打包到单个 jar 中。
默认情况下,阴影插件不会合并spring.factories
文件。
除了spring.factories
其他META-INF
文件 (spring.handlers
和spring.schemas
) 用于 XML 配置。
这些文件也需要合并。
Spring Boot 的可执行 jar 机制采用了不同的方法,因为它嵌套了 jar,从而保留了每个 jarspring.factories 文件。
因此,对于 Spring Boot 应用程序,如果您使用其默认的可执行 jar 格式,则无需更多。 |
即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具通过为上述文件添加转换器来增强 shade 插件。 以下示例显示了如何配置插件:
...
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<dependencies>
<dependency> (1)
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers> (2)
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
...
具体说来
1 | 添加spring-boot-maven-plugin 作为依赖项。 |
2 | 配置转换器。 |
您可以为${spring.boot.version}
或使用显式版本。
编程提示和技巧
本节记录了一些充分利用 Spring Integration 的方法。
XML 模式
使用 XML 配置时,为避免出现错误的模式验证错误,您应该使用“Spring 感知”IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。
这些 IDE 知道如何从类路径解析正确的 XML 模式(通过使用META-INF/spring.schemas
文件)。
将 STS 或 Eclipse 与插件一起使用时,必须启用Spring Project Nature
在项目上。
出于兼容性原因,某些旧模块(版本 1.0 中存在的模块)托管在 Internet 上的架构是 1.0 版本。 如果您的 IDE 使用这些架构,您可能会看到错误。
每个联机架构都有类似于以下内容的警告:
此模式适用于 Spring Integration Core 的 1.0 版本。 我们无法将其更新为当前模式,因为这会破坏任何使用 1.0.3 或更低版本的应用程序。 对于后续版本,“未版本控制”模式从类路径解析并从 jar 中获取。 请参考 GitHub: |
受影响的模块包括
-
core
(spring-integration.xsd
) -
file
-
http
-
jms
-
mail
-
security
-
stream
-
ws
-
xml
查找 Java 和 DSL 配置的类名
通过XML配置和Spring Integration Namespace支持,XML解析器隐藏了目标bean的声明方式和连接方式。 对于 Java 配置,了解目标最终用户应用程序的框架 API 非常重要。
实施 EIP 的一等公民是Message
,Channel
和Endpoint
(参见本章前面的主要组件)。
它们的实现(合同)是:
前两个足够简单,可以理解如何实现、配置和使用。 最后一个值得更多关注
这AbstractEndpoint
在整个 Spring 框架中广泛用于不同的组件实现。
它的主要实现是:
-
EventDrivenConsumer
,当我们订阅SubscribableChannel
以监听消息。 -
PollingConsumer
,当我们轮询来自PollableChannel
.
当您使用消息传递注释或 Java DSL 时,您无需担心这些组件,因为框架会自动生成具有适当注释和BeanPostProcessor
实现。
手动构建组件时,应使用ConsumerEndpointFactoryBean
帮助确定目标AbstractEndpoint
消费者实现创建,基于提供的inputChannel
财产。
另一方面,ConsumerEndpointFactoryBean
授权给《框架》中的另一一等公民——org.springframework.messaging.MessageHandler
.
实现此接口的目标是处理终结点从通道使用的消息。
Spring Integration 中的所有 EIP 组件都是MessageHandler
实现(例如,AggregatingMessageHandler
,MessageTransformingHandler
,AbstractMessageSplitter
等)。
目标协议出站适配器 (FileWritingMessageHandler
,HttpRequestExecutingMessageHandler
,AbstractMqttMessageHandler
等)也是MessageHandler
实现。
当您使用 Java 配置开发 Spring Integration 应用程序时,您应该查看 Spring Integration 模块以找到合适的MessageHandler
实现用于@ServiceActivator
配置。
例如,若要发送 XMPP 消息(请参阅 XMPP 支持),应配置如下内容:
@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);
DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
xmppHeaderMapper.setRequestHeaderNames("*");
handler.setHeaderMapper(xmppHeaderMapper);
return handler;
}
这MessageHandler
实现表示消息流的出站和处理部分。
入站消息流侧有自己的组件,分为轮询行为和监听行为。
监听(消息驱动)组件很简单,通常只需要一个目标类实现即可
生成消息。
聆听组件可以是单向的MessageProducerSupport
实现(例如AbstractMqttMessageDrivenChannelAdapter
和ImapIdleChannelAdapter
) 或请求-回复MessagingGatewaySupport
实现(例如AmqpInboundGateway
和AbstractWebServiceInboundGateway
).
轮询入站端点适用于不提供侦听器 API 或不适用于的协议 此类行为,包括任何基于文件的协议(例如 FTP)、任何数据库(RDBMS 或 NoSQL)等。
这些入站端点由两个组件组成:轮询器配置,用于定期启动轮询任务,
以及一个消息源类,用于从目标协议读取数据并为下游集成流生成消息。
轮询器配置的第一类是SourcePollingChannelAdapter
.
又是一个AbstractEndpoint
实现,但特别是用于轮询以启动集成流。
通常,使用消息传递注释或 Java DSL,您不必担心此类。
框架根据@InboundChannelAdapter
配置或 Java DSL 构建器规范。
消息源组件对于目标应用程序开发更为重要,它们都实现了MessageSource
接口(例如MongoDbMessageSource
和AbstractTwitterMessageSource
).
考虑到这一点,我们使用 JDBC 从 RDBMS 表读取数据的配置可能类似于以下内容:
@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}
您可以在特定的 Spring Integration 模块中找到目标协议所需的所有入站和出站类(在大多数情况下,在相应的包中)。
例如,spring-integration-websocket
适配器是:
-
o.s.i.websocket.inbound.WebSocketInboundChannelAdapter
:实现MessageProducerSupport
侦听套接字上的帧并向通道生成消息。 -
o.s.i.websocket.outbound.WebSocketOutboundMessageHandler
:单向AbstractMessageHandler
实现将传入消息转换为适当的帧并通过 WebSocket 发送。
如果您熟悉 Spring Integration XML 配置,从 4.3 版开始,我们在 XSD 元素定义,了解哪些目标类用于为适配器或网关声明 Bean,如以下示例所示:
<xsd:element name="outbound-async-gateway">
<xsd:annotation>
<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
</xsd:documentation>
</xsd:annotation>
POJO 方法调用
如编程注意事项中所述,我们建议使用 POJO 编程风格,如以下示例所示:
@ServiceActivator
public String myService(String payload) { ... }
在这种情况下,框架会提取一个String
payload,调用您的方法,并将结果包装在消息中以发送到流中的下一个组件(原始标头将复制到新消息)。
事实上,如果您使用 XML 配置,您甚至不需要@ServiceActivator
注释,如以下配对示例所示:
<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }
您可以省略method
属性,只要类上的公共方法没有歧义。
您还可以在 POJO 方法中获取标头信息,如以下示例所示:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
您还可以取消引用消息上的属性,如以下示例所示:
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
由于有各种 POJO 方法调用可用,因此 5.0 之前的版本使用 SpEL(Spring 表达式语言)来调用 POJO 方法。
与方法中通常完成的实际工作相比,SpEL(甚至解释)对于这些作通常“足够快”。
但是,从 5.0 版本开始,org.springframework.messaging.handler.invocation.InvocableHandlerMethod
尽可能默认使用。
这种技术通常比解释的 SpEL 执行得更快,并且与其他 Spring 消息传递项目一致。
这InvocableHandlerMethod
类似于在 Spring MVC 中调用控制器方法的技术。
使用 SpEL 时,仍然总是会调用某些方法。
示例包括具有取消引用属性的注释参数,如前所述。
这是因为 SpEL 具有导航属性路径的能力。
可能还有一些我们没有考虑过的其他极端情况也不起作用InvocableHandlerMethod
实例。
出于这个原因,在这些情况下,我们会自动回退到使用 SpEL。
如果您愿意,您还可以设置 POJO 方法,使其始终使用 SpEL,并使用UseSpelInvoker
注释,如以下示例所示:
@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }
如果compilerMode
属性被省略,则spring.expression.compiler.mode
system 属性确定编译器模式。
有关已编译的 SpEL 的更多信息,请参阅 SpEL 编译。