WebFlux 支持
WebFlux Spring集成模块 (spring-integration-webflux
) 允许以反应式方式执行 HTTP 请求和处理传入的 HTTP 请求。
你需要将这个依赖项添加到你的项目中:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.4.2"
在非 Servlet 基础的服务器配置情况下,必须包含 io.projectreactor.netty:reactor-netty
依赖。
WebFlux 支持包括以下网关实现:WebFluxInboundEndpoint
和 WebFluxRequestExecutingMessageHandler
。该支持完全基于 Spring WebFlux 和 Project Reactor 基础。有关更多信息,请参阅 HTTP 支持,因为许多选项在反应式和常规 HTTP 组件之间是共享的。
WebFlux 命名空间支持
Spring Integration 提供了一个 webflux
命名空间和相应的模式定义。要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 入站组件
从 5.0 版本开始,提供了 WebFluxInboundEndpoint
的 WebHandler
实现。这个组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport
,它们通过新提取的 BaseHttpInboundEndpoint
共享一些通用选项。它用于 Spring WebFlux 反应式环境中(而不是 MVC)。以下示例展示了一个简单的 WebFlux 端点实现:
- Java DSL
- Kotlin DSL
- Java
- XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
配置类似于 HttpRequestHandlingEndpointSupport
(在示例中之前提到的),不同的是我们使用 @EnableWebFlux
将 WebFlux 基础设施添加到我们的集成应用程序中。此外,WebFluxInboundEndpoint
通过反应式 HTTP 服务器实现提供的基于背压和按需的功能,对下游流执行 sendAndReceive
操作。
回复部分也是非阻塞的,并且是基于内部的 FutureReplyChannel
,它被扁平映射到一个回复 Mono
以实现按需解析。
你可以使用自定义的 ServerCodecConfigurer
、RequestedContentTypeResolver
以及 ReactiveAdapterRegistry
来配置 WebFluxInboundEndpoint
。后者提供了一种机制,你可以用它来返回任何反应式类型的回复:Reactor Flux
,RxJava Observable
,Flowable
等等。通过这种方式,我们可以使用 Spring Integration 组件实现 服务器发送事件场景,如下例所示:
- Java DSL
- Kotlin DSL
- Java
- XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
更多信息配置选项,请参见 [请求映射支持](changes-2.2-3.0.md#x3.0 请求映射) 和 跨源资源共享 (CORS) 支持。
当请求体为空或 payloadExpression
返回 null
时,将使用请求参数 (MultiValueMap<String, String>
) 作为目标消息的 payload
进行处理。
负载验证
从 5.2 版本开始,WebFluxInboundEndpoint
可以配置一个 Validator
。与 HTTP 支持中的 MVC 验证不同,它用于验证由 HttpMessageReader
将请求转换后的 Publisher
中的元素,在执行回退和 payloadExpression
函数之前。框架无法假设构建最终有效负载后 Publisher
对象可能变得多么复杂。如果有要求限制验证可见性仅适用于最终有效负载(或其 Publisher
元素),则验证应该在 WebFlux 端点之后进行。有关更多信息,请参阅 Spring WebFlux 文档。无效的有效负载将被拒绝,并抛出 IntegrationWebExchangeBindException
(WebExchangeBindException
的扩展),其中包含所有验证 Errors
。有关验证的更多信息,请参阅 Spring 框架参考手册。
WebFlux 外发组件
WebFluxRequestExecutingMessageHandler
(从 5.0 版本开始)的实现与 HttpRequestExecutingMessageHandler
类似。它使用了 Spring Framework WebFlux 模块中的 WebClient
。要配置它,定义一个类似于以下内容的 bean:
- Java DSL
- Kotlin DSL
- Java
- XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="http://localhost/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="http://localhost/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient
的 exchange()
操作返回一个 Mono<ClientResponse>
,该结果通过多个 Mono.map()
步骤映射为 AbstractIntegrationMessageBuilder
,作为 WebFluxRequestExecutingMessageHandler
的输出。与 ReactiveChannel
作为 outputChannel
一起使用时,Mono<ClientResponse>
的评估会延迟到下游订阅发生时才进行。否则,它将被视为 async
模式,并且 Mono
响应会被适配为 SettableListenableFuture
,以实现从 WebFluxRequestExecutingMessageHandler
返回的异步回复。输出消息的目标有效负载取决于 WebFluxRequestExecutingMessageHandler
的配置。setExpectedResponseType(Class<?>)
或 setExpectedResponseTypeExpression(Expression)
确定了响应体元素转换的目标类型。如果设置了 replyPayloadToFlux
为 true
,则响应体将被转换为带有每个元素的 expectedResponseType
的 Flux
,并且此 Flux
将作为有效负载发送到下游。之后,你可以使用 splitter 以反应式方式迭代这个 Flux
。
此外,可以将 BodyExtractor<?, ClientHttpResponse>
注入到 WebFluxRequestExecutingMessageHandler
中,而不是使用 expectedResponseType
和 replyPayloadToFlux
属性。它可用于对 ClientHttpResponse
进行低级别的访问,并对消息体和 HTTP 头的转换提供更多的控制。Spring Integration 提供了 ClientHttpResponseBodyExtractor
作为身份函数来生成(下游)整个 ClientHttpResponse
,以及任何其他可能的自定义逻辑。
从 5.2 版本开始,WebFluxRequestExecutingMessageHandler
支持响应式的 Publisher
、Resource
和 MultiValueMap
类型作为请求消息的有效载荷。相应的 BodyInserter
在内部使用,以填充到 WebClient.RequestBodySpec
中。当有效载荷是响应式的 Publisher
时,可以使用配置的 publisherElementType
或 publisherElementTypeExpression
来确定发布者的元素类型。表达式必须解析为一个 Class<?>
、String
(该字符串将被解析为目标 Class<?>
或 ParameterizedTypeReference
)。
从 5.5 版本开始,WebFluxRequestExecutingMessageHandler
暴露了一个 extractResponseBody
标志(默认为 true
),用于仅返回响应体,或作为回复消息的有效负载返回整个 ResponseEntity
,这独立于提供的 expectedResponseType
或 replyPayloadToFlux
。如果 ResponseEntity
中不存在主体,则忽略此标志并返回整个 ResponseEntity
。
更多信息配置选项,请参阅 HTTP 外发组件。
WebFlux Header 映射
由于 WebFlux 组件完全基于 HTTP 协议,因此在 HTTP 标头映射上没有区别。有关更多可能的选项和用于映射标头的组件,请参阅HTTP 标头映射。
WebFlux 请求属性
从 6.0 版本开始,可以配置 WebFluxRequestExecutingMessageHandler
通过 setAttributeVariablesExpression()
来评估请求属性。此 SpEL 表达式必须在 Map
中进行评估。然后将此类映射传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
HTTP 请求配置回调。如果需要以键值对象的形式从 Message
传递信息给请求,并且下游过滤器需要访问这些属性以进行进一步处理时,这将非常有帮助。