reactive() 端点
reactive() Endpoint
从 5.5 版本开始,ConsumerEndpointSpec 提供了一个带有可选自定义器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 的 reactive() 配置属性。此选项将目标端点配置为 ReactiveStreamsConsumer 实例,独立于输入通道类型,后者通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。提供的函数从 Flux.transform() 操作符中使用,以定制(如 publishOn()、log()、doOnNext() 等)来自输入通道的反应式流源。
以下示例演示了如何独立于最终订阅者和生产者更改 DirectChannel 的发布线程,而不依赖于输入通道:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}
更多信息请参见 Reactive Streams 支持。