异步 @KafkaListener
返回类型
@KafkaListener
Return Types
从版本 3.2 开始,@KafkaListener
(和 @KafkaHandler
)方法可以指定异步返回类型,从而让回复异步发送。返回类型包括 CompletableFuture<?>
、Mono<?>
和 Kotlin 的 suspend
函数。
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
important
AckMode
将自动设置为 MANUAL
,并在检测到异步返回类型时启用无序提交;相反,异步完成将在异步操作完成时确认。当异步结果以错误完成时,消息是否可以恢复取决于容器错误处理程序。如果在监听器方法中发生某些异常,导致无法创建异步结果对象,您必须捕获该异常并返回一个适当的返回对象,以使消息得到确认或恢复。
如果在具有异步返回类型(包括 Kotlin 的挂起函数)的监听器上配置了 KafkaListenerErrorHandler
,则在发生故障后会调用错误处理程序。有关此错误处理程序及其目的的更多信息,请参见 Handling Exceptions。