跳到主要内容
版本:7.0.2

网关操作符 gateway()

DeepSeek V3 中英对照 Operator gateway()

gateway() 操作符在 IntegrationFlow 定义中是一种特殊的服务激活器实现,用于通过其输入通道调用其他端点或集成流并等待回复。从技术上讲,它与 <chain> 定义中的嵌套 <gateway> 组件扮演相同的角色(参见从链内调用链),并使得流程更加简洁明了。从逻辑和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见消息网关)。该操作符针对不同目标有多个重载版本:

  • gateway(String requestChannel) 通过名称向某个端点的输入通道发送消息;

  • gateway(MessageChannel requestChannel) 通过直接注入的方式向某个端点的输入通道发送消息;

  • gateway(IntegrationFlow flow) 向提供的 IntegrationFlow 的输入通道发送消息。

所有这些方法都有一个带有第二个 Consumer<GatewayEndpointSpec> 参数的变体,用于配置目标 GatewayMessageHandler 和相应的 AbstractEndpoint。此外,基于 IntegrationFlow 的方法允许调用现有的 IntegrationFlow bean,或通过 IntegrationFlow 函数式接口的内联 lambda 将流声明为子流,也可以将其提取到 private 方法中以实现更简洁的代码风格:

@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}

private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
important

如果下游流并不总是返回回复,您应将 requestTimeout 设置为 0,以防止调用线程无限期挂起。在这种情况下,流将在该点结束,线程将被释放以处理后续工作。

从版本 6.5 开始,gateway() 操作符完全支持 async(true) 行为。在内部,为 GatewayProxyFactoryBean 提供了一个 AsyncRequestReplyExchanger 服务接口。由于 AsyncRequestReplyExchanger 契约是一个 CompletableFuture<Message<?>>,整个请求-回复过程以异步方式执行。