跳到主要内容

序列化、反序列化和消息转换

ChatGPT-4o-mini 中英对照 Serialization, Deserialization, and Message Conversion

概述

Apache Kafka 提供了一个高层次的 API,用于序列化和反序列化记录值以及它们的键。它通过 org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T> 抽象类提供了一些内置实现。同时,我们可以通过使用 ProducerConsumer 配置属性来指定序列化器和反序列化器类。以下示例展示了如何做到这一点:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
java

对于更复杂或特定的情况,KafkaConsumer(因此,KafkaProducer)提供了重载构造函数,以接受 SerializerDeserializer 实例,分别用于 keysvalues

当你使用这个 API 时,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 也提供属性(通过构造函数或 setter 方法)来将自定义的 SerializerDeserializer 实例注入到目标 ProducerConsumer 中。此外,你还可以通过构造函数传入 Supplier<Serializer>Supplier<Deserializer> 实例——这些 Supplier 会在每个 ProducerConsumer 创建时被调用。

字符串序列化

自版本 2.5 起,Spring for Apache Kafka 提供了 ToStringSerializerParseStringDeserializer 类,这些类使用实体的字符串表示。它们依赖于 toString 方法以及某些 Function<String>BiFunction<String, Headers> 来解析字符串并填充实例的属性。通常,这会调用类上的某个静态方法,例如 parse

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
java

默认情况下,ToStringSerializer 被配置为在记录 Headers 中传递有关序列化实体的类型信息。您可以通过将 addTypeInfo 属性设置为 false 来禁用此功能。这些信息可以在接收端由 ParseStringDeserializer 使用。

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认 true):您可以将其设置为 false 来禁用 ToStringSerializer 上的此功能(设置 addTypeInfo 属性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);

if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
java

您可以配置用于将 String 转换为 byte[] 及反向转换的 Charset,默认值为 UTF-8

您可以使用 ConsumerConfig 属性配置反序列化器的解析方法名称:

  • ParseStringDeserializer.KEY_PARSER

  • ParseStringDeserializer.VALUE_PARSER

属性必须包含类的完全限定名,后跟方法名,用句点 . 分隔。该方法必须是静态的,并且具有 (String, Headers)(String) 的签名。

提供了一个 ToFromStringSerde,用于 Kafka Streams。

JSON

Spring for Apache Kafka 还提供了基于 Jackson JSON 对象映射器的 JsonSerializerJsonDeserializer 实现。JsonSerializer 允许将任何 Java 对象写入 JSON byte[]JsonDeserializer 需要一个额外的 Class<?> targetType 参数,以便将消费的 byte[] 反序列化为正确的目标对象。以下示例展示了如何创建一个 JsonDeserializer

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
java

您可以使用 ObjectMapper 自定义 JsonSerializerJsonDeserializer。您还可以扩展它们,以在 configure(Map<String, ?> configs, boolean isKey) 方法中实现一些特定的配置逻辑。

从版本 2.3 开始,所有 JSON 感知组件默认配置为使用 JacksonUtils.enhancedObjectMapper() 实例,该实例禁用了 MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 特性。此外,该实例还提供了用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。有关更多信息,请参见 JacksonUtils.enhancedObjectMapper() 的 JavaDocs。该方法还注册了 org.springframework.kafka.support.JacksonMimeTypeModule,用于将 org.springframework.util.MimeType 对象序列化为普通字符串,以实现跨平台的网络兼容性。JacksonMimeTypeModule 可以作为 bean 注册到应用程序上下文中,并将自动配置到 Spring Boot ObjectMapper 实例 中。

从版本 2.3 开始,JsonDeserializer 提供了基于 TypeReference 的构造函数,以更好地处理目标泛型容器类型。

从版本 2.1 开始,您可以在记录 Headers 中传达类型信息,从而允许处理多种类型。此外,您可以通过使用以下 Kafka 属性来配置序列化器和反序列化器。如果您已经为 KafkaConsumerKafkaProducer 提供了 SerializerDeserializer 实例,则这些属性将无效。

配置属性

  • JsonSerializer.ADD_TYPE_INFO_HEADERS(默认 true):您可以将其设置为 false 来禁用 JsonSerializer 上的此功能(设置 addTypeInfo 属性)。

  • JsonSerializer.TYPE_MAPPINGS(默认 empty):请参见 Mapping Types

  • JsonDeserializer.USE_TYPE_INFO_HEADERS(默认 true):您可以将其设置为 false 来忽略序列化器设置的头信息。

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS(默认 true):您可以将其设置为 false 来保留序列化器设置的头信息。

  • JsonDeserializer.KEY_DEFAULT_TYPE:如果没有头信息,则用于反序列化键的后备类型。

  • JsonDeserializer.VALUE_DEFAULT_TYPE:如果没有头信息,则用于反序列化值的后备类型。

  • JsonDeserializer.TRUSTED_PACKAGES(默认 java.util, java.lang):允许反序列化的包模式的逗号分隔列表。* 表示反序列化所有。

  • JsonDeserializer.TYPE_MAPPINGS(默认 empty):请参见 Mapping Types

  • JsonDeserializer.KEY_TYPE_METHOD(默认 empty):请参见 Using Methods to Determine Types

  • JsonDeserializer.VALUE_TYPE_METHOD(默认 empty):请参见 Using Methods to Determine Types

从版本 2.2 开始,类型信息头(如果由序列化器添加)会被反序列化器移除。您可以通过将 removeTypeHeaders 属性设置为 false 来恢复到之前的行为,既可以直接在反序列化器上设置,也可以通过之前描述的配置属性进行设置。

important

从版本 2.8 开始,如果您按照 程序化构建 中所示的方式以编程方式构造序列化器或反序列化器,只要您没有显式设置任何属性(使用 set*() 方法或使用流式 API),上述属性将由工厂应用。之前,在以编程方式创建时,配置属性从未被应用;如果您直接在对象上显式设置属性,这种情况仍然存在。

映射类型

从版本 2.2 开始,在使用 JSON 时,您现在可以通过使用前面列表中的属性提供类型映射。之前,您必须在序列化器和反序列化器中自定义类型映射器。映射由以逗号分隔的 token:className 对组成。在出站时,负载的类名映射到相应的 token。在入站时,类型头中的 token 映射到相应的类名。

以下示例创建了一组映射:

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
java
important

对应的对象必须兼容。

如果你使用 Spring Boot,你可以在 application.properties(或 yaml)文件中提供这些属性。以下示例展示了如何做到这一点:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
none
important

您只能使用属性进行简单配置。对于更高级的配置(例如在序列化器和反序列化器中使用自定义 ObjectMapper),您应该使用接受预构建序列化器和反序列化器的生产者和消费者工厂构造函数。以下 Spring Boot 示例覆盖了默认工厂:

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
Map<String, Object> properties = new HashMap<>();
// properties.put(..., ...)
// ...
return new DefaultKafkaConsumerFactory<>(properties,
new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
new StringSerializer(), customValueSerializer);
}
java

还提供了 setter 方法,作为使用这些构造函数的替代方案。

备注

在使用 Spring Boot 并重写 ConsumerFactoryProducerFactory 如上所示时,需要在 bean 方法的返回类型中使用通配符泛型。如果提供了具体的泛型类型,则 Spring Boot 将忽略这些 bean,并仍然使用默认的 bean。

从版本 2.2 开始,您可以显式配置反序列化器以使用提供的目标类型,并通过使用带有布尔值 useHeadersIfPresent 参数的重载构造函数来忽略头部中的类型信息(默认为 true)。以下示例演示了如何做到这一点:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
java

使用方法来确定类型

从版本 2.5 开始,您现在可以通过属性配置反序列化器,以调用一个方法来确定目标类型。如果存在,这将覆盖上述讨论的其他任何技术。如果数据是由不使用 Spring 序列化器的应用程序发布的,并且您需要根据数据或其他头部反序列化为不同类型,这可能会很有用。将这些属性设置为方法名称 - 一个完全限定的类名,后面跟着方法名称,用点 . 分隔。该方法必须声明为 public static,并具有以下三种签名之一 (String topic, byte[] data, Headers headers)(byte[] data, Headers headers)(byte[] data),并返回一个 Jackson JavaType

  • JsonDeserializer.KEY_TYPE_METHOD : spring.json.key.type.method

  • JsonDeserializer.VALUE_TYPE_METHOD : spring.json.value.type.method

您可以使用任意头部或检查数据以确定类型。

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
java

对于更复杂的数据检查,可以考虑使用 JsonPath 或类似工具,但越简单的类型判断测试,过程将越高效。

以下是通过编程方式创建反序列化器的示例(在构造函数中向消费者工厂提供反序列化器):

JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
java

程序化构建

在为生产者/消费者工厂以编程方式构建序列化器/反序列化器时,从版本 2.3 开始,您可以使用流式 API,这简化了配置。

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
java

要以编程方式提供类型映射,类似于 使用方法确定类型,请使用 typeFunction 属性。

JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
java

另外,只要您不使用流式 API 来配置属性,或使用 set*() 方法设置它们,工厂将使用配置属性来配置序列化器/反序列化器;请参见 Configuration Properties

委托序列化器和反序列化器

使用标题

版本 2.3 引入了 DelegatingSerializerDelegatingDeserializer,允许使用不同的键和/或值类型来生成和消费记录。生产者必须设置一个头部 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 为一个选择器值,该值用于选择用于值的序列化器,以及 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 用于键;如果没有找到匹配项,将抛出 IllegalStateException

对于传入的记录,反序列化器使用相同的头部来选择要使用的反序列化器;如果未找到匹配项或头部不存在,则返回原始 byte[]

您可以通过构造函数将选择器的映射配置为 Serializer / Deserializer,或者可以通过 Kafka 生产者/消费者属性配置,使用键 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIGDelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG。对于序列化器,生产者属性可以是一个 Map<String, Object>,其中键是选择器,值是 Serializer 实例、序列化器 Class 或类名。该属性也可以是一个以逗号分隔的映射条目的字符串,如下所示。

对于反序列化器,消费者属性可以是一个 Map<String, Object>,其中键是选择器,值是 Deserializer 实例、反序列化器 Class 或类名。该属性也可以是一个以逗号分隔的映射条目的字符串,如下所示。

要使用属性进行配置,请使用以下语法:

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
java

生产者随后会将 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 头设置为 thing1thing2

该技术支持将不同类型发送到同一主题(或不同主题)。

备注

从版本 2.5.1 开始,如果类型(键或值)是 Serdes 支持的标准类型之一(如 LongInteger 等),则不需要设置选择器头。相反,序列化器将把头设置为类型的类名。对于这些类型,不需要配置序列化器或反序列化器,它们将动态创建(一次)。

有关将不同类型发送到不同主题的另一种技术,请参见 Using RoutingKafkaTemplate

按类型

版本 2.8 引入了 DelegatingByTypeSerializer

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
java

从版本 2.8.3 开始,您可以配置序列化器以检查映射键是否可以分配给目标对象,这在委托序列化器可以序列化子类时非常有用。在这种情况下,如果存在模糊匹配,则应提供一个有序的 Map,例如 LinkedHashMap

按主题

从版本 2.8 开始,DelegatingByTopicSerializerDelegatingByTopicDeserializer 允许根据主题名称选择序列化器/反序列化器。使用正则表达式 Pattern 来查找要使用的实例。可以通过构造函数或通过属性(以逗号分隔的 pattern:serializer 列表)来配置该映射。

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
java

使用 KEY_SERIALIZATION_TOPIC_CONFIG 作为键时。

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
java

您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULTDelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 指定默认的序列化器/反序列化器,以便在没有模式匹配时使用。

一个额外的属性 DelegatingByTopicSerialization.CASE_SENSITIVE(默认值为 true),当设置为 false 时,会使主题查找不区分大小写。

重试反序列化

RetryingDeserializer 使用一个委托 DeserializerRetryTemplate 来在委托可能在反序列化过程中遇到瞬时错误(例如网络问题)时重试反序列化。

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
java

从版本 3.1.2 开始,可以选择在 RetryingDeserializer 上设置 RecoveryCallback

请参阅 spring-retry 项目,以获取 RetryTemplate 的配置,包括重试策略、退避策略等。

Spring Messaging 消息转换

虽然 SerializerDeserializer API 从低级 Kafka ConsumerProducer 的角度来看相当简单且灵活,但在使用 @KafkaListenerSpring Integration 的 Apache Kafka 支持 时,您可能需要在 Spring Messaging 层面上获得更多灵活性。为了让您轻松地在 org.springframework.messaging.Message 之间进行转换,Spring for Apache Kafka 提供了一个 MessageConverter 抽象,具有 MessagingMessageConverter 实现及其 JsonMessageConverter(及子类)的自定义。您可以直接将 MessageConverter 注入到 KafkaTemplate 实例中,并通过使用 AbstractKafkaListenerContainerFactory bean 定义来设置 @KafkaListener.containerFactory() 属性。以下示例演示了如何做到这一点:

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
java

在使用 Spring Boot 时,只需将转换器定义为 @Bean,Spring Boot 自动配置将其连接到自动配置的模板和容器工厂中。

当你使用 @KafkaListener 时,参数类型会提供给消息转换器,以帮助进行转换。

备注

这种类型推断仅在 @KafkaListener 注解声明在方法级别时才能实现。对于类级别的 @KafkaListener,有效负载类型用于选择要调用的 @KafkaHandler 方法,因此在选择方法之前,必须已经完成转换。

备注

在消费者端,您可以配置一个 JsonMessageConverter;它可以处理类型为 byte[]BytesStringConsumerRecord 值,因此应与 ByteArrayDeserializerBytesDeserializerStringDeserializer 一起使用。(byte[]Bytes 更高效,因为它们避免了不必要的 byte[]String 的转换)。如果您愿意,您还可以配置与反序列化器对应的 JsonMessageConverter 的特定子类。

在生产者端,当您使用 Spring Integration 或 KafkaTemplate.send(Message<?> message) 方法时(请参见 Using KafkaTemplate),您必须配置与配置的 Kafka Serializer 兼容的消息转换器。

  • StringJsonMessageConverterStringSerializer

  • BytesJsonMessageConverterBytesSerializer

  • ByteArrayJsonMessageConverterByteArraySerializer

同样,使用 byte[]Bytes 更高效,因为它们避免了 Stringbyte[] 的转换。

为了方便,从版本 2.3 开始,框架还提供了一个 StringOrBytesSerializer,它可以序列化所有三种值类型,因此可以与任何消息转换器一起使用。

从版本 2.7.1 开始,消息负载转换可以委托给 spring-messagingSmartMessageConverter;这使得转换可以基于 MessageHeaders.CONTENT_TYPE 头进行。

important

KafkaMessageConverter.fromMessage() 方法被调用以将消息转换为 ProducerRecord,消息负载存储在 ProducerRecord.value() 属性中。KafkaMessageConverter.toMessage() 方法被调用以从 ConsumerRecord 进行入站转换,负载为 ConsumerRecord.value() 属性。SmartMessageConverter.toMessage() 方法被调用以从传递给 fromMessage()Message 创建一个新的出站 Message<?>(通常通过 KafkaTemplate.send(Message<?> msg))。类似地,在 KafkaMessageConverter.toMessage() 方法中,在转换器从 ConsumerRecord 创建了一个新的 Message<?> 之后,调用 SmartMessageConverter.fromMessage() 方法,然后使用新转换的负载创建最终的入站消息。在这两种情况下,如果 SmartMessageConverter 返回 null,则使用原始消息。

当在 KafkaTemplate 和监听器容器工厂中使用默认转换器时,您可以通过在模板上调用 setMessagingConverter() 来配置 SmartMessageConverter,以及通过 @KafkaListener 方法上的 contentTypeConverter 属性进行配置。

示例:

template.setMessagingConverter(mySmartConverter);
java
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
java

使用 Spring Data 投影接口

从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data Projection 接口,而不是具体类型。这允许对数据进行非常选择性和低耦合的绑定,包括从 JSON 文档内部多个位置查找值。例如,可以将以下接口定义为消息负载类型:

interface SomeSample {

@JsonPath({ "$.username", "$.user.name" })
String getUsername();

}
java
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
java

访问器方法将默认用于在接收到的 JSON 文档中查找属性名称作为字段。@JsonPath 表达式允许自定义值查找,甚至可以定义多个 JSON Path 表达式,以从多个地方查找值,直到某个表达式返回实际值。

要启用此功能,请使用配置了适当委托转换器的 ProjectingMessageConverter(用于出站转换和转换非投影接口)。您还必须将 spring-data:spring-data-commonscom.jayway.jsonpath:json-path 添加到类路径中。

当作为 @KafkaListener 方法的参数时,接口类型会自动作为正常参数传递给转换器。

使用 ErrorHandlingDeserializer

当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,引入了 ErrorHandlingDeserializer。这个反序列化器委托给一个真实的反序列化器(键或值)。如果委托的反序列化器无法反序列化记录内容,ErrorHandlingDeserializer 将返回一个 null 值,并在一个头部中返回一个 DeserializationException,该头部包含原因和原始字节。当你使用记录级的 MessageListener 时,如果 ConsumerRecord 的键或值包含 DeserializationException 头部,容器的 ErrorHandler 将被调用,并传入失败的 ConsumerRecord。该记录不会传递给监听器。

或者,您可以配置 ErrorHandlingDeserializer 以通过提供 failedDeserializationFunction 来创建自定义值,该函数是 Function<FailedDeserializationInfo, T>。此函数被调用以创建 T 的实例,该实例以通常的方式传递给监听器。一个类型为 FailedDeserializationInfo 的对象,其中包含所有上下文信息,会提供给该函数。您可以在头信息中找到 DeserializationException(作为序列化的 Java 对象)。有关 ErrorHandlingDeserializer 的更多信息,请参见 Javadoc

您可以使用 DefaultKafkaConsumerFactory 构造函数,该构造函数接受键和值的 Deserializer 对象,并连接您已配置的适当 ErrorHandlingDeserializer 实例,使用正确的委托。或者,您可以使用消费者配置属性(这些属性由 ErrorHandlingDeserializer 使用)来实例化委托。属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。属性值可以是类或类名。以下示例演示如何设置这些属性:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
java

以下示例使用了一个 failedDeserializationFunction

public class BadThing extends Thing {

private final FailedDeserializationInfo failedDeserializationInfo;

public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}

public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}

}
java

前面的示例使用了以下配置:

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
java
important

如果消费者配置了 ErrorHandlingDeserializer,那么配置 KafkaTemplate 及其生产者时,重要的是使用一个能够处理正常对象以及由于反序列化异常而产生的原始 byte[] 值的序列化器。模板的通用值类型应该是 Object。一种技术是使用 DelegatingByTypeSerializer;以下是一个示例:

@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
java

当使用 ErrorHandlingDeserializer 与批量监听器时,您必须检查消息头中的反序列化异常。当与 DefaultBatchErrorHandler 一起使用时,您可以使用该头部来确定异常失败的记录,并通过 BatchListenerFailedException 与错误处理程序进行通信。

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
java

SerializationUtils.byteArrayToDeserializationException() 可以用来将头部转换为 DeserializationException

在消费 List<ConsumerRecord<?, ?> 时,使用 SerializationUtils.getExceptionFromHeader() 代替:

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
java
important

如果您还在使用 DeadLetterPublishingRecoverer,则因 DeserializationException 发布的记录将具有类型为 byte[]record.value();这不应被序列化。考虑使用配置为对 byte[] 使用 ByteArraySerializerDelegatingByTypeSerializer,并对所有其他类型使用正常的序列化器(Json、Avro 等)。

从版本 3.1 开始,您可以将 Validator 添加到 ErrorHandlingDeserializer 中。如果委托的 Deserializer 成功反序列化对象,但该对象未通过验证,则会抛出类似于反序列化异常的异常。这允许将原始数据传递给错误处理程序。在自己创建反序列化器时,只需调用 setValidator;如果您使用属性配置序列化器,请将消费者配置属性 ErrorHandlingDeserializer.VALIDATOR_CLASS 设置为您的 Validator 的类或完全限定类名。在使用 Spring Boot 时,此属性名称为 spring.kafka.consumer.properties.spring.deserializer.validator.class

Payload Conversion with Batch Listeners

您还可以在 BatchMessagingMessageConverter 中使用 JsonMessageConverter 来转换批量消息,当您使用批量监听器容器工厂时。有关更多信息,请参见 Serialization, Deserialization, and Message ConversionSpring Messaging Message Conversion

默认情况下,转换的类型是从监听器参数推断的。如果你使用 DefaultJackson2TypeMapper 配置 JsonMessageConverter,并将其 TypePrecedence 设置为 TYPE_ID(而不是默认的 INFERRED),则转换器将使用头部中的类型信息(如果存在)。这允许,例如,监听器方法可以用接口而不是具体类来声明。此外,类型转换器支持映射,因此反序列化可以转换为与源不同的类型(只要数据是兼容的)。当你使用 class-level @KafkaListener 实例 时,这也是有用的,因为有效负载必须已经被转换以确定调用哪个方法。以下示例创建使用此方法的 bean:

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}

@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
java

注意,为了使其正常工作,转换目标的方法签名必须是一个具有单个泛型参数类型的容器对象,例如以下内容:

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
java

请注意,您仍然可以访问批处理头部。

如果批量转换器具有支持的记录转换器,您还可以接收一系列消息,其中有效负载根据通用类型进行转换。以下示例展示了如何做到这一点:

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
java

如果批次中的记录无法转换,则其有效负载在目标 payloads 列表中设置为 null。转换异常作为警告记录到日志中,并作为 List<ConversionException> 的一个项存储到 KafkaHeaders.CONVERSION_FAILURES 头中。目标 @KafkaListener 方法可以使用 Java Stream API 从有效负载列表中过滤掉这些 null 值,或者对转换异常头进行其他操作:

@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {

for (int i = 0; i < list.size(); i++) {
if (conversionFailures.get(i) != null) {
throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
}
}
}
java

ConversionService 自定义

从版本 2.1.1 开始,默认的 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory 使用的 org.springframework.core.convert.ConversionService 被提供了所有实现以下任一接口的 bean:

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

这让你可以进一步自定义监听器反序列化,而无需更改 ConsumerFactoryKafkaListenerContainerFactory 的默认配置。

important

通过 KafkaListenerConfigurer bean 在 KafkaListenerEndpointRegistrar 上设置自定义 MessageHandlerMethodFactory 会禁用此功能。

添加自定义 HandlerMethodArgumentResolver@KafkaListener

从版本 2.4.2 开始,您可以添加自己的 HandlerMethodArgumentResolver 并解析自定义方法参数。您所需要做的就是实现 KafkaListenerConfigurer 并使用 KafkaListenerEndpointRegistrar 类中的方法 setCustomMethodArgumentResolvers()

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {

@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}

@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}

}
java

您还可以通过向 KafkaListenerEndpointRegistrar bean 添加自定义 MessageHandlerMethodFactory 来完全替换框架的参数解析。如果您这样做,并且您的应用程序需要处理墓碑记录,具有 nullvalue()(例如,来自压缩主题),您应该将 KafkaNullAwarePayloadArgumentResolver 添加到工厂中;它必须是最后一个解析器,因为它支持所有类型,并且可以匹配没有 @Payload 注解的参数。如果您使用的是 DefaultMessageHandlerMethodFactory,请将此解析器设置为最后一个自定义解析器;工厂将确保在标准的 PayloadMethodArgumentResolver 之前使用此解析器,而后者对 KafkaNull 有效负载没有了解。