跳到主要内容

数据缓冲区和编解码器

ChatGPT-4o-mini 中英对照 Data Buffers and Codecs

Java NIO 提供了 ByteBuffer,但许多库在其上构建了自己的字节缓冲区 API,特别是在网络操作中,重用缓冲区和/或使用直接缓冲区对性能有益。例如,Netty 有 ByteBuf 层次结构,Undertow 使用 XNIO,Jetty 使用带有回调释放的池化字节缓冲区,等等。spring-core 模块提供了一组抽象,以便与各种字节缓冲区 API 一起使用,如下所示:

DataBufferFactory

DataBufferFactory 用于以两种方式创建数据缓冲区:

  1. 分配一个新的数据缓冲区,选择性地提前指定容量(如果已知),这更高效,即使 DataBuffer 的实现可以按需增长和缩小。

  2. 包装一个现有的 byte[]java.nio.ByteBuffer,这会用 DataBuffer 实现装饰给定的数据,并且不涉及分配。

请注意,WebFlux 应用程序并不直接创建 DataBufferFactory,而是通过 ServerHttpResponse 或客户端的 ClientHttpRequest 访问它。工厂的类型取决于底层的客户端或服务器,例如,Reactor Netty 使用 NettyDataBufferFactory,其他则使用 DefaultDataBufferFactory

DataBuffer

DataBuffer 接口提供了与 java.nio.ByteBuffer 类似的操作,但还带来了一些额外的好处,其中一些灵感来自于 Netty 的 ByteBuf。以下是部分好处的列表:

  • 以独立位置进行读写,即不需要调用 flip() 来在读和写之间切换。

  • 根据需要扩展容量,类似于 java.lang.StringBuilder

  • 通过 PooledDataBuffer 实现缓冲池和引用计数。

  • 将缓冲区视为 java.nio.ByteBufferInputStreamOutputStream

  • 确定给定字节的索引或最后一个索引。

PooledDataBuffer

正如 ByteBuffer 的 Javadoc 中所解释的,字节缓冲区可以是直接缓冲区或非直接缓冲区。直接缓冲区可能位于 Java 堆外,这消除了本机 I/O 操作所需的复制。这使得直接缓冲区在通过套接字接收和发送数据时特别有用,但它们的创建和释放成本更高,这引出了缓冲区池的概念。

PooledDataBufferDataBuffer 的扩展,帮助进行引用计数,这对于字节缓冲池是必不可少的。它是如何工作的呢?当分配一个 PooledDataBuffer 时,引用计数为 1。调用 retain() 会增加计数,而调用 release() 会减少计数。只要计数大于 0,缓冲区就保证不会被释放。当计数减少到 0 时,池中的缓冲区可以被释放,这在实践中可能意味着为缓冲区保留的内存被返回到内存池。

请注意,在大多数情况下,直接操作 PooledDataBuffer 并不是最佳选择,使用 DataBufferUtils 中的便捷方法更好,这些方法仅在 DataBufferPooledDataBuffer 的实例时才会应用释放或保留。

DataBufferUtils

DataBufferUtils 提供了一些用于操作数据缓冲区的实用方法:

  • 将一系列数据缓冲区合并为一个单一的缓冲区,可能实现零拷贝,例如,通过复合缓冲区,如果底层字节缓冲区 API 支持的话。

  • InputStream 或 NIO Channel 转换为 Flux<DataBuffer>,反之将 Publisher<DataBuffer> 转换为 OutputStream 或 NIO Channel

  • 如果缓冲区是 PooledDataBuffer 的实例,提供释放或保留 DataBuffer 的方法。

  • 从字节流中跳过或获取特定字节数。

编解码器

org.springframework.core.codec 包提供以下策略接口:

  • Encoder 用于将 Publisher<T> 编码为数据缓冲区的流。

  • Decoder 用于将 Publisher<DataBuffer> 解码为更高级对象的流。

spring-core 模块提供 byte[]ByteBufferDataBufferResourceString 编码器和解码器实现。spring-web 模块添加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他编码器和解码器。请参见 WebFlux 部分的 Codecs

使用 DataBuffer

在处理数据缓冲区时,必须特别注意确保缓冲区被释放,因为它们可能是 pooled。我们将使用编解码器来说明这一点,但这些概念更普遍适用。让我们看看编解码器在内部必须做些什么来管理数据缓冲区。

Decoder 是最后一个读取输入数据缓冲区的组件,在创建更高层次的对象之前,因此它必须按照以下方式释放它们:

  1. 如果一个 Decoder 只是简单地读取每个输入缓冲区并准备立即释放它,可以通过 DataBufferUtils.release(dataBuffer) 来实现。

  2. 如果一个 Decoder 使用 FluxMono 操作符,如 flatMapreduce 等,这些操作符会在内部预取和缓存数据项,或者使用如 filterskip 等会省略某些项的操作符,则必须在组合链中添加 doOnDiscard(DataBuffer.class, DataBufferUtils::release),以确保在丢弃之前释放这些缓冲区,这可能也是由于错误或取消信号导致的。

  3. 如果一个 Decoder 以其他方式持有一个或多个数据缓冲区,则必须确保在完全读取后释放它们,或者在缓存的数据缓冲区被读取和释放之前发生错误或取消信号的情况下释放它们。

请注意,DataBufferUtils#join 提供了一种安全且高效的方法,将数据缓冲区流聚合为单个数据缓冲区。同样,skipUntilByteCounttakeUntilByteCount 是解码器可以使用的其他安全方法。

一个 Encoder 分配数据缓冲区,其他人必须读取(并释放)。因此,Encoder 的工作并不多。然而,如果在用数据填充缓冲区时发生序列化错误,Encoder 必须小心释放数据缓冲区。例如:

DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
java

Encoder 的消费者负责释放它接收到的数据缓冲区。在 WebFlux 应用程序中,Encoder 的输出用于写入 HTTP 服务器响应,或写入客户端 HTTP 请求,在这种情况下,释放数据缓冲区的责任在于写入服务器响应或客户端请求的代码。

请注意,在 Netty 上运行时,有调试选项用于 排查缓冲区泄漏