跳到主要内容
版本:7.0.2

消息网关

DeepSeek V3 中英对照 Messaging Gateways

网关隐藏了Spring Integration提供的消息API。它让您的应用程序业务逻辑无需了解Spring Integration API。通过使用通用网关,您的代码仅需与简单接口交互。

进入 GatewayProxyFactoryBean

如前所述,最好不依赖于 Spring Integration API——包括网关类。为此,Spring Integration 提供了 GatewayProxyFactoryBean,它可以为任何接口生成代理,并在内部调用如下所示的网关方法。通过使用依赖注入,您可以将该接口暴露给您的业务方法。

以下示例展示了一个可用于与 Spring Integration 交互的接口:

public interface Cafe {

void placeOrder(Order order);

}

Gateway XML 命名空间支持

命名空间支持同样被提供。它允许你将一个接口配置为服务,如下例所示:

<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>

定义此配置后,cafeService 现在可以被注入到其他 Bean 中,而调用 Cafe 接口代理实例上方法的代码无需感知 Spring Integration API。有关使用 gateway 元素的示例(在 Café 演示中),请参阅“示例”附录。

上述配置中的默认值适用于网关接口上的所有方法。如果未指定回复超时,调用线程将等待回复 30 秒。请参阅未收到响应时的网关行为

默认值可以针对单个方法进行覆盖。请参阅使用注解和XML进行网关配置

设置默认回复通道

通常,您无需指定 default-reply-channel,因为网关会自动创建一个临时的匿名回复通道来监听回复。但在某些情况下,您可能需要定义 default-reply-channel(或适配器网关中的 reply-channel,例如 HTTP、JMS 等)。

为了提供一些背景信息,我们简要讨论网关的一些内部工作原理。网关会创建一个临时的点对点回复通道。该通道是匿名的,并以 replyChannel 的名称添加到消息头中。当提供一个显式的 default-reply-channel(对于远程适配器网关,使用 reply-channel)时,您可以指向一个发布-订阅通道,之所以这样命名,是因为您可以向其中添加多个订阅者。在内部,Spring Integration 会在临时的 replyChannel 和显式定义的 default-reply-channel 之间创建一个桥接。

假设你希望你的回复不仅发送到网关,还要发送给其他消费者。在这种情况下,你需要实现两件事:

  • 一个您可以订阅的命名通道

  • 该通道为发布-订阅通道

网关使用的默认策略无法满足这些需求,因为添加到消息头中的回复通道是匿名且点对点的。这意味着其他订阅者无法获取该通道的句柄,即使能够获取,由于通道具有点对点特性,也只有一个订阅者能收到消息。通过定义 default-reply-channel,您可以指向您选择的通道。在本例中,该通道是一个 publish-subscribe-channel。网关会创建一个从该通道到存储在消息头中的临时匿名回复通道的桥接。

您可能还需要通过拦截器(例如,wiretap)显式提供一个回复通道用于监控或审计。要配置通道拦截器,您需要一个命名通道。

备注

从版本 5.4 开始,当网关方法的返回类型为 void 时,如果未显式提供 replyChannel 头信息,框架会将其填充为 nullChannel bean 引用。这使得下游流程中任何可能的回复都能被丢弃,从而满足单向网关的约定。

通过注解和 XML 配置网关

考虑以下示例,该示例通过添加 @Gateway 注解扩展了之前的 Cafe 接口示例:

public interface Cafe {

@Gateway(requestChannel="orders")
void placeOrder(Order order);

}

@Header 注解允许您添加被解释为消息头的值,如下例所示:

public interface FileWriter {

@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果您倾向于使用 XML 方式配置网关方法,可以在网关配置中添加 method 元素,如下例所示:

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>

您也可以使用XML为每个方法调用提供独立的头部信息。如果希望设置的头部本质上是静态的,且不想通过@Header注解将其嵌入网关的方法签名中,这种方式会很有用。例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单次报价或全部报价)来影响贷款报价的聚合方式。虽然可以通过评估调用了哪个网关方法来确定请求类型,但这会违反关注点分离原则(因为方法是Java的产物)。然而,在消息架构中,通过消息头部表达意图(元信息)是很自然的方式。以下示例展示了如何为两个方法分别添加不同的消息头部:

<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>

在前面的示例中,根据网关的方法,为 'RESPONSE_TYPE' 标头设置了不同的值。

important

如果你在 <int:method/> 中指定了 requestChannel,同时在 @Gateway 注解中也指定了,那么注解中的值将优先生效。

备注

如果在 XML 中指定了一个无参数网关,并且接口方法同时具有 @Payload@Gateway 注解(在 <int:method/> 元素中包含 payloadExpressionpayload-expression),那么 @Payload 的值将被忽略。

表达式与“全局”标头

<header/> 元素支持使用 expression 作为 value 的替代方案。SpEL 表达式会被求值以确定标头的值。从版本 5.2 开始,求值上下文的 #root 对象是一个 MethodArgsHolder,它提供了 getMethod()getArgs() 访问器。例如,如果您希望基于简单的方法名进行路由,可以使用以下表达式添加标头:method.name

备注

java.reflect.Method 是不可序列化的。如果稍后对消息进行序列化,带有 method 表达式的头部信息将会丢失。因此,在这些情况下,你可能希望使用 method.namemethod.toString()toString() 方法提供了该方法的 String 表示形式,包括参数和返回类型。

自 3.0 版本起,可以定义 <default-header/> 元素,以便为网关生成的所有消息添加头部,无论调用的是哪个方法。为特定方法定义的头部优先级高于默认头部。此处为特定方法定义的头部会覆盖服务接口中的任何 @Header 注解。然而,默认头部不会覆盖服务接口中的任何 @Header 注解。

网关现在也支持 default-payload-expression,该表达式将应用于所有方法(除非被覆盖)。

将方法参数映射到消息

使用上一节中的配置技术可以控制方法参数如何映射到消息元素(有效负载和头部)。当没有使用显式配置时,会使用某些约定来执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪个应该映射到头部。考虑以下示例:

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一种情况下,惯例是将第一个参数映射为载荷(只要它不是 Map 类型),而第二个参数的内容则成为头部信息。

在第二种情况(或当参数 thing1 的参数为 Map 时的第一种情况)下,框架无法确定哪个参数应作为负载。因此,映射会失败。这通常可以通过使用 payload-expression@Payload 注解或 @Headers 注解来解决。

或者(以及当约定失效时),您可以完全负责将方法调用映射到消息。为此,请实现一个 MethodArgsMessageMapper,并通过 mapper 属性将其提供给 <gateway/>。该映射器会映射一个 MethodArgsHolder,这是一个简单的类,它包装了 java.reflect.Method 实例和一个包含参数的 Object[]。当提供自定义映射器时,网关不允许使用 default-payload-expression 属性和 <default-header/> 元素。同样,任何 <method/> 元素上也不允许使用 payload-expression 属性和 <header/> 元素。

映射方法参数

以下示例展示了如何将方法参数映射到消息,并提供了一些无效配置的示例:

public interface MyGateway {

void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);

@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // // <1>

// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

// invalid
void twoPayloads(@Payload String s1, @Payload String s2);

// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
  • 请注意,在此示例中,SpEL 变量 #this 指向参数——在本例中,即 s 的值。

XML 的等效写法略有不同,因为方法参数没有 #this 上下文。不过,表达式可以通过 MethodArgsHolder 根对象的 args 属性来引用方法参数(更多信息请参阅表达式与“全局”头信息),如下例所示:

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>

@MessagingGateway 注解

从 4.0 版本开始,网关服务接口可以通过 @MessagingGateway 注解进行标记,而无需定义 <gateway /> XML 元素进行配置。以下两个示例对比了配置同一网关的两种方法:

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {

@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);

@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);

String echoViaDefault(String payload);

}
important

与 XML 版本类似,当 Spring Integration 在组件扫描过程中发现这些注解时,它会创建带有其消息传递基础设施的 proxy 实现。要执行此扫描并在应用程序上下文中注册 BeanDefinition,请将 @IntegrationComponentScan 注解添加到 @Configuration 类。标准的 @ComponentScan 基础设施不处理接口。因此,我们引入了自定义的 @IntegrationComponentScan 逻辑来查找接口上的 @MessagingGateway 注解,并为它们注册 GatewayProxyFactoryBean 实例。另请参阅 注解支持

除了使用 @MessagingGateway 注解,你还可以在服务接口上使用 @Profile 注解,以便在相应配置文件未激活时避免创建该 bean。

从6.0版本开始,带有 @MessagingGateway 注解的接口也可以像任何Spring @Component 定义一样,用 @Primary 注解标记,以实现相应的配置逻辑。

从 6.0 版本开始,@MessagingGateway 接口可以在标准的 Spring @Import 配置中使用。这可以作为 @IntegrationComponentScan 或手动定义 AnnotationGatewayProxyFactoryBean bean 的替代方案。

自版本 6.0 起,@MessagingGateway 被元注解为 @MessageEndpoint,其 name() 属性实质上与 @Component.value() 互为别名。这样一来,网关代理的 Bean 名称生成策略便与 Spring 标准注解配置中对扫描和导入组件的处理方式保持一致。默认的 AnnotationBeanNameGenerator 可通过全局设置 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 或在 @IntegrationComponentScan.nameGenerator() 属性中进行覆盖。

备注

如果你没有 XML 配置,则至少需要一个 @Configuration 类上带有 @EnableIntegration 注解。更多信息请参阅 配置与 @EnableIntegration

调用无参数方法

当调用没有参数的 Gateway 接口方法时,默认行为是从 PollableChannel 接收一个 Message

然而,有时您可能希望触发无参数方法,以便与下游不需要用户提供参数的其他组件进行交互,例如触发无参数的 SQL 调用或存储过程。

为实现发送与接收语义,必须提供有效载荷。生成有效载荷时,接口方法参数并非必需。您可以使用 @Payload 注解,或在 XML 配置的 method 元素中使用 payload-expression 属性。以下列举了几种可能的有效载荷示例:

  • 一个字符串字面量

  • #gatewayMethod.name

  • new java.util.Date()

  • @someBean.someMethod() 的返回值

以下示例展示了如何使用 @Payload 注解:

public interface Cafe {

@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();

}

你也可以使用 @Gateway 注解。

public interface Cafe {

@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();

}
备注

如果两个注解同时存在(并且提供了 payloadExpression),则 @Gateway 优先。

如果一个方法没有参数也没有返回值,但包含有效负载表达式,则将其视为仅发送操作。

调用 default 方法

网关代理接口也可以包含default方法,从5.3版本开始,框架会向代理中注入一个DefaultMethodInvokingMethodInterceptor,通过java.lang.invoke.MethodHandle方式调用default方法,而非通过代理方式。JDK中的接口(例如java.util.function.Function)仍可用于网关代理,但由于Java内部安全限制,针对JDK类进行MethodHandles.Lookup实例化时,其default方法无法被调用。这些方法也可以通过显式在方法上使用@Gateway注解,或在@MessagingGateway注解上设置proxyDefaultMethods属性,或在XML组件<gateway>中进行配置,从而恢复代理(但会丢失其实现逻辑,同时恢复先前的网关代理行为)。

错误处理

网关调用可能会产生错误。默认情况下,下游发生的任何错误都会在网关方法调用时“原样”重新抛出。例如,考虑以下简单流程:

gateway -> service-activator

如果服务激活器调用的服务抛出了 MyException(例如),框架会将其包装在 MessagingException 中,并将传递给服务激活器的消息附加到 failedMessage 属性中。因此,框架执行的任何日志记录都具备完整的失败上下文。默认情况下,当网关捕获到异常时,MyException 会被解包并抛给调用者。您可以在网关方法声明上配置 throws 子句,以匹配原因链中的特定异常类型。例如,如果您希望捕获完整的 MessagingException 并获取下游错误原因的所有消息传递信息,您的网关方法应类似于以下形式:

public interface MyGateway {

void performProcess() throws MessagingException;

}

由于我们鼓励使用POJO编程,您可能不希望将调用者暴露给消息传递基础设施。

如果你的网关方法没有 throws 子句,网关会遍历异常原因链,寻找一个非 MessagingExceptionRuntimeException。如果未找到,框架将抛出 MessagingException。如果前述讨论中的 MyException 的根因为 SomeOtherException,而你的方法声明了 throws SomeOtherException,网关会进一步解包该异常并将其抛给调用方。

当声明网关时未指定 service-interface,系统将使用内部框架接口 RequestReplyExchanger

考虑以下示例:

public interface RequestReplyExchanger {

Message<?> exchange(Message<?> request) throws MessagingException;

}

在 5.0 版本之前,这个 exchange 方法没有 throws 子句,因此异常会被解包。如果你使用这个接口并希望恢复之前的解包行为,请改用自定义的 service-interface 或自行访问 MessagingExceptioncause

然而,您可能希望记录错误而非传播它,或者您可能希望将异常视为有效回复(通过将其映射为符合调用方理解的某种“错误消息”约定的消息)。为实现此功能,网关通过支持 error-channel 属性,提供了专用于错误处理的消息通道支持。在以下示例中,一个“转换器”从 Exception 创建回复 Message

<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer 可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。这将成为发送回调用方的有效负载。如果需要,您可以在这样的“错误流”中执行更复杂的操作,可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter)、过滤器等。然而,大多数情况下,一个简单的 'transformer' 应该就足够了。

或者,您可能只想记录异常(或异步将其发送到某处)。如果提供单向流,则不会向调用方返回任何内容。如果想完全抑制异常,可以引用全局 nullChannel(本质上是一种 /dev/null 方法)。最后,如上所述,如果未定义 error-channel,则异常会照常传播。

当你使用 @MessagingGateway 注解时(参见 [](#messaging-gateway-annotation)@MessagingGateway` 注解``),你可以使用 errorChannel 属性。

从版本 5.0 开始,当你使用返回类型为 void 的网关方法(单向流)时,error-channel 引用(如果已提供)会被填充到每个发送消息的标准 errorChannel 头中。此功能允许基于标准 ExecutorChannel 配置(或 QueueChannel)的下游异步流覆盖默认的全局 errorChannel 异常发送行为。在此之前,你必须使用 @GatewayHeader 注解或 <header> 元素手动指定 errorChannel 头。对于带有异步流的 void 方法,error-channel 属性会被忽略,错误消息会被发送到默认的 errorChannel

important

通过简单的 POJI 网关暴露消息系统确实带来了好处,但“隐藏”底层消息系统的现实也确实需要付出代价,因此有一些事项需要考虑。我们希望 Java 方法能尽快返回,而不是在调用者等待其返回时(无论是 void、返回值还是抛出的异常)无限期地挂起。当常规方法被用作消息系统前的代理时,我们必须考虑到底层消息系统潜在的异步特性。这意味着,由网关发起的消息有可能被过滤器丢弃,从而永远无法到达负责生成回复的组件。某些服务激活器方法可能会导致异常,从而无法提供回复(因为我们不生成空消息)。换句话说,多种情况都可能导致回复消息永远不会到来。这在消息系统中是完全正常的。然而,请思考这对网关方法的影响。网关方法的输入参数已被整合到消息中并发送到下游。回复消息将被转换为网关方法的返回值。因此,你可能希望确保每次网关调用都始终有一个回复消息。否则,如果 reply-timeout 设置为负值,你的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(本节稍后解释)。另一种处理方式是依赖默认的 reply-timeout,即 30 秒。这样,网关挂起的时间不会超过 reply-timeout 指定的时间,如果超时确实发生,则返回 'null'。最后,你可能需要考虑在下游设置标志,例如在服务激活器上设置 'requires-reply',或在过滤器上设置 'throw-exceptions-on-rejection'。这些选项将在本章最后一节详细讨论。

备注

如果下游流返回一个 ErrorMessage,其 payload(一个 Throwable)会被视为常规的下游错误。如果配置了 error-channel,它将被发送到错误流。否则,该 payload 会被抛给网关的调用者。类似地,如果 error-channel 上的错误流返回一个 ErrorMessage,其 payload 也会被抛给调用者。对于任何带有 Throwable payload 的消息,同样适用。这在需要将 Exception 直接传播给调用者的异步场景中非常有用。为此,你可以返回一个 Exception(作为某个服务的 reply)或直接抛出它。通常,即使在异步流中,框架也会负责将下游流抛出的异常传播回网关。TCP Client-Server Multiplex 示例演示了将异常返回给调用者的两种技术。它通过使用带有 group-timeoutaggregator(参见 聚合器与组超时)以及在丢弃流上回复 MessagingTimeoutException,来模拟等待线程的套接字 IO 错误。

网关超时

网关具有两个超时属性:requestTimeoutreplyTimeout。请求超时仅适用于可能阻塞的通道,例如已满的有界 QueueChannelreplyTimeout 值表示网关等待回复或返回 null 的时长,默认值为无限等待。

超时设置可以作为所有网关方法的默认值(通过 defaultRequestTimeoutdefaultReplyTimeout),也可以在 @MessagingGateway 接口注解上进行配置。单个方法可以通过 <method/> 子元素或 @Gateway 注解来覆盖这些默认值。

从 5.0 版本开始,超时时间可以定义为表达式,如下例所示:

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

评估上下文包含一个 BeanResolver(使用 @someBean 来引用其他 Bean),并且 #root 对象的 args 数组属性是可用的。有关此根对象的更多信息,请参阅表达式与“全局”头信息。在使用 XML 进行配置时,超时属性可以是长整型值或 SpEL 表达式,如下例所示:

<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>

异步网关

作为一种模式,消息网关提供了一种很好的方式来隐藏消息传递特定的代码,同时仍然暴露消息传递系统的全部功能。如前文所述GatewayProxyFactoryBean 提供了一种便捷的方法,通过服务接口暴露代理,使您能够基于 POJO 访问消息传递系统(基于您自己领域中的对象、基本类型/字符串或其他对象)。然而,当网关通过返回值的简单 POJO 方法暴露时,这意味着对于每个请求消息(在方法调用时生成),必须有一个回复消息(在方法返回时生成)。由于消息传递系统本质上是异步的,您可能并不总是能够保证“每个请求都会有一个回复”的约定。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否期望回复或回复需要多长时间到达时,这提供了一种便捷的方式来启动流程。

为了处理这类场景,Spring Integration 使用 java.util.concurrent.Future 实例来支持异步网关。

从XML配置来看,没有任何变化,您仍然可以像定义常规网关一样定义异步网关,如下例所示:

<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>

然而,网关接口(一种服务接口)略有不同,如下所示:

public interface MathServiceGateway {

Future<Integer> multiplyByTwo(int i);

}

如前面的示例所示,网关方法的返回类型是 Future。当 GatewayProxyFactoryBean 发现网关方法的返回类型是 Future 时,它会立即通过使用 AsyncTaskExecutor 切换到异步模式。这就是差异所在。调用此类方法总是会立即返回一个 Future 实例。然后,您可以按照自己的节奏与 Future 交互以获取结果、取消操作等。此外,与任何其他使用 Future 实例的情况一样,调用 get() 可能会揭示超时、执行异常等情况。以下示例展示了如何使用从异步网关返回的 Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);

如需更详细的示例,请参阅 Spring Integration 示例中的 async-gateway 示例。

此外,从版本6.5开始,Java DSL的gateway()操作符完全支持async(true)行为。在内部,为GatewayProxyFactoryBean提供了一个AsyncRequestReplyExchanger服务接口。由于AsyncRequestReplyExchanger的契约是CompletableFuture<Message<?>>,整个请求-回复过程以异步方式执行。这种行为在某些场景下非常有用,例如在拆分器-聚合器场景中,当需要为每个项目调用另一个流程时。然而,顺序并不重要——重要的是所有处理完成后在聚合器上进行的组收集。

AsyncTaskExecutor

默认情况下,当任何返回类型为 Future 的网关方法提交内部 AsyncInvocationTask 实例时,GatewayProxyFactoryBean 使用 org.springframework.core.task.SimpleAsyncTaskExecutor。然而,<gateway/> 元素配置中的 async-executor 属性允许您引用 Spring 应用上下文中可用的任何 java.util.concurrent.Executor 实现。

(默认的)SimpleAsyncTaskExecutor 支持 FutureCompletableFuture 两种返回类型。请参阅 CompletableFuture。尽管存在默认的执行器,但通常建议提供一个外部执行器,以便在日志中识别其线程(使用 XML 配置时,线程名称基于执行器的 bean 名称),如下例所示:

@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);

}

若希望返回不同的 Future 实现,您可以通过提供自定义执行器,或完全禁用执行器并从下游流的回复消息负载中返回 Future。要禁用执行器,请在 GatewayProxyFactoryBean 中将其设置为 null(通过使用 setAsyncTaskExecutor(null))。在使用 XML 配置网关时,请使用 async-executor=""。当使用 @MessagingGateway 注解进行配置时,请使用类似以下代码:

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);

}
important

如果返回类型是特定的具体 Future 实现,或是配置的执行器不支持的其他子接口,则流程将在调用者线程上运行,并且流程必须在回复消息负载中返回所需的类型。

CompletableFuture

从 4.2 版本开始,网关方法现在可以返回 CompletableFuture<?>。返回此类型时有两种操作模式:

  • 当提供了异步执行器且返回类型恰好为 CompletableFuture(而非其子类)时,框架会在该执行器上运行任务,并立即向调用方返回一个 CompletableFuture。此时会使用 CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) 来创建该 Future。

  • 当异步执行器被显式设置为 null 且返回类型为 CompletableFuture,或者返回类型是 CompletableFuture 的子类时,流程将在调用方线程上执行。在此场景下,下游流程应返回一个适当类型的 CompletableFuture

使用场景

在以下场景中,调用线程会立即返回一个 CompletableFuture<Invoice>,当下游流程向网关回复(携带一个 Invoice 对象)时,该 CompletableFuture 将被完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下场景中,当下游流程将发票作为网关回复的有效载荷提供时,调用线程会返回一个 CompletableFuture<Invoice>。当发票准备就绪时,其他一些流程必须在未来完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />

在以下场景中,当下游流程将发票作为网关回复的有效载荷提供时,调用线程会返回一个 CompletableFuture<Invoice>。当发票准备就绪时,其他一些流程必须在未来完成。如果启用了 DEBUG 日志记录,则会发出日志条目,表明异步执行器无法用于此场景。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture 实例可用于对回复执行额外操作,如下例所示:

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

从 5.0 版本开始,GatewayProxyFactoryBean 允许在网关接口方法中使用 Project Reactor,通过 Mono<T> 返回类型实现。内部的 AsyncInvocationTask 被包装在 Mono.fromCallable() 中。

Mono 可用于稍后检索结果(类似于 Future<?>),或者您可以通过调度器消费它,当结果返回给网关时调用您的 Consumer

important

Mono 不会被框架立即刷新。因此,在网关方法返回之前,底层的消息流不会启动(这与使用 Future<?> Executor 任务的情况不同)。当 Mono 被订阅时,流才会启动。或者,Mono(作为一个“可组合的”类型)可能是 Reactor 流的一部分,此时 subscribe() 与整个 Flux 相关。以下示例展示了如何使用 Project Reactor 创建网关:

@MessagingGateway
public interface TestGateway {

@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}

在这种场景下,网关可以用于处理数据Flux的某些服务中:

@Autowired
TestGateway testGateway;

public void handleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}

另一个使用 Project Reactor 的示例是一个简单的回调场景,如下例所示:

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

调用线程继续执行,当流程完成时,handleInvoice() 将被调用。

另请参阅 Kotlin 协程 以获取更多信息。

下游流返回异步类型

正如上文 AsyncTaskExecutor 部分所述,若希望下游组件返回包含异步负载(FutureMono 等)的消息,必须将异步执行器显式设置为 null(使用 XML 配置时设为 "")。随后流程将在调用方线程上执行,其结果可在稍后获取。

异步 void 返回类型

消息网关方法可以这样声明:

@MessagingGateway
public interface MyGateway {

@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);

}

但下游异常不会传播回调用方。为确保下游流调用的异步行为以及异常向调用方的传播,从版本6.0开始,框架提供了对 Future<Void>Mono<Void> 返回类型的支持。该用例与之前描述的普通 void 返回类型的发送即忘行为类似,但区别在于流执行是异步进行的,并且返回的 Future(或 Mono)会根据 send 操作的结果以 null 或异常方式完成。

备注

如果 Future<Void> 是确切的下游流回复,那么网关的 asyncExecutor 选项必须设置为 null(对于 @MessagingGateway 配置,使用 AnnotationConstants.NULL),并且 send 部分在生产者线程上执行。回复部分取决于下游流的配置。这样,目标应用程序就能正确地生成 Future<Void> 回复。Mono 的使用场景已经超出了框架的线程控制范围,因此将 asyncExecutor 设置为 null 没有意义。在这种情况下,请求-回复网关操作的结果 Mono<Void> 必须配置为网关方法的 Mono<?> 返回类型。

网关在无响应到达时的行为

正如之前所述,网关提供了一种通过POJO方法调用来与消息系统交互的便捷方式。然而,典型的方法调用通常期望总是有返回值(即使抛出异常),但这并不总能与消息交换形成一一对应关系(例如,回复消息可能无法到达——这相当于方法没有返回)。

本节其余部分将介绍各种场景以及如何使网关的行为更具可预测性。某些属性可以配置以使同步网关行为更可预测,但其中一些可能并不总是按预期工作。其中之一是 reply-timeout(方法级别)或 default-reply-timeout(网关级别)。我们将研究 reply-timeout 属性,探讨它在各种场景中如何影响同步网关的行为以及其局限性。我们将分析单线程场景(所有下游组件通过直接通道连接)和多线程场景(例如,下游可能存在轮询或执行器通道,从而打破单线程边界)。

长时运行进程下游

Sync Gateway,单线程

如果下游组件仍在运行(可能是因为无限循环或服务缓慢),设置 reply-timeout 将不会生效,网关方法调用会一直等待,直到下游服务退出(通过返回结果或抛出异常)。

Sync Gateway,多线程

在多线程消息流中,如果下游组件仍在运行(可能由于无限循环或服务响应缓慢),设置 reply-timeout 会产生效果:一旦达到超时时间,网关方法调用将允许返回,因为 GatewayProxyFactoryBean 会在回复通道上轮询并等待消息直至超时。但需注意,如果在实际回复生成前已达到超时,可能导致网关方法返回 'null'。您应当理解:即使回复消息最终生成,也可能在网关方法调用返回后才被发送到回复通道,因此必须充分考虑这一特性并在设计消息流时予以考虑。

另请参阅 errorOnTimeout 属性,当发生超时时,它会抛出 MessageTimeoutException 而不是返回 null

下游组件返回 'null'

Sync Gateway — 单线程

如果下游组件返回 nullreply-timeout 被配置为负值,网关方法调用将无限期挂起,除非在可能返回 null 的下游组件(例如服务激活器)上设置了 requires-reply 属性。在这种情况下,将抛出异常并传播到网关。

Sync Gateway — 多线程

行为与之前的情况相同。

下游组件返回签名是 'void' 而网关方法签名是非 void

Sync Gateway — 单线程

如果下游组件返回'void'且reply-timeout被配置为负值,网关方法调用将无限期挂起。

Sync Gateway — 多线程

行为与之前的情况相同。

下游组件导致运行时异常

Sync Gateway — 单线程

如果下游组件抛出运行时异常,该异常会通过错误消息传播回网关并重新抛出。

Sync Gateway — 多线程

行为与之前的情况相同。

important

您需要理解,默认情况下,reply-timeout 是无限制的。因此,如果您将 reply-timeout 设置为负值,您的网关方法调用可能会无限期挂起。所以,为确保安全,您应该分析您的流程,即使这些场景发生的可能性极小,也应将 reply-timeout 属性设置为一个“安全”值。默认情况下是 30 秒。更好的做法是,您可以将下游组件的 requires-reply 属性设置为 'true',以确保及时响应,因为一旦该下游组件内部返回 null,就会抛出异常。然而,您也应该意识到,在某些场景下(参见第一种情况),reply-timeout 并无帮助。这意味着分析您的消息流并决定何时使用同步网关而非异步网关同样重要。如先前所述,后一种情况涉及定义返回 Future 实例的网关方法。这样您就能保证收到返回值,并对调用结果拥有更精细的控制。此外,在处理路由器时,您应该记住,将 resolution-required 属性设置为 'true' 会导致路由器在无法解析特定通道时抛出异常。同样,在处理过滤器时,您可以设置 throw-exception-on-rejection 属性。在这两种情况下,最终流程的行为类似于包含一个带有 'requires-reply' 属性的服务激活器。换句话说,这有助于确保从网关方法调用中获得及时响应。

important

您需要理解计时器在线程返回到网关时启动——即当流程完成或消息被传递给另一个线程时。此时,调用线程开始等待回复。如果流程是完全同步的,回复会立即可用。对于异步流程,线程最多会等待这么长时间。

自6.2版本起,MessagingGatewaySupport的内部MethodInvocationGateway扩展的errorOnTimeout属性已在@MessagingGatewayGatewayEndpointSpec中公开。此选项的含义与端点摘要章节末尾解释的任何入站网关完全相同。换句话说,将此选项设置为true时,当接收超时耗尽时,会从发送-接收网关操作中抛出MessageTimeoutException,而不是返回null

请参阅 Java DSL 章节中的 IntegrationFlow 作为网关,了解通过 IntegrationFlow 定义网关的选项。