序列化、反序列化和消息转换
概述
Apache Kafka 提供了一个高层次的 API,用于序列化和反序列化记录值以及它们的键。它通过 org.apache.kafka.common.serialization.Serializer<T>
和 org.apache.kafka.common.serialization.Deserializer<T>
抽象类提供了一些内置实现。同时,我们可以通过使用 Producer
或 Consumer
配置属性来指定序列化器和反序列化器类。以下示例展示了如何做到这一点:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特定的情况,KafkaConsumer
(因此,KafkaProducer
)提供了重载构造函数,以接受 Serializer
和 Deserializer
实例,分别用于 keys
和 values
。
当你使用这个 API 时,DefaultKafkaProducerFactory
和 DefaultKafkaConsumerFactory
也提供属性(通过构造函数或 setter 方法)来将自定义的 Serializer
和 Deserializer
实例注入到目标 Producer
或 Consumer
中。此外,你还可以通过构造函数传入 Supplier<Serializer>
或 Supplier<Deserializer>
实例——这些 Supplier
会在每个 Producer
或 Consumer
创建时被调用。
字符串序列化
自版本 2.5 起,Spring for Apache Kafka 提供了 ToStringSerializer
和 ParseStringDeserializer
类,这些类使用实体的字符串表示。它们依赖于 toString
方法以及某些 Function<String>
或 BiFunction<String, Headers>
来解析字符串并填充实例的属性。通常,这会调用类上的某个静态方法,例如 parse
:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer
被配置为在记录 Headers
中传递有关序列化实体的类型信息。您可以通过将 addTypeInfo
属性设置为 false
来禁用此功能。这些信息可以在接收端由 ParseStringDeserializer
使用。
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
来禁用ToStringSerializer
上的此功能(设置addTypeInfo
属性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置用于将 String
转换为 byte[]
及反向转换的 Charset
,默认值为 UTF-8
。
您可以使用 ConsumerConfig
属性配置反序列化器的解析方法名称:
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
属性必须包含类的完全限定名,后跟方法名,用句点 .
分隔。该方法必须是静态的,并且具有 (String, Headers)
或 (String)
的签名。
提供了一个 ToFromStringSerde
,用于 Kafka Streams。
JSON
Spring for Apache Kafka 还提供了基于 Jackson JSON 对象映射器的 JsonSerializer
和 JsonDeserializer
实现。JsonSerializer
允许将任何 Java 对象写入 JSON byte[]
。JsonDeserializer
需要一个额外的 Class<?> targetType
参数,以便将消费的 byte[]
反序列化为正确的目标对象。以下示例展示了如何创建一个 JsonDeserializer
:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以使用 ObjectMapper
自定义 JsonSerializer
和 JsonDeserializer
。您还可以扩展它们,以在 configure(Map<String, ?> configs, boolean isKey)
方法中实现一些特定的配置逻辑。
从版本 2.3 开始,所有 JSON 感知组件默认配置为使用 JacksonUtils.enhancedObjectMapper()
实例,该实例禁用了 MapperFeature.DEFAULT_VIEW_INCLUSION
和 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
特性。此外,该实例还提供了用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。有关更多信息,请参见 JacksonUtils.enhancedObjectMapper()
的 JavaDocs。该方法还注册了 org.springframework.kafka.support.JacksonMimeTypeModule
,用于将 org.springframework.util.MimeType
对象序列化为普通字符串,以实现跨平台的网络兼容性。JacksonMimeTypeModule
可以作为 bean 注册到应用程序上下文中,并将自动配置到 Spring Boot ObjectMapper 实例 中。
从版本 2.3 开始,JsonDeserializer
提供了基于 TypeReference
的构造函数,以更好地处理目标泛型容器类型。
从版本 2.1 开始,您可以在记录 Headers
中传达类型信息,从而允许处理多种类型。此外,您可以通过使用以下 Kafka 属性来配置序列化器和反序列化器。如果您已经为 KafkaConsumer
和 KafkaProducer
提供了 Serializer
和 Deserializer
实例,则这些属性将无效。
配置属性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
来禁用JsonSerializer
上的此功能(设置addTypeInfo
属性)。 -
JsonSerializer.TYPE_MAPPINGS
(默认empty
):请参见 Mapping Types。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
来忽略序列化器设置的头信息。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认true
):您可以将其设置为false
来保留序列化器设置的头信息。 -
JsonDeserializer.KEY_DEFAULT_TYPE
:如果没有头信息,则用于反序列化键的后备类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果没有头信息,则用于反序列化值的后备类型。 -
JsonDeserializer.TRUSTED_PACKAGES
(默认java.util
,java.lang
):允许反序列化的包模式的逗号分隔列表。*
表示反序列化所有。 -
JsonDeserializer.TYPE_MAPPINGS
(默认empty
):请参见 Mapping Types。 -
JsonDeserializer.KEY_TYPE_METHOD
(默认empty
):请参见 Using Methods to Determine Types。 -
JsonDeserializer.VALUE_TYPE_METHOD
(默认empty
):请参见 Using Methods to Determine Types。
从版本 2.2 开始,类型信息头(如果由序列化器添加)会被反序列化器移除。您可以通过将 removeTypeHeaders
属性设置为 false
来恢复到之前的行为,既可以直接在反序列化器上设置,也可以通过之前描述的配置属性进行设置。
从版本 2.8 开始,如果您按照 程序化构建 中所示的方式以编程方式构造序列化器或反序列化器,只要您没有显式设置任何属性(使用 set*()
方法或使用流式 API),上述属性将由工厂应用。之前,在以编程方式创建时,配置属性从未被应用;如果您直接在对象上显式设置属性,这种情况仍然存在。
映射类型
从版本 2.2 开始,在使用 JSON 时,您现在可以通过使用前面列表中的属性提供类型映射。之前,您必须在序列化器和反序列化器中自定义类型映射器。映射由以逗号分隔的 token:className
对组成。在出站时,负载的类名映射到相应的 token。在入站时,类型头中的 token 映射到相应的类名。
以下示例创建了一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
对应的对象必须兼容。
如果你使用 Spring Boot,你可以在 application.properties
(或 yaml)文件中提供这些属性。以下示例展示了如何做到这一点:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您只能使用属性进行简单配置。对于更高级的配置(例如在序列化器和反序列化器中使用自定义 ObjectMapper
),您应该使用接受预构建序列化器和反序列化器的生产者和消费者工厂构造函数。以下 Spring Boot 示例覆盖了默认工厂:
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
Map<String, Object> properties = new HashMap<>();
// properties.put(..., ...)
// ...
return new DefaultKafkaConsumerFactory<>(properties,
new StringDeserializer(), customValueDeserializer);
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
new StringSerializer(), customValueSerializer);
}
还提供了 setter 方法,作为使用这些构造函数的替代方案。
在使用 Spring Boot 并重写 ConsumerFactory
和 ProducerFactory
如上所示时,需要在 bean 方法的返回类型中使用通配符泛型。如果提供了具体的泛型类型,则 Spring Boot 将忽略这些 bean,并仍然使用默认的 bean。
从版本 2.2 开始,您可以显式配置反序列化器以使用提供的目标类型,并通过使用带有布尔值 useHeadersIfPresent
参数的重载构造函数来忽略头部中的类型信息(默认为 true
)。以下示例演示了如何做到这一点:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法来确定类型
从版本 2.5 开始,您现在可以通过属性配置反序列化器,以调用一个方法来确定目标类型。如果存在,这将覆盖上述讨论的其他任何技术。如果数据是由不使用 Spring 序列化器的应用程序发布的,并且您需要根据数据或其他头部反序列化为不同类型,这可能会很有用。将这些属性设置为方法名称 - 一个完全限定的类名,后面跟着方法名称,用点 .
分隔。该方法必须声明为 public static
,并具有以下三种签名之一 (String topic, byte[] data, Headers headers)
、(byte[] data, Headers headers)
或 (byte[] data)
,并返回一个 Jackson JavaType
。
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意头部或检查数据以确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,可以考虑使用 JsonPath
或类似工具,但越简单的类型判断测试,过程将越高效。
以下是通过编程方式创建反序列化器的示例(在构造函数中向消费者工厂提供反序列化器):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化构建
在为生产者/消费者工厂以编程方式构建序列化器/反序列化器时,从版本 2.3 开始,您可以使用流式 API,这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
要以编程方式提供类型映射,类似于 使用方法确定类型,请使用 typeFunction
属性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
另外,只要您不使用流式 API 来配置属性,或使用 set*()
方法设置它们,工厂将使用配置属性来配置序列化器/反序列化器;请参见 Configuration Properties。
委托序列化器和反序列化器
使用标题
版本 2.3 引入了 DelegatingSerializer
和 DelegatingDeserializer
,允许使用不同的键和/或值类型来生成和消费记录。生产者必须设置一个头部 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
为一个选择器值,该值用于选择用于值的序列化器,以及 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
用于键;如果没有找到匹配项,将抛出 IllegalStateException
。
对于传入的记录,反序列化器使用相同的头部来选择要使用的反序列化器;如果未找到匹配项或头部不存在,则返回原始 byte[]
。
您可以通过构造函数将选择器的映射配置为 Serializer
/ Deserializer
,或者可以通过 Kafka 生产者/消费者属性配置,使用键 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
。对于序列化器,生产者属性可以是一个 Map<String, Object>
,其中键是选择器,值是 Serializer
实例、序列化器 Class
或类名。该属性也可以是一个以逗号分隔的映射条目的字符串,如下所示。
对于反序列化器,消费者属性可以是一个 Map<String, Object>
,其中键是选择器,值是 Deserializer
实例、反序列化器 Class
或类名。该属性也可以是一个以逗号分隔的映射条目的字符串,如下所示。
要使用属性进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
生产者随后会将 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
头设置为 thing1
或 thing2
。
该技术支持将不同类型发送到同一主题(或不同主题)。
从版本 2.5.1 开始,如果类型(键或值)是 Serdes
支持的标准类型之一(如 Long
、Integer
等),则不需要设置选择器头。相反,序列化器将把头设置为类型的类名。对于这些类型,不需要配置序列化器或反序列化器,它们将动态创建(一次)。
有关将不同类型发送到不同主题的另一种技术,请参见 Using RoutingKafkaTemplate。
按类型
版本 2.8 引入了 DelegatingByTypeSerializer
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,您可以配置序列化器以检查映射键是否可以分配给目标对象,这在委托序列化器可以序列化子类时非常有用。在这种情况下,如果存在模糊匹配,则应提供一个有序的 Map
,例如 LinkedHashMap
。
按主题
从版本 2.8 开始,DelegatingByTopicSerializer
和 DelegatingByTopicDeserializer
允许根据主题名称选择序列化器/反序列化器。使用正则表达式 Pattern
来查找要使用的实例。可以通过构造函数或通过属性(以逗号分隔的 pattern:serializer
列表)来配置该映射。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
使用 KEY_SERIALIZATION_TOPIC_CONFIG
作为键时。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
和 DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
指定默认的序列化器/反序列化器,以便在没有模式匹配时使用。
一个额外的属性 DelegatingByTopicSerialization.CASE_SENSITIVE
(默认值为 true
),当设置为 false
时,会使主题查找不区分大小写。
重试反序列化
RetryingDeserializer
使用一个委托 Deserializer
和 RetryTemplate
来在委托可能在反序列化过程中遇到瞬时错误(例如网络问题)时重试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
从版本 3.1.2
开始,可以选择在 RetryingDeserializer
上设置 RecoveryCallback
。
请参阅 spring-retry 项目,以获取 RetryTemplate
的配置,包括重试策略、退避策略等。
Spring Messaging 消息转换
虽然 Serializer
和 Deserializer
API 从低级 Kafka Consumer
和 Producer
的角度来看相当简单且灵活,但在使用 @KafkaListener
或 Spring Integration 的 Apache Kafka 支持 时,您可能需要在 Spring Messaging 层面上获得更多灵活性。为了让您轻松地在 org.springframework.messaging.Message
之间进行转换,Spring for Apache Kafka 提供了一个 MessageConverter
抽象,具有 MessagingMessageConverter
实现及其 JsonMessageConverter
(及子类)的自定义。您可以直接将 MessageConverter
注入到 KafkaTemplate
实例中,并通过使用 AbstractKafkaListenerContainerFactory
bean 定义来设置 @KafkaListener.containerFactory()
属性。以下示例演示了如何做到这一点:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
在使用 Spring Boot 时,只需将转换器定义为 @Bean
,Spring Boot 自动配置将其连接到自动配置的模板和容器工厂中。
当你使用 @KafkaListener
时,参数类型会提供给消息转换器,以帮助进行转换。
这种类型推断仅在 @KafkaListener
注解声明在方法级别时才能实现。对于类级别的 @KafkaListener
,有效负载类型用于选择要调用的 @KafkaHandler
方法,因此在选择方法之前,必须已经完成转换。
在消费者端,您可以配置一个 JsonMessageConverter
;它可以处理类型为 byte[]
、Bytes
和 String
的 ConsumerRecord
值,因此应与 ByteArrayDeserializer
、BytesDeserializer
或 StringDeserializer
一起使用。(byte[]
和 Bytes
更高效,因为它们避免了不必要的 byte[]
到 String
的转换)。如果您愿意,您还可以配置与反序列化器对应的 JsonMessageConverter
的特定子类。
在生产者端,当您使用 Spring Integration 或 KafkaTemplate.send(Message<?> message)
方法时(请参见 Using KafkaTemplate),您必须配置与配置的 Kafka Serializer
兼容的消息转换器。
-
StringJsonMessageConverter
与StringSerializer
-
BytesJsonMessageConverter
与BytesSerializer
-
ByteArrayJsonMessageConverter
与ByteArraySerializer
同样,使用 byte[]
或 Bytes
更高效,因为它们避免了 String
到 byte[]
的转换。
为了方便,从版本 2.3 开始,框架还提供了一个 StringOrBytesSerializer
,它可以序列化所有三种值类型,因此可以与任何消息转换器一起使用。
从版本 2.7.1 开始,消息负载转换可以委托给 spring-messaging
的 SmartMessageConverter
;这使得转换可以基于 MessageHeaders.CONTENT_TYPE
头进行。
KafkaMessageConverter.fromMessage()
方法被调用以将消息转换为 ProducerRecord
,消息负载存储在 ProducerRecord.value()
属性中。KafkaMessageConverter.toMessage()
方法被调用以从 ConsumerRecord
进行入站转换,负载为 ConsumerRecord.value()
属性。SmartMessageConverter.toMessage()
方法被调用以从传递给 fromMessage()
的 Message
创建一个新的出站 Message<?>
(通常通过 KafkaTemplate.send(Message<?> msg)
)。类似地,在 KafkaMessageConverter.toMessage()
方法中,在转换器从 ConsumerRecord
创建了一个新的 Message<?>
之后,调用 SmartMessageConverter.fromMessage()
方法,然后使用新转换的负载创建最终的入站消息。在这两种情况下,如果 SmartMessageConverter
返回 null
,则使用原始消息。
当在 KafkaTemplate
和监听器容器工厂中使用默认转换器时,您可以通过在模板上调用 setMessagingConverter()
来配置 SmartMessageConverter
,以及通过 @KafkaListener
方法上的 contentTypeConverter
属性进行配置。
示例:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data 投影接口
从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data Projection 接口,而不是具体类型。这允许对数据进行非常选择性和低耦合的绑定,包括从 JSON 文档内部多个位置查找值。例如,可以将以下接口定义为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
访问器方法将默认用于在接收到的 JSON 文档中查找属性名称作为字段。@JsonPath
表达式允许自定义值查找,甚至可以定义多个 JSON Path 表达式,以从多个地方查找值,直到某个表达式返回实际值。
要启用此功能,请使用配置了适当委托转换器的 ProjectingMessageConverter
(用于出站转换和转换非投影接口)。您还必须将 spring-data:spring-data-commons
和 com.jayway.jsonpath:json-path
添加到类路径中。
当作为 @KafkaListener
方法的参数时,接口类型会自动作为正常参数传递给转换器。
使用 ErrorHandlingDeserializer
当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll()
返回之前。为了解决这个问题,引入了 ErrorHandlingDeserializer
。这个反序列化器委托给一个真实的反序列化器(键或值)。如果委托的反序列化器无法反序列化记录内容,ErrorHandlingDeserializer
将返回一个 null
值,并在一个头部中返回一个 DeserializationException
,该头部包含原因和原始字节。当你使用记录级的 MessageListener
时,如果 ConsumerRecord
的键或值包含 DeserializationException
头部,容器的 ErrorHandler
将被调用,并传入失败的 ConsumerRecord
。该记录不会传递给监听器。
或者,您可以配置 ErrorHandlingDeserializer
以通过提供 failedDeserializationFunction
来创建自定义值,该函数是 Function<FailedDeserializationInfo, T>
。此函数被调用以创建 T
的实例,该实例以通常的方式传递给监听器。一个类型为 FailedDeserializationInfo
的对象,其中包含所有上下文信息,会提供给该函数。您可以在头信息中找到 DeserializationException
(作为序列化的 Java 对象)。有关 ErrorHandlingDeserializer
的更多信息,请参见 Javadoc。
您可以使用 DefaultKafkaConsumerFactory
构造函数,该构造函数接受键和值的 Deserializer
对象,并连接您已配置的适当 ErrorHandlingDeserializer
实例,使用正确的委托。或者,您可以使用消费者配置属性(这些属性由 ErrorHandlingDeserializer
使用)来实例化委托。属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
。属性值可以是类或类名。以下示例演示如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用了一个 failedDeserializationFunction
。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前面的示例使用了以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置了 ErrorHandlingDeserializer
,那么配置 KafkaTemplate
及其生产者时,重要的是使用一个能够处理正常对象以及由于反序列化异常而产生的原始 byte[]
值的序列化器。模板的通用值类型应该是 Object
。一种技术是使用 DelegatingByTypeSerializer
;以下是一个示例:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
当使用 ErrorHandlingDeserializer
与批量监听器时,您必须检查消息头中的反序列化异常。当与 DefaultBatchErrorHandler
一起使用时,您可以使用该头部来确定异常失败的记录,并通过 BatchListenerFailedException
与错误处理程序进行通信。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
可以用来将头部转换为 DeserializationException
。
在消费 List<ConsumerRecord<?, ?>
时,使用 SerializationUtils.getExceptionFromHeader()
代替:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您还在使用 DeadLetterPublishingRecoverer
,则因 DeserializationException
发布的记录将具有类型为 byte[]
的 record.value()
;这不应被序列化。考虑使用配置为对 byte[]
使用 ByteArraySerializer
的 DelegatingByTypeSerializer
,并对所有其他类型使用正常的序列化器(Json、Avro 等)。
从版本 3.1 开始,您可以将 Validator
添加到 ErrorHandlingDeserializer
中。如果委托的 Deserializer
成功反序列化对象,但该对象未通过验证,则会抛出类似于反序列化异常的异常。这允许将原始数据传递给错误处理程序。在自己创建反序列化器时,只需调用 setValidator
;如果您使用属性配置序列化器,请将消费者配置属性 ErrorHandlingDeserializer.VALIDATOR_CLASS
设置为您的 Validator
的类或完全限定类名。在使用 Spring Boot 时,此属性名称为 spring.kafka.consumer.properties.spring.deserializer.validator.class
。
Payload Conversion with Batch Listeners
您还可以在 BatchMessagingMessageConverter
中使用 JsonMessageConverter
来转换批量消息,当您使用批量监听器容器工厂时。有关更多信息,请参见 Serialization, Deserialization, and Message Conversion 和 Spring Messaging Message Conversion。
默认情况下,转换的类型是从监听器参数推断的。如果你使用 DefaultJackson2TypeMapper
配置 JsonMessageConverter
,并将其 TypePrecedence
设置为 TYPE_ID
(而不是默认的 INFERRED
),则转换器将使用头部中的类型信息(如果存在)。这允许,例如,监听器方法可以用接口而不是具体类来声明。此外,类型转换器支持映射,因此反序列化可以转换为与源不同的类型(只要数据是兼容的)。当你使用 class-level @KafkaListener 实例 时,这也是有用的,因为有效负载必须已经被转换以确定调用哪个方法。以下示例创建使用此方法的 bean:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
注意,为了使其正常工作,转换目标的方法签名必须是一个具有单个泛型参数类型的容器对象,例如以下内容:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批处理头部。
如果批量转换器具有支持的记录转换器,您还可以接收一系列消息,其中有效负载根据通用类型进行转换。以下示例展示了如何做到这一点:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
如果批次中的记录无法转换,则其有效负载在目标 payloads
列表中设置为 null
。转换异常作为警告记录到日志中,并作为 List<ConversionException>
的一个项存储到 KafkaHeaders.CONVERSION_FAILURES
头中。目标 @KafkaListener
方法可以使用 Java Stream
API 从有效负载列表中过滤掉这些 null
值,或者对转换异常头进行其他操作:
@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {
for (int i = 0; i < list.size(); i++) {
if (conversionFailures.get(i) != null) {
throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
}
}
}
ConversionService
自定义
从版本 2.1.1 开始,默认的 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
使用的 org.springframework.core.convert.ConversionService
被提供了所有实现以下任一接口的 bean:
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
这让你可以进一步自定义监听器反序列化,而无需更改 ConsumerFactory
和 KafkaListenerContainerFactory
的默认配置。
通过 KafkaListenerConfigurer
bean 在 KafkaListenerEndpointRegistrar
上设置自定义 MessageHandlerMethodFactory
会禁用此功能。
添加自定义 HandlerMethodArgumentResolver
到 @KafkaListener
从版本 2.4.2 开始,您可以添加自己的 HandlerMethodArgumentResolver
并解析自定义方法参数。您所需要做的就是实现 KafkaListenerConfigurer
并使用 KafkaListenerEndpointRegistrar
类中的方法 setCustomMethodArgumentResolvers()
。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您还可以通过向 KafkaListenerEndpointRegistrar
bean 添加自定义 MessageHandlerMethodFactory
来完全替换框架的参数解析。如果您这样做,并且您的应用程序需要处理墓碑记录,具有 null
的 value()
(例如,来自压缩主题),您应该将 KafkaNullAwarePayloadArgumentResolver
添加到工厂中;它必须是最后一个解析器,因为它支持所有类型,并且可以匹配没有 @Payload
注解的参数。如果您使用的是 DefaultMessageHandlerMethodFactory
,请将此解析器设置为最后一个自定义解析器;工厂将确保在标准的 PayloadMethodArgumentResolver
之前使用此解析器,而后者对 KafkaNull
有效负载没有了解。