跳到主要内容
版本:7.0.2

WebFlux 支持

DeepSeek V3 中英对照 WebFlux Support

WebFlux Spring Integration 模块 (spring-integration-webflux) 允许以响应式方式执行 HTTP 请求并处理入站 HTTP 请求。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>7.0.2</version>
</dependency>

在非基于 Servlet 的服务器配置中,必须包含 io.projectreactor.netty:reactor-netty 依赖项。

WebFlux支持包含以下网关实现:WebFluxInboundEndpointWebFluxRequestExecutingMessageHandler。该支持完全基于 Spring WebFluxProject Reactor 基础构建。更多信息请参阅 HTTP 支持,因为响应式与常规 HTTP 组件之间共享了许多配置选项。

WebFlux 命名空间支持

Spring Integration 提供了一个 webflux 命名空间及相应的模式定义。要在配置中包含它,请在您的应用程序上下文配置文件中添加以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>

WebFlux 入站组件

从 5.0 版本开始,提供了 WebHandlerWebFluxInboundEndpoint 实现。该组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport,并通过新提取的 BaseHttpInboundEndpoint 与其共享一些通用选项。它用于 Spring WebFlux 响应式环境(而非 MVC)。以下示例展示了一个简单的 WebFlux 端点实现:

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}

该配置与之前提到的 HttpRequestHandlingEndpointSupport 类似,区别在于我们使用 @EnableWebFlux 将 WebFlux 基础设施添加到集成应用程序中。此外,WebFluxInboundEndpoint 通过使用响应式 HTTP 服务器实现提供的基于背压和按需处理的能力,对下游流执行 sendAndReceive 操作。

备注

回复部分同样是非阻塞的,它基于内部的 FutureReplyChannel,该通道被扁平映射为一个用于按需解析的回复 Mono

你可以配置 WebFluxInboundEndpoint,使用自定义的 ServerCodecConfigurerRequestedContentTypeResolver,甚至是 ReactiveAdapterRegistry。后者提供了一种机制,允许你以任何响应式类型返回回复:Reactor 的 Flux、RxJava 的 ObservableFlowable 等。这样,我们就可以使用 Spring Integration 组件实现服务器发送事件场景,如下例所示:

@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}

请参阅请求映射支持跨域资源共享 (CORS) 支持以获取更多可能的配置选项。

当请求体为空或 payloadExpression 返回 null 时,请求参数(MultiValueMap<String, String>)将用作待处理目标消息的 payload

负载验证

从版本5.2开始,WebFluxInboundEndpoint 可以配置一个 Validator。与 HTTP 支持 中的 MVC 验证不同,它用于在执行回退和 payloadExpression 函数之前,验证请求被 HttpMessageReader 转换后的 Publisher 中的元素。框架无法假设在构建最终负载后,Publisher 对象可能有多复杂。如果需要将验证范围限制在最终负载(或其 Publisher 元素)上,验证应放在下游而不是 WebFlux 端点中。更多信息请参阅 Spring WebFlux 文档。无效负载会被 IntegrationWebExchangeBindException(一个 WebExchangeBindException 扩展)拒绝,其中包含所有验证 Errors。有关验证的更多信息,请参阅 Spring Framework 参考手册

WebFlux 出站组件

WebFluxRequestExecutingMessageHandler(自 5.0 版本起)的实现与 HttpRequestExecutingMessageHandler 类似。它使用 Spring Framework WebFlux 模块中的 WebClient。要配置它,请定义一个类似于以下内容的 bean:

@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}

WebClientexchange() 操作返回一个 Mono<ClientResponse>,该对象通过多个 Mono.map() 步骤被映射为 AbstractIntegrationMessageBuilder,作为 WebFluxRequestExecutingMessageHandler 的输出。结合作为 outputChannelReactiveChannelMono<ClientResponse> 的求值会延迟到下游进行订阅时。否则,它将被视为 async 模式,并且 Mono 响应会被适配为 SettableListenableFuture,用于 WebFluxRequestExecutingMessageHandler 的异步回复。输出消息的目标负载取决于 WebFluxRequestExecutingMessageHandler 的配置。setExpectedResponseType(Class<?>)setExpectedResponseTypeExpression(Expression) 用于指定响应体元素转换的目标类型。如果 replyPayloadToFlux 设置为 true,则响应体会转换为一个 Flux,其中每个元素都使用提供的 expectedResponseType 进行转换,并且这个 Flux 会作为负载发送到下游。之后,你可以使用 splitter 以响应式方式遍历这个 Flux

此外,可以将 BodyExtractor<?, ClientHttpResponse> 注入到 WebFluxRequestExecutingMessageHandler 中,以替代 expectedResponseTypereplyPayloadToFlux 属性。这可用于对 ClientHttpResponse 进行底层访问,并更精细地控制响应体与 HTTP 头部的转换。Spring Integration 提供了 ClientHttpResponseBodyExtractor 作为恒等函数,用于生成(下游)完整的 ClientHttpResponse 以及任何其他可能的自定义逻辑。

从版本 5.2 开始,WebFluxRequestExecutingMessageHandler 支持响应式的 PublisherResourceMultiValueMap 类型作为请求消息的有效负载。内部使用相应的 BodyInserter 来填充到 WebClient.RequestBodySpec 中。当有效负载是响应式 Publisher 时,可以使用配置的 publisherElementTypepublisherElementTypeExpression 来确定发布者元素类型的类型。该表达式必须解析为 Class<?>String,后者将解析为目标 Class<?>ParameterizedTypeReference

从版本5.5开始,WebFluxRequestExecutingMessageHandler 公开了一个 extractResponseBody 标志(默认为 true),用于仅返回响应体,或将整个 ResponseEntity 作为回复消息的有效负载返回,这与提供的 expectedResponseTypereplyPayloadToFlux 无关。如果 ResponseEntity 中不存在响应体,则忽略此标志并返回整个 ResponseEntity

更多可能的配置选项请参见 HTTP 出站组件

WebFlux 头部映射

由于 WebFlux 组件完全基于 HTTP 协议,HTTP 头部映射并无差异。更多可用的映射选项及组件,请参阅 HTTP 头部映射

WebFlux 请求属性

从版本6.0开始,WebFluxRequestExecutingMessageHandler 可以通过 setAttributeVariablesExpression() 配置来评估请求属性。此 SpEL 表达式必须在 Map 中进行评估。然后,该映射将传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP 请求配置回调中。如果以键值对象形式的信息需要从 Message 传递到请求,并且下游过滤器将能够访问这些属性以进行进一步处理,这将非常有用。