Java DSL
Java DSL
Spring Integration Java 配置和 DSL 提供了一组方便的构建器和一个流畅的 API,允许您从 Spring 配置 Spring Integration 消息流@Configuration
类。
(另请参阅 Kotlin DSL。
(另见 Groovy DSL。
用于 Spring Integration 的 Java DSL 本质上是 Spring Integration 的门面。
DSL 提供了一种简单的方法,可以使用 fluent 将 Spring Integration Message Flows 嵌入到您的应用程序中Builder
pattern 以及来自 Spring Framework 和 Spring Integration 的现有 Java 配置。
我们还使用并支持 lambda(在 Java 8 中可用)来进一步简化 Java 配置。
咖啡馆提供了使用 DSL 的一个很好的例子。
DSL 由IntegrationFlow
流畅的 API(请参阅IntegrationFlowBuilder
).
这会产生IntegrationFlow
组件,该组件应注册为 Spring bean(通过使用@Bean
注释)。
构建器模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法的层次结构。
这IntegrationFlowBuilder
仅收集集成组件 (MessageChannel
实例AbstractEndpoint
实例,依此类推)在IntegrationFlow
bean 用于通过IntegrationFlowBeanPostProcessor
.
Java DSL 直接使用 Spring Integration 类,并绕过任何 XML 生成和解析。 然而,DSL 提供的不仅仅是 XML 之上的语法糖。 其最引人注目的功能之一是能够定义内联 lambda 来实现端点逻辑,无需外部类来实现自定义逻辑。 从某种意义上说,Spring Integration 对 Spring 表达式语言 (SpEL) 和内联脚本的支持解决了这个问题,但 lambda 更容易、更强大。
以下示例显示了如何使用 Java 配置进行 Spring 集成:
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
前面配置示例的结果是,它创建了ApplicationContext
启动、Spring Integration 端点和消息通道。
Java 配置可用于替换和增强 XML 配置。
无需替换所有现有的 XML 配置即可使用 Java 配置。
DSL 基础知识
这org.springframework.integration.dsl
包包含IntegrationFlowBuilder
前面提到的 API 和一些IntegrationComponentSpec
实现,它们也是构建器,并提供流畅的 API 来配置具体端点。
这IntegrationFlowBuilder
基础设施为基于消息的应用程序(例如通道、端点、轮询器和通道拦截器)提供常见的企业集成模式 (EIP)。
端点在 DSL 中表示为动词,以提高可读性。 以下列表包括常见的 DSL 方法名称和关联的 EIP 端点:
-
转换→
Transformer
-
过滤→
Filter
-
手柄→
ServiceActivator
-
拆分→
Splitter
-
聚合→
Aggregator
-
路线→
Router
-
桥→
Bridge
从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构造的。
请注意,EIP 没有正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元是有用的。
DSL 提供了一个IntegrationFlow
组件来定义通道和它们之间的端点的组合,但现在IntegrationFlow
仅在应用程序上下文中填充实际 Bean 的配置角色,在运行时不使用。
但是,用于IntegrationFlow
可以自动接线为Lifecycle
控制start()
和stop()
对于委托给与此关联的所有 Spring Integration 组件的整个流程IntegrationFlow
.
以下示例使用IntegrationFlow
fluent API 来定义IntegrationFlow
bean 使用 EIP 方法IntegrationFlowBuilder
:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<String, Integer>transform(Integer::parseInt)
.get();
}
这transform
方法接受 lambda 作为端点参数来对消息有效负载进行作。
这个方法的真正参数是GenericTransformer<S, T>
实例。
因此,任何提供的转换器 (ObjectToJsonTransformer
,FileToStringTransformer
,和其他)可以在此处使用。
在被窝里,IntegrationFlowBuilder
识别MessageHandler
和它的端点,使用MessageTransformingHandler
和ConsumerEndpointFactoryBean
分别。
考虑另一个例子:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("input")
.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println)
.get();
}
前面的示例组成了Filter → Transformer → Service Activator
.
流程是“'单向'”。
也就是说,它不提供回复消息,而仅将有效负载打印到 STDOUT。
使用直接通道自动将终结点连接在一起。
lambda 和
Message<?> 参数在 EIP 方法中使用 lambda 时,“input”参数通常是消息有效负载。如果您希望访问整个消息,请使用其中一种重载方法,该方法采用
这将在运行时失败,并显示 相反,请使用:
|
Bean 定义覆盖
Java DSL 可以为流定义中内联定义的对象注册 Bean,也可以重用现有的注入 Bean。如果为内联对象和现有 Bean 定义定义了相同的 Bean 名称,则 |
消息通道
除了IntegrationFlowBuilder
使用 EIP 方法时,Java DSL 提供了一个流畅的 API 来配置MessageChannel
实例。 为此,该MessageChannels
builder factory 提供了。以下示例展示了如何使用它:
@Bean
public MessageChannel priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap())
.get();
}
一样MessageChannels
builder factory 可用于channel()
EIP 方法IntegrationFlowBuilder
连接端点,类似于连接input-channel
/output-channel
配对。默认情况下,端点使用DirectChannel
bean 名称基于以下模式的实例:[IntegrationFlow.beanName].channel#[channelNameIndex]
. 此规则也适用于内联生成的未命名通道MessageChannels
构建器工厂使用。但是,所有MessageChannels
方法有一个变体,该变体知道channelId
您可以使用它来设置MessageChannel
实例。 这MessageChannel
参考资料和beanName
可以用作 bean-method 调用。以下示例显示了使用channel()
EIP方式:
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public MessageChannel publishSubscribe() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")
意思是“'查找并使用MessageChannel
使用“输入”ID,或创建一个“”。 -
fixedSubscriberChannel()
生成一个FixedSubscriberChannel
并使用channelFlow.channel#0
. -
channel("queueChannel")
工作方式相同,但使用现有的queueChannel
豆。 -
channel(publishSubscribe())
是 bean-method 引用。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))
是IntegrationFlowBuilder
这暴露了IntegrationComponentSpec
到ExecutorChannel
并将其注册为executorChannel
. -
channel("output")
注册DirectChannel
bean 与output
作为其名称,只要没有具有此名称的 bean 已经存在。
注意:前面的IntegrationFlow
定义有效,并且其所有通道都应用于具有BridgeHandler
实例。
小心通过MessageChannels 来自不同工厂IntegrationFlow 实例。 即使 DSL 解析器将不存在的对象注册为 bean,它也无法确定相同的对象 (MessageChannel ) 来自不同的IntegrationFlow 器皿。 以下示例是错误的: |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
这个坏例子的结果是以下异常:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要使其正常工作,您需要声明@Bean
并为该通道使用其来自不同IntegrationFlow
实例。
轮询器
Spring Integration 还提供了一个流畅的 API,可让您将PollerMetadata
为AbstractPollingEndpoint
实现。 您可以使用Pollers
构建器工厂,用于配置公共 Bean 定义或从IntegrationFlowBuilder
EIP 方法,如下例所示:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500)
.errorChannel("myErrors");
}
看Pollers
和PollerSpec
在 Javadoc 中获取更多信息。
如果使用 DSL 构造PollerSpec 作为@Bean ,请勿调用get() 方法。 这PollerSpec 是一个FactoryBean 这会生成PollerMetadata 对象,并初始化其所有属性。 |
这reactive()
端点
从 5.5 版本开始,ConsumerEndpointSpec
提供一个reactive()
配置属性与可选的定制器Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
. 此选项将目标端点配置为ReactiveStreamsConsumer
实例,独立于输入通道类型,该类型转换为Flux
通过IntegrationReactiveUtils.messageChannelToFlux()
. 提供的函数从Flux.transform()
运算符来自定义 (publishOn()
,log()
,doOnNext()
等)来自输入通道的反应流源。
以下示例演示如何将发布线程从输入通道更改为独立于最终订阅者和生产者DirectChannel
:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
.get();
}
有关更多信息,请参阅响应式流支持。
DSL 和终结点配置
都IntegrationFlowBuilder
EIP 方法有一个变体,它应用 lambda 参数来提供选项AbstractEndpoint
实例:SmartLifecycle
,PollerMetadata
,request-handler-advice-chain
,等。
它们中的每一个都有通用参数,因此它允许您配置端点,甚至它的MessageHandler
在上下文中,如以下示例所示:
@Bean
public IntegrationFlow flow2() {
return IntegrationFlow.from(this.inputChannel)
.transform(new PayloadSerializingTransformer(),
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
.get();
}
此外,EndpointSpec
提供id()
方法,让您使用给定的 Bean 名称而不是生成的 Bean 名称注册端点 Bean。
如果MessageHandler
被引用为 bean,则任何现有的adviceChain
如果.advice()
方法存在于 DSL 定义中:
@Bean
public TcpOutboundGateway tcpOut() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(cf());
gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
return gateway;
}
@Bean
public IntegrationFlow clientTcpFlow() {
return f -> f
.handle(tcpOut(), e -> e.advice(testAdvice()))
.transform(Transformers.objectToString());
}
它们不会合并,只有testAdvice()
在这种情况下使用 bean。
变形金刚
DSL API 提供了方便、流畅的Transformers
工厂,用作.transform()
EIP 方法。
以下示例演示如何使用它:
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlow.from("input")
.transform(Transformers.fromJson(MyPojo.class))
.transform(Transformers.serializer())
.get();
}
它避免了使用 setter 进行不方便的编码,并使流定义更加简单。
请注意,您可以使用Transformers
声明目标Transformer
实例作为@Bean
实例,并再次将它们从IntegrationFlow
定义为 bean 方法。
尽管如此,DSL 解析器会处理内联对象的 bean 声明,如果它们尚未定义为 bean。
有关更多信息和支持的工厂方法,请参阅 Javadoc 中的 Transformers。
另请参阅lambda 和Message<?>
参数.
入站通道适配器
通常,消息流从入站通道适配器(例如<int-jdbc:inbound-channel-adapter>
).
适配器配置为<poller>
,它会询问一个MessageSource<?>
定期生成消息。
Java DSL 允许启动IntegrationFlow
从MessageSource<?>
太。
为此,该IntegrationFlow
Fluent API 提供了一个重载的IntegrationFlow.from(MessageSource<?> messageSource)
方法。
您可以配置MessageSource<?>
作为 bean 并将其作为该方法的参数提供。
的第二个参数IntegrationFlow.from()
是一个Consumer<SourcePollingChannelAdapterSpec>
lambda 的 lambda 允许您提供选项(例如PollerMetadata
或SmartLifecycle
) 的SourcePollingChannelAdapter
.
以下示例演示如何使用 Fluent API 和 lambda 创建IntegrationFlow
:
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
对于那些没有构建要求的情况Message
对象,你可以使用IntegrationFlow.fromSupplier()
基于java.util.function.Supplier
.
结果Supplier.get()
会自动包装在Message
(如果它还不是Message
).
消息路由器
Spring Integration 原生提供专门的路由器类型,包括:
-
HeaderValueRouter
-
PayloadTypeRouter
-
ExceptionTypeRouter
-
RecipientListRouter
-
XPathRouter
与许多其他 DSL 一样IntegrationFlowBuilder
EIP 方法,则route()
方法可以应用任何AbstractMessageRouter
实现,或者为方便起见,一个String
作为 SpEL 表达式或ref
-method
双。
此外,您可以配置route()
使用 lambda 并将 lambda 用于Consumer<RouterSpec<MethodInvokingRouter>>
.
Fluent API 还提供AbstractMappingMessageRouter
选项,例如channelMapping(String key, String channelName)
对,如以下示例所示:
@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlow.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}
以下示例显示了一个简单的基于表达式的路由器:
@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlow.from("routerInput")
.route("headers['destChannel']")
.get();
}
这routeToRecipients()
方法采用Consumer<RecipientListRouterSpec>
,如以下示例所示:
@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlow.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}
这.defaultOutputToParentFlow()
的.routeToRecipients()
definition 允许您设置路由器的defaultOutput
作为网关,以继续处理主流中不匹配的消息。
另请参阅lambda 和Message<?>
参数.
分配器
要创建拆分器,请使用split()
EIP 方法。
默认情况下,如果有效负载是Iterable
一Iterator
一Array
一个Stream
,或响应式Publisher
这split()
方法将每个项目输出为单独的消息。
它接受 lambda、SpEL 表达式或任何AbstractMessageSplitter
实现。
或者,您可以在不带参数的情况下使用它来提供DefaultMessageSplitter
.
以下示例演示如何使用split()
方法,提供 lambda:
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlow.from("splitInput")
.split(s -> s.applySequence(false).delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.get();
}
前面的示例创建了一个拆分器,用于拆分包含逗号分隔的消息String
.
另请参阅lambda 和Message<?>
参数.
聚合器和重排序器
一Aggregator
在概念上与Splitter
.
它将一系列单个消息聚合到单个消息中,并且必然更加复杂。
默认情况下,聚合器返回一条消息,其中包含来自传入消息的有效负载集合。
相同的规则适用于Resequencer
.
以下示例显示了拆分器聚合器模式的规范示例:
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
这split()
方法将列表拆分为单独的消息,并将它们发送到ExecutorChannel
.
这resequence()
方法按消息头中的序列详细信息对消息重新排序。
这aggregate()
方法收集这些消息。
但是,您可以通过指定发布策略和关联策略等来更改默认行为。 请考虑以下示例:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
前面的示例关联了具有myCorrelationKey
标头并在累积至少十个消息后释放消息。
为resequence()
EIP 方法。
服务激活器和.handle()
方法
这.handle()
EIP 方法的目标是调用任何MessageHandler
实现或某些 POJO 上的任何方法。
另一种选择是使用 lambda 表达式定义“活动”。
因此,我们引入了一个泛型GenericHandler<P>
功能接口。
其handle
方法需要两个参数:P payload
和MessageHeaders headers
(从 5.1 版开始)。
有了这个,我们可以定义一个流程,如下所示:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("flow3Input")
.<Integer>handle((p, h) -> p * 2)
.get();
}
前面的示例将它收到的任何整数加倍。
然而,Spring Integration 的一个主要目标是loose coupling
,通过从消息有效负载到消息处理程序的目标参数的运行时类型转换。
由于 Java 不支持 lambda 类的泛型类型解析,因此我们引入了一种解决方法,其中包含一个额外的payloadType
大多数 EIP 方法的参数和LambdaMessageProcessor
.
这样做将艰苦的转换工作委托给 Spring 的ConversionService
,它使用提供的type
以及请求的消息到目标方法参数。
以下示例显示了生成的IntegrationFlow
可能看起来像:
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<byte[], String>transform(p - > new String(p, "UTF-8"))
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
我们也可以注册一些BytesToIntegerConverter
在ConversionService
以摆脱额外的.transform()
:
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
另请参阅lambda 和Message<?>
参数.
操作员网关 ()
这gateway()
运算符IntegrationFlow
definition 是一个特殊的服务激活器实现,通过其输入通道调用其他端点或集成流并等待回复。
从技术上讲,它与嵌套的<gateway>
组件中的组件<chain>
定义(请参阅从链中调用链),并允许流更干净、更直接。
从逻辑上讲,从业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分发和重用功能(请参阅消息传递网关)。
此运算符针对不同的目标有多个重载:
-
gateway(String requestChannel)
按名称向某个端点的输入通道发送消息; -
gateway(MessageChannel requestChannel)
通过直接注入将消息发送到某个端点的输入通道; -
gateway(IntegrationFlow flow)
向提供的输入通道发送消息IntegrationFlow
.
所有这些都有第二个变体Consumer<GatewayEndpointSpec>
参数来配置目标GatewayMessageHandler
和各自的AbstractEndpoint
.
此外,IntegrationFlow
-based 方法允许调用现有的IntegrationFlow
bean 或通过就地 lambda 将流声明为子流,以便IntegrationFlow
函数式接口或将其提取到private
方法 Cleaner 代码样式:
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并不总是返回回复,则应将requestTimeout 设置为 0 以防止无限期挂起调用线程。
在这种情况将在该点结束,并释放线程以进行进一步工作。 |
运算符 log()
为方便起见,要通过 Spring Integration 流程(<logging-channel-adapter>
)、一个log()
运算符。
在内部,它由WireTap
ChannelInterceptor
使用LoggingHandler
作为其订阅者。
它负责将传入消息记录到下一个端点或当前通道中。
以下示例演示如何使用LoggingHandler
:
.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)
在前面的示例中,id
标头记录在ERROR
水平到test.category
仅适用于通过筛选器和路由之前的邮件。
从版本 6.0 开始,此运算符在流末尾的行为与其在中间的用法保持一致。
换句话说,即使log()
运算符被删除。
因此,如果预计不会在流结束时生成回复,则nullChannel()
建议在最后一个之后使用log()
.
运算符 intercept()
从 5.3 版开始,intercept()
运算符允许注册一个或多个ChannelInterceptor
实例在当前MessageChannel
在流动中。
这是创建显式MessageChannel
通过MessageChannels
应用程序接口。
以下示例使用MessageSelectingInterceptor
要拒绝某些邮件并出现异常:
.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)
MessageChannelSpec.wireTap()
Spring Integration 包括一个.wireTap()
流畅的 APIMessageChannelSpec
建设者。
以下示例演示如何使用wireTap
记录输入的方法:
@Bean
public QueueChannelSpec myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input");
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
如果
|
当当前MessageChannel
不实现InterceptableChannel
,隐式DirectChannel
和BridgeHandler
被注入IntegrationFlow
和WireTap
添加到这个新的DirectChannel
.
以下示例没有任何通道声明:
.handle(...)
.log()
}
在前面的示例中(以及任何未声明通道时),隐式DirectChannel
被注入到当前位置的IntegrationFlow
并用作当前配置的ServiceActivatingHandler
(从.handle()
,前面描述)。
使用消息流
IntegrationFlowBuilder
提供了一个顶级 API 来生成连接到消息流的集成组件。
当您的集成可以通过单个流程完成时(通常情况下),这很方便。
交互IntegrationFlow
实例可以通过以下方式加入MessageChannel
实例。
默认情况下,MessageFlow
在 Spring Integration 术语中表现为“链”。
也就是说,端点由DirectChannel
实例。
消息流实际上并没有被构造为链,这提供了更大的灵活性。
例如,如果您知道流中的任何组件,则可以向流中的任何组件发送消息inputChannel
name (也就是说,如果您显式定义了它)。
您还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等)而不是直接通道。
因此,DSL 不支持 Spring Integrationchain
元素,因为它在这种情况下不会增加太多价值。
由于 Spring Integration Java DSL 生成与任何其他配置选项相同的 bean 定义模型,并且基于现有的 Spring Framework@Configuration
基础设施,它可以与 XML 定义一起使用,并与 Spring Integration 消息传递注释配置连接。
您还可以定义直接IntegrationFlow
实例。
以下示例显示了如何执行此作:
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
此定义的结果是使用隐式直接通道连接的同一组集成组件。
这里唯一的限制是此流是从命名的直接通道启动的 -lambdaFlow.input
.
此外,Lambda 流不能从MessageSource
或MessageProducer
.
从 5.1 版本开始,这种IntegrationFlow
被包装到代理中,以公开生命周期控制并提供对inputChannel
的内部关联StandardIntegrationFlow
.
从 V5.0.6 开始,为IntegrationFlow
包括 flow bean 后跟一个点 (.
) 作为前缀。
例如,ConsumerEndpointFactoryBean
对于.transform("Hello "::concat)
在前面的示例中,导致 bean 名称为lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0
.
(这o.s.i
是从org.springframework.integration
以适合页面。
这Transformer
该端点的实现 Bean 的 Bean 名称为lambdaFlow.transformer#0
(从 5.1 版开始),其中不是MethodInvokingTransformer
class,则使用其组件类型。
相同的模式应用于所有NamedComponent
s 当必须在流中生成 Bean 名称时。
这些生成的 bean 名称前面加上流 ID,用于解析日志或在某些分析工具中将组件分组在一起等目的,以及避免在运行时同时注册集成流时出现竞争条件。
有关详细信息,请参阅动态和运行时集成流。
FunctionExpression
我们引入了FunctionExpression
类(SpEL 的Expression
接口),让我们使用 lambda 和generics
.
这Function<T, R>
选项,以及expression
选项,当存在隐式Strategy
Core Spring Integration 的变体。
以下示例演示如何使用函数表达式:
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
这FunctionExpression
还支持运行时类型转换,如SpelExpression
.
子流支持
一些if…else
和publish-subscribe
组件提供了使用子流指定其逻辑或映射的能力。
最简单的示例是.publishSubscribeChannel()
,如以下示例所示:
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
您可以使用单独的IntegrationFlow
@Bean
定义,但我们希望您发现逻辑组合的子流风格有用。
我们发现它会导致代码更短(因此更具可读性)。
从 5.3 版开始,一个BroadcastCapableChannel
-基于publishSubscribeChannel()
提供了在代理支持的消息通道上配置子流订阅者的实现。
例如,我们现在可以将多个订阅者配置为Jms.publishSubscribeChannel()
:
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
.destination("pubsub")
.get();
}
类似的publish-subscribe
子流组合提供了.routeToRecipients()
方法。
另一个例子是使用.discardFlow()
而不是.discardChannel()
在.filter()
方法。
这.route()
值得特别关注。
请考虑以下示例:
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
这.channelMapping()
继续像在常规中一样工作Router
映射,但.subFlowMapping()
将该子流与主流联系起来。
换句话说,任何路由器的子流都会在.route()
.
有时,您需要参考现有的
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. 将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。 |
子流可以嵌套到任何深度,但我们不建议这样做。事实上,即使在路由器的情况下,在流中添加复杂的子流也会很快开始看起来像一盘意大利面,并且人类很难解析。
在 DSL 支持子流配置的情况下,当正在配置的组件通常需要通道时,并且该子流以
框架在内部创建了一个 |
使用协议适配器
到目前为止显示的所有示例都说明了 DSL 如何使用 Spring Integration 编程模型来支持消息传递架构。但是,我们还没有进行任何真正的集成。这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源或访问本地文件系统。Spring Integration 支持所有这些以及更多。理想情况下,DSL 应该为所有这些提供一流的支持,但实现所有这些并跟上新适配器添加到 Spring Integration 是一项艰巨的任务。因此,期望 DSL 不断赶上 Spring Integration。
因此,我们提供了高级 API 来无缝定义特定于协议的消息传递。我们使用工厂和构建器模式以及 lambda 来做到这一点。您可以将工厂类视为“命名空间工厂”,因为它们与特定于具体协议的 Spring Integration 模块中的组件的 XML 命名空间扮演相同的角色。目前,Spring Integration Java DSL 支持Amqp
,Feed
,Jms
,Files
,(S)Ftp
,Http
,JPA
,MongoDb
,TCP/UDP
,Mail
,WebFlux
和Scripts
命名空间工厂。以下示例显示如何使用其中三个 (Amqp
,Jms
和Mail
):
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
前面的示例演示了如何使用“命名空间工厂”作为内联适配器声明。
但是,您可以从@Bean
定义,使IntegrationFlow
方法链更具可读性。
在我们花精力在其他命名空间工厂之前,我们正在征求社区对这些命名空间工厂的反馈。 我们也感谢对我们接下来应该支持哪些适配器和网关的优先级提出任何意见。 |
您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。
所有其他协议通道适配器都可以配置为通用 Bean 并连接到IntegrationFlow
,如以下示例所示:
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}
IntegrationFlowAdapter
这IntegrationFlow
接口可以直接实现并指定为扫描的组件,如下例所示:
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
它被IntegrationFlowBeanPostProcessor
并在应用程序上下文中正确解析和注册。
为了方便并获得松散耦合架构的好处,我们提供了IntegrationFlowAdapter
基类实现。
它需要一个buildFlow()
方法实现以生成IntegrationFlowDefinition
通过使用from()
方法,如以下示例所示:
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new AtomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "T,H,I,N,G,2";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}
}
动态和运行时集成流
IntegrationFlow
并且它的所有依赖组件都可以在运行时注册。
在 5.0 版本之前,我们使用BeanFactory.registerSingleton()
钩。
从 Spring 框架开始5.0
,我们使用instanceSupplier
用于程序化的钩子BeanDefinition
注册。
以下示例显示了如何以编程方式注册 bean:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
请注意,在前面的示例中,instanceSupplier
hook 是genericBeanDefinition
方法,在本例中由 lambda 提供。
所有必要的 Bean 初始化和生命周期都是自动完成的,就像标准上下文配置 Bean 定义一样。
为了简化开发体验,Spring Integration 引入了IntegrationFlowContext
注册和管理IntegrationFlow
实例,如以下示例所示:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
当我们有多个配置选项并且必须创建多个类似流的实例时,这很有用。
为此,我们可以迭代我们的选项并创建和注册IntegrationFlow
循环中的实例。
另一种变体是当我们的数据源不是基于 Spring 时,因此我们必须动态创建它。
此类示例是响应式流事件源,如以下示例所示:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
这IntegrationFlowRegistrationBuilder
(由于IntegrationFlowContext.registration()
) 可用于为IntegrationFlow
注册,控制其autoStartup
,并注册非 Spring Integration Bean。
通常,这些附加 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和解序列化器,或任何其他所需的支持组件。
您可以使用IntegrationFlowRegistration.destroy()
回调以删除动态注册的IntegrationFlow
以及它的所有依赖 bean(当您不再需要它们时)。
请参阅IntegrationFlowContext
Javadoc了解更多信息。
从 5.0.6 版开始,所有生成的 bean 名称在IntegrationFlow 定义的前缀是流 ID 作为前缀。
我们建议始终指定显式流 ID。
否则,同步屏障将在IntegrationFlowContext ,以生成IntegrationFlow 并注册其 bean。
我们在这两个作上同步,以避免当生成的相同 bean 名称可用于不同的 bean 名称时出现竞争条件IntegrationFlow 实例。 |
此外,从版本 5.0.6 开始,注册生成器 API 有一个新方法:useFlowIdAsPrefix()
.
如果您希望声明同一流的多个实例并避免在流中的组件具有相同 ID 时发生 Bean 名称冲突,这很有用,如以下示例所示:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,可以使用 bean 名称tcp1.client.handler
.
一id 属性是必需的。useFlowIdAsPrefix() . |
IntegrationFlow
作为网关
这IntegrationFlow
可以从提供GatewayProxyFactoryBean
组件,如以下示例所示:
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from(ControlBusGateway.class)
.controlBus()
.get();
}
接口方法的所有代理都随通道一起提供,用于将消息发送到下一个集成组件IntegrationFlow
.
您可以使用@MessagingGateway
注释,并使用@Gateway
附注。
尽管如此,requestChannel
被忽略并被该内部通道覆盖,用于IntegrationFlow
.
否则,使用IntegrationFlow
没有意义。
默认情况下,一个GatewayProxyFactoryBean
获取常规 bean 名称,例如[FLOW_BEAN_NAME.gateway]
.
您可以使用@MessagingGateway.name()
属性或重载的IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)
工厂方法。
此外,来自@MessagingGateway
接口上的注释应用于目标GatewayProxyFactoryBean
.
当注释配置不适用时,Consumer<GatewayProxySpec>
variant 可用于为目标代理提供适当的选项。
此 DSL 方法从 5.2 版开始可用。
在 Java 8 中,您甚至可以使用java.util.function
接口,如以下示例所示:
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
.<Object>handle((p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
那errorRecovererFlow
可以这样使用:
@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;
DSL 扩展
从 5.3 版开始,IntegrationFlowExtension
已引入以允许使用自定义或组合的 EIP 运算符扩展现有的 Java DSL。所需要的只是该类的扩展,它提供了可用于IntegrationFlow
bean 定义。扩展类也可用于自定义IntegrationComponentSpec
配置; 例如,可以在现有的IntegrationComponentSpec
外延。 下面的示例演示了复合自定义运算符和AggregatorSpec
默认自定义的扩展outputProcessor
:
public class CustomIntegrationFlowDefinition
extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {
public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
return split()
.transform("payload.toUpperCase()");
}
public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
return register(new CustomAggregatorSpec(), aggregator);
}
}
public class CustomAggregatorSpec extends AggregatorSpec {
CustomAggregatorSpec() {
outputProcessor(group ->
group.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(", ")));
}
}
对于方法链流,这些扩展中的新 DSL 运算符必须返回扩展类。这样,目标IntegrationFlow
definition 将适用于新的和现有的 DSL 运算符:
@Bean
public IntegrationFlow customFlowDefinition() {
return
new CustomIntegrationFlowDefinition()
.log()
.upperCaseAfterSplit()
.channel("innerChannel")
.customAggregate(customAggregatorSpec ->
customAggregatorSpec.expireGroupsUponCompletion(true))
.logAndReply();
}
集成流组合
使用MessageChannel
抽象作为一等公民在 Spring Integration 中,集成流的组成始终被假设。流中任何端点的输入通道都可用于从任何其他端点发送消息,而不仅仅是从具有此通道作为输出的端点发送消息。此外,使用@MessagingGateway
contract、Content Enricher 组件、复合端点(如<chain>
,现在使用IntegrationFlow
Beans(例如IntegrationFlowAdapter
),在较短的、可重用的部分之间分配业务逻辑非常简单。最终组合所需要的只是有关MessageChannel
发送到或接收。
从版本开始5.5.4
,从中抽象出更多内容MessageChannel
并对最终用户隐藏实现详细信息,IntegrationFlow
引入了from(IntegrationFlow)
工厂方法允许启动电流IntegrationFlow
从现有流的输出中:
@Bean
IntegrationFlow templateSourceFlow() {
return IntegrationFlow.fromSupplier(() -> "test data")
.channel("sourceChannel")
.get();
}
@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
return IntegrationFlow.from(templateSourceFlow)
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("compositionMainFlowResult"))
.get();
}
另一方面,IntegrationFlowDefinition
添加了一个to(IntegrationFlow)
终端运算符在其他流的输入通道上继续电流:
@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
return f -> f
.<String, String>transform(String::toUpperCase)
.to(otherFlow);
}
@Bean
IntegrationFlow otherFlow() {
return f -> f
.<String, String>transform(p -> p + " from other flow")
.channel(c -> c.queue("otherFlowResultChannel"));
}
流中间的组成可以通过现有的gateway(IntegrationFlow)
EIP 方法。这样,我们可以通过从更简单、可重用的逻辑块组合流来构建任何复杂的流。例如,您可以添加一个IntegrationFlow
bean 作为依赖项,只需将其配置类导入到最终项目并为您的自动连接即可IntegrationFlow
定义。