跳到主要内容
版本:7.0.2

reactive() 端点

DeepSeek V3 中英对照 The reactive() Endpoint The reactive() Endpoint

从 5.5 版本开始,ConsumerEndpointSpec 提供了一个 reactive() 配置属性,其中包含一个可选的定制器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>。此选项将目标端点配置为 ReactiveStreamsConsumer 实例,与输入通道类型无关,输入通道会通过 IntegrationReactiveUtils.messageChannelToFlux() 转换为 Flux。所提供的函数通过 Flux.transform() 操作符使用,用于对来自输入通道的响应式流源进行定制(例如 publishOn()log()doOnNext() 等)。

以下示例演示了如何独立于最终订阅者和生产者,更改从输入通道到该 DirectChannel 的发布线程:

@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}

更多信息请参阅 Reactive Streams 支持