关于非阻塞 I/O (NIO)
使用 NIO(参见 using-nio
在 IP 配置属性 中)可以避免为每个套接字分配一个线程。对于少量的套接字,您可能会发现不使用 NIO,结合异步交接(例如到 QueueChannel
),性能与使用 NIO 一样好或更好。
当你处理大量连接时,应该考虑使用 NIO。然而,NIO 的使用会带来一些其他的影响。一个线程池(在任务执行器中)被所有套接字共享。每个传入的消息都会被组装,并作为一个独立的工作单元发送到配置的通道上,所使用的线程是从该池中选择的。两个连续到达同一套接字的消息可能会由不同的线程处理。这意味着消息发送到通道的顺序是不确定的。严格来说,到达套接字的消息顺序不会被保持。
对于某些应用程序,这不是一个问题。对于其他应用程序,这是一个问题。如果您需要严格的顺序,考虑将 using-nio
设置为 false
并使用异步交接。
或者,你可以在入站端点的下游插入一个重新排序器,以将消息恢复到正确的顺序。如果你将连接工厂的 apply-sequence
设置为 true
,到达 TCP 连接的消息会带有 sequenceNumber
和 correlationId
头。重新排序器使用这些头信息将消息恢复到正确的顺序。
从 5.1.4 版本开始,优先接受新连接而不是从现有连接读取。通常情况下,这影响很小,除非你有非常高的新入站连接速率。如果你想恢复到之前的行为,即优先进行读取,可以将 TcpNioServerConnectionFactory
上的 multiAccept
属性设置为 false
。
连接池大小
池大小属性不再使用。以前,当未指定任务执行器时,它指定了默认线程池的大小。它还用于设置服务器套接字上的连接积压。第一个功能已不再需要(请参见下一段)。第二个功能已被 backlog
属性取代。
之前,在使用固定线程池任务执行器(这是默认设置)与 NIO 时,可能会遇到死锁,导致处理停止。当缓冲区已满,从套接字读取的线程试图向缓冲区添加更多数据,并且没有可用的线程来腾出缓冲区空间时,问题就会发生。这只会发生在非常小的池大小时,但在极端条件下也有可能发生。自 2.2 版本以来,两项更改消除了这个问题。首先,默认的任务执行器是一个缓存线程池执行器。其次,添加了死锁检测逻辑,因此如果出现线程饥饿,不会发生死锁,而是抛出异常,从而释放死锁的资源。
由于默认任务执行器是无界的,因此如果消息处理需要较长时间,高频率的传入消息可能会导致内存不足的情况。如果您的应用程序表现出这种行为,您应该使用带有适当池大小的线程池任务执行器,但请参阅下一节。
使用 CALLER_RUNS
策略的线程池任务执行器
当线程池使用 CALLER_RUNS
策略时,如果线程池无法分配新的线程来执行任务,则提交任务的线程(即调用者线程)会自己执行该任务。这种策略可以防止任务被拒绝执行,并且在高负载情况下能够缓解线程池的压力。
当你使用带有 CallerRunsPolicy
(在使用 <task/>
命名空间时为 CALLER_RUNS
)的固定线程池时,并且队列容量较小时,应该记住一些重要的注意事项。
以下内容不适用于你不使用固定线程池的情况。
使用 NIO 连接时,存在三种不同的任务类型。I/O 选择器处理是在一个专用线程上执行的(检测事件、接受新连接,并通过使用任务执行器将 I/O 读操作分派给其他线程)。当 I/O 读取线程(读操作被分派到该线程)读取数据时,它会将数据传递给另一个线程来组装传入的消息。大消息可能需要多次读取才能完成。这些“组装”线程可以在等待数据时阻塞。当发生新的读取事件时,读取器会确定此套接字是否已经有一个组装程序,如果没有,则运行一个新的组装程序。当组装过程完成后,组装线程会被返回到线程池。
这会在池耗尽、使用 CALLER_RUNS
拒绝策略且任务队列已满时导致死锁。当池为空且队列中没有空间时,IO 选择线程接收一个 OP_READ
事件并使用执行器调度读取操作。由于队列已满,选择线程本身开始读取过程。此时它检测到此套接字没有组装程序,并在进行读取之前触发一个组装程序。同样,队列已满,选择线程成为组装程序。现在组装程序被阻塞,等待从未发生的数据读取。连接工厂因此陷入死锁,因为选择线程无法处理新事件。
为了避免这种死锁,我们必须避免选择器(或读取器)线程执行组装任务。我们希望为 IO 和组装操作使用单独的线程池。
该框架提供了一个 CompositeExecutor
,它允许配置两个不同的执行器:一个用于执行 IO 操作,另一个用于消息组装。在这种环境下,IO 线程永远不会成为组装线程,因此死锁不会发生。
此外,任务执行者应该被配置为使用 AbortPolicy
(在使用 <task>
时为 ABORT
)。当 I/O 任务无法完成时,它会被延迟一小段时间,并不断重试,直到能够完成并分配一个组装器。默认情况下,延迟时间为 100 毫秒,但您可以通过在连接工厂上设置 readDelay
属性(在使用 XML 命名空间配置时为 read-delay
)来更改它。
以下三个示例显示了如何配置复合执行器:
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>