跳到主要内容

事务

ChatGPT-4o-mini 中英对照 Transactions

本节描述了 Spring for Apache Kafka 如何支持事务。

概述

0.11.0.0 客户端库增加了对事务的支持。Spring for Apache Kafka 通过以下方式增加了支持:

  • KafkaTransactionManager: 用于普通的 Spring 事务支持(@TransactionalTransactionTemplate 等)

  • 事务性的 KafkaMessageListenerContainer

  • 使用 KafkaTemplate 的本地事务

  • 与其他事务管理器的事务同步

通过为 DefaultKafkaProducerFactory 提供 transactionIdPrefix 来启用事务。在这种情况下,工厂维护一个事务性生产者的缓存,而不是管理一个共享的 Producer。当用户在生产者上调用 close() 时,它会被返回到缓存中以供重用,而不是实际关闭。每个生产者的 transactional.id 属性为 transactionIdPrefix + n,其中 n0 开始,并为每个新生产者递增。在之前版本的 Spring for Apache Kafka 中,transactional.id 是为通过记录基础的监听器启动的监听器容器生成的,以支持围栏僵尸,但在 3.0 版本开始,EOSMode.V2 是唯一的选项,因此不再需要这种支持。对于运行多个实例的应用程序,transactionIdPrefix 必须在每个实例中是唯一的。

另请参见 Exactly Once Semantics

另请参见 transactionIdPrefix

使用 Spring Boot,只需设置 spring.kafka.producer.transaction-id-prefix 属性 - Spring Boot 将自动配置一个 KafkaTransactionManager bean 并将其注入到监听器容器中。

important

从版本 2.5.8 开始,您现在可以在生产者工厂上配置 maxAge 属性。这在使用可能因代理的 transactional.id.expiration.ms 而闲置的事务性生产者时非常有用。使用当前的 kafka-clients,这可能会导致 ProducerFencedException,而无需重新平衡。通过将 maxAge 设置为小于 transactional.id.expiration.ms,工厂将在生产者超过其最大年龄时刷新生产者。

使用 KafkaTransactionManager

KafkaTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的一个实现。在其构造函数中提供了对生产者工厂的引用。如果您提供一个自定义的生产者工厂,它必须支持事务。请参见 ProducerFactory.transactionCapable()

您可以将 KafkaTransactionManager 与普通的 Spring 事务支持(@TransactionalTransactionTemplate 等)一起使用。如果事务处于活动状态,在事务范围内执行的任何 KafkaTemplate 操作都将使用该事务的 Producer。管理器根据成功或失败提交或回滚事务。您必须配置 KafkaTemplate 以使用与事务管理器相同的 ProducerFactory

事务同步

本节涉及仅由生产者发起的事务(未由监听器容器启动的事务);有关容器启动事务时如何链接事务的信息,请参见 Using Consumer-Initiated Transactions

如果你想将记录发送到 Kafka 并执行一些数据库更新,可以使用普通的 Spring 事务管理,比如 DataSourceTransactionManager

@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
java

@Transactional 注解的拦截器启动事务,KafkaTemplate 将与该事务管理器同步一个事务;每次发送都会参与该事务。当方法退出时,数据库事务将提交,随后是 Kafka 事务。如果您希望提交按相反顺序执行(Kafka 优先),请使用嵌套的 @Transactional 方法,外部方法配置为使用 DataSourceTransactionManager,内部方法配置为使用 KafkaTransactionManager

请参见 Examples of Kafka Transactions with Other Transaction Managers,了解在 Kafka 优先或数据库优先配置中同步 JDBC 和 Kafka 事务的应用示例。

备注

从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果在同步事务上提交失败(在主事务已提交之后),将会抛出异常给调用者。之前,这种情况会被静默忽略(以调试级别记录)。如果有必要,应用程序应该采取补救措施,以补偿已提交的主事务。

使用消费者发起的交易

ChainedKafkaTransactionManager 从 2.7 版本开始已被弃用;有关其超类 ChainedTransactionManager 的更多信息,请参阅 JavaDocs。相反,请在容器中使用 KafkaTransactionManager 来启动 Kafka 事务,并使用 @Transactional 注解监听器方法以启动其他事务。

请参见 Examples of Kafka Transactions with Other Transaction Managers,了解一个将 JDBC 和 Kafka 事务链在一起的示例应用程序。

important

非阻塞重试 不能与 容器事务 结合使用。当监听器代码抛出异常时,容器事务提交成功,并且记录被发送到可重试主题。

KafkaTemplate 本地事务

您可以使用 KafkaTemplate 在本地事务中执行一系列操作。以下示例演示了如何做到这一点:

boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
java

回调中的参数是模板本身(this)。如果回调正常退出,事务将被提交。如果抛出异常,事务将被回滚。

备注

如果正在进行 KafkaTransactionManager(或同步)事务,则不会使用它。相反,将使用一个新的“嵌套”事务。

TransactionIdPrefix

EOSMode.V2(即 BETA)下,唯一支持的模式,不再需要使用相同的 transactional.id,即使是消费者发起的事务;实际上,它在每个实例上必须是唯一的,和生产者发起的事务一样。该属性在每个应用实例上必须具有不同的值。

TransactionIdSuffix Fixed

自 3.2 版本以来,引入了一个新的 TransactionIdSuffixStrategy 接口来管理 transactional.id 后缀。当设置 maxCache 大于零时,默认实现是 DefaultTransactionIdSuffixStrategy,可以在特定范围内重用 transactional.id,否则后缀将通过递增计数器动态生成。当请求一个事务生产者并且 transactional.id 全部被使用时,将抛出 NoProducerAvailableException。用户可以使用配置为重试该异常的 RetryTemplate,并设置合适的退避策略。

public static class Config {

@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}

}
java

当将 maxCache 设置为 5 时,transactional.idmy.txid.+`{0-4}`。

important

在使用 KafkaTransactionManagerConcurrentMessageListenerContainer 并启用 maxCache 时,必须将 maxCache 设置为大于或等于 concurrency 的值。如果 MessageListenerContainer 无法获取 transactional.id 后缀,它将抛出 NoProducerAvailableException。在 ConcurrentMessageListenerContainer 中使用嵌套事务时,需要调整 maxCache 设置以处理增加的嵌套事务数量。

KafkaTemplate 事务性和非事务性发布

通常,当 KafkaTemplate 是事务性的(配置了支持事务的生产者工厂)时,事务是必需的。事务可以通过 TransactionTemplate@Transactional 方法、调用 executeInTransaction 或通过配置了 KafkaTransactionManager 的监听容器来启动。任何在事务范围之外使用模板的尝试都会导致模板抛出 IllegalStateException。从版本 2.4.3 开始,您可以将模板的 allowNonTransactional 属性设置为 true。在这种情况下,模板将允许操作在没有事务的情况下运行,通过调用 ProducerFactorycreateNonTransactionalProducer() 方法;生产者将像往常一样被缓存或线程绑定以供重用。请参见 Using DefaultKafkaProducerFactory

使用批处理监听器的事务

当监听器在使用事务时失败时,AfterRollbackProcessor 会被调用以在回滚发生后采取一些行动。当使用默认的 AfterRollbackProcessor 和记录监听器时,会执行查找操作,以便失败的记录将被重新投递。然而,对于批处理监听器,由于框架不知道批处理中的哪个记录失败,因此整个批次将被重新投递。有关更多信息,请参见 After-rollback Processor

当使用批处理监听器时,版本 2.4.2 引入了一种替代机制来处理批处理中的失败:BatchToRecordAdapter。当配置了 batchListener 设置为 true 的容器工厂与 BatchToRecordAdapter 时,监听器将一次处理一条记录。这使得在批处理中能够进行错误处理,同时仍然可以根据异常类型停止处理整个批次。提供了一个默认的 BatchToRecordAdapter,可以与标准的 ConsumerRecordRecoverer(例如 DeadLetterPublishingRecoverer)进行配置。以下测试用例配置片段说明了如何使用此功能:

public static class TestListener {

final List<String> values = new ArrayList<>();

@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}

}

@Configuration
@EnableKafka
public static class Config {

ConsumerRecord<?, ?> failed;

@Bean
public TestListener test() {
return new TestListener();
}

@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}

}
java