非阻塞重试
版本 2.9 更改了引导基础设施 bean 的机制;请参阅 Configuration 了解现在引导该功能所需的两种机制。
实现非阻塞重试 / DLT 功能与 Kafka 通常需要设置额外的主题,并创建和配置相应的监听器。自 2.7 版本起,Spring for Apache Kafka 通过 @RetryableTopic
注解和 RetryTopicConfiguration
类提供对这一功能的支持,以简化该引导过程。
自 3.2 版本以来,Spring for Apache Kafka 支持使用 @KafkaListener on a Class 进行非阻塞重试。
不支持与 Batch Listeners 的非阻塞重试。
非阻塞重试不能与 容器事务 结合使用。
部分总结
📄️ 模式是如何工作的
如果消息处理失败,消息将被转发到一个重试主题,并带有一个退避时间戳。重试主题的消费者会检查时间戳,如果尚未到期,它会暂停该主题分区的消费。当时间到期时,分区消费将恢复,消息将再次被消费。如果消息处理再次失败,消息将被转发到下一个重试主题,并重复该模式,直到成功处理,或者尝试次数耗尽,消息将被发送到死信主题(如果已配置)。
📄️ 回退延迟精度
所有消息处理和退避都是由消费者线程处理的,因此,延迟精度在最佳努力的基础上得到保证。如果一条消息的处理时间超过该消费者的下一条消息的退避时间,则下一条消息的延迟将高于预期。此外,对于短延迟(大约 1s 或更短),线程必须执行的维护工作,例如提交偏移量,可能会延迟消息处理的执行。如果重试主题的消费者处理多个分区,精度也可能受到影响,因为我们依赖于从轮询中唤醒消费者并拥有完整的 pollTimeouts 来进行时间调整。
📄️ 配置
从版本 2.9 开始,对于默认配置,应该在一个带有 @Configuration 注解的类中使用 @EnableKafkaRetryTopic 注解。这使得该功能能够正确启动,并提供访问在运行时查找某些功能组件的能力。
📄️ 程序化构建
该功能旨在与 @KafkaListener 一起使用;然而,一些用户请求有关如何以编程方式配置非阻塞重试的信息。以下 Spring Boot 应用程序提供了如何做到这一点的示例。
📄️ 功能
大多数功能在 @RetryableTopic 注解和 RetryTopicConfiguration beans 中都是可用的。
📄️ 结合阻塞重试和非阻塞重试
从 2.8.4 开始,您可以配置框架同时使用阻塞和非阻塞重试。例如,您可以有一组异常,这些异常可能会在下一个记录上触发错误,例如 DatabaseAccessException,因此您可以在将相同记录发送到重试主题或直接发送到 DLT 之前重试几次。
📄️ 访问交付尝试
要访问阻塞和非阻塞的交付尝试,请在您的 @KafkaListener 方法签名中添加这些头部:
📄️ 主题命名
重试主题和 DLT 通过在主主题后添加提供的或默认值,并附加该主题的延迟或索引来命名。
📄️ 多个监听器,相同主题(们)
从版本 3.0 开始,现在可以在同一主题上配置多个监听器。为了做到这一点,您必须使用自定义主题命名来将重试主题彼此隔离。最好通过一个示例来说明这一点:
📄️ DLT 策略
该框架提供了一些与 DLT 相关的工作策略。您可以提供一个 DLT 处理的方法,使用默认的日志记录方法,或者根本不使用 DLT。此外,您还可以选择在 DLT 处理失败时发生什么。
📄️ 指定 ListenerContainerFactory
默认情况下,RetryTopic 配置将使用 @KafkaListener 注解中提供的工厂,但您可以指定一个不同的工厂来创建重试主题和 DLT 监听器容器。
📄️ 在运行时访问主题信息
自 2.9 版本以来,您可以通过注入提供的 DestinationTopicContainer bean 在运行时访问有关主题链的信息。该接口提供了查找链中下一个主题或主题的 DLT(如果已配置)的方式,以及一些有用的属性,例如主题的名称、延迟和类型。
📄️ 更改 KafkaBackOffException 日志级别
当重试主题中的消息尚未到达消费时间时,会抛出 KafkaBackOffException。此类异常默认以 DEBUG 级别记录,但您可以通过在 @Configuration 类中的 ListenerContainerFactoryConfigurer 中设置错误处理程序自定义器来更改此行为。