RSocket 支持
RSocket Spring Integration 模块 (spring-integration-rsocket
) 允许执行 RSocket 应用协议。
你需要将这个依赖项添加到你的项目中:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.4.2"
此模块从 5.2 版开始提供,并基于 Spring Messaging 基础,使用其 RSocket 组件实现,例如 RSocketRequester
、RSocketMessageHandler
和 RSocketStrategies
。有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持。
在通过通道适配器启动集成流处理之前,我们需要在服务器和客户端之间建立一个 RSocket 连接。为此,Spring Integration RSocket 支持提供了 ServerRSocketConnector
和 ClientRSocketConnector
,它们是 AbstractRSocketConnector
的实现。
ServerRSocketConnector
根据提供的 io.rsocket.transport.ServerTransport
在主机和端口上暴露一个监听器,以接受来自客户端的连接。可以通过 setServerConfigurer()
自定义内部的 RSocketServer
实例,以及其他可以配置的选项,例如用于有效负载数据和头部元数据的 RSocketStrategies
和 MimeType
。当客户端请求者提供了 setupRoute
(参见下面的 ClientRSocketConnector
),连接的客户端将作为 RSocketRequester
存储在由 clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
确定的键下。默认情况下,连接数据用作键,并转换为使用 UTF-8 字符集的字符串值。这样的 RSocketRequester
注册表可以在应用程序逻辑中使用,以确定特定的客户端连接进行交互,或者向所有已连接的客户端发布相同的消息。当从客户端建立连接时,ServerRSocketConnector
会发出 RSocketConnectedEvent
。这类似于 Spring Messaging 模块中 @ConnectMapping
注解所提供的功能。映射模式 *
表示接受所有客户端路由。RSocketConnectedEvent
可以通过 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
头来区分不同的路由。
一个典型的服务器配置可能看起来像这样:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项,包括 RSocketStrategies
bean 和用于 RSocketConnectedEvent
的 @EventListener
,都是可选的。更多信息请参见 ServerRSocketConnector
JavaDocs。
从 5.2.1 版本开始,ServerRSocketMessageHandler
被提取为一个公共的顶级类,以便可能与现有的 RSocket 服务器连接。当 ServerRSocketConnector
使用外部实例的 ServerRSocketMessageHandler
提供时,它不会在内部创建 RSocket 服务器,而是将所有的处理逻辑委托给提供的实例。此外,ServerRSocketMessageHandler
可以配置一个 messageMappingCompatible
标志来处理 RSocket 控制器的 @MessageMapping
,完全替代标准 RSocketMessageHandler
提供的功能。这在混合配置中很有用,当经典的 @MessageMapping
方法与 RSocket 通道适配器一起存在于同一个应用程序中,并且应用程序中存在外部配置的 RSocket 服务器。
ClientRSocketConnector
用作基于通过提供的 ClientTransport
连接的 RSocket
的 RSocketRequester
的持有者。可以通过提供的 RSocketConnectorConfigurer
来定制 RSocketConnector
。该组件还可以配置带有可选模板变量的 setupRoute
和带有元数据的 setupData
。
一个典型的客户端配置可能看起来像这样:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项中的大多数(包括 RSocketStrategies
bean)都是可选的。请注意我们如何连接到在任意端口上本地启动的 RSocket 服务器。有关 setupData
的用法,请参阅 ServerRSocketConnector.clientRSocketKeyStrategy
。另请参阅 ClientRSocketConnector
及其父类 AbstractRSocketConnector
的 JavaDocs 以获取更多信息。
ClientRSocketConnector
和 ServerRSocketConnector
都负责将入站通道适配器映射到它们的 path
配置,以路由传入的 RSocket 请求。有关更多信息,请参阅下一节。
RSocket 入站网关
RSocketInboundGateway
负责接收 RSocket 请求并生成响应(如果有)。它需要一个 path
映射数组,这些映射可以是类似于 MVC 请求映射或 @MessageMapping
语义的模式。此外,(从版本 5.2.2 开始),可以在 RSocketInboundGateway
上配置一组交互模型(参见 RSocketInteractionModel
),以限制 RSocket 请求到此端点的特定帧类型。默认情况下,所有交互模型都受支持。根据其 IntegrationRSocketEndpoint
实现(ReactiveMessageHandler
的扩展),这种类型的 bean 会被 ServerRSocketConnector
或 ClientRSocketConnector
自动检测到,用于内部 IntegrationRSocketMessageHandler
中传入请求的路由逻辑。可以为 RSocketInboundGateway
提供一个 AbstractRSocketConnector
以进行显式端点注册。这样,在该 AbstractRSocketConnector
上禁用自动检测选项。RSocketStrategies
也可以注入到 RSocketInboundGateway
,或者从提供的 AbstractRSocketConnector
获取,覆盖任何显式的注入。使用这些 RSocketStrategies
中的解码器根据提供的 requestElementType
解码请求有效负载。如果在传入的 Message
中没有提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
标头,则 RSocketInboundGateway
将请求视为 fireAndForget
RSocket 交互模型。在这种情况下,RSocketInboundGateway
在 outputChannel
中执行简单的 send
操作。否则,来自 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
标头的 MonoProcessor
值用于向 RSocket 发送回复。为此,RSocketInboundGateway
在 outputChannel
上执行 sendAndReceiveMessageReactive
操作。要发送下游的消息的有效负载始终是一个 Flux
,这符合 MessagingRSocket
逻辑。当处于 fireAndForget
RSocket 交互模型时,消息具有简单转换的有效负载。回复的有效负载可以是一个简单的对象或一个 Publisher
——RSocketInboundGateway
根据 RSocketStrategies
中提供的编码器将它们正确地转换为 RSocket 响应。
从 5.3 版本开始,在 RSocketInboundGateway
中添加了一个 decodeFluxAsUnit
选项(默认值为 false
)。默认情况下,传入的 Flux
会以每个事件分别解码的方式进行转换。这与当前 @MessageMapping
语义下的行为完全一致。为了恢复之前的行为或将整个 Flux
根据应用需求作为单个单元解码,需要将 decodeFluxAsUnit
设置为 true
。然而,目标解码逻辑取决于所选的 Decoder
,例如,StringDecoder
需要在流中包含换行符分隔符(默认情况下)以指示字节缓冲区的结束。
请参阅 使用 Java 配置 RSocket 端点,了解如何配置 RSocketInboundGateway
端点以及如何处理下游有效负载的示例。
RSocket 外发网关
RSocketOutboundGateway
是一个 AbstractReplyProducingMessageHandler
,用于执行进入 RSocket 的请求,并根据 RSocket 的回复(如果有)生成回复。低级别的 RSocket 协议交互被委托给从提供的 ClientRSocketConnector
或从服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头解析出的 RSocketRequester
。服务器端的目标 RSocketRequester
可以从 RSocketConnectedEvent
解析,或者使用 ServerRSocketConnector.getClientRSocketRequester()
API 根据选定的连接请求映射业务键来解析。有关更多信息,请参阅 ServerRSocketConnector
的 JavaDocs。
发送请求的 route
必须显式配置(连同路径变量)或通过针对请求消息评估的 SpEL 表达式。
RSocket 交互模型可以通过 RSocketInteractionModel
选项或相应的表达式设置来提供。默认情况下,对于常见的网关用例,使用 requestResponse
。
当请求消息的有效负载是一个 Publisher
时,可以提供一个 publisherElementType
选项,根据目标 RSocketRequester
中提供的 RSocketStrategies
对其元素进行编码。此选项的表达式可以求值为一个 ParameterizedTypeReference
。有关数据及其类型的更多信息,请参阅 RSocketRequester.RequestSpec.data()
的 JavaDocs。
一个 RSocket 请求也可以通过 metadata
来增强。为此,可以在 RSocketOutboundGateway
上配置针对请求消息的 metadataExpression
。此类表达式必须评估为 Map<Object, MimeType>
类型。
当 interactionModel
不是 fireAndForget
时,必须提供 expectedResponseType
。它默认是一个 String.class
。此选项的表达式可以评估为一个 ParameterizedTypeReference
。有关回复数据及其类型的更多信息,请参阅 RSocketRequester.RetrieveSpec.retrieveMono()
和 RSocketRequester.RetrieveSpec.retrieveFlux()
的 JavaDocs。
来自 RSocketOutboundGateway
的回复 payload
是一个 Mono
(即使对于 fireAndForget
交互模型,它也是 Mono<Void>
),这使得此组件始终是 async
。此类 Mono
在生产到 outputChannel
之前会进行订阅,以用于常规通道,或由 FluxMessageChannel
按需处理。对于 requestStream
或 requestChannel
交互模型的 Flux
响应也会被包装成回复 Mono
。它可以通过带有直通服务激活器的 FluxMessageChannel
在下游展平:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用程序逻辑中显式订阅。
预期的响应类型也可以被配置(或通过表达式评估)为 void
,将此网关视为一个出站通道适配器。但是,outputChannel
仍然需要进行配置(即使它只是一个 NullChannel
),以启动对返回的 Mono
的订阅。
请参阅 使用 Java 配置 RSocket 端点 以了解如何配置 RSocketOutboundGateway
端点以及如何处理下游有效负载的示例。
RSocket 命名空间支持
Spring Integration 提供了一个 rsocket
命名空间和相应的模式定义。要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入站
要使用 XML 配置 Spring Integration RSocket 入站通道适配器,你需要使用 int-rsocket
命名空间中的适当 inbound-gateway
组件。以下示例展示了如何配置它:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
应将 ClientRSocketConnector
和 ServerRSocketConnector
配置为通用的 <bean>
定义。
出站
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
请参阅 spring-integration-rsocket.xsd
以获取所有这些 XML 属性的描述。
使用 Java 配置 RSocket 端点
以下示例展示了如何用 Java 配置 RSocket 入站端点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中,假定有一个 ClientRSocketConnector
或 ServerRSocketConnector
,用于自动检测“echo”路径上的此类端点。请注意 @Transformer
的签名,它完全响应式地处理 RSocket 请求并生成响应式的回复。
以下示例展示了如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中,假定有一个 ClientRSocketConnector
或 ServerRSocketConnector
,用于自动检测在路径 “/uppercase” 上的此类端点,并且预期的交互模型为 “请求通道”。
以下示例展示了如何使用 Java 配置 RSocket 外发网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector()
仅在客户端需要。在服务器端,请求消息中必须提供带有 RSocketRequester
值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头。
以下示例展示了如何使用 Java DSL 配置 RSocket 外发网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何在上述流程的开头使用提到的 Function
接口的更多信息,请参阅 集成流作为网关。