跳到主要内容

WebFlux 支持

QWen Plus 中英对照 WebFlux Support

WebFlux Spring集成模块 (spring-integration-webflux) 允许以反应式方式执行 HTTP 请求和处理传入的 HTTP 请求。

你需要将这个依赖项添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.4.2</version>
</dependency>
xml

在非 Servlet 基础的服务器配置情况下,必须包含 io.projectreactor.netty:reactor-netty 依赖。

WebFlux 支持包括以下网关实现:WebFluxInboundEndpointWebFluxRequestExecutingMessageHandler。该支持完全基于 Spring WebFluxProject Reactor 基础。有关更多信息,请参阅 HTTP 支持,因为许多选项在反应式和常规 HTTP 组件之间是共享的。

WebFlux 命名空间支持

Spring Integration 提供了一个 webflux 命名空间和相应的模式定义。要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
xml

WebFlux 入站组件

从 5.0 版本开始,提供了 WebFluxInboundEndpointWebHandler 实现。这个组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport,它们通过新提取的 BaseHttpInboundEndpoint 共享一些通用选项。它用于 Spring WebFlux 反应式环境中(而不是 MVC)。以下示例展示了一个简单的 WebFlux 端点实现:

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
java

配置类似于 HttpRequestHandlingEndpointSupport(在示例中之前提到的),不同的是我们使用 @EnableWebFlux 将 WebFlux 基础设施添加到我们的集成应用程序中。此外,WebFluxInboundEndpoint 通过反应式 HTTP 服务器实现提供的基于背压和按需的功能,对下游流执行 sendAndReceive 操作。

备注

回复部分也是非阻塞的,并且是基于内部的 FutureReplyChannel,它被扁平映射到一个回复 Mono 以实现按需解析。

你可以使用自定义的 ServerCodecConfigurerRequestedContentTypeResolver 以及 ReactiveAdapterRegistry 来配置 WebFluxInboundEndpoint。后者提供了一种机制,你可以用它来返回任何反应式类型的回复:Reactor Flux,RxJava ObservableFlowable 等等。通过这种方式,我们可以使用 Spring Integration 组件实现 服务器发送事件场景,如下例所示:

@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
java

更多信息配置选项,请参见 [请求映射支持](changes-2.2-3.0.md#x3.0 请求映射) 和 跨源资源共享 (CORS) 支持

当请求体为空或 payloadExpression 返回 null 时,将使用请求参数 (MultiValueMap<String, String>) 作为目标消息的 payload 进行处理。

负载验证

从 5.2 版本开始,WebFluxInboundEndpoint 可以配置一个 Validator。与 HTTP 支持中的 MVC 验证不同,它用于验证由 HttpMessageReader 将请求转换后的 Publisher 中的元素,在执行回退和 payloadExpression 函数之前。框架无法假设构建最终有效负载后 Publisher 对象可能变得多么复杂。如果有要求限制验证可见性仅适用于最终有效负载(或其 Publisher 元素),则验证应该在 WebFlux 端点之后进行。有关更多信息,请参阅 Spring WebFlux 文档。无效的有效负载将被拒绝,并抛出 IntegrationWebExchangeBindExceptionWebExchangeBindException 的扩展),其中包含所有验证 Errors。有关验证的更多信息,请参阅 Spring 框架参考手册

WebFlux 外发组件

WebFluxRequestExecutingMessageHandler(从 5.0 版本开始)的实现与 HttpRequestExecutingMessageHandler 类似。它使用了 Spring Framework WebFlux 模块中的 WebClient。要配置它,定义一个类似于以下内容的 bean:

@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
java

WebClientexchange() 操作返回一个 Mono<ClientResponse>,该结果通过多个 Mono.map() 步骤映射为 AbstractIntegrationMessageBuilder,作为 WebFluxRequestExecutingMessageHandler 的输出。与 ReactiveChannel 作为 outputChannel 一起使用时,Mono<ClientResponse> 的评估会延迟到下游订阅发生时才进行。否则,它将被视为 async 模式,并且 Mono 响应会被适配为 SettableListenableFuture,以实现从 WebFluxRequestExecutingMessageHandler 返回的异步回复。输出消息的目标有效负载取决于 WebFluxRequestExecutingMessageHandler 的配置。setExpectedResponseType(Class<?>)setExpectedResponseTypeExpression(Expression) 确定了响应体元素转换的目标类型。如果设置了 replyPayloadToFluxtrue,则响应体将被转换为带有每个元素的 expectedResponseTypeFlux,并且此 Flux 将作为有效负载发送到下游。之后,你可以使用 splitter 以反应式方式迭代这个 Flux

此外,可以将 BodyExtractor<?, ClientHttpResponse> 注入到 WebFluxRequestExecutingMessageHandler 中,而不是使用 expectedResponseTypereplyPayloadToFlux 属性。它可用于对 ClientHttpResponse 进行低级别的访问,并对消息体和 HTTP 头的转换提供更多的控制。Spring Integration 提供了 ClientHttpResponseBodyExtractor 作为身份函数来生成(下游)整个 ClientHttpResponse,以及任何其他可能的自定义逻辑。

从 5.2 版本开始,WebFluxRequestExecutingMessageHandler 支持响应式的 PublisherResourceMultiValueMap 类型作为请求消息的有效载荷。相应的 BodyInserter 在内部使用,以填充到 WebClient.RequestBodySpec 中。当有效载荷是响应式的 Publisher 时,可以使用配置的 publisherElementTypepublisherElementTypeExpression 来确定发布者的元素类型。表达式必须解析为一个 Class<?>String(该字符串将被解析为目标 Class<?>ParameterizedTypeReference)。

从 5.5 版本开始,WebFluxRequestExecutingMessageHandler 暴露了一个 extractResponseBody 标志(默认为 true),用于仅返回响应体,或作为回复消息的有效负载返回整个 ResponseEntity,这独立于提供的 expectedResponseTypereplyPayloadToFlux。如果 ResponseEntity 中不存在主体,则忽略此标志并返回整个 ResponseEntity

更多信息配置选项,请参阅 HTTP 外发组件

WebFlux Header 映射

由于 WebFlux 组件完全基于 HTTP 协议,因此在 HTTP 标头映射上没有区别。有关更多可能的选项和用于映射标头的组件,请参阅HTTP 标头映射

WebFlux 请求属性

从 6.0 版本开始,可以配置 WebFluxRequestExecutingMessageHandler 通过 setAttributeVariablesExpression() 来评估请求属性。此 SpEL 表达式必须在 Map 中进行评估。然后将此类映射传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP 请求配置回调。如果需要以键值对象的形式从 Message 传递信息给请求,并且下游过滤器需要访问这些属性以进行进一步处理时,这将非常有帮助。