|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
MongoDb 支持
2.1 版本引入了对 MongoDB 的支持:MongoDB 是一个“高性能、开源、面向文档的数据库”。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>6.4.1-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.4.1-SNAPSHOT"
要下载、安装和运行 MongoDB,请参阅 MongoDB 文档。
连接到 MongoDb
阻塞还是反应式?
从版本 5.3 开始, Spring 集成提供了对反应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时启用非阻塞 I/O。 要启用反应式支持,请将 MongoDB 反应式流驱动程序添加到您的依赖项中:
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
对于常规同步客户端,您需要将其相应的驱动程序添加到依赖项中:
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
他们俩都是optional在框架中提供更好的最终用户选择支持。
要开始与 MongoDB 交互,您首先需要连接到它。
Spring 集成建立在另一个 Spring 项目 Spring Data MongoDB 提供的支持之上。
它提供了名为MongoDatabaseFactory和ReactiveMongoDatabaseFactory,这简化了与 MongoDB 客户端 API 的集成。
| Spring Data 默认提供阻塞 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择反应式使用。 |
用MongoDatabaseFactory
要连接到 MongoDB,您可以使用MongoDatabaseFactory接口。
以下示例演示如何使用SimpleMongoClientDatabaseFactory:
-
Java
-
XML
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory接受两个参数:一个MongoClientinstance 和String指定数据库的名称。
如果您需要配置诸如host,port和其他构造函数中,您可以使用底层MongoClients类。
有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 参考。
用ReactiveMongoDatabaseFactory
要使用反应式驱动程序连接到 MongoDB,您可以使用ReactiveMongoDatabaseFactory接口。
以下示例演示如何使用SimpleReactiveMongoDatabaseFactory:
-
Java
-
XML
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB 消息存储
如 Enterprise Integration Patterns (EIP) 一书中所述,Message Store 允许您保留消息。
在处理能够缓冲消息 (QueueChannel,aggregator,resequencer等。如果可靠性是一个问题。
在 Spring 集成中,MessageStore策略还为 Claim Check 模式提供了基础,EIP 中对此也有介绍。
Spring 集成的 MongoDB 模块提供了MongoDbMessageStore,它是MessageStore策略(主要由 Claim Check 模式使用)和MessageGroupStore策略(主要由 aggregator 和 resequencer 模式使用)。
以下示例将MongoDbMessageStore要使用QueueChannel以及一个aggregator:
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
前面的示例是一个简单的 bean 配置,它需要一个MongoDbFactory作为构造函数参数。
这MongoDbMessageStore展开Message作为具有所有嵌套属性的 Mongo 文档。
当您需要访问payload或headers用于审计或分析 — 例如,针对存储的消息。
这MongoDbMessageStore使用自定义MappingMongoConverter实现到 storeMessage实例作为 MongoDB 文档,并且属性 (payload和header值)Message. |
从版本 5.1.6 开始,MongoDbMessageStore可以使用自定义转换器进行配置,这些转换器会传播到内部MappingMongoConverter实现。
看MongoDbMessageStore.setCustomConverters(Object… customConverters)JavaDocs 了解更多信息。
Spring Integration 3.0 引入了ConfigurableMongoDbMessageStore.
它实现了MessageStore和MessageGroupStore接口。
此类可以接收MongoTemplate,例如,您可以使用它配置自定义WriteConcern.
另一个构造函数需要一个MappingMongoConverter以及MongoDbFactory,它允许您为Message实例及其属性。
请注意,默认情况下,ConfigurableMongoDbMessageStore使用标准的 Java 序列化进行读写Message实例与 MongoDB 之间的 MongoDB 之间的实例(请参阅MongoDbMessageBytesConverter) 并依赖于MongoTemplate.
它构建了一个MongoTemplate从提供的MongoDbFactory和MappingMongoConverter.
由ConfigurableMongoDbMessageStore是configurableStoreMessages.
我们建议在消息包含复杂数据类型时使用此实现来创建强大而灵活的解决方案。
从版本 6.0.8 开始,AbstractConfigurableMongoDbMessageStore提供setCreateIndexes(boolean)(默认为true) 选项,该选项可用于禁用自动索引创建。
下面的示例展示了如何声明 bean 并禁用自动索引创建:
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
MongoDB 通道消息存储
版本 4.0 引入了新的MongoDbChannelMessageStore.
它是一个优化的MessageGroupStore用于QueueChannel实例。
跟priorityEnabled = true,您可以在<int:priority-queue>实例来实现持久消息的优先级顺序轮询。
priority MongoDB document 字段从IntegrationMessageHeaderAccessor.PRIORITY (priority) 消息标头。
此外,所有 MongoDBMessageStore实例现在具有sequence字段MessageGroup文件。
这sequencevalue 是$inc作进行简单的sequencedocument 的 document,该集合是按需创建的。
这sequencefield 用于poll当消息存储在同一毫秒内时,提供先进先出 (FIFO) 消息顺序(在优先级内,如果已配置)的作。
我们不建议使用相同的MongoDbChannelMessageStorebean 的优先级和非优先级,因为priorityEnabled选项适用于整个商店。
然而,同样的collection可用于两者MongoDbChannelMessageStore类型,因为来自存储的消息轮询是排序的并使用索引。
要配置该场景,可以从一个消息存储 Bean 扩展另一个消息存储 Bean,如下例所示: |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
在禁用自动索引创建的情况下使用 AbstractConfigurableMongoDbMessageStore
从版本 6.0.8 开始,AbstractConfigurableMongoDbMessageStore实现一个setCreateIndex(boolean)可用于 desable 或 enable (default) 自动索引创建。
下面的示例展示了如何声明一个 bean 并禁用自动索引创建:
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);
return mongoDbChannelMessageStore;
}
MongoDB 元数据存储
Spring Integration 4.2 引入了一个新的基于 MongoDB 的MetadataStore(请参阅 元数据存储) 实现。
您可以使用MongoDbMetadataStore在应用程序重启时保持元数据状态。
您可以使用这个新的MetadataStore使用适配器实现,例如:
要指示这些适配器使用新的MongoDbMetadataStore中,声明一个 Bean 名称为metadataStore.
feed 入站通道适配器会自动获取并使用声明的MongoDbMetadataStore.
以下示例说明如何声明名称为metadataStore:
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
这MongoDbMetadataStore还实现了ConcurrentMetadataStore,使其在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。
由于 MongoDB 的保证,所有这些作都是原子的。
MongoDB 入站通道适配器
MongoDB 入站通道适配器是一个轮询使用者,它从 MongoDB 读取数据并将其作为Message有效载荷。
以下示例显示如何配置 MongoDB 入站通道适配器:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如前面的配置所示,您可以使用inbound-channel-adapter元素并为各种属性提供值,例如:
-
query:JSON 查询(请参阅 MongoDB 查询) -
query-expression:一个 SPEL 表达式,计算结果为 JSON 查询字符串(作为query属性)或传递给o.s.data.mongodb.core.query.Query. 与query属性。 -
entity-class:负载对象的类型。 如果未提供,则com.mongodb.DBObject返回。 -
collection-name或collection-name-expression:标识要使用的 MongoDB 集合的名称。 -
mongodb-factory:对o.s.data.mongodb.MongoDbFactory -
mongo-template:对o.s.data.mongodb.core.MongoTemplate -
所有其他入站适配器中通用的其他属性(例如 'channel')。
不能同时设置mongo-template和mongodb-factory. |
前面的示例相对简单且静态,因为它具有query并使用collection.
有时,您可能需要在运行时根据某些条件更改这些值。
为此,请使用他们的-expression等效项 (query-expression和collection-name-expression),其中提供的表达式可以是任何有效的 SPEL 表达式。
此外,您可能希望对从 MongoDB 读取的成功处理数据进行一些后处理。 例如;您可能希望在处理完文档后移动或删除文档。 你可以通过使用 Spring Integration 2.2 添加的事务同步功能来实现这一点,如下例所示:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
以下示例显示了DocumentCleaner在前面的示例中引用:
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
您可以使用transactional元素。
此元素可以引用真实的事务管理器(例如,如果流的某个其他部分调用 JDBC)。
如果您没有 “真实” 事务,则可以使用o.s.i.transaction.PseudoTransactionManager,它是 Spring 的PlatformTransactionManager并允许在没有实际事务时使用 Mongo 适配器的事务同步功能。
| 这样做不会使 MongoDB 本身成为事务性的。 它允许在成功 (提交) 之前或之后或失败 (回滚) 执行作同步。 |
一旦你的 Poller 是事务性的,你就可以设置o.s.i.transaction.TransactionSynchronizationFactory在transactional元素。
一个TransactionSynchronizationFactory创建TransactionSynchronization.
为方便起见,我们公开了默认的基于 SPEL 的TransactionSynchronizationFactory它允许您配置 SPEL 表达式,其执行与事务协调(同步)。
支持 before-commit、after-commit 和 after-rollback 事件的表达式,以及发送评估结果(如果有)的每个事件的通道。
对于每个子元素,您可以指定expression和channel属性。
如果只有channel属性,则收到的消息将作为特定同步方案的一部分发送到该位置。
如果只有expression属性,并且表达式的结果是非 null 值,则会生成一条消息,其中包含有效负载的结果,并将其发送到默认通道 (NullChannel)并显示在日志中(在DEBUG级别)。
如果您希望评估结果转到特定频道,请添加channel属性。
如果表达式的结果是 null 或 void,则不会生成任何消息。
有关事务同步的更多信息,请参阅事务同步。
从版本 5.5 开始,MongoDbMessageSource可以配置updateExpression,它的计算结果必须为String使用 MongoDbupdate语法或更改为org.springframework.data.mongodb.core.query.Update实例。
它可以用作上述后处理程序的替代方案,并且它会修改从集合中获取的那些实体,因此它们不会在下一个轮询周期中再次从集合中拉出(假设更新更改了查询中使用的某些值)。
仍然建议使用事务来实现执行隔离和数据一致性,当MongoDbMessageSource的集合。
MongoDB Change Stream 入站通道适配器
从版本 5.3 开始,spring-integration-mongodb模块介绍了MongoDbChangeStreamMessageProducer- 反应式MessageProducerSupportSpring Data 的实现ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)应用程序接口。
此组件生成一个Flux的邮件中带有body之ChangeStreamEvent作为有效负载,并且某些与 Change Stream 相关的标头(请参阅MongoHeaders).
建议将此MongoDbChangeStreamMessageProducer与FluxMessageChannel作为outputChannel用于按需订阅和下游事件使用。
此通道适配器的 Java DSL 配置可能如下所示:
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
当MongoDbChangeStreamMessageProducer已停止,或者订阅在下游被取消,或者 MongoDb 更改流生成OperationType.INVALIDATE这Publisher已完成。
通道适配器可以再次启动,并且新的Publisher的源数据,并在MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>).
如果需要使用来自其他位置的更改流事件,则可以在两次启动之间为新选项重新配置此通道适配器。
请参阅 Spring Data MongoDb 文档中有关更改流支持的更多信息。
MongoDB 出站通道适配器
MongoDB 出站通道适配器允许您将消息有效负载写入 MongoDB 文档存储,如下例所示:
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
如前面的配置所示,您可以使用outbound-channel-adapter元素,为各种属性提供值,例如:
-
collection-name或collection-name-expression:标识要使用的 MongoDb 集合的名称。 -
mongo-converter:对o.s.data.mongodb.core.convert.MongoConverter这有助于将原始 Java 对象转换为 JSON 文档表示形式。 -
mongodb-factory:对o.s.data.mongodb.MongoDbFactory. -
mongo-template:对o.s.data.mongodb.core.MongoTemplate. 注意:您不能同时设置 mongo-template 和 mongodb-factory。 -
所有入站适配器中通用的其他属性(例如 'channel')。
前面的示例相对简单且静态,因为它具有collection-name.
有时,您可能需要在运行时根据某些条件更改此值。
为此,请使用collection-name-expression,其中提供的表达式是任何有效的 SPEL 表达式。
MongoDB 出站网关
版本 5.0 引入了 MongoDB 出站网关。 它允许您通过向数据库的请求通道发送消息来查询数据库。 然后,网关将响应发送到回复通道。 您可以使用消息有效负载和标头来指定查询和集合名称,如下例所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory
@Autowired
lateinit var mongoConverter: MongoConverter
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
您可以将以下属性用于 MongoDB 出站网关:
-
collection-name或collection-name-expression:标识要使用的 MongoDB 集合的名称。 -
mongo-converter:对o.s.data.mongodb.core.convert.MongoConverter这有助于将原始 Java 对象转换为 JSON 文档表示形式。 -
mongodb-factory:对o.s.data.mongodb.MongoDbFactory. -
mongo-template:对o.s.data.mongodb.core.MongoTemplate. 注意:您不能同时设置两者mongo-template和mongodb-factory. -
entity-class:要传递给find(..)和findOne(..)方法。 如果未提供此属性,则默认值为org.bson.Document. -
query或query-expression:指定 MongoDB 查询。 有关更多查询示例,请参阅 MongoDB 文档。 -
collection-callback:对org.springframework.data.mongodb.core.CollectionCallback. 最好是o.s.i.mongodb.outbound.MessageCollectionCallback从 5.0.11 开始,使用请求消息上下文。 有关更多信息,请参阅其 Javadocs。 注意:您不能同时拥有两者collection-callback和任何 query 属性。
作为query和query-expression属性中,您可以使用collectionCallback属性作为对MessageCollectionCallbackfunctional interface 实现。
以下示例指定 count作:
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB 反应式通道适配器
从版本 5.3 开始,ReactiveMongoDbStoringMessageHandler和ReactiveMongoDbMessageSource提供了 implementations。
它们基于ReactiveMongoOperations,并且需要一个org.mongodb:mongodb-driver-reactivestreamsDependency。
这ReactiveMongoDbStoringMessageHandler是ReactiveMessageHandler当集成流定义中涉及 Reactive Streams 组合时,框架中原生支持此功能。
有关更多信息,请参见 ReactiveMessageHandler 。
从配置的角度来看,它与许多其他标准通道适配器没有区别。 例如,对于 Java DSL,这样的通道适配器可以像这样使用:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在此示例中,我们将通过提供的ReactiveMongoDatabaseFactory并将 request message 中的数据存储到默认集合中,并使用data名字。
真正的作将从内部创建的ReactiveStreamsConsumer.
这ReactiveMongoDbMessageSource是一个AbstractMessageSourceimplementation based on providedReactiveMongoDatabaseFactory或ReactiveMongoOperations和 MongoDb 查询(或表达式)调用find()或findOne()根据expectSingleResult选项替换为预期的entityClasstype 转换查询结果。
在以下情况下,将按需执行查询执行和结果评估Publisher (Flux或Mono根据expectSingleResult选项)的 Payload 中。
框架可以自动订阅这样的有效负载(本质上flatMap) 当 splitter 和FluxMessageChannel在下游使用。
否则,目标应用程序负责订阅下游终端节点中轮询的发布者。
使用 Java DSL 时,可以像这样配置这样的通道适配器:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
从版本 5.5 开始,ReactiveMongoDbMessageSource可以配置updateExpression.
它具有与阻塞相同的功能MongoDbMessageSource.
请参阅 MongoDB 入站通道适配器 和AbstractMongoDbMessageSourceSpecJavaDocs 了解更多信息。