TCP 连接工厂
概述
对于 TCP,底层连接的配置是通过连接工厂来提供的。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂用于建立出站连接。服务器连接工厂则用于监听入站连接。
出站通道适配器使用客户端连接工厂,但你也可以为入站通道适配器提供对客户端连接工厂的引用。该适配器接收通过出站适配器创建的连接上收到的任何传入消息。
入站通道适配器或网关使用服务器连接工厂。(实际上,没有它,连接工厂无法正常工作。)你也可以为出站适配器提供对服务器连接工厂的引用。然后,你可以使用该适配器在同一连接上发送对传入消息的回复。
只有当回复消息包含由连接工厂插入到原始消息中的 ip_connectionId 头部时,回复消息才会被路由到该连接。
这是在入站和出站适配器之间共享连接工厂时所执行的消息关联程度。这种共享允许通过 TCP 进行异步双向通信。默认情况下,仅使用 TCP 传输有效负载信息。因此,任何消息关联必须由下游组件(如聚合器或其他端点)执行。从版本 3.0 开始引入了对传输选定消息头的支持。更多信息,请参阅 TCP 消息关联。
每种类型的适配器最多只能引用一个连接工厂。
Spring Integration 提供了使用 java.net.Socket 和 java.nio.channel.SocketChannel 的连接工厂。
以下示例展示了一个使用 java.net.Socket 连接的简单服务器连接工厂:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
以下示例展示了一个使用 java.nio.channel.SocketChannel 连接的简单服务器连接工厂:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
从 Spring Integration 4.2 版本开始,如果服务器被配置为监听随机端口(通过将端口设置为 0),你可以使用 getPort() 方法来获取操作系统实际选择的端口。此外,getServerSocketAddress() 方法允许你获取完整的 SocketAddress。更多信息请参阅 TcpServerConnectionFactory 接口的 Javadoc。
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
以下示例展示了一个使用 java.net.Socket 连接并为每条消息创建新连接的客户端连接工厂:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
从 5.2 版本开始,客户端连接工厂支持 connectTimeout 属性,以秒为单位指定,默认值为 60。
消息定界(序列化器与反序列化器)
TCP是一种流式协议。这意味着需要通过TCP传输的数据必须具有某种结构,以便接收方能够将数据划分为离散的消息。连接工厂通过配置序列化器和反序列化器,实现消息负载与TCP传输比特流之间的相互转换。具体而言,需要分别为入站和出站消息提供反序列化器和序列化器。Spring Integration提供了多种标准序列化器和反序列化器。
ByteArrayCrlfSerializer* 将字节数组转换为字节流,并在其后附加回车和换行字符 (\r\n)。这是默认的序列化器(及反序列化器),可用于(例如)telnet 作为客户端。
ByteArraySingleTerminatorSerializer* 将字节数组转换为字节流,并在其后附加单个终止字符(默认为 0x00)。
ByteArrayLfSerializer* 将字节数组转换为字节流,并在其后附加一个换行符 (0x0a)。
ByteArrayStxEtxSerializer* 将字节数组转换为以 STX (0x02) 开头、以 ETX (0x03) 结尾的字节流。
ByteArrayLengthHeaderSerializer 将字节数组转换为字节流,并在其前添加网络字节序(大端序)的二进制长度头。这是一种高效的解串器,因为它无需解析每个字节来查找终止字符序列。它也可用于包含二进制数据的负载。之前的串行器仅支持负载中的文本。长度头的默认大小为四个字节(一个整数),允许消息最大为 (2^31 - 1) 字节。然而,对于最大 255 字节的消息,长度头可以是单个字节(无符号);对于最大 (2^16 - 1) 字节的消息,可以是无符号短整型(2 字节)。如果您需要其他格式的头部,可以继承 ByteArrayLengthHeaderSerializer 并为 readHeader 和 writeHeader 方法提供实现。绝对最大数据大小为 (2^31 - 1) 字节。从版本 5.2 开始,头部值除了负载长度外还可以包含头部本身的长度。设置 inclusive 属性以启用该机制(生产者和消费者必须设置为相同的值)。
ByteArrayRawSerializer* 将字节数组转换为字节流,且不添加任何额外的消息界定数据。使用此序列化器(及反序列化器)时,消息的结束由客户端以有序方式关闭套接字来指示。使用此序列化器时,消息接收会挂起,直到客户端关闭套接字或发生超时。超时不会产生消息。当使用此序列化器且客户端为 Spring Integration 应用程序时,客户端必须使用配置了 single-use="true" 的连接工厂。这样做会使适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。您应仅将此序列化器与通道适配器(而非网关)使用的连接工厂配合使用,且连接工厂应仅由入站或出站适配器之一使用,不可两者共用。另请参阅本节稍后的 ByteArrayElasticRawDeserializer。然而,自版本 5.2 起,出站网关新增了一个属性 closeStreamAfterSend;这允许使用原始序列化器/反序列化器,因为 EOF 会发送给服务器,同时保持连接开放以接收回复。
在 4.2.2 版本之前,当使用非阻塞 I/O (NIO) 时,此序列化器会将读取超时视为文件结束,并将已读取的数据作为消息发出。这种方式不可靠,不应用于界定消息。现在,此类情况将被视为异常。如果您以这种方式使用它(这种情况很少见),可以通过将构造函数参数 treatTimeoutAsEndOfMessage 设置为 true 来恢复之前的行为。
这些类都是 AbstractByteArraySerializer 的子类,该类同时实现了 org.springframework.core.serializer.Serializer 和 org.springframework.core.serializer.Deserializer 接口。为保持向后兼容性,使用 AbstractByteArraySerializer 任何子类进行序列化的连接也接受 String 类型输入,该字符串会首先被转换为字节数组。每个序列化器和反序列化器都会将包含相应格式的输入流转换为字节数组负载。
为避免因客户端行为不当(即未遵循所配置序列化器的协议)而导致内存耗尽,这些序列化器会设置最大消息大小限制。如果传入的消息超过此大小,将抛出异常。默认的最大消息大小为 2048 字节。您可以通过设置 maxMessageSize 属性来增加此限制。如果使用默认的序列化器或反序列化器并希望增加最大消息大小,必须将最大消息大小声明为一个显式的 bean,并设置 maxMessageSize 属性,然后配置连接工厂以使用该 bean。
本节前面标记为 * 的类使用中间缓冲区,并将解码后的数据复制到正确大小的最终缓冲区。从版本 4.3 开始,你可以通过设置 poolSize 属性来配置这些缓冲区,让这些原始缓冲区被重复使用,而不是像默认行为那样为每条消息分配和丢弃。将该属性设置为负值会创建一个无限制的池。如果池是有界的,你还可以设置 poolWaitTimeout 属性(以毫秒为单位),如果在指定时间内没有可用的缓冲区,则会抛出异常。该属性默认值为无限。此类异常会导致套接字被关闭。
若希望在自定义反序列化器中使用相同的机制,你可以扩展 AbstractPooledBufferByteArraySerializer(而非其父类 AbstractByteArraySerializer),并实现 doDeserialize() 方法而非 deserialize() 方法。缓冲区会自动返回到池中。AbstractPooledBufferByteArraySerializer 还提供了一个便捷的实用方法:copyToSizedArray()。
版本 5.0 新增了 ByteArrayElasticRawDeserializer。它与上述 ByteArrayRawSerializer 的反序列化端类似,但无需设置 maxMessageSize。在内部,它使用一个 ByteArrayOutputStream,允许缓冲区根据需要增长。客户端必须有序地关闭套接字以表示消息的结束。
此反序列化器仅应在信任对等方时使用;由于内存不足的情况,它容易受到 DoS 攻击。
MapJsonSerializer 使用 Jackson ObjectMapper 在 Map 和 JSON 之间进行转换。你可以将此序列化器与 MessageConvertingTcpMessageMapper 和 MapMessageConverter 结合使用,以 JSON 格式传输选定的头部信息和负载。
Jackson ObjectMapper 无法在流中划分消息边界。因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息边界划分。默认情况下,使用 ByteArrayLfSerializer,这会在传输线上产生格式为 <json><LF> 的消息,但你可以配置它使用其他序列化器。(下一个示例展示了如何操作。)
最终的标准序列化器是 org.springframework.core.serializer.DefaultSerializer,你可以用它通过 Java 序列化来转换可序列化的对象。org.springframework.core.serializer.DefaultDeserializer 则用于对包含可序列化对象的流进行入站反序列化。
如果你不希望使用默认的序列化器和反序列化器(ByteArrayCrLfSerializer),则必须在连接工厂上设置 serializer 和 deserializer 属性。以下示例展示了如何操作:
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
一个使用 java.net.Socket 连接并在传输过程中采用 Java 序列化的服务器连接工厂。
有关连接工厂可用属性的完整详细信息,请参阅本节末尾的参考文档。
默认情况下,不会对入站数据包执行反向DNS查询:在未配置DNS的环境中(例如Docker容器),这可能导致连接延迟。若要将IP地址转换为主机名以用于消息头,可通过将 lookup-host 属性设置为 true 来覆盖默认行为。
你也可以修改套接字和套接字工厂的属性。更多信息请参阅 SSL/TLS 支持。如文档所述,无论是否使用 SSL,此类修改都是可行的。
主机验证
自 5.1.0 版本起,默认启用主机验证以增强安全性。此功能确保在 TCP 连接过程中验证服务器的身份。
如果遇到需要禁用主机验证的场景(不推荐),可以在 tcp-connection-factory 中配置 socket-support 属性。
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="0"
socket-support="customSocketSupport"
single-use="true"
so-timeout="10000"/>
<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
<constructor-arg value="false" />
</bean>
自定义序列化器和反序列化器
如果你的数据格式不受标准反序列化器支持,你可以自行实现一个;同样地,你也可以实现自定义的序列化器。
要实现自定义的序列化器和反序列化器对,需要实现 org.springframework.core.serializer.Deserializer 和 org.springframework.core.serializer.Serializer 接口。
当反序列化器在消息之间检测到输入流已关闭时,必须抛出 SoftEndOfStreamException;这是向框架发出的信号,表明关闭是“正常的”。如果在解码消息时流被关闭,则应抛出其他异常。
从 5.2 版本开始,SoftEndOfStreamException 现在是一个 RuntimeException,而不再继承 IOException。
TCP 缓存客户端连接工厂
如之前所述,TCP套接字可以是“单次使用”(处理单个请求或响应)或共享使用。在高流量环境中,共享套接字与出站网关配合使用时性能不佳,因为套接字一次只能处理一个请求或响应。
为了提高性能,您可以使用协作通道适配器替代网关,但这需要应用级别的消息关联。更多信息请参阅 TCP 消息关联。
Spring Integration 2.2 引入了缓存客户端连接工厂,该工厂使用共享套接字池,允许网关通过共享连接池处理多个并发请求。
TCP 故障转移客户端连接工厂
您可以配置一个支持故障转移到一台或多台其他服务器的 TCP 连接工厂。在发送消息时,该工厂会遍历其所有已配置的工厂,直到消息能够发送或找不到连接为止。初始时,使用配置列表中的第一个工厂。如果后续连接失败,则下一个工厂将成为当前工厂。以下示例展示了如何配置故障转移客户端连接工厂:
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
使用故障转移连接工厂时,singleUse 属性必须在工厂本身与其配置使用的工厂列表之间保持一致。
当与共享连接(singleUse=false)一起使用时,连接工厂有两个与故障恢复相关的属性:
-
refreshSharedInterval -
closeOnRefresh
考虑以下基于上述配置的场景:假设 clientFactory1 无法建立连接,但 clientFactory2 可以。当 refreshSharedInterval 过后调用 failCF 的 getConnection() 方法时,我们将再次尝试使用 clientFactory1 进行连接;如果成功,与 clientFactory2 的连接将被关闭。如果 closeOnRefresh 为 false,则“旧”连接将保持打开状态,并在未来第一个工厂再次失败时可能被重用。
将 refreshSharedInterval 设置为仅在该时间到期后尝试与第一个工厂重新连接;如果您只想在当前连接失败时回退到第一个工厂,请将其设置为 Long.MAX_VALUE(默认值)。
将 closeOnRefresh 设置为在刷新实际创建新连接后关闭“旧”连接。
如果任一委托工厂是 CachingClientConnectionFactory,则这些属性不适用,因为连接缓存由该工厂处理;在这种情况下,将始终通过查询连接工厂列表来获取连接。
从版本5.3开始,这些默认值改为 Long.MAX_VALUE 和 true,因此工厂仅在当前连接失败时尝试故障恢复。若要恢复先前版本的默认行为,请将它们设置为 0 和 false。
另请参阅测试连接。
TCP 线程亲和连接工厂
Spring Integration 5.0 版本引入了此连接工厂。它将连接绑定到调用线程,并且每次该线程发送消息时都会重用同一连接。这种情况会持续到连接关闭(由服务器或网络关闭)或线程调用 releaseConnection() 方法为止。连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(单次使用)连接,以便每个线程都能获得一个连接。
以下示例展示了如何配置 TCP 线程亲和性连接工厂:
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}