特性
大多数功能在 @RetryableTopic 注解和 RetryTopicConfiguration beans 中都是可用的。
BackOff 配置
BackOff 配置依赖于 Spring Retry 项目的 BackOffPolicy 接口。
它包括:
- 
固定退避
 - 
指数退避
 - 
随机指数退避
 - 
均匀随机退避
 - 
无退避
 - 
自定义退避
 
@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}
您还可以提供 Spring Retry 的 SleepingBackOffPolicy 接口的自定义实现:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
默认的退避策略是 FixedBackOffPolicy,最大尝试次数为 3 次,间隔为 1000ms。
ExponentialBackOffPolicy 的默认最大延迟为 30 秒。如果您的退避策略需要更大的延迟值,请相应地调整 maxDelay 属性。
第一次尝试会计入 maxAttempts,因此如果你提供的 maxAttempts 值为 4,那么将会有一次原始尝试加上 3 次重试。
全局超时
您可以设置重试过程的全局超时时间。如果达到该时间,下次消费者抛出异常时,消息将直接发送到 DLT,或者如果没有可用的 DLT,则结束处理。
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
默认情况下没有设置超时,这也可以通过提供 -1 作为超时值来实现。
异常分类器
您可以指定希望重试的异常以及不希望重试的异常。您还可以设置它以遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
默认行为是在所有异常上重试,并且不遍历原因。
自 2.8.3 版本以来,存在一个全局的致命异常列表,这些异常将导致记录被发送到 DLT,而不进行任何重试。有关致命异常的默认列表,请参见 DefaultErrorHandler。您可以通过在扩展 RetryTopicConfigurationSupport 的 @Configuration 类中重写 configureNonBlockingRetries 方法来添加或移除此列表中的异常。有关更多信息,请参见 Configuring Global Settings and Features。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命异常的分类,只需清除提供的列表。
包含和排除的主题
您可以通过 .includeTopic(String topic)、.includeTopics(Collection\<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection\<String> topics) 方法决定 RetryTopicConfiguration bean 将处理哪些主题以及不处理哪些主题。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
默认行为是包含所有主题。
主题自动创建
除非另有说明,框架将使用 NewTopic beans 自动创建所需的主题,这些主题由 KafkaAdmin bean 消费。您可以指定主题创建时的分区数量和复制因子,并且可以关闭此功能。从版本 3.0 开始,默认的复制因子为 -1,这意味着使用代理的默认值。如果您的代理版本早于 2.4,您需要设置一个显式值。
请注意,如果您没有使用 Spring Boot,则必须提供一个 KafkaAdmin bean 才能使用此功能。
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
默认情况下,主题会自动创建,具有一个分区和 -1 的复制因子(意味着使用代理的默认值)。如果您的代理版本早于 2.4,您需要设置一个明确的值。
失败头管理
在考虑如何管理失败头(原始头和异常头)时,框架委托给 DeadLetterPublishingRecoverer 来决定是追加还是替换头。
默认情况下,它将 appendOriginalHeaders 显式设置为 false,并将 stripPreviousExceptionHeaders 保留为 DeadLetterPublishingRecover 使用的默认值。
这意味着在默认配置下,仅保留第一个 "original" 和最后一个异常头。这是为了避免在涉及多个重试步骤时创建过于庞大的消息(例如,由于堆栈跟踪头)。
请参见 Managing Dead Letter Record Headers 以获取更多信息。
要重新配置框架以使用这些属性的不同设置,请通过在扩展 RetryTopicConfigurationSupport 的 @Configuration 类中重写 configureCustomizers 方法来配置 DeadLetterPublishingRecoverer 自定义器。有关更多详细信息,请参见 Configuring Global Settings and Features。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}
从版本 2.8.4 开始,如果您希望添加自定义头(除了工厂添加的重试信息头),您可以向工厂添加一个 headersFunction - factory.setHeadersFunction((rec, ex) -> { ... })。
默认情况下,添加的任何头信息将是累积的 - Kafka 头信息可以包含多个值。从版本 2.9.5 开始,如果函数返回的 Headers 包含类型为 DeadLetterPublishingRecoverer.SingleRecordHeader 的头信息,则该头信息的任何现有值将被移除,仅保留新的单一值。
Custom DeadLetterPublishingRecoverer
如在 Failure Header Management 中所示,可以自定义框架创建的默认 DeadLetterPublishingRecoverer 实例。然而,对于某些用例,有必要子类化 DeadLetterPublishingRecoverer,例如重写 createProducerRecord() 以修改发送到重试(或死信)主题的内容。从版本 3.0.9 开始,您可以重写 RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory() 方法,以提供一个 DeadLetterPublisherCreator 实例,例如:
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {
    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}
建议在构建自定义实例时使用提供的解析器。
基于抛出异常的消息路由到自定义 DLT
从版本 3.2.0 开始,可以根据处理过程中抛出的异常类型将消息路由到自定义 DLT。为了实现这一点,需要指定路由。路由定制包括额外目的地的指定。目的地又由两个设置组成:suffix 和 exceptions。当在 exceptions 中指定的异常类型被抛出时,包含 suffix 的 DLT 将被视为消息的目标主题,优先于通用 DLT。以下是使用注解或 RetryTopicConfiguration beans 的配置示例:
@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(template);
}
suffix 在自定义 DLT 名称中位于通用 dltTopicSuffix 之前。考虑到所提供的示例,导致 DeserializationException 的消息将被路由到 my-annotated-topic-deserialization-dlt 而不是 my-annotated-topic-dlt。自定义 DLT 将遵循 Topics AutoCreation 中所述的相同规则进行创建。