RSocket
本节描述了Spring框架对RSocket协议的支持。
概述
RSocket是一种应用协议,用于通过TCP、WebSocket和其他字节流传输方式进行多路复用、双向通信,它采用以下交互模型中的一种:
请求-响应(Request-Response)——发送一条消息并接收一条回复。请求-流(Request-Stream)——发送一条消息并接收一连串的消息作为回复。通道(Channel)——能够双向发送消息流。一次发送后忽略(Fire-and-Forget)——发送一条单向消息。
一旦初始连接建立,"客户端"与"服务器"之间的区别就不再存在了,因为双方变得对称,每一方都可以发起上述交互中的任何一种。这就是为什么在协议中,参与方被称为"请求者"(requester)和"响应者"(responder),而上述交互则被称为"请求流"(request streams)或简称为"请求"(requests)。
以下是RSocket协议的主要特性和优势:
-
Reactive Streams 的语义跨越网络边界——对于诸如
Request-Stream和Channel之类的流式请求,背压信号在请求者和响应者之间传递,使得请求者能够从源头上减缓响应者的处理速度,从而减少对网络层拥塞控制的依赖,以及减少在网络层或任何层级进行缓冲的需要。 -
请求限制——这一功能被命名为“租赁”(Leasing),是因为可以从任意一端发送
LEASE帧来限制另一端在给定时间内允许的请求数量。这些“租赁”会定期更新。 -
会话恢复——该功能是为应对连接丢失而设计的,需要维护一些状态信息。对于应用程序而言,状态管理是透明的,并且与背压机制配合得很好;背压机制可以在可能的情况下停止生产者,从而减少所需的状态信息量。
-
大消息的分片与重新组装。
-
保持连接(心跳机制)。
RSocket在多种语言中都有实现版本。其Java库是基于Project Reactor构建的,而传输层则采用了Reactor Netty。这意味着应用程序中来自Reactive Streams Publishers的信号能够通过RSocket在网络中透明地传递。
协议
连接中
最初,客户端通过某种低级别的流传输协议(如TCP或WebSocket)连接到服务器,并向服务器发送一个SETUP帧以设置连接的参数。
服务器可能会拒绝SETUP帧,但通常在客户端发送该帧且服务器收到后,双方就可以开始发起请求了,除非SETUP帧中指定了使用租赁机制(leasing semantics)来限制请求的数量;在这种情况下,双方都必须等待对方发送的LEASE帧才能继续发起请求。
提出请求
一旦连接建立,双方都可以通过REQUEST_RESPONSE、REQUEST_STREAM、REQUEST_CHANNEL或REQUEST_FNF这四个帧中的任意一个来发起请求。每个这样的帧都携带一条从请求方到响应方的消息。
然后响应方可以返回包含响应消息的PAYLOAD帧,在REQUEST_CHANNEL的情况下,请求方也可以发送包含更多请求消息的PAYLOAD帧。
当一个请求涉及一系列消息(如 Request-Stream 和 Channel)时,响应者必须尊重请求者的需求信号。需求以消息的数量来表示。初始需求在 REQUEST_STREAM 和 REQUEST_CHANNEL 帧中指定。后续需求通过 REQUEST_N 帧来传达。
每一方还可以通过METADATA_push帧发送元数据通知,这些通知不涉及任何单个请求,而是与整个连接相关。
消息格式
RSocket消息包含数据和元数据。元数据可以用来发送路由信息、安全令牌等。数据和元数据的格式可以不同。每种数据的Mime类型在SETUP帧中进行了声明,并适用于给定连接上的所有请求。
虽然所有消息都可以包含元数据,但通常像路由这样的元数据是针对每个请求的,因此只会包含在请求中的第一条消息中,即在REQUEST_RESPONSE、REQUEST_STREAM、REQUEST_CHANNEL或REQUEST_FNF其中一个帧中。
协议扩展定义了在应用程序中使用的通用元数据格式:
Java 实现
RSocket的Java实现是基于Project Reactor构建的。TCP和WebSocket的传输层则是基于Reactor Netty实现的。作为一款Reactive Streams库,Reactor简化了协议的实现工作。对于应用程序来说,使用Flux和Mono以及声明式操作符,并结合透明的背压支持,是非常合适的。
RSocket Java中的API被有意设计得非常简洁和基础。它专注于协议特性,而将应用程序编程模型(例如RPC代码生成等)留给更高层次、独立的开发人员来处理。
这并不旨在成为一篇全面的介绍。在大多数情况下,Spring应用程序并不需要直接使用其API。然而,了解或尝试独立于Spring的RSocket可能是重要的。RSocket Java仓库中包含了许多示例应用,这些应用展示了其API和协议特性。
Spring 支持
spring-messaging 模块包含以下内容:
-
RSocketRequester — 通过
io.rsocket.RSocket发送请求的 fluent API,支持数据和元数据的编码/解码。 -
Annotated Responders — 带有
@MessageMapping和@RSocketExchange注解的处理方法,用于响应请求。 -
RSocket Interface — 作为 Java 接口的 RSocket 服务声明,包含
@RSocketExchange方法,可用于作为请求者或响应者。
spring-web 模块包含了诸如 Jackson CBOR/JSON、Protobuf 等 Encoder 和 Decoder 实现,这些正是 RSocket 应用程序可能需要的。该模块还包含了 PathPatternParser,可以用来实现高效的路由匹配。
Spring Boot 2.2支持通过TCP或WebSocket搭建RSocket服务器,还包括在WebFlux服务器中通过WebSocket暴露RSocket服务的选项。同时,也提供了RSocketRequester.Builder和RSocketStrategies的客户端支持与自动配置功能。更多详情请参阅Spring Boot参考文档中的RSocket部分。
Spring Security 5.2 提供了 RSocket 支持。
Spring Integration 5.2提供了用于与RSocket客户端和服务器交互的入站(inbound)和出站(outbound)网关。更多详情请参阅《Spring Integration参考手册》(Spring Integration Reference Manual)。
Spring Cloud Gateway 支持 RSocket 连接。
RSocketRequester
RSocketRequester 提供了一个流畅的 API 用于执行 RSocket 请求,它接受并返回对象作为数据和元数据,而不是低级别的数据缓冲区。它可以对称地使用,既可以从客户端发起请求,也可以从服务器发起请求。
客户端请求器
要在客户端获取一个RSocketRequester,需要连接到服务器,这个过程涉及发送一个包含连接设置的RSocket SETUP帧。RSocketRequester提供了一个构建器,该构建器有助于准备一个io.rsocket.core.RSocketConnector,其中包含了用于SETUP帧的连接设置。
这是使用默认设置进行连接的最基本方法:
- Java
- Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
上述内容并非立即连接。当发出请求时,会透明地建立一个共享连接并使用该连接。
连接设置
RSocketRequester.Builder 提供了以下功能来定制初始的 SETUP 帧:
dataMimeType(MimeType)— 设置连接上数据的MIME类型。metadataMimeType(MimeType)— 设置连接上元数据的MIME类型。setupData(Object)— 要包含在SETUP中的数据。setupRoute(String, Object…)— 要包含在SETUP中的元数据路由。setupMetadata(Object, MimeType)— 要包含在SETUP中的其他元数据。
对于数据,默认的 MIME 类型来自第一个配置的 Decoder。对于元数据,默认的 MIME 类型是 复合元数据,它允许每个请求包含多个元数据值和 MIME 类型对。通常情况下,这两种设置都不需要更改。
在SETUP帧中的数据和元数据是可选的。在服务器端,可以使用@ConnectMapping方法来处理连接的建立以及SETUP帧的内容。元数据可以用于连接级别的安全保护。
策略
RSocketRequester.Builder 接受 RSocketStrategies 用于配置请求者。你需要使用它来提供用于数据及元数据值的(反)序列化的编码器和解码器。默认情况下,只有来自 spring-core 的基本编解码器(针对 String、byte[] 和 ByteBuffer)被注册。通过添加 spring-web,可以访问更多可注册的编解码器,具体操作如下:
- Java
- Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new JacksonCborEncoder()))
.decoders(decoders -> decoders.add(new JacksonCborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(JacksonCborEncoder()) }
.decoders { it.add(JacksonCborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies 是为重用而设计的。在某些场景下,例如在同一应用程序中的客户端和服务器中,可能更倾向于在 Spring 配置中声明它。
客户端响应者
RSocketRequester.Builder 可用于配置服务器对请求的响应器。
你可以使用带有注释的处理程序来进行客户端响应,这些处理程序基于与服务器相同的基础设施,但需要通过以下方式以编程方式进行注册:
- Java
- Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) 1
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); 2
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) 3
.tcp("localhost", 7000);
如果项目中包含
spring-web,请使用PathPatternRouteMatcher以实现高效的路由匹配。从具有
@MessageMapping和/或@ConnectMapping方法的类中创建响应器。注册响应器。
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) 1
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); 2
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } 3
.tcp("localhost", 7000)
如果项目中包含
spring-web,请使用PathPatternRouteMatcher以实现高效的路由匹配。从具有
@MessageMapping和/或@ConnectMapping方法的类中创建响应器。注册响应器。
请注意,上述方法仅是一种为程序化注册客户端响应器而设计的快捷方式。对于其他场景(即客户端响应器在Spring配置中的情况),你仍然可以声明RSocketMessageHandler作为Springbean,然后按如下方式进行应用:
- Java
- Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述情况,你可能还需要在RSocketMessageHandler中使用setHandlerPredicate来切换检测客户端响应者的策略,例如,基于自定义注解(如@RSocketClientResponder)而不是默认的@Controller。在存在客户端和服务器,或者同一应用程序中有多个客户端的情况下,这是必要的。
另请参阅带注释的响应者,以了解有关编程模型的更多信息。
高级
RSocketRequesterBuilder 提供了一个回调函数,用于暴露底层的 io.rsocket.core.RSocketConnector,以便进一步配置保持连接间隔(keepalive intervals)、会话恢复(session resumption)、拦截器(interceptors)等选项。你可以在该层面进行如下配置:
- Java
- Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
服务器请求器
要从服务器向已连接的客户端发起请求,就需要从服务器中获取该已连接客户端的请求者信息。
在Annotated Responders中,@ConnectMapping和@MessageMapping方法支持一个RSocketRequester参数。使用它来访问连接请求者。请记住,@ConnectMapping方法本质上是SETUP帧的处理程序,在请求开始之前必须先处理这些帧。因此,最开始的请求必须与处理过程分离。例如:
- Java
- Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { 1
// ...
});
return ... 2
}
异步发起请求,与处理过程无关。
执行处理并返回完成结果
Mono VOID>。
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
_GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { 1
// ...
}
}
/// ... // <2>
}
异步发起请求,与处理过程无关。
在协程函数中执行处理。
请求
- Java
- Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") 1
.data(viewBox) 2
.retrieveFlux(AirportLocation.class); 3
指定要包含在请求消息元数据中的路由。
为请求消息提供数据。
声明预期的响应。
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") 1
.data(viewBox) 2
.retrieveFlow<AirportLocation>() 3
指定要包含在请求消息元数据中的路由。
为请求消息提供数据。
声明预期的响应。
交互类型是根据输入和输出的基数(cardinality)隐式确定的。上面的例子属于Request-Stream类型,因为发送的是一个值,而接收的是一系列值。在大多数情况下,只要你选择的输入和输出类型与RSocket的交互类型以及响应方所期望的输入和输出类型相匹配,就不需要过多考虑这一点。唯一无效的组合是“多对一”(many-to-one)的情况。
data(Object) 方法还接受任何 Reactive Streams 的 Publisher,包括 Flux 和 Mono,以及任何在 ReactiveAdapterRegistry 中注册的值生产者。对于像 Flux 这样的多值 Publisher(它产生相同类型的值),可以考虑使用其中一个重载的 data 方法,以避免对每个元素都进行类型检查和 Encoder 查找:
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object) 步骤是可选的。对于不发送数据的请求,可以跳过此步骤:
- Java
- Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用复合元数据(默认设置),并且这些值被注册的Encoder支持,那么可以添加额外的元数据值。例如:
- Java
- Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
对于“一次发送后忽略”的情况,应使用返回MonoVoid>的send()方法。请注意,Mono仅表示消息已成功发送,并不表示消息已被处理。
对于Metadata-Push,请使用返回值为MonoVOID>的sendMetadata()方法。
带注释的响应者
RSocket响应器可以实现为@MessageMapping和@ConnectMapping方法。@MessageMapping方法处理单个请求,而@ConnectMapping方法处理连接级别的事件(设置和元数据推送)。带注释的响应器在服务器端和客户端都有支持,可以用于从任一方向进行响应。
服务器响应器
要在服务器端使用带注释的响应器(annotated responders),请在Spring配置中添加RSocketMessageHandler,以便检测具有@Controller注解以及@MessageMapping和@ConnectMapping方法的Bean:
- Java
- Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过Java RSocket API启动一个RSocket服务器,并为响应者插入RSocketMessageHandler,具体操作如下:
- Java
- Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler 默认支持 复合元数据 和 路由元数据。如果您需要切换到不同的 MIME 类型或注册额外的 MIME 元数据类型,可以设置其 MetadataExtractor。
你需要设置用于支持元数据和数据格式所需的Encoder和Decoder实例。你可能还需要spring-web模块来实现编码解码器(codec)。
默认情况下,SimpleRouteMatcher被用于通过AntPathMatcher来匹配路由。我们建议使用来自spring-web的PathPatternRouteMatcher以实现更高效的路由匹配。RSocket路由可以是分层的,但它们不是URL路径。这两种路由匹配器默认都配置为使用“.”作为分隔符,并且不像HTTP URL那样进行URL解码。
RSocketMessageHandler 可以通过 RSocketStrategies 进行配置,这在需要在同一进程中的客户端和服务器之间共享配置时非常有用:
- Java
- Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new JacksonCborEncoder()))
.decoders(decoders -> decoders.add(new JacksonCborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(JacksonCborEncoder()) }
.decoders { it.add(JacksonCborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客户端响应者
客户端上的带注释的响应器需要在RSocketRequester.Builder中进行配置。详情,请参阅客户端响应器。
@MessageMapping
- Java
- Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上述@MessageMapping方法响应于路由为“locate.radars.within”的请求流交互。它支持灵活的方法签名,并允许使用以下方法参数:
| 方法参数 | 说明 |
|---|---|
@Payload | 请求的有效负载(payload)。这可以是 Mono 或 Flux 等异步类型的实际值。注意: 使用此注解是可选的。如果方法参数不是简单类型,且不属于其他支持的参数类型,则默认该参数即为预期的有效负载。 |
RSocketRequester | 用于向远程端发起请求的请求器(requester)。 |
@DestinationVariable | 根据映射模式中的变量从路由中提取的值,例如 @MessageMapping("find.radar.{id}")。 |
@Header | 如 MetadataExtractor 中所述,注册用于提取的元数据值。 |
@Headers Map<String, Object> | 如 MetadataExtractor 中所述,注册用于提取的所有元数据值。 |
返回值预期是一个或多个对象,这些对象将被序列化为响应有效载荷。这些对象可以是异步类型(如 Mono 或 Flux),也可以是具体的值,或者是 void,或者是无值的异步类型(如 Mono<void>)。
@MessageMapping 方法支持的 RSocket 交互类型取决于输入(即 @Payload 参数)和输出的基数(cardinality),这里的基数指的是以下含义:
| 基本类型(Cardinality) | 描述(Description) |
|---|---|
| 1 | 要么是显式的值,要么是单值的异步类型,例如 Mono<T>。 |
| Many | 多值的异步类型,例如 Flux<T>。 |
| 0 | 对于输入来说,这意味着该方法没有 @Payload 参数。对于输出来说,这是 void 或者是无值的异步类型,例如 MonoVoid>。 |
下表显示了所有输入和输出基数组合以及相应的交互类型:
| 输入基数 | 输出基数 | 交互类型 |
|---|---|---|
| 0, 1 | 0 | 火力发射式(Fire-and-Forget)、请求-响应(Request-Response) |
| 0, 1 | 1 | 请求-响应(Request-Response) |
| 0, 1 | 多个(Many) | 请求流(Request-Stream) |
| 多个(Many) | 0, 1, 多个(Many) | 请求通道(Request-Channel) |
@RSocketExchange
作为@MessageMapping的替代方案,您也可以使用@RSocketExchange方法来处理请求。这些方法在RSocket接口上声明,可以通过RSocketServiceProxyFactory作为请求者来使用,或者由响应者来实现。
例如,要以响应者的身份处理请求:
- Java
- Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
@RSocketExchange 和 @MessageMapping 之间存在一些差异,因为前者需要同时适用于请求者和响应者。例如,虽然 @MessageMapping 可以被声明为处理任意数量的路由,并且每个路由都可以是一个模式,但 @RSocketExchange 必须使用单一的具体路由进行声明。在支持的与元数据相关的方法参数方面也存在一些小的差异,有关支持的参数列表,请参阅 @MessageMapping 和 RSocket Interface。
@RSocketExchange 可以在类型级别使用,为给定的 RSocket 服务接口的所有路由指定一个通用的前缀。
@ConnectMapping
@ConnectMapping 处理 RSocket 连接开始时的 SETUP 帧,以及之后通过 METADATA_push 帧传递的任何元数据推送通知,即在 io.rsocket.RSocket 中的 metadataPush(Payload)。
@ConnectMapping 方法支持的参数与 [@MessageMapping](#rsocket-annot-messagemapping) 相同,但它是基于 SETUP 和 METADATAPush 帧中的元数据和数据来工作的。@ConnectMapping 可以设置一个模式,以便将处理限定在元数据中包含特定路由的连接上;如果没有声明任何模式,则所有连接都会被匹配。
@ConnectMapping 方法不能返回数据,必须声明返回值为 void 或 Mono.Void>。如果在处理新连接时返回错误,则该连接将被拒绝。处理过程不得延迟对 RSocketRequester 的连接请求。详情请参见 Server Requester。
MetadataExtractor
响应者必须解析元数据。复合元数据允许使用独立格式的元数据值(例如,用于路由、安全、跟踪),每个元数据值都有其自己的MIME类型。应用程序需要一种方法来配置所支持的元数据MIME类型,以及一种方法来访问提取出的元数据值。
MetadataExtractor 是一个合约,它接收序列化的元数据,并返回解码后的键值对。这些键值对之后可以像访问头部信息(headers)一样按名称进行访问,例如可以通过带注解的处理器方法中的 @Header 来访问它们。
DefaultMetadataExtractor 可以接收 Decoder 实例来解码元数据。它本身就内置了对 "message/x.rsocket.routing.v0" 的支持,该协议会被解码为 String 并保存在 "route" 键下。对于其他任何 mime 类型,你需要提供一个 Decoder,并按照以下方式注册该 mime 类型:
- Java
- Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
复合元数据在结合独立元数据值时表现良好。然而,请求者可能不支持复合元数据,或者选择不使用它。为此,DefaultMetadataExtractor可能需要自定义逻辑来将解码后的值映射到输出映射中。以下是一个使用JSON作为元数据的示例:
- Java
- Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
当通过 RSocketStrategies 配置 MetadataExtractor 时,可以让 RSocketStrategies.Builder 使用配置好的解码器来创建提取器,并且只需使用一个回调函数来自定义注册过程,具体如下:
- Java
- Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket接口
Spring Framework允许你通过带有@RSocketExchange方法的Java接口来定义一个RSocket服务。你可以将这样的接口传递给RSocketServiceProxyFactory,以创建一个代理,该代理会通过RSocketRequester来执行请求。你也可以实现这个接口作为一个响应器来处理请求。
首先,使用@RSocketExchange方法创建接口:
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
现在你可以创建一个代理,在调用方法时执行请求:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
您还可以实现该接口以作为响应者来处理请求。请参阅带注释的响应者。
方法参数
带注释的RSocket交换方法支持灵活的方法签名,这些方法参数包括:
| 方法参数 | 说明 |
|---|---|
@DestinationVariable | 添加一个路由变量,与来自@RSocketExchange注解的路由一起传递给RSocketRequester,以便在路由中扩展模板占位符。该变量可以是字符串(String)或任何对象(Object),然后通过toString()方法进行格式化。 |
@Payload | 设置请求的输入负载(payload)。这可以是一个具体的值,或者是通过ReactiveAdapterRegistry能够转换为Reactive Streams Publisher的任何值生成器。除非将required属性设置为false,或者根据MethodParameter#isOptional标记为可选参数,否则必须提供负载(payload)。 |
Object(后跟MimeType) | 输入负载中的元数据条目的值。只要接下来的参数是元数据条目的MimeType,那么这个参数就可以是任何Object类型。该值可以是一个具体的值,或者是通过ReactiveAdapterRegistry能够转换为Reactive Streams Publisher的单一值的生成器。 |
MimeType | 元数据条目的MimeType。前面的方法参数应该是元数据值。 |
返回值
带注释的RSocket交换方法支持返回具体的值,或者任何可以通过ReactiveAdapterRegistry适配为Reactive Streams Publisher的值产生器。
默认情况下,具有同步(阻塞)方法签名的RSocket服务方法的行为取决于底层RSocket ClientTransport的响应超时设置以及RSocket的保持连接(keep-alive)设置。RSocketServiceProxyFactory.Builder确实提供了一个blockTimeout选项,允许你配置等待响应的最大时间,但我们建议在RSocket级别配置超时值,以便获得更大的控制权。