操作符 gateway()
-
gateway(String requestChannel)
通过名称向某个端点的输入通道发送消息; -
gateway(MessageChannel requestChannel)
通过直接注入向某个端点的输入通道发送消息; -
gateway(IntegrationFlow flow)
向提供的IntegrationFlow
的输入通道发送消息。
所有这些都有一个变体,带有第二个 Consumer<GatewayEndpointSpec>
参数,用于配置目标 GatewayMessageHandler
和相应的 AbstractEndpoint
。此外,基于 IntegrationFlow
的方法允许调用现有的 IntegrationFlow
bean,或者通过内联 lambda 声明流作为子流,以实现 IntegrationFlow
函数式接口,或者将其提取到 private
方法中以获得更清晰的代码风格:
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
important
如果下游流不总是返回回复,你应该将 requestTimeout
设置为 0 以防止无限期挂起调用线程。在那种情况下,流将在那一点结束,并释放线程以进行进一步的工作。