系统管理
系统管理
指标与管理
本节介绍如何捕获 Spring Integration 的指标。 在较新的版本中,我们更多地依赖 Micrometer(参见 https://micrometer.io),并计划在未来的发布中更广泛地使用 Micrometer。
在高流量环境中禁用日志记录
您可以在主消息流中控制调试日志记录。
在极高流量的应用中,某些日志子系统对 isDebugEnabled() 的调用可能非常昂贵。
您可以禁用所有此类日志记录以避免此开销。
异常日志记录(无论是调试还是其他级别)不受此设置影响。
以下列表显示了用于控制日志记录的可选项:
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
defaultLoggingEnabled = "true" <1>)
public static class ContextConfiguration {
...
}
<int:management default-logging-enabled="true"/> (1)
| 1 | 设置为 false 以禁用主消息流中的所有日志记录,无论日志系统类别设置如何。
设置为 'true' 以启用调试日志记录(如果日志子系统也启用了该功能)。
仅当未在 bean 定义中显式配置该设置时才适用。
默认值为 true。 |
defaultLoggingEnabled 仅在您未在 bean 定义中显式配置相应设置时适用。 |
Micrometer 集成
概述
从版本 5.0.3 开始,应用上下文中存在 Micrometer MeterRegistry 将触发对 Micrometer 指标的支持。
要使用 Micrometer,请将其中一个 MeterRegistry Bean 添加到应用程序上下文中。
对于每个 MessageHandler 和 MessageChannel,都会注册计时器。
对于每个 MessageSource,会注册一个计数器。
这仅适用于扩展 AbstractMessageHandler、AbstractMessageChannel 和 AbstractMessageSource 的对象(大多数框架组件均属于这种情况)。
用于消息通道发送操作的 Timer 米具有以下名称或标签:
-
name:spring.integration.send -
tag:type:channel -
tag:name:<componentName> -
tag:result:(success|failure) -
tag:exception:(none|exception simple class name) -
description:Send processing time
(带有 none 个异常的 failure 条结果表示通道的 send() 操作返回了 false。)
可轮询消息通道上接收操作的 Counter 米具有以下名称或标签:
-
name:spring.integration.receive -
tag:type:channel -
tag:name:<componentName> -
tag:result:(success|failure) -
tag:exception:(none|exception simple class name) -
description:Messages received
用于消息处理器操作的 Timer 米的指标具有以下名称或标签:
-
name:spring.integration.send -
tag:type:handler -
tag:name:<componentName> -
tag:result:(success|failure) -
tag:exception:(none|exception simple class name) -
description:Send processing time
消息源的 Counter 米具有以下名称/标签:
-
name:spring.integration.receive -
tag:type:source -
tag:name:<componentName> -
tag:result:success -
tag:exception:none -
description:Messages received
此外,还有三个 Gauge 米:
-
spring.integration.channels: 应用程序中MessageChannels的数量。 -
spring.integration.handlers: 应用程序中MessageHandlers的数量。 -
spring.integration.sources: 应用程序中MessageSources的数量。
可以通过提供 MicrometerMetricsCaptor 的子类来自定义集成组件创建的 Meters 的名称和标签。
MicrometerCustomMetricsTests 测试用例展示了如何实现这一点的简单示例。
您还可以通过重载构建器子类上的 build() 方法进一步自定义计量器。
从版本 5.1.13 开始,QueueChannel 为队列大小和剩余容量公开了 Micrometer 指标:
-
name:spring.integration.channel.queue.size -
tag:type:channel -
tag:name:<componentName> -
description:The size of the queue channel
和
-
name:spring.integration.channel.queue.remaining.capacity -
tag:type:channel -
tag:name:<componentName> -
description:The remaining capacity of the queue channel
禁用指标
默认情况下,所有指标在首次使用时都会注册。
现在,借助 Micrometer,您可以在 MeterRegistry 中添加 MeterFilter,以防止部分或全部指标被注册。
您可以通过提供的任意属性(如 name、tag 等)过滤掉(拒绝)某些指标。
有关更多信息,请参阅 Micrometer 文档中的 指标过滤器。
例如,给定:
@Bean
public QueueChannel noMeters() {
return new QueueChannel(10);
}
您可以使用以下方法仅针对此通道抑制计量器的注册:
registry.config().meterFilter(MeterFilter.deny(id ->
"channel".equals(id.getTag("type")) &&
"noMeters".equals(id.getTag("name"))));
Micrometer 观测
从 6.0 版本开始,Spring Integration 利用 Micrometer Observation 抽象,可通过适当的 ObservationHandler 配置处理指标以及 链路追踪。
当应用程序上下文中存在 ObservationRegistry bean 且配置了 @EnableIntegrationManagement 时,IntegrationManagement 组件上的观察处理将被启用。
要自定义应被仪器化的组件集合,@EnableIntegrationManagement 注解上公开了 observationPatterns() 属性。
请参阅其 Javadoc 以了解模式匹配算法。
默认情况下,没有任何 IntegrationManagement 组件被配置为 ObservationRegistry Bean。
可以将其配置为 * 以匹配所有组件。 |
在这种情况下,指标并非独立收集,而是委托给在提供的 ObservationRegistry 上配置的适当 ObservationHandler。
以下 Spring Integration 组件均配备了观测逻辑,每个组件都有相应的约定:
-
MessageProducerSupport作为流程的入口端点,被视为1类型的span,并使用2 API; -
MessagingGatewaySupport` 是一个入站请求 - 响应端点,被视为
SERVER类型的跨度。 它使用IntegrationObservation.GATEWAYAPI; -
AbstractMessageChannel.send()操作是唯一生成消息的 Spring Integration API。 因此,它被视为PRODUCER跨度类型,并使用IntegrationObservation.PRODCUERAPI。 当通道是分布式实现(例如PublishSubscribeKafkaChannel或ZeroMqChannel)且需要向消息添加跟踪信息时,这样做更有意义。 因此,IntegrationObservation.PRODUCER观察基于MessageSenderContext,其中 Spring Integration 提供MutableMessage,以便后续的跟踪Propagator可以添加标头,从而使消费者能够使用这些标头; -
一个
AbstractMessageHandler是CONSUMER类型的 span,并使用IntegrationObservation.HANDLERAPI。
关于 IntegrationManagement 组件的观察生产可通过 ObservationConvention 配置进行自定义。
例如,AbstractMessageHandler 通过其 setObservationConvention() API 期望接收一个 MessageReceiverObservationConvention。
以下支持 Observation API 的指标、跨度及约定:
可观测性 - 指标
以下列出了本项目声明的所有指标。
网关
入站消息网关的观测。
指标名称 spring.integration.gateway(由约定类 o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention 定义)。类型 timer。
指标名称 spring.integration.gateway.active(由约定类 o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention 定义)。类型 long task timer。
| 在启动 Observation 后添加的 KeyValues 可能会缺失于 *.active 指标中。 |
Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单位。(即 Prometheus 使用秒) |
封闭类 o.s.i.support.management.observation.IntegrationObservation 的全限定名。
所有标签都必须以 spring.integration. 前缀开头! |
姓名 |
描述 |
|
消息网关组件的名称。 |
|
请求/响应执行的结果。 |
|
组件类型 - '网关'。 |
处理器
消息处理器的观察功能。
指标名称 spring.integration.handler(由约定类 o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention 定义)。类型 timer。
指标名称 spring.integration.handler.active(由约定类 o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention 定义)。类型 long task timer。
| 在启动 Observation 后添加的 KeyValues 可能会缺失于 *.active 指标中。 |
Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单位。(即 Prometheus 使用秒) |
封闭类 o.s.i.support.management.observation.IntegrationObservation 的全限定名。
所有标签都必须以 spring.integration. 前缀开头! |
姓名 |
描述 |
|
消息处理组件的名称。 |
|
组件类型 - '处理器'。 |
生产者
为消息生产者(例如通道)提供的观察功能。
指标名称 spring.integration.producer(由约定类 o.s.i.support.management.observation.DefaultMessageSenderObservationConvention 定义)。类型 timer。
指标名称 spring.integration.producer.active(由约定类 o.s.i.support.management.observation.DefaultMessageSenderObservationConvention 定义)。类型 long task timer。
| 在启动 Observation 后添加的 KeyValues 可能会缺失于 *.active 指标中。 |
Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单位。(即 Prometheus 使用秒) |
封闭类 o.s.i.support.management.observation.IntegrationObservation 的全限定名。
所有标签都必须以 spring.integration. 前缀开头! |
姓名 |
描述 |
|
消息处理组件的名称。 |
|
组件类型 - '生产者'。 |
可观测性 - 跨度
以下列出了本项目声明的所有 span。
网关跨度
入站消息网关的观测。
Span 名称 spring.integration.gateway(由约定类 o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention 定义)。
封闭类 o.s.i.support.management.observation.IntegrationObservation 的全限定名。
所有标签都必须以 spring.integration. 前缀开头! |
姓名 |
描述 |
|
消息网关组件的名称。 |
|
请求/响应执行的结果。 |
|
组件类型 - '网关'。 |
处理器跨度
消息处理器的观察功能。
Span 名称 spring.integration.handler(由约定类 o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention 定义)。
封闭类 o.s.i.support.management.observation.IntegrationObservation 的全限定名。
所有标签都必须以 spring.integration. 前缀开头! |
姓名 |
描述 |
|
消息处理组件的名称。 |
|
组件类型 - '处理器'。 |
生产者 Span
为消息生产者(例如通道)提供的观察功能。
Span 名称 spring.integration.producer(由约定类 o.s.i.support.management.observation.DefaultMessageSenderObservationConvention 定义)。
封闭类 o.s.i.support.management.observation.IntegrationObservation 的全限定名。
所有标签都必须以 spring.integration. 前缀开头! |
姓名 |
描述 |
|
消息处理组件的名称。 |
|
组件类型 - '生产者'。 |
可观测性 - 约定
以下列出了本项目声明的所有 GlobalObservationConvention 和 ObservationConvention。
ObservationConvention 类名 |
适用的 ObservationContext 类名 |
|
|
|
|
|
|
|
|
|
|
|
|
观测传播
为了在一条跟踪中提供连接的跨度链,无论消息流的性质如何,即使 MessageChannel 是持久化且分布式的,也必须在此通道以及该通道的消费者(订阅者)上启用观察功能。
通过这种方式,跟踪信息会被存储在消息头中,然后再传播到消费者线程或持久化到数据库中。
这是通过上述提到的 MessageSenderContext 完成的。
消费者(MessageHandler)端使用 MessageReceiverContext 从这些消息头中恢复跟踪信息,并启动一个新的子 Observation。
Spring Integration JMX 支持
另请参阅 JMX 支持。
消息历史
消息架构的关键优势在于松耦合,使得参与组件之间无需相互感知。 仅凭这一点,就能使应用程序极具灵活性,允许您更改组件而不影响其余流程、更改消息路由、更改消息消费方式(轮询与事件驱动)等。 然而,当出现问题时,这种低调的架构风格可能会带来困难。 在调试时,您可能希望尽可能获取关于消息的更多信息(包括其来源、经过的通道以及其他细节)。
消息历史是那些通过提供选项来维护对消息路径的某种程度的感知,以便用于调试目的或维护审计跟踪的模式之一。 Spring Integration 提供了一种简单的方法来配置您的消息流,通过在消息中添加一个标头并在每次消息通过受跟踪组件时更新该标头来维护消息历史。
消息历史配置
要启用消息历史记录,您只需在配置中定义 message-history 元素(或 @EnableMessageHistory),如下例所示:
@Configuration
@EnableIntegration
@EnableMessageHistory
<int:message-history/>
现在,每个已命名的组件(即定义了'id'的组件)都会被追踪。
框架会在您的消息中设置'history'标头。
其值为 List<Properties>。
考虑以下配置示例:
@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
...
}
@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
HeaderEnricher enricher =
new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
return enricher;
}
<int:gateway id="sampleGateway"
service-interface="org.springframework.integration.history.sample.SampleGateway"
default-request-channel="bridgeInChannel"/>
<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
<int:header name="baz" value="baz"/>
</int:header-enricher>
上述配置生成了一个简单的消息历史结构,输出类似于以下内容:
[{name=sampleGateway, type=gateway, timestamp=1283281668091},
{name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]
要访问消息历史,您只需访问 MessageHistory 标头。
以下示例展示了如何实现:
Iterator<Properties> historyIterator =
message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));
您可能不想跟踪所有组件。
要基于组件名称将历史记录限制为特定组件,您可以提供 tracked-components 属性,并指定一个逗号分隔的组件名称和模式列表,以匹配您希望跟踪的组件。
以下示例展示了如何实现:
@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
<int:message-history tracked-components="*Gateway, sample*, aName"/>
在前面的示例中,消息历史仅针对以'Gateway'结尾、以'sample'开头或完全匹配名称'aName'的组件进行维护。
此外,MessageHistoryConfigurer bean 现在由 IntegrationMBeanExporter 暴露为 JMX MBean(参见 MBean Exporter),允许您在运行时更改模式。
但请注意,要更改模式,必须先停止该 bean(即关闭消息历史记录)。
此功能可能用于临时启用历史记录以分析系统。
MBean 的对象名称为 <domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer。
在应用上下文中,必须仅声明一个 @EnableMessageHistory(或 <message-history/>)作为组件跟踪配置的唯一来源。
不要为 MessageHistoryConfigurer 使用通用的 Bean 定义。 |
| 根据定义,消息历史头是不可变的(您无法重写历史)。 因此,在写入消息历史值时,组件要么创建新消息(当组件是源头时),要么从请求消息中复制历史,对其进行修改并将新列表设置到响应消息上。 无论哪种情况,即使消息本身跨越线程边界,这些值也可以被追加。 这意味着历史值可以极大地简化异步消息流中的调试工作。 |
消息存储
《企业集成模式》(EIP)一书指出了几种能够缓冲消息的模式。
例如,聚合器会缓冲消息直到它们可以被释放,而 QueueChannel 会缓冲消息直到消费者显式地从该通道接收这些消息。
由于消息流中的任何位置都可能发生故障,因此缓冲消息的 EIP 组件也会引入消息可能丢失的风险点。
为了降低消息丢失的风险,EIP 定义了 消息存储 模式,该模式允许 EIP 组件将消息存储在某种持久化存储中(例如 RDBMS)。
Spring Integration 通过以下方式支持消息存储模式:
-
定义
org.springframework.integration.store.MessageStore策略接口 -
提供该接口的多种实现
-
在所有具备消息缓冲能力的组件上公开一个
message-store属性,以便您可以注入任何实现MessageStore接口的实例。
关于如何配置特定的消息存储实现以及如何将MessageStore实现注入到特定的缓冲组件中的详细信息,在手册中均有描述(参见特定组件,例如QueueChannel、Aggregator、Delayer等)。
以下两对示例展示了如何为QueueChannel和聚合器添加对消息存储的引用:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator … message-store="refToMessageStore"/>
默认情况下,消息通过使用 o.s.i.store.SimpleMessageStore(即 MessageStore 的一个实现)存储在内存中。
对于开发环境或简单的低流量环境,如果非持久化消息的潜在丢失不是问题,这可能已经足够。
然而,典型的生产应用程序需要更稳健的选项,不仅要降低消息丢失的风险,还要避免潜在的内存溢出错误。
因此,我们还为各种数据存储提供了 MessageStore 的实现。
以下是支持的实现的完整列表:
-
Hazelcast 消息存储: 使用 Hazelcast 分布式缓存来存储消息
-
JDBC 消息存储: 使用关系数据库管理系统 (RDBMS) 来存储消息
-
Redis 消息存储: 使用 Redis 键/值数据存储来存储消息
-
MongoDB 消息存储: 使用 MongoDB 文档存储来存储消息
|
然而,在使用 消息数据(负载和标头)根据 请特别注意代表特定类型数据的标头。
例如,如果其中一个标头包含某个 Spring Bean 的实例,在反序列化时,您可能会得到该 Bean 的不同实例,这将直接影响框架创建的一些隐式标头(如 从 Spring Integration 3.0 版本开始,您可以通过配置一个头部增强器来解决此问题,该增强器在将通道注册到 此外,请考虑当您将消息流配置为以下形式时会发生什么:gateway → queue-channel(由持久化消息存储支持)→ service-activator。
该网关会创建一个临时回复通道,但在 service-activator 的轮询器从队列读取时,该通道已丢失。
同样,您可以使用头信息增强器将头信息替换为 有关更多信息,请参阅Header Enricher。 |
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore: 为QueueChannel实例实现特定操作 -
PriorityCapableChannelMessageStore: 用于标记MessageStore实现,以便为PriorityChannel实例使用,并为持久化消息提供优先级顺序。
实际行为取决于具体实现。
该框架提供以下实现,可用作 MessageStore、QueueChannel 和 PriorityChannel 的持久化存储:
|
关于
SimpleMessageStore 的注意从版本 4.1 开始, 现在,在聚合器等组件之外访问组存储的用户将直接获得聚合器所使用的组的引用,而不是副本。 在聚合器之外操作该组可能会导致不可预测的结果。 出于此原因,您要么不应执行此类操作,要么将 |
使用MessageGroupFactory
从版本 4.3 开始,某些 MessageGroupStore 实现可以注入自定义的 MessageGroupFactory 策略,以创建和定制 MessageGroupStore 所使用的 MessageGroup 实例。
默认情况下使用 SimpleMessageGroupFactory,它基于 GroupType.HASH_SET(LinkedHashSet)内部集合生成 SimpleMessageGroup 实例。
其他可选方案包括 SYNCHRONISED_SET 和 BLOCKING_QUEUE,其中后者可用于恢复之前的 SimpleMessageGroup 行为。
此外,还提供了 PERSISTENT 选项。
更多信息请参阅下一节。
从版本 5.0.1 开始,当组内消息的顺序和唯一性不重要时,也可使用 LIST 选项。
持久的MessageGroupStore和延迟加载
从版本 4.3 开始,所有持久的 MessageGroupStore 实例都以延迟加载的方式从存储中检索 MessageGroup 实例及其 messages。
在大多数情况下,这对于关联 MessageHandler 实例很有用(参见 聚合器 和 重排序器),因为在每次关联操作时都从存储中加载整个 MessageGroup 会增加开销。
您可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false) 选项从配置中关闭延迟加载行为。
我们对 MongoDB MessageStore(MongoDB 消息存储)和 <aggregator>(聚合器)的惰性加载性能测试使用了如下类似的自定义 release-strategy:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
对于 1000 条简单消息,它会产生类似以下结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,从 5.5 版本开始,所有持久的 MessageGroupStore 实现都提供了基于目标数据库流式 API 的 streamMessagesForGroup(Object groupId) 契约。
当存储中的组非常大时,这提高了资源利用率。
在框架内部,这个新 API 被用于例如 Delayer(延迟器),当它在启动时重新调度持久化消息时。
返回的 Stream<Message<?>> 必须在处理结束时关闭,例如通过 try-with-resources 的自动关闭功能。
每当使用 PersistentMessageGroup 时,其 streamMessages() 会委托给 MessageGroupStore.streamMessagesForGroup()。
消息组条件
从版本 5 开始。5,MessageGroup 抽象提供了一个 condition 字符串选项。该选项的值可以是任何内容,以便后续根据需要进行解析,从而为分组做出决策。例如,来自 相关消息处理器 的 ReleaseStrategy 可能从组中查询此属性,而不是遍历组中的所有消息。The MessageGroupStore exposes a setGroupCondition(Object groupId, String condition) API.为此,在AbstractCorrelatingMessageHandler中添加了setGroupConditionSupplier(BiFunction<Message<?>, String, String>)选项。此函数在消息添加到组后以及组的现有条件下进行评估。实现可能会决定返回一个新值、现有值,或将目标条件重置为 null。condition 的值可以是 JSON、SpEL 表达式、数字,或任何可序列化为字符串并随后被解析的内容。例如,来自 文件聚合器 组件的 FileMarkerReleaseStrategy 会将一个条件填充到来自 FileSplitter.FileMarker.Mark.END 消息的 FileHeaders.LINE_COUNT 头部的组中,并通过将其与 canRelease() 进行比较来根据该条件检查组的大小。这样它就不会遍历组中的所有消息来查找带有 FileHeaders.LINE_COUNT 头部的 FileSplitter.FileMarker.Mark.END 消息。它允许结束标记在聚合器中比其他记录更早到达;例如,当在多线程环境中处理文件时。
此外,为了配置方便,引入了 GroupConditionProvider 契约。
AbstractCorrelatingMessageHandler 检查提供的 ReleaseStrategy 是否实现了该接口,并提取一个 conditionSupplier 用于组条件评估逻辑。
元数据存储
许多外部系统、服务或资源是非事务性的(例如 Twitter、RSS、文件系统,等等),并且没有任何能力将数据标记为已读。
此外,有时您可能需要在某些集成解决方案中实现企业集成模式 幂等接收器。
为了实现这一目标并在与外部系统的下一次交互之前存储端点的先前状态,或者处理下一条消息,Spring Integration 提供了元数据存储组件,该组件作为 org.springframework.integration.metadata.MetadataStore 接口的实现,并提供通用的键值契约。
元数据存储旨在存储各种类型的通用元数据(例如,已处理的最后一个馈送条目的发布日期),以帮助诸如馈送适配器之类的组件处理重复项。
如果某个组件未直接获得对 MetadataStore 的引用,则定位元数据存储的算法如下:首先,在应用程序上下文中查找 ID 为 metadataStore 的 Bean。
如果找到,则使用它。
否则,创建 SimpleMetadataStore 的新实例,这是一个内存中的实现,仅在当前运行的应用程序上下文的生命周期内持久化元数据。
这意味着,重启后,您可能会遇到重复条目。
如果您需要在应用程序上下文重启之间持久化元数据,该框架提供以下持久的 MetadataStores:
-
PropertiesPersistingMetadataStore
The PropertiesPersistingMetadataStore 由属性文件和一个 PropertiesPersister 提供支持。
默认情况下,它仅在应用程序上下文正常关闭时持久化状态。
它实现了Flushable,以便您可以通过调用flush()按需持久化状态。
以下示例展示了如何使用 XML 配置'PropertiesPersistingMetadataStore':
<bean id="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>
或者,您可以提供MetadataStore接口的自定义实现(例如JdbcMetadataStore),并将其作为 Bean 配置在应用程序上下文中。
从版本 4.0 开始,SimpleMetadataStore、PropertiesPersistingMetadataStore 和 RedisMetadataStore 实现了 ConcurrentMetadataStore。
这些提供了原子更新功能,可用于多个组件或应用程序实例之间。
幂等接收器和元数据存储
元数据存储对于实现 EIP 幂等接收器 模式非常有用,当需要过滤已处理过的传入消息时,您可以丢弃该消息或执行其他逻辑。 以下配置展示了如何实现这一点:
<int:filter input-channel="serviceChannel"
output-channel="idempotentServiceChannel"
discard-channel="discardChannel"
expression="@metadataStore.get(headers.businessKey) == null"/>
<int:publish-subscribe-channel id="idempotentServiceChannel"/>
<int:outbound-channel-adapter channel="idempotentServiceChannel"
expression="@metadataStore.put(headers.businessKey, '')"/>
<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>
幂等条目的 value 可以是过期日期,此后该条目应由某个定时清理程序从元数据存储中移除。
另见 幂等接收器企业集成模式。
MetadataStoreListener
某些元数据存储(目前仅支持 ZooKeeper)支持注册监听器以在项发生变化时接收事件,如下例所示:
public interface MetadataStoreListener {
void onAdd(String key, String value);
void onRemove(String key, String oldValue);
void onUpdate(String key, String newValue);
}
查看 Javadoc 以获取更多信息。
如果您只感兴趣于部分事件,可以继承 MetadataStoreListenerAdapter 类。
控制总线
正如《企业集成模式(EIP)一书中所描述的那样,控制总线的理念是:同一消息系统既可用于监控和管理框架内的组件,也可用于“应用级”消息传递。 在 Spring Integration 中,我们基于上述适配器构建功能,使您能够发送消息以调用已暴露的操作。
以下示例展示了如何使用 XML 配置控制总线:
<int:control-bus input-channel="operationChannel"/>
控制总线拥有一个输入通道,可用于调用应用上下文中的 Bean 操作。 它还具备服务激活端点的所有通用属性。 例如,如果操作结果包含需要发送到下游通道的返回值,您可以指定一个输出通道。
控制总线在输入通道上运行消息,将其作为 Spring 表达式语言 (SpEL) 表达式处理。它接收一条消息,将主体编译为表达式,添加一些上下文,然后运行它。默认上下文支持任何已使用 @ManagedAttribute 或 @ManagedOperation 注解的方法。它同时也支持 Spring 的 Lifecycle 接口上的方法(以及自 5.0 版本起其 Pausable 扩展)。2),它支持用于配置 Spring 的多个 TaskExecutor 和 TaskScheduler 实现的方法。确保自己的方法对控制总线可用的最简单方法是使用 @ManagedAttribute 或 @ManagedOperation 注解。由于这些注解也用于将方法暴露给 JMX MBean 注册表,因此它们提供了一个方便的副产品:通常,您希望向控制总线暴露的同一类操作也合理地适合通过 JMX 进行暴露)。应用上下文中任何特定实例的解析都是通过典型的 SpEL 语法实现的。为此,请为 Bean 提供带有 SpEL 前缀的 Bean 名称(@)。例如,要在 Spring Bean 上执行方法,客户端可以向操作通道发送如下消息:
Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)
表达式的上下文根是 Message 本身,因此您在表达式中也可以将 payload 和 headers 作为变量访问。
这与 Spring Integration 端点中的所有其他表达式支持一致。
使用 Java 注解,您可以按如下方式配置控制总线:
@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
同样,您可以按如下方式配置 Java DSL 流定义:
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from("controlBus")
.controlBus()
.get();
}
如果您更喜欢使用带有自动 DirectChannel 创建的 lambda 表达式,可以按以下方式创建控制总线:
@Bean
public IntegrationFlow controlBus() {
return IntegrationFlowDefinition::controlBus;
}
在这种情况下,通道名称为 controlBus.input。
有序关闭
正如"MBean 导出器"中所描述,MBean 导出器提供了一个名为 stopActiveComponents 的 JMX 操作,该操作用于以有序方式停止应用程序。
此操作接受一个 Long 参数。
该参数指定操作等待(以毫秒为单位)以允许进行中的消息完成的时间长度。
该操作的执行过程如下:
-
对所有实现
OrderlyShutdownCapable的 Bean 调用beforeShutdown()。这样做可以让这些组件为关闭做好准备。 实现此接口的组件示例及其在此调用中的操作包括:JMS 和 AMQP 消息驱动适配器会停止其监听器容器,TCP 服务器连接工厂会停止接受新连接(同时保持现有连接打开),TCP 入站端点会丢弃(记录)任何收到的新消息,HTTP 入站端点会对任何新请求返回
503 - Service Unavailable。 -
停止任何活动通道,例如由 JMS 或 AMQP 支持的通道。
-
停止所有
MessageSource实例。 -
停止所有入站
MessageProducers(那些不是OrderlyShutdownCapable的)。 -
等待剩余时间,该时间由传递给操作的
Long参数的值定义。这样做可以让所有进行中的消息完成它们的旅程。 因此,在调用此操作时选择适当的超时时间非常重要。
-
在所有
OrderlyShutdownCapable组件上调用afterShutdown()。这样做可以让这些组件执行最终的关闭任务(例如,关闭所有打开的套接字)。
如 有序关闭管理操作 中所述,此操作可通过 JMX 调用。
如果您希望以编程方式调用该方法,则需要注入或获取对 IntegrationMBeanExporter 的引用。
如果 <int-jmx:mbean-export/> 定义上未提供 id 属性,则该 bean 将生成一个名称。
该名称包含随机成分,以避免在同一个 JVM 中存在多个 Spring Integration 上下文时发生 ObjectName 冲突(MBeanServer)。
出于这个原因,如果您希望以编程方式调用该方法,我们建议您为导出器提供一个id属性,以便您可以在应用程序上下文中轻松访问它。
最后,可以使用 <control-bus> 元素来调用该操作。
有关详细信息,请参阅 监控 Spring Integration 示例应用程序。
上述算法在 4.1 版本中得到了改进。
此前,所有任务执行器和调度器都会被停止。
这可能导致 QueueChannel 实例中的中途消息残留。
现在,关闭操作会保留轮询器运行,以便这些消息能够被排出并处理。 |
集成图
从版本 4.3 开始,Spring Integration 提供了对应用程序运行时对象模型的访问权限,该模型可选择性地包含组件指标。
它以图形形式公开,可用于可视化集成应用程序的当前状态。
o.s.i.support.management.graph 包包含所有必要的类,用于收集、构建并以单个树状 Graph 对象的形式呈现 Spring Integration 组件的运行时状态。
应声明 IntegrationGraphServer 为 Bean,以构建、检索和刷新 Graph 对象。
生成的 Graph 对象可以序列化为任何格式,尽管 JSON 在客户端解析和表示方面具有灵活性和便利性。
仅包含默认组件的 Spring Integration 应用程序将按如下方式公开图形:
{
"contentDescriptor" : {
"providerVersion" : "6.1.9",
"providerFormatVersion" : 1.2,
"provider" : "spring-integration",
"name" : "myAppName:1.0"
},
"nodes" : [ {
"nodeId" : 1,
"componentType" : "null-channel",
"integrationPatternType" : "null_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 0.0,
"max" : 0.0
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"receiveCounters" : {
"successes" : 0,
"failures" : 0
},
"name" : "nullChannel"
}, {
"nodeId" : 2,
"componentType" : "publish-subscribe-channel",
"integrationPatternType" : "publish_subscribe_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 7.807002,
"max" : 7.807002
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorChannel"
}, {
"nodeId" : 3,
"componentType" : "logging-channel-adapter",
"integrationPatternType" : "outbound_channel_adapter",
"integrationPatternCategory" : "messaging_endpoint",
"properties" : { },
"output" : null,
"input" : "errorChannel",
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 6.742722,
"max" : 6.742722
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorLogger"
} ],
"links" : [ {
"from" : 2,
"to" : 3,
"type" : "input"
} ]
}
| 版本 5.2 弃用了传统指标,转而采用 Micrometer 指标,如指标管理中所述。 传统指标在版本 5.4 中被移除,将不再出现在图表中。 |
在前面的示例中,该图由三个顶级元素组成。
contentDescriptor图元素包含有关提供数据的应用程序的常规信息。
name可以在IntegrationGraphServerbean上或spring.application.name应用程序上下文环境属性中进行自定义。
其他属性由框架提供,用于将相似模型与其他来源区分开来。
links图元素表示来自nodes图元素的节点之间的连接,因此也表示源 Spring Integration 应用程序中集成组件之间的连接。
例如,从MessageChannel到EventDrivenConsumer,中间带有某些MessageHandler,或从AbstractReplyProducingMessageHandler到MessageChannel。
为了方便并帮助您确定链接的用途,模型包含了type属性。
可能的类型包括:
-
input: 标识从MessageChannel到终点inputChannel或requestChannel属性的方向 -
output: 从MessageHandler、MessageProducer或SourcePollingChannelAdapter通过outputChannel或replyChannel属性到MessageChannel的方向 -
error: 从MessageHandler在PollingConsumer或MessageProducer或SourcePollingChannelAdapter到MessageChannel,通过errorChannel属性; -
discard: 从DiscardingMessageHandler(例如MessageFilter)通过errorChannel属性到MessageChannel。 -
route: 从AbstractMappingMessageRouter(例如HeaderValueRouter)到MessageChannel。 类似于output,但在运行时确定。 可能是配置的信道映射或动态解析的信道。 路由器通常仅为此目的保留最多 100 条动态路由,但你可以通过设置dynamicChannelLimit属性来修改此值。
此元素提供的信息可被可视化工具用于渲染来自 nodes 图元素的节点之间的连接,其中 from 和 to 数字代表链接节点的 nodeId 属性的值。
例如,link 元素可用于确定目标节点上正确的 port。
以下“文本图像”展示了类型之间的关系:
+---(discard)
|
+----o----+
| |
| |
| |
(input)--o o---(output)
| |
| |
| |
+----o----+
|
+---(error)
The nodes 图元素可能最有趣,因为它的元素不仅包含带有其 componentType 实例和 name 值的运行时组件,还可以选择性地包含由该组件暴露的指标。
节点元素包含各种属性,通常这些属性都是不言自明的。
例如,基于表达式的组件包括 expression 属性,其中包含该组件的主要表达式字符串。
要启用指标,请在 @Configuration 类中添加一个 @EnableIntegrationManagement,或者在 XML 配置中添加一个 <int:management/> 元素。
有关完整信息,请参阅 指标与管理。
nodeId 表示一个唯一的增量标识符,用于区分不同的组件。
它也在 links 元素中使用,以表示该组件与其他组件之间的关系(连接)(如果存在的话)。
input 和 output 属性用于 AbstractEndpoint、MessageHandler、SourcePollingChannelAdapter 或 MessageProducerSupport 的 inputChannel 和 outputChannel 属性。
更多信息请参阅下一节。
从版本 5.1 开始,IntegrationGraphServer 接受一个 Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback,用于为特定 NamedComponent 的 IntegrationNode 填充其他属性。
例如,您可以将 SmartLifecycle、autoStartup 和 running 属性暴露到目标图中:
server.setAdditionalPropertiesCallback(namedComponent -> {
Map<String, Object> properties = null;
if (namedComponent instanceof SmartLifecycle) {
SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
properties = new HashMap<>();
properties.put("auto-startup", smartLifecycle.isAutoStartup());
properties.put("running", smartLifecycle.isRunning());
}
return properties;
});
图形运行时模型
Spring Integration 组件具有不同级别的复杂性。
例如,任何轮询的MessageSource也具有SourcePollingChannelAdapter和MessageChannel,用于定期从源数据发送消息。
其他组件可能是中间件请求 - 响应组件(如JmsOutboundGateway),包含一个用于订阅(或轮询)requestChannel(input)以获取消息的消费端AbstractEndpoint,以及一个用于生成要向下发送的响应消息的生产端replyChannel(output)。
同时,任何MessageProducerSupport实现(如ApplicationEventListeningMessageProducer)都封装了一些源协议监听逻辑,并将消息发送到outputChannel。
在图中,Spring Integration 组件通过 IntegrationNode 类层次结构表示,您可以在 o.s.i.support.management.graph 包中找到它们。
例如,您可以为 AggregatingMessageHandler 使用 ErrorCapableDiscardingMessageHandlerNode(因为它具有 discardChannel 选项),并且可以通过使用 PollingConsumer 从 PollableChannel 消费时产生错误。
另一个例子是 CompositeMessageHandlerNode — 当通过 EventDrivenConsumer 订阅到 SubscribableChannel 时,用于 MessageHandlerChain。
The @MessagingGateway (see Messaging Gateways) provides nodes for each of its method, where the name attribute is based on the gateway's bean name and the short method signature.
Consider the following example of a gateway: |
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {
void foo(String foo);
void foo(Integer foo);
void bar(String bar);
}
上述网关生成的节点类似于以下内容:
{
"nodeId" : 10,
"name" : "gate.bar(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 11,
"name" : "gate.foo(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 12,
"name" : "gate.foo(class java.lang.Integer)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
}
您可以使用此 IntegrationNode 层次结构在客户端解析图模型,并理解一般的 Spring Integration 运行时行为。
有关更多信息,请参阅 编程技巧与窍门。
版本 5.3 引入了一个 IntegrationPattern 抽象,所有开箱即用的组件(代表企业集成模式 EIP)都实现了该抽象,并提供了一个 IntegrationPatternType 枚举值。
这些信息对于目标应用中的一些分类逻辑可能很有用;或者,当将其暴露到图节点中时,UI 可利用它来确定如何绘制组件。
集成图控制器
如果您的应用程序是基于 Web 的(或基于 Spring Boot 构建并带有嵌入式 Web 容器),并且类路径中存在 Spring Integration HTTP 或 WebFlux 模块(分别参见 HTTP 支持 和 WebFlux 支持),您可以使用 IntegrationGraphController 将 IntegrationGraphServer 功能暴露为 REST 服务。
为此,HTTP 模块提供了 @EnableIntegrationGraphController 和 @Configuration 类注解以及 <int-http:graph-controller/> XML 元素。
配合 @EnableWebMvc 注解(对于 XML 定义则为 <mvc:annotation-driven/>),此配置会注册一个 IntegrationGraphController @RestController,其中其 @RequestMapping.path 可在 @EnableIntegrationGraphController 注解或 <int-http:graph-controller/> 元素上进行配置。
默认路径为 /integration。
The IntegrationGraphController @RestController 提供以下服务:
-
@GetMapping(name = "getGraph"): 用于检索自上次IntegrationGraphServer刷新以来 Spring Integration 组件的状态。o.s.i.support.management.graph.Graph作为 REST 服务的@ResponseBody返回。 -
@GetMapping(path = "/refresh", name = "refreshGraph"): 刷新当前Graph以获取实际运行时状态,并将其作为 REST 响应返回。 无需为指标刷新图表。 在检索图表时,它们会实时提供。 如果自上次检索图表以来应用上下文已修改,则可以调用刷新操作。 在这种情况下,图表将被完全重建。
您可以使用 Spring Security 和 Spring MVC 项目提供的标准配置选项和组件,为 IntegrationGraphController 设置安全性和跨源限制。
以下示例实现了这些目标:
<mvc:annotation-driven />
<mvc:cors>
<mvc:mapping path="/myIntegration/**"
allowed-origins="http://localhost:9090"
allowed-methods="GET" />
</mvc:cors>
<security:http>
<security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>
<int-http:graph-controller path="/myIntegration" />
以下示例展示了如何使用 Java 配置实现相同的功能:
@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/testIntegration/**").hasRole("ADMIN")
// ...
.formLogin();
}
//...
}
请注意,为了方便起见,@EnableIntegrationGraphController 注解提供了 allowedOrigins 属性。
这提供了对 path 的 GET 访问权限。
对于更复杂的需求,您可以使用标准的 Spring MVC 机制来配置 CORS 映射。