提供的建议类
除了提供应用AOP通知类的通用机制外,Spring Integration还提供了以下开箱即用的通知实现:
-
RequestHandlerRetryAdvice(在重试建议中描述) -
RequestHandlerCircuitBreakerAdvice(在断路器建议中描述) -
ExpressionEvaluatingRequestHandlerAdvice(在表达式建议中描述) -
RateLimiterRequestHandlerAdvice(在限流器建议中描述) -
CacheRequestHandlerAdvice(在缓存建议中描述) -
ReactiveRequestHandlerAdvice(在响应式建议中描述) -
ContextHolderRequestHandlerAdvice(在上下文持有者建议中描述) -
LockRequestHandlerAdvice(在锁建议中描述)
重试建议
重试建议(o.s.i.handler.advice.RequestHandlerRetryAdvice)利用了 Spring Framework 中重试支持提供的丰富重试机制。该建议的核心组件是 RetryTemplate,它允许配置复杂的重试场景,包括 RetryPolicy 以及用于确定重试耗尽时采取何种操作的 RecoveryCallback 策略。
无状态重试
无状态重试是指重试活动完全在通知内部处理的情况。线程会暂停(如果配置为这样做)并重试操作。
有状态重试
有状态重试是指重试状态在通知内部管理,但抛出异常并由调用方重新提交请求的情况。有状态重试的一个例子是,我们希望消息发起者(例如 JMS)负责重新提交,而不是在当前线程上执行。有状态重试需要某种机制来检测重试提交。为此,RequestHandlerRetryAdvice 暴露了 stateKeyFunction、newMessagePredicate 和 stateCacheSize 属性。其中后两个属性仅在第一个属性被提供时才有意义。本质上,stateKeyFunction 是将 RequestHandlerRetryAdvice 逻辑从无状态切换到有状态的指示器。newMessagePredicate 的含义是根据待处理的消息来刷新基于该键的现有重试状态。stateCacheSize 默认为 100,当更多新的重试状态到来时,较早的条目会从缓存中移除。也许这些旧消息不再从上游流中重新传递,例如,消息代理根据其重传递策略将这些消息放入死信队列。
默认的回退行为是不进行回退。重试会立即执行。使用导致线程在尝试之间暂停的回退策略可能会引发性能问题,包括内存使用过度和线程饥饿。在高负载环境中,应谨慎使用回退策略。
配置重试通知
本节中的示例使用以下始终抛出异常的 @ServiceActivator:
public class FailingService {
@ServiceActivator(inputChannel = "input", adviceChain = "retryAdvice")
public void service(String message) {
throw new RuntimeException("error");
}
}
简单无状态重试
默认的 RetryPolicy 会为目标 MessageHandler 重试三次,加上原始调用。默认情况下没有退避机制,因此三次尝试会连续进行,尝试之间没有延迟。没有配置 RecoveryCallback,所以在最终重试失败后会将异常抛给调用方。在 Spring Integration 环境中,可以通过入站端点的 error-channel 来处理这个最终异常。以下示例展示了 RequestHandlerRetryAdvice 的默认配置:
@Bean
RequestHandlerRetryAdvice retryAdvice() {
return new RequestHandlerRetryAdvice();
}
简单的无状态重试与恢复
以下示例在前例基础上添加了 RecoveryCallback,并使用 ErrorMessageSendingRecoverer 向通道发送 ErrorMessage:
@Bean
RequestHandlerRetryAdvice retryAdvice(MessageChannel recoveryChannel) {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel));
return requestHandlerRetryAdvice;
}
无状态重试与自定义策略及恢复
为了更精细的控制,可以为 RequestHandlerRetryAdvice 提供自定义的 RetryPolicy。本示例继续使用简单的 RetryPolicy,但将重试次数增加到四次。同时添加了 ExponentialBackoff 策略,其中第一次重试等待 1 秒,第二次等待 5 秒,第三次等待 25 秒(总计四次尝试)。以下清单展示了这种配置的示例:
@Bean
RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
RetryPolicy retryPolicy = RetryPolicy.builder()
.maxRetries(4)
.delay(Duration.ofSeconds(1))
.multiplier(5.0)
.maxDelay(Duration.ofMinutes(1))
.build();
requestHandlerRetryAdvice.setRetryPolicy(retryPolicy);
return requestHandlerRetryAdvice;
}
无状态重试的命名空间支持
以下示例展示了如何使用 Spring Integration XML 命名空间及其自定义标签来配置 RequestHandlerRetryAdvice:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<ref bean="retrier" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:handler-retry-advice id="retrier" max-retries="4" recovery-channel="myErrorChannel">
<int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>
在前面的示例中,建议被定义为一个顶级 bean,以便可以在多个 request-handler-advice-chain 实例中使用。你也可以直接在链中定义建议,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<int:retry-advice id="retrier" max-retries="4" recovery-channel="myErrorChannel">
<int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:retry-advice>
</int:request-handler-advice-chain>
</int:service-activator>
<handler-retry-advice> 可以包含一个 <fixed-back-off> 或 <exponential-back-off> 子元素,也可以不包含任何子元素。不包含子元素的 <handler-retry-advice> 不使用退避策略。如果没有 recovery-channel,当重试次数耗尽时,将抛出异常。该命名空间仅适用于无状态重试。
对于更复杂的环境(自定义策略等),请使用常规的 <bean> 定义。
带恢复功能的简单有状态重试
为了使重试具备状态,必须为 RequestHandlerRetryAdvice 实例提供一个 Function<Message<?>, Object> stateKeyFunction。此函数用于将消息识别为重新提交,以便 RequestHandlerRetryAdvice 能够确定此消息的当前重试状态。有状态重试背后的理念是不阻塞当前线程,而是缓存此消息的重试状态,并将 MessageHandler 的失败重新抛给调用者。通常,这与能够重新提交(或重新投递)事件的消息发起者配合良好,例如,具有 nack 功能的 RabbitMQ 等消息代理,或具有 seek 功能的 Apache Kafka;或者在消费回滚后的 JMS。如果尚未缓存状态(或者 Predicate<Message<?>> newMessagePredicate 对此消息返回 true),则 MessageHandler 调用被视为首次调用,并在其失败时,基于 BackOffExecution 的内部 RetryState 会在上述键下被缓存。当下一条消息到达时,缓存的状态会在尝试调用 MessageHandler 之前为 Thread.sleep() 提供一个退避间隔。如果此退避间隔等于 BackOffExecution.STOP(例如,已达到 maxAttempts),则意味着不再对此消息进行重试:整个重试周期被视为已耗尽,相应的 RetryException 将被抛回给调用者,或者如果提供了 RecoveryCallback,则用于调用它。总的来说,异常处理逻辑和退避执行与无状态行为类似,唯一的区别是线程不会因所有 maxAttempts 而被阻塞。由消息发起者负责为下一次重试调用重新投递消息。
断路器建议
断路器模式的基本思想是,如果某个服务当前不可用,就不要浪费时间和资源去尝试使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice 实现了这一模式。当断路器处于闭合状态时,端点会尝试调用服务。如果连续多次尝试失败,断路器将切换到断开状态。在断开状态下,新的请求会“快速失败”,并且在指定时间到期之前,不会尝试调用服务。
当该时间到期后,断路器会切换到半开状态。在此状态下,即使只有一次尝试失败,断路器也会立即跳转到开路状态。如果尝试成功,断路器将进入闭合状态,此时除非再次达到配置的连续失败次数,否则不会重新进入开路状态。任何成功的尝试都会将状态重置为零次失败,以重新计算断路器何时可能再次进入开路状态。
通常,这条建议适用于外部服务,因为外部服务可能需要一些时间才会失败,例如尝试建立网络连接时发生超时。
RequestHandlerCircuitBreakerAdvice 有两个属性:threshold 和 halfOpenAfter。threshold 属性表示断路器在跳闸(打开)之前需要发生的连续故障次数,默认值为 5。halfOpenAfter 属性表示断路器在最后一次故障后等待尝试下一个请求的时间,默认值为 1000 毫秒。
以下示例配置了一个断路器,并展示了其 DEBUG 和 ERROR 输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的例子中,阈值设置为 2,halfOpenAfter 设置为 12 秒。每 5 秒会有一个新的请求到达。前两次尝试调用了服务。第三次和第四次尝试因断路器处于打开状态而失败并抛出异常。第五次请求被尝试,因为该请求距离上一次失败已经过去了 15 秒。第六次尝试立即失败,因为断路器立即又回到了打开状态。
表达式求值建议
最终提供的建议类是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。这个建议类比其他两个建议类更为通用。它提供了一种机制,用于对发送到端点的原始入站消息评估表达式。无论成功或失败后,都可以评估不同的表达式。可选地,可以将包含评估结果的消息与输入消息一起发送到消息通道。
这条建议的一个典型用例可能是与 <ftp:outbound-channel-adapter/> 结合使用,例如在传输成功时将文件移至一个目录,或在传输失败时将其移至另一个目录:
该建议具有以下属性:可设置成功时的表达式、失败时的表达式,以及各自对应的通道。对于成功情况,发送到 successChannel 的消息是一个 AdviceMessage,其有效负载为表达式求值的结果。另一个名为 inputMessage 的属性包含发送给处理器的原始消息。发送到 failureChannel 的消息(当处理器抛出异常时)是一个 ErrorMessage,其有效负载为 MessageHandlingExpressionEvaluatingAdviceException。与所有 MessagingException 实例一样,此有效负载具有 failedMessage 和 cause 属性,以及一个名为 evaluationResult 的附加属性,该属性包含表达式求值的结果。
从版本 5.1.3 开始,如果配置了通道但未提供表达式,将使用默认表达式来评估消息的 payload。
当在通知作用域内抛出异常时,默认情况下,该异常会在所有 failureExpression 评估完成后抛给调用方。若希望抑制异常抛出,可将 trapException 属性设置为 true。以下示例展示了如何通过 Java DSL 配置 advice:
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器建议
速率限制器建议(RateLimiterRequestHandlerAdvice)用于确保端点不会因请求过多而过载。当超出速率限制时,请求将进入阻塞状态。
这条建议的一个典型用例可能是外部服务提供商不允许每分钟超过 n 次请求。
RateLimiterRequestHandlerAdvice 的实现完全基于 Resilience4j 项目,需要注入 RateLimiter 或 RateLimiterConfig。也可使用默认值和/或自定义名称进行配置。
以下示例配置了一个限流器通知,限制为每秒一个请求:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从版本 5.2 开始,引入了 CacheRequestHandlerAdvice。它基于 Spring Framework 中的缓存抽象,并与 @Caching 注解系列提供的概念和功能保持一致。其内部逻辑基于 CacheAspectSupport 扩展,其中缓存操作的代理围绕 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 方法进行,并以请求 Message<?> 作为参数。此通知可以通过 SpEL 表达式或 Function 来配置以评估缓存键。请求 Message<?> 可作为 SpEL 评估上下文的根对象,或作为 Function 的输入参数。默认情况下,请求消息的 payload 用于缓存键。当默认缓存操作为 CacheableOperation 时,CacheRequestHandlerAdvice 必须配置 cacheNames,或者配置一组任意的 CacheOperation。每个 CacheOperation 可以单独配置,或者共享选项,如 CacheManager、CacheResolver 和 CacheErrorHandler,可以从 CacheRequestHandlerAdvice 配置中重用。此配置功能类似于 Spring Framework 的 @CacheConfig 和 @Caching 注解组合。如果未提供 CacheManager,默认情况下会从 CacheAspectSupport 中的 BeanFactory 解析单个 bean。
以下示例配置了两个具有不同缓存操作集合的通知:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}