MongoDb 支持
版本 2.1 引入了对 MongoDB 的支持:这是一个“高性能、开源、面向文档的数据库”。
此依赖项为项目所需:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>7.0.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:7.0.2"
要下载、安装和运行 MongoDB,请参阅 MongoDB 文档。
连接到 MongoDB
阻塞式还是响应式?
从 5.3 版本开始,Spring Integration 提供了对响应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时实现非阻塞 I/O。要启用响应式支持,请将 MongoDB 响应式流驱动程序添加到您的依赖项中:
- Maven
- Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
对于常规的同步客户端,您需要将其对应的驱动程序添加到依赖项中:
- Maven
- Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
两者在框架中均为可选,以更好地支持最终用户的选择。
要开始与 MongoDB 交互,首先需要连接到它。Spring Integration 基于另一个 Spring 项目 Spring Data MongoDB 提供的支持构建。它提供了名为 MongoDatabaseFactory 和 ReactiveMongoDatabaseFactory 的工厂类,这些类简化了与 MongoDB Client API 的集成。
Spring Data 默认提供阻塞式的 MongoDB 驱动程序,但您也可以通过包含上述依赖项来选择使用响应式功能。
使用 MongoDatabaseFactory
要连接到 MongoDB,你可以使用 MongoDatabaseFactory 接口的一个实现。
以下示例展示了如何使用 SimpleMongoClientDatabaseFactory:
- Java
- XML
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory 接受两个参数:一个 MongoClient 实例和一个指定数据库名称的 String。如果需要配置诸如 host、port 等属性,可以通过使用底层 MongoClients 类提供的构造函数之一来传递这些属性。有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 参考文档。
使用 ReactiveMongoDatabaseFactory
要使用响应式驱动连接到 MongoDB,你可以使用 ReactiveMongoDatabaseFactory 接口的实现。
以下示例展示了如何使用 SimpleReactiveMongoDatabaseFactory:
- Java
- XML
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB 消息存储
正如《企业集成模式》(EIP)一书所述,消息存储 允许您持久化消息。当处理具有缓冲消息能力的组件(如 QueueChannel、aggregator、resequencer 等)且对可靠性有要求时,这非常有用。在 Spring Integration 中,MessageStore 策略还为 claim check 模式提供了基础,该模式在 EIP 中也有描述。
Spring Integration 的 MongoDB 模块提供了 MongoDbMessageStore,它同时实现了 MessageStore 策略(主要用于声明检查模式)和 MessageGroupStore 策略(主要用于聚合器和重排序器模式)。
以下示例配置了一个 MongoDbMessageStore,用于 QueueChannel 和 aggregator:
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
前面的例子是一个简单的 bean 配置,它期望一个 MongoDbFactory 作为构造函数的参数。
MongoDbMessageStore 通过使用 Spring Data Mongo 映射机制,将 Message 扩展为包含所有嵌套属性的 Mongo 文档。当您需要访问 payload 或 headers 进行审计或分析时(例如,针对存储的消息),这非常有用。
MongoDbMessageStore 使用自定义的 MappingMongoConverter 实现将 Message 实例存储为 MongoDB 文档,并且 Message 的属性(payload 和 header 值)存在一些限制。
从版本 5.1.6 开始,MongoDbMessageStore 可以配置自定义转换器,这些转换器会传播到内部的 MappingMongoConverter 实现中。更多信息请参阅 MongoDbMessageStore.setCustomConverters(Object… customConverters) 的 JavaDocs。
Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它同时实现了 MessageStore 和 MessageGroupStore 接口。该类可以将 MongoTemplate 作为构造函数参数接收,例如,您可以通过它配置自定义的 WriteConcern。另一个构造函数需要 MappingMongoConverter 和 MongoDbFactory,这允许您为 Message 实例及其属性提供一些自定义转换。请注意,默认情况下,ConfigurableMongoDbMessageStore 使用标准 Java 序列化将 Message 实例写入 MongoDB 或从中读取(参见 MongoDbMessageBytesConverter),并依赖 MongoTemplate 中其他属性的默认值。它根据提供的 MongoDbFactory 和 MappingMongoConverter 构建一个 MongoTemplate。ConfigurableMongoDbMessageStore 存储的集合的默认名称是 configurableStoreMessages。当消息包含复杂数据类型时,我们建议使用此实现来创建健壮且灵活的解决方案。
从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 提供了一个 setCreateIndexes(boolean) 选项(默认为 true),可用于禁用自动索引创建。以下示例展示了如何声明一个 bean 并禁用自动索引创建:
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
MongoDB 优先级通道消息存储
版本 4.0 引入了新的 MongoDbChannelMessageStore。这是一个针对 QueueChannel 实例使用而优化的 MessageGroupStore。当 priorityEnabled = true 时,您可以在 <int:priority-queue> 实例中使用它,以实现对持久化消息的优先级顺序轮询。MongoDB 文档中的优先级字段由 IntegrationMessageHeaderAccessor.PRIORITY (priority) 消息头填充。
此外,所有 MongoDB MessageStore 实例现在都为 MessageGroup 文档添加了一个 sequence 字段。sequence 值是对同一集合中一个简单的 sequence 文档执行 $inc 操作的结果,该文档是按需创建的。sequence 字段在 poll 操作中用于提供先进先出(FIFO)的消息顺序(如果配置了优先级,则在优先级内),当消息在同一毫秒内存储时。
我们不建议为优先级和非优先级使用相同的 MongoDbChannelMessageStore bean,因为 priorityEnabled 选项适用于整个存储。然而,相同的 collection 可以用于两种 MongoDbChannelMessageStore 类型,因为从存储中轮询消息是经过排序并使用索引的。要配置该场景,您可以从另一个消息存储 bean 扩展一个消息存储 bean,如下例所示:
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
使用禁用自动索引创建的 AbstractConfigurableMongoDbMessageStore
从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 实现了一个 setCreateIndex(boolean) 方法,可用于禁用或启用(默认)自动索引创建。以下示例展示了如何声明一个 bean 并禁用自动索引创建:
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);
return mongoDbChannelMessageStore;
}
MongoDB 元数据存储
Spring Integration 4.2 引入了一个新的基于 MongoDB 的 MetadataStore(参见 元数据存储)实现。你可以使用 MongoDbMetadataStore 来在应用程序重启之间维护元数据状态。你可以将此新的 MetadataStore 实现与以下适配器一起使用:
要指示这些适配器使用新的 MongoDbMetadataStore,需要声明一个名为 metadataStore 的 Spring bean。Feed 入站通道适配器会自动检测并使用已声明的 MongoDbMetadataStore。以下示例展示了如何声明一个名为 metadataStore 的 bean:
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
MongoDbMetadataStore 同样实现了 ConcurrentMetadataStore 接口,使其能够在多个应用程序实例之间可靠地共享,其中只有一个实例被允许存储或修改某个键的值。得益于 MongoDB 的保证,所有这些操作都是原子性的。
MongoDB 入站通道适配器
MongoDB入站通道适配器是一个轮询消费者,它从MongoDB读取数据并将其作为Message负载发送。以下示例展示了如何配置MongoDB入站通道适配器:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如前面的配置所示,您可以通过使用 inbound-channel-adapter 元素并为各种属性提供值来配置 MongoDb 入站通道适配器,例如:
-
query: 一个 JSON 查询(参见 MongoDB 查询) -
query-expression: 一个 SpEL 表达式,其求值结果为一个 JSON 查询字符串(与上述query属性相同)或一个o.s.data.mongodb.core.query.Query的实例。与query属性互斥。 -
entity-class: 负载对象的类型。如果未提供,则返回一个com.mongodb.DBObject。 -
collection-name或collection-name-expression: 指定要使用的 MongoDB 集合的名称。 -
mongodb-factory: 对o.s.data.mongodb.MongoDbFactory实例的引用。 -
mongo-template: 对o.s.data.mongodb.core.MongoTemplate实例的引用。 -
其他所有入站适配器共有的属性(例如 'channel')。
不能同时设置 mongo-template 和 mongodb-factory。
前面的例子相对简单且静态,因为它为 query 使用了字面值,并为 collection 使用了默认名称。有时,你可能需要根据某些条件在运行时更改这些值。为此,可以使用它们的 -expression 等价形式(query-expression 和 collection-name-expression),其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,你可能希望对从 MongoDB 成功处理的数据进行一些后处理。例如,你可能希望在处理完文档后移动或删除它。你可以使用 Spring Integration 2.2 添加的事务同步功能来实现这一点,如下例所示:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
以下示例展示了前面示例中引用的 DocumentCleaner:
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
你可以通过使用 transactional 元素将轮询器声明为事务性。该元素可以引用一个真实的事务管理器,例如,当流程的其他部分调用 JDBC 时。如果你没有“真实”的事务,可以使用 o.s.i.transaction.PseudoTransactionManager 的实例,它是 Spring 的 PlatformTransactionManager 的实现,在没有实际事务的情况下,允许使用 Mongo 适配器的事务同步功能。
这样做并不会使 MongoDB 本身具备事务性。它允许在成功(提交)或失败(回滚)之前或之后同步执行操作。
一旦你的轮询器具备事务性,你可以在 transactional 元素上设置一个 o.s.i.transaction.TransactionSynchronizationFactory 的实例。TransactionSynchronizationFactory 会创建一个 TransactionSynchronization 的实例。为了方便起见,我们提供了一个基于 SpEL 的默认 TransactionSynchronizationFactory,允许你配置 SpEL 表达式,其执行将与事务协调(同步)。支持提交前、提交后和回滚后事件的表达式,并为每个事件提供一个通道,用于发送评估结果(如果有)。对于每个子元素,你可以指定 expression 和 channel 属性。如果只存在 channel 属性,接收到的消息将作为特定同步场景的一部分发送到该通道。如果只存在 expression 属性,并且表达式的结果为非空值,则会生成一个以该结果为负载的消息,并发送到默认通道(NullChannel),并在日志中显示(在 DEBUG 级别)。如果你希望评估结果发送到特定通道,请添加 channel 属性。如果表达式的结果为 null 或 void,则不会生成消息。
有关事务同步的更多信息,请参阅事务同步。
从版本5.5开始,MongoDbMessageSource 可以配置一个 updateExpression,该表达式必须求值为一个使用 MongoDb update 语法的 String 或一个 org.springframework.data.mongodb.core.query.Update 实例。它可以作为替代方案来描述上述后处理过程,并修改从集合中获取的那些实体,这样在下一次轮询周期中就不会再次从集合中拉取这些实体(假设更新更改了查询中使用的某些值)。当集群中为同一集合使用多个 MongoDbMessageSource 实例时,仍然建议使用事务来实现执行隔离和数据一致性。
MongoDB 变更流入站通道适配器
自 5.3 版本起,spring-integration-mongodb 模块引入了 MongoDbChangeStreamMessageProducer —— 一个针对 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API 的反应式 MessageProducerSupport 实现。该组件默认生成一个消息 Flux,其 body 为 ChangeStreamEvent 作为负载,并附带一些与变更流相关的头部信息(参见 MongoHeaders)。建议将此 MongoDbChangeStreamMessageProducer 与 FluxMessageChannel 结合使用,作为 outputChannel 以便按需订阅和下游事件消费。
该通道适配器的 Java DSL 配置可能如下所示:
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
当 MongoDbChangeStreamMessageProducer 停止、下游订阅被取消,或 MongoDb 变更流产生 OperationType.INVALIDATE 时,Publisher 将完成。通道适配器可以重新启动,并创建新的源数据 Publisher,该发布器会在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中自动订阅。如果需要从其他位置消费变更流事件,可以在两次启动之间为此通道适配器重新配置新选项。
有关变更流支持的更多信息,请参阅 Spring Data MongoDB 文档。
MongoDB 出站通道适配器
MongoDB出站通道适配器允许您将消息有效负载写入MongoDB文档存储,如下例所示:
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
如前面的配置所示,您可以使用 outbound-channel-adapter 元素配置 MongoDB 出站通道适配器,并为各种属性提供值,例如:
-
collection-name或collection-name-expression: 指定要使用的 MongoDb 集合名称。 -
mongo-converter: 引用o.s.data.mongodb.core.convert.MongoConverter实例,用于协助将原始 Java 对象转换为 JSON 文档表示。 -
mongodb-factory: 引用o.s.data.mongodb.MongoDbFactory实例。 -
mongo-template: 引用o.s.data.mongodb.core.MongoTemplate实例。注意:不能同时设置mongo-template和mongodb-factory。 -
所有入站适配器通用的其他属性(例如 'channel')。
前面的例子相对简单且静态,因为它为 collection-name 使用了字面值。有时,你可能需要根据某些条件在运行时更改这个值。为此,可以使用 collection-name-expression,其中提供的表达式可以是任何有效的 SpEL 表达式。
MongoDB 出站网关
版本 5.0 引入了 MongoDB 出站网关。它允许您通过向请求通道发送消息来查询数据库。然后网关将响应发送到回复通道。您可以使用消息负载和标头来指定查询和集合名称,如下例所示:
- Java DSL
- Kotlin DSL
- Java
- XML
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory
@Autowired
lateinit var mongoConverter: MongoConverter
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
您可以在 MongoDB 出站网关中使用以下属性:
-
collection-name或collection-name-expression:指定要使用的 MongoDB 集合名称。 -
mongo-converter:引用o.s.data.mongodb.core.convert.MongoConverter的实例,用于协助将原始 Java 对象转换为 JSON 文档表示。 -
mongodb-factory:引用o.s.data.mongodb.MongoDbFactory的实例。 -
mongo-template:引用o.s.data.mongodb.core.MongoTemplate的实例。注意:不能同时设置mongo-template和mongodb-factory。 -
entity-class:要传递给 MongoTemplate 中find(..)和findOne(..)方法的实体类的完全限定名。如果未提供此属性,则默认值为org.bson.Document。 -
query或query-expression:指定 MongoDB 查询。更多查询示例请参阅 MongoDB 文档。 -
collection-callback:引用org.springframework.data.mongodb.core.CollectionCallback的实例。自 5.0.11 版本起,更推荐使用带有请求消息上下文的o.s.i.mongodb.outbound.MessageCollectionCallback实例。更多信息请参阅其 Javadocs。注意:不能同时拥有collection-callback和任何查询属性。
作为 query 和 query-expression 属性的替代方案,您可以通过使用 collectionCallback 属性来引用 MessageCollectionCallback 函数式接口实现,以指定其他数据库操作。以下示例指定了一个计数操作:
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB 响应式通道适配器
从版本 5.3 开始,提供了 ReactiveMongoDbStoringMessageHandler 和 ReactiveMongoDbMessageSource 实现。它们基于 Spring Data 的 ReactiveMongoOperations,并需要 org.mongodb:mongodb-driver-reactivestreams 依赖。
ReactiveMongoDbStoringMessageHandler 是 ReactiveMessageHandler 的一个实现,当集成流定义中涉及响应式流组合时,该实现在本框架中得到了原生支持。更多信息请参阅 ReactiveMessageHandler。
从配置角度来看,它与许多其他标准通道适配器并无区别。例如,在 Java DSL 中,这样的通道适配器可以这样使用:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在本示例中,我们将通过提供的 ReactiveMongoDatabaseFactory 连接到 MongoDb,并将请求消息中的数据存储到名为 data 的默认集合中。实际操作将在内部创建的 ReactiveStreamsConsumer 中,根据响应式流的组合按需执行。
ReactiveMongoDbMessageSource 是一个基于提供的 ReactiveMongoDatabaseFactory 或 ReactiveMongoOperations 以及 MongoDb 查询(或表达式)的 AbstractMessageSource 实现,它根据 expectSingleResult 选项调用 find() 或 findOne() 操作,并使用预期的 entityClass 类型来转换查询结果。查询执行和结果评估在消息有效载荷中的 Publisher(根据 expectSingleResult 选项为 Flux 或 Mono)被订阅时按需执行。当下游使用拆分器和 FluxMessageChannel 时,框架可以自动订阅此类有效载荷(本质上是 flatMap)。否则,下游端点中订阅轮询发布器的责任由目标应用程序承担。
使用 Java DSL,可以这样配置一个通道适配器:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
从版本5.5开始,ReactiveMongoDbMessageSource 可以配置 updateExpression。它具有与阻塞式 MongoDbMessageSource 相同的功能。更多信息请参阅 MongoDB 入站通道适配器 和 AbstractMongoDbMessageSourceSpec JavaDocs。