跳到主要内容

DLT 策略

ChatGPT-4o-mini 中英对照 DLT Strategies

该框架提供了一些与 DLT 相关的工作策略。您可以提供一个 DLT 处理的方法,使用默认的日志记录方法,或者根本不使用 DLT。此外,您还可以选择如果 DLT 处理失败时会发生什么。

DLT 处理方法

您可以指定用于处理该主题的 DLT 的方法,以及如果该处理失败时的行为。

要做到这一点,您可以在带有 @RetryableTopic 注解的类的方法中使用 @DltHandler 注解。请注意,同一个方法将用于该类中所有带有 @RetryableTopic 注解的方法。

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}

@DltHandler
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
java

可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供 DLT 处理程序方法,作为参数传递应处理 DLT 消息的 bean 名称和方法名称。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}

@Component
public class MyCustomDltProcessor {

private final MyDependency myDependency;

public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}

public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
java
备注

如果没有提供 DLT 处理程序,将使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod

从版本 2.8 开始,如果您不想在此应用程序中消费 DLT,包括通过默认处理程序(或者您希望推迟消费),您可以控制 DLT 容器是否启动,而不依赖于容器工厂的 autoStartup 属性。

使用 @RetryableTopic 注解时,将 autoStartDltHandler 属性设置为 false;使用配置构建器时,使用 autoStartDltHandler(false)

您可以稍后通过 KafkaListenerEndpointRegistry 启动 DLT 处理程序。

DLT 失败行为

如果 DLT 处理失败,有两种可能的行为可供选择:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR

在前者中,记录被转发回 DLT 主题,以便不阻塞其他 DLT 记录的处理。在后者中,消费者结束执行而不转发消息。

@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
java
备注

默认行为是 ALWAYS_RETRY_ON_ERROR

important

从版本 2.8.3 开始,ALWAYS_RETRY_ON_ERROR 将不会将记录路由回 DLT,如果该记录导致抛出致命异常,例如 DeserializationException,因为通常情况下,这类异常将始终被抛出。

被认为是致命的异常包括:

  • 反序列化异常

  • 消息转换异常

  • 转换异常

  • 方法参数解析异常

  • 没有这样的构造方法异常

  • 类转换异常

您可以使用 DestinationTopicResolver bean 上的方法向此列表添加异常或从中删除异常。

请参见 Exception Classifier 以获取更多信息。

配置 No DLT

该框架还提供了不为主题配置 DLT 的可能性。在这种情况下,在重试耗尽后,处理将简单结束。

@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}
java