数据缓冲区和编解码器
Java NIO提供了ByteBuffer,但许多库在此基础上构建了自己的字节缓冲区API,特别是在网络操作中,重用缓冲区和/或使用直接缓冲区对性能有益。例如,Netty有ByteBuf层次结构,Jetty使用池化的字节缓冲区,并通过回调来释放这些缓冲区,等等。spring-core模块提供了一组抽象,以便与各种字节缓冲区API进行交互,具体如下:
- DataBufferFactory 抽象了数据缓冲区的创建过程。
- DataBuffer 代表一个字节缓冲区,该缓冲区可以是池化的。
- DataBufferUtils 提供了用于数据缓冲区的实用方法。
- Codecs 将数据缓冲区流解码或编码为更高级别的对象。
DataBufferFactory
DataBufferFactory 用于以两种方式之一创建数据缓冲区:
-
分配一个新的数据缓冲区,如果已知容量,可以提前指定该容量;这样做效率更高,尽管
DataBuffer的实现可以根据需求动态扩展或收缩。 -
将现有的
byte[]或java.nio.ByteBuffer包装起来,这样会在给定的数据上添加一个DataBuffer的封装层,而无需进行额外的内存分配。
请注意,WebFlux应用程序不会直接创建DataBufferFactory,而是通过ServerHttpResponse或在客户端侧的ClientHttpRequest来访问它。该工厂的类型取决于底层的客户端或服务器;例如,对于Reactor Netty来说使用NettyDataBufferFactory,而对于其他情况则使用DefaultDataBufferFactory。
DataBuffer
DataBuffer接口提供了与java.nio(ByteBuffer)类似的操作,同时还带来了一些额外的优势,其中一些优势的灵感来自于Netty的ByteBuf。以下是这些优势的部分列表:
-
支持独立地进行读写操作,即无需调用
flip()方法来在读写之间切换。 -
容量可根据需求进行扩展,与
java.lang.StringBuilder类似。 -
采用PooledDataBuffer实现缓冲区的池化管理和引用计数。
-
可将缓冲区视为
java.nio.ByteBuffer、InputStream或OutputStream使用。 -
可确定给定字节的索引,或最后一个字节的索引。
PooledDataBuffer
如JByteBuffer的Javadoc中所解释的那样,字节数组缓冲区(byte buffers)可以是直接的(direct)或非直接的(non-direct)。直接缓冲区可以位于Java堆(Java heap)之外,这样就无需在本地I/O操作中进行数据复制。这使得直接缓冲区在通过套接字(socket)发送和接收数据时特别有用,但创建和释放直接缓冲区的成本也更高,因此就产生了缓冲池(buffer pool)的概念。
PooledDataBuffer 是 DataBuffer 的扩展,它有助于引用计数,而引用计数对于字节缓冲池的管理至关重要。它是如何工作的呢?当分配一个 PooledDataBuffer 时,其引用计数为 1。调用 retain() 会增加计数,而调用 release() 会减少计数。只要计数大于 0,就保证该缓冲区不会被释放。当计数降为 0 时,这个池化的缓冲区就可以被释放了,实际上这意味着为该缓冲区预留的内存会被归还到内存池中。
请注意,在大多数情况下,直接操作PooledDataBuffer并不是最佳选择。更好的方法是使用DataBufferUtils中的便捷方法,这些方法仅在DataBuffer是PooledDataBuffer的实例时才会应用“释放”或“保留”操作。
DataBufferUtils
DataBufferUtils 提供了许多用于操作数据缓冲区的实用方法:
-
将多个数据缓冲区串联成一个单一的缓冲区,可能实现“零拷贝”操作(zero copy),例如,如果底层的字节缓冲区API支持的话,可以通过复合缓冲区(composite buffers)来实现这一功能。
-
将
InputStream或NIOChannel转换为Flux<DataBuffer>,反之亦然,即将Publisher<DataBuffer>转换为OutputStream或NIOChannel。 -
如果
DataBuffer是PooledDataBuffer的实例,提供释放或保留该缓冲区的方法。 -
从字节流中跳过指定数量的字节,或者读取指定数量的字节。
编解码器
org.springframework.corecodec 包提供了以下策略接口:
Encoder用于将Publisher<T>编码为数据缓冲区的流。Decoder用于将Publisher<DataBuffer>解码为更高级别的对象流。
spring-core 模块提供了 byte[]、ByteBuffer、DataBuffer、Resource 和 String 的编码器与解码器实现。spring-web 模块则添加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 等编码器与解码器。有关详细信息,请参阅 WebFlux 部分中的 Codecs。
使用 DataBuffer
在处理数据缓冲区时,必须格外小心确保这些缓冲区能够被释放,因为它们可能是[池化(pool)]的(#databuffers-buffer-pooled)。我们将使用编解码器(codecs)来说明这一过程是如何运作的,但这些概念具有更普遍的适用性。让我们来看看编解码器在内部需要做些什么来管理数据缓冲区。
Decoder 是最后一个读取输入数据缓冲区的组件,在创建更高级别的对象之前,它必须按照以下方式释放这些缓冲区:
-
如果一个
Decoder只是简单地读取每个输入缓冲区,并且准备立即释放它们,那么可以通过DataBufferUtils.release(dataBuffer)来执行这一操作。 -
如果
Decoder使用了Flux或Mono操作符,如flatMap、reduce等,这些操作符会在内部预取和缓存数据项;或者使用了filter、skip等操作符,这些操作符会省略某些数据项,那么必须在组合链中添加doOnDiscard(DataBuffer.class, DataBufferUtils::release),以确保这些缓冲区在被丢弃之前能够被释放,这种情况可能也是由于错误或取消信号导致的。 -
如果
Decoder以任何其他方式保留了一个或多个数据缓冲区,那么它必须确保在完全读取这些缓冲区之后,或者在发生错误或在缓存的数据缓冲区尚未被读取和释放之前就收到取消信号时,能够释放这些缓冲区。
请注意,DataBufferUtils#join提供了一种安全且高效的方法,可以将多个数据缓冲流聚合到一个数据缓冲区中。同样,skipUntilByteCount和takeUntilByteCount也是解码器可以使用的其他安全方法。
Encoder负责分配数据缓冲区,其他组件需要读取这些缓冲区的数据(并在使用完毕后释放它们)。因此,Encoder本身并没有太多需要执行的具体操作。不过,如果在用数据填充缓冲区的过程中发生序列化错误,Encoder就必须负责释放该数据缓冲区。例如:
- Java
- Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Encoder的消费者负责释放它接收到的数据缓冲区。在WebFlux应用程序中,Encoder的输出被用来写入HTTP服务器响应中,或者写入客户端HTTP请求中;在这种情况下,释放数据缓冲区的责任就落在将数据写入服务器响应或客户端请求的代码上。
请注意,在使用 Netty 运行时,有用于排查缓冲区泄漏问题的调试选项。