此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
使用协议适配器
到目前为止显示的所有示例都说明了 DSL 如何使用 Spring Integration 编程模型来支持消息传递架构。 但是,我们还没有进行任何真正的集成。 这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或者访问本地文件系统。 Spring Integration 支持所有这些以及更多。 理想情况下,DSL 应该为所有这些提供一流的支持,但实现所有这些并跟上新适配器添加到 Spring Integration 是一项艰巨的任务。 因此,人们期望 DSL 会不断赶上 Spring Integration。
因此,我们提供了高级 API 来无缝定义特定于协议的消息传递。
我们使用工厂和构建器模式以及 lambda 来做到这一点。
您可以将工厂类视为“命名空间工厂”,因为它们与特定于具体协议的 Spring Integration 模块中的组件的 XML 命名空间扮演相同的角色。
目前,Spring Integration Java DSL 支持Amqp
,Feed
,Jms
,Files
,(S)Ftp
,Http
,JPA
,MongoDb
,TCP/UDP
,Mail
,WebFlux
和Scripts
命名空间工厂。
以下示例演示如何使用其中三个 (Amqp
,Jms
和Mail
):
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
前面的示例演示了如何使用“命名空间工厂”作为内联适配器声明。
但是,您可以从@Bean
定义,使IntegrationFlow
方法链更具可读性。
在我们花精力在其他命名空间工厂之前,我们正在征求社区对这些命名空间工厂的反馈。 我们也感谢对我们接下来应该支持哪些适配器和网关的优先级提出任何意见。 |
您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。
所有其他协议通道适配器都可以配置为通用 Bean 并连接到IntegrationFlow
,如以下示例所示:
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}