跳到主要内容

处理异常

ChatGPT-4o-mini 中英对照 Handling Exceptions

本节描述了在使用 Spring for Apache Kafka 时如何处理可能出现的各种异常。

Listener Error Handlers

从版本 2.0 开始,@KafkaListener 注解新增了一个属性:errorHandler

您可以使用 errorHandler 提供 KafkaListenerErrorHandler 实现的 bean 名称。这个函数式接口有一个方法,如下所示:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}
java

您可以访问由消息转换器生成的 spring-messaging Message<?> 对象以及由监听器抛出的异常,该异常被包装在 ListenerExecutionFailedException 中。错误处理程序可以抛出原始异常或新的异常,这些异常会被抛给容器。错误处理程序返回的任何内容都会被忽略。

从版本 2.7 开始,您可以在 MessagingMessageConverterBatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这会导致原始的 ConsumerRecord 被添加到转换后的 Message<?>KafkaHeaders.RAW_DATA 头中。这在某些情况下非常有用,例如,如果您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer。它可能在请求/回复场景中使用,在该场景中,您希望在经过一定次数的重试后,将失败结果发送给发送者,同时将失败的记录捕获到死信主题中。

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
java

它有一个子接口(ConsumerAwareListenerErrorHandler),可以通过以下方法访问消费者对象:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
java

另一个子接口(ManualAckListenerErrorHandler)在使用手动 AckMode 时提供对 Acknowledgment 对象的访问。

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
java

在任何情况下,您都不应该在消费者上执行任何查找,因为容器将对此毫不知情。

容器错误处理程序

从版本 2.8 开始,传统的 ErrorHandlerBatchErrorHandler 接口已被新的 CommonErrorHandler 取代。这些错误处理程序可以处理记录和批处理监听器的错误,允许单个监听器容器工厂为这两种类型的监听器创建容器。提供了 CommonErrorHandler 实现,以替代大多数传统框架的错误处理程序实现。

请参见 Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler 以获取将自定义错误处理程序迁移到 CommonErrorHandler 的信息。

当使用事务时,默认情况下没有配置错误处理程序,因此异常将回滚事务。事务容器的错误处理由 AfterRollbackProcessor 处理。如果在使用事务时提供自定义错误处理程序,则必须抛出异常以使事务回滚。

该接口有一个默认方法 isAckAfterHandle(),由容器调用以确定在错误处理程序返回而不抛出异常的情况下,是否应该提交偏移量;默认返回 true。

通常,框架提供的错误处理程序在错误未被“处理”时(例如,在执行查找操作后)会抛出异常。默认情况下,这些异常会被容器以 ERROR 级别记录。所有框架的错误处理程序都扩展了 KafkaExceptionLogLevelAware,这使您能够控制这些异常被记录的级别。

/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
java

您可以指定一个全局错误处理程序,以便在容器工厂中的所有监听器中使用。以下示例展示了如何做到这一点:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
java

默认情况下,如果一个注解的监听器方法抛出异常,该异常会被抛给容器,并且消息会根据容器的配置进行处理。

容器在调用错误处理程序之前,会提交任何待处理的偏移量提交。

如果您使用 Spring Boot,只需将错误处理程序添加为 @Bean,Boot 将其添加到自动配置的工厂中。

Back Off Handlers

错误处理程序,例如 DefaultErrorHandler,使用 BackOff 来确定在重试传递之前等待多长时间。从版本 2.9 开始,您可以配置自定义的 BackOffHandler。默认处理程序简单地挂起线程,直到回退时间过去(或容器停止)。该框架还提供了 ContainerPausingBackOffHandler,它在回退时间过去之前暂停监听器容器,然后恢复容器。当延迟时间超过 max.poll.interval.ms 消费者属性时,这非常有用。请注意,实际回退时间的分辨率将受到 pollTimeout 容器属性的影响。

DefaultErrorHandler

这个新的错误处理程序替换了 SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,这两者在多个版本中一直是默认的错误处理程序。一个不同之处在于,对于批处理监听器的回退行为(当抛出除 BatchListenerFailedException 之外的异常时),相当于 Retrying Complete Batches

important

从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与下面讨论的获取未处理记录偏移量相同的语义,但实际上并不进行寻址。相反,记录由监听器容器保留,并在错误处理程序退出后(并在执行一次暂停的 poll() 以保持消费者存活之后)重新提交给监听器;如果使用了 Non-Blocking RetriesContainerPausingBackOffHandler,暂停可能会延续到多个轮询中。错误处理程序向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者如果它已被恢复,则不会再次发送给监听器。要启用此模式,请将属性 seekAfterError 设置为 false

错误处理程序可以恢复(跳过)一个不断失败的记录。默认情况下,在十次失败后,失败的记录会被记录(在 ERROR 级别)。您可以使用自定义恢复器(BiConsumer)和 BackOff 来配置处理程序,以控制交付尝试和每次之间的延迟。使用 FixedBackOffFixedBackOff.UNLIMITED_ATTEMPTS 会导致(实际上)无限重试。以下示例配置在三次尝试后进行恢复:

DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
java

要使用自定义实例配置监听器容器,请将其添加到容器工厂中。

例如,使用 @KafkaListener 容器工厂,您可以如下添加 DefaultErrorHandler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
java

对于记录监听器,它将重试交付最多 2 次(共 3 次交付尝试),并且回退时间为 1 秒,而不是默认配置(FixedBackOff(0L, 9))。在重试耗尽后,失败将被简单记录。

作为一个例子,如果 poll 返回六条记录(每个分区 0、1、2 各两条),而监听器在第四条记录上抛出异常,容器会通过提交它们的偏移量来确认前三条消息。DefaultErrorHandler 会将分区 1 的偏移量寻址到 1,将分区 2 的偏移量寻址到 0。下一个 poll() 返回三条未处理的记录。

如果 AckModeBATCH,容器会在调用错误处理程序之前提交前两个分区的偏移量。

对于批量监听器,监听器必须抛出一个 BatchListenerFailedException,以指示批次中哪些记录失败。

事件的顺序是:

  • 在索引之前提交记录的偏移量。

  • 如果重试次数未用尽,执行查找,以便所有剩余记录(包括失败的记录)将被重新传送。

  • 如果重试次数用尽,尝试恢复失败的记录(默认仅限日志),并执行查找,以便剩余记录(不包括失败的记录)将被重新传送。恢复的记录的偏移量被提交。

  • 如果重试次数用尽且恢复失败,则执行查找,仿佛重试次数未用尽。

important

从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与上述讨论的查找未处理记录偏移量相同的语义,但实际上并不进行查找。相反,错误处理程序会创建一个新的 ConsumerRecords<?, ?>,仅包含未处理的记录,然后将其提交给监听器(在执行一次暂停的 poll() 之后,以保持消费者处于活动状态)。要启用此模式,请将属性 seekAfterError 设置为 false

默认的恢复器在重试耗尽后记录失败的记录。您可以使用自定义恢复器,或者使用框架提供的恢复器,例如 DeadLetterPublishingRecoverer

当使用 POJO 批量监听器(例如 List<Thing>)时,如果没有完整的消费者记录可以添加到异常中,您可以仅添加失败记录的索引:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
java

当容器配置为 AckMode.MANUAL_IMMEDIATE 时,可以配置错误处理程序以提交已恢复记录的偏移量;将 commitRecovered 属性设置为 true

在使用事务时,DefaultAfterRollbackProcessor 提供了类似的功能。请参见 After-rollback Processor

DefaultErrorHandler 将某些异常视为致命异常,因此对于这些异常会跳过重试;在第一次失败时会调用恢复器。默认情况下,被视为致命的异常包括:

  • 反序列化异常

  • 消息转换异常

  • 转换异常

  • 方法参数解析异常

  • 没有这样的函数异常

  • 类转换异常

由于这些异常不太可能在重试交付时得到解决。

您可以将更多异常类型添加到不可重试类别中,或完全替换分类异常的映射。有关更多信息,请参阅 DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications() 的 Javadocs,以及 spring-retryBinaryExceptionClassifier 的相关文档。

这里是一个将 IllegalArgumentException 添加到不可重试异常的示例:

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
java

错误处理程序可以配置一个或多个 RetryListener,以接收重试和恢复进度的通知。从版本 2.8.10 开始,添加了批处理监听器的方法。

@FunctionalInterface
public interface RetryListener {

void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}

default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}

default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}

default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}

default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}

}
java

请查看 JavaDocs 以获取更多信息。

important

如果恢复器失败(抛出异常),失败的记录将包含在查找中。如果恢复器失败,BackOff 将默认重置,重新投递将再次经过退避,然后再尝试恢复。要在恢复失败后跳过重试,请将错误处理程序的 resetStateOnRecoveryFailure 设置为 false

您可以为错误处理程序提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以根据失败的记录和/或异常确定要使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });
java

如果函数返回 null,将使用处理程序的默认 BackOff

resetStateOnExceptionChange 设置为 true,如果在失败之间异常类型发生变化,重试序列将被重新启动(包括选择一个新的 BackOff,如果已配置)。当设置为 false(在 2.9 版本之前的默认值)时,不会考虑异常类型。

从版本 2.9 开始,这默认值现在为 true

另请参见 Delivery Attempts Header

使用批处理错误处理程序的转换错误

从版本 2.8 开始,批处理监听器现在可以正确处理转换错误,当使用 MessageConverterByteArrayDeserializerBytesDeserializerStringDeserializer 以及 DefaultErrorHandler 时。当发生转换错误时,负载被设置为 null,并且一个反序列化异常被添加到记录头中,类似于 ErrorHandlingDeserializer。监听器中提供了一系列 ConversionException,以便监听器可以抛出 BatchListenerFailedException,指示发生转换异常的第一个索引。

示例:

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
java

重试完整批次

这是现在 DefaultErrorHandler 对于批处理监听器的回退行为,当监听器抛出除 BatchListenerFailedException 之外的异常时。

没有保证在重新交付一个批次时,该批次具有相同数量的记录和/或重新交付的记录顺序相同。因此,无法轻松维护一个批次的重试状态。FallbackBatchErrorHandler 采取以下方法。如果一个批处理监听器抛出一个不是 BatchListenerFailedException 的异常,则从内存中的记录批次执行重试。为了避免在延长的重试序列期间发生重新平衡,错误处理程序在每次重试时暂停消费者,在休眠之前进行轮询,并再次调用监听器。如果/当重试耗尽时,将为批次中的每条记录调用 ConsumerRecordRecoverer。如果恢复器抛出异常,或者线程在休眠期间被中断,则该批次的记录将在下次轮询时重新交付。在退出之前,无论结果如何,消费者都会恢复。

important

此机制不能与事务一起使用。

在等待 BackOff 间隔时,错误处理程序将循环短暂休眠,直到达到所需的延迟,同时检查容器是否已停止,从而允许在 stop() 之后尽快退出休眠,而不是造成延迟。

容器停止错误处理程序

CommonContainerStoppingErrorHandler 会在监听器抛出异常时停止容器。对于记录监听器,当 AckModeRECORD 时,已经处理的记录的偏移量会被提交。对于记录监听器,当 AckMode 为任何手动值时,已经确认的记录的偏移量会被提交。对于记录监听器,当 AckModeBATCH 时,或者对于批量监听器,当容器重新启动时,整个批次会被重放。

在容器停止后,会抛出一个包装了 ListenerExecutionFailedException 的异常。这是为了使事务回滚(如果启用了事务)。

委托错误处理器

CommonDelegatingErrorHandler 可以根据异常类型委托给不同的错误处理程序。例如,您可能希望对大多数异常调用 DefaultErrorHandler,或者对其他异常调用 CommonContainerStoppingErrorHandler

所有委托必须共享相同的兼容属性(ackAfterHandleseekAfterError …​)。

Logging Error Handler

CommonLoggingErrorHandler 只是记录异常;对于记录监听器,来自上一个轮询的剩余记录会传递给监听器。对于批处理监听器,批处理中的所有记录都会被记录。

使用不同的常见错误处理程序用于记录和批处理监听器

如果您希望为记录和批处理监听器使用不同的错误处理策略,可以使用 CommonMixedErrorHandler,该工具允许为每种监听器类型配置特定的错误处理程序。

常见错误处理程序摘要

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

传统错误处理程序及其替代方案

旧版错误处理程序替换
LoggingErrorHandlerCommonLoggingErrorHandler
BatchLoggingErrorHandlerCommonLoggingErrorHandler
ConditionalDelegatingErrorHandlerDelegatingErrorHandler
ConditionalDelegatingBatchErrorHandlerDelegatingErrorHandler
ContainerStoppingErrorHandlerCommonContainerStoppingErrorHandler
ContainerStoppingBatchErrorHandlerCommonContainerStoppingErrorHandler
SeekToCurrentErrorHandlerDefaultErrorHandler
SeekToCurrentBatchErrorHandler无替换,使用 DefaultErrorHandler 和无限 BackOff
RecoveringBatchErrorHandlerDefaultErrorHandler
RetryingBatchErrorHandler无替换,使用 DefaultErrorHandler 并抛出除 BatchListenerFailedException 之外的异常。

将自定义遗留错误处理程序实现迁移到 CommonErrorHandler

请参阅 CommonErrorHandler 中的 JavaDocs。

要替换 ErrorHandlerConsumerAwareErrorHandler 实现,您应该实现 handleOne() 并将 seeksAfterHandle() 保持为返回 false(默认值)。您还应该实现 handleOtherException() 来处理在记录处理范围之外发生的异常(例如消费者错误)。

要替换 RemainingRecordsErrorHandler 的实现,您应该实现 handleRemaining() 并重写 seeksAfterHandle() 以返回 true(错误处理程序必须执行必要的查找)。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如消费者错误)。

要替换任何 BatchErrorHandler 实现,您应该实现 handleBatch()。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如消费者错误)。

回滚处理器

在使用事务时,如果监听器抛出异常(如果存在错误处理程序,且该处理程序也抛出异常),事务将被回滚。默认情况下,任何未处理的记录(包括失败的记录)将在下一个轮询时重新获取。这是通过在 DefaultAfterRollbackProcessor 中执行 seek 操作来实现的。对于批量监听器,整个记录批次将被重新处理(容器无法知道批次中哪个记录失败)。要修改此行为,可以使用自定义的 AfterRollbackProcessor 配置监听器容器。例如,对于基于记录的监听器,您可能希望跟踪失败的记录,并在尝试一定次数后放弃,可能通过将其发布到死信主题。

从版本 2.2 开始,DefaultAfterRollbackProcessor 现在可以恢复(跳过)一个持续失败的记录。默认情况下,在十次失败后,失败的记录会被记录(在 ERROR 级别)。您可以使用自定义恢复器(BiConsumer)和最大失败次数来配置处理器。将 maxFailures 属性设置为负数会导致无限重试。以下示例在尝试三次后配置恢复:

AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
java

当您不使用事务时,可以通过配置 DefaultErrorHandler 来实现类似的功能。请参阅 Container Error Handlers

从版本 3.2 开始,Recovery 现在可以恢复(跳过)整个批次的记录,这些记录一直失败。设置 ContainerProperties.setBatchRecoverAfterRollback(true) 以启用此功能。

important

默认行为是,使用批量监听器时无法进行恢复,因为框架无法知道批量中的哪个记录持续失败。在这种情况下,应用程序监听器必须处理持续失败的记录。

另请参见 发布死信记录

从版本 2.2.5 开始,DefaultAfterRollbackProcessor 可以在一个新的事务中调用(在失败的事务回滚后启动)。然后,如果您使用 DeadLetterPublishingRecoverer 来发布失败的记录,处理器将把恢复的记录的偏移量发送到原始主题/分区的事务中。要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecoveredkafkaTemplate 属性。

important

如果恢复器失败(抛出异常),失败的记录将包含在查找中。从版本 2.5.5 开始,如果恢复器失败,BackOff 将默认重置,重新投递将再次经过退避,然后再尝试恢复。在早期版本中,BackOff 不会重置,恢复将在下一个失败时重新尝试。要恢复到之前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false

从版本 2.6 开始,您现在可以向处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以根据失败的记录和/或异常来确定要使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });
java

如果函数返回 null,将使用处理器的默认 BackOff

从版本 2.6.3 开始,将 resetStateOnExceptionChange 设置为 true,如果在失败之间异常类型发生变化,重试序列将被重新启动(包括选择新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。

从版本 2.3.1 开始,类似于 DefaultErrorHandlerDefaultAfterRollbackProcessor 将某些异常视为致命异常,并且对于这些异常将跳过重试;在第一次失败时调用恢复器。默认情况下,被视为致命的异常包括:

  • 反序列化异常

  • 消息转换异常

  • 转换异常

  • 方法参数解析异常

  • 没有这样的函数异常

  • 类转换异常

因为这些异常不太可能在重试交付时得到解决。

您可以将更多异常类型添加到不可重试类别中,或完全替换分类异常的映射。有关更多信息,请参阅 DefaultAfterRollbackProcessor.setClassifications() 的 Javadocs,以及 spring-retryBinaryExceptionClassifier 的 Javadocs。

这里是一个将 IllegalArgumentException 添加到不可重试异常的示例:

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
java

另请参见 Delivery Attempts Header

important

使用当前的 kafka-clients,容器无法检测 ProducerFencedException 是由于重新平衡引起的,还是由于生产者的 transactional.id 因超时或过期而被撤销。因为在大多数情况下,它是由于重新平衡引起的,所以容器不会调用 AfterRollbackProcessor(因为寻求分区不合适,因为我们不再被分配这些分区)。如果您确保超时足够大以处理每个事务,并定期执行一次“空”事务(例如,通过 ListenerContainerIdleEvent),您可以避免因超时和过期而导致的围栏。或者,您可以将 stopContainerWhenFenced 容器属性设置为 true,这样容器将停止,从而避免记录丢失。您可以消费 ConsumerStoppedEvent 并检查 Reason 属性是否为 FENCED 以检测此条件。由于该事件还引用了容器,您可以使用此事件重新启动容器。

Request error occurred:

从版本 2.7 开始,处理器可以配置一个或多个 RetryListener,以接收重试和恢复进度的通知。

@FunctionalInterface
public interface RetryListener {

void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}

default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}

}
java

请参阅 JavaDocs 以获取更多信息。

Delivery Attempts Header

以下内容仅适用于记录监听器,不适用于批处理监听器。

从版本 2.5 开始,当使用实现了 DeliveryAttemptAwareErrorHandlerAfterRollbackProcessor 时,可以启用将 KafkaHeaders.DELIVERY_ATTEMPT 头(kafka_deliveryAttempt)添加到记录中。该头的值是一个从 1 开始递增的整数。当接收原始的 ConsumerRecord<?, ?> 时,该整数以 byte[4] 的形式存在。

int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
java

使用 @KafkaListenerDefaultKafkaHeaderMapperSimpleKafkaHeaderMapper 时,可以通过在监听方法的参数中添加 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 来获取。

要启用此头的填充,将容器属性 deliveryAttemptHeader 设置为 true。默认情况下禁用此功能,以避免为每条记录查找状态并添加头部所带来的(小)开销。

DefaultErrorHandlerDefaultAfterRollbackProcessor 支持此功能。

Delivery Attempts Header for batch listener

在使用 BatchListener 处理 ConsumerRecord 时,KafkaHeaders.DELIVERY_ATTEMPT 头部的表现方式可能与 SingleRecordListener 不同。

从版本 3.3 开始,如果您希望在使用 BatchListener 时将 KafkaHeaders.DELIVERY_ATTEMPT 头注入到 ConsumerRecord 中,请在 ErrorHandler 中将 DeliveryAttemptAwareRetryListener 设置为 RetryListener

请参考下面的代码。

final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
java

然后,每当一个批次未能完成时,DeliveryAttemptAwareRetryListener 将会向 ConsumerRecord 注入一个 KafkaHeaders.DELIVERY_ATTMPT 头信息。

Listener Info Header

在某些情况下,能够知道监听器正在运行在哪个容器中是很有用的。

从版本 2.8.4 开始,您现在可以在监听器容器上设置 listenerInfo 属性,或者在 @KafkaListener 注解上设置 info 属性。然后,容器会将其添加到所有传入消息的 KafkaListener.LISTENER_INFO 头中;它可以在记录拦截器、过滤器等中使用,或者在监听器本身中使用。

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
java

当在 RecordInterceptorRecordFilterStrategy 实现中使用时,头部作为字节数组存在于消费者记录中,通过 KafkaListenerAnnotationBeanPostProcessorcharSet 属性进行转换。

头映射器在从消费者记录创建 MessageHeaders 时也会转换为 String,并且从不在出站记录上映射此头。

对于 POJO 批处理监听器,从版本 2.8.6 开始,头信息被复制到批处理的每个成员中,并且在转换后也可以作为一个单独的 String 参数使用。

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
java
备注

如果批处理监听器有一个过滤器,并且过滤器的结果是一个空批次,则需要在 @Header 参数中添加 required = false,因为对于空批次,信息是不可用的。

如果你收到 List<Message<Thing>>,信息在每个 Message<?>KafkaHeaders.LISTENER_INFO 头部中。

有关消费批次的更多信息,请参见 Batch Listeners

发布死信记录

您可以在达到记录的最大失败次数时,使用记录恢复器配置 DefaultErrorHandlerDefaultAfterRollbackProcessor。框架提供了 DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。恢复器需要一个 KafkaTemplate<Object, Object>,用于发送记录。您还可以选择性地将其配置为 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,该函数用于解析目标主题和分区。

important

默认情况下,死信记录会发送到一个名为 <originalTopic>-dlt 的主题(原始主题名称后缀为 -dlt),并且发送到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题 必须至少具有与原始主题相同数量的分区。

如果返回的 TopicPartition 有一个负的分区,则在 ProducerRecord 中未设置分区,因此分区由 Kafka 选择。从版本 2.2.4 开始,任何 ListenerExecutionFailedException(例如,在 @KafkaListener 方法中检测到异常时抛出)都增强了 groupId 属性。这允许目标解析器使用此属性,除了 ConsumerRecord 中的信息外,以选择死信主题。

以下示例展示了如何连接一个自定义目标解析器:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
java

发送到死信主题的记录增强了以下头信息:

  • KafkaHeaders.DLT_EXCEPTION_FQCN: 异常类名(通常是 ListenerExecutionFailedException,但也可以是其他类)。

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 异常原因类名(如果存在,版本 2.8 及以上)。

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 异常堆栈跟踪。

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE: 异常消息。

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 异常类名(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 异常堆栈跟踪(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 异常消息(仅限键反序列化错误)。

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主题。

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分区。

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始时间戳。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始时间戳类型。

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 未能处理记录的原始消费者组(版本 2.8 及以上)。

关键异常仅由 DeserializationException 引起,因此没有 DLT_KEY_EXCEPTION_CAUSE_FQCN

有两种机制可以添加更多的头部。

  1. 子类化 recoverer 并重写 createProducerRecord() - 调用 super.createProducerRecord() 并添加更多的头部信息。

  2. 提供一个 BiFunction 来接收消费者记录和异常,返回一个 Headers 对象;从那里获取的头部信息将被复制到最终的生产者记录中;另请参见 Managing Dead Letter Record Headers。使用 setHeadersFunction() 来设置 BiFunction

第二种实现起来更简单,但第一种提供了更多的信息,包括已经组装好的标准头部。

从版本 2.3 开始,当与 ErrorHandlingDeserializer 一起使用时,发布者将在死信生产者记录中恢复 value() 的值为未能反序列化的原始值。之前,value() 为 null,用户代码必须从消息头中解码 DeserializationException。此外,您可以为发布者提供多个 KafkaTemplate;例如,如果您想发布来自 DeserializationExceptionbyte[],以及使用与成功反序列化的记录不同的序列化器的值,这可能是必要的。以下是配置使用 Stringbyte[] 序列化器的 KafkaTemplate 的发布者的示例:

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
java

发布者使用地图键来定位适合即将发布的 value() 的模板。建议使用 LinkedHashMap,以便按顺序检查键。

当发布 null 值时,如果有多个模板,恢复器将查找 Void 类的模板;如果没有找到,将使用 values().iterator() 中的第一个模板。

自 2.7 版本以来,您可以使用 setFailIfSendResultIsError 方法,以便在消息发布失败时抛出异常。您还可以使用 setWaitForSendResultTimeout 设置发送成功验证的超时时间。

important

如果恢复器失败(抛出异常),失败的记录将包含在查找中。从版本 2.5.5 开始,如果恢复器失败,BackOff 将默认重置,重新投递将再次经过退避,然后再尝试恢复。在早期版本中,BackOff 不会重置,恢复将在下一个失败时重新尝试。要恢复到以前的行为,请将错误处理程序的 resetStateOnRecoveryFailure 属性设置为 false

从版本 2.6.3 开始,将 resetStateOnExceptionChange 设置为 true,如果在失败之间异常类型发生变化,重试序列将会重新启动(包括选择一个新的 BackOff,如果进行了相应配置)。默认情况下,不会考虑异常类型。

从版本 2.3 开始,恢复器也可以与 Kafka Streams 一起使用 - 有关更多信息,请参见 Recovery from Deserialization Exceptions

ErrorHandlingDeserializer 将反序列化异常添加到头部 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER(使用 Java 序列化)。默认情况下,这些头部不会保留在发布到死信主题的消息中。从版本 2.7 开始,如果键和值都反序列化失败,原始的键和值将被填充到发送到 DLT 的记录中。

如果传入的记录相互依赖,但可能会乱序到达,将失败的记录重新发布到原始主题的尾部(进行一定次数)可能会很有用,而不是直接将其发送到死信主题。有关示例,请参见 这个 Stack Overflow 问题

以下错误处理程序配置将完全实现这一点:

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
java

从版本 2.7 开始,恢复器检查目标解析器选择的分区是否实际存在。如果分区不存在,ProducerRecord 中的分区将被设置为 null,允许 KafkaProducer 选择分区。您可以通过将 verifyPartition 属性设置为 false 来禁用此检查。

从版本 3.1 开始,将 logRecoveryRecord 属性设置为 true 将记录恢复记录和异常。

管理死信记录头

参考上文的 Publishing Dead-letter RecordsDeadLetterPublishingRecoverer 有两个属性用于管理当这些头部已经存在时的情况(例如在重新处理一个失败的死信记录时,包括使用 Non-Blocking Retries 的情况)。

  • appendOriginalHeaders (默认 true

  • stripPreviousExceptionHeaders (默认 true 自版本 2.8 起)

Apache Kafka 支持多个相同名称的头部;要获取“最新”的值,可以使用 headers.lastHeader(headerName);要获取多个头部的迭代器,请使用 headers.headers(headerName).iterator()

当重复重新发布一个失败的记录时,这些头部可能会增长(并最终导致发布失败,出现 RecordTooLargeException);这对于异常头部尤其如此,特别是对于堆栈跟踪头部。

这两个属性的原因在于,虽然你可能只想保留最后的异常信息,但你可能还想保留记录在每次失败时经过的主题历史。

appendOriginalHeaders 应用于所有名为 **ORIGINAL** 的头,而 stripPreviousExceptionHeaders 应用于所有名为 **EXCEPTION** 的头。

从版本 2.8.4 开始,您现在可以控制将哪些标准头添加到输出记录中。请参见 enum HeadersToAdd 以获取(目前)默认添加的 10 个标准头的通用名称(这些不是实际的头名称,只是一种抽象;实际的头名称由 getHeaderNames() 方法设置,子类可以重写该方法)。

要排除头部,可以使用 excludeHeaders() 方法;例如,要抑制在头部中添加异常堆栈跟踪,可以使用:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
java

此外,您可以通过添加一个 ExceptionHeadersCreator 完全自定义异常头的添加;这也会禁用所有标准异常头。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
java

从版本 2.8.4 开始,您现在可以通过 addHeadersFunction 方法提供多个 headers 函数。这允许额外的函数应用,即使已经注册了另一个函数,例如,在使用 Non-Blocking Retries 时。

ExponentialBackOffWithMaxRetries 实现

Spring Framework 提供了多种 BackOff 实现。默认情况下,ExponentialBackOff 将无限期重试;要在某个重试次数后放弃,需要计算 maxElapsedTime。自版本 2.7.3 起,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,这是一个子类,它接收 maxRetries 属性并自动计算 maxElapsedTime,这更加方便。

@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
java

这将在 1, 2, 4, 8, 10, 10 秒后重试,然后再调用恢复器。