跳到主要内容
版本:7.0.2

关于非阻塞 I/O (NIO)

DeepSeek V3 中英对照 About Non-blocking I/O (NIO)

使用NIO(参见IP配置属性中的using-nio)可以避免为每个套接字分配专用读取线程。对于少量套接字,您可能会发现不使用NIO并配合异步传递机制(例如传递到QueueChannel),其性能与使用NIO相当甚至更优。

在处理大量连接时,您应考虑使用NIO。然而,使用NIO会带来一些其他影响。所有套接字共享一个线程池(位于任务执行器中)。每个传入消息会被组装,并作为独立的工作单元发送到配置的通道,该工作单元由线程池中选出的一个线程执行。到达同一套接字的两个连续消息可能由不同的线程处理。这意味着消息发送到通道的顺序是不确定的。到达套接字的消息的严格顺序无法得到保证。

对于某些应用场景,这不成问题。但对于其他场景,这可能是个问题。如果您需要严格的顺序处理,请考虑将 using-nio 设置为 false 并采用异步交接机制。

或者,您可以在入站端点的下游插入一个重排序器,以将消息恢复到正确的顺序。如果在连接工厂上将 apply-sequence 设置为 true,到达 TCP 连接的消息将设置 sequenceNumbercorrelationId 头部。重排序器会利用这些头部信息将消息恢复到正确的顺序。

important

从版本 5.1.4 开始,系统会优先接受新连接,而不是从现有连接读取数据。通常,除非您有非常高的新连接传入速率,否则这应该影响不大。如果您希望恢复为优先读取的先前行为,请将 TcpNioServerConnectionFactory 上的 multiAccept 属性设置为 false

池大小

pool-size 属性已不再使用。先前,当未指定任务执行器时,它用于指定默认线程池的大小。它也曾用于设置服务器套接字上的连接积压队列。第一个功能已不再需要;请参阅下一段。第二个功能已被 backlog 属性所取代。

此前,在使用固定线程池任务执行器(这是默认设置)配合NIO时,可能会出现死锁并导致处理停止。该问题发生在缓冲区已满、从套接字读取数据的线程试图向缓冲区添加更多数据,且没有可用线程来释放缓冲区空间时。这种情况仅在池大小非常小时才会发生,但在极端条件下仍可能出现。自2.2版本起,通过两项变更消除了此问题:首先,默认任务执行器已改为缓存线程池执行器;其次,新增了死锁检测逻辑——当出现线程饥饿时,系统会抛出异常而非陷入死锁,从而释放被死锁占用的资源。

important

现在默认的任务执行器是无界的,如果消息处理耗时较长,在高消息流入速率下可能会出现内存不足的情况。如果你的应用程序表现出此类行为,应该使用具有适当池大小的池化任务执行器,但请参阅下一节

使用 CALLER_RUNS 策略的线程池任务执行器

在使用固定线程池并配合 CallerRunsPolicy(使用 <task/> 命名空间时为 CALLER_RUNS)且队列容量较小时,您需要牢记一些重要的注意事项。

以下内容仅在不使用固定线程池时不适用。

在使用NIO连接时,有三种不同的任务类型。I/O选择器处理在一个专用线程上执行(检测事件、接受新连接,并通过任务执行器将I/O读操作分派给其他线程)。当I/O读取器线程(读操作被分派到的线程)读取数据时,它会将数据移交给另一个线程来组装传入的消息。大型消息可能需要多次读取才能完成。这些“组装器”线程在等待数据时可能会阻塞。当新的读取事件发生时,读取器会判断此套接字是否已有组装器,如果没有,则启动一个新的组装器。当组装过程完成后,组装器线程会被返回到线程池中。

当线程池耗尽、使用 CALLER_RUNS 拒绝策略且任务队列已满时,可能导致死锁。当线程池为空且队列无空间时,IO 选择器线程收到 OP_READ 事件并通过执行器调度读取操作。由于队列已满,选择器线程自身开始执行读取过程。此时它检测到该套接字没有装配器,于是在执行读取前触发装配器创建。同样因队列已满,选择器线程转而执行装配器任务。装配器随后被阻塞,等待永远不会发生的读取数据。此时连接工厂陷入死锁,因为选择器线程无法处理新事件。

为了避免这种死锁,我们必须避免选择器(或读取器)线程执行组装任务。我们希望为 IO 和组装操作使用独立的线程池。

该框架提供了一个 CompositeExecutor,允许配置两个独立的执行器:一个用于执行IO操作,另一个用于消息组装。在这种环境下,IO线程永远不会成为组装线程,因此不会发生死锁。

此外,任务执行器应配置为使用 AbortPolicy(使用 <task> 时设为 ABORT)。当 I/O 任务无法完成时,它会短暂延迟并持续重试,直到能够完成并分配到一个组装器。默认延迟时间为 100 毫秒,但您可以通过在连接工厂上设置 readDelay 属性(使用 XML 命名空间配置时为 read-delay)来更改此设置。

以下三个示例展示了如何配置复合执行器:

@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>