变更历史 :: Spring Kafka
Request error occurred:
3.2 中的新特性(自 3.1 以来)
本节涵盖了从版本 3.1 到版本 3.2 的更改。有关早期版本的更改,请参见 Change History。
Kafka Client Version
此版本需要 3.7.0 kafka-clients
。Kafka 客户端的 3.7.0 版本引入了新的消费者组协议。有关更多详细信息及其限制,请参见 KIP-848。新的消费者组协议是一个早期访问版本,不适合在生产环境中使用。仅建议在此版本中用于测试目的。因此,Spring for Apache Kafka 仅在 kafka-client
本身提供的测试级别支持的范围内支持此新的消费者组协议。默认情况下,Spring for Apache Kafka 使用经典的消费者组协议,而在测试新的消费者组协议时,需要通过消费者上的 group.protocol
属性进行选择。
测试支持变更
kraft
模式在 EmbeddedKafka
中默认是禁用的,想要使用 kraft
模式的用户必须手动启用。这是因为在使用 EmbeddedKafka
的 kraft
模式时观察到了一些不稳定性,特别是在测试新的消费者组协议时。新的消费者组协议仅在 kraft
模式下受支持,因此在测试新协议时,需要在真实的 Kafka 集群上进行,而不是基于 KafkaClusterTestKit
的集群,EmbeddedKafka
就是基于此。除此之外,在使用 EmbeddedKafka
的 kraft
模式运行多个 KafkaListener
方法时,还观察到了一些其他的竞争条件。在这些问题解决之前,EmbeddedKafka
中的 kraft
默认值将保持为 false
。
Kafka Streams 交互查询支持
一个新的 API KafkaStreamsInteractiveQuerySupport
用于访问在 Kafka Streams 交互查询中使用的可查询存储。有关更多详细信息,请参见 Kafka Streams Interactive Support。
TransactionIdSuffixStrategy
引入了一个新的 TransactionIdSuffixStrategy
接口来管理 transactional.id
后缀。当设置 maxCache
大于零时,默认实现为 DefaultTransactionIdSuffixStrategy
,可以在特定范围内重用 transactional.id
,否则后缀将通过递增计数器动态生成。有关更多信息,请参见 Fixed TransactionIdSuffix。
Async @KafkaListener 返回
@KafkaListener
(和 @KafkaHandler
)方法现在可以返回异步返回类型,包括 CompletableFuture<?>
、Mono<?>
和 Kotlin 的 suspend
函数。有关更多信息,请参见 Async Returns。
基于抛出异常的消息路由到自定义 DLTs
现在可以根据在消息处理过程中抛出的异常类型,将消息重定向到自定义 DLT。重定向的规则可以通过 RetryableTopic.exceptionBasedDltRouting
或 RetryTopicConfigurationBuilder.dltRoutingRules
设置。自定义 DLT 以及其他重试和死信主题会自动创建。有关更多信息,请参见 Routing of messages to custom DLTs based on thrown exceptions。
弃用 ContainerProperties transactionManager 属性
在 ContainerProperties
中弃用 transactionManager
属性,改为使用 KafkaAwareTransactionManager
,这是一个比通用的 PlatformTransactionManager
更狭义的类型。请参见 ContainerProperties 和 Transaction Synchronization。
回滚处理后
提供了一个新的 AfterRollbackProcessor
API processBatch
。有关更多信息,请参见 After-rollback Processor。
更改 @RetryableTopic SameIntervalTopicReuseStrategy 默认值
将 @RetryableTopic
属性 SameIntervalTopicReuseStrategy
的默认值更改为 SINGLE_TOPIC
。请参见 Single Topic for maxInterval Exponential Delay。
非阻塞重试支持类级别 @KafkaListener
非阻塞重试支持 @KafkaListener on a Class。请参见 Non-Blocking Retries。
Support process @RetryableTopic 在 RetryTopicConfigurationProvider 中的一个类。
提供了一个新的公共 API 来查找 RetryTopicConfiguration
。请参见 Find RetryTopicConfiguration
RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint.
RetryTopicConfigurer
支持处理和注册 MultiMethodKafkaListenerEndpoint
。MultiMethodKafkaListenerEndpoint
为属性 defaultMethod
和 methods
提供 getter/setter
。修改 EndpointCustomizer
以严格适用于 MethodKafkaListenerEndpoint
类型。EndpointHandlerMethod
添加新的构造函数,以为提供的 bean 构造实例。提供新的类 EndpointHandlerMultiMethod
来处理重试端点的多方法。
新的 API 方法,根据用户提供的函数寻址到一个偏移量
ConsumerCallback
提供了一个新的 API,可以根据用户定义的函数寻址到一个偏移量,该函数将当前消费者的偏移量作为参数。有关更多详细信息,请参见 Seek API Docs。
@PartitionOffset 支持 SeekPosition
将 seekPosition
属性添加到 @PartitionOffset
以支持 TopicPartitionOffset.SeekPosition
。有关更多详细信息,请参见 manual-assignment。
TopicPartitionOffset 中的新构造函数,接受一个函数来计算要寻址的偏移量
TopicPartitionOffset
有一个新的构造函数,该构造函数接受一个用户提供的函数来计算要寻址的偏移量。当使用此构造函数时,框架会使用当前消费者偏移位置作为输入参数调用该函数。有关更多详细信息,请参见 Seek API Docs。
Spring Boot 应用程序名称作为默认客户端 ID 前缀
对于定义了应用名称的 Spring Boot 应用程序,此名称现在用作某些客户端类型的自动生成客户端 ID 的默认前缀。有关更多详细信息,请参见 Default client ID prefixes。
增强的消息监听容器检索
ListenerContainerRegistry
提供了两个新的 API 动态查找和过滤 MessageListenerContainer
实例。getListenerContainersMatching(Predicate<String> idMatcher)
用于通过 ID 进行过滤,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
用于通过 ID 和容器属性进行过滤。
请参阅 @KafkaListener 生命周期管理的 API 文档 以获取更多信息。
通过提供更多追踪标签增强观察
KafkaTemplateObservation
提供了更多的追踪标签(低基数)。 KafkaListenerObservation
提供了一个新的 API 来查找高基数的键名和更多的追踪标签(高或低基数)。请参见 Micrometer Observation
3.0 以来 3.1 的新特性
本节涵盖了从版本 3.0 到版本 3.1 的更改。有关早期版本的更改,请参见 Change History。
Kafka Client Version
此版本需要 3.6.0 kafka-clients
。
EmbeddedKafkaBroker
现在提供了一个额外的实现,可以使用 Kraft
代替 Zookeeper。有关更多信息,请参见 Embedded Kafka Broker。
JsonDeserializer
当发生反序列化异常时,SerializationException
消息不再包含形式为 Can’t deserialize data [[123, 34, 98, 97, 122, …
的数据;每个数据字节的数值数组对于大数据来说并没有用处,并且可能会显得冗长。当与 ErrorHandlingDeserializer
一起使用时,发送到错误处理程序的 DeserializationException
包含 data
属性,该属性包含无法反序列化的原始数据。当不与 ErrorHandlingDeserializer
一起使用时,KafkaConsumer
将不断为相同的记录发出异常,显示主题/分区/偏移量以及 Jackson 抛出的原因。
ContainerPostProcessor
后处理可以通过在 @KafkaListener
注解中指定 ContainerPostProcessor
的 bean 名称来应用于监听器容器。这发生在容器创建之后,以及在容器工厂上配置的任何 ContainerCustomizer
之后。有关更多信息,请参见 Container Factory。
ErrorHandlingDeserializer
您现在可以将 Validator
添加到此反序列化器;如果委托的 Deserializer
成功反序列化对象,但该对象未通过验证,则会抛出异常,类似于发生反序列化异常。这允许将原始原始数据传递给错误处理程序。有关更多信息,请参见 Using ErrorHandlingDeserializer。
可重试主题
将后缀 -retry-5000
更改为 -retry
,当使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
时。如果您想保留后缀 -retry-5000
,请使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")
。有关更多信息,请参见 Topic Naming。
监听器容器更改
当手动分配分区时,如果 null
消费者 group.id
,则 AckMode
现在会自动强制为 MANUAL
。有关更多信息,请参见 Manually Assigning All Partitions。
3.0 自 2.9 以来的新特性
Kafka Client Version
此版本需要 3.3.1 kafka-clients
。
Exactly Once Semantics
EOSMode.V1
(即 ALPHA
)不再支持。
使用事务时,最低代理版本为 2.5。
请参见 Exactly Once Semantics 和 KIP-447 以获取更多信息。
Observation
Request error occurred:
Native Images
支持创建本地映像。有关更多信息,请参见 Native Images。
全球单一嵌入式 Kafka
嵌入式 Kafka (EmbeddedKafkaBroker
) 现在可以作为整个测试计划的单个全局实例启动。有关更多信息,请参见 使用相同的 Broker 进行多个测试类。
可重试主题更改
此功能不再被视为实验性(就其 API 而言),该功能自 2.7 以来一直得到支持,但发生 API 变更的可能性大于正常情况。
在本次发布中,非阻塞重试基础设施 bean 的引导方式发生了变化,以避免某些应用程序在应用初始化过程中出现的一些时序问题。
您现在可以为重试容器设置不同的 concurrency
;默认情况下,concurrency
与主容器相同。
@RetryableTopic
现在可以作为自定义注解上的元注解使用,包括对 @AliasFor
属性的支持。
有关更多信息,请参见 Configuration。
重试主题的默认复制因子现在是 -1
(使用代理默认值)。如果您的代理版本早于 2.4,您现在需要显式设置该属性。
您现在可以在同一应用程序上下文中为同一主题配置多个 @RetryableTopic
监听器。之前,这是不可能的。有关更多信息,请参见 Multiple Listeners, Same Topic(s)。
在 RetryTopicConfigurationSupport
中有重大的 API 变更;具体来说,如果您重写了 destinationTopicResolver
、kafkaConsumerBackoffManager
和/或 retryTopicConfigurer
的 bean 定义方法,这些方法现在需要一个 ObjectProvider<RetryTopicComponentFactory>
参数。
监听器容器更改
与消费者身份验证和授权失败相关的事件现在由容器发布。有关更多信息,请参见 Application Events。
您现在可以自定义消费者线程使用的线程名称。有关更多信息,请参见 Container Thread Naming。
容器属性 restartAfterAuthException
已添加。有关更多信息,请参见 Listener Container Properties。
KafkaTemplate
更改
这个类返回的期货现在是 CompletableFuture
而不是 ListenableFuture
。请参见 Using KafkaTemplate。
ReplyingKafkaTemplate
更改
这个类返回的 futures 现在是 CompletableFuture
而不是 ListenableFuture
。请参见 Using ReplyingKafkaTemplate 和 Request/Reply with Message<?> s。
@KafkaListener
变更
您现在可以使用自定义关联头,该头将在任何回复消息中回显。有关更多信息,请参见 Using ReplyingKafkaTemplate 末尾的注释。
您现在可以在整个批处理被处理之前手动提交批处理的部分。有关更多信息,请参见 Committing Offsets。
KafkaHeaders
更改
在 KafkaHeaders
中,四个在 2.9.x 中被弃用的常量现在已被移除。
-
Instead of
MESSAGE_KEY
,useKEY
。 -
Instead of
PARTITION_ID
,usePARTITION
。
同样,RECEIVED_MESSAGE_KEY
被替换为 RECEIVED_KEY
,而 RECEIVED_PARTITION_ID
被替换为 RECEIVED_PARTITION
。
测试变更
版本 3.0.7 引入了 MockConsumerFactory
和 MockProducerFactory
。有关更多信息,请参见 Mock Consumer and Producer。
从版本 3.0.10 开始,嵌入式 Kafka 代理默认将 Spring Boot 属性 spring.kafka.bootstrap-servers
设置为嵌入式代理的地址。
2.9 相较于 2.8 有什么新变化
Kafka Client Version
此版本需要 3.2.0 kafka-clients
。
错误处理程序更改
DefaultErrorHandler
现在可以配置为暂停容器一次轮询,并使用上一次轮询的剩余结果,而不是寻址剩余记录的偏移量。有关更多信息,请参见 DefaultErrorHandler。
DefaultErrorHandler
现在有一个 BackOffHandler
属性。有关更多信息,请参见 Back Off Handlers。
Listener Container Changes
interceptBeforeTx
现在适用于所有事务管理器(之前仅在使用 KafkaAwareTransactionManager
时应用)。请参见 [interceptBeforeTx]。
提供了一个新的容器属性 pauseImmediate
,该属性允许容器在处理完当前记录后暂停消费者,而不是在处理完上一个轮询的所有记录后再暂停。请参见 [pauseImmediate]。
与消费者身份验证和授权相关的事件
Header Mapper Changes
您现在可以配置哪些入站头应该被映射。此功能在版本 2.8.8 或更高版本中可用。有关更多信息,请参见 Message Headers。
KafkaTemplate
变更
在 3.0 中,该类返回的 futures 将是 CompletableFuture
而不是 ListenableFuture
。有关在使用此版本时过渡的帮助,请参见 Using KafkaTemplate。
ReplyingKafkaTemplate
更改
该模板现在提供了一种方法,可以在回复容器上等待分配,以避免在回复容器初始化之前发送请求时发生竞争。此功能在版本 2.8.8 或更高版本中可用。请参阅 Using ReplyingKafkaTemplate。
在 3.0 中,这个类返回的 futures 将是 CompletableFuture
而不是 ListenableFuture
。请参阅 Using ReplyingKafkaTemplate 和 Request/Reply with Message<?> s 以获取在使用此版本时过渡的帮助。
2.8 相较于 2.7 有什么新变化
本节涵盖了从版本 2.7 到版本 2.8 的更改。有关早期版本的更改,请参见 Change History。
Kafka Client Version
此版本需要 3.0.0 kafka-clients
。
包更改
与类型映射相关的类和接口已从 …support.converter
移动到 …support.mapping
。
-
AbstractJavaTypeMapper
-
ClassMapper
-
DefaultJackson2JavaTypeMapper
-
Jackson2JavaTypeMapper
Out of Order Manual Commits
监听器容器现在可以配置为接受无序的手动偏移提交(通常是异步的)。容器将推迟提交,直到缺失的偏移被确认。有关更多信息,请参见 Manually Committing Offsets。
@KafkaListener
的变化
现在可以在方法本身上指定监听器方法是否为批量监听器。这允许同一个容器工厂同时用于记录监听器和批量监听器。
请参见 [batch-listeners] 以获取更多信息。
批处理监听器现在可以处理转换异常。
请参见 Conversion Errors with Batch Error Handlers 以获取更多信息。
RecordFilterStrategy
在与批处理监听器一起使用时,现在可以在一次调用中过滤整个批次。有关更多信息,请参见 [batch-listeners] 末尾的说明。
@KafkaListener
注解现在有了 filter
属性,可以仅为此监听器覆盖容器工厂的 RecordFilterStrategy
。
@KafkaListener
注解现在有了 info
属性;这个属性用于填充新的监听器容器属性 listenerInfo
。然后,这个属性用于在每个记录中填充 KafkaHeaders.LISTENER_INFO
头部,可以在 RecordInterceptor
、RecordFilterStrategy
或监听器本身中使用。有关更多信息,请参见 Listener Info Header 和 AbstractMessageListenerContainer Properties。
KafkaTemplate
的更改
您现在可以根据主题、分区和偏移量接收单个记录。有关更多信息,请参见 Using KafkaTemplate to Receive。
CommonErrorHandler
已添加
遗留的 GenericErrorHandler
及其子接口层次结构用于记录和批处理监听器,已被新的单一接口 CommonErrorHandler
替代,其实现对应于大多数遗留的 GenericErrorHandler
实现。有关更多信息,请参见 Container Error Handlers 和 Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler。
Listener Container Changes
interceptBeforeTx
容器属性现在默认值为 true
。
authorizationExceptionRetryInterval
属性已重命名为 authExceptionRetryInterval
,现在除了之前的 AuthorizationException
之外,还适用于 AuthenticationException
。这两种异常都被视为致命异常,默认情况下容器将停止,除非设置了此属性。
请参见 Using KafkaMessageListenerContainer 和 Listener Container Properties 以获取更多信息。
序列化/反序列化更改
DelegatingByTopicSerializer
和 DelegatingByTopicDeserializer
现在已提供。有关更多信息,请参见 Delegating Serializer and Deserializer。
DeadLetterPublishingRecover
变更
属性 stripPreviousExceptionHeaders
现在默认为 true
。
现在有几种技术可以自定义添加到输出记录的标题。
请参见 Managing Dead Letter Record Headers 以获取更多信息。
可重试主题的更改
现在您可以对可重试和不可重试的主题使用相同的工厂。有关更多信息,请参见 Specifying a ListenerContainerFactory。
现在有一个可管理的全球致命异常列表,这些异常会使失败的记录直接进入 DLT。请参阅 Exception Classifier 以了解如何管理它。
您现在可以结合使用阻塞和非阻塞重试。有关更多信息,请参见 Combining Blocking and Non-Blocking Retries。
当使用可重试主题功能时抛出的 KafkaBackOffException 现在以 DEBUG 级别记录。如果您需要将日志级别更改回 WARN 或设置为其他任何级别,请参见 Changing KafkaBackOffException Logging Level。
2.6 和 2.7 之间的变化
Kafka Client Version
此版本需要 2.7.0 kafka-clients
。自版本 2.7.1 起,它也与 2.8.0 客户端兼容;请参见 Override Spring Boot Dependencies。
非阻塞延迟重试使用主题
在此版本中添加了这个重要的新功能。当严格的顺序不重要时,失败的交付可以发送到另一个主题,以便稍后消费。可以配置一系列这样的重试主题,具有逐渐增加的延迟。有关更多信息,请参见 Non-Blocking Retries。
Listener Container Changes
onlyLogRecordMetadata
容器属性现在默认值为 true
。
一个新的容器属性 stopImmediate
现在可用。
有关更多信息,请参见 Listener Container Properties。
使用 BackOff
在交付尝试之间的错误处理程序(例如 SeekToCurrentErrorHandler
和 DefaultAfterRollbackProcessor
)现在将在容器停止后很快退出回退间隔,而不是延迟停止。
错误处理程序和扩展 FailedRecordProcessor
的回滚后处理程序现在可以配置一个或多个 RetryListener
,以接收有关重试和恢复进度的信息。
RecordInterceptor
现在有额外的方法,在监听器返回后调用(正常返回或抛出异常时)。它还具有一个子接口 ConsumerAwareRecordInterceptor
。此外,现在还有一个用于批量监听器的 BatchInterceptor
。有关更多信息,请参见 Message Listener Containers。
@KafkaListener
变更
您现在可以验证 @KafkaHandler
方法(类级监听器)的 payload 参数。有关更多信息,请参见 @KafkaListener @Payload Validation。
您现在可以在 MessagingMessageConverter
和 BatchMessagingMessageConverter
上设置 rawRecordHeader
属性,这将导致原始的 ConsumerRecord
被添加到转换后的 Message<?>
中。这在您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer
时非常有用。有关更多信息,请参见 Listener Error Handlers。
您现在可以在应用程序初始化期间修改 @KafkaListener
注解。有关更多信息,请参见 @KafkaListener Attribute Modification。
DeadLetterPublishingRecover
变更
现在,如果键和值都无法反序列化,原始值将被发布到 DLT。之前,值被填充,但键 DeserializationException
保留在头部中。如果你子类化了恢复器并重写了 createProducerRecord
方法,这将是一个破坏性的 API 更改。
此外,恢复者在发布之前会验证目标解析器选择的分区是否确实存在。
请参阅 Publishing Dead-letter Records 以获取更多信息。
ChainedKafkaTransactionManager
已被弃用
有关更多信息,请参见 Transactions。
ReplyingKafkaTemplate
变更
现在有一个机制可以检查回复,如果存在某些条件,将异常地失败未来的操作。
已添加对发送和接收 spring-messaging
Message<?>
的支持。
请参见 Using ReplyingKafkaTemplate 以获取更多信息。
Kafka Streams Changes
默认情况下,StreamsBuilderFactoryBean
现在配置为不清理本地状态。有关更多信息,请参见 Configuration。
KafkaAdmin
变更
新增了方法 createOrModifyTopics
和 describeTopics
。添加了 KafkaAdmin.NewTopics
以便在单个 bean 中配置多个主题。有关更多信息,请参见 [configuring-topics]。
MessageConverter
更改
现在可以将 spring-messaging
的 SmartMessageConverter
添加到 MessagingMessageConverter
中,从而允许基于 contentType
头进行内容协商。有关更多信息,请参见 Spring Messaging Message Conversion。
Sequencing @KafkaListener
s
请参见 Starting @KafkaListener s in Sequence 以获取更多信息。
ExponentialBackOffWithMaxRetries
提供了一个新的 BackOff
实现,使配置最大重试次数更加方便。有关更多信息,请参见 ExponentialBackOffWithMaxRetries Implementation。
条件委托错误处理器
这些新的错误处理程序可以配置为根据异常类型委托给不同的错误处理程序。有关更多信息,请参见 Delegating Error Handler。
2.5 和 2.6 之间的变化
Kafka 客户端版本
此版本需要 2.6.0 kafka-clients
。
监听器容器更改
默认的 EOSMode
现在是 BETA
。有关更多信息,请参见 Exactly Once Semantics。
各种错误处理程序(扩展了 FailedRecordProcessor
)和 DefaultAfterRollbackProcessor
现在会在恢复失败时重置 BackOff
。此外,您现在可以根据失败的记录和/或异常选择要使用的 BackOff
。
您现在可以在容器属性中配置 adviceChain
。有关更多信息,请参见 Listener Container Properties。
当容器配置为发布 ListenerContainerIdleEvent
时,它现在在接收到记录后发布 ListenerContainerNoLongerIdleEvent
,此时之前已发布了一个闲置事件。有关更多信息,请参见 Application Events 和 Detecting Idle and Non-Responsive Consumers。
@KafkaListener 变更
在使用手动分区分配时,您现在可以指定一个通配符来确定哪些分区应该重置为初始偏移量。此外,如果监听器实现了 ConsumerSeekAware
,则在手动分配后会调用 onPartitionsAssigned()
。(此功能也在版本 2.5.5 中添加)。有关更多信息,请参见 Explicit Partition Assignment。
已向 AbstractConsumerSeekAware
添加了便利方法,以简化查找过程。有关更多信息,请参见 [seek]。
ErrorHandler 更改
FailedRecordProcessor
的子类(例如 SeekToCurrentErrorHandler
、DefaultAfterRollbackProcessor
、RecoveringBatchErrorHandler
)现在可以配置为在异常类型与之前该记录发生的异常类型不同的情况下重置重试状态。
Producer Factory Changes
您现在可以为生产者设置一个最大年龄,超过该年龄后,它们将被关闭并重新创建。有关更多信息,请参见 Transactions。
您现在可以在创建 DefaultKafkaProducerFactory
之后更新配置映射。这在某些情况下可能会很有用,例如,如果您在凭据更改后需要更新 SSL 密钥/信任存储位置。有关更多信息,请参见 Using DefaultKafkaProducerFactory。
2.4 和 2.5 之间的变化
本节涵盖了从版本 2.4 到版本 2.5 的更改。有关早期版本的更改,请参见 Change History。
Consumer/Producer Factory Changes
默认的消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。提供了原生 Micrometer 指标的实现。有关更多信息,请参见 Factory Listeners。
您现在可以在运行时更改 bootstrap server 属性,从而实现故障转移到另一个 Kafka 集群。有关更多信息,请参见 Connecting to Kafka。
StreamsBuilderFactoryBean
的变更
工厂 bean 现在可以在每次创建或销毁 KafkaStreams
时调用回调。提供了原生 Micrometer 指标的实现。有关更多信息,请参见 KafkaStreams Micrometer Support。
Kafka Client Version
此版本需要 2.5.0 kafka-clients
。
类/包变更
SeekUtils
已从 o.s.k.support
包移动到 o.s.k.listener
。
Delivery Attempts Header
现在有一个选项可以添加一个头部,用于跟踪在使用某些错误处理程序和回滚处理器后进行的交付尝试。有关更多信息,请参见 Delivery Attempts Header。
@KafkaListener 变更
默认的回复头将会在需要时自动填充,如果 @KafkaListener
的返回类型是 Message<?>
。有关更多信息,请参见 Reply Type Message<?>。
KafkaHeaders.RECEIVED_MESSAGE_KEY
不再在传入记录的键为 null
时填充 null
值;该头部会被完全省略。
@KafkaListener
方法现在可以指定一个 ConsumerRecordMetadata
参数,而不是使用离散的头部来传递诸如主题、分区等元数据。有关更多信息,请参见 Consumer Record Metadata。
Listener Container Changes
assignmentCommitOption
容器属性现在默认值为 LATEST_ONLY_NO_TX
。有关更多信息,请参见 Listener Container Properties。
subBatchPerPartition
容器属性在使用事务时现在默认为 true
。有关更多信息,请参见 Transactions。
现在提供了一个新的 RecoveringBatchErrorHandler
。
现在支持静态组成员资格。有关更多信息,请参见 Message Listener Containers。
当配置了增量/协作再平衡时,如果偏移量未能提交并出现非致命的 RebalanceInProgressException
,容器将在再平衡完成后尝试重新提交仍分配给该实例的分区的偏移量。
默认的错误处理程序现在是记录监听器的 SeekToCurrentErrorHandler
和批处理监听器的 RecoveringBatchErrorHandler
。有关更多信息,请参见 Container Error Handlers。
您现在可以控制标准错误处理程序故意抛出的异常记录的级别。有关更多信息,请参见 Container Error Handlers。
getAssignmentsByClientId()
方法已被添加,使得在并发容器中更容易确定哪些消费者被分配了哪些分区。有关更多信息,请参见 Listener Container Properties。
您现在可以在错误、调试日志等中抑制整个 ConsumerRecord
的日志。请参见 Listener Container Properties 中的 onlyLogRecordMetadata
。
KafkaTemplate 更改
KafkaTemplate
现在可以维护 micrometer timers。有关更多信息,请参见 Monitoring。
KafkaTemplate
现在可以使用 ProducerConfig
属性进行配置,以覆盖生产者工厂中的属性。有关更多信息,请参见 Using KafkaTemplate。
现在提供了一个 RoutingKafkaTemplate
。有关更多信息,请参见 Using RoutingKafkaTemplate。
您现在可以使用 KafkaSendCallback
代替 ListenerFutureCallback
来获取更具体的异常,从而更容易提取失败的 ProducerRecord
。有关更多信息,请参见 Using KafkaTemplate。
Kafka 字符串序列化/反序列化
新的 ToStringSerializer
/ StringDeserializer
以及相关的 SerDe
现已提供。有关更多信息,请参见 String serialization。
JsonDeserializer
JsonDeserializer
现在在确定反序列化类型方面具有更大的灵活性。有关更多信息,请参见 Using Methods to Determine Types。
委托序列化器/反序列化器
DelegatingSerializer
现在可以处理 "标准" 类型,当出站记录没有头部时。有关更多信息,请参见 Delegating Serializer and Deserializer。
测试变更
KafkaTestUtils.consumerProps()
辅助记录现在默认将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为 earliest
。有关更多信息,请参见 JUnit。
2.3 和 2.4 之间的变化
Kafka Client Version
此版本需要 2.4.0 kafka-clients
或更高版本,并支持新的增量再平衡功能。
ConsumerAwareRebalanceListener
像 ConsumerRebalanceListener
一样,这个接口现在有一个额外的方法 onPartitionsLost
。有关更多信息,请参考 Apache Kafka 文档。
与 ConsumerRebalanceListener
不同,默认实现不会调用 onPartitionsRevoked
。相反,监听器容器会在调用 onPartitionsLost
之后调用该方法;因此,在实现 ConsumerAwareRebalanceListener
时,您不应这样做。
请查看 Rebalancing Listeners 末尾的 IMPORTANT 注释以获取更多信息。
GenericErrorHandler
isAckAfterHandle()
的默认实现现在默认返回 true。
KafkaTemplate
KafkaTemplate
现在支持非事务性发布和事务性发布。有关更多信息,请参见 KafkaTemplate 事务性和非事务性发布。
AggregatingReplyingKafkaTemplate
releaseStrategy
现在是一个 BiConsumer
。它现在在超时后被调用(以及在记录到达时);在超时后调用时,第二个参数为 true
。
请参见 Aggregating Multiple Replies 以获取更多信息。
Listener Container
ContainerProperties
提供了一个 authorizationExceptionRetryInterval
选项,以便在 KafkaConsumer
抛出任何 AuthorizationException
后,监听器容器可以进行重试。有关更多信息,请参见其 JavaDocs 和 使用 KafkaMessageListenerContainer。
@KafkaListener
@KafkaListener
注解有一个新属性 splitIterables
;默认为 true。当一个回复监听器返回一个 Iterable
时,此属性控制返回结果是作为单个记录发送,还是为每个元素发送一个记录。有关更多信息,请参见 Forwarding Listener Results using @SendTo。
批处理监听器现在可以通过 BatchToRecordAdapter
进行配置;这允许例如在事务中处理批次,同时监听器一次获取一条记录。使用默认实现时,可以使用 ConsumerRecordRecoverer
来处理批次中的错误,而不会停止整个批次的处理 - 这在使用事务时可能会很有用。有关更多信息,请参见 Transactions with Batch Listeners。
Kafka Streams
StreamsBuilderFactoryBean
接受一个新的属性 KafkaStreamsInfrastructureCustomizer
。这允许在创建流之前配置构建器和/或拓扑。有关更多信息,请参见 Spring Management。
2.2 和 2.3 之间的变化
本节涵盖了从版本 2.2 到版本 2.3 的更改。
提示、技巧和示例
新增了一章 Tips, Tricks and Examples。请提交 GitHub 问题和/或拉取请求,以便在该章节中添加更多条目。
Kafka 客户端版本
此版本需要 2.3.0 kafka-clients
或更高版本。
类/包变更
TopicPartitionInitialOffset
已被弃用,推荐使用 TopicPartitionOffset
。
配置更改
从版本 2.3.4 开始,missingTopicsFatal
容器属性默认值为 false。当该属性为 true 时,如果代理不可用,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到在没有活动代理的情况下启动应用程序会是一个常见的用例。
生产者和消费者工厂变更
DefaultKafkaProducerFactory
现在可以配置为每个线程创建一个生产者。您还可以在构造函数中提供 Supplier<Serializer>
实例,作为配置类(需要无参构造函数)或使用 Serializer
实例构造的替代方案,后者在所有生产者之间共享。有关更多信息,请参见 Using DefaultKafkaProducerFactory。
在 DefaultKafkaConsumerFactory
中,Supplier<Deserializer>
实例也提供相同的选项。有关更多信息,请参见 Using KafkaMessageListenerContainer。
Listener Container Changes
之前,错误处理程序在使用监听器适配器(例如 @KafkaListener
)调用监听器时,会收到 ListenerExecutionFailedException
(实际监听器异常作为 cause
)。原生的 GenericMessageListener
抛出的异常会原样传递给错误处理程序。现在,ListenerExecutionFailedException
始终作为参数(实际监听器异常作为 cause
),这提供了对容器的 group.id
属性的访问。
因为监听器容器有自己的提交偏移量的机制,它更倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
设置为 false
。除非在消费者工厂或容器的消费者属性覆盖中特别设置,否则它现在会自动将其设置为 false。
ackOnError
属性现在默认值为 false
。
现在可以在监听器方法中获取消费者的 group.id
属性。有关更多信息,请参见 Obtaining the Consumer group.id。
容器具有一个新属性 recordInterceptor
,允许在调用监听器之前检查或修改记录。如果需要调用多个拦截器,还提供了 CompositeRecordInterceptor
。有关更多信息,请参见 Message Listener Containers。
ConsumerSeekAware
有新的方法,允许您相对于开始、结束或当前位置进行查找,并根据时间戳查找第一个大于或等于该时间戳的偏移量。有关更多信息,请参见 [seek]。
现在提供了一个便利类 AbstractConsumerSeekAware
来简化查找。有关更多信息,请参见 [seek]。
ContainerProperties
提供了一个 idleBetweenPolls
选项,允许监听器容器中的主循环在 KafkaConsumer.poll()
调用之间休眠。有关更多信息,请参见其 JavaDocs 和 使用 KafkaMessageListenerContainer。
当使用 AckMode.MANUAL
(或 MANUAL_IMMEDIATE
)时,您现在可以通过在 Acknowledgment
上调用 nack
来导致重新投递。有关更多信息,请参见 Committing Offsets。
现在可以使用 Micrometer Timer
监控监听器性能。有关更多信息,请参见 Monitoring。
容器现在发布与启动相关的额外消费者生命周期事件。有关更多信息,请参见 Application Events。
事务批处理监听器现在可以支持僵尸围栏。有关更多信息,请参见 Transactions。
监听器容器工厂现在可以配置 ContainerCustomizer
,以便在每个容器创建和配置之后进一步配置它。有关更多信息,请参见 Container factory。
ErrorHandler 更改
SeekToCurrentErrorHandler
现在将某些异常视为致命异常,并对这些异常禁用重试,在第一次失败时调用恢复器。
SeekToCurrentErrorHandler
和 SeekToCurrentBatchErrorHandler
现在可以配置在投递尝试之间应用 BackOff
(线程休眠)。
从版本 2.3.2 开始,恢复的记录的偏移量将在错误处理程序在恢复失败记录后返回时被提交。
DeadLetterPublishingRecoverer
在与 ErrorHandlingDeserializer
一起使用时,现在将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。之前,它是 null
,用户代码需要从消息头中提取 DeserializationException
。有关更多信息,请参见 Publishing Dead-letter Records。
TopicBuilder
提供了一个新的类 TopicBuilder
,用于更方便地创建 NewTopic
@Bean
以实现自动主题配置。有关更多信息,请参见 [configuring-topics]。
Kafka Streams 变更
您现在可以对由 @EnableKafkaStreams
创建的 StreamsBuilderFactoryBean
进行额外配置。有关更多信息,请参见 Streams Configuration。
现在提供了一个 RecoveringDeserializationExceptionHandler
,它允许恢复具有反序列化错误的记录。它可以与 DeadLetterPublishingRecoverer
一起使用,将这些记录发送到死信主题。有关更多信息,请参见 Recovery from Deserialization Exceptions。
HeaderEnricher
转换器已经提供,使用 SpEL 生成头部值。有关更多信息,请参见 Header Enricher。
MessagingTransformer
已经提供。这允许 Kafka streams 拓扑与 spring-messaging 组件进行交互,例如 Spring Integration 流。有关更多信息,请参见 MessagingProcessor 和 [从 KStream 调用 Spring Integration 流]。
JSON 组件变更
现在所有 JSON 感知组件默认配置为使用 Jackson
的 ObjectMapper
,该 ObjectMapper
由 JacksonUtils.enhancedObjectMapper()
生成。JsonDeserializer
现在提供基于 TypeReference
的构造函数,以更好地处理目标泛型容器类型。同时,引入了 JacksonMimeTypeModule
,用于将 org.springframework.util.MimeType
序列化为普通字符串。有关更多信息,请参见其 JavaDocs 和 Serialization, Deserialization, and Message Conversion。
提供了一个 ByteArrayJsonMessageConverter
,以及一个新的超类 JsonMessageConverter
,适用于所有 Json 转换器。此外,现在可以使用 StringOrBytesSerializer
;它可以序列化 byte[]
、Bytes
和 String
值在 ProducerRecord
中。有关更多信息,请参见 Spring Messaging Message Conversion。
JsonSerializer
、JsonDeserializer
和 JsonSerde
现在拥有流式 API,以简化编程配置。有关更多信息,请参见 javadocs、序列化、反序列化和消息转换 和 Streams JSON 序列化和反序列化。
ReplyingKafkaTemplate
当回复超时时,未来会异常完成,抛出 KafkaReplyTimeoutException
而不是 KafkaException
。
此外,现在提供了一个重载的 sendAndReceive
方法,允许在每条消息的基础上指定回复超时时间。
AggregatingReplyingKafkaTemplate
扩展了 ReplyingKafkaTemplate
,通过聚合来自多个接收者的回复。有关更多信息,请参见 聚合多个回复。
事务变更
您现在可以在 KafkaTemplate
和 KafkaTransactionManager
上覆盖生产者工厂的 transactionIdPrefix
。有关更多信息,请参见 transactionIdPrefix。
新的委托序列化器/反序列化器
该框架现在提供了一个委托 Serializer
和 Deserializer
,利用一个头部来支持生成和消费具有多种键/值类型的记录。有关更多信息,请参见 Delegating Serializer and Deserializer。
新的重试反序列化器
该框架现在提供了一个委托的 RetryingDeserializer
,用于在发生网络问题等瞬态错误时重试序列化。有关更多信息,请参见 Retrying Deserializer。
2.1 和 2.2 之间的变化
Kafka 客户端版本
此版本需要 2.0.0 及以上的 kafka-clients
。
类和包的变化
ContainerProperties
类已从 org.springframework.kafka.listener.config
移动到 org.springframework.kafka.listener
。
AckMode
枚举已从 AbstractMessageListenerContainer
移动到 ContainerProperties
。
setBatchErrorHandler()
和 setErrorHandler()
方法已从 ContainerProperties
移动到 AbstractMessageListenerContainer
和 AbstractKafkaListenerContainerFactory
。
回滚处理后
提供了一种新的 AfterRollbackProcessor
策略。有关更多信息,请参见 After-rollback Processor。
ConcurrentKafkaListenerContainerFactory
更改
您现在可以使用 ConcurrentKafkaListenerContainerFactory
来创建和配置任何 ConcurrentMessageListenerContainer
,不仅限于 @KafkaListener
注解的容器。有关更多信息,请参见 Container factory。
Listener Container Changes
新增了一个容器属性(missingTopicsFatal
)。有关更多信息,请参见 Using KafkaMessageListenerContainer。
当消费者停止时,现在会发出 ConsumerStoppedEvent
。有关更多信息,请参见 Thread Safety。
批处理监听器可以选择接收完整的 ConsumerRecords<?, ?>
对象,而不是 List<ConsumerRecord<?, ?>
。有关更多信息,请参见 [batch-listeners]。
DefaultAfterRollbackProcessor
和 SeekToCurrentErrorHandler
现在可以恢复(跳过)持续失败的记录,默认情况下,在 10 次失败后进行恢复。它们可以被配置为将失败的记录发布到死信主题。
从版本 2.2.4 开始,在选择死信主题名称时可以使用消费者的组 ID。
已添加 ConsumerStoppingEvent
。有关更多信息,请参见 Application Events。
SeekToCurrentErrorHandler
现在可以配置为在容器配置为 AckMode.MANUAL_IMMEDIATE
时提交恢复记录的偏移量(自 2.2.4 起)。
@KafkaListener 变更
您现在可以通过在注解上设置属性来覆盖监听器容器工厂的 concurrency
和 autoStartup
属性。您现在可以添加配置以确定哪些头部(如果有的话)会被复制到回复消息中。有关更多信息,请参见 @KafkaListener Annotation。
您现在可以将 @KafkaListener
用作您自己注解的元注解。有关更多信息,请参见 @KafkaListener as a Meta Annotation。
现在更容易为 @Payload
验证配置 Validator
。有关更多信息,请参见 @KafkaListener @Payload Validation。
您现在可以直接在注解上指定 kafka 消费者属性;这些属性将覆盖消费者工厂中定义的同名属性(自版本 2.2.4 起)。有关更多信息,请参见 Annotation Properties。
Header Mapping Changes
类型为 MimeType
和 MediaType
的头现在在 RecordHeader
值中映射为简单字符串。之前,它们被映射为 JSON,且只有 MimeType
被解码。MediaType
无法被解码。现在它们是简单字符串,以便于互操作性。
此外,DefaultKafkaHeaderMapper
具有一个新的 addToStringClasses
方法,允许指定应使用 toString()
而不是 JSON 进行映射的类型。有关更多信息,请参见 Message Headers。
嵌入式 Kafka 变更
KafkaEmbedded
类及其 KafkaRule
接口已被弃用,取而代之的是 EmbeddedKafkaBroker
及其 JUnit 4 的 EmbeddedKafkaRule
包装器。现在,@EmbeddedKafka
注解会填充一个 EmbeddedKafkaBroker
bean,而不是已弃用的 KafkaEmbedded
。此更改允许在 JUnit 5 测试中使用 @EmbeddedKafka
。@EmbeddedKafka
注解现在具有 ports
属性,以指定填充 EmbeddedKafkaBroker
的端口。有关更多信息,请参见 Testing Applications。
JsonSerializer/Deserializer 增强
您现在可以通过使用生产者和消费者属性提供类型映射信息。
新的构造函数可用于反序列化器,以允许用提供的目标类型覆盖类型头信息。
JsonDeserializer
现在默认移除任何类型信息头。
您现在可以通过使用 Kafka 属性(自 2.2.3 起)配置 JsonDeserializer
以忽略类型信息头。
Kafka Streams 变更
流配置 bean 现在必须是一个 KafkaStreamsConfiguration
对象,而不是 StreamsConfig
对象。
StreamsBuilderFactoryBean
已从包 …core
移动到 …config
。
KafkaStreamBrancher
被引入以改善在 KStream
实例上构建条件分支时的最终用户体验。
请参见 Apache Kafka Streams Support 和 Configuration 以获取更多信息。
Transactional ID
当监听器容器启动事务时,transactional.id
现在是 transactionIdPrefix
加上 <group.id>.<topic>.<partition>
。这个变化允许对僵尸进行适当的围栏, 如这里所述。
2.0 与 2.1 之间的变化
Kafka Client Version
此版本需要 1.0.0 及以上的 kafka-clients
。
1.1.x 客户端在 2.2 版本中得到原生支持。
JSON 改进
StringJsonMessageConverter
和 JsonSerializer
现在在 Headers
中添加类型信息,让转换器和 JsonDeserializer
能够根据消息本身而不是固定配置的类型在接收时创建特定类型。有关更多信息,请参见 Serialization, Deserialization, and Message Conversion。
容器停止错误处理程序
容器错误处理程序现在为记录和批处理监听器提供,这些处理程序将监听器抛出的任何异常视为致命错误/ 它们会停止容器。有关更多信息,请参见 Handling Exceptions。
暂停和恢复容器
监听器容器现在具有 pause()
和 resume()
方法(自版本 2.1.3 起)。有关更多信息,请参见 Pausing and Resuming Listener Containers。
有状态重试
从版本 2.1.3 开始,您可以配置有状态重试。有关更多信息,请参见 Stateful Retry。
Client ID
从版本 2.1.1 开始,您现在可以在 @KafkaListener
上设置 client.id
前缀。之前,要自定义客户端 ID,您需要为每个监听器使用单独的消费者工厂(和容器工厂)。当您使用并发时,前缀会以 -n
作为后缀,以提供唯一的客户端 ID。
记录偏移提交
默认情况下,主题偏移量提交的日志记录使用 DEBUG
日志级别。从版本 2.1.2 开始,ContainerProperties
中新增了一个名为 commitLogLevel
的属性,允许您指定这些消息的日志级别。有关更多信息,请参见 Using KafkaMessageListenerContainer。
Default @KafkaHandler
从版本 2.1.3 开始,您可以在类级别的 @KafkaListener
上指定一个 @KafkaHandler
注解作为默认值。有关更多信息,请参见 @KafkaListener on a Class。
ReplyingKafkaTemplate
从版本 2.1.3 开始,提供了 KafkaTemplate
的一个子类,以支持请求/回复语义。有关更多信息,请参见 Using ReplyingKafkaTemplate。
ChainedKafkaTransactionManager
版本 2.1.3 引入了 ChainedKafkaTransactionManager
。 (它现在已被弃用)。
从 2.0 迁移指南
请参阅 2.0 到 2.1 迁移 指南。
1.3 和 2.0 之间的变化
Spring Framework 和 Java 版本
Apache Kafka 的 Spring 项目现在要求使用 Spring Framework 5.0 和 Java 8。
@KafkaListener
变更
您现在可以使用 @SendTo
注解 @KafkaListener
方法(以及类和 @KafkaHandler
方法)。如果该方法返回结果,它将被转发到指定的主题。有关更多信息,请参见 使用 @SendTo 转发监听器结果。
消息监听器
消息监听器现在可以感知 Consumer
对象。有关更多信息,请参见 [message-listeners]。
使用 ConsumerAwareRebalanceListener
Rebalance listeners 现在可以在重新平衡通知期间访问 Consumer
对象。有关更多信息,请参见 Rebalancing Listeners。
1.2 和 1.3 之间的变化
对事务的支持
0.11.0.0 客户端库添加了对事务的支持。添加了 KafkaTransactionManager
和其他对事务的支持。有关更多信息,请参见 Transactions。
对标题的支持
0.11.0.0 客户端库添加了对消息头的支持。这些现在可以映射到 spring-messaging
的 MessageHeaders
,也可以从中映射。有关更多信息,请参见 Message Headers。
创建主题
0.11.0.0 客户端库提供了一个 AdminClient
,您可以使用它来创建主题。KafkaAdmin
使用此客户端自动添加定义为 @Bean
实例的主题。
对 Kafka 时间戳的支持
KafkaTemplate
现在支持一个 API 来添加带有时间戳的记录。关于 timestamp
支持,新的 KafkaHeaders
已被引入。同时,新增了 KafkaConditions.timestamp()
和 KafkaMatchers.hasTimestamp()
测试工具。有关更多详细信息,请参见 Using KafkaTemplate、@KafkaListener Annotation 和 Testing Applications。
@KafkaListener
变更
您现在可以配置一个 KafkaListenerErrorHandler
来处理异常。有关更多信息,请参见 Handling Exceptions。
默认情况下,@KafkaListener
的 id
属性现在用作 group.id
属性,覆盖消费者工厂中配置的属性(如果存在)。此外,您可以在注解上显式配置 groupId
。之前,您需要一个单独的容器工厂(和消费者工厂)来为监听器使用不同的 group.id
值。要恢复使用工厂配置的 group.id
的先前行为,请将注解上的 idIsGroup
属性设置为 false
。
@EmbeddedKafka
注解
为了方便,提供了一个测试类级别的 @EmbeddedKafka
注解,用于将 KafkaEmbedded
注册为一个 bean。有关更多信息,请参见 Testing Applications。
Kerberos 配置
现在提供了配置 Kerberos 的支持。有关更多信息,请参见 JAAS and Kerberos。
1.1 和 1.2 之间的变化
此版本使用 0.10.2.x 客户端。
1.0 和 1.1 之间的变化
Kafka Client
此版本使用 Apache Kafka 0.10.x.x 客户端。
批处理监听器
可以配置监听器以接收 consumer.poll()
操作返回的整个消息批次,而不是逐个接收。
Null Payloads
Null payloads 用于在使用日志压缩时“删除”键。
初始偏移
Request error occurred:
Seek
您现在可以查找每个主题或分区的位置。您可以在使用组管理并且 Kafka 分配分区时,在初始化期间使用此功能来设置初始位置。您还可以在检测到空闲容器时或在应用程序执行的任何任意点进行查找。有关更多信息,请参见 [seek]。