异步请求
Spring MVC与Servlet异步请求处理有着广泛的集成:
-
DeferredResult、Callable 和 WebAsyncTask 在控制器方法中返回的值支持单一的异步返回值。
-
控制器可以使用响应式客户端,并返回响应式类型来处理响应。
有关这与Spring WebFlux有何不同的概述,请参阅下面的Async Spring MVC与WebFlux的比较部分。
DeferredResult
一旦在Servlet容器中启用了异步请求处理功能,控制器方法就可以使用DeferredResult来包装任何受支持的控制器方法返回值,如下例所示:
- Java
- Kotlin
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<>();
// Save the deferredResult somewhere..
return deferredResult;
}
// From some other thread...
deferredResult.setResult(result);
@GetMapping("/quotes")
@ResponseBody
fun quotes(): DeferredResult<String> {
val deferredResult = DeferredResult<String>()
// Save the deferredResult somewhere..
return deferredResult
}
// From some other thread...
deferredResult.setResult(result)
控制器可以异步地生成返回值,来自不同的线程——例如,作为对外部事件(JMS消息)、定时任务或其他事件的响应。
Callable
控制器可以使用java.util.concurrent.Callable包装任何支持的返回值,如下例所示:
- Java
- Kotlin
@PostMapping
public Callable<String> processUpload(final MultipartFile file) {
return () -> "someView";
}
@PostMapping
fun processUpload(file: MultipartFile) = Callable<String> {
// ...
"someView"
}
然后可以通过运行给定的任务来获取返回值,该任务是通过配置好的 AsyncTaskExecutor来执行的。
WebAsyncTask
WebAsyncTask 与使用 Callable 相似,但它允许自定义额外的设置,例如请求超时值,并且可以使用 AsyncTaskExecutor 来执行 java.util.concurrent.Callable,而不是 Spring MVC 全局设置的默认值。以下是使用 WebAsyncTask 的示例:
- Java
- Kotlin
@GetMapping("/callable")
WebAsyncTask<String> handle() {
return new WebAsyncTask<String>(20000L,()->{
Thread.sleep(10000); //simulate long-running task
return "asynchronous request completed";
});
}
@GetMapping("/callable")
fun handle(): WebAsyncTask<String> {
return WebAsyncTask(20000L) {
Thread.sleep(10000) // simulate long-running task
"asynchronous request completed"
}
}
处理
以下是关于Servlet异步请求处理的非常简洁的概述:
-
通过调用
request.startAsync()可以将ServletRequest设置为异步模式。这样做的主要效果是,Servlet(以及任何过滤器)可以退出,但响应仍然保持打开状态,以便稍后完成处理。 -
调用
request.startAsync()会返回一个AsyncContext对象,你可以利用该对象进一步控制异步处理。例如,它提供了dispatch方法,这个方法类似于 Servlet API 中的转发操作,不同之处在于它可以允许应用程序在 Servlet 容器的线程上继续请求处理。 -
ServletRequest提供了对当前DispatcherType的访问权限,你可以利用这一点来区分是处理初始请求、异步分发、转发还是其他类型的调度。
DeferredResult 的处理方式如下:
-
控制器返回一个
DeferredResult,并将其保存在内存中的队列或列表中,以便后续可以访问。 -
Spring MVC 调用
request.startAsync()。 -
与此同时,
DispatcherServlet和所有配置的过滤器会退出请求处理线程,但响应仍然保持开启状态。 -
应用程序从某个线程中设置
DeferredResult,Spring MVC 然后将请求重新分发回 Servlet 容器。 -
DispatcherServlet被再次调用,处理继续进行,使用异步产生的返回值。
Callable 处理的工作原理如下:
- 控制器返回一个
Callable对象。 - Spring MVC 调用
request.startAsync(),并将该Callable对象提交给AsyncTaskExecutor,在单独的线程中对其进行处理。 - 与此同时,
DispatcherServlet及所有过滤器会退出 Servlet 容器线程,但响应仍然处于开启状态。 - 最终,
Callable会产生一个结果,Spring MVC 会将请求重新分发回 Servlet 容器以完成后续处理。 DispatcherServlet会被再次调用,处理会继续进行,使用Callable异步产生的返回值来完成剩余的流程。
如需进一步了解背景和上下文,您还可以阅读这些博客文章,这些文章介绍了Spring MVC 3.2中对异步请求处理的支持。
异常处理
当你使用 DeferredResult 时,你可以选择是调用 setResult 还是带有异常的 setErrorResult。在这两种情况下,Spring MVC 都会将请求重新发送回 Servlet 容器以完成处理。之后,系统会认为控制器方法返回了给定的值,或者认为它产生了给定的异常。然后这个异常会通过常规的异常处理机制进行处理(例如,调用 @ExceptionHandler 方法)。
当你使用Callable时,会遵循类似的处理逻辑,主要的区别在于结果是从Callable返回的,或者Callable会抛出一个异常。
拦截
HandlerInterceptor的实例可以是AsyncHandlerInterceptor类型,以便在开始异步处理的初始请求时接收到afterConcurrentHandlingStarted回调(而不是postHandle和afterCompletion)。
HandlerInterceptor 的实现也可以注册一个 CallableProcessingInterceptor 或 DeferredResultProcessingInterceptor,以便更深入地与异步请求的生命周期集成(例如,处理超时事件)。有关更多详细信息,请参阅 AsyncHandlerInterceptor。
DeferredResult 提供了 onTimeout(Runnable) 和 onCompletion(Runnable) 回调函数。有关更多详细信息,请参阅 DeferredResult 的 Javadoc。Callable 可以替代 WebAsyncTask,后者还提供了用于超时和完成回调的额外方法。
Async Spring MVC与WebFlux的比较
Servlet API最初是为了实现一次性的Filter-Servlet链处理而设计的。异步请求处理允许应用程序退出Filter-Servlet链,但会保持响应状态以便后续处理。Spring MVC的异步支持就是基于这种机制构建的。当控制器返回一个DeferredResult时,Filter-Servlet链的处理就会终止,Servlet容器的线程也会被释放。之后,当DeferredResult被设置时,会进行一次异步调度(指向相同的URL),在此期间控制器会被再次调用,但不是直接调用它,而是使用DeferredResult的值来继续处理(就好像控制器自己返回了该结果一样)。
相比之下,Spring WebFlux既不是基于Servlet API构建的,也不需要这样的异步请求处理功能,因为它本身就是异步设计的。异步处理内置于所有框架契约中,并在请求处理的各个阶段都得到了天然的支持。
从编程模型的角度来看,Spring MVC和Spring WebFlux都支持在控制器方法中将异步和反应式类型作为返回值。Spring MVC甚至支持流处理,包括反应式背压(reactive back pressure)。然而,对响应的单独写入操作仍然是阻塞的(并且在单独的线程上执行),这与WebFlux不同,WebFlux依赖于非阻塞I/O,每次写入不需要额外的线程。
另一个根本性的区别是,Spring MVC不支持控制器方法参数中的异步或反应式类型(例如@RequestBody、@RequestPart等),也不对模型属性中的异步和反应式类型提供任何显式的支持。而Spring WebFlux则支持所有这些功能。
最后,从配置的角度来看,异步请求处理功能必须在Servlet容器级别启用。
HTTP流式传输
你可以使用DeferredResult和Callable来处理单个异步返回值。但如果你想要产生多个异步值,并将这些值写入响应中呢?本节将描述如何实现这一目标。
对象
你可以使用 ResponseBodyEmitter 的返回值来生成一个对象流,每个对象都会通过 HttpMessageConverter 进行序列化,然后写入响应中,如下例所示:
- Java
- Kotlin
@GetMapping("/events")
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// Save the emitter somewhere..
return emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();
@GetMapping("/events")
fun handle() = ResponseBodyEmitter().apply {
// Save the emitter somewhere..
}
// In some other thread
emitter.send("Hello once")
// and again later on
emitter.send("Hello again")
// and done at some point
emitter.complete()
您也可以使用 ResponseBodyEmitter 作为 ResponseEntity 的主体,从而自定义响应的状态码和头部信息。
当一个emitter抛出IOException时(例如,如果远程客户端断开连接),应用程序没有责任清理该连接,也不应该调用emitter.complete或emitter.completeWithError。相反,servlet容器会自动发起一个AsyncListener错误通知,在此过程中Spring MVC会执行一次completeWithError调用。这次调用随后会对应用程序进行最后一次ASYNC调度,在此期间Spring MVC会调用配置好的异常处理器并完成请求处理。
SSE
SseEmitter(ResponseBodyEmitter的子类)支持服务器发送的事件,即从服务器发送的事件会按照W3C SSE规范进行格式化。要从控制器生成一个SSE流,需要返回SseEmitter,如下例所示:
- Java
- Kotlin
@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handle() {
SseEmitter emitter = new SseEmitter();
// Save the emitter somewhere..
return emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();
@GetMapping("/events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun handle() = SseEmitter().apply {
// Save the emitter somewhere..
}
// In some other thread
emitter.send("Hello once")
// and again later on
emitter.send("Hello again")
// and done at some point
emitter.complete()
虽然SSE是向浏览器推送数据的主要选择,但请注意Internet Explorer不支持Server-Sent Events(服务器发送事件)。可以考虑使用Spring的WebSocket消息传递,结合SockJS回退方案(包括SSE)来进行数据传输,这种方案可以兼容多种浏览器。
有关异常处理的说明,请参阅前一节。
原始数据
有时,绕过消息转换并直接将数据流发送到响应的OutputStream会很有用(例如,在文件下载时)。你可以使用StreamingResponseBody返回值类型来实现这一点,如下例所示:
- Java
- Kotlin
@GetMapping("/download")
public StreamingResponseBody handle() {
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
// write...
}
};
}
@GetMapping("/download")
fun handle() = StreamingResponseBody {
// write...
}
你可以使用StreamingResponseBody作为ResponseEntity的体(body),来自定义响应的状态码和头部信息。
反应式类型
Spring MVC支持在控制器中使用响应式客户端库(也可以在WebFlux部分的响应式库中阅读相关内容)。这包括spring-webflux中的WebClient以及其他库,如Spring Data的响应式数据存储库。在这种情况下,能够从控制器方法返回响应式类型会非常方便。
反应式返回值的处理方式如下:
-
单值承诺(single-value promise)的适应方式类似于使用
DeferredResult。例如CompletionStage(JDK)、Mono(Reactor)和Single(RxJava)。 -
对于具有流媒体类型的多值流(如
application/x-ndjson或text/event-stream),其适应方式类似于使用ResponseBodyEmitter或SseEmitter。例如Flux(Reactor)或Observable(RxJava)。应用程序也可以返回Flux<ServerSentEvent>或Observable<ServerSentEvent>。 -
对于具有其他任何媒体类型的多值流(如
application/json),其适应方式类似于使用DeferredResult<List<?>。
Spring MVC 通过 spring-core 中的 ReactiveAdapterRegistry 支持 Reactor 和 RxJava,这使得它能够适配多种反应式(reactive)编程库。
对于响应流式传输,支持反应式背压(reactive back pressure),但写入响应的操作仍然是阻塞的,并通过已配置的AsyncTaskExecutor在单独的线程中执行,以避免阻塞上游源(例如从WebClient返回的Flux)。
上下文传播
如果类路径上存在Micrometer Context Propagation,当控制器方法返回一个reactive类型,如Flux或Mono时,所有注册了io.micrometer.ThreadLocalAccessor的ThreadLocal值,都会以键值对的形式写入Reactor的Context中,其中键由ThreadLocalAccessor所指定。
对于其他异步处理场景,你可以直接使用Context Propagation库。例如:
// Capture ThreadLocal values from the main thread ...
ContextSnapshot snapshot = ContextSnapshot.captureAll();
// On a different thread: restore ThreadLocal values
try (ContextSnapshot.Scope scope = snapshot.setThreadLocals()) {
// ...
}
以下ThreadLocalAccessor实现是开箱即用的:
LocaleContextThreadLocalAccessor— 通过LocaleContextHolder传递LocaleContextRequestAttributesThreadLocalAccessor— 通过RequestContextHolder传递RequestAttributes
以上内容不会自动注册。您需要在启动时通过 ContextRegistry.getInstance() 进行注册。
如需更多详细信息,请参阅Micrometer Context Propagation库的文档。
断开连接
Servlet API在远程客户端断开连接时不会提供任何通知。因此,在向响应流中传输数据时,无论是通过SseEmitter还是reactive types,定期发送数据都是非常重要的,因为如果客户端已经断开连接,写入操作将会失败。发送的数据可以是空的(仅包含注释的)SSE事件,也可以是任何其他形式的数据,只要对方能够将其理解为“心跳”信号并忽略即可。
或者,可以考虑使用具有内置心跳机制的网页消息解决方案(例如 通过WebSocket传输STOMP 或使用 SockJS的WebSocket)。
配置
异步请求处理功能必须在Servlet容器级别启用。MVC配置还提供了几个针对异步请求的选项。
Servlet 容器
Filter和Servlet的声明中有一个asyncSupported标志,需要将其设置为true才能启用异步请求处理。此外,还应该声明Filter映射来处理ASYNC的jakarta.servletDispatchType。
在Java配置中,当你使用AbstractAnnotationConfigDispatcherServletInitializer来初始化Servlet容器时,这一过程会自动完成。
在web.xml配置中,你可以在DispatcherServlet和Filter的声明中添加<async-supported>true</async-supported>,并在过滤器映射中添加<dispatcher>ASYNC</dispatcher>。
Spring MVC
MVC配置提供了以下异步请求处理的选项:
- Java配置:在
WebMvcConfigurer上使用configureAsyncSupport回调。 - XML命名空间:在
<mvc:annotation-driven>下使用<async-support>元素。
您可以配置以下内容:
-
异步请求的默认超时值取决于底层的 Servlet 容器,除非有明确的设置。
-
AsyncTaskExecutor用于在流式处理时执行阻塞写操作(使用 Reactive Types),以及执行从控制器方法返回的Callable实例。默认使用的AsyncTaskExecutor在高负载环境下并不适合生产环境。 -
DeferredResultProcessingInterceptor的实现和CallableProcessingInterceptor的实现。
请注意,您还可以为 DeferredResult、ResponseBodyEmitter 和 SseEmitter 设置默认的超时值。对于 Callable,您可以使用 WebAsyncTask 来提供超时值。