发送消息
本节涵盖如何发送消息。
使用 KafkaTemplate
本节介绍如何使用 KafkaTemplate
发送消息。
概述
KafkaTemplate
封装了一个生产者,并提供了方便的方法来将数据发送到 Kafka 主题。以下列表显示了 KafkaTemplate
中相关的方法:
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
请参阅 Javadoc 以获取更多详细信息。
sendDefault
API 要求模板中提供了一个默认主题。
API 接受一个 timestamp
作为参数,并将此时间戳存储在记录中。用户提供的时间戳如何存储取决于 Kafka 主题上配置的时间戳类型。如果主题配置为使用 CREATE_TIME
,则记录用户指定的时间戳(如果未指定,则生成)。如果主题配置为使用 LOG_APPEND_TIME
,则用户指定的时间戳将被忽略,代理会添加本地代理时间。
要使用模板,您可以配置一个生产者工厂,并在模板的构造函数中提供它。以下示例演示了如何做到这一点:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从版本 2.5 开始,您现在可以覆盖工厂的 ProducerConfig
属性,以使用相同的工厂创建具有不同生产者配置的模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
请注意,类型为 ProducerFactory<?, ?>
的 bean(例如 Spring Boot 自动配置的那个)可以使用不同的窄化泛型类型进行引用。
您还可以通过使用标准 <bean/>
定义来配置模板。
然后,要使用模板,您可以调用它的一个方法。
当您使用带有 Message<?>
参数的方法时,主题、分区、键和时间戳信息会在消息头中提供,包括以下项目:
-
KafkaHeaders.TOPIC
-
KafkaHeaders.PARTITION
-
KafkaHeaders.KEY
-
KafkaHeaders.TIMESTAMP
消息负载是数据。
可选地,您可以使用 ProducerListener
配置 KafkaTemplate
,以便在发送结果(成功或失败)时获得异步回调,而不是等待 Future
完成。以下列表显示了 ProducerListener
接口的定义:
public interface ProducerListener<K, V> {
default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
}
default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
}
}
默认情况下,模板配置了一个 LoggingProducerListener
,该监听器在发送成功时记录错误并不执行任何操作。
为了方便,提供了默认方法实现,以防你只想实现其中一个方法。
注意,发送方法返回一个 CompletableFuture<SendResult>
。您可以注册一个回调与监听器,以异步接收发送的结果。以下示例展示了如何做到这一点:
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
SendResult
有两个属性,一个是 ProducerRecord
,另一个是 RecordMetadata
。有关这些对象的信息,请参阅 Kafka API 文档。
Throwable
可以被转换为 KafkaProducerException
;它的 producerRecord
属性包含了失败的记录。
如果您希望阻塞发送线程以等待结果,可以调用 future 的 get()
方法;建议使用带有超时的该方法。如果您设置了 linger.ms
,您可能希望在等待之前调用 flush()
,或者为了方便,模板有一个带有 autoFlush
参数的构造函数,该参数使模板在每次发送时都调用 flush()
。仅当您设置了 linger.ms
生产者属性并希望立即发送部分批次时,才需要刷新。
示例
本节展示了向 Kafka 发送消息的示例:
示例 1. 非阻塞 (异步)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<String, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
注意,ExecutionException
的原因是 KafkaProducerException
,其具有 producerRecord
属性。
使用 RoutingKafkaTemplate
从版本 2.5 开始,您可以使用 RoutingKafkaTemplate
在运行时根据目标 topic
名称选择生产者。
路由模板 不 支持事务、execute
、flush
或 metrics
操作,因为这些操作的主题未知。
该模板需要一个 java.util.regex.Pattern
到 ProducerFactory<Object, Object>
实例的映射。这个映射应该是有序的(例如,使用 LinkedHashMap
),因为它是按顺序遍历的;你应该将更具体的模式添加到开头。
以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用相同的模板发送到不同的主题,每个主题使用不同的值序列化器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
这个示例的对应 @KafkaListener
在 Annotation Properties 中显示。
要实现类似的结果,但具有将不同类型发送到同一主题的附加能力,请参见 Delegating Serializer and Deserializer。
使用 DefaultKafkaProducerFactory
如在 Using KafkaTemplate 中所示,ProducerFactory
用于创建生产者。
当不使用 Transactions 时,默认情况下,DefaultKafkaProducerFactory
创建一个单例生产者供所有客户端使用,这在 KafkaProducer
的 JavaDocs 中是推荐的。然而,如果你在模板上调用 flush()
,这可能会导致使用同一生产者的其他线程出现延迟。从版本 2.3 开始,DefaultKafkaProducerFactory
有一个新属性 producerPerThread
。当设置为 true
时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。
当 producerPerThread
为 true
时,用户代码 必须 在不再需要生产者时调用工厂的 closeThreadBoundProducer()
。这将物理关闭生产者并将其从 ThreadLocal
中移除。调用 reset()
或 destroy()
将不会清理这些生产者。
另请参见 KafkaTemplate 事务性和非事务性发布。
在创建 DefaultKafkaProducerFactory
时,可以通过调用只接受属性 Map 的构造函数从配置中选择键和/或值的 Serializer
类(请参见 Using KafkaTemplate 中的示例),或者可以将 Serializer
实例传递给 DefaultKafkaProducerFactory
构造函数(在这种情况下,所有的 Producer
共享相同的实例)。另外,您还可以提供 Supplier<Serializer>
(从版本 2.3 开始),这些将用于为每个 Producer
获取单独的 Serializer
实例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从版本 2.5.10 开始,您现在可以在工厂创建后更新生产者属性。这可能很有用,例如,如果您在凭据更改后需要更新 SSL 密钥/信任存储位置。更改不会影响现有的生产者实例;请调用 reset()
以关闭任何现有的生产者,以便使用新属性创建新的生产者。
您无法将一个事务性生产者工厂更改为非事务性,反之亦然。
现在提供了两种新方法:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从版本 2.8 开始,如果您将序列化器作为对象提供(在构造函数中或通过设置器),工厂将调用 configure()
方法来使用配置属性对它们进行配置。
使用 ReplyingKafkaTemplate
版本 2.1.3 引入了 KafkaTemplate
的一个子类,以提供请求/回复语义。该类名为 ReplyingKafkaTemplate
,并具有两个附加方法;以下显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
结果是一个 CompletableFuture
,它异步填充了结果(或由于超时而产生的异常)。结果还有一个 sendFuture
属性,这是调用 KafkaTemplate.send()
的结果。您可以使用这个 future 来确定发送操作的结果。
如果使用第一种方法,或者 replyTimeout
参数为 null
,则使用模板的 defaultReplyTimeout
属性(默认值为 5 秒)。
从版本 2.8.8 开始,模板新增了一个方法 waitForAssignment
。如果回复容器配置为 auto.offset.reset=latest
,这个方法非常有用,可以避免在容器初始化之前发送请求和回复。
在使用手动分区分配(没有组管理)时,等待的持续时间必须大于容器的 pollTimeout
属性,因为通知将在第一次轮询完成后才会发送。
以下 Spring Boot 应用程序展示了如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
注意,我们可以使用 Boot 的自动配置容器工厂来创建回复容器。
如果正在使用非平凡的反序列化器来处理回复,请考虑使用一个 ErrorHandlingDeserializer,该反序列化器委托给您配置的反序列化器。当这样配置时,RequestReplyFuture
将异常完成,您可以捕获 ExecutionException
,其 cause
属性中包含 DeserializationException
。
从版本 2.6.7 开始,除了检测 DeserializationException
之外,如果提供了 replyErrorChecker
函数,模板将调用该函数。如果它返回一个异常,future 将以异常的方式完成。
这是一个例子:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause() instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
模板设置了一个头部(默认命名为 KafkaHeaders.CORRELATION_ID
),服务器端必须回显该头部。
在这种情况下,以下 @KafkaListener
应用程序做出响应:
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
@KafkaListener
基础设施回显关联 ID 并确定回复主题。
请参见 Forwarding Listener Results using @SendTo 以获取有关发送回复的更多信息。模板使用默认头 KafKaHeaders.REPLY_TOPIC
来指示回复发送到的主题。
从版本 2.2 开始,模板尝试从配置的回复容器中检测回复主题或分区。如果容器配置为监听单个主题或单个 TopicPartitionOffset
,则用于设置回复头。如果容器配置为其他方式,则用户必须设置回复头。在这种情况下,初始化期间会写入一条 INFO
日志消息。以下示例使用 KafkaHeaders.REPLY_TOPIC
:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
当你使用单个回复 TopicPartitionOffset
进行配置时,可以为多个模板使用相同的回复主题,只要每个实例监听不同的分区。在使用单个回复主题进行配置时,每个实例必须使用不同的 group.id
。在这种情况下,所有实例都会接收到每个回复,但只有发送请求的实例能够找到关联 ID。这对于自动扩展可能是有用的,但会带来额外的网络流量开销以及丢弃每个不需要的回复的少量成本。当你使用此设置时,我们建议将模板的 sharedReplyTopic
设置为 true
,这会将意外回复的日志级别降低到 DEBUG,而不是默认的 ERROR。
以下是配置回复容器以使用相同共享回复主题的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
如果您有多个客户端实例,并且没有按照前面的段落进行配置,则每个实例需要一个专用的回复主题。另一种选择是设置 KafkaHeaders.REPLY_PARTITION
并为每个实例使用一个专用的分区。Header
包含一个四字节的整数(大端字节序)。服务器必须使用此头信息将回复路由到正确的分区(@KafkaListener
会这样做)。不过,在这种情况下,回复容器不能使用 Kafka 的组管理功能,并且必须配置为监听固定的分区(通过在其 ContainerProperties
构造函数中使用 TopicPartitionOffset
)。
DefaultKafkaHeaderMapper
需要 Jackson 在类路径中(对于 @KafkaListener
)。如果不可用,消息转换器将没有头映射器,因此您必须配置一个 MessagingMessageConverter
,并使用 SimpleKafkaHeaderMapper
,如前所示。
默认情况下,使用 3 个标题:
-
KafkaHeaders.CORRELATION_ID
- 用于将回复与请求关联 -
KafkaHeaders.REPLY_TOPIC
- 用于告诉服务器回复的主题 -
KafkaHeaders.REPLY_PARTITION
- (可选)用于告诉服务器回复的分区
这些头部名称被 @KafkaListener
基础设施用于路由回复。
从版本 2.3 开始,您可以自定义头部名称 - 模板具有 3 个属性 correlationHeaderName
、replyTopicHeaderName
和 replyPartitionHeaderName
。如果您的服务器不是 Spring 应用程序(或不使用 @KafkaListener
),这将非常有用。
相反,如果请求的应用程序不是一个 spring 应用程序,并且将关联信息放在不同的头中,从版本 3.0 开始,您可以在监听器容器工厂上配置一个自定义 correlationHeaderName
,该头将被回显。之前,监听器必须回显自定义关联头。
使用 Message<?>
进行请求/回复
版本 2.7 为 ReplyingKafkaTemplate
添加了方法,以发送和接收 spring-messaging
的 Message<?>
抽象:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的默认 replyTimeout
,还有重载版本可以在方法调用中指定超时时间。
如果消费者的 Deserializer
或模板的 MessageConverter
可以在没有任何额外信息的情况下转换有效负载,则使用第一种方法,无论是通过配置还是在回复消息中的类型元数据。
如果您需要为返回类型提供类型信息以帮助消息转换器,请使用第二种方法。这还允许同一模板接收不同类型,即使在回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。以下是后者的一个示例:
- Java
- Kotlin
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
- Java
- Kotlin
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
Reply Type Message<?>
Request error occurred:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这也展示了如何在回复记录上设置一个键。
从版本 2.5 开始,框架将检测这些头部是否缺失,并用主题填充它们 - 主题可以是从 @SendTo
值确定的主题,或者是传入的 KafkaHeaders.REPLY_TOPIC
头部(如果存在)。它还将回显传入的 KafkaHeaders.CORRELATION_ID
和 KafkaHeaders.REPLY_PARTITION
,如果存在的话。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
原始记录键在回复中
从版本 3.3 开始,来自传入请求的 Kafka 记录键(如果存在)将保留在回复记录中。这仅适用于单条记录请求/回复场景。当监听器是批处理或返回类型是集合时,应用程序需要通过将回复记录包装在 Message
类型中来指定使用哪些键。
聚合多个回复
在 Using ReplyingKafkaTemplate 中的模板严格用于单个请求/回复场景。对于多个接收者对单个消息返回回复的情况,可以使用 AggregatingReplyingKafkaTemplate
。这是 Scatter-Gather Enterprise Integration Pattern 的客户端实现。
像 ReplyingKafkaTemplate
一样,AggregatingReplyingKafkaTemplate
构造函数接受一个生产者工厂和一个监听容器来接收回复;它有一个第三个参数 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
,每次收到回复时都会咨询这个参数;当谓词返回 true
时,ConsumerRecord
的集合将用于完成 sendAndReceive
方法返回的 Future
。
有一个额外的属性 returnPartialOnTimeout
(默认值为 false)。当将其设置为 true
时,未来将不会以 KafkaReplyTimeoutException
完成,而是正常完成一个部分结果(只要至少接收到一个回复记录)。
从版本 2.3.5 开始,谓词在超时后也会被调用(如果 returnPartialOnTimeout
为 true
)。第一个参数是当前的记录列表;第二个参数如果由于超时而调用,则为 true
。谓词可以修改记录列表。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是一个 ConsumerRecord
,其值是一个 ConsumerRecord
的集合。这个“外部” ConsumerRecord
并不是一个“真实”的记录,它是由模板合成的,用作实际回复记录的持有者,这些记录是为请求接收的。当正常释放发生时(释放策略返回 true),主题被设置为 aggregatedResults
;如果 returnPartialOnTimeout
为 true,并且发生超时(并且至少接收到一个回复记录),主题被设置为 partialResultsAfterTimeout
。模板为这些“主题”名称提供了常量静态变量:
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
Collection
中的真实 ConsumerRecord
包含实际接收到回复的主题。
回复的监听容器 必须 配置为 AckMode.MANUAL
或 AckMode.MANUAL_IMMEDIATE
;消费者属性 enable.auto.commit
必须为 false
(自版本 2.3 起为默认值)。为了避免任何丢失消息的可能性,模板仅在没有未完成请求时提交偏移量,即当最后一个未完成请求被释放策略释放时。在重新平衡后,可能会出现重复的回复交付;对于任何正在进行的请求,这些将被忽略;当收到已释放回复的重复回复时,您可能会看到错误日志消息。
如果您在使用此聚合模板时使用了 ErrorHandlingDeserializer,框架将不会自动检测 DeserializationException
。相反,记录(值为 null
)将保持不变,并且反序列化异常将放在头部中。建议应用程序调用实用方法 ReplyingKafkaTemplate.checkDeserialization()
来确定是否发生了反序列化异常。有关更多信息,请参阅其 JavaDocs。对于此聚合模板,replyErrorChecker
也不会被调用;您应该对回复的每个元素执行检查。