跳到主要内容

特性

ChatGPT-4o-mini 中英对照 Features

大多数功能在 @RetryableTopic 注解和 RetryTopicConfiguration beans 中都是可用的。

BackOff 配置

BackOff 配置依赖于 Spring Retry 项目的 BackOffPolicy 接口。

它包括:

  • 固定退避

  • 指数退避

  • 随机指数退避

  • 均匀随机退避

  • 无退避

  • 自定义退避

@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
java

您还可以提供 Spring Retry 的 SleepingBackOffPolicy 接口的自定义实现:

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
java
备注

默认的退避策略是 FixedBackOffPolicy,最大尝试次数为 3 次,间隔为 1000ms。

备注

ExponentialBackOffPolicy 的默认最大延迟为 30 秒。如果您的退避策略需要更大的延迟值,请相应地调整 maxDelay 属性。

important

第一次尝试会计入 maxAttempts,因此如果你提供的 maxAttempts 值为 4,那么将会有一次原始尝试加上 3 次重试。

全局超时

您可以设置重试过程的全局超时时间。如果达到该时间,下次消费者抛出异常时,消息将直接发送到 DLT,或者如果没有可用的 DLT,则结束处理。

@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
java
备注

默认情况下没有设置超时,这也可以通过提供 -1 作为超时值来实现。

异常分类器

您可以指定希望重试的异常以及不希望重试的异常。您还可以设置它以遍历原因以查找嵌套异常。

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
java
备注

默认行为是在所有异常上重试,并且不遍历原因。

自 2.8.3 版本以来,存在一个全局的致命异常列表,这些异常将导致记录被发送到 DLT,而不进行任何重试。有关致命异常的默认列表,请参见 DefaultErrorHandler。您可以通过在扩展 RetryTopicConfigurationSupport@Configuration 类中重写 configureNonBlockingRetries 方法来添加或移除此列表中的异常。有关更多信息,请参见 Configuring Global Settings and Features

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
java
备注

要禁用致命异常的分类,只需清除提供的列表。

包含和排除的主题

您可以通过 .includeTopic(String topic).includeTopics(Collection\<String> topics).excludeTopic(String topic).excludeTopics(Collection\<String> topics) 方法决定 RetryTopicConfiguration bean 将处理哪些主题以及不处理哪些主题。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
java
备注

默认行为是包含所有主题。

主题自动创建

除非另有说明,框架将使用 NewTopic beans 自动创建所需的主题,这些主题由 KafkaAdmin bean 消费。您可以指定主题创建时的分区数量和复制因子,并且可以关闭此功能。从版本 3.0 开始,默认的复制因子为 -1,这意味着使用代理的默认值。如果您的代理版本早于 2.4,您需要设置一个显式值。

important

请注意,如果您没有使用 Spring Boot,则必须提供一个 KafkaAdmin bean 才能使用此功能。

@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}

@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
java
备注

默认情况下,主题会自动创建,具有一个分区和 -1 的复制因子(意味着使用代理的默认值)。如果您的代理版本早于 2.4,您需要设置一个明确的值。

失败头管理

在考虑如何管理失败头(原始头和异常头)时,框架委托给 DeadLetterPublishingRecoverer 来决定是追加还是替换头。

默认情况下,它将 appendOriginalHeaders 显式设置为 false,并将 stripPreviousExceptionHeaders 保留为 DeadLetterPublishingRecover 使用的默认值。

这意味着在默认配置下,仅保留第一个 "original" 和最后一个异常头。这是为了避免在涉及多个重试步骤时创建过于庞大的消息(例如,由于堆栈跟踪头)。

请参见 Managing Dead Letter Record Headers 以获取更多信息。

要重新配置框架以使用这些属性的不同设置,请通过在扩展 RetryTopicConfigurationSupport@Configuration 类中重写 configureCustomizers 方法来配置 DeadLetterPublishingRecoverer 自定义器。有关更多详细信息,请参见 Configuring Global Settings and Features

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
java

从版本 2.8.4 开始,如果您希望添加自定义头(除了工厂添加的重试信息头),您可以向工厂添加一个 headersFunction - factory.setHeadersFunction((rec, ex) -> { ... })

默认情况下,添加的任何头信息将是累积的 - Kafka 头信息可以包含多个值。从版本 2.9.5 开始,如果函数返回的 Headers 包含类型为 DeadLetterPublishingRecoverer.SingleRecordHeader 的头信息,则该头信息的任何现有值将被移除,仅保留新的单一值。

Custom DeadLetterPublishingRecoverer

如在 Failure Header Management 中所示,可以自定义框架创建的默认 DeadLetterPublishingRecoverer 实例。然而,对于某些用例,有必要子类化 DeadLetterPublishingRecoverer,例如重写 createProducerRecord() 以修改发送到重试(或死信)主题的内容。从版本 3.0.9 开始,您可以重写 RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory() 方法,以提供一个 DeadLetterPublisherCreator 实例,例如:

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {

return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
java

建议在构建自定义实例时使用提供的解析器。

基于抛出异常的消息路由到自定义 DLT

从版本 3.2.0 开始,可以根据处理过程中抛出的异常类型将消息路由到自定义 DLT。为了实现这一点,需要指定路由。路由定制包括额外目的地的指定。目的地又由两个设置组成:suffixexceptions。当在 exceptions 中指定的异常类型被抛出时,包含 suffix 的 DLT 将被视为消息的目标主题,优先于通用 DLT。以下是使用注解或 RetryTopicConfiguration beans 的配置示例:

@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(template);
}
java

suffix 在自定义 DLT 名称中位于通用 dltTopicSuffix 之前。考虑到所提供的示例,导致 DeserializationException 的消息将被路由到 my-annotated-topic-deserialization-dlt 而不是 my-annotated-topic-dlt。自定义 DLT 将遵循 Topics AutoCreation 中所述的相同规则进行创建。