跳到主要内容

消息网关

QWen Plus 中英对照 Messaging Gateways

网关隐藏了由 Spring Integration 提供的消息 API。它使您的应用程序业务逻辑无需了解 Spring Integration API。通过使用通用网关,您的代码只与一个简单的接口进行交互。

进入 GatewayProxyFactoryBean

如前所述,不依赖于 Spring Integration API 将会很好——包括网关类。为此,Spring Integration 提供了 GatewayProxyFactoryBean,它为任何接口生成一个代理,并在内部调用以下网关方法。通过使用依赖注入,您可以将接口暴露给业务方法。

以下示例展示了一个可以用于与 Spring Integration 交互的接口:

public interface Cafe {

void placeOrder(Order order);

}
java

网关 XML 命名空间支持

命名空间支持也被提供。它让你可以配置一个接口作为服务,如下例所示:

<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
xml

有了这个配置定义,cafeService 现在可以注入到其他 bean 中,并且调用 Cafe 接口的代理实例方法的代码对 Spring Integration API 毫无察觉。有关使用 gateway 元素的示例,请参阅 “示例” 附录(在 Cafe 演示中)。

前面配置中的默认值应用于网关接口上的所有方法。如果未指定回复超时时间,调用线程将等待回复 30 秒。请参阅当没有响应到达时的网关行为

默认值可以被单个方法覆盖。见 使用注解和 XML 的网关配置

设置默认回复通道

通常情况下,您无需指定 default-reply-channel,因为网关会自动创建一个临时的、匿名的回复通道,在那里它监听回复。但是,有些情况可能会促使您定义一个 default-reply-channel(或使用适配器网关如 HTTP 、 JMS 等的 reply-channel)。

为了更好地理解,我们简要讨论一下网关的一些内部工作原理。网关创建一个临时的点对点回复通道。它是匿名的,并以名称 replyChannel 添加到消息头中。在提供显式的 default-reply-channel (对于远程适配器网关为 reply-channel )时,你可以指向一个发布 - 订阅通道,之所以这样命名是因为你可以向其添加多个订阅者。在内部,Spring Integration 在临时的 replyChannel 和显式定义的 default-reply-channel 之间创建一座桥梁。

假设你希望你的回复不仅发送到网关,还要发送到其他一些消费者。在这种情况下,你需要做两件事:

  • 一个你可以订阅的命名通道

  • 该通道是一个发布订阅通道

网关使用的默认策略无法满足这些需求,因为添加到头部的回复通道是匿名且点对点的。这意味着没有其他订阅者可以获得它的句柄,即使可以获得,该通道也具有点对点行为,即只有一个订阅者会收到消息。通过定义 default-reply-channel,您可以指向您选择的通道。在这种情况下,这是一个 publish-subscribe-channel。网关会从它创建一个桥接到临时的、匿名的回复通道,该回复通道存储在头部。

你可能还希望明确地提供一个回复通道,通过拦截器进行监控或审计(例如,wiretap)。要配置通道拦截器,你需要一个命名的通道。

备注

从 5.4 版本开始,当网关方法返回类型为 void 时,如果未显式提供此类标头,框架会填充一个 replyChannel 标头作为 nullChannel bean 引用。这允许丢弃下游流的任何可能回复,满足单向网关契约。

使用注解和XML进行网关配置

考虑以下示例,它通过添加一个 @Gateway 注解来扩展前面的 Cafe 接口示例:

public interface Cafe {

@Gateway(requestChannel="orders")
void placeOrder(Order order);

}
java

@Header 注解让你添加被解释为消息头的值,如下例所示:

public interface FileWriter {

@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}
java

如果你更喜欢使用 XML 方法来配置网关方法,你可以向网关配置添加 method 元素,如下例所示:

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
xml

你也可以使用 XML 为每个方法调用提供单独的报头。如果你想要设置的报头是静态性质的,并且你不想通过使用 @Header 注解将它们嵌入网关的方法签名中,这可能会很有用。例如,在贷款中介示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。通过评估哪个网关方法被调用来确定请求的类型虽然是可能的,但会违反关注点分离范式(方法是 Java 艺术品)。然而,在消息报头中表达你的意图(元信息)在消息传递架构中是很自然的。以下示例展示了如何为两个方法中的每一个添加不同的消息报头:

<method name="getSingleQuote" >
<header name="aggregationType" value="single"/>
</method>
<method name="getAllQuotes" >
<header name="aggregationType" value="all"/>
</method>
xml

以上XML配置片段展示了如何针对不同方法调用添加特定的消息头,以指示聚合类型。

<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
xml

在前面的例子中,根据网关的方法,为 'RESPONSE_TYPE' 标头设置了一个不同的值。

important

例如,如果你在 <int:method/> 以及 @Gateway 注解中都指定了 requestChannel,注解中的值将生效。

备注

如果在 XML 中指定了一个无参数网关,并且接口方法同时具有 @Payload@Gateway 注解(带有 payloadExpressionpayload-expression 在一个 <int:method/> 元素中),则 @Payload 的值将被忽略。

表达式和“全局”标题

<header/> 元素支持 expression 作为 value 的替代。SpEL 表达式会被评估以确定 header 的值。从 5.2 版本开始,评估上下文的 #root 对象是一个带有 getMethod()getArgs() 访问器的 MethodArgsHolder。例如,如果你想根据简单的方法名称进行路由,你可以添加一个具有以下表达式的 header:method.name

备注

java.reflect.Method 不可序列化。如果你后来序列化消息,带有 method 表达式的头信息将会丢失。因此,在这些情况下,你可能希望使用 method.namemethod.toString()toString() 方法提供方法的 String 表示,包括参数和返回类型。

自 3.0 版本以来,可以定义 <default-header/> 元素以向网关生成的所有消息添加标题,无论调用的方法是什么。为特定方法定义的标题优先于默认标题。在此为特定方法定义的标题会覆盖服务接口中的任何 @Header 注解。但是,默认标题不会覆盖服务接口中的任何 @Header 注解。

网关现在也支持 default-payload-expression,该表达式应用于所有方法(除非被覆盖)。

将方法参数映射到消息

使用上一节中的配置技术可以控制方法参数如何映射到消息元素(有效负载和标头)。当没有使用显式配置时,会使用某些约定来执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪个应该映射到标头。请考虑以下示例:

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);
java

在第一种情况下,约定是将第一个参数映射到负载(只要它不是一个 Map),而第二个参数的内容则成为标题。

在第二种情况下(或当参数 thing1 的参数是一个 Map 时的第一种情况),框架无法确定哪个参数应该是有效负载。因此,映射失败。这通常可以使用 payload-expression@Payload 注解或 @Headers 注解来解决。

或者(在约定失效的情况下),你可以完全负责将方法调用映射到消息。为此,实现一个 MethodArgsMessageMapper 并通过使用 mapper 属性将其提供给 <gateway/> 。映射器映射一个 MethodArgsHolder ,这是一个简单的类,它包装了 java.reflect.Method 实例和一个包含参数的 Object[] 。当提供自定义映射器时,网关上不允许有 default-payload-expression 属性和 <default-header/> 元素。同样地,在任何 <method/> 元素上也不允许有 payload-expression 属性和 <header/> 元素。

映射方法参数

以下示例展示了如何将方法参数映射到消息,并显示了一些无效配置的示例:

public interface MyGateway {

void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);

@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // // <1>

// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

// invalid
void twoPayloads(@Payload String s1, @Payload String s2);

// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
java
  • 注意,在这个例子中,SpEL 变量 #this 引用的是参数——在这种情况下,是 s 的值。

XML 等价形式看起来有点不同,因为方法参数没有 #this 上下文。然而,表达式可以通过使用 MethodArgsHolder 根对象的 args 属性来引用方法参数(更多信息请参见 表达式和“全局”头信息),如下例所示:

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
xml

@MessagingGateway 注解

从 4.0 版本开始,网关服务接口可以使用 @MessagingGateway 注解进行标记,而不需要定义 <gateway /> XML 元素来进行配置。下面的两组示例对比了这两种配置相同网关的方法:

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
xml
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {

@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);

@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);

String echoViaDefault(String payload);

}
java
important

类似于 XML 版本,当 Spring Integration 在组件扫描期间发现这些注解时,它会创建带有其消息传递基础设施的 proxy 实现。为了执行此扫描并在应用程序上下文中注册 BeanDefinition,请将 @IntegrationComponentScan 注解添加到 @Configuration 类中。标准的 @ComponentScan 基础设施不处理接口。因此,我们引入了自定义的 @IntegrationComponentScan 逻辑来查找接口上的 @MessagingGateway 注解,并为它们注册 GatewayProxyFactoryBean 实例。另请参阅 注解支持

除了 @MessagingGateway 注解外,你还可以使用 @Profile 注解标记一个服务接口,以避免在没有激活此类配置文件时创建 bean。

从 6.0 版本开始,带有 @MessagingGateway 的接口也可以用 @Primary 注解标记,以实现相应的配置逻辑,这与任何 Spring @Component 定义一样。

从 6.0 版本开始,@MessagingGateway 接口可以在标准的 Spring @Import 配置中使用。这可以作为 @IntegrationComponentScan 或手动 AnnotationGatewayProxyFactoryBean bean 定义的替代方案。

@MessagingGateway 自版本 6.0 起被元注解为带有 @MessageEndpoint,并且 name() 属性实际上等同于 @Compnent.value()。这样,网关代理的 bean 名称生成策略就与标准 Spring 注解配置扫描和导入组件的方式对齐了。默认的 AnnotationBeanNameGenerator 可以通过 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 或作为 @IntegrationComponentScan.nameGenerator() 属性全局覆盖。

备注

如果你没有 XML 配置,在至少一个 @Configuration 类上需要 @EnableIntegration 注解。更多信息请参见 配置和 @EnableIntegration

调用无参数方法

当在没有参数的 Gateway 接口上调用方法时,默认行为是从 PollableChannel 接收一个 Message

有时,您可能需要触发无参数的方法,以便与下游的其他不需要用户提供参数的组件进行交互,例如触发无参数的 SQL 调用或存储过程。

要实现发送和接收语义,您必须提供有效负载。生成有效负载时,接口上的方法参数不是必需的。您可以使用 @Payload 注解或在 XML 中的 method 元素上使用 payload-expression 属性。以下列表包含一些有效负载可能的示例:

  • 一个字面量字符串

  • #gatewayMethod.name

  • 新的 java.util.Date()

  • @someBean.someMethod() 的返回值

以下示例展示了如何使用 @Payload 注解:

public interface Cafe {

@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();

}
xml

你也可以使用 @Gateway 注解。

public interface Cafe {

@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();

}
xml
备注

如果两个注解都存在(且提供了 payloadExpression),则 @Gateway 生效。

如果一个方法没有参数且没有返回值,但包含有效负载表达式,则被视为仅发送操作。

调用 default 方法

网关代理接口也可以有 default 方法,从 5.3 版本开始,框架会将一个 DefaultMethodInvokingMethodInterceptor 注入到代理中,以使用 java.lang.invoke.MethodHandle 方法调用 default 方法,而不是通过代理。来自 JDK 的接口(如 java.util.function.Function)仍然可以用于网关代理,但由于 Java 内部安全原因,在针对 JDK 类实例化 MethodHandles.Lookup 时,其 default 方法无法被调用。这些方法也可以通过在方法上显式使用 @Gateway 注解,或在 @MessagingGateway 注解或 <gateway> XML 组件上的 proxyDefaultMethods 来代理(这将失去它们的实现逻辑,同时恢复以前的网关代理行为)。

错误处理

网关调用可能会导致错误。默认情况下,任何下游发生的错误会在网关的方法调用时被重新抛出,且“保持原样”。例如,考虑以下简单流程:

gateway -> service-activator
none

如果服务激活器调用的服务抛出一个 MyException(例如),框架会将其包装在一个 MessagingException 中,并将传递给服务激活器的消息附加到 failedMessage 属性中。因此,框架执行的任何日志记录都具有完整的失败上下文。默认情况下,当异常被网关捕获时,MyException 会被解包并抛给调用者。你可以在网关方法声明上配置一个 throws 子句以匹配原因链中的特定异常类型。例如,如果你想捕获整个带有所有下游错误原因的 MessagingException 消息信息,你应该有一个类似于以下的网关方法:

MyException // 这是一个示例,实际代码应根据你的需求编写
java

请注意,上述代码部分仅作为占位符示例,具体实现应根据实际需求编写。

public interface MyGateway {

void performProcess() throws MessagingException;

}
java

由于我们鼓励使用 POJO 编程,您可能不希望将调用者暴露给消息传递基础结构。

如果您的网关方法没有 throws 子句,网关会遍历原因树,寻找不是 MessagingExceptionRuntimeException。如果没有找到,框架将抛出 MessagingException。如果前面讨论中的 MyException 有一个 SomeOtherException 的原因,并且您的方法 throws SomeOtherException,网关将进一步解开该异常并将其抛给调用者。

当网关声明时没有 service-interface,则使用内部框架接口 RequestReplyExchanger

考虑以下示例:

public interface RequestReplyExchanger {

Message<?> exchange(Message<?> request) throws MessagingException;

}
java

在版本 5.0 之前,这个 exchange 方法没有 throws 子句,因此异常会被解包。如果你想使用此接口并恢复之前的解包行为,请改用自定义的 service-interface 或自行访问 MessagingExceptioncause

但是,您可能希望记录错误而不是传播它,或者您可能希望将异常视为有效的回复(通过将其映射到符合调用者理解的某些“错误消息”契约的消息)。为了实现这一点,网关通过支持 error-channel 属性来提供对专用错误消息通道的支持。在以下示例中,一个 'transformer' 从 Exception 创建一个回复 Message

<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
xml

exceptionTransformer 可以是一个简单的 POJO,知道如何创建预期的错误响应对象。这些对象会成为发送回调用者的有效负载。如果有必要,你可以在这样的“错误流”中做更多复杂的事情。它可能涉及到路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter),过滤器等。不过,大多数时候,一个简单的 '转换器' 应该就足够了。

或者,你可能只想记录异常(或异步发送到某个地方)。如果你提供一个单向流程,将不会有任何内容返回给调用者。如果你想完全抑制异常,你可以提供一个指向全局 nullChannel 的引用(基本上是 /dev/null 的方法)。最后,如上所述,如果没有定义 error-channel,则异常会像往常一样传播。

当你使用 @MessagingGateway 注解(见 [](#messaging-gateway-annotation)@MessagingGateway` 注解)时,你可以使用一个 errorChannel 属性。

从 5.0 版本开始,当您使用返回类型为 void 的网关方法(单向流)时,error-channel 引用(如果提供)将填充到每个发送消息的标准 errorChannel 头中。此功能允许基于标准 ExecutorChannel 配置(或 QueueChannel)的下游异步流覆盖默认全局 errorChannel 异常发送行为。以前,您必须手动使用 @GatewayHeader 注解或 <header> 元素指定 errorChannel 头。对于具有异步流的 void 方法,error-channel 属性被忽略。相反,错误消息会被发送到默认的 errorChannel

important

通过简单的 POJI 网关公开消息系统提供了好处,但“隐藏”底层消息系统的现实确实是有代价的,因此有一些事情您应该考虑。我们希望我们的 Java 方法能尽快返回,而不是无限期地挂起,当调用者等待它返回(无论是 void、返回值还是抛出的异常)时。当普通方法作为消息系统的代理使用时,我们必须考虑到底层消息系统的潜在异步性质。这意味着,由网关发起的消息可能会被过滤器丢弃,而永远不会到达负责生成回复的组件。某些服务激活器方法可能会导致异常,从而不提供回复(因为我们不会生成 null 消息)。换句话说,多个场景可能导致回复消息永远不会到来。这在消息系统中是完全自然的。然而,考虑一下这对网关方法的影响。网关方法的输入参数被合并到一条消息中并发送下游。回复消息将被转换为网关方法的返回值。因此,您可能需要确保每个网关调用都有一个回复消息。否则,如果 reply-timeout 设置为负值,您的网关方法可能会永远挂起。处理这种情况的一种方法是使用异步网关(本节稍后会解释)。另一种处理方法是依赖默认的 reply-timeout 30 秒。这样,网关挂起的时间不会超过 reply-timeout 指定的时间,并且如果超时,则返回 'null'。最后,您可能需要考虑设置下游标志,例如在服务激活器上设置 'requires-reply' 或在过滤器上设置 'throw-exceptions-on-rejection'。这些选项将在本章最后一节中详细讨论。

备注

如果下游流返回一个 ErrorMessage,其 payload(一个 Throwable)将被视为常规的下游错误。如果有配置 error-channel,它会被发送到错误流。否则,payload 会被抛给网关的调用者。同样地,如果 error-channel 上的错误流返回一个 ErrorMessage,其 payload 也会被抛给调用者。对于任何带有 Throwable payload 的消息也是如此。这在异步情况下很有用,当你需要将一个 Exception 直接传播给调用者时。为此,你可以返回一个 Exception(作为某些服务的 reply),或者直接抛出它。通常,即使在异步流中,框架也会处理将下游流抛出的异常传播回网关。TCP Client-Server Multiplex 示例展示了这两种技术,用于将异常返回给调用者。它通过使用带有 group-timeoutaggregator(参见 Aggregator and Group Timeout)和丢弃流上的 MessagingTimeoutException 回复来模拟等待线程的套接字 IO 错误。

网关超时

网关有两个超时属性:requestTimeoutreplyTimeout。请求超时仅在通道可以阻塞的情况下适用(例如,已满的有界 QueueChannel)。replyTimeout 值是网关等待回复的时间或返回 null 的时间。它默认为无穷大。

超时可以为网关上的所有方法设置为默认值(defaultRequestTimeoutdefaultReplyTimeout),或者在 MessagingGateway 接口注解上设置。各个方法可以覆盖这些默认值(在 <method/> 子元素中)或在 @Gateway 注解上设置。

从版本 5.0 开始,超时可以定义为表达式,如下例所示:

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
java

评估上下文有一个 BeanResolver(使用 @someBean 引用其他 bean),并且可以从 #root 对象获取 args 数组属性。有关此根对象的更多信息,请参阅 表达式和“全局”标题。在使用 XML 配置时,超时属性可以是 long 值或 SpEL 表达式,如下例所示:

<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
xml

异步网关

作为一种模式,消息网关提供了一种很好的方式来隐藏与消息相关的代码,同时仍然暴露消息系统的全部功能。如前面所述GatewayProxyFactoryBean 提供了一种便捷的方式,在服务接口上暴露一个代理,使你能够以 POJO 方式访问消息系统(基于你自己域中的对象、基本类型/字符串或其他对象)。然而,当网关通过简单的 POJO 方法返回值时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。由于消息系统本质上是异步的,你可能无法总是保证“对于每个请求,总会有回复”的契约。Spring Integration 2.0 引入了对异步网关的支持,这为在你不知道是否期望回复或回复需要多长时间到达的情况下启动流程提供了一种便捷的方式。

为了处理这些类型的情况,Spring Integration 使用 java.util.concurrent.Future 实例来支持异步网关。

从 XML 配置来看,没有任何变化,您仍然以定义普通网关的相同方式定义异步网关,如下例所示:

<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
xml

但是,网关接口(一个服务接口)有点不同,如下所示:

public interface MathServiceGateway {

Future<Integer> multiplyByTwo(int i);

}
java

如前面的例子所示,网关方法的返回类型是 Future。当 GatewayProxyFactoryBean 发现网关方法的返回类型是 Future 时,它会立即通过使用 AsyncTaskExecutor 切换到异步模式。这些就是差异的全部。调用此类方法总是立即返回一个 Future 实例。然后你可以按照自己的节奏与 Future 进行交互以获取结果、取消等。此外,与其他使用 Future 实例的情况一样,调用 get() 可能会显示超时、执行异常等。以下示例展示了如何使用从异步网关返回的 Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
java

对于更详细的示例,请参阅 Spring Integration 示例中的 async-gateway 示例。

AsyncTaskExecutor

默认情况下,GatewayProxyFactoryBean 在提交内部 AsyncInvocationTask 实例时使用 org.springframework.core.task.SimpleAsyncTaskExecutor,对于任何返回类型为 Future 的网关方法都是如此。但是,<gateway/> 元素配置中的 async-executor 属性允许你提供对 Spring 应用程序上下文中可用的任何 java.util.concurrent.Executor 实现的引用。

(默认的)SimpleAsyncTaskExecutor 支持 FutureCompletableFuture 两种返回类型。请参阅 CompletableFuture。即使有一个默认的执行器,通常提供一个外部执行器也是有用的,这样你可以在日志中识别它的线程(在使用 XML 时,线程名称基于执行器的 bean 名称),如下例所示:

@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);

}
java

如果你想返回一个不同的 Future 实现,你可以提供一个自定义执行器或完全禁用执行器,并从下游流的回复消息负载中返回 Future。要禁用执行器,在 GatewayProxyFactoryBean 中将其设置为 null (通过使用 setAsyncTaskExecutor(null))。当使用 XML 配置网关时,使用 async-executor=""。当使用 @MessagingGateway 注解进行配置时,使用类似的代码:

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);

}
java
important

如果返回类型是特定的 Future 实现或其他不被配置的执行器支持的子接口,则流程在调用者的线程上运行,并且流程必须在回复消息的有效负载中返回所需类型。

CompletableFuture

从 4.2 版本开始,网关方法现在可以返回 CompletableFuture<?>。返回这种类型时有两种操作模式:

  • 当提供了一个异步执行器且返回类型正好是 CompletableFuture(而不是其子类)时,框架会在执行器上运行任务,并立即向调用者返回一个 CompletableFuture。使用 CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) 来创建这个未来对象。

  • 当异步执行器被显式设置为 null 且返回类型是 CompletableFuture 或返回类型是 CompletableFuture 的子类时,流程将在调用者的线程上被调用。在这种情况下,下游流程应返回一个适当类型的 CompletableFuture

使用场景

在以下场景中,调用者线程立即返回一个 CompletableFuture<Invoice>,当下游流程回复网关(使用一个 Invoice 对象)时,该对象完成。

CompletableFuture<Invoice> order(Order order);
java
<int:gateway service-interface="something.Service" default-request-channel="orders" />
xml

在以下场景中,当下游流将其作为回复网关的有效负载提供时,调用线程返回一个 CompletableFuture<Invoice> 。必须有其他进程在发票准备好时完成此未来任务。

CompletableFuture<Invoice> order(Order order);
java
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
xml

在以下场景中,当下游流程将其作为回复给网关的有效负载时,调用线程返回一个 CompletableFuture<Invoice> 。必须有其他进程在发票准备好时完成该未来对象。如果启用了 DEBUG 日志记录,则会生成一条日志条目,指示异步执行器不能用于此场景。

MyCompletableFuture<Invoice> order(Order order);
java
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
xml

CompletableFuture 实例可以用于对回复进行额外的处理,如下例所示:

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);
java

反应器 Mono

从 5.0 版本开始,GatewayProxyFactoryBean 允许在网关接口方法中使用 Project Reactor,使用 Mono<T> 返回类型。内部的 AsyncInvocationTask 被包装在 Mono.fromCallable() 中。

Mono 可用于在稍后检索结果(类似于 Future<?>),或者你可以通过调度器从它消费,方法是在结果返回到网关时调用你的 Consumer

important

Mono 不会立即被框架刷新。因此,在网关方法返回之前,底层的消息流不会启动(而 Future<?> Executor 任务会)。当 Mono 被订阅时,流才会开始。或者,Mono(作为一个“可组合对象”)可能是 Reactor 流的一部分,这时的 subscribe() 与整个 Flux 相关。以下示例展示了如何使用 Project Reactor 创建一个网关:

@MessagingGateway
public interface TestGateway {

@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
java

其中这样的网关可以在某些处理 Flux 数据流的服务中使用:

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
java

另一个使用 Project Reactor 的示例是一个简单的回调场景,如下例所示:

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));
java

调用线程继续,在流完成时会调用 handleInvoice()

也请参阅 Kotlin 协程 以获取更多信息。

返回异步类型的下游流

如上文AsyncTaskExecutor部分所述,如果你希望某些下游组件返回带有异步负载(FutureMono 等)的消息,则必须显式地将异步执行器设置为 null(或在使用 XML 配置时设置为 "")。然后,流程将在调用线程上被调用,并且可以在之后获取结果。

异步 void 返回类型

消息网关方法可以这样声明:

@MessagingGateway
public interface MyGateway {

@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);

}
java

但是下游异常不会被传播回调用者。为了确保下游流程调用的异步行为和异常向调用者的传播,从 6.0 版本开始,框架提供了对 Future<Void>Mono<Void> 返回类型的 支持。使用场景类似于之前描述的简单 void 返回类型 的发送后即忘行为,但区别在于流程执行是异步发生的,并且返回的 Future(或 Mono)根据 send 操作的结果以 null 或异常完成。

备注

如果 Future<Void> 是精确的下游流回复,则网关的 asyncExecutor 选项必须设置为 null(对于 @MessagingGateway 配置,使用 AnnotationConstants.NULL),并且 send 部分是在生产者线程上执行的。回复取决于下游流配置。这样,由目标应用程序来正确生成 Future<Void> 回复。Mono 的用例已经超出了框架的线程控制,因此将 asyncExecutor 设置为 null 没有意义。这里请求-回复网关操作的结果为 Mono<Void> 必须配置为网关方法的返回类型为 Mono<?>

当没有响应到达时的网关行为

前面所述,网关通过 POJO 方法调用提供了一种与消息系统交互的便捷方式。然而,通常期望总是返回的方法调用(即使带有异常),可能并不总是与消息交换一一对应(例如,回复消息可能不会到达——相当于方法没有返回)。

本节的其余部分涵盖了各种场景以及如何使网关的行为更加可预测。可以配置某些属性以使同步网关行为更加可预测,但其中一些可能并不总是如你期望的那样工作。其中一个属性是 reply-timeout(在方法级别或网关级别的 default-reply-timeout)。我们研究 reply-timeout 属性,以了解它在不同场景中如何影响以及无法影响同步网关的行为。我们研究了单线程场景(所有下游组件都通过直接通道连接)和多线程场景(例如,在某个下游位置,你可能有一个可轮询的或执行程序通道,这会打破单线程边界)。

长时间运行的下游进程

Sync Gateway,单线程

如果下游组件仍在运行(可能是由于无限循环或缓慢的服务),设置 reply-timeout 没有效果,网关方法调用不会返回,直到下游服务退出(通过返回或抛出异常)。

Sync Gateway,多线程

如果下游组件仍在运行(可能是由于无限循环或缓慢的服务)在多线程消息流中,设置 reply-timeout 会在网关方法调用达到超时时间后返回,因为 GatewayProxyFactoryBean 会在回复通道上轮询,等待消息直到超时过期。然而,如果在实际回复生成之前已经达到了超时时间,则可能导致网关方法返回 'null'。您应该了解,回复消息(如果已生成)是在网关方法调用可能已经返回之后发送到回复通道的,因此您必须意识到这一点,并在设计流程时考虑到这一点。

也请参见 errorOnTimeout 属性,用于在超时发生时抛出 MessageTimeoutException 而不是返回 null

下游组件返回 'null'

Sync Gateway — 单线程

如果下游组件返回 'null' 并且 reply-timeout 已配置为负值,则网关方法调用将无限期挂起,除非在可能返回 'null' 的下游组件(例如,服务激活器)上设置了 requires-reply 属性。在这种情况下,会抛出异常并传播到网关。

Sync Gateway — 多线程

行为与前一种情况相同。

下游组件返回签名是 'void' 而网关方法签名是非 void

Sync Gateway — 单线程

如果下游组件返回 'void' 并且 reply-timeout 被配置为负值,网关方法调用将无限期挂起。

Sync Gateway — 多线程

行为与前面的情况相同。

下游组件导致运行时异常

Sync Gateway — 单线程

如果下游组件抛出运行时异常,该异常会通过错误消息传播回网关并重新抛出。

同步网关 — 多线程

行为与前一种情况相同。

important

你应该理解,默认情况下,reply-timeout 是无界的。因此,如果你将 reply-timeout 设置为负值,网关方法调用可能会无限期挂起。所以,为了确保你分析了你的流程,并且即使有这些场景之一发生的可能性,你也应该将 reply-timeout 属性设置为一个“安全”的值。默认是 30 秒。更好的做法是,你可以将下游组件的 requires-reply 属性设置为 'true',以确保及时响应,因为一旦该下游组件内部返回 null 就会抛出异常。然而,你也应该意识到有一些场景(参见第一个)中 reply-timeout 并不能提供帮助。这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要。如之前所述,后者涉及定义返回 Future 实例的网关方法。这样可以保证接收到返回值,并且对调用结果有更细粒度的控制。另外,在处理路由器时,你应该记住将 resolution-required 属性设置为 'true' 会导致路由器在无法解析特定通道时抛出异常。同样地,在处理过滤器时,你可以设置 throw-exception-on-rejection 属性。在这两种情况下,产生的流程行为类似于包含带有 'requires-reply' 属性的服务激活器。换句话说,这有助于确保网关方法调用的及时响应。

important

你应该理解,计时器是在线程返回网关时启动的 —— 也就是说,在流程完成或消息被交给另一个线程时启动。在那时,调用线程开始等待回复。如果流程是完全同步的,回复将立即可用。对于异步流程,线程将等待最多这段时间。

从 6.2 版本开始,MessagingGatewaySupport 内部 MethodInvocationGateway 扩展的 errorOnTimeout 属性在 @MessagingGatewayGatewayEndpointSpec 上公开。此选项与任何入站网关中的含义完全相同,在 端点摘要 章节末尾有解释。换句话说,将此选项设置为 true,会在发送和接收网关操作中抛出 MessageTimeoutException,而不是在接收超时耗尽时返回 null

参见 Java DSL 章节中的 集成流作为网关,了解通过 IntegrationFlow 定义网关的选项。