提供的建议类
除了提供应用 AOP 建议类的一般机制外,Spring Integration 还提供了这些开箱即用的建议实现:
-
RequestHandlerRetryAdvice
(描述见重试建议) -
RequestHandlerCircuitBreakerAdvice
(描述见断路器建议) -
ExpressionEvaluatingRequestHandlerAdvice
(描述见表达式建议) -
RateLimiterRequestHandlerAdvice
(描述见速率限制建议) -
CacheRequestHandlerAdvice
(描述见缓存建议) -
ReactiveRequestHandlerAdvice
(描述见反应式建议) -
ContextHolderRequestHandlerAdvice
(描述见上下文持有者建议)
重试建议
重试建议 (o.s.i.handler.advice.RequestHandlerRetryAdvice
) 利用了 Spring Retry 项目提供的丰富的重试机制。spring-retry
的核心组件是 RetryTemplate
,它允许配置复杂的重试场景,包括 RetryPolicy
和 BackoffPolicy
策略(具有多种实现)以及一个 RecoveryCallback
策略,以确定在重试耗尽时应采取的行动。
无状态重试
无状态重试是指重试活动完全在建议中处理。线程会暂停(如果配置为如此)并重试该操作。
有状态重试
有状态重试是指重试状态在建议中进行管理,但发生异常时由调用者重新提交请求的情况。有状态重试的一个例子是我们希望消息发起者(例如,JMS)负责重新提交,而不是在当前线程上执行它。有状态重试需要某种机制来检测已重试的提交。
有关 spring-retry
的更多信息,请参阅 项目的 Javadoc 和 Spring Batch 的参考文档,spring-retry
最初就是从 Spring Batch 中衍生出来的。
默认的退避行为是不进行退避。重试会立即尝试。使用导致线程在尝试之间暂停的退避策略可能会引起性能问题,包括过度的内存使用和线程饥饿。在高流量环境中,应谨慎使用退避策略。
配置重试建议
本节中的示例使用了以下总是抛出异常的 <service-activator>
:
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
简单无状态重试
默认的 RetryTemplate
使用一个 SimpleRetryPolicy
,它会尝试三次。没有 BackOffPolicy
,因此三次尝试是连续进行的,在尝试之间没有延迟。没有 RecoveryCallback
,所以在最终重试失败后,结果是将异常抛给调用者。在 Spring Integration 环境中,这个最终异常可能会通过在入站端点上使用 error-channel
来处理。以下示例使用 RetryTemplate
并显示其 DEBUG
输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
</int:request-handler-advice-chain>
</int:service-activator>
DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
简单的无状态重试与恢复
以下示例在前面的示例中添加了一个 RecoveryCallback
,并使用 ErrorMessageSendingRecoverer
将 ErrorMessage
发送到一个通道:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="myErrorChannel" />
</bean>
</property>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
无状态重试与自定义策略及恢复
为了更复杂的情况,我们可以提供一个自定义的 RetryTemplate
来进行重试建议。此示例继续使用 SimpleRetryPolicy
,但将尝试次数增加到四次。它还添加了一个 ExponentialBackoffPolicy
,其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 秒(总共四次尝试)。以下列表显示了示例及其 DEBUG
输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="myErrorChannel" />
</bean>
</property>
<property name="retryTemplate" ref="retryTemplate" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="4" />
</bean>
</property>
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="1000" />
<property name="multiplier" value="5.0" />
<property name="maxInterval" value="60000" />
</bean>
</property>
</bean>
27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
命名空间对无状态重试的支持
从 4.0 版开始,由于重试建议支持命名空间,前面的配置可以大大简化,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<ref bean="retrier" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
<int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>
在前面的例子中,建议被定义为顶级bean,以便可以在多个 request-handler-advice-chain
实例中使用。你也可以直接在链中定义建议,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
<int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:retry-advice>
</int:request-handler-advice-chain>
</int:service-activator>
一个 <handler-retry-advice>
可以有一个 <fixed-back-off>
或 <exponential-back-off>
子元素,或者没有子元素。一个没有子元素的 <handler-retry-advice>
不使用任何退避策略。如果没有 recovery-channel
,当重试次数用尽时会抛出异常。该命名空间只能与无状态重试一起使用。
对于更复杂的环境(自定义策略等),使用正常的 <bean>
定义。
简单的有状态重试与恢复
要使重试具有状态性,我们需要为建议提供一个 RetryStateGenerator
实现。这个类用于识别消息是否为重新提交,以便 RetryTemplate
可以确定此消息的当前重试状态。框架提供了一个 SpelExpressionRetryStateGenerator
,它通过使用 SpEL 表达式来确定消息标识符。此示例再次使用默认策略(三次尝试且不进行退避)。与无状态重试一样,这些策略也可以自定义。以下列表显示了示例及其 DEBUG
输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
<property name="retryStateGenerator">
<bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
<constructor-arg value="headers['jms_messageId']" />
</bean>
</property>
<property name="recoveryCallback">
<bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="myErrorChannel" />
</bean>
</property>
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果你将前面的示例与无状态示例进行比较,你可以看到,使用有状态重试时,每次失败都会将异常抛给调用者。
异常分类以供重试
Spring Retry 在确定哪些异常可以触发重试方面具有很大的灵活性。默认配置会对所有异常进行重试,并且异常分类器会查看顶级异常。例如,如果你将其配置为仅在 MyException
上重试,而你的应用程序抛出了一个 SomeOtherException
,其中的原因是 MyException
,则不会发生重试。
自从 Spring Retry 1.0.3 以来,BinaryExceptionClassifier
有一个名为 traverseCauses
的属性(默认值为 false
)。当设置为 true
时,它会遍历异常原因,直到找到匹配项或遍历完所有原因。
要使用此分类器进行重试,使用带有 max attempts 、 Map
的 Exception
对象和 traverseCauses
布尔值的 SimpleRetryPolicy
构造函数创建一个 SimpleRetryPolicy
。然后你可以将此策略注入到 RetryTemplate
中。
在这种情况下,traverseCauses
是必需的,因为用户异常可能被包装在 MessagingException
中。
断路器建议
断路器模式的一般思想是,如果一个服务当前不可用,则不要浪费时间(和资源)尝试使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
实现了这一模式。当断路器处于闭合状态时,端点会尝试调用服务。如果连续一定次数的尝试失败,断路器将进入打开状态。当它处于打开状态时,新的请求会“快速失败”,并且直到经过一段时间之前不会尝试调用服务。
当该时间到期后,断路器将设置为半开状态。在这种状态下,如果哪怕是一次尝试失败,断路器会立即进入打开状态。如果尝试成功,断路器则进入关闭状态,在这种情况下,它不会再次进入打开状态,直到再次发生配置的连续失败次数。任何成功的尝试都会将失败次数重置为零,以确定断路器何时可能再次进入打开状态。
通常,此建议可能用于外部服务,这些服务可能会花费一些时间才能失败(例如,尝试建立网络连接时超时)。
RequestHandlerCircuitBreakerAdvice
有两个属性:threshold
和 halfOpenAfter
。threshold
属性表示在断路器打开之前需要连续发生的故障次数。默认值为 5
。halfOpenAfter
属性表示断路器在最后一次故障后等待多久再尝试另一请求的时间。默认是 1000 毫秒。
以下示例配置了一个熔断器并显示其 DEBUG
和 ERROR
输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的例子中,阈值设置为 2
,halfOpenAfter
设置为 12 秒。每 5 秒会有一个新的请求到达。前两次尝试调用了服务。第三次和第四次由于断路器打开而抛出异常失败。第五次请求是在上次失败后 15 秒发出的,因此进行了尝试。第六次尝试立即失败,因为断路器立刻转为打开状态。
表达式评估建议
提供的最后一个建议类是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
。这个建议比其他两个建议更为通用。它提供了一种机制,可以在发送到端点的原始入站消息上评估表达式。在成功或失败后,可以分别评估不同的表达式。可选地,可以将包含评估结果的消息连同输入消息一起发送到消息通道。
此建议的一个典型用例可能是与 <ftp:outbound-channel-adapter/>
一起使用,也许是在传输成功时将文件移动到一个目录,或者在传输失败时移动到另一个目录:
建议有属性用于设置成功时的表达式、失败时的表达式,以及每个表达式对应的通道。对于成功的情况,发送到 successChannel
的消息是一个 AdviceMessage
,其有效负载是表达式求值的结果。一个额外的属性,称为 inputMessage
,包含发送到处理器的原始消息。当处理器抛出异常时,发送到 failureChannel
的消息是一个 ErrorMessage
,其有效负载为 MessageHandlingExpressionEvaluatingAdviceException
。像所有 MessagingException
实例一样,此有效负载具有 failedMessage
和 cause
属性,以及一个额外的属性,称为 evaluationResult
,它包含表达式求值的结果。
从版本 5.1.3 开始,如果配置了通道但未提供表达式,则使用默认表达式来评估消息的 payload
。
当在建议的作用范围内抛出异常时,默认情况下,该异常会在评估任何 failureExpression
后抛给调用者。如果你想抑制抛出异常,将 trapException
属性设置为 true
。以下建议展示了如何使用 Java DSL 配置一个 advice
:
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器建议
速率限制器建议 (RateLimiterRequestHandlerAdvice
) 可以确保一个端点不会因请求过多而过载。当超过速率限制时,请求将进入被阻止状态。
此建议的一个典型用例可能是外部服务提供商不允许每分钟超过 n
个请求。
RateLimiterRequestHandlerAdvice
实现完全基于 Resilience4j 项目,并需要注入 RateLimiter
或 RateLimiterConfig
。也可以使用默认配置和/或自定义名称进行配置。
以下示例配置了一个速率限制建议,每 1 秒 一个请求:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从 5.2 版本开始,引入了 CacheRequestHandlerAdvice
。它是基于 Spring Framework 中的缓存抽象,并与 @Caching
注解系列提供的概念和功能保持一致。内部逻辑基于 CacheAspectSupport
扩展,其中缓存操作的代理是在 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
方法周围进行的,该方法以请求 Message<?>
作为参数。此建议可以通过 SpEL 表达式或 Function
来评估缓存键进行配置。请求 Message<?>
可用作 SpEL 评估上下文中的根对象,或作为 Function
输入参数。默认情况下,使用请求消息的 payload
作为缓存键。CacheRequestHandlerAdvice
必须配置 cacheNames
,当默认缓存操作是 CacheableOperation
时,或者使用任意一组 CacheOperation
进行配置。每个 CacheOperation
可以单独配置或共享选项,例如 CacheManager
、CacheResolver
和 CacheErrorHandler
,这些可以从 CacheRequestHandlerAdvice
配置中重用。此配置功能类似于 Spring Framework 的 @CacheConfig
和 @Caching
注解组合。如果没有提供 CacheManager
,则默认从 CacheAspectSupport
中的 BeanFactory
解析一个单例 bean。
以下示例配置了两个具有不同缓存操作集的建议:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}