跳到主要内容
版本:7.0.2

RSocket 支持

DeepSeek V3 中英对照 RSocket Support

RSocket Spring Integration 模块 (spring-integration-rsocket) 支持执行 RSocket 应用协议

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>7.0.2</version>
</dependency>

该模块自 5.2 版本起可用,它基于 Spring Messaging 基础框架及其 RSocket 组件实现,例如 RSocketRequesterRSocketMessageHandlerRSocketStrategies。有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持

在通过通道适配器启动集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。为此,Spring Integration RSocket 支持提供了 AbstractRSocketConnectorServerRSocketConnectorClientRSocketConnector 实现。

ServerRSocketConnector 根据提供的 io.rsocket.transport.ServerTransport 在主机和端口上暴露一个监听器,用于接受来自客户端的连接。内部的 RSocketServer 实例可以通过 setServerConfigurer() 进行自定义,同时还可以配置其他选项,例如用于负载数据和头部元数据的 RSocketStrategiesMimeType。当客户端请求者(参见下面的 ClientRSocketConnector)提供 setupRoute 时,已连接的客户端将作为 RSocketRequester 存储,其键由 clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object> 确定。默认情况下,连接数据用作键,并使用 UTF-8 字符集转换为字符串。这样的 RSocketRequester 注册表可以在应用程序逻辑中用于确定特定的客户端连接以与其交互,或者用于向所有连接的客户端发布相同的消息。当从客户端建立连接时,ServerRSocketConnector 会发出 RSocketConnectedEvent。这与 Spring Messaging 模块中的 @ConnectMapping 注解提供的功能类似。映射模式 * 表示接受所有客户端路由。RSocketConnectedEvent 可以通过 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER 头部来区分不同的路由。

典型的服务器配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}

所有选项,包括 RSocketStrategies bean 和用于 RSocketConnectedEvent@EventListener 都是可选的。更多信息请参阅 ServerRSocketConnector 的 JavaDocs。

自5.2.1版本起,ServerRSocketMessageHandler 被提取为一个公共的顶级类,以便与现有的 RSocket 服务器建立连接。当 ServerRSocketConnector 被提供一个外部的 ServerRSocketMessageHandler 实例时,它不会在内部创建 RSocket 服务器,而是将所有处理逻辑委托给所提供的实例。此外,ServerRSocketMessageHandler 可以通过配置 messageMappingCompatible 标志来处理 RSocket 控制器的 @MessageMapping 注解,从而完全替代标准 RSocketMessageHandler 提供的功能。这在混合配置中非常有用,例如当传统的 @MessageMapping 方法与 RSocket 通道适配器共存于同一应用程序中,并且应用程序中存在外部配置的 RSocket 服务器时。

ClientRSocketConnector 作为基于通过提供的 ClientTransport 连接的 RSocketRSocketRequester 的持有者。RSocketConnector 可以通过提供的 RSocketConnectorConfigurer 进行自定义。此外,还可以在此组件上配置 setupRoute(带有可选的模板变量)以及带有元数据的 setupData

典型的客户端配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}

这些选项(包括 RSocketStrategies bean)大多是可选的。请注意我们如何连接到在任意端口上本地启动的 RSocket 服务器。关于 setupData 的使用场景,请参阅 ServerRSocketConnector.clientRSocketKeyStrategy。更多信息,请参见 ClientRSocketConnector 及其超类 AbstractRSocketConnector 的 Javadocs。

ClientRSocketConnectorServerRSocketConnector 都负责将入站通道适配器映射到其 path 配置,以路由传入的 RSocket 请求。更多信息请参阅下一节。

RSocket 入站网关

RSocketInboundGateway 负责接收 RSocket 请求并生成响应(如果有)。它需要一个 path 映射数组,这些映射可以是类似于 MVC 请求映射或 @MessageMapping 语义的模式。此外(自 5.2.2 版本起),可以在 RSocketInboundGateway 上配置一组交互模型(参见 RSocketInteractionModel),以通过特定的帧类型限制 RSocket 请求访问此端点。默认情况下,支持所有交互模型。根据其 IntegrationRSocketEndpoint 实现(ReactiveMessageHandler 的扩展),此类 bean 会被 ServerRSocketConnectorClientRSocketConnector 自动检测,用于内部 IntegrationRSocketMessageHandler 中针对传入请求的路由逻辑。可以向 RSocketInboundGateway 提供 AbstractRSocketConnector 以进行显式端点注册。这样,该 AbstractRSocketConnector 上的自动检测选项将被禁用。RSocketStrategies 也可以注入到 RSocketInboundGateway 中,或者从提供的 AbstractRSocketConnector 中获取,从而覆盖任何显式注入。根据提供的 requestElementType,使用这些 RSocketStrategies 中的解码器来解码请求负载。如果传入的 Message 中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 标头,则 RSocketInboundGateway 将请求视为 fireAndForget RSocket 交互模型。在这种情况下,RSocketInboundGateway 会向 outputChannel 执行普通的 send 操作。否则,将使用 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 标头中的 MonoProcessor 值来向 RSocket 发送回复。为此,RSocketInboundGateway 会在 outputChannel 上执行 sendAndReceiveMessageReactive 操作。根据 MessagingRSocket 逻辑,要向下游发送的消息的 payload 始终是一个 Flux。在 fireAndForget RSocket 交互模型中,消息具有普通的转换后的 payload。回复 payload 可以是普通对象或 Publisher——RSocketInboundGateway 会根据 RSocketStrategies 中提供的编码器,将两者都正确转换为 RSocket 响应。

从版本 5.3 开始,RSocketInboundGateway 新增了一个 decodeFluxAsUnit 选项(默认值为 false)。默认情况下,传入的 Flux 会以每个事件单独解码的方式进行转换。这正是当前 @MessageMapping 语义所体现的精确行为。若要根据应用程序需求恢复之前的行为或将整个 Flux 作为单个单元解码,必须将 decodeFluxAsUnit 设置为 true。然而,目标解码逻辑取决于所选的 Decoder,例如 StringDecoder 要求(默认情况下)流中存在换行分隔符以指示字节缓冲区的结束。

请参阅使用Java配置RSocket端点以获取配置 RSocketInboundGateway 端点及处理下游负载的示例。

RSocket 出站网关

RSocketOutboundGateway 是一个 AbstractReplyProducingMessageHandler,用于向 RSocket 发起请求并根据 RSocket 的回复(如果有)生成响应。底层的 RSocket 协议交互委托给 RSocketRequester 处理,该对象从提供的 ClientRSocketConnector 或服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 标头解析而来。服务器端的目标 RSocketRequester 可以从 RSocketConnectedEvent 中解析,或根据通过 ServerRSocketConnector.setClientRSocketKeyStrategy() 为连接请求映射选择的业务键,使用 ServerRSocketConnector.getClientRSocketRequester() API 获取。更多信息请参阅 ServerRSocketConnector 的 Java 文档。

发送请求的 route 必须显式配置(与路径变量一起)或通过针对请求消息求值的 SpEL 表达式来配置。

RSocket 交互模型可通过 RSocketInteractionModel 选项或相应的表达式设置提供。默认情况下,通用网关用例使用 requestResponse 模式。

当请求消息负载为 Publisher 时,可以提供 publisherElementType 选项,以根据目标 RSocketRequester 中提供的 RSocketStrategies 对其元素进行编码。此选项的表达式可解析为 ParameterizedTypeReference。有关数据及其类型的更多信息,请参阅 RSocketRequester.RequestSpec.data() 的 JavaDocs。

RSocket 请求还可以通过 metadata 进行增强。为此,可以在 RSocketOutboundGateway 上配置针对请求消息的 metadataExpression。此表达式必须解析为 Map<Object, MimeType>

interactionModel 不是 fireAndForget 时,必须提供 expectedResponseType。默认情况下,它是一个 String.class。此选项的表达式可以求值为 ParameterizedTypeReference。有关回复数据及其类型的更多信息,请参阅 RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux() 的 JavaDocs。

来自 RSocketOutboundGateway 的回复 payload 始终是一个 Mono(即使是 fireAndForget 交互模型,它也是 Mono<Void>),这始终使该组件表现为 async。在将此类 Mono 生成到常规通道的 outputChannel 之前,会对其进行订阅,或者由 FluxMessageChannel 按需处理。对于 requestStreamrequestChannel 交互模型,Flux 响应也会被包装到回复 Mono 中。它可以通过直通服务激活器在 FluxMessageChannel 下游展平:

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}

或在目标应用程序逻辑中显式订阅。

预期的响应类型也可以配置(或通过表达式求值)为 void,将此网关视为出站通道适配器。然而,outputChannel 仍然必须配置(即使只是一个 NullChannel),以启动对返回的 Mono 的订阅。

请参阅使用Java配置RSocket端点以获取示例,了解如何配置 RSocketOutboundGateway 端点并处理下游负载。

RSocket 命名空间支持

Spring Integration 提供了一个 rsocket 命名空间及相应的模式定义。要在配置中包含它,请在您的应用程序上下文配置文件中添加以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>

入站

要通过 XML 配置 Spring Integration RSocket 入站通道适配器,你需要使用 int-rsocket 命名空间中的相应 inbound-gateway 组件。以下示例展示了如何配置它:

<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>

ClientRSocketConnectorServerRSocketConnector 应配置为通用的 <bean> 定义。

出站

<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

有关所有这些 XML 属性的描述,请参阅 spring-integration-rsocket.xsd

使用 Java 配置 RSocket 端点

以下示例展示了如何使用 Java 配置 RSocket 入站端点:

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}

在此配置中,假定存在一个 ClientRSocketConnectorServerRSocketConnector,其意义在于自动检测“echo”路径上的此类端点。请注意 @Transformer 签名,它完全以响应式方式处理 RSocket 请求并生成响应式回复。

以下示例展示了如何使用 Java DSL 配置 RSocket 入站网关:

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}

在此配置中,假定存在一个 ClientRSocketConnectorServerRSocketConnector,其含义是在 “/uppercase” 路径上自动检测此类端点,并期望交互模型为“请求通道”。

以下示例展示了如何使用 Java 配置 RSocket 出站网关:

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}

setClientRSocketConnector() 仅客户端需要。在服务器端,必须在请求消息中提供带有 RSocketRequester 值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 标头。

以下示例展示了如何使用 Java DSL 配置 RSocket 出站网关:

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}

有关如何在上述流程开头使用提到的 Function 接口的更多信息,请参阅IntegrationFlow 作为网关