跳到主要内容

使用 @SendTo 转发监听器结果

ChatGPT-4o-mini 中英对照 Forwarding Listener Results using @SendTo Forwarding Listener Results using @SendTo

从版本 2.0 开始,如果您还在 @KafkaListener 上添加了 @SendTo 注解,并且方法调用返回了结果,则该结果将被转发到 @SendTo 指定的主题。

@SendTo 的值可以有几种形式:

  • @SendTo("someTopic") 路由到字面主题。

  • @SendTo("#{someExpression}") 路由到通过在应用程序上下文初始化期间评估表达式确定的主题。

  • @SendTo("!{someExpression}") 路由到通过在运行时评估表达式确定的主题。评估的 #root 对象有三个属性:

    • request: 入站的 ConsumerRecord(或用于批量监听器的 ConsumerRecords 对象)。

    • source: 从 request 转换而来的 org.springframework.messaging.Message<?>

    • result: 方法返回结果。

  • @SendTo(无属性):这被视为 !{source.headers['kafka_replyTopic']}(自版本 2.1.3 起)。

从版本 2.1.11 和 2.2.1 开始,@SendTo 值中的属性占位符会被解析。

表达式求值的结果必须是一个 String,表示主题名称。以下示例展示了 @SendTo 的多种用法:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

@KafkaHandler
public String foo(String in) {
...
}

@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}

}
java
important

为了支持 @SendTo,监听器容器工厂必须提供一个 KafkaTemplate(在其 replyTemplate 属性中),该模板用于发送回复。这应该是一个 KafkaTemplate,而不是用于客户端请求/回复处理的 ReplyingKafkaTemplate。在使用 Spring Boot 时,它会自动将模板配置到工厂中;当配置您自己的工厂时,必须按照下面的示例进行设置。

从版本 2.2 开始,您可以将 ReplyHeadersConfigurer 添加到监听器容器工厂中。它用于确定您希望在回复消息中设置哪些头部。以下示例展示了如何添加 ReplyHeadersConfigurer

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
java

您也可以根据需要添加更多的标题。以下示例演示了如何做到这一点:

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}

@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}

});
return factory;
}
java

当你使用 @SendTo 时,必须在 replyTemplate 属性中配置 ConcurrentKafkaListenerContainerFactory,并使用 KafkaTemplate 来执行发送。Spring Boot 会自动注入其自动配置的模板(如果存在单个实例,则使用该实例)。

备注

除非您使用 request/reply semantics,否则只使用简单的 send(topic, value) 方法,因此您可能希望创建一个子类来生成分区或键。以下示例演示了如何做到这一点:

@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<String, String>(producerFactory()) {

@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}

...

};
}
java
important

如果监听器方法返回 Message<?>Collection<Message<?>>,则监听器方法负责设置回复的消息头。例如,当处理来自 ReplyingKafkaTemplate 的请求时,你可能会这样做:

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader("someOtherHeader", "someValue")
.build();
}
java

在使用请求/回复语义时,目标分区可以由发送者请求。

备注

您可以在 @KafkaListener 方法上使用 @SendTo 注解,即使没有返回结果。这是为了允许配置一个 errorHandler,可以将有关失败消息传递的信息转发到某个主题。以下示例演示了如何做到这一点:

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
return (m, e) -> {
return ... // 有关失败和输入数据的一些信息
};
}
java

有关更多信息,请参见 Handling Exceptions

备注

如果一个监听方法返回一个 Iterable,默认情况下会为每个元素发送一个记录作为值。从版本 2.3.5 开始,可以在 @KafkaListener 上将 splitIterables 属性设置为 false,这样整个结果将作为单个 ProducerRecord 的值发送。这需要在回复模板的生产者配置中使用合适的序列化器。然而,如果回复是 Iterable<Message<?>>,则该属性会被忽略,每条消息将单独发送。