reactive()
端点
reactive()
Endpoint
从 5.5 版本开始,ConsumerEndpointSpec
提供了一个带有可选自定义器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
的 reactive()
配置属性。此选项将目标端点配置为 ReactiveStreamsConsumer
实例,独立于输入通道类型,后者通过 IntegrationReactiveUtils.messageChannelToFlux()
转换为 Flux
。提供的函数从 Flux.transform()
操作符中使用,以定制(如 publishOn()
、log()
、doOnNext()
等)来自输入通道的反应式流源。
以下示例演示了如何独立于最终订阅者和生产者更改 DirectChannel
的发布线程,而不依赖于输入通道:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
更多信息请参见 Reactive Streams 支持。