跳到主要内容
版本:7.0.2

MongoDb 支持

DeepSeek V3 中英对照 MongoDb Support

版本 2.1 引入了对 MongoDB 的支持:这是一个“高性能、开源、面向文档的数据库”。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>7.0.2</version>
</dependency>

要下载、安装和运行 MongoDB,请参阅 MongoDB 文档

连接到 MongoDB

阻塞式还是响应式?

从 5.3 版本开始,Spring Integration 提供了对响应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时实现非阻塞 I/O。要启用响应式支持,请将 MongoDB 响应式流驱动程序添加到您的依赖项中:

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>

对于常规的同步客户端,您需要将其对应的驱动程序添加到依赖项中:

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>

两者在框架中均为可选,以更好地支持最终用户的选择。

要开始与 MongoDB 交互,首先需要连接到它。Spring Integration 基于另一个 Spring 项目 Spring Data MongoDB 提供的支持构建。它提供了名为 MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工厂类,这些类简化了与 MongoDB Client API 的集成。

提示

Spring Data 默认提供阻塞式的 MongoDB 驱动程序,但您也可以通过包含上述依赖项来选择使用响应式功能。

使用 MongoDatabaseFactory

要连接到 MongoDB,你可以使用 MongoDatabaseFactory 接口的一个实现。

以下示例展示了如何使用 SimpleMongoClientDatabaseFactory

MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");

SimpleMongoClientDatabaseFactory 接受两个参数:一个 MongoClient 实例和一个指定数据库名称的 String。如果需要配置诸如 hostport 等属性,可以通过使用底层 MongoClients 类提供的构造函数之一来传递这些属性。有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 参考文档。

使用 ReactiveMongoDatabaseFactory

要使用响应式驱动连接到 MongoDB,你可以使用 ReactiveMongoDatabaseFactory 接口的实现。

以下示例展示了如何使用 SimpleReactiveMongoDatabaseFactory

ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");

MongoDB 消息存储

正如《企业集成模式》(EIP)一书所述,消息存储 允许您持久化消息。当处理具有缓冲消息能力的组件(如 QueueChannelaggregatorresequencer 等)且对可靠性有要求时,这非常有用。在 Spring Integration 中,MessageStore 策略还为 claim check 模式提供了基础,该模式在 EIP 中也有描述。

Spring Integration 的 MongoDB 模块提供了 MongoDbMessageStore,它同时实现了 MessageStore 策略(主要用于声明检查模式)和 MessageGroupStore 策略(主要用于聚合器和重排序器模式)。

以下示例配置了一个 MongoDbMessageStore,用于 QueueChannelaggregator

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>

前面的例子是一个简单的 bean 配置,它期望一个 MongoDbFactory 作为构造函数的参数。

MongoDbMessageStore 通过使用 Spring Data Mongo 映射机制,将 Message 扩展为包含所有嵌套属性的 Mongo 文档。当您需要访问 payloadheaders 进行审计或分析时(例如,针对存储的消息),这非常有用。

important

MongoDbMessageStore 使用自定义的 MappingMongoConverter 实现将 Message 实例存储为 MongoDB 文档,并且 Message 的属性(payloadheader 值)存在一些限制。

从版本 5.1.6 开始,MongoDbMessageStore 可以配置自定义转换器,这些转换器会传播到内部的 MappingMongoConverter 实现中。更多信息请参阅 MongoDbMessageStore.setCustomConverters(Object…​ customConverters) 的 JavaDocs。

Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它同时实现了 MessageStoreMessageGroupStore 接口。该类可以将 MongoTemplate 作为构造函数参数接收,例如,您可以通过它配置自定义的 WriteConcern。另一个构造函数需要 MappingMongoConverterMongoDbFactory,这允许您为 Message 实例及其属性提供一些自定义转换。请注意,默认情况下,ConfigurableMongoDbMessageStore 使用标准 Java 序列化将 Message 实例写入 MongoDB 或从中读取(参见 MongoDbMessageBytesConverter),并依赖 MongoTemplate 中其他属性的默认值。它根据提供的 MongoDbFactoryMappingMongoConverter 构建一个 MongoTemplateConfigurableMongoDbMessageStore 存储的集合的默认名称是 configurableStoreMessages。当消息包含复杂数据类型时,我们建议使用此实现来创建健壮且灵活的解决方案。

从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 提供了一个 setCreateIndexes(boolean) 选项(默认为 true),可用于禁用自动索引创建。以下示例展示了如何声明一个 bean 并禁用自动索引创建:

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}

MongoDB 优先级通道消息存储

版本 4.0 引入了新的 MongoDbChannelMessageStore。这是一个针对 QueueChannel 实例使用而优化的 MessageGroupStore。当 priorityEnabled = true 时,您可以在 <int:priority-queue> 实例中使用它,以实现对持久化消息的优先级顺序轮询。MongoDB 文档中的优先级字段由 IntegrationMessageHeaderAccessor.PRIORITY (priority) 消息头填充。

此外,所有 MongoDB MessageStore 实例现在都为 MessageGroup 文档添加了一个 sequence 字段。sequence 值是对同一集合中一个简单的 sequence 文档执行 $inc 操作的结果,该文档是按需创建的。sequence 字段在 poll 操作中用于提供先进先出(FIFO)的消息顺序(如果配置了优先级,则在优先级内),当消息在同一毫秒内存储时。

备注

我们不建议为优先级和非优先级使用相同的 MongoDbChannelMessageStore bean,因为 priorityEnabled 选项适用于整个存储。然而,相同的 collection 可以用于两种 MongoDbChannelMessageStore 类型,因为从存储中轮询消息是经过排序并使用索引的。要配置该场景,您可以从另一个消息存储 bean 扩展一个消息存储 bean,如下例所示:

<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>

使用禁用自动索引创建的 AbstractConfigurableMongoDbMessageStore

从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 实现了一个 setCreateIndex(boolean) 方法,可用于禁用或启用(默认)自动索引创建。以下示例展示了如何声明一个 bean 并禁用自动索引创建:

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);

return mongoDbChannelMessageStore;
}

MongoDB 元数据存储

Spring Integration 4.2 引入了一个新的基于 MongoDB 的 MetadataStore(参见 元数据存储)实现。你可以使用 MongoDbMetadataStore 来在应用程序重启之间维护元数据状态。你可以将此新的 MetadataStore 实现与以下适配器一起使用:

要指示这些适配器使用新的 MongoDbMetadataStore,需要声明一个名为 metadataStore 的 Spring bean。Feed 入站通道适配器会自动检测并使用已声明的 MongoDbMetadataStore。以下示例展示了如何声明一个名为 metadataStore 的 bean:

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

MongoDbMetadataStore 同样实现了 ConcurrentMetadataStore 接口,使其能够在多个应用程序实例之间可靠地共享,其中只有一个实例被允许存储或修改某个键的值。得益于 MongoDB 的保证,所有这些操作都是原子性的。

MongoDB 入站通道适配器

MongoDB入站通道适配器是一个轮询消费者,它从MongoDB读取数据并将其作为Message负载发送。以下示例展示了如何配置MongoDB入站通道适配器:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如前面的配置所示,您可以通过使用 inbound-channel-adapter 元素并为各种属性提供值来配置 MongoDb 入站通道适配器,例如:

  • query: 一个 JSON 查询(参见 MongoDB 查询

  • query-expression: 一个 SpEL 表达式,其求值结果为一个 JSON 查询字符串(与上述 query 属性相同)或一个 o.s.data.mongodb.core.query.Query 的实例。与 query 属性互斥。

  • entity-class: 负载对象的类型。如果未提供,则返回一个 com.mongodb.DBObject

  • collection-namecollection-name-expression: 指定要使用的 MongoDB 集合的名称。

  • mongodb-factory: 对 o.s.data.mongodb.MongoDbFactory 实例的引用。

  • mongo-template: 对 o.s.data.mongodb.core.MongoTemplate 实例的引用。

  • 其他所有入站适配器共有的属性(例如 'channel')。

备注

不能同时设置 mongo-templatemongodb-factory

前面的例子相对简单且静态,因为它为 query 使用了字面值,并为 collection 使用了默认名称。有时,你可能需要根据某些条件在运行时更改这些值。为此,可以使用它们的 -expression 等价形式(query-expressioncollection-name-expression),其中提供的表达式可以是任何有效的 SpEL 表达式。

此外,你可能希望对从 MongoDB 成功处理的数据进行一些后处理。例如,你可能希望在处理完文档后移动或删除它。你可以使用 Spring Integration 2.2 添加的事务同步功能来实现这一点,如下例所示:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例展示了前面示例中引用的 DocumentCleaner

public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}

你可以通过使用 transactional 元素将轮询器声明为事务性。该元素可以引用一个真实的事务管理器,例如,当流程的其他部分调用 JDBC 时。如果你没有“真实”的事务,可以使用 o.s.i.transaction.PseudoTransactionManager 的实例,它是 Spring 的 PlatformTransactionManager 的实现,在没有实际事务的情况下,允许使用 Mongo 适配器的事务同步功能。

important

这样做并不会使 MongoDB 本身具备事务性。它允许在成功(提交)或失败(回滚)之前或之后同步执行操作。

一旦你的轮询器具备事务性,你可以在 transactional 元素上设置一个 o.s.i.transaction.TransactionSynchronizationFactory 的实例。TransactionSynchronizationFactory 会创建一个 TransactionSynchronization 的实例。为了方便起见,我们提供了一个基于 SpEL 的默认 TransactionSynchronizationFactory,允许你配置 SpEL 表达式,其执行将与事务协调(同步)。支持提交前、提交后和回滚后事件的表达式,并为每个事件提供一个通道,用于发送评估结果(如果有)。对于每个子元素,你可以指定 expressionchannel 属性。如果只存在 channel 属性,接收到的消息将作为特定同步场景的一部分发送到该通道。如果只存在 expression 属性,并且表达式的结果为非空值,则会生成一个以该结果为负载的消息,并发送到默认通道(NullChannel),并在日志中显示(在 DEBUG 级别)。如果你希望评估结果发送到特定通道,请添加 channel 属性。如果表达式的结果为 null 或 void,则不会生成消息。

有关事务同步的更多信息,请参阅事务同步

从版本5.5开始,MongoDbMessageSource 可以配置一个 updateExpression,该表达式必须求值为一个使用 MongoDb update 语法的 String 或一个 org.springframework.data.mongodb.core.query.Update 实例。它可以作为替代方案来描述上述后处理过程,并修改从集合中获取的那些实体,这样在下一次轮询周期中就不会再次从集合中拉取这些实体(假设更新更改了查询中使用的某些值)。当集群中为同一集合使用多个 MongoDbMessageSource 实例时,仍然建议使用事务来实现执行隔离和数据一致性。

MongoDB 变更流入站通道适配器

自 5.3 版本起,spring-integration-mongodb 模块引入了 MongoDbChangeStreamMessageProducer —— 一个针对 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API 的反应式 MessageProducerSupport 实现。该组件默认生成一个消息 Flux,其 bodyChangeStreamEvent 作为负载,并附带一些与变更流相关的头部信息(参见 MongoHeaders)。建议将此 MongoDbChangeStreamMessageProducerFluxMessageChannel 结合使用,作为 outputChannel 以便按需订阅和下游事件消费。

该通道适配器的 Java DSL 配置可能如下所示:

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}

MongoDbChangeStreamMessageProducer 停止、下游订阅被取消,或 MongoDb 变更流产生 OperationType.INVALIDATE 时,Publisher 将完成。通道适配器可以重新启动,并创建新的源数据 Publisher,该发布器会在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中自动订阅。如果需要从其他位置消费变更流事件,可以在两次启动之间为此通道适配器重新配置新选项。

有关变更流支持的更多信息,请参阅 Spring Data MongoDB 文档

MongoDB 出站通道适配器

MongoDB出站通道适配器允许您将消息有效负载写入MongoDB文档存储,如下例所示:

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />

如前面的配置所示,您可以使用 outbound-channel-adapter 元素配置 MongoDB 出站通道适配器,并为各种属性提供值,例如:

  • collection-namecollection-name-expression: 指定要使用的 MongoDb 集合名称。

  • mongo-converter: 引用 o.s.data.mongodb.core.convert.MongoConverter 实例,用于协助将原始 Java 对象转换为 JSON 文档表示。

  • mongodb-factory: 引用 o.s.data.mongodb.MongoDbFactory 实例。

  • mongo-template: 引用 o.s.data.mongodb.core.MongoTemplate 实例。注意:不能同时设置 mongo-templatemongodb-factory

  • 所有入站适配器通用的其他属性(例如 'channel')。

前面的例子相对简单且静态,因为它为 collection-name 使用了字面值。有时,你可能需要根据某些条件在运行时更改这个值。为此,可以使用 collection-name-expression,其中提供的表达式可以是任何有效的 SpEL 表达式。

MongoDB 出站网关

版本 5.0 引入了 MongoDB 出站网关。它允许您通过向请求通道发送消息来查询数据库。然后网关将响应发送到回复通道。您可以使用消息负载和标头来指定查询和集合名称,如下例所示:

@SpringBootApplication
public class MongoDbJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}

@Autowired
private MongoDbFactory;

@Autowired
private MongoConverter;

@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}

private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}

}

您可以在 MongoDB 出站网关中使用以下属性:

  • collection-namecollection-name-expression:指定要使用的 MongoDB 集合名称。

  • mongo-converter:引用 o.s.data.mongodb.core.convert.MongoConverter 的实例,用于协助将原始 Java 对象转换为 JSON 文档表示。

  • mongodb-factory:引用 o.s.data.mongodb.MongoDbFactory 的实例。

  • mongo-template:引用 o.s.data.mongodb.core.MongoTemplate 的实例。注意:不能同时设置 mongo-templatemongodb-factory

  • entity-class:要传递给 MongoTemplate 中 find(..)findOne(..) 方法的实体类的完全限定名。如果未提供此属性,则默认值为 org.bson.Document

  • queryquery-expression:指定 MongoDB 查询。更多查询示例请参阅 MongoDB 文档

  • collection-callback:引用 org.springframework.data.mongodb.core.CollectionCallback 的实例。自 5.0.11 版本起,更推荐使用带有请求消息上下文的 o.s.i.mongodb.outbound.MessageCollectionCallback 实例。更多信息请参阅其 Javadocs。注意:不能同时拥有 collection-callback 和任何查询属性。

作为 queryquery-expression 属性的替代方案,您可以通过使用 collectionCallback 属性来引用 MessageCollectionCallback 函数式接口实现,以指定其他数据库操作。以下示例指定了一个计数操作:

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}

MongoDB 响应式通道适配器

从版本 5.3 开始,提供了 ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource 实现。它们基于 Spring Data 的 ReactiveMongoOperations,并需要 org.mongodb:mongodb-driver-reactivestreams 依赖。

ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler 的一个实现,当集成流定义中涉及响应式流组合时,该实现在本框架中得到了原生支持。更多信息请参阅 ReactiveMessageHandler

从配置角度来看,它与许多其他标准通道适配器并无区别。例如,在 Java DSL 中,这样的通道适配器可以这样使用:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在本示例中,我们将通过提供的 ReactiveMongoDatabaseFactory 连接到 MongoDb,并将请求消息中的数据存储到名为 data 的默认集合中。实际操作将在内部创建的 ReactiveStreamsConsumer 中,根据响应式流的组合按需执行。

ReactiveMongoDbMessageSource 是一个基于提供的 ReactiveMongoDatabaseFactoryReactiveMongoOperations 以及 MongoDb 查询(或表达式)的 AbstractMessageSource 实现,它根据 expectSingleResult 选项调用 find()findOne() 操作,并使用预期的 entityClass 类型来转换查询结果。查询执行和结果评估在消息有效载荷中的 Publisher(根据 expectSingleResult 选项为 FluxMono)被订阅时按需执行。当下游使用拆分器和 FluxMessageChannel 时,框架可以自动订阅此类有效载荷(本质上是 flatMap)。否则,下游端点中订阅轮询发布器的责任由目标应用程序承担。

使用 Java DSL,可以这样配置一个通道适配器:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}

从版本5.5开始,ReactiveMongoDbMessageSource 可以配置 updateExpression。它具有与阻塞式 MongoDbMessageSource 相同的功能。更多信息请参阅 MongoDB 入站通道适配器AbstractMongoDbMessageSourceSpec JavaDocs。