MongoDB 支持
版本 2.1 引入了对 MongoDB 的支持:一个“高性能、开源、文档导向型数据库”。
你需要将这个依赖项添加到你的项目中:
- Maven
- Gradle
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.4.2"
要下载、安装和运行 MongoDB,请参阅 MongoDB 文档。
连接到 MongoDb
阻塞或响应?
从 5.3 版本开始,Spring Integration 提供了对反应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时启用非阻塞 I/O。要启用反应式支持,请将 MongoDB 反应式流驱动程序添加到您的依赖项中:
- Maven
- Gradle
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
对于常规同步客户端,您需要将其相应的驱动程序添加到依赖项中:
- Maven
- Gradle
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
它们在框架中都是 optional,以更好地支持最终用户的选择。
要开始与 MongoDB 交互,您首先需要连接到它。Spring Integration 基于另一个 Spring 项目 Spring Data MongoDB 提供的支持进行构建。它提供了名为 MongoDatabaseFactory 和 ReactiveMongoDatabaseFactory 的工厂类,这些类简化了与 MongoDB 客户端 API 的集成。
Spring Data 默认提供阻塞式的 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择反应式用法。
使用 MongoDatabaseFactory
要连接到 MongoDB,你可以使用 MongoDatabaseFactory 接口的一个实现。
以下示例展示了如何使用 SimpleMongoClientDatabaseFactory :
- Java
- XML
MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory 需要两个参数:一个 MongoClient 实例和一个指定数据库名称的 String。如果你需要配置诸如 host、port 等属性,可以通过底层 MongoClients 类提供的其中一个构造函数来传递这些参数。有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 文档。
使用 ReactiveMongoDatabaseFactory
要使用反应式驱动程序连接到 MongoDB,你可以使用 ReactiveMongoDatabaseFactory 接口的一个实现。
以下示例展示了如何使用 SimpleReactiveMongoDatabaseFactory:
- Java
- XML
ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>
MongoDB 消息存储
Spring Integration 的 MongoDB 模块提供了 MongoDbMessageStore,它是 MessageStore 策略(主要用于 claim check 模式)和 MessageGroupStore 策略(主要用于聚合器和重排序器模式)的实现。
以下示例配置了一个 MongoDbMessageStore 以使用 QueueChannel 和一个 aggregator:
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>
前面的例子是一个简单的 bean 配置,它期望一个 MongoDbFactory 作为构造函数参数。
MongoDbMessageStore 使用 Spring Data Mongo 映射机制将 Message 展开为包含所有嵌套属性的 Mongo 文档。当你需要访问存储的消息的 payload 或 headers 进行审计或分析时,这非常有用。
MongoDbMessageStore 使用自定义的 MappingMongoConverter 实现将 Message 实例存储为 MongoDB 文档,Message 的属性(payload 和 header 值)存在一些限制。
从 5.1.6 版本开始,MongoDbMessageStore 可以配置自定义转换器,这些转换器会被传播到内部的 MappingMongoConverter 实现中。更多信息请参阅 MongoDbMessageStore.setCustomConverters(Object… customConverters) 的 JavaDocs。
Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它实现了 MessageStore 和 MessageGroupStore 接口。这个类可以接收一个 MongoTemplate 作为构造函数参数,通过它可以配置自定义的 WriteConcern。另一个构造函数需要 MappingMongoConverter 和 MongoDbFactory,这使得你可以为 Message 实例及其属性提供一些自定义转换。请注意,默认情况下,ConfigurableMongoDbMessageStore 使用标准 Java 序列化来将 Message 实例写入和读取 MongoDB(参见 MongoDbMessageBytesConverter),并依赖 MongoTemplate 的默认值来设置其他属性。它从提供的 MongoDbFactory 和 MappingMongoConverter 构建了一个 MongoTemplate。ConfigurableMongoDbMessageStore 存储的集合的默认名称是 configurableStoreMessages。我们建议使用此实现来创建稳健且灵活的解决方案,特别是在消息包含复杂数据类型时。
从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 提供了一个 setCreateIndexes(boolean)(默认为 true)选项,可以用于禁用自动索引创建。以下示例展示了如何声明一个 bean 并禁用自动索引创建:
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}
MongoDB 通道消息存储
版本 4.0 引入了新的 MongoDbChannelMessageStore。它是一个优化的 MessageGroupStore,用于 QueueChannel 实例中。当 priorityEnabled = true 时,您可以在 <int:priority-queue> 实例中使用它,以实现持久化消息的优先级顺序轮询。优先级 MongoDB 文档字段从消息头 IntegrationMessageHeaderAccessor.PRIORITY (priority) 填充。
此外,所有 MongoDB MessageStore 实例现在都有一个 sequence 字段用于 MessageGroup 文档。sequence 值是通过对同一集合中的简单 sequence 文档执行 $inc 操作的结果,该文档按需创建。sequence 字段用于 poll 操作,在消息存储在同一毫秒内时提供先进先出 (FIFO) 消息顺序(如果已配置,则在优先级之内)。
我们不建议为优先级和非优先级使用相同的 MongoDbChannelMessageStore bean,因为 priorityEnabled 选项适用于整个存储。但是,相同的 collection 可以用于两种 MongoDbChannelMessageStore 类型,因为从存储中轮询消息是经过排序并使用索引的。要配置这种场景,您可以从另一个消息存储 bean 扩展一个消息存储 bean,如下例所示:
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>
使用 AbstractConfigurableMongoDbMessageStore 禁用自动索引创建
从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 实现了 setCreateIndex(boolean),可以用来禁用或启用(默认)自动索引创建。以下示例展示了如何声明一个 bean 并禁用自动索引创建:
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);
    return mongoDbChannelMessageStore;
}
MongoDB 元数据存储
Spring Integration 4.2 引入了一个新的基于 MongoDB 的 MetadataStore(请参阅 Metadata Store)实现。您可以使用 MongoDbMetadataStore 在应用程序重启时保持元数据状态。您可以将这种新的 MetadataStore 实现与适配器一起使用,例如:
要指示这些适配器使用新的 MongoDbMetadataStore,声明一个名为 metadataStore 的 Spring bean。进站通道适配器会自动检测并使用声明的 MongoDbMetadataStore。以下示例展示了如何声明一个名为 metadataStore 的 bean:
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
MongoDbMetadataStore 还实现了 ConcurrentMetadataStore,使其可以在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。所有这些操作都是原子性的,这要归功于 MongoDB 的保证。
MongoDB 入站通道适配器
MongoDB 入站通道适配器是一个轮询消费者,它从 MongoDB 读取数据并将其作为 Message 负载发送。以下示例展示了如何配置 MongoDB 入站通道适配器:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如上所述的配置所示,您可以通过使用 inbound-channel-adapter 元素并为各种属性(例如:)提供值来配置 MongoDb 入站通道适配器:
- 
query: 一个 JSON 查询(参见 MongoDB 查询)
- 
query-expression: 一个 SpEL 表达式,它被评估为 JSON 查询字符串(如上面的query属性)或o.s.data.mongodb.core.query.Query的实例。与query属性互斥。
- 
entity-class: 有效负载对象的类型。如果没有提供,则返回一个com.mongodb.DBObject。
- 
collection-name或collection-name-expression: 确定要使用的 MongoDB 集合的名称。
- 
mongodb-factory: 引用o.s.data.mongodb.MongoDbFactory的一个实例
- 
mongo-template: 引用o.s.data.mongodb.core.MongoTemplate的一个实例
- 
其他在所有其他传入适配器中共有的属性(例如 'channel')。 
你不能同时设置 mongo-template 和 mongodb-factory。
前面的例子相对简单和静态,因为它有 query 的字面值,并使用 collection 的默认名称。有时,您可能需要根据某些条件在运行时更改这些值。为此,请使用它们的 -expression 等价物 (query-expression 和 collection-name-expression),其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,您可能希望对从 MongoDB 读取的成功处理的数据进行一些后处理。例如,您可能希望在处理完一个文档后移动或删除它。您可以使用 Spring Integration 2.2 添加的事务同步功能来实现,如下例所示:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
下面的示例显示了前面示例中引用的 DocumentCleaner :
public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}
你可以通过使用 transactional 元素来声明你的轮询器是事务性的。这个元素可以引用一个真实的事务管理器(例如,如果你的流程的其他部分调用了 JDBC)。如果你没有“真实”的事务,你可以使用 o.s.i.transaction.PseudoTransactionManager 的实例,它是 Spring 的 PlatformTransactionManager 的一种实现,在没有实际事务的情况下,启用 Mongo 适配器的事务同步功能。
这样做并不会使 MongoDB 本身具备事务性。它允许在成功(提交)之前或之后、或在失败(回滚)之后进行操作同步。
一旦你的轮询器是事务性的,你可以在 transactional 元素上设置 o.s.i.transaction.TransactionSynchronizationFactory 的一个实例。TransactionSynchronizationFactory 创建 TransactionSynchronization 的一个实例。为了方便起见,我们已经暴露了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许你配置 SpEL 表达式,并且它们的执行将与事务协调(同步)。支持 before-commit、after-commit 和 after-rollback 事件的表达式,以及每个事件的一个通道,在这些通道中发送评估结果(如果有)。对于每个子元素,你可以指定 expression 和 channel 属性。如果只有 channel 属性存在,则接收到的消息将作为特定同步场景的一部分发送到那里。如果只有 expression 属性存在,并且表达式的结果是非空值,则会生成并发送一条以结果为负载的消息到默认通道 (NullChannel) 并在日志中显示(在 DEBUG 级别)。如果你想让评估结果发送到特定的通道,请添加 channel 属性。如果表达式的结果是 null 或 void,则不会生成消息。
有关事务同步的更多信息,请参阅事务同步。
从 5.5 版本开始,MongoDbMessageSource 可以使用 updateExpression 配置,该表达式必须评估为带有 MongoDb update 语法的 String 或 org.springframework.data.mongodb.core.query.Update 实例。它可以作为上述后处理程序的替代方案,并修改从集合中获取的实体,因此它们在下一个轮询周期中不会再次从集合中拉取(假设更新更改了查询中使用的一些值)。仍然建议使用事务来实现执行隔离和数据一致性,当集群中使用多个针对同一集合的 MongoDbMessageSource 实例时。
MongoDB 变更流 inbound 通道适配器
从 5.3 版本开始,spring-integration-mongodb 模块引入了 MongoDbChangeStreamMessageProducer —— 这是一个响应式的 MessageProducerSupport 实现,用于 Spring Data 的 ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API。此组件默认生成一个以 ChangeStreamEvent 作为负载 body 的消息 Flux,并包含一些与更改流相关的头部信息(参见 MongoHeaders)。建议将此 MongoDbChangeStreamMessageProducer 与 FluxMessageChannel 结合使用,作为 outputChannel 以实现按需订阅和下游事件消费。
这个通道适配器的 Java DSL 配置可能看起来像这样:
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}
当 MongoDbChangeStreamMessageProducer 被停止,或者下游取消订阅,或者 MongoDb 变更流产生一个 OperationType.INVALIDATE 时,Publisher 就完成了。通道适配器可以再次启动,并创建一个新的源数据 Publisher,并且它会自动在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中订阅。如果需要从其他地方消费变更流事件,则可以在启动之间重新配置此通道适配器以适应新选项。
有关 Spring Data MongoDb 中 change stream 的更多信息,请参阅文档。
MongoDB 输出通道适配器
MongoDB outbound 通道适配器允许您将消息有效负载写入 MongoDB 文档存储,如下例所示:
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />
如上所述的配置所示,您可以使用 outbound-channel-adapter 元素配置 MongoDB 外发通道适配器,为各种属性提供值,例如:
- 
collection-name或collection-name-expression:标识要使用的 MongoDb 集合的名称。
- 
mongo-converter:引用o.s.data.mongodb.core.convert.MongoConverter的一个实例,该实例有助于将原始 Java 对象转换为 JSON 文档表示形式。
- 
mongodb-factory:引用o.s.data.mongodb.MongoDbFactory的一个实例。
- 
mongo-template:引用o.s.data.mongodb.core.MongoTemplate的一个实例。注意:不能同时设置 mongo-template 和 mongodb-factory。
- 
其他在所有入站适配器中通用的属性(例如 'channel')。 
前面的例子相对简单和静态,因为它有一个 collection-name 的字面值。有时,您可能需要根据某些条件在运行时更改此值。为此,请使用 collection-name-expression,其中提供的表达式是任何有效的 SpEL 表达式。
MongoDB 外发网关
版本 5.0 引入了 MongoDB 外发网关。它允许您通过向请求通道发送消息来查询数据库。然后网关将响应发送到回复通道。您可以使用消息有效负载和标题来指定查询和集合名称,如下例所示:
- Java DSL
- Kotlin DSL
- Java
- XML
@SpringBootApplication
public class MongoDbJavaApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }
    @Autowired
    private MongoDbFactory;
    @Autowired
    private MongoConverter;
    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }
    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }
}
class MongoDbKotlinApplication {
    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory
    @Autowired
    lateinit var mongoConverter: MongoConverter
    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }
    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }
}
@SpringBootApplication
public class MongoDbJavaApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }
    @Autowired
    private MongoDbFactory mongoDbFactory;
    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }
    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
你可以使用以下属性与 MongoDB outbound Gateway:
- 
collection-name或collection-name-expression:标识要使用的 MongoDB 集合的名称。
- 
mongo-converter:引用o.s.data.mongodb.core.convert.MongoConverter的一个实例,用于将原始 Java 对象转换为 JSON 文档表示。
- 
mongodb-factory:引用o.s.data.mongodb.MongoDbFactory的一个实例。
- 
mongo-template:引用o.s.data.mongodb.core.MongoTemplate的一个实例。注意:你不能同时设置mongo-template和mongodb-factory。
- 
entity-class:传递给 MongoTemplate 中find(..)和findOne(..)方法的实体类的全限定名。如果未提供此属性,默认值是org.bson.Document。
- 
query或query-expression:指定 MongoDB 查询。请参阅 MongoDB 文档以获取更多查询示例。
- 
collection-callback:引用org.springframework.data.mongodb.core.CollectionCallback的一个实例。自 5.0.11 起,更推荐使用o.s.i.mongodb.outbound.MessageCollectionCallback的实例,并带有请求消息上下文。有关更多信息,请参阅其 Javadoc。注意:你不能同时拥有collection-callback和任何查询属性。
作为 query 和 query-expression 属性的替代,你可以通过使用 collectionCallback 属性引用 MessageCollectionCallback 函数式接口的实现来指定其他数据库操作。以下示例指定了一个计数操作:
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}
MongoDB 反应式通道适配器
从 5.3 版本开始,提供了 ReactiveMongoDbStoringMessageHandler 和 ReactiveMongoDbMessageSource 实现。它们基于 Spring Data 的 ReactiveMongoOperations,并需要一个 org.mongodb:mongodb-driver-reactivestreams 依赖。
ReactiveMongoDbStoringMessageHandler 是 ReactiveMessageHandler 的一个实现,在集成流定义涉及反应式流组合时,框架中对此有原生支持。有关更多信息,请参阅 ReactiveMessageHandler。
从配置的角度来看,与其他许多标准通道适配器没有区别。例如,使用 Java DSL,这样的通道适配器可以这样使用:
请注意,具体的实现细节会根据实际的开发环境和需求有所不同。
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在本示例中,我们将通过提供的 ReactiveMongoDatabaseFactory 连接到 MongoDb,并将请求消息中的数据存储到名为 data 的默认集合中。 实际操作将根据需要从内部创建的 ReactiveStreamsConsumer 中的反应式流组合中执行。
ReactiveMongoDbMessageSource 是基于提供的 ReactiveMongoDatabaseFactory 或 ReactiveMongoOperations 和 MongoDb 查询(或表达式)的 AbstractMessageSource 实现,根据 expectSingleResult 选项调用 find() 或 findOne() 操作,并期望 entityClass 类型以转换查询结果。查询执行和结果评估是在需要时进行的,当生成的消息的有效负载中的 Publisher (Flux 或 Mono 根据 expectSingleResult 选项) 被订阅时。框架可以在下游使用拆分器和 FluxMessageChannel 时自动订阅此类有效负载(本质上是 flatMap)。否则,由目标应用程序负责订阅在下游端点中轮询的发布者。
使用 Java DSL,这样的通道适配器可以配置为:
// 示例代码,根据实际情况调整
@Bean
public IntegrationFlow myChannelAdapter() {
    return IntegrationFlows.from(() -> "payload",
        e -> e.poller(Pollers.fixedRate(5000)))
        .handle(logger())
        .get();
}
请注意,以上代码仅作为示例,具体实现可能因需求不同而有所变化。
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}
从 5.5 版本开始,ReactiveMongoDbMessageSource 可以配置 updateExpression。它具有与阻塞型 MongoDbMessageSource 相同的功能。更多信息请参阅 MongoDB 入站通道适配器 和 AbstractMongoDbMessageSourceSpec 的 JavaDocs。