弹性:从错误和 Broker 故障中恢复
Spring AMQP 提供的一些关键(也是最受欢迎的)高级特性与在协议错误或代理故障情况下的恢复和自动重新连接有关。在本指南中,我们已经看到了所有相关的组件,但在这里将它们整合在一起并单独指出这些特性和恢复场景应该会有所帮助。
主要的重连功能由 CachingConnectionFactory
本身启用。通常使用 RabbitAdmin
的自动声明功能也是有益的。此外,如果你关心消息的可靠传递,你可能还需要在 RabbitTemplate
和 SimpleMessageListenerContainer
中使用 channelTransacted
标志,并在 SimpleMessageListenerContainer
中使用 AcknowledgeMode.AUTO
(或者如果你自己处理确认,则使用手动模式)。
自动声明交换器、队列和绑定
RabbitAdmin
组件可以在启动时声明交换器(exchanges)、队列(queues)和绑定(bindings)。它是通过一个 ConnectionListener
懒加载地完成这些操作的。因此,即使启动时代理(broker)不存在,也不会影响。第一次使用 Connection
时(例如,发送消息),监听器会触发,并且管理功能会生效。在监听器中执行自动声明的另一个好处是,如果连接由于任何原因断开(例如,代理崩溃、网络故障等),当连接重新建立时,这些声明会再次应用。
以这种方式声明的队列必须具有固定的名称——要么是显式声明的,要么是由框架为 AnonymousQueue
实例生成的。匿名队列是非持久的、独占的且自动删除的。
只有在 CachingConnectionFactory
的缓存模式为 CHANNEL
(默认模式)时,才会执行自动声明。此限制存在的原因是独占和自动删除队列绑定到连接。
从 2.2.2 版本开始,RabbitAdmin
将检测类型为 DeclarableCustomizer
的 bean,并在实际处理声明之前应用该函数。例如,这在框架内首次支持新参数(属性)之前设置新参数时非常有用。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
这在那些不直接提供对 Declarable
bean 定义访问的项目中也很有用。
另请参阅 RabbitMQ 自动连接/拓扑恢复。
同步操作中的故障及重试选项
如果你在使用 RabbitTemplate
时在同步序列中失去了与 broker 的连接,Spring AMQP 会抛出一个 AmqpException
(通常但不总是 AmqpIOException
)。我们不会试图隐藏问题的存在,因此你必须能够捕获并响应这个异常。如果你怀疑连接丢失(并且这不是你的错),最简单的做法是重试该操作。你可以手动重试,或者考虑使用 Spring Retry 来处理重试(命令式或声明式)。
Spring Retry 提供了几个 AOP 拦截器以及大量的灵活性来指定重试的参数(重试次数、异常类型、回退算法等)。Spring AMQP 还为 AMQP 用例提供了一些方便的工厂 Bean,以便以方便的形式创建 Spring Retry 拦截器,并提供了强类型的回调接口,您可以使用这些接口来实现自定义的恢复逻辑。有关更多详细信息,请参阅 StatefulRetryOperationsInterceptor
和 StatelessRetryOperationsInterceptor
的 Javadoc 和属性。如果不存在事务,或者事务在重试回调内部启动,则无状态重试是合适的。请注意,无状态重试比有状态重试更简单配置和分析,但如果有正在进行的事务必须回滚或肯定会回滚,则通常不适合使用无状态重试。事务中间断开的连接应该与回滚具有相同的效果。因此,对于在堆栈较高层启动事务的重新连接,有状态重试通常是最佳选择。
有状态重试需要一个机制来唯一标识消息。最简单的方法是让发送者在 MessageId
消息属性中放入一个唯一值。提供的消息转换器提供了一个选项来实现这一点:您可以将 createMessageIds
设置为 true
。否则,您可以将 MessageKeyGenerator
实现注入到拦截器中。密钥生成器必须为每条消息返回一个唯一的密钥。在 2.0 版本之前,提供了一个 MissingMessageIdAdvice
。它允许没有 messageId
属性的消息被精确地重试一次(忽略重试设置)。由于 spring-retry
1.2 版本的功能已内置到拦截器和消息监听器容器中,因此不再提供此建议。
为了向后兼容,默认情况下,带有空消息 ID 的消息会被消费者视为致命错误(消费者会停止,重试一次后)。要复制 MissingMessageIdAdvice
提供的功能,你可以在监听器容器上设置 statefulRetryFatalWithNullMessageId
属性为 false
。通过此设置,消费者将继续运行,并且消息会被拒绝(重试一次后)。消息将被丢弃或路由到死信队列(如果配置了死信队列)。
从 1.3 版本开始,提供了一个 builder API,通过在 Java 中使用(在 @Configuration
类中)来帮助组装这些拦截器。以下示例展示了如何做到这一点:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只有一部分重试能力可以通过这种方式配置。更高级的功能需要将 RetryTemplate
配置为 Spring bean。有关可用策略及其配置的完整信息,请参阅 Spring Retry Javadoc。
批量监听器的重试机制
不建议为批量监听器配置重试机制,除非该批量消息是由生产者以单条记录的方式创建的。有关消费者和生产者创建的批量消息的信息,请参阅 批量消息。对于由消费者创建的批量消息,框架无法知道批量中的哪条消息导致了失败,因此在重试次数用尽后无法进行恢复。而对于由生产者创建的批量消息,由于实际上只有一条消息失败,因此可以恢复整个消息。应用程序可能希望通知自定义恢复器批量中失败发生的位置,或许可以通过设置抛出异常的索引属性来实现。
批量监听器的重试恢复器必须实现 MessageBatchRecoverer
接口。
消息监听器与异步情况
如果 MessageListener
由于业务异常而失败,该异常将由消息监听器容器处理,然后容器会继续监听下一条消息。如果失败是由于连接断开(非业务异常)引起的,那么为监听器收集消息的消费者必须被取消并重新启动。SimpleMessageListenerContainer
会无缝处理这种情况,并且会记录日志表明监听器正在重新启动。实际上,它会无限循环,尝试重新启动消费者。只有在消费者确实表现非常糟糕的情况下,它才会放弃。一个副作用是,如果容器启动时代理(broker)处于关闭状态,它会不断尝试,直到能够建立连接。
业务异常处理,与协议错误和连接中断不同,可能需要更多的思考和定制配置,特别是在使用事务或容器确认(acks)的情况下。在 2.8.x 版本之前,RabbitMQ 没有定义死信行为。因此,默认情况下,由于业务异常而被拒绝或回滚的消息可能会被无限次地重新投递。为了限制客户端的重新投递次数,一种选择是在监听器的建议链中使用 StatefulRetryOperationsInterceptor
。该拦截器可以包含一个恢复回调,用于实现自定义的死信操作——无论这在您的特定环境中是何种合适的处理方式。
另一种替代方法是将容器的 defaultRequeueRejected
属性设置为 false
。这会导致所有失败的消息被丢弃。当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传递到死信交换器(dead letter exchange)。
或者,你可以抛出一个 AmqpRejectAndDontRequeueException
。这样做可以防止消息重新入队,无论 defaultRequeueRejected
属性的设置如何。
从 2.1 版本开始,引入了一个 ImmediateRequeueAmqpException
来执行完全相反的逻辑:无论 defaultRequeueRejected
属性的设置如何,消息都将被重新排队。
通常,这两种技术会结合使用。你可以在建议链中使用 StatefulRetryOperationsInterceptor
,并搭配一个抛出 AmqpRejectAndDontRequeueException
的 MessageRecoverer
。当所有重试尝试都用尽时,MessageRecover
会被调用。RejectAndDontRequeueRecoverer
正是执行这一操作。默认的 MessageRecoverer
会消费掉错误的消息,并发出一个 WARN
级别的日志消息。
从 1.3 版本开始,提供了一个新的 RepublishMessageRecoverer
,允许在重试耗尽后重新发布失败的消息。
当恢复者消费了最终的异常时,如果配置了死信交换(dead letter exchange),代理(broker)将确认(ack)该消息,并且不会将其发送到死信交换。
当在消费者端使用 RepublishMessageRecoverer
时,接收到的消息在 receivedDeliveryMode
消息属性中具有 deliveryMode
。在这种情况下,deliveryMode
为 null
。这意味着在 broker 上使用的是 NON_PERSISTENT
传递模式。从版本 2.0 开始,如果 deliveryMode
为 null
,你可以配置 RepublishMessageRecoverer
来设置要重新发布的消息的 deliveryMode
。默认情况下,它使用 MessageProperties
的默认值 - MessageDeliveryMode.PERSISTENT
。
以下示例展示了如何将 RepublishMessageRecoverer
设置为恢复器:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
RepublishMessageRecoverer
会在消息头中发布包含额外信息的消息,例如异常消息、堆栈跟踪、原始交换器和路由键。可以通过创建子类并重写 additionalHeaders()
方法来添加额外的头信息。deliveryMode
(或任何其他属性)也可以在 additionalHeaders()
中更改,如下例所示:
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从版本 2.0.5 开始,如果堆栈跟踪过大,可能会被截断;这是因为所有头部信息必须适合单个帧。默认情况下,如果堆栈跟踪会导致其他头部信息的可用空间少于 20,000 字节('headroom'),它将被截断。如果需要为其他头部信息分配更多或更少的空间,可以通过设置恢复器的 frameMaxHeadroom
属性来调整此行为。从版本 2.1.13 和 2.2.3 开始,异常消息也被纳入此计算中,堆栈跟踪的数量将使用以下算法最大化:
-
如果仅堆栈跟踪就会超过限制,异常消息头将被截断为 97 字节加上
…
,并且堆栈跟踪也会被截断。 -
如果堆栈跟踪较小,消息将被截断(加上
…
)以适应可用的字节数(但堆栈跟踪本身中的消息会被截断为 97 字节加上…
)。
每当发生任何类型的截断时,原始异常将被记录下来以保留完整的信息。评估在头部信息增强之后进行,因此可以在表达式中使用诸如异常类型之类的信息。
从版本 2.4.8 开始,错误交换和路由键可以作为 SpEL 表达式提供,其中 Message
是评估的根对象。
从 2.3.3 版本开始,提供了一个新的子类 RepublishMessageRecovererWithConfirms
;它支持两种风格的发布者确认,并在返回之前等待确认(如果未确认或消息被返回,则抛出异常)。
如果确认类型为 CORRELATED
,子类还会检测是否有消息返回,并抛出 AmqpMessageReturnedException
;如果发布被拒绝确认,则会抛出 AmqpNackReceivedException
。
如果确认类型为 SIMPLE
,子类将在通道上调用 waitForConfirmsOrDie
方法。
有关确认和返回的更多信息,请参见发布者确认和返回。
从 2.1 版本开始,添加了一个 ImmediateRequeueMessageRecoverer
来抛出 ImmediateRequeueAmqpException
,该异常会通知监听器容器重新排队当前失败的消息。
Spring Retry 的异常分类
Spring Retry 在确定哪些异常可以触发重试方面具有很大的灵活性。默认配置会对所有异常进行重试。鉴于用户异常被包装在 ListenerExecutionFailedException
中,我们需要确保分类器能够检查异常的原因。默认的分类器仅查看顶层的异常。
自 Spring Retry 1.0.3 版本起,BinaryExceptionClassifier
新增了一个名为 traverseCauses
的属性(默认值为 false
)。当该属性设置为 true
时,它会遍历异常的因果链,直到找到匹配的异常或没有更多异常原因为止。
要将此分类器用于重试,您可以使用通过构造函数创建的 SimpleRetryPolicy
,该构造函数接受最大尝试次数、Exception
实例的 Map
以及布尔值(traverseCauses
),然后将此策略注入到 RetryTemplate
中。
通过 Broker 重试
从队列中死信的消息可以通过 DLX 重新路由后重新发布回该队列。这种重试行为由代理端通过 x-death
标头控制。有关此方法的更多信息,请参阅官方 RabbitMQ 文档。
另一种方法是从应用程序手动将失败的消息重新发布到原始交换机。从 4.0
版本开始,RabbitMQ 代理不再考虑从客户端发送的 x-death
头。本质上,任何来自客户端的 x-*
头都会被忽略。
为了缓解 RabbitMQ 代理的这种新行为,Spring AMQP 从 3.2 版本开始引入了一个 retry_count
头部。当该头部不存在且服务器端 DLX(死信交换机)生效时,x-death.count
属性会被映射到这个头部。当失败的消息被手动重新发布以进行重试时,必须手动增加 retry_count
头部的值。有关更多信息,请参阅 MessageProperties.incrementRetryCount()
的 JavaDocs。
以下示例总结了一种在 broker 上进行手动重试的算法:
@RabbitListener(queueNames = "some_queue")
public void rePublish(Message message) {
try {
// Process message
}
catch (Exception ex) {
Long retryCount = message.getMessageProperties().getRetryCount();
if (retryCount < 3) {
message.getMessageProperties().incrementRetryCount();
this.rabbitTemplate.send("", "some_queue", message);
}
else {
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
}
}