RSocket
本节描述了 Spring Framework 对 RSocket 协议的支持。
概述
RSocket 是一种应用协议,用于通过 TCP、WebSocket 和其他字节流传输进行多路复用、双工通信,使用以下交互模型之一:
-
Request-Response
— 发送一条消息并接收一条回复。 -
Request-Stream
— 发送一条消息并接收一系列消息。 -
Channel
— 双向发送消息流。 -
Fire-and-Forget
— 发送单向消息。
一旦建立了初始连接,“客户端”与“服务器”的区别就消失了,因为双方变得对称,任一方都可以发起上述交互。这就是为什么在协议中称参与方为“请求者”和“响应者”,而上述交互被称为“请求流”或简单地称为“请求”。
这些是 RSocket 协议的关键特性和优势:
-
Reactive Streams 语义跨网络边界 — 对于诸如
Request-Stream
和Channel
的流请求,背压信号在请求者和响应者之间传递,允许请求者在源头减缓响应者,从而减少对网络层拥塞控制的依赖,以及在网络层或任何层级的缓冲需求。 -
请求限流 — 此功能被称为 "Leasing",源于可以从每一端发送的
LEASE
帧,用于限制另一端在给定时间内允许的请求总数。租约会定期续订。 -
会话恢复 — 此功能旨在应对连接丢失,并需要保持某些状态。状态管理对应用程序是透明的,并且与背压结合使用时效果良好,可以在可能的情况下停止生产者,并减少所需的状态量。
-
大消息的分片和重组。
-
保持连接(心跳)。
RSocket 在多种语言中有 实现。 Java 库 基于 Project Reactor 和 Reactor Netty 进行传输。这意味着您应用程序中来自 Reactive Streams 发布者的信号可以通过 RSocket 在网络中透明地传播。
协议
连接
最初,客户端通过一些低级流传输协议(如 TCP 或 WebSocket)连接到服务器,并向服务器发送一个 SETUP
帧以设置连接的参数。
服务器可能会拒绝 SETUP
帧,但通常在发送(对于客户端)和接收(对于服务器)之后,双方可以开始发起请求,除非 SETUP
指示使用租赁语义来限制请求的数量,在这种情况下,双方必须等待来自另一端的 LEASE
帧以允许发起请求。
发起请求
一旦建立连接,双方可以通过其中一个帧 REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或 REQUEST_FNF
发起请求。每个帧都携带一条从请求者到响应者的消息。
响应者可以返回 PAYLOAD
帧带有响应消息,在 REQUEST_CHANNEL
的情况下,请求者也可以发送 PAYLOAD
帧带有更多请求消息。
当请求涉及一系列消息,例如 Request-Stream
和 Channel
时,响应者必须尊重请求者的需求信号。需求以消息数量的形式表达。初始需求在 REQUEST_STREAM
和 REQUEST_CHANNEL
帧中指定。后续需求通过 REQUEST_N
帧进行信号传递。
每一方也可以通过 METADATA_PUSH
帧发送元数据通知,这些通知与任何单个请求无关,而是与整个连接相关。
消息格式
RSocket 消息包含数据和元数据。元数据可以用于发送路由、安全令牌等。数据和元数据可以采用不同的格式。每种类型的 MIME 类型在 SETUP
帧中声明,并适用于给定连接上的所有请求。
虽然所有消息都可以有元数据,但通常像路由这样的元数据是每个请求的,因此仅包含在请求的第一条消息中,即与以下某个帧 REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或 REQUEST_FNF
一起。
协议扩展定义了在应用程序中使用的通用元数据格式:
Java 实现
RSocket 的 Java 实现 是基于 Project Reactor 构建的。TCP 和 WebSocket 的传输是基于 Reactor Netty 构建的。作为一个反应式流库,Reactor 简化了协议的实现工作。对于应用程序来说,使用 Flux
和 Mono
结合声明式操作符和透明的背压支持是非常自然的选择。
RSocket Java 中的 API 故意保持简约和基础。它专注于协议特性,并将应用程序编程模型(例如,RPC 代码生成与其他)作为一个更高层次的独立问题。
主要合同 io.rsocket.RSocket 模型四种请求交互类型,其中 Mono
代表单个消息的承诺,Flux
代表消息流,io.rsocket.Payload
是实际消息,提供对数据和元数据的访问,作为字节缓冲区。RSocket
合同是对称使用的。对于请求,应用程序提供一个 RSocket
以执行请求。对于响应,应用程序实现 RSocket
以处理请求。
这并不是一个全面的介绍。大多数情况下,Spring 应用程序不需要直接使用它的 API。然而,独立于 Spring 查看或实验 RSocket 可能是重要的。RSocket Java 仓库包含多个 示例应用,展示了其 API 和协议特性。
Spring 支持
spring-messaging
模块包含以下内容:
-
RSocketRequester — 流畅的 API,通过
io.rsocket.RSocket
进行请求,支持数据和元数据的编码/解码。 -
Annotated Responders — 使用
@MessageMapping
和@RSocketExchange
注解的处理方法,用于响应。 -
RSocket Interface — 作为 Java 接口的 RSocket 服务声明,包含
@RSocketExchange
方法,可用作请求者或响应者。
spring-web
模块包含 Encoder
和 Decoder
实现,例如 Jackson CBOR/JSON 和 Protobuf,这些是 RSocket 应用程序可能需要的。它还包含可以插入以实现高效路由匹配的 PathPatternParser
。
Spring Boot 2.2 支持通过 TCP 或 WebSocket 搭建 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 暴露 RSocket 的选项。还支持客户端,并为 RSocketRequester.Builder
和 RSocketStrategies
提供自动配置。有关更多详细信息,请参阅 Spring Boot 参考中的 RSocket 部分。
Spring Security 5.2 提供了 RSocket 支持。
Spring Integration 5.2 提供了与 RSocket 客户端和服务器交互的入站和出站网关。有关更多详细信息,请参阅 Spring Integration 参考手册。
Spring Cloud Gateway 支持 RSocket 连接。
RSocketRequester
RSocketRequester
提供了一个流畅的 API 来执行 RSocket 请求,接受和返回数据和元数据的对象,而不是低级数据缓冲区。它可以对称地使用,用于从客户端发起请求和从服务器发起请求。
客户端请求者
在客户端获取 RSocketRequester
是通过连接到服务器,这涉及发送一个 RSocket SETUP
帧以及连接设置。 RSocketRequester
提供了一个构建器,帮助准备一个 io.rsocket.core.RSocketConnector
,包括 SETUP
帧的连接设置。
这是连接默认设置的最基本方法:
- Java
- Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
上述内容不会立即连接。当发出请求时,会透明地建立一个共享连接并使用它。
连接设置
RSocketRequester.Builder
提供以下选项来定制初始 SETUP
帧:
-
dataMimeType(MimeType)
— 设置连接上数据的 MIME 类型。 -
metadataMimeType(MimeType)
— 设置连接上元数据的 MIME 类型。 -
setupData(Object)
— 要包含在SETUP
中的数据。 -
setupRoute(String, Object…)
— 要包含在SETUP
中的元数据路由。 -
setupMetadata(Object, MimeType)
— 要包含在SETUP
中的其他元数据。
对于数据,默认的 mime 类型是从第一个配置的 Decoder
中派生的。对于元数据,默认的 mime 类型是 复合元数据,它允许每个请求多个元数据值和 mime 类型对。通常这两者都不需要更改。
SETUP
帧中的数据和元数据是可选的。在服务器端,可以使用 @ConnectMapping 方法来处理连接的开始和 SETUP
帧的内容。元数据可用于连接级别的安全性。
策略
RSocketRequester.Builder
接受 RSocketStrategies
来配置请求者。您需要使用此配置来提供数据和元数据值的(反)序列化的编码器和解码器。默认情况下,仅注册了来自 spring-core
的基本编解码器,用于 String
、byte[]
和 ByteBuffer
。添加 spring-web
可以提供更多可以注册的编解码器,如下所示:
- Java
- Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
旨在重用。在某些场景中,例如客户端和服务器在同一个应用程序中,可能更倾向于在 Spring 配置中声明它。
客户端响应者
RSocketRequester.Builder
可用于配置来自服务器的请求的响应者。
您可以使用注解处理程序进行客户端响应,基于与服务器上使用的相同基础设施,但以如下方式进行程序注册:
- Java
- Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) 1
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); 2
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) 3
.tcp("localhost", 7000);
如果存在
spring-web
,请使用PathPatternRouteMatcher
以实现高效的路由匹配。从具有
@MessageMapping
和/或@ConnectMapping
方法的类中创建响应者。注册响应者。
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) 1
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); 2
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } 3
.tcp("localhost", 7000)
如果存在
spring-web
,请使用PathPatternRouteMatcher
以实现高效的路由匹配。从具有
@MessageMapping
和/或@ConnectMapping
方法的类中创建响应者。注册响应者。
注意,上述内容仅是为程序化注册客户端响应者设计的快捷方式。对于其他场景,如果客户端响应者在 Spring 配置中,您仍然可以将 RSocketMessageHandler
声明为 Spring bean,然后按如下方式应用:
- Java
- Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述内容,您可能还需要在 RSocketMessageHandler
中使用 setHandlerPredicate
来切换到不同的策略,以检测客户端响应者,例如,基于自定义注解,如 @RSocketClientResponder
与默认的 @Controller
。在客户端和服务器,或同一应用程序中的多个客户端的场景中,这是必要的。
另请参见 Annotated Responders,以获取有关编程模型的更多信息。
高级
RSocketRequesterBuilder
提供了一个回调,以暴露底层的 io.rsocket.core.RSocketConnector
,以便进行进一步的配置选项,例如保持活动间隔、会话恢复、拦截器等。您可以按如下方式在该级别配置选项:
- Java
- Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
服务器请求者
从服务器向连接的客户端发出请求,关键在于从服务器获取连接客户端的请求者。
在 Annotated Responders 中,@ConnectMapping
和 @MessageMapping
方法支持一个 RSocketRequester
参数。使用它来访问连接的请求者。请记住,@ConnectMapping
方法本质上是 SETUP
帧的处理程序,必须在请求开始之前处理。因此,开始时的请求必须与处理解耦。例如:
- Java
- Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { 1
// ...
});
return ... 2
}
异步启动请求,与处理无关。
执行处理并返回完成
Mono<Void>
。
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { 1
// ...
}
}
/// ... // <2>
}
异步启动请求,与处理无关。
在挂起函数中执行处理。
Requests
- Java
- Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") 1
.data(viewBox) 2
.retrieveFlux(AirportLocation.class); 3
指定要包含在请求消息元数据中的路由。
提供请求消息的数据。
声明预期的响应。
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") 1
.data(viewBox) 2
.retrieveFlow<AirportLocation>() 3
指定要包含在请求消息元数据中的路由。
提供请求消息的数据。
声明预期的响应。
交互类型是根据输入和输出的基数隐式确定的。上述示例是一个 Request-Stream
,因为发送了一个值并接收了一系列值。在大多数情况下,只要输入和输出的选择与 RSocket 交互类型匹配,并且与响应者预期的输入和输出类型相符,您就不需要考虑这一点。唯一一个无效组合的例子是多对一。
data(Object)
方法还接受任何 Reactive Streams Publisher
,包括 Flux
和 Mono
,以及在 ReactiveAdapterRegistry
中注册的任何其他值生产者。对于生成相同类型值的多值 Publisher
(例如 Flux
),考虑使用重载的 data
方法之一,以避免对每个元素进行类型检查和 Encoder
查找:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object)
步骤是可选的。对于不发送数据的请求,可以跳过它:
- Java
- Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用 复合元数据(默认情况下)并且这些值受到注册的 Encoder
支持,则可以添加额外的元数据值。例如:
- Java
- Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
对于 Fire-and-Forget
,使用返回 Mono<Void>
的 send()
方法。请注意,Mono
仅表示消息已成功发送,而不表示消息已被处理。
对于 Metadata-Push
,使用 sendMetadata()
方法,返回值为 Mono<Void>
。
注释响应者
RSocket 响应者可以实现为 @MessageMapping
和 @ConnectMapping
方法。 @MessageMapping
方法处理单个请求,而 @ConnectMapping
方法处理连接级事件(设置和元数据推送)。 注解响应者在服务器端和客户端的响应中是对称支持的。
服务器响应者
要在服务器端使用注解响应者,请将 RSocketMessageHandler
添加到您的 Spring 配置中,以检测带有 @MessageMapping
和 @ConnectMapping
方法的 @Controller
bean:
- Java
- Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过 Java RSocket API 启动一个 RSocket 服务器,并为响应者插入 RSocketMessageHandler
,如下所示:
- Java
- Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
默认支持 复合 和 路由 元数据。如果需要切换到不同的 MIME 类型或注册额外的元数据 MIME 类型,您可以设置其 MetadataExtractor。
您需要设置所需的 Encoder
和 Decoder
实例,以支持元数据和数据格式。您可能需要 spring-web
模块来实现编解码器。
默认情况下,SimpleRouteMatcher
用于通过 AntPathMatcher
匹配路由。我们建议使用 spring-web
中的 PathPatternRouteMatcher
进行高效的路由匹配。RSocket 路由可以是层次结构的,但不是 URL 路径。这两种路由匹配器默认配置为使用 "." 作为分隔符,并且没有像 HTTP URL 那样的 URL 解码。
RSocketMessageHandler
可以通过 RSocketStrategies
进行配置,如果您需要在同一进程中的客户端和服务器之间共享配置,这可能会很有用:
- Java
- Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客户端响应者
客户端的注释响应者需要在 RSocketRequester.Builder
中进行配置。有关详细信息,请参见 Client Responders。
@MessageMapping
- Java
- Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上述 @MessageMapping
方法响应具有路由 "locate.radars.within" 的请求流交互。它支持灵活的方法签名,可以使用以下方法参数:
方法参数 | 描述 |
---|---|
@Payload | 请求的有效负载。这可以是异步类型的具体值,如 Mono 或 Flux 。注意: 使用此注解是可选的。一个不是简单类型且不属于其他支持参数的方法参数,被假定为预期的有效负载。 |
RSocketRequester | 用于向远程端发起请求的请求者。 |
@DestinationVariable | 从路由中提取的值,基于映射模式中的变量,例如 @MessageMapping("find.radar.{id}") 。 |
@Header | 注册用于提取的元数据值,如 MetadataExtractor 中所述。 |
@Headers Map<String, Object> | 注册用于提取的所有元数据值,如 MetadataExtractor 中所述。 |
返回值预计为一个或多个对象,这些对象将被序列化为响应有效负载。这可以是异步类型,如 Mono
或 Flux
,一个具体值,或者是 void
或无值的异步类型,如 Mono<Void>
。
@MessageMapping
方法支持的 RSocket 交互类型是根据输入(即 @Payload
参数)和输出的基数来确定的,其中基数的含义如下:
基数 | 描述 |
---|---|
1 | 这是一个显式值,或一个单值异步类型,例如 Mono<T> 。 |
多 | 一个多值异步类型,例如 Flux<T> 。 |
0 | 对于输入,这意味着该方法没有 @Payload 参数。对于输出,这是 void 或一个无值异步类型,例如 Mono<Void> 。 |
下表显示了所有输入和输出基数组合及其对应的交互类型:
输入基数 | 输出基数 | 交互类型 |
---|---|---|
0, 1 | 0 | Fire-and-Forget, Request-Response |
0, 1 | 1 | Request-Response |
0, 1 | Many | Request-Stream |
Many | 0, 1, Many | Request-Channel |
@RSocketExchange
作为 @MessageMapping
的替代方案,您还可以使用 @RSocketExchange
方法处理请求。这些方法在 RSocket 接口 上声明,可以通过 RSocketServiceProxyFactory
作为请求者使用,或由响应者实现。
例如,作为响应者处理请求:
- Java
- Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
@RSocketExhange
和 @MessageMapping
之间存在一些差异,因为前者需要适用于请求者和响应者的使用。例如,虽然 @MessageMapping
可以声明处理任意数量的路由,并且每个路由可以是一个模式,但 @RSocketExchange
必须声明一个单一的、具体的路由。在与元数据相关的支持方法参数方面也存在一些小差异,具体支持的参数列表请参见 @MessageMapping 和 RSocket Interface。
@RSocketExchange
可以在类型级别使用,以指定给定 RSocket 服务接口的所有路由的公共前缀。
@ConnectMapping
@ConnectMapping
处理 RSocket 连接开始时的 SETUP
帧,以及通过 METADATA_PUSH
帧的任何后续元数据推送通知,即 metadataPush(Payload)
在 io.rsocket.RSocket
中。
@ConnectMapping
方法支持与 @MessageMapping 相同的参数,但基于来自 SETUP
和 METADATA_PUSH
帧的元数据和数据。 @ConnectMapping
可以具有一个模式,以将处理范围缩小到在元数据中具有路由的特定连接,或者如果未声明模式,则所有连接都匹配。
@ConnectMapping
方法不能返回数据,必须声明为 void
或 Mono<Void>
作为返回值。如果处理返回一个新连接的错误,则该连接会被拒绝。处理不得被阻塞以向 RSocketRequester
发起连接请求。有关详细信息,请参见 Server Requester。
MetadataExtractor
响应者必须解释元数据。 复合元数据 允许独立格式化的元数据值(例如,用于路由、安全、追踪),每个都有其自己的 mime 类型。 应用程序需要一种配置支持的元数据 mime 类型的方法,以及一种访问提取值的方法。
MetadataExtractor
是一个契约,用于接收序列化的元数据并返回解码的名称-值对,这些对可以像通过名称访问的头部一样被访问,例如通过注解处理方法中的 @Header
。
DefaultMetadataExtractor
可以接受 Decoder
实例来解码元数据。开箱即用,它内置支持 "message/x.rsocket.routing.v0",将其解码为 String
并保存在 "route" 键下。对于任何其他 MIME 类型,您需要提供一个 Decoder
并按如下方式注册 MIME 类型:
- Java
- Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
复合元数据很好地结合了独立的元数据值。然而,请求者可能不支持复合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor
可能需要自定义逻辑将解码的值映射到输出映射中。以下是一个使用 JSON 作为元数据的示例:
- Java
- Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
在通过 RSocketStrategies
配置 MetadataExtractor
时,您可以让 RSocketStrategies.Builder
使用配置的解码器创建提取器,并简单地使用回调自定义注册,如下所示:
- Java
- Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket 接口
Spring 框架允许您将 RSocket 服务定义为具有 @RSocketExchange
方法的 Java 接口。您可以将这样的接口传递给 RSocketServiceProxyFactory
来创建一个代理,该代理通过 RSocketRequester 执行请求。您还可以将接口实现为处理请求的响应者。
首先创建带有 @RSocketExchange
方法的接口:
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
现在你可以创建一个代理,当方法被调用时执行请求:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
您还可以实现接口以作为响应者处理请求。请参见 Annotated Responders。
方法参数
注释的 RSocket 交换方法支持灵活的方法签名,具有以下方法参数:
方法参数 | 描述 |
---|---|
@DestinationVariable | 添加一个路由变量,以便与 @RSocketExchange 注解中的路由一起传递给 RSocketRequester ,以扩展路由中的模板占位符。该变量可以是一个 String 或任何 Object,然后通过 toString() 格式化。 |
@Payload | 设置请求的输入负载。可以是一个具体值,或任何可以通过 ReactiveAdapterRegistry 适配为 Reactive Streams Publisher 的值的生产者。除非将 required 属性设置为 false ,或者根据 MethodParameter#isOptional 确定参数是可选的,否则必须提供负载。 |
Object ,如果后面跟着 MimeType | 输入负载中元数据条目的值。只要下一个参数是元数据条目 MimeType ,这可以是任何 Object 。该值可以是一个具体值或任何可以通过 ReactiveAdapterRegistry 适配为 Reactive Streams Publisher 的单值生产者。 |
MimeType | 元数据条目的 MimeType 。前面的方法参数预计是元数据值。 |
返回值
注释的 RSocket 交换方法支持返回具体值或任何可以通过 ReactiveAdapterRegistry
适配为 Reactive Streams Publisher
的值的生产者。
默认情况下,具有同步(阻塞)方法签名的 RSocket 服务方法的行为取决于底层 RSocket ClientTransport
的响应超时设置以及 RSocket 保活设置。 RSocketServiceProxyFactory.Builder
确实提供了一个 blockTimeout
选项,允许您配置等待响应的最长时间,但我们建议在 RSocket 级别配置超时值以获得更好的控制。