配置
从版本 2.9 开始,对于默认配置,应在 @Configuration
注解的类中使用 @EnableKafkaRetryTopic
注解。这使得该功能能够正确启动,并提供访问在运行时查找该功能的一些组件的能力。
如果您添加了 @EnableKafkaRetryTopic
注解,则不必再添加 @EnableKafka
,因为 @EnableKafkaRetryTopic
是用 @EnableKafka
进行元注解的。
此外,从该版本开始,为了更高级地配置该功能的组件和全局功能,应该在 @Configuration
类中扩展 RetryTopicConfigurationSupport
类,并重写适当的方法。有关更多详细信息,请参阅 Configuring Global Settings and Features。
默认情况下,重试主题的容器将具有与主容器相同的并发性。从版本 3.0 开始,您可以为重试容器设置不同的 concurrency
(可以在注解中设置,也可以在 RetryTopicConfigurationBuilder
中设置)。
上述技术只能使用一种,并且只能有一个 @Configuration
类可以扩展 RetryTopicConfigurationSupport
。
使用 @RetryableTopic
注解
要为 @KafkaListener
注解的方法配置重试主题和 DLT,只需在其上添加 @RetryableTopic
注解,Spring for Apache Kafka 将使用默认配置引导所有必要的主题和消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
自 3.2 以来,@RetryableTopic
对类上的 @KafkaListener 的支持将是:
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
您可以在同一类中指定一个方法来处理 dlt 消息,通过使用 @DltHandler
注解进行标注。如果没有提供 DltHandler 方法,将创建一个默认的消费者,仅记录消费日志。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果您不指定 kafkaTemplate 名称,将查找名为 defaultRetryTopicKafkaTemplate
的 bean。如果未找到 bean,将抛出异常。
从版本 3.0 开始,@RetryableTopic
注解可以作为自定义注解的元注解使用;例如:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
使用 RetryTopicConfiguration
beans
您还可以通过在 @Configuration
注解的类中创建 RetryTopicConfiguration
bean 来配置非阻塞重试支持。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为所有使用默认配置的 @KafkaListener
注解的方法创建重试主题和一个 DLT,以及相应的消费者。KafkaTemplate
实例是消息转发所必需的。
为了更细粒度地控制如何处理每个主题的非阻塞重试,可以提供多个 RetryTopicConfiguration
bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics(List.of("my-topic", "my-other-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics(List.of("my-topic", "my-other-topic"))
.retryOn(MyException.class)
.create(template);
}
重试主题和 DLT 的消费者将被分配到一个消费者组,该消费者组的组 ID 是您在 @KafkaListener
注解的 groupId
参数中提供的组 ID 与主题后缀的组合。如果您没有提供任何内容,它们将全部属于同一组,并且在重试主题上的重新平衡将导致主主题上的不必要重新平衡。
如果消费者配置了一个 ErrorHandlingDeserializer 来处理反序列化异常,那么配置 KafkaTemplate
及其生产者时,必须使用一个能够处理正常对象以及由于反序列化异常而产生的原始 byte[]
值的序列化器。模板的通用值类型应该是 Object
。一种技术是使用 DelegatingByTypeSerializer
;以下是一个示例:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
多个 @KafkaListener
注解可以用于同一个主题,无论是否手动分配分区,并且可以进行非阻塞重试,但对于给定主题,只会使用一个配置。最好为此类主题使用一个单独的 RetryTopicConfiguration
bean 进行配置;如果对同一个主题使用多个 @RetryableTopic
注解,所有注解的值应该相同,否则其中一个将应用于该主题的所有监听器,而其他注解的值将被忽略。
配置全局设置和功能
自 2.9 以来,之前用于配置组件的 bean 覆盖方法已被移除(没有弃用,因为上述 API 的实验性质)。这并不改变 RetryTopicConfiguration
bean 的方法——仅影响基础设施组件的配置。现在应该在一个(单一的) @Configuration
类中扩展 RetryTopicConfigurationSupport
类,并重写适当的方法。以下是一个示例:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
在使用这种配置方法时,应该避免使用 @EnableKafkaRetryTopic
注解,以防由于重复的 bean 导致上下文无法启动。请改用简单的 @EnableKafka
注解。
当 autoCreateTopics
为 true 时,主主题和重试主题将按照指定的分区数和复制因子创建。从版本 3.0 开始,默认的复制因子为 -1
,这意味着使用代理的默认值。如果您的代理版本早于 2.4,您需要设置一个明确的值。要为特定主题(例如主主题或 DLT)覆盖这些值,只需添加一个具有所需属性的 NewTopic
@Bean
;这将覆盖自动创建属性。
默认情况下,记录使用接收到的记录的原始分区发布到重试主题。如果重试主题的分区少于主主题,则应适当地配置框架;以下是一个示例。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
Request error occurred:
默认情况下,当记录在重试主题中转换时,所有重试头的值(尝试次数、时间戳)都会被保留。从版本 2.9.6 开始,如果您只想保留这些头的最后一个值,请使用上面显示的 configureDeadLetterPublishingContainerFactory()
方法将工厂的 retainAllRetryHeaderValues
属性设置为 false
。
查找 RetryTopicConfiguration
尝试通过从 @RetryableTopic
注解创建一个 RetryTopicConfiguration
实例,或者在没有可用注解的情况下从 bean 容器中获取一个实例。
如果在容器中发现了豆子,则会进行检查,以确定提供的话题是否应该由任何这样的实例处理。
如果提供了 @RetryableTopic
注解,将查找带有 DltHandler
注解的方法。
自 3.2 起,提供新的 API 来创建 RetryTopicConfiguration
,当 @RetryableTopic
注解在一个类上时:
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}