跳到主要内容

MongoDB 支持

QWen Plus 中英对照 MongoDb Support

版本 2.1 引入了对 MongoDB 的支持:一个“高性能、开源、文档导向型数据库”。

你需要将这个依赖项添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>6.4.2</version>
</dependency>
xml

要下载、安装和运行 MongoDB,请参阅 MongoDB 文档

连接到 MongoDb

阻塞或响应?

从 5.3 版本开始,Spring Integration 提供了对反应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时启用非阻塞 I/O。要启用反应式支持,请将 MongoDB 反应式流驱动程序添加到您的依赖项中:

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
xml

对于常规同步客户端,您需要将其相应的驱动程序添加到依赖项中:

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
xml

它们在框架中都是 optional,以更好地支持最终用户的选择。

要开始与 MongoDB 交互,您首先需要连接到它。Spring Integration 基于另一个 Spring 项目 Spring Data MongoDB 提供的支持进行构建。它提供了名为 MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工厂类,这些类简化了与 MongoDB 客户端 API 的集成。

提示

Spring Data 默认提供阻塞式的 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择反应式用法。

使用 MongoDatabaseFactory

要连接到 MongoDB,你可以使用 MongoDatabaseFactory 接口的一个实现。

以下示例展示了如何使用 SimpleMongoClientDatabaseFactory

MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
java

SimpleMongoClientDatabaseFactory 需要两个参数:一个 MongoClient 实例和一个指定数据库名称的 String。如果你需要配置诸如 hostport 等属性,可以通过底层 MongoClients 类提供的其中一个构造函数来传递这些参数。有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 文档。

使用 ReactiveMongoDatabaseFactory

要使用反应式驱动程序连接到 MongoDB,你可以使用 ReactiveMongoDatabaseFactory 接口的一个实现。

以下示例展示了如何使用 SimpleReactiveMongoDatabaseFactory

ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
java

MongoDB 消息存储

企业集成模式 (EIP) 一书所述,消息存储 让你可以持久化消息。这样做在处理具有缓冲消息能力的组件时 (QueueChannelaggregatorresequencer 等)可能是有用的,如果可靠性是一个关注点的话。在 Spring Integration 中,MessageStore 策略还为 声明检查 模式提供了基础,该模式也在 EIP 中有所描述。

Spring Integration 的 MongoDB 模块提供了 MongoDbMessageStore,它是 MessageStore 策略(主要用于 claim check 模式)和 MessageGroupStore 策略(主要用于聚合器和重排序器模式)的实现。

以下示例配置了一个 MongoDbMessageStore 以使用 QueueChannel 和一个 aggregator

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
xml

前面的例子是一个简单的 bean 配置,它期望一个 MongoDbFactory 作为构造函数参数。

MongoDbMessageStore 使用 Spring Data Mongo 映射机制将 Message 展开为包含所有嵌套属性的 Mongo 文档。当你需要访问存储的消息的 payloadheaders 进行审计或分析时,这非常有用。

important

MongoDbMessageStore 使用自定义的 MappingMongoConverter 实现将 Message 实例存储为 MongoDB 文档,Message 的属性(payloadheader 值)存在一些限制。

从 5.1.6 版本开始,MongoDbMessageStore 可以配置自定义转换器,这些转换器会被传播到内部的 MappingMongoConverter 实现中。更多信息请参阅 MongoDbMessageStore.setCustomConverters(Object…​ customConverters) 的 JavaDocs。

Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它实现了 MessageStoreMessageGroupStore 接口。这个类可以接收一个 MongoTemplate 作为构造函数参数,通过它可以配置自定义的 WriteConcern。另一个构造函数需要 MappingMongoConverterMongoDbFactory,这使得你可以为 Message 实例及其属性提供一些自定义转换。请注意,默认情况下,ConfigurableMongoDbMessageStore 使用标准 Java 序列化来将 Message 实例写入和读取 MongoDB(参见 MongoDbMessageBytesConverter),并依赖 MongoTemplate 的默认值来设置其他属性。它从提供的 MongoDbFactoryMappingMongoConverter 构建了一个 MongoTemplateConfigurableMongoDbMessageStore 存储的集合的默认名称是 configurableStoreMessages。我们建议使用此实现来创建稳健且灵活的解决方案,特别是在消息包含复杂数据类型时。

从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 提供了一个 setCreateIndexes(boolean)(默认为 true)选项,可以用于禁用自动索引创建。以下示例展示了如何声明一个 bean 并禁用自动索引创建:

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
java

MongoDB 通道消息存储

版本 4.0 引入了新的 MongoDbChannelMessageStore。它是一个优化的 MessageGroupStore,用于 QueueChannel 实例中。当 priorityEnabled = true 时,您可以在 <int:priority-queue> 实例中使用它,以实现持久化消息的优先级顺序轮询。优先级 MongoDB 文档字段从消息头 IntegrationMessageHeaderAccessor.PRIORITY (priority) 填充。

此外,所有 MongoDB MessageStore 实例现在都有一个 sequence 字段用于 MessageGroup 文档。sequence 值是通过对同一集合中的简单 sequence 文档执行 $inc 操作的结果,该文档按需创建。sequence 字段用于 poll 操作,在消息存储在同一毫秒内时提供先进先出 (FIFO) 消息顺序(如果已配置,则在优先级之内)。

备注

我们不建议为优先级和非优先级使用相同的 MongoDbChannelMessageStore bean,因为 priorityEnabled 选项适用于整个存储。但是,相同的 collection 可以用于两种 MongoDbChannelMessageStore 类型,因为从存储中轮询消息是经过排序并使用索引的。要配置这种场景,您可以从另一个消息存储 bean 扩展一个消息存储 bean,如下例所示:

<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
xml

使用 AbstractConfigurableMongoDbMessageStore 禁用自动索引创建

从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 实现了 setCreateIndex(boolean),可以用来禁用或启用(默认)自动索引创建。以下示例展示了如何声明一个 bean 并禁用自动索引创建:

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);

return mongoDbChannelMessageStore;
}
java

MongoDB 元数据存储

Spring Integration 4.2 引入了一个新的基于 MongoDB 的 MetadataStore(请参阅 Metadata Store)实现。您可以使用 MongoDbMetadataStore 在应用程序重启时保持元数据状态。您可以将这种新的 MetadataStore 实现与适配器一起使用,例如:

要指示这些适配器使用新的 MongoDbMetadataStore,声明一个名为 metadataStore 的 Spring bean。进站通道适配器会自动检测并使用声明的 MongoDbMetadataStore。以下示例展示了如何声明一个名为 metadataStore 的 bean:

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
java

MongoDbMetadataStore 还实现了 ConcurrentMetadataStore,使其可以在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。所有这些操作都是原子性的,这要归功于 MongoDB 的保证。

MongoDB 入站通道适配器

MongoDB 入站通道适配器是一个轮询消费者,它从 MongoDB 读取数据并将其作为 Message 负载发送。以下示例展示了如何配置 MongoDB 入站通道适配器:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
xml

如上所述的配置所示,您可以通过使用 inbound-channel-adapter 元素并为各种属性(例如:)提供值来配置 MongoDb 入站通道适配器:

  • query: 一个 JSON 查询(参见 MongoDB 查询

  • query-expression: 一个 SpEL 表达式,它被评估为 JSON 查询字符串(如上面的 query 属性)或 o.s.data.mongodb.core.query.Query 的实例。与 query 属性互斥。

  • entity-class: 有效负载对象的类型。如果没有提供,则返回一个 com.mongodb.DBObject

  • collection-namecollection-name-expression: 确定要使用的 MongoDB 集合的名称。

  • mongodb-factory: 引用 o.s.data.mongodb.MongoDbFactory 的一个实例

  • mongo-template: 引用 o.s.data.mongodb.core.MongoTemplate 的一个实例

  • 其他在所有其他传入适配器中共有的属性(例如 'channel')。

备注

你不能同时设置 mongo-templatemongodb-factory

前面的例子相对简单和静态,因为它有 query 的字面值,并使用 collection 的默认名称。有时,您可能需要根据某些条件在运行时更改这些值。为此,请使用它们的 -expression 等价物 (query-expressioncollection-name-expression),其中提供的表达式可以是任何有效的 SpEL 表达式。

此外,您可能希望对从 MongoDB 读取的成功处理的数据进行一些后处理。例如,您可能希望在处理完一个文档后移动或删除它。您可以使用 Spring Integration 2.2 添加的事务同步功能来实现,如下例所示:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
xml

下面的示例显示了前面示例中引用的 DocumentCleaner

public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
java

你可以通过使用 transactional 元素来声明你的轮询器是事务性的。这个元素可以引用一个真实的事务管理器(例如,如果你的流程的其他部分调用了 JDBC)。如果你没有“真实”的事务,你可以使用 o.s.i.transaction.PseudoTransactionManager 的实例,它是 Spring 的 PlatformTransactionManager 的一种实现,在没有实际事务的情况下,启用 Mongo 适配器的事务同步功能。

important

这样做并不会使 MongoDB 本身具备事务性。它允许在成功(提交)之前或之后、或在失败(回滚)之后进行操作同步。

一旦你的轮询器是事务性的,你可以在 transactional 元素上设置 o.s.i.transaction.TransactionSynchronizationFactory 的一个实例。TransactionSynchronizationFactory 创建 TransactionSynchronization 的一个实例。为了方便起见,我们已经暴露了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许你配置 SpEL 表达式,并且它们的执行将与事务协调(同步)。支持 before-commit、after-commit 和 after-rollback 事件的表达式,以及每个事件的一个通道,在这些通道中发送评估结果(如果有)。对于每个子元素,你可以指定 expressionchannel 属性。如果只有 channel 属性存在,则接收到的消息将作为特定同步场景的一部分发送到那里。如果只有 expression 属性存在,并且表达式的结果是非空值,则会生成并发送一条以结果为负载的消息到默认通道 (NullChannel) 并在日志中显示(在 DEBUG 级别)。如果你想让评估结果发送到特定的通道,请添加 channel 属性。如果表达式的结果是 null 或 void,则不会生成消息。

有关事务同步的更多信息,请参阅事务同步

从 5.5 版本开始,MongoDbMessageSource 可以使用 updateExpression 配置,该表达式必须评估为带有 MongoDb update 语法的 Stringorg.springframework.data.mongodb.core.query.Update 实例。它可以作为上述后处理程序的替代方案,并修改从集合中获取的实体,因此它们在下一个轮询周期中不会再次从集合中拉取(假设更新更改了查询中使用的一些值)。仍然建议使用事务来实现执行隔离和数据一致性,当集群中使用多个针对同一集合的 MongoDbMessageSource 实例时。

MongoDB 变更流 inbound 通道适配器

从 5.3 版本开始,spring-integration-mongodb 模块引入了 MongoDbChangeStreamMessageProducer —— 这是一个响应式的 MessageProducerSupport 实现,用于 Spring Data 的 ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API。此组件默认生成一个以 ChangeStreamEvent 作为负载 body 的消息 Flux,并包含一些与更改流相关的头部信息(参见 MongoHeaders)。建议将此 MongoDbChangeStreamMessageProducerFluxMessageChannel 结合使用,作为 outputChannel 以实现按需订阅和下游事件消费。

这个通道适配器的 Java DSL 配置可能看起来像这样:

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
java

MongoDbChangeStreamMessageProducer 被停止,或者下游取消订阅,或者 MongoDb 变更流产生一个 OperationType.INVALIDATE 时,Publisher 就完成了。通道适配器可以再次启动,并创建一个新的源数据 Publisher,并且它会自动在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中订阅。如果需要从其他地方消费变更流事件,则可以在启动之间重新配置此通道适配器以适应新选项。

有关 Spring Data MongoDb 中 change stream 的更多信息,请参阅文档

MongoDB 输出通道适配器

MongoDB outbound 通道适配器允许您将消息有效负载写入 MongoDB 文档存储,如下例所示:

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
xml

如上所述的配置所示,您可以使用 outbound-channel-adapter 元素配置 MongoDB 外发通道适配器,为各种属性提供值,例如:

  • collection-namecollection-name-expression:标识要使用的 MongoDb 集合的名称。

  • mongo-converter:引用 o.s.data.mongodb.core.convert.MongoConverter 的一个实例,该实例有助于将原始 Java 对象转换为 JSON 文档表示形式。

  • mongodb-factory:引用 o.s.data.mongodb.MongoDbFactory 的一个实例。

  • mongo-template:引用 o.s.data.mongodb.core.MongoTemplate 的一个实例。注意:不能同时设置 mongo-template 和 mongodb-factory。

  • 其他在所有入站适配器中通用的属性(例如 'channel')。

前面的例子相对简单和静态,因为它有一个 collection-name 的字面值。有时,您可能需要根据某些条件在运行时更改此值。为此,请使用 collection-name-expression,其中提供的表达式是任何有效的 SpEL 表达式。

MongoDB 外发网关

版本 5.0 引入了 MongoDB 外发网关。它允许您通过向请求通道发送消息来查询数据库。然后网关将响应发送到回复通道。您可以使用消息有效负载和标题来指定查询和集合名称,如下例所示:

@SpringBootApplication
public class MongoDbJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}

@Autowired
private MongoDbFactory;

@Autowired
private MongoConverter;

@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}

private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}

}
java

你可以使用以下属性与 MongoDB outbound Gateway:

  • collection-namecollection-name-expression:标识要使用的 MongoDB 集合的名称。

  • mongo-converter:引用 o.s.data.mongodb.core.convert.MongoConverter 的一个实例,用于将原始 Java 对象转换为 JSON 文档表示。

  • mongodb-factory:引用 o.s.data.mongodb.MongoDbFactory 的一个实例。

  • mongo-template:引用 o.s.data.mongodb.core.MongoTemplate 的一个实例。注意:你不能同时设置 mongo-templatemongodb-factory

  • entity-class:传递给 MongoTemplate 中 find(..)findOne(..) 方法的实体类的全限定名。如果未提供此属性,默认值是 org.bson.Document

  • queryquery-expression:指定 MongoDB 查询。请参阅 MongoDB 文档以获取更多查询示例。

  • collection-callback:引用 org.springframework.data.mongodb.core.CollectionCallback 的一个实例。自 5.0.11 起,更推荐使用 o.s.i.mongodb.outbound.MessageCollectionCallback 的实例,并带有请求消息上下文。有关更多信息,请参阅其 Javadoc。注意:你不能同时拥有 collection-callback 和任何查询属性。

作为 queryquery-expression 属性的替代,你可以通过使用 collectionCallback 属性引用 MessageCollectionCallback 函数式接口的实现来指定其他数据库操作。以下示例指定了一个计数操作:

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
java

MongoDB 反应式通道适配器

从 5.3 版本开始,提供了 ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource 实现。它们基于 Spring Data 的 ReactiveMongoOperations,并需要一个 org.mongodb:mongodb-driver-reactivestreams 依赖。

ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler 的一个实现,在集成流定义涉及反应式流组合时,框架中对此有原生支持。有关更多信息,请参阅 ReactiveMessageHandler

从配置的角度来看,与其他许多标准通道适配器没有区别。例如,使用 Java DSL,这样的通道适配器可以这样使用:

请注意,具体的实现细节会根据实际的开发环境和需求有所不同。

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
java

在本示例中,我们将通过提供的 ReactiveMongoDatabaseFactory 连接到 MongoDb,并将请求消息中的数据存储到名为 data 的默认集合中。 实际操作将根据需要从内部创建的 ReactiveStreamsConsumer 中的反应式流组合中执行。

ReactiveMongoDbMessageSource 是基于提供的 ReactiveMongoDatabaseFactoryReactiveMongoOperations 和 MongoDb 查询(或表达式)的 AbstractMessageSource 实现,根据 expectSingleResult 选项调用 find()findOne() 操作,并期望 entityClass 类型以转换查询结果。查询执行和结果评估是在需要时进行的,当生成的消息的有效负载中的 Publisher (FluxMono 根据 expectSingleResult 选项) 被订阅时。框架可以在下游使用拆分器和 FluxMessageChannel 时自动订阅此类有效负载(本质上是 flatMap)。否则,由目标应用程序负责订阅在下游端点中轮询的发布者。

使用 Java DSL,这样的通道适配器可以配置为:

// 示例代码,根据实际情况调整
@Bean
public IntegrationFlow myChannelAdapter() {
return IntegrationFlows.from(() -> "payload",
e -> e.poller(Pollers.fixedRate(5000)))
.handle(logger())
.get();
}
java

请注意,以上代码仅作为示例,具体实现可能因需求不同而有所变化。

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
java

从 5.5 版本开始,ReactiveMongoDbMessageSource 可以配置 updateExpression。它具有与阻塞型 MongoDbMessageSource 相同的功能。更多信息请参阅 MongoDB 入站通道适配器AbstractMongoDbMessageSourceSpec 的 JavaDocs。