处理异常
本节描述了在使用 Spring for Apache Kafka 时如何处理可能出现的各种异常。
Listener Error Handlers
从版本 2.0 开始,@KafkaListener 注解新增了一个属性:errorHandler。
您可以使用 errorHandler 提供 KafkaListenerErrorHandler 实现的 bean 名称。这个函数式接口有一个方法,如下所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问由消息转换器生成的 spring-messaging Message<?> 对象以及由监听器抛出的异常,该异常被包装在 ListenerExecutionFailedException 中。错误处理程序可以抛出原始异常或新的异常,这些异常会被抛给容器。错误处理程序返回的任何内容都会被忽略。
从版本 2.7 开始,您可以在 MessagingMessageConverter 和 BatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这会导致原始的 ConsumerRecord 被添加到转换后的 Message<?> 的 KafkaHeaders.RAW_DATA 头中。这在某些情况下非常有用,例如,如果您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer。它可能在请求/回复场景中使用,在该场景中,您希望在经过一定次数的重试后,将失败结果发送给发送者,同时将失败的记录捕获到死信主题中。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}
它有一个子接口(ConsumerAwareListenerErrorHandler),可以通过以下方法访问消费者对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口(ManualAckListenerErrorHandler)在使用手动 AckMode 时提供对 Acknowledgment 对象的访问。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何情况下,您都不应该在消费者上执行任何查找,因为容器将对此毫不知情。
容器错误处理程序
从版本 2.8 开始,传统的 ErrorHandler 和 BatchErrorHandler 接口已被新的 CommonErrorHandler 取代。这些错误处理程序可以处理记录和批处理监听器的错误,允许单个监听器容器工厂为这两种类型的监听器创建容器。提供了 CommonErrorHandler 实现,以替代大多数传统框架的错误处理程序实现。
请参见 Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler 以获取将自定义错误处理程序迁移到 CommonErrorHandler 的信息。
当使用事务时,默认情况下没有配置错误处理程序,因此异常将回滚事务。事务容器的错误处理由 AfterRollbackProcessor 处理。如果在使用事务时提供自定义错误处理程序,则必须抛出异常以使事务回滚。
该接口有一个默认方法 isAckAfterHandle(),由容器调用以确定在错误处理程序返回而不抛出异常的情况下,是否应该提交偏移量;默认返回 true。
通常,框架提供的错误处理程序在错误未被“处理”时(例如,在执行查找操作后)会抛出异常。默认情况下,这些异常会被容器以 ERROR 级别记录。所有框架的错误处理程序都扩展了 KafkaExceptionLogLevelAware,这使您能够控制这些异常被记录的级别。
/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}
您可以指定一个全局错误处理程序,以便在容器工厂中的所有监听器中使用。以下示例展示了如何做到这一点:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}
默认情况下,如果一个注解的监听器方法抛出异常,该异常会被抛给容器,并且消息会根据容器的配置进行处理。
容器在调用错误处理程序之前,会提交任何待处理的偏移量提交。
如果您使用 Spring Boot,只需将错误处理程序添加为 @Bean,Boot 将其添加到自动配置的工厂中。
Back Off Handlers
错误处理程序,例如 DefaultErrorHandler,使用 BackOff 来确定在重试传递之前等待多长时间。从版本 2.9 开始,您可以配置自定义的 BackOffHandler。默认处理程序简单地挂起线程,直到回退时间过去(或容器停止)。该框架还提供了 ContainerPausingBackOffHandler,它在回退时间过去之前暂停监听器容器,然后恢复容器。当延迟时间超过 max.poll.interval.ms 消费者属性时,这非常有用。请注意,实际回退时间的分辨率将受到 pollTimeout 容器属性的影响。
DefaultErrorHandler
这个新的错误处理程序替换了 SeekToCurrentErrorHandler 和 RecoveringBatchErrorHandler,这两者在多个版本中一直是默认的错误处理程序。一个不同之处在于,对于批处理监听器的回退行为(当抛出除 BatchListenerFailedException 之外的异常时),相当于 Retrying Complete Batches。
从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与下面讨论的获取未处理记录偏移量相同的语义,但实际上并不进行寻址。相反,记录由监听器容器保留,并在错误处理程序退出后(并在执行一次暂停的 poll() 以保持消费者存活之后)重新提交给监听器;如果使用了 Non-Blocking Retries 或 ContainerPausingBackOffHandler,暂停可能会延续到多个轮询中。错误处理程序向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者如果它已被恢复,则不会再次发送给监听器。要启用此模式,请将属性 seekAfterError 设置为 false。
错误处理程序可以恢复(跳过)一个不断失败的记录。默认情况下,在十次失败后,失败的记录会被记录(在 ERROR 级别)。您可以使用自定义恢复器(BiConsumer)和 BackOff 来配置处理程序,以控制交付尝试和每次之间的延迟。使用 FixedBackOff 和 FixedBackOff.UNLIMITED_ATTEMPTS 会导致(实际上)无限重试。以下示例配置在三次尝试后进行恢复:
DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));
要使用自定义实例配置监听器容器,请将其添加到容器工厂中。
例如,使用 @KafkaListener 容器工厂,您可以如下添加 DefaultErrorHandler:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}
对于记录监听器,它将重试交付最多 2 次(共 3 次交付尝试),并且回退时间为 1 秒,而不是默认配置(FixedBackOff(0L, 9))。在重试耗尽后,失败将被简单记录。
作为一个例子,如果 poll 返回六条记录(每个分区 0、1、2 各两条),而监听器在第四条记录上抛出异常,容器会通过提交它们的偏移量来确认前三条消息。DefaultErrorHandler 会将分区 1 的偏移量寻址到 1,将分区 2 的偏移量寻址到 0。下一个 poll() 返回三条未处理的记录。
如果 AckMode 为 BATCH,容器会在调用错误处理程序之前提交前两个分区的偏移量。
对于批量监听器,监听器必须抛出一个 BatchListenerFailedException,以指示批次中哪些记录失败。
事件的顺序是:
- 
在索引之前提交记录的偏移量。
 - 
如果重试次数未用尽,执行查找,以便所有剩余记录(包括失败的记录)将被重新传送。
 - 
如果重试次数用尽,尝试恢复失败的记录(默认仅限日志),并执行查找,以便剩余记录(不包括失败的记录)将被重新传送。恢复的记录的偏移量被提交。
 - 
如果重试次数用尽且恢复失败,则执行查找,仿佛重试次数未用尽。
 
从版本 2.9 开始,DefaultErrorHandler 可以配置为提供与上述讨论的查找未处理记录偏移量相同的语义,但实际上并不进行查找。相反,错误处理程序会创建一个新的 ConsumerRecords<?, ?>,仅包含未处理的记录,然后将其提交给监听器(在执行一次暂停的 poll() 之后,以保持消费者处于活动状态)。要启用此模式,请将属性 seekAfterError 设置为 false。
默认的恢复器在重试耗尽后记录失败的记录。您可以使用自定义恢复器,或者使用框架提供的恢复器,例如 DeadLetterPublishingRecoverer。
当使用 POJO 批量监听器(例如 List<Thing>)时,如果没有完整的消费者记录可以添加到异常中,您可以仅添加失败记录的索引:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}
当容器配置为 AckMode.MANUAL_IMMEDIATE 时,可以配置错误处理程序以提交已恢复记录的偏移量;将 commitRecovered 属性设置为 true。
另见 发布死信记录。
在使用事务时,DefaultAfterRollbackProcessor 提供了类似的功能。请参见 After-rollback Processor。
DefaultErrorHandler 将某些异常视为致命异常,因此对于这些异常会跳过重试;在第一次失败时会调用恢复器。默认情况下,被视为致命的异常包括:
- 
反序列化异常 - 
消息转换异常 - 
转换异常 - 
方法参数解析异常 - 
没有这样的函数异常 - 
类转换异常 
由于这些异常不太可能在重试交付时得到解决。
您可以将更多异常类型添加到不可重试类别中,或完全替换分类异常的映射。有关更多信息,请参阅 DefaultErrorHandler.addNotRetryableException() 和 DefaultErrorHandler.setClassifications() 的 Javadocs,以及 spring-retry 的 BinaryExceptionClassifier 的相关文档。
这里是一个将 IllegalArgumentException 添加到不可重试异常的示例:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}
错误处理程序可以配置一个或多个 RetryListener,以接收重试和恢复进度的通知。从版本 2.8.10 开始,添加了批处理监听器的方法。
@FunctionalInterface
public interface RetryListener {
    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }
    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }
    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }
    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }
	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}
}
请查看 JavaDocs 以获取更多信息。
如果恢复器失败(抛出异常),失败的记录将包含在查找中。如果恢复器失败,BackOff 将默认重置,重新投递将再次经过退避,然后再尝试恢复。要在恢复失败后跳过重试,请将错误处理程序的 resetStateOnRecoveryFailure 设置为 false。
您可以为错误处理程序提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以根据失败的记录和/或异常确定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null,将使用处理程序的默认 BackOff。
将 resetStateOnExceptionChange 设置为 true,如果在失败之间异常类型发生变化,重试序列将被重新启动(包括选择一个新的 BackOff,如果已配置)。当设置为 false(在 2.9 版本之前的默认值)时,不会考虑异常类型。
从版本 2.9 开始,这默认值现在为 true。
另请参见 Delivery Attempts Header。
使用批处理错误处理程序的转换错误
从版本 2.8 开始,批处理监听器现在可以正确处理转换错误,当使用 MessageConverter 与 ByteArrayDeserializer、BytesDeserializer 或 StringDeserializer 以及 DefaultErrorHandler 时。当发生转换错误时,负载被设置为 null,并且一个反序列化异常被添加到记录头中,类似于 ErrorHandlingDeserializer。监听器中提供了一系列 ConversionException,以便监听器可以抛出 BatchListenerFailedException,指示发生转换异常的第一个索引。
示例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}
重试完整批次
这是现在 DefaultErrorHandler 对于批处理监听器的回退行为,当监听器抛出除 BatchListenerFailedException 之外的异常时。
没有保证在重新交付一个批次时,该批次具有相同数量的记录和/或重新交付的记录顺序相同。因此,无法轻松维护一个批次的重试状态。FallbackBatchErrorHandler 采取以下方法。如果一个批处理监听器抛出一个不是 BatchListenerFailedException 的异常,则从内存中的记录批次执行重试。为了避免在延长的重试序列期间发生重新平衡,错误处理程序在每次重试时暂停消费者,在休眠之前进行轮询,并再次调用监听器。如果/当重试耗尽时,将为批次中的每条记录调用 ConsumerRecordRecoverer。如果恢复器抛出异常,或者线程在休眠期间被中断,则该批次的记录将在下次轮询时重新交付。在退出之前,无论结果如何,消费者都会恢复。
此机制不能与事务一起使用。
在等待 BackOff 间隔时,错误处理程序将循环短暂休眠,直到达到所需的延迟,同时检查容器是否已停止,从而允许在 stop() 之后尽快退出休眠,而不是造成延迟。
容器停止错误处理程序
CommonContainerStoppingErrorHandler 会在监听器抛出异常时停止容器。对于记录监听器,当 AckMode 为 RECORD 时,已经处理的记录的偏移量会被提交。对于记录监听器,当 AckMode 为任何手动值时,已经确认的记录的偏移量会被提交。对于记录监听器,当 AckMode 为 BATCH 时,或者对于批量监听器,当容器重新启动时,整个批次会被重放。
在容器停止后,会抛出一个包装了 ListenerExecutionFailedException 的异常。这是为了使事务回滚(如果启用了事务)。
委托错误处理器
CommonDelegatingErrorHandler 可以根据异常类型委托给不同的错误处理程序。例如,您可能希望对大多数异常调用 DefaultErrorHandler,或者对其他异常调用 CommonContainerStoppingErrorHandler。
所有委托必须共享相同的兼容属性(ackAfterHandle、seekAfterError …)。
Logging Error Handler
CommonLoggingErrorHandler 只是记录异常;对于记录监听器,来自上一个轮询的剩余记录会传递给监听器。对于批处理监听器,批处理中的所有记录都会被记录。
使用不同的常见错误处理程序用于记录和批处理监听器
如果您希望为记录和批处理监听器使用不同的错误处理策略,可以使用 CommonMixedErrorHandler,该工具允许为每种监听器类型配置特定的错误处理程序。
常见错误处理程序摘要
- 
DefaultErrorHandler - 
CommonContainerStoppingErrorHandler - 
CommonDelegatingErrorHandler - 
CommonLoggingErrorHandler - 
CommonMixedErrorHandler 
传统错误处理程序及其替代方案
| 旧版错误处理程序 | 替换 | 
|---|---|
LoggingErrorHandler | CommonLoggingErrorHandler | 
BatchLoggingErrorHandler | CommonLoggingErrorHandler | 
ConditionalDelegatingErrorHandler | DelegatingErrorHandler | 
ConditionalDelegatingBatchErrorHandler | DelegatingErrorHandler | 
ContainerStoppingErrorHandler | CommonContainerStoppingErrorHandler | 
ContainerStoppingBatchErrorHandler | CommonContainerStoppingErrorHandler | 
SeekToCurrentErrorHandler | DefaultErrorHandler | 
SeekToCurrentBatchErrorHandler | 无替换,使用 DefaultErrorHandler 和无限 BackOff。 | 
RecoveringBatchErrorHandler | DefaultErrorHandler | 
RetryingBatchErrorHandler | 无替换,使用 DefaultErrorHandler 并抛出除 BatchListenerFailedException 之外的异常。 | 
将自定义遗留错误处理程序实现迁移到 CommonErrorHandler
请参阅 CommonErrorHandler 中的 JavaDocs。
要替换 ErrorHandler 或 ConsumerAwareErrorHandler 实现,您应该实现 handleOne() 并将 seeksAfterHandle() 保持为返回 false(默认值)。您还应该实现 handleOtherException() 来处理在记录处理范围之外发生的异常(例如消费者错误)。
要替换 RemainingRecordsErrorHandler 的实现,您应该实现 handleRemaining() 并重写 seeksAfterHandle() 以返回 true(错误处理程序必须执行必要的查找)。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如消费者错误)。
要替换任何 BatchErrorHandler 实现,您应该实现 handleBatch()。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如消费者错误)。
回滚处理器
在使用事务时,如果监听器抛出异常(如果存在错误处理程序,且该处理程序也抛出异常),事务将被回滚。默认情况下,任何未处理的记录(包括失败的记录)将在下一个轮询时重新获取。这是通过在 DefaultAfterRollbackProcessor 中执行 seek 操作来实现的。对于批量监听器,整个记录批次将被重新处理(容器无法知道批次中哪个记录失败)。要修改此行为,可以使用自定义的 AfterRollbackProcessor 配置监听器容器。例如,对于基于记录的监听器,您可能希望跟踪失败的记录,并在尝试一定次数后放弃,可能通过将其发布到死信主题。
从版本 2.2 开始,DefaultAfterRollbackProcessor 现在可以恢复(跳过)一个持续失败的记录。默认情况下,在十次失败后,失败的记录会被记录(在 ERROR 级别)。您可以使用自定义恢复器(BiConsumer)和最大失败次数来配置处理器。将 maxFailures 属性设置为负数会导致无限重试。以下示例在尝试三次后配置恢复:
AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));
当您不使用事务时,可以通过配置 DefaultErrorHandler 来实现类似的功能。请参阅 Container Error Handlers。
从版本 3.2 开始,Recovery 现在可以恢复(跳过)整个批次的记录,这些记录一直失败。设置 ContainerProperties.setBatchRecoverAfterRollback(true) 以启用此功能。
默认行为是,使用批量监听器时无法进行恢复,因为框架无法知道批量中的哪个记录持续失败。在这种情况下,应用程序监听器必须处理持续失败的记录。
另请参见 发布死信记录。
从版本 2.2.5 开始,DefaultAfterRollbackProcessor 可以在一个新的事务中调用(在失败的事务回滚后启动)。然后,如果您使用 DeadLetterPublishingRecoverer 来发布失败的记录,处理器将把恢复的记录的偏移量发送到原始主题/分区的事务中。要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecovered 和 kafkaTemplate 属性。
如果恢复器失败(抛出异常),失败的记录将包含在查找中。从版本 2.5.5 开始,如果恢复器失败,BackOff 将默认重置,重新投递将再次经过退避,然后再尝试恢复。在早期版本中,BackOff 不会重置,恢复将在下一个失败时重新尝试。要恢复到之前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false。
从版本 2.6 开始,您现在可以向处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以根据失败的记录和/或异常来确定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null,将使用处理器的默认 BackOff。
从版本 2.6.3 开始,将 resetStateOnExceptionChange 设置为 true,如果在失败之间异常类型发生变化,重试序列将被重新启动(包括选择新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。
从版本 2.3.1 开始,类似于 DefaultErrorHandler,DefaultAfterRollbackProcessor 将某些异常视为致命异常,并且对于这些异常将跳过重试;在第一次失败时调用恢复器。默认情况下,被视为致命的异常包括:
- 
反序列化异常 - 
消息转换异常 - 
转换异常 - 
方法参数解析异常 - 
没有这样的函数异常 - 
类转换异常 
因为这些异常不太可能在重试交付时得到解决。
您可以将更多异常类型添加到不可重试类别中,或完全替换分类异常的映射。有关更多信息,请参阅 DefaultAfterRollbackProcessor.setClassifications() 的 Javadocs,以及 spring-retry 的 BinaryExceptionClassifier 的 Javadocs。
这里是一个将 IllegalArgumentException 添加到不可重试异常的示例:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}
另请参见 Delivery Attempts Header。
使用当前的 kafka-clients,容器无法检测 ProducerFencedException 是由于重新平衡引起的,还是由于生产者的 transactional.id 因超时或过期而被撤销。因为在大多数情况下,它是由于重新平衡引起的,所以容器不会调用 AfterRollbackProcessor(因为寻求分区不合适,因为我们不再被分配这些分区)。如果您确保超时足够大以处理每个事务,并定期执行一次“空”事务(例如,通过 ListenerContainerIdleEvent),您可以避免因超时和过期而导致的围栏。或者,您可以将 stopContainerWhenFenced 容器属性设置为 true,这样容器将停止,从而避免记录丢失。您可以消费 ConsumerStoppedEvent 并检查 Reason 属性是否为 FENCED 以检测此条件。由于该事件还引用了容器,您可以使用此事件重新启动容器。
Request error occurred:
从版本 2.7 开始,处理器可以配置一个或多个 RetryListener,以接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }
    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }
}
请参阅 JavaDocs 以获取更多信息。
Delivery Attempts Header
以下内容仅适用于记录监听器,不适用于批处理监听器。
从版本 2.5 开始,当使用实现了 DeliveryAttemptAware 的 ErrorHandler 或 AfterRollbackProcessor 时,可以启用将 KafkaHeaders.DELIVERY_ATTEMPT 头(kafka_deliveryAttempt)添加到记录中。该头的值是一个从 1 开始递增的整数。当接收原始的 ConsumerRecord<?, ?> 时,该整数以 byte[4] 的形式存在。
int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();
使用 @KafkaListener 与 DefaultKafkaHeaderMapper 或 SimpleKafkaHeaderMapper 时,可以通过在监听方法的参数中添加 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 来获取。
要启用此头的填充,将容器属性 deliveryAttemptHeader 设置为 true。默认情况下禁用此功能,以避免为每条记录查找状态并添加头部所带来的(小)开销。
DefaultErrorHandler 和 DefaultAfterRollbackProcessor 支持此功能。
Delivery Attempts Header for batch listener
在使用 BatchListener 处理 ConsumerRecord 时,KafkaHeaders.DELIVERY_ATTEMPT 头部的表现方式可能与 SingleRecordListener 不同。
从版本 3.3 开始,如果您希望在使用 BatchListener 时将 KafkaHeaders.DELIVERY_ATTEMPT 头注入到 ConsumerRecord 中,请在 ErrorHandler 中将 DeliveryAttemptAwareRetryListener 设置为 RetryListener。
请参考下面的代码。
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
然后,每当一个批次未能完成时,DeliveryAttemptAwareRetryListener 将会向 ConsumerRecord 注入一个 KafkaHeaders.DELIVERY_ATTMPT 头信息。
Listener Info Header
在某些情况下,能够知道监听器正在运行在哪个容器中是很有用的。
从版本 2.8.4 开始,您现在可以在监听器容器上设置 listenerInfo 属性,或者在 @KafkaListener 注解上设置 info 属性。然后,容器会将其添加到所有传入消息的 KafkaListener.LISTENER_INFO 头中;它可以在记录拦截器、过滤器等中使用,或者在监听器本身中使用。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}
当在 RecordInterceptor 或 RecordFilterStrategy 实现中使用时,头部作为字节数组存在于消费者记录中,通过 KafkaListenerAnnotationBeanPostProcessor 的 charSet 属性进行转换。
头映射器在从消费者记录创建 MessageHeaders 时也会转换为 String,并且从不在出站记录上映射此头。
对于 POJO 批处理监听器,从版本 2.8.6 开始,头信息被复制到批处理的每个成员中,并且在转换后也可以作为一个单独的 String 参数使用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批处理监听器有一个过滤器,并且过滤器的结果是一个空批次,则需要在 @Header 参数中添加 required = false,因为对于空批次,信息是不可用的。
如果你收到 List<Message<Thing>>,信息在每个 Message<?> 的 KafkaHeaders.LISTENER_INFO 头部中。
有关消费批次的更多信息,请参见 Batch Listeners。
发布死信记录
您可以在达到记录的最大失败次数时,使用记录恢复器配置 DefaultErrorHandler 和 DefaultAfterRollbackProcessor。框架提供了 DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。恢复器需要一个 KafkaTemplate<Object, Object>,用于发送记录。您还可以选择性地将其配置为 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,该函数用于解析目标主题和分区。
默认情况下,死信记录会发送到一个名为 <originalTopic>-dlt 的主题(原始主题名称后缀为 -dlt),并且发送到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题 必须至少具有与原始主题相同数量的分区。
如果返回的 TopicPartition 有一个负的分区,则在 ProducerRecord 中未设置分区,因此分区由 Kafka 选择。从版本 2.2.4 开始,任何 ListenerExecutionFailedException(例如,在 @KafkaListener 方法中检测到异常时抛出)都增强了 groupId 属性。这允许目标解析器使用此属性,除了 ConsumerRecord 中的信息外,以选择死信主题。
以下示例展示了如何连接一个自定义目标解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录增强了以下头信息:
- 
KafkaHeaders.DLT_EXCEPTION_FQCN: 异常类名(通常是ListenerExecutionFailedException,但也可以是其他类)。 - 
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 异常原因类名(如果存在,版本 2.8 及以上)。 - 
KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 异常堆栈跟踪。 - 
KafkaHeaders.DLT_EXCEPTION_MESSAGE: 异常消息。 - 
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 异常类名(仅限键反序列化错误)。 - 
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 异常堆栈跟踪(仅限键反序列化错误)。 - 
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 异常消息(仅限键反序列化错误)。 - 
KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主题。 - 
KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分区。 - 
KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。 - 
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始时间戳。 - 
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始时间戳类型。 - 
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 未能处理记录的原始消费者组(版本 2.8 及以上)。 
关键异常仅由 DeserializationException 引起,因此没有 DLT_KEY_EXCEPTION_CAUSE_FQCN。
有两种机制可以添加更多的头部。
- 
子类化 recoverer 并重写
createProducerRecord()- 调用super.createProducerRecord()并添加更多的头部信息。 - 
提供一个
BiFunction来接收消费者记录和异常,返回一个Headers对象;从那里获取的头部信息将被复制到最终的生产者记录中;另请参见 Managing Dead Letter Record Headers。使用setHeadersFunction()来设置BiFunction。 
第二种实现起来更简单,但第一种提供了更多的信息,包括已经组装好的标准头部。
从版本 2.3 开始,当与 ErrorHandlingDeserializer 一起使用时,发布者将在死信生产者记录中恢复 value() 的值为未能反序列化的原始值。之前,value() 为 null,用户代码必须从消息头中解码 DeserializationException。此外,您可以为发布者提供多个 KafkaTemplate;例如,如果您想发布来自 DeserializationException 的 byte[],以及使用与成功反序列化的记录不同的序列化器的值,这可能是必要的。以下是配置使用 String 和 byte[] 序列化器的 KafkaTemplate 的发布者的示例:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}
发布者使用地图键来定位适合即将发布的 value() 的模板。建议使用 LinkedHashMap,以便按顺序检查键。
当发布 null 值时,如果有多个模板,恢复器将查找 Void 类的模板;如果没有找到,将使用 values().iterator() 中的第一个模板。
自 2.7 版本以来,您可以使用 setFailIfSendResultIsError 方法,以便在消息发布失败时抛出异常。您还可以使用 setWaitForSendResultTimeout 设置发送成功验证的超时时间。
如果恢复器失败(抛出异常),失败的记录将包含在查找中。从版本 2.5.5 开始,如果恢复器失败,BackOff 将默认重置,重新投递将再次经过退避,然后再尝试恢复。在早期版本中,BackOff 不会重置,恢复将在下一个失败时重新尝试。要恢复到以前的行为,请将错误处理程序的 resetStateOnRecoveryFailure 属性设置为 false。
从版本 2.6.3 开始,将 resetStateOnExceptionChange 设置为 true,如果在失败之间异常类型发生变化,重试序列将会重新启动(包括选择一个新的 BackOff,如果进行了相应配置)。默认情况下,不会考虑异常类型。
从版本 2.3 开始,恢复器也可以与 Kafka Streams 一起使用 - 有关更多信息,请参见 Recovery from Deserialization Exceptions。
ErrorHandlingDeserializer 将反序列化异常添加到头部 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER 和 ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER(使用 Java 序列化)。默认情况下,这些头部不会保留在发布到死信主题的消息中。从版本 2.7 开始,如果键和值都反序列化失败,原始的键和值将被填充到发送到 DLT 的记录中。
如果传入的记录相互依赖,但可能会乱序到达,将失败的记录重新发布到原始主题的尾部(进行一定次数)可能会很有用,而不是直接将其发送到死信主题。有关示例,请参见 这个 Stack Overflow 问题。
以下错误处理程序配置将完全实现这一点:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,恢复器检查目标解析器选择的分区是否实际存在。如果分区不存在,ProducerRecord 中的分区将被设置为 null,允许 KafkaProducer 选择分区。您可以通过将 verifyPartition 属性设置为 false 来禁用此检查。
从版本 3.1 开始,将 logRecoveryRecord 属性设置为 true 将记录恢复记录和异常。
管理死信记录头
参考上文的 Publishing Dead-letter Records,DeadLetterPublishingRecoverer 有两个属性用于管理当这些头部已经存在时的情况(例如在重新处理一个失败的死信记录时,包括使用 Non-Blocking Retries 的情况)。
- 
appendOriginalHeaders(默认true) - 
stripPreviousExceptionHeaders(默认true自版本 2.8 起) 
Apache Kafka 支持多个相同名称的头部;要获取“最新”的值,可以使用 headers.lastHeader(headerName);要获取多个头部的迭代器,请使用 headers.headers(headerName).iterator()。
当重复重新发布一个失败的记录时,这些头部可能会增长(并最终导致发布失败,出现 RecordTooLargeException);这对于异常头部尤其如此,特别是对于堆栈跟踪头部。
这两个属性的原因在于,虽然你可能只想保留最后的异常信息,但你可能还想保留记录在每次失败时经过的主题历史。
appendOriginalHeaders 应用于所有名为 **ORIGINAL** 的头,而 stripPreviousExceptionHeaders 应用于所有名为 **EXCEPTION** 的头。
从版本 2.8.4 开始,您现在可以控制将哪些标准头添加到输出记录中。请参见 enum HeadersToAdd 以获取(目前)默认添加的 10 个标准头的通用名称(这些不是实际的头名称,只是一种抽象;实际的头名称由 getHeaderNames() 方法设置,子类可以重写该方法)。
要排除头部,可以使用 excludeHeaders() 方法;例如,要抑制在头部中添加异常堆栈跟踪,可以使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加一个 ExceptionHeadersCreator 完全自定义异常头的添加;这也会禁用所有标准异常头。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});
从版本 2.8.4 开始,您现在可以通过 addHeadersFunction 方法提供多个 headers 函数。这允许额外的函数应用,即使已经注册了另一个函数,例如,在使用 Non-Blocking Retries 时。
ExponentialBackOffWithMaxRetries 实现
Spring Framework 提供了多种 BackOff 实现。默认情况下,ExponentialBackOff 将无限期重试;要在某个重试次数后放弃,需要计算 maxElapsedTime。自版本 2.7.3 起,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,这是一个子类,它接收 maxRetries 属性并自动计算 maxElapsedTime,这更加方便。
@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}
这将在 1, 2, 4, 8, 10, 10 秒后重试,然后再调用恢复器。