跳到主要内容

消息头部

ChatGPT-4o-mini 中英对照 Message Headers

0.11.0.0 客户端引入了对消息中头部的支持。从 2.0 版本开始,Spring for Apache Kafka 现在支持将这些头部映射到 spring-messagingMessageHeaders 中。

备注

以前的版本将 ConsumerRecordProducerRecord 映射到 spring-messaging 的 Message<?>,其中值属性映射到 payload,其他属性(topicpartition 等)映射到头部。现在仍然如此,但可以映射额外的(任意)头部。

Apache Kafka 的头部有一个简单的 API,如下所示的接口定义:

public interface Header {

String key();

byte[] value();

}
java

KafkaHeaderMapper 策略用于在 Kafka HeadersMessageHeaders 之间映射头条目。其接口定义如下:

public interface KafkaHeaderMapper {

void fromHeaders(MessageHeaders headers, Headers target);

void toHeaders(Headers source, Map<String, Object> target);

}
java

SimpleKafkaHeaderMapper 将原始头映射为 byte[],并提供将其转换为 String 值的配置选项。

DefaultKafkaHeaderMapper 将键映射到 MessageHeaders 头名称,并且为了支持出站消息的丰富头类型,执行 JSON 转换。一个 "special" 头(键为 spring_json_header_types)包含一个 <key>:<type> 的 JSON 映射。该头在入站时使用,以提供每个头值到原始类型的适当转换。

在入站方面,所有 Kafka Header 实例都映射到 MessageHeaders。在出站方面,默认情况下,所有 MessageHeaders 都被映射,除了 idtimestamp 和映射到 ConsumerRecord 属性的头部。

您可以通过向映射器提供模式来指定哪些头部要映射到出站消息。以下列表显示了一些示例映射:

public DefaultKafkaHeaderMapper() { 1
...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { 2
...
}

public DefaultKafkaHeaderMapper(String... patterns) { 3
...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { 4
...
}
java
  • 使用默认的 Jackson ObjectMapper 并映射大多数头部,如示例前所讨论的。

  • 使用提供的 Jackson ObjectMapper 并映射大多数头部,如示例前所讨论的。

  • 使用默认的 Jackson ObjectMapper 并根据提供的模式映射头部。

  • 使用提供的 Jackson ObjectMapper 并根据提供的模式映射头部。

模式相当简单,可以包含前导通配符(*)、后导通配符,或两者都有(例如,*.cat.*)。你可以用前导 ! 来否定模式。第一个匹配头名称的模式(无论是正向还是负向)将获胜。

当您提供自己的模式时,我们建议包括 !id!timestamp,因为这些头部在入站侧是只读的。

important

默认情况下,映射器只反序列化 java.langjava.util 中的类。您可以通过 addTrustedPackages 方法添加受信任的包来信任其他(或所有)包。如果您接收到来自不受信任来源的消息,您可能希望仅添加您信任的那些包。要信任所有包,您可以使用 mapper.addTrustedPackages("*")

备注

以原始形式映射 String 头值在与不理解映射器 JSON 格式的系统进行通信时是有用的。

从版本 2.2.5 开始,您可以指定某些字符串值的头部不使用 JSON 进行映射,而是使用原始 byte[] 进行映射。AbstractKafkaHeaderMapper 有了新的属性;当 mapAllStringsOut 设置为 true 时,所有字符串值的头部将使用 charset 属性(默认 UTF-8)转换为 byte[]。此外,还有一个属性 rawMappedHeaders,它是一个 header name : boolean 的映射;如果映射中包含一个头部名称,并且该头部包含一个 String 值,它将使用字符集映射为原始 byte[]。这个映射也用于将原始传入的 byte[] 头部使用字符集映射为 String,仅当映射值中的布尔值为 true 时。如果布尔值为 false,或者头部名称不在映射中且值为 true,则传入的头部将简单地映射为原始未映射的头部。

以下测试用例说明了该机制。

@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
java

两个头部映射器默认情况下会映射所有传入的头部。从版本 2.8.8 开始,模式也可以应用于传入映射。要为传入映射创建一个映射器,请使用相应映射器上的静态方法之一:

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
java

例如:

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
java

这将排除所有以 abc 开头的头部,并包含所有其他头部。

默认情况下,只要 Jackson 在类路径中,MessagingMessageConverterBatchMessagingMessageConverter 中将使用 DefaultKafkaHeaderMapper

使用批量转换器,转换后的头信息可在 KafkaHeaders.BATCH_CONVERTED_HEADERS 中以 List<Map<String, Object>> 的形式获取,其中列表中的每个映射对应于有效负载中的数据位置。

如果没有转换器(要么因为 Jackson 不存在,要么它被显式设置为 null),则消费者记录中的头部将以未转换的形式提供在 KafkaHeaders.NATIVE_HEADERS 头部中。该头部是一个 Headers 对象(在批量转换器的情况下是一个 List<Headers>),列表中的位置对应于有效负载中的数据位置。

important

某些类型不适合进行 JSON 序列化,对于这些类型,可能更倾向于使用简单的 toString() 序列化。DefaultKafkaHeaderMapper 有一个名为 addToStringClasses() 的方法,允许您提供应以这种方式处理的类名,以便进行出站映射。在入站映射期间,它们被映射为 String。默认情况下,只有 org.springframework.util.MimeTypeorg.springframework.http.MediaType 以这种方式映射。

备注

从版本 2.3 开始,处理字符串值的头部变得更加简单。这些头部默认不再进行 JSON 编码(即不再添加包围的 "...")。类型仍然会被添加到 JSON_TYPES 头部,以便接收系统可以将其转换回字符串(从 byte[])。映射器可以处理(解码)由旧版本生成的头部(它会检查前导的 ");通过这种方式,使用 2.3 的应用程序可以消费来自旧版本的记录。

important

为了与早期版本兼容,如果由使用 2.3 版本生成的记录可能会被使用早期版本的应用程序消费,请将 encodeStrings 设置为 true。当所有应用程序都使用 2.3 或更高版本时,可以将该属性保留为默认值 false

@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
java

如果使用 Spring Boot,它会将此转换器 bean 自动配置到自动配置的 KafkaTemplate 中;否则,您应该将此转换器添加到模板中。