消息网关
网关隐藏了由 Spring Integration 提供的消息 API。它使您的应用程序业务逻辑无需了解 Spring Integration API。通过使用通用网关,您的代码只与一个简单的接口进行交互。
进入 GatewayProxyFactoryBean
如前所述,不依赖于 Spring Integration API 将会很好——包括网关类。为此,Spring Integration 提供了 GatewayProxyFactoryBean
,它为任何接口生成一个代理,并在内部调用以下网关方法。通过使用依赖注入,您可以将接口暴露给业务方法。
以下示例展示了一个可以用于与 Spring Integration 交互的接口:
public interface Cafe {
void placeOrder(Order order);
}
网关 XML 命名空间支持
命名空间支持也被提供。它让你可以配置一个接口作为服务,如下例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
有了这个配置定义,cafeService
现在可以注入到其他 bean 中,并且调用 Cafe
接口的代理实例方法的代码对 Spring Integration API 毫无察觉。有关使用 gateway
元素的示例,请参阅 “示例” 附录(在 Cafe 演示中)。
前面配置中的默认值应用于网关接口上的所有方法。如果未指定回复超时时间,调用线程将等待回复 30 秒。请参阅当没有响应到达时的网关行为。
默认值可以被单个方法覆盖。见 使用注解和 XML 的网关配置。
设置默认回复通道
通常情况下,您无需指定 default-reply-channel
,因为网关会自动创建一个临时的、匿名的回复通道,在那里它监听回复。但是,有些情况可能会促使您定义一个 default-reply-channel
(或使用适配器网关如 HTTP 、 JMS 等的 reply-channel
)。
为了更好地理解,我们简要讨论一下网关的一些内部工作原理。网关创建一个临时的点对点回复通道。它是匿名的,并以名称 replyChannel
添加到消息头中。在提供显式的 default-reply-channel
(对于远程适配器网关为 reply-channel
)时,你可以指向一个发布 - 订阅通道,之所以这样命名是因为你可以向其添加多个订阅者。在内部,Spring Integration 在临时的 replyChannel
和显式定义的 default-reply-channel
之间创建一座桥梁。
假设你希望你的回复不仅发送到网关,还要发送到其他一些消费者。在这种情况下,你需要做两件事:
-
一个你可以订阅的命名通道
-
该通道是一个发布订阅通道
网关使用的默认策略无法满足这些需求,因为添加到头部的回复通道是匿名且点对点的。这意味着没有其他订阅者可以获得它的句柄,即使可以获得,该通道也具有点对点行为,即只有一个订阅者会收到消息。通过定义 default-reply-channel
,您可以指向您选择的通道。在这种情况下,这是一个 publish-subscribe-channel
。网关会从它创建一个桥接到临时的、匿名的回复通道,该回复通道存储在头部。
你可能还希望明确地提供一个回复通道,通过拦截器进行监控或审计(例如,wiretap)。要配置通道拦截器,你需要一个命名的通道。
从 5.4 版本开始,当网关方法返回类型为 void
时,如果未显式提供此类标头,框架会填充一个 replyChannel
标头作为 nullChannel
bean 引用。这允许丢弃下游流的任何可能回复,满足单向网关契约。
使用注解和XML进行网关配置
考虑以下示例,它通过添加一个 @Gateway
注解来扩展前面的 Cafe
接口示例:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
@Header
注解让你添加被解释为消息头的值,如下例所示:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果你更喜欢使用 XML 方法来配置网关方法,你可以向网关配置添加 method
元素,如下例所示:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
你也可以使用 XML 为每个方法调用提供单独的报头。如果你想要设置的报头是静态性质的,并且你不想通过使用 @Header
注解将它们嵌入网关的方法签名中,这可能会很有用。例如,在贷款中介示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。通过评估哪个网关方法被调用来确定请求的类型虽然是可能的,但会违反关注点分离范式(方法是 Java 艺术品)。然而,在消息报头中表达你的意图(元信息)在消息传递架构中是很自然的。以下示例展示了如何为两个方法中的每一个添加不同的消息报头:
<method name="getSingleQuote" >
<header name="aggregationType" value="single"/>
</method>
<method name="getAllQuotes" >
<header name="aggregationType" value="all"/>
</method>
以上XML配置片段展示了如何针对不同方法调用添加特定的消息头,以指示聚合类型。
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前面的例子中,根据网关的方法,为 'RESPONSE_TYPE' 标头设置了一个不同的值。
例如,如果你在 <int:method/>
以及 @Gateway
注解中都指定了 requestChannel
,注解中的值将生效。
如果在 XML 中指定了一个无参数网关,并且接口方法同时具有 @Payload
和 @Gateway
注解(带有 payloadExpression
或 payload-expression
在一个 <int:method/>
元素中),则 @Payload
的值将被忽略。
表达式和“全局”标题
<header/>
元素支持 expression
作为 value
的替代。SpEL 表达式会被评估以确定 header 的值。从 5.2 版本开始,评估上下文的 #root
对象是一个带有 getMethod()
和 getArgs()
访问器的 MethodArgsHolder
。例如,如果你想根据简单的方法名称进行路由,你可以添加一个具有以下表达式的 header:method.name
。
java.reflect.Method
不可序列化。如果你后来序列化消息,带有 method
表达式的头信息将会丢失。因此,在这些情况下,你可能希望使用 method.name
或 method.toString()
。toString()
方法提供方法的 String
表示,包括参数和返回类型。
自 3.0 版本以来,可以定义 <default-header/>
元素以向网关生成的所有消息添加标题,无论调用的方法是什么。为特定方法定义的标题优先于默认标题。在此为特定方法定义的标题会覆盖服务接口中的任何 @Header
注解。但是,默认标题不会覆盖服务接口中的任何 @Header
注解。
网关现在也支持 default-payload-expression
,该表达式应用于所有方法(除非被覆盖)。
将方法参数映射到消息
使用上一节中的配置技术可以控制方法参数如何映射到消息元素(有效负载和标头)。当没有使用显式配置时,会使用某些约定来执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪个应该映射到标头。请考虑以下示例:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到负载(只要它不是一个 Map
),而第二个参数的内容则成为标题。
在第二种情况下(或当参数 thing1
的参数是一个 Map
时的第一种情况),框架无法确定哪个参数应该是有效负载。因此,映射失败。这通常可以使用 payload-expression
、@Payload
注解或 @Headers
注解来解决。
或者(在约定失效的情况下),你可以完全负责将方法调用映射到消息。为此,实现一个 MethodArgsMessageMapper
并通过使用 mapper
属性将其提供给 <gateway/>
。映射器映射一个 MethodArgsHolder
,这是一个简单的类,它包装了 java.reflect.Method
实例和一个包含参数的 Object[]
。当提供自定义映射器时,网关上不允许有 default-payload-expression
属性和 <default-header/>
元素。同样地,在任何 <method/>
元素上也不允许有 payload-expression
属性和 <header/>
元素。
映射方法参数
以下示例展示了如何将方法参数映射到消息,并显示了一些无效配置的示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // // <1>
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
注意,在这个例子中,SpEL 变量
#this
引用的是参数——在这种情况下,是s
的值。
XML 等价形式看起来有点不同,因为方法参数没有 #this
上下文。然而,表达式可以通过使用 MethodArgsHolder
根对象的 args
属性来引用方法参数(更多信息请参见 表达式和“全局”头信息),如下例所示:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
注解
从 4.0 版本开始,网关服务接口可以使用 @MessagingGateway
注解进行标记,而不需要定义 <gateway />
XML 元素来进行配置。下面的两组示例对比了这两种配置相同网关的方法:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
类似于 XML 版本,当 Spring Integration 在组件扫描期间发现这些注解时,它会创建带有其消息传递基础设施的 proxy
实现。为了执行此扫描并在应用程序上下文中注册 BeanDefinition
,请将 @IntegrationComponentScan
注解添加到 @Configuration
类中。标准的 @ComponentScan
基础设施不处理接口。因此,我们引入了自定义的 @IntegrationComponentScan
逻辑来查找接口上的 @MessagingGateway
注解,并为它们注册 GatewayProxyFactoryBean
实例。另请参阅 注解支持。
除了 @MessagingGateway
注解外,你还可以使用 @Profile
注解标记一个服务接口,以避免在没有激活此类配置文件时创建 bean。
从 6.0 版本开始,带有 @MessagingGateway
的接口也可以用 @Primary
注解标记,以实现相应的配置逻辑,这与任何 Spring @Component
定义一样。
从 6.0 版本开始,@MessagingGateway
接口可以在标准的 Spring @Import
配置中使用。这可以作为 @IntegrationComponentScan
或手动 AnnotationGatewayProxyFactoryBean
bean 定义的替代方案。
@MessagingGateway
自版本 6.0
起被元注解为带有 @MessageEndpoint
,并且 name()
属性实际上等同于 @Compnent.value()
。这样,网关代理的 bean 名称生成策略就与标准 Spring 注解配置扫描和导入组件的方式对齐了。默认的 AnnotationBeanNameGenerator
可以通过 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
或作为 @IntegrationComponentScan.nameGenerator()
属性全局覆盖。
如果你没有 XML 配置,在至少一个 @Configuration
类上需要 @EnableIntegration
注解。更多信息请参见 配置和 @EnableIntegration。
调用无参数方法
当在没有参数的 Gateway 接口上调用方法时,默认行为是从 PollableChannel
接收一个 Message
。
有时,您可能需要触发无参数的方法,以便与下游的其他不需要用户提供参数的组件进行交互,例如触发无参数的 SQL 调用或存储过程。
要实现发送和接收语义,您必须提供有效负载。生成有效负载时,接口上的方法参数不是必需的。您可以使用 @Payload
注解或在 XML 中的 method
元素上使用 payload-expression
属性。以下列表包含一些有效负载可能的示例:
-
一个字面量字符串
-
#gatewayMethod.name
-
新的 java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例展示了如何使用 @Payload
注解:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
你也可以使用 @Gateway
注解。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个注解都存在(且提供了 payloadExpression
),则 @Gateway
生效。
也请参阅 带有注解和 XML 的网关配置。
如果一个方法没有参数且没有返回值,但包含有效负载表达式,则被视为仅发送操作。
调用 default
方法
网关代理接口也可以有 default
方法,从 5.3 版本开始,框架会将一个 DefaultMethodInvokingMethodInterceptor
注入到代理中,以使用 java.lang.invoke.MethodHandle
方法调用 default
方法,而不是通过代理。来自 JDK 的接口(如 java.util.function.Function
)仍然可以用于网关代理,但由于 Java 内部安全原因,在针对 JDK 类实例化 MethodHandles.Lookup
时,其 default
方法无法被调用。这些方法也可以通过在方法上显式使用 @Gateway
注解,或在 @MessagingGateway
注解或 <gateway>
XML 组件上的 proxyDefaultMethods
来代理(这将失去它们的实现逻辑,同时恢复以前的网关代理行为)。
错误处理
网关调用可能会导致错误。默认情况下,任何下游发生的错误会在网关的方法调用时被重新抛出,且“保持原样”。例如,考虑以下简单流程:
gateway -> service-activator
如果服务激活器调用的服务抛出一个 MyException
(例如),框架会将其包装在一个 MessagingException
中,并将传递给服务激活器的消息附加到 failedMessage
属性中。因此,框架执行的任何日志记录都具有完整的失败上下文。默认情况下,当异常被网关捕获时,MyException
会被解包并抛给调用者。你可以在网关方法声明上配置一个 throws
子句以匹配原因链中的特定异常类型。例如,如果你想捕获整个带有所有下游错误原因的 MessagingException
消息信息,你应该有一个类似于以下的网关方法:
MyException // 这是一个示例,实际代码应根据你的需求编写
请注意,上述代码部分仅作为占位符示例,具体实现应根据实际需求编写。
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励使用 POJO 编程,您可能不希望将调用者暴露给消息传递基础结构。
如果您的网关方法没有 throws
子句,网关会遍历原因树,寻找不是 MessagingException
的 RuntimeException
。如果没有找到,框架将抛出 MessagingException
。如果前面讨论中的 MyException
有一个 SomeOtherException
的原因,并且您的方法 throws SomeOtherException
,网关将进一步解开该异常并将其抛给调用者。
当网关声明时没有 service-interface
,则使用内部框架接口 RequestReplyExchanger
。
考虑以下示例:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在版本 5.0 之前,这个 exchange
方法没有 throws
子句,因此异常会被解包。如果你想使用此接口并恢复之前的解包行为,请改用自定义的 service-interface
或自行访问 MessagingException
的 cause
。
但是,您可能希望记录错误而不是传播它,或者您可能希望将异常视为有效的回复(通过将其映射到符合调用者理解的某些“错误消息”契约的消息)。为了实现这一点,网关通过支持 error-channel
属性来提供对专用错误消息通道的支持。在以下示例中,一个 'transformer' 从 Exception
创建一个回复 Message
:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
exceptionTransformer
可以是一个简单的 POJO,知道如何创建预期的错误响应对象。这些对象会成为发送回调用者的有效负载。如果有必要,你可以在这样的“错误流”中做更多复杂的事情。它可能涉及到路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter
),过滤器等。不过,大多数时候,一个简单的 '转换器' 应该就足够了。
或者,你可能只想记录异常(或异步发送到某个地方)。如果你提供一个单向流程,将不会有任何内容返回给调用者。如果你想完全抑制异常,你可以提供一个指向全局 nullChannel
的引用(基本上是 /dev/null
的方法)。最后,如上所述,如果没有定义 error-channel
,则异常会像往常一样传播。
当你使用 @MessagingGateway
注解(见 [](#messaging-gateway-annotation)
@MessagingGateway` 注解)时,你可以使用一个 errorChannel
属性。
从 5.0 版本开始,当您使用返回类型为 void
的网关方法(单向流)时,error-channel
引用(如果提供)将填充到每个发送消息的标准 errorChannel
头中。此功能允许基于标准 ExecutorChannel
配置(或 QueueChannel
)的下游异步流覆盖默认全局 errorChannel
异常发送行为。以前,您必须手动使用 @GatewayHeader
注解或 <header>
元素指定 errorChannel
头。对于具有异步流的 void
方法,error-channel
属性被忽略。相反,错误消息会被发送到默认的 errorChannel
。
通过简单的 POJI 网关公开消息系统提供了好处,但“隐藏”底层消息系统的现实确实是有代价的,因此有一些事情您应该考虑。我们希望我们的 Java 方法能尽快返回,而不是无限期地挂起,当调用者等待它返回(无论是 void、返回值还是抛出的异常)时。当普通方法作为消息系统的代理使用时,我们必须考虑到底层消息系统的潜在异步性质。这意味着,由网关发起的消息可能会被过滤器丢弃,而永远不会到达负责生成回复的组件。某些服务激活器方法可能会导致异常,从而不提供回复(因为我们不会生成 null 消息)。换句话说,多个场景可能导致回复消息永远不会到来。这在消息系统中是完全自然的。然而,考虑一下这对网关方法的影响。网关方法的输入参数被合并到一条消息中并发送下游。回复消息将被转换为网关方法的返回值。因此,您可能需要确保每个网关调用都有一个回复消息。否则,如果 reply-timeout
设置为负值,您的网关方法可能会永远挂起。处理这种情况的一种方法是使用异步网关(本节稍后会解释)。另一种处理方法是依赖默认的 reply-timeout
30 秒。这样,网关挂起的时间不会超过 reply-timeout
指定的时间,并且如果超时,则返回 'null'。最后,您可能需要考虑设置下游标志,例如在服务激活器上设置 'requires-reply' 或在过滤器上设置 'throw-exceptions-on-rejection'。这些选项将在本章最后一节中详细讨论。
如果下游流返回一个 ErrorMessage
,其 payload
(一个 Throwable
)将被视为常规的下游错误。如果有配置 error-channel
,它会被发送到错误流。否则,payload 会被抛给网关的调用者。同样地,如果 error-channel
上的错误流返回一个 ErrorMessage
,其 payload 也会被抛给调用者。对于任何带有 Throwable
payload 的消息也是如此。这在异步情况下很有用,当你需要将一个 Exception
直接传播给调用者时。为此,你可以返回一个 Exception
(作为某些服务的 reply
),或者直接抛出它。通常,即使在异步流中,框架也会处理将下游流抛出的异常传播回网关。TCP Client-Server Multiplex 示例展示了这两种技术,用于将异常返回给调用者。它通过使用带有 group-timeout
的 aggregator
(参见 Aggregator and Group Timeout)和丢弃流上的 MessagingTimeoutException
回复来模拟等待线程的套接字 IO 错误。
网关超时
网关有两个超时属性:requestTimeout
和 replyTimeout
。请求超时仅在通道可以阻塞的情况下适用(例如,已满的有界 QueueChannel
)。replyTimeout
值是网关等待回复的时间或返回 null
的时间。它默认为无穷大。
超时可以为网关上的所有方法设置为默认值(defaultRequestTimeout
和 defaultReplyTimeout
),或者在 MessagingGateway
接口注解上设置。各个方法可以覆盖这些默认值(在 <method/>
子元素中)或在 @Gateway
注解上设置。
从版本 5.0 开始,超时可以定义为表达式,如下例所示:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文有一个 BeanResolver
(使用 @someBean
引用其他 bean),并且可以从 #root
对象获取 args
数组属性。有关此根对象的更多信息,请参阅 表达式和“全局”标题。在使用 XML 配置时,超时属性可以是 long 值或 SpEL 表达式,如下例所示:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一种模式,消息网关提供了一种很好的方式来隐藏与消息相关的代码,同时仍然暴露消息系统的全部功能。如前面所述,GatewayProxyFactoryBean
提供了一种便捷的方式,在服务接口上暴露一个代理,使你能够以 POJO 方式访问消息系统(基于你自己域中的对象、基本类型/字符串或其他对象)。然而,当网关通过简单的 POJO 方法返回值时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。由于消息系统本质上是异步的,你可能无法总是保证“对于每个请求,总会有回复”的契约。Spring Integration 2.0 引入了对异步网关的支持,这为在你不知道是否期望回复或回复需要多长时间到达的情况下启动流程提供了一种便捷的方式。
为了处理这些类型的情况,Spring Integration 使用 java.util.concurrent.Future
实例来支持异步网关。
从 XML 配置来看,没有任何变化,您仍然以定义普通网关的相同方式定义异步网关,如下例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(一个服务接口)有点不同,如下所示:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前面的例子所示,网关方法的返回类型是 Future
。当 GatewayProxyFactoryBean
发现网关方法的返回类型是 Future
时,它会立即通过使用 AsyncTaskExecutor
切换到异步模式。这些就是差异的全部。调用此类方法总是立即返回一个 Future
实例。然后你可以按照自己的节奏与 Future
进行交互以获取结果、取消等。此外,与其他使用 Future
实例的情况一样,调用 get()
可能会显示超时、执行异常等。以下示例展示了如何使用从异步网关返回的 Future
:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
对于更详细的示例,请参阅 Spring Integration 示例中的 async-gateway 示例。
AsyncTaskExecutor
默认情况下,GatewayProxyFactoryBean
在提交内部 AsyncInvocationTask
实例时使用 org.springframework.core.task.SimpleAsyncTaskExecutor
,对于任何返回类型为 Future
的网关方法都是如此。但是,<gateway/>
元素配置中的 async-executor
属性允许你提供对 Spring 应用程序上下文中可用的任何 java.util.concurrent.Executor
实现的引用。
(默认的)SimpleAsyncTaskExecutor
支持 Future
和 CompletableFuture
两种返回类型。请参阅 CompletableFuture。即使有一个默认的执行器,通常提供一个外部执行器也是有用的,这样你可以在日志中识别它的线程(在使用 XML 时,线程名称基于执行器的 bean 名称),如下例所示:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果你想返回一个不同的 Future
实现,你可以提供一个自定义执行器或完全禁用执行器,并从下游流的回复消息负载中返回 Future
。要禁用执行器,在 GatewayProxyFactoryBean
中将其设置为 null
(通过使用 setAsyncTaskExecutor(null)
)。当使用 XML 配置网关时,使用 async-executor=""
。当使用 @MessagingGateway
注解进行配置时,使用类似的代码:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的 Future
实现或其他不被配置的执行器支持的子接口,则流程在调用者的线程上运行,并且流程必须在回复消息的有效负载中返回所需类型。
CompletableFuture
从 4.2 版本开始,网关方法现在可以返回 CompletableFuture<?>
。返回这种类型时有两种操作模式:
-
当提供了一个异步执行器且返回类型正好是
CompletableFuture
(而不是其子类)时,框架会在执行器上运行任务,并立即向调用者返回一个CompletableFuture
。使用CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
来创建这个未来对象。 -
当异步执行器被显式设置为
null
且返回类型是CompletableFuture
或返回类型是CompletableFuture
的子类时,流程将在调用者的线程上被调用。在这种情况下,下游流程应返回一个适当类型的CompletableFuture
。
使用场景
在以下场景中,调用者线程立即返回一个 CompletableFuture<Invoice>
,当下游流程回复网关(使用一个 Invoice
对象)时,该对象完成。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下场景中,当下游流将其作为回复网关的有效负载提供时,调用线程返回一个 CompletableFuture<Invoice>
。必须有其他进程在发票准备好时完成此未来任务。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下场景中,当下游流程将其作为回复给网关的有效负载时,调用线程返回一个 CompletableFuture<Invoice>
。必须有其他进程在发票准备好时完成该未来对象。如果启用了 DEBUG
日志记录,则会生成一条日志条目,指示异步执行器不能用于此场景。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture
实例可以用于对回复进行额外的处理,如下例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
反应器 Mono
从 5.0 版本开始,GatewayProxyFactoryBean
允许在网关接口方法中使用 Project Reactor,使用 Mono<T> 返回类型。内部的 AsyncInvocationTask
被包装在 Mono.fromCallable()
中。
Mono
可用于在稍后检索结果(类似于 Future<?>
),或者你可以通过调度器从它消费,方法是在结果返回到网关时调用你的 Consumer
。
Mono
不会立即被框架刷新。因此,在网关方法返回之前,底层的消息流不会启动(而 Future<?>
Executor
任务会)。当 Mono
被订阅时,流才会开始。或者,Mono
(作为一个“可组合对象”)可能是 Reactor 流的一部分,这时的 subscribe()
与整个 Flux
相关。以下示例展示了如何使用 Project Reactor 创建一个网关:
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中这样的网关可以在某些处理 Flux
数据流的服务中使用:
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
另一个使用 Project Reactor 的示例是一个简单的回调场景,如下例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程继续,在流完成时会调用 handleInvoice()
。
也请参阅 Kotlin 协程 以获取更多信息。
返回异步类型的下游流
如上文AsyncTaskExecutor部分所述,如果你希望某些下游组件返回带有异步负载(Future
、Mono
等)的消息,则必须显式地将异步执行器设置为 null
(或在使用 XML 配置时设置为 ""
)。然后,流程将在调用线程上被调用,并且可以在之后获取结果。
异步 void
返回类型
消息网关方法可以这样声明:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但是下游异常不会被传播回调用者。为了确保下游流程调用的异步行为和异常向调用者的传播,从 6.0 版本开始,框架提供了对 Future<Void>
和 Mono<Void>
返回类型的 支持。使用场景类似于之前描述的简单 void
返回类型 的发送后即忘行为,但区别在于流程执行是异步发生的,并且返回的 Future
(或 Mono
)根据 send
操作的结果以 null
或异常完成。
如果 Future<Void>
是精确的下游流回复,则网关的 asyncExecutor
选项必须设置为 null(对于 @MessagingGateway
配置,使用 AnnotationConstants.NULL
),并且 send
部分是在生产者线程上执行的。回复取决于下游流配置。这样,由目标应用程序来正确生成 Future<Void>
回复。Mono
的用例已经超出了框架的线程控制,因此将 asyncExecutor
设置为 null 没有意义。这里请求-回复网关操作的结果为 Mono<Void>
必须配置为网关方法的返回类型为 Mono<?>
。
当没有响应到达时的网关行为
如前面所述,网关通过 POJO 方法调用提供了一种与消息系统交互的便捷方式。然而,通常期望总是返回的方法调用(即使带有异常),可能并不总是与消息交换一一对应(例如,回复消息可能不会到达——相当于方法没有返回)。
本节的其余部分涵盖了各种场景以及如何使网关的行为更加可预测。可以配置某些属性以使同步网关行为更加可预测,但其中一些可能并不总是如你期望的那样工作。其中一个属性是 reply-timeout
(在方法级别或网关级别的 default-reply-timeout
)。我们研究 reply-timeout
属性,以了解它在不同场景中如何影响以及无法影响同步网关的行为。我们研究了单线程场景(所有下游组件都通过直接通道连接)和多线程场景(例如,在某个下游位置,你可能有一个可轮询的或执行程序通道,这会打破单线程边界)。
长时间运行的下游进程
Sync Gateway,单线程
如果下游组件仍在运行(可能是由于无限循环或缓慢的服务),设置 reply-timeout
没有效果,网关方法调用不会返回,直到下游服务退出(通过返回或抛出异常)。
Sync Gateway,多线程
如果下游组件仍在运行(可能是由于无限循环或缓慢的服务)在多线程消息流中,设置 reply-timeout
会在网关方法调用达到超时时间后返回,因为 GatewayProxyFactoryBean
会在回复通道上轮询,等待消息直到超时过期。然而,如果在实际回复生成之前已经达到了超时时间,则可能导致网关方法返回 'null'。您应该了解,回复消息(如果已生成)是在网关方法调用可能已经返回之后发送到回复通道的,因此您必须意识到这一点,并在设计流程时考虑到这一点。
也请参见 errorOnTimeout
属性,用于在超时发生时抛出 MessageTimeoutException
而不是返回 null
。
下游组件返回 'null'
Sync Gateway — 单线程
如果下游组件返回 'null' 并且 reply-timeout
已配置为负值,则网关方法调用将无限期挂起,除非在可能返回 'null' 的下游组件(例如,服务激活器)上设置了 requires-reply
属性。在这种情况下,会抛出异常并传播到网关。
Sync Gateway — 多线程
行为与前一种情况相同。
下游组件返回签名是 'void' 而网关方法签名是非 void
Sync Gateway — 单线程
如果下游组件返回 'void' 并且 reply-timeout
被配置为负值,网关方法调用将无限期挂起。
Sync Gateway — 多线程
行为与前面的情况相同。
下游组件导致运行时异常
Sync Gateway — 单线程
如果下游组件抛出运行时异常,该异常会通过错误消息传播回网关并重新抛出。
同步网关 — 多线程
行为与前一种情况相同。
你应该理解,默认情况下,reply-timeout
是无界的。因此,如果你将 reply-timeout
设置为负值,网关方法调用可能会无限期挂起。所以,为了确保你分析了你的流程,并且即使有这些场景之一发生的可能性,你也应该将 reply-timeout
属性设置为一个“安全”的值。默认是 30 秒。更好的做法是,你可以将下游组件的 requires-reply
属性设置为 'true',以确保及时响应,因为一旦该下游组件内部返回 null 就会抛出异常。然而,你也应该意识到有一些场景(参见第一个)中 reply-timeout
并不能提供帮助。这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要。如之前所述,后者涉及定义返回 Future
实例的网关方法。这样可以保证接收到返回值,并且对调用结果有更细粒度的控制。另外,在处理路由器时,你应该记住将 resolution-required
属性设置为 'true' 会导致路由器在无法解析特定通道时抛出异常。同样地,在处理过滤器时,你可以设置 throw-exception-on-rejection
属性。在这两种情况下,产生的流程行为类似于包含带有 'requires-reply' 属性的服务激活器。换句话说,这有助于确保网关方法调用的及时响应。
你应该理解,计时器是在线程返回网关时启动的 —— 也就是说,在流程完成或消息被交给另一个线程时启动。在那时,调用线程开始等待回复。如果流程是完全同步的,回复将立即可用。对于异步流程,线程将等待最多这段时间。
从 6.2 版本开始,MessagingGatewaySupport
内部 MethodInvocationGateway
扩展的 errorOnTimeout
属性在 @MessagingGateway
和 GatewayEndpointSpec
上公开。此选项与任何入站网关中的含义完全相同,在 端点摘要 章节末尾有解释。换句话说,将此选项设置为 true
,会在发送和接收网关操作中抛出 MessageTimeoutException
,而不是在接收超时耗尽时返回 null
。
参见 Java DSL 章节中的 集成流作为网关,了解通过 IntegrationFlow
定义网关的选项。