TCP 连接工厂
概述
对于 TCP,底层连接的配置是通过使用连接工厂提供的。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立传出连接。服务器连接工厂监听传入连接。
一个 outbound 通道适配器使用客户端连接工厂,但您也可以为 inbound 通道适配器提供对客户端连接工厂的引用。该适配器接收在由 outbound 适配器创建的连接上收到的任何传入消息。
入站通道适配器或网关使用服务器连接工厂。(实际上,连接工厂无法在没有它的情况下工作)。你也可以为出站适配器提供一个服务器连接工厂的引用。然后你可以使用该适配器在同一连接上发送对入站消息的回复。
只有当回复包含连接工厂插入原始消息中的 ip_connectionId
标头时,回复消息才会路由到连接。
这是在共享入站和出站适配器之间的连接工厂时执行的消息关联的程度。这种共享允许通过 TCP 进行异步双向通信。默认情况下,仅传输有效负载信息使用 TCP。因此,任何消息关联都必须由下游组件(如聚合器或其他端点)执行。版本 3.0 引入了对传输选定标题的支持。有关详细信息,请参阅 TCP 消息关联。
您可以将连接工厂的引用最多给予每种类型的 一个 适配器。
Spring Integration 提供了使用 java.net.Socket
和 java.nio.channel.SocketChannel
的连接工厂。
以下示例展示了一个简单的服务器连接工厂,它使用 java.net.Socket
连接:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
以下示例展示了一个简单的服务器连接工厂,它使用 java.nio.channel.SocketChannel
连接:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
从 Spring Integration 4.2 版本开始,如果服务器配置为监听随机端口(通过将端口设置为 0
),你可以通过使用 getPort()
获取操作系统实际选择的端口。此外,getServerSocketAddress()
可让你获取完整的 SocketAddress
。更多信息请参阅 TcpServerConnectionFactory 接口的 Javadoc。
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
以下示例展示了一个客户端连接工厂,它使用 java.net.Socket
连接,并为每条消息创建一个新的连接:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
从 5.2 版本开始,客户端连接工厂支持 connectTimeout
属性,以秒为单位指定, 默认值为 60。
也请参阅 基于注解的配置 和 使用 Java DSL 用于 TCP 组件。
消息分隔(序列化器和反序列化器)
TCP 是一种流协议。这意味着必须为通过 TCP 传输的数据提供一些结构,以便接收方可以对数据进行定界以形成离散的消息。连接工厂配置为使用序列化器和反序列化器在消息有效负载和通过 TCP 发送的位之间进行转换。这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。Spring Integration 提供了许多标准的序列化器和反序列化器。
ByteArrayCrlfSerializer
* 将字节数组转换为以回车和换行字符 (\r\n
) 结尾的字节流。这是默认的序列化器(和反序列化器),可以与 telnet 等客户端一起使用(例如)。
ByteArraySingleTerminatorSerializer
* 将字节数组转换为以单个终止字符结尾的字节流(默认值是 0x00
)。
ByteArrayLfSerializer
将字节数组转换为以单个换行符 (0x0a
) 结尾的字节流。
ByteArrayStxEtxSerializer
* 将字节数组转换为以 STX (0x02
) 开头和以 ETX (0x03
) 结尾的字节流。
ByteArrayLengthHeaderSerializer
将字节数组转换为以网络字节序(大端序)的二进制长度开头的字节流。这是一个高效的反序列化器,因为它不需要解析每个字节来查找终止字符序列。它也可以用于包含二进制数据的有效负载。前面的序列化器仅支持有效负载中的文本。长度头的默认大小是四个字节(一个整数),允许消息长度达到 (2^31 - 1) 字节。但是,对于最大 255 字节的消息,length
头可以是一个字节(无符号);对于最大 (2^16 - 1) 字节的消息,可以是无符号短整型(2 字节)。如果您需要其他格式的头部,可以继承 ByteArrayLengthHeaderSerializer
并为 readHeader
和 writeHeader
方法提供实现。绝对最大数据大小是 (2^31 - 1) 字节。从版本 5.2 开始,头部值可以包括头部本身的长度以及有效负载的长度。设置 inclusive
属性以启用该机制(必须为生产者和消费者设置相同)。
ByteArrayRawSerializer
* 将字节数组转换为字节流,并不添加任何额外的消息分隔数据。使用此序列化器(和反序列化器)时,消息的结束由客户端以有序的方式关闭套接字来表示。当使用此序列化器时,消息接收会一直挂起,直到客户端关闭套接字或发生超时。超时不会产生消息。当此序列化器与客户端是 Spring Integration 应用程序一起使用时,客户端必须使用配置有 single-use="true"
的连接工厂。这样做会使适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。您应该仅在通道适配器(而不是网关)使用的连接工厂中使用此序列化器,且这些连接工厂应由入站或出站适配器之一使用,但不能同时使用两者。另请参阅本节稍后的 ByteArrayElasticRawDeserializer
。但是,自 5.2 版以来,出站网关有一个新属性 closeStreamAfterSend
;这允许使用原始序列化器/反序列化器,因为 EOF 信号已发送给服务器,同时保持连接打开以接收回复。
在 4.2.2 版本之前,当使用非阻塞 I/O (NIO) 时,此序列化程序会将超时(在读取期间)视为文件结束,并将迄今为止读取的数据作为消息发出。这是不可靠的,不应该用来分隔消息。现在它将此类情况视为异常。如果极少数情况下你以这种方式使用它,可以通过将构造函数参数 treatTimeoutAsEndOfMessage
设置为 true
来恢复以前的行为。
这些类都是 AbstractByteArraySerializer
的子类,它们实现了 org.springframework.core.serializer.Serializer
和 org.springframework.core.serializer.Deserializer
。为了向后兼容,使用 AbstractByteArraySerializer
任何子类进行序列化的连接也接受一个先转换为字节数组的 String
。这些序列化器和反序列化器中的每一个都将包含相应格式的输入流转换为字节数组有效负载。
为了避免由于客户端行为不当(不遵守配置的序列化程序的协议)导致内存耗尽,这些序列化程序会限制最大消息大小。如果传入的消息超过此大小,则会抛出异常。默认的最大消息大小是 2048 字节。您可以通过设置 maxMessageSize
属性来增加它。如果您使用默认的序列化程序或反序列化程序并希望增加最大消息大小,则必须声明一个具有 maxMessageSize
属性的显式 bean,并配置连接工厂以使用该 bean。
本节前面标记有 * 的类使用了一个中间缓冲区,并将解码后的数据复制到正确大小的最终缓冲区。从 4.3 版开始,您可以通过设置 poolSize
属性来配置这些缓冲区,以让这些原始缓冲区被重用,而不是为每条消息分配和丢弃,这是默认行为。将属性设置为负值会创建一个没有边界的池。如果池是有界的,您还可以设置 poolWaitTimeout
属性(以毫秒为单位),如果在指定时间内没有缓冲区可用,则会抛出异常。它默认为无穷大。此类异常会导致套接字关闭。
如果你想在自定义反序列化程序中使用相同的机制,你可以扩展 AbstractPooledBufferByteArraySerializer
(而不是它的父类 AbstractByteArraySerializer
),并实现 doDeserialize()
而不是 deserialize()
。缓冲区会自动返回到池中。AbstractPooledBufferByteArraySerializer
还提供了一个方便的实用方法:copyToSizedArray()
。
版本 5.0 添加了 ByteArrayElasticRawDeserializer
。这与上面的 ByteArrayRawSerializer
的反序列化端类似,不同之处在于不需要设置 maxMessageSize
。内部,它使用一个 ByteArrayOutputStream
,可以根据需要让缓冲区增长。客户端必须有序地关闭套接字以表示消息结束。
此反序列化器仅应在对等方受信任时使用;它容易受到由于内存不足导致的拒绝服务攻击。
MapJsonSerializer
使用 Jackson ObjectMapper
在 Map
和 JSON 之间进行转换。您可以将此序列化程序与 MessageConvertingTcpMessageMapper
和 MapMessageConverter
结合使用,以 JSON 格式传输选定的标题和有效负载。
Jackson ObjectMapper
无法在流中划分消息。因此,MapJsonSerializer
需要委托给另一个序列化器或反序列化器来处理消息划分。默认情况下,使用 ByteArrayLfSerializer
,这会导致线路上的消息格式为 <json><LF>
,但你可以配置它以使用其他序列化器。(下一个示例展示了如何进行配置。)
最后的标准序列化器是 org.springframework.core.serializer.DefaultSerializer
,你可以使用它通过 Java 序列化来转换可序列化的对象。org.springframework.core.serializer.DefaultDeserializer
用于反序列化包含可序列化对象的输入流。
如果你不希望使用默认的序列化器和反序列化器 (ByteArrayCrLfSerializer
),则必须在连接工厂中设置 serializer
和 deserializer
属性。以下示例展示了如何进行设置:
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
一个使用 java.net.Socket
连接并在线路上使用 Java 序列化的服务器连接工厂。
有关连接工厂上可用属性的详细信息,请参阅本节末尾的参考。
默认情况下,不对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能会导致连接延迟。为了将 IP 地址转换为主机名以用于消息标头,可以通过将 lookup-host
属性设置为 true
来覆盖默认行为。
您还可以修改套接字和套接字工厂的属性。有关更多信息,请参见 SSL/TLS 支持。如上所述,如果使用了 SSL 或未使用 SSL,都可以进行此类修改。
也请参阅 基于注解的配置 和 使用 Java DSL 用于 TCP 组件。
主机验证
从版本 5.1.0 开始,默认启用了主机验证以增强安全性。此功能确保在 TCP 连接期间验证服务器的身份。
如果你遇到需要禁用主机验证的情形(不推荐),你可以配置 tcp-connection-factory 中的 socket-support 属性。
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="0"
socket-support="customSocketSupport"
single-use="true"
so-timeout="10000"/>
<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
<constructor-arg value="false" />
</bean>
自定义序列化器和反序列化器
如果您的数据不是以标准反序列化程序支持的格式,您可以实现自己的;您也可以实现自定义序列化程序。
要实现自定义的序列化器和反序列化器对,需要实现 org.springframework.core.serializer.Deserializer
和 org.springframework.core.serializer.Serializer
接口。
当反序列化程序在消息之间检测到关闭的输入流时,它必须抛出一个 SoftEndOfStreamException
;这是向框架发出的信号,表示关闭是“正常的”。如果在解码消息时流被关闭,则应抛出其他异常。
从 5.2 版本开始,SoftEndOfStreamException
现在是一个 RuntimeException
,而不是继承自 IOException
。
TCP 缓存客户端连接工厂
如前面所述,TCP 套接字可以是 '一次性'(一个请求或响应)或共享的。共享套接字在高流量环境下与出站网关配合使用时性能不佳,因为套接字一次只能处理一个请求或响应。
要提高性能,你可以使用协作通道适配器而不是网关,但这需要应用程序级别的消息关联。有关更多信息,请参阅 TCP 消息关联。
Spring Integration 2.2 引入了一个缓存客户端连接工厂,它使用共享套接字池,让网关可以使用共享连接池处理多个并发请求。
TCP 故障切换客户端连接工厂
您可以配置一个支持故障转移到一个或多个其他服务器的 TCP 连接工厂。在发送消息时,工厂会遍历其所有已配置的工厂,直到消息可以被发送或者找不到连接为止。最初,使用已配置列表中的第一个工厂。如果连接随后失败,则下一个工厂成为当前工厂。以下示例展示了如何配置故障转移客户端连接工厂:
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
在使用故障转移连接工厂时,singleUse
属性必须在工厂本身和其配置使用的工厂列表之间保持一致。
连接工厂在与共享连接 (singleUse=false
) 一起使用时,有两个与故障回退相关的属性:
-
refreshSharedInterval
-
closeOnRefresh
考虑以下基于上述配置的场景:假设 clientFactory1
无法建立连接,但 clientFactory2
可以。当在 refreshSharedInterval
过期后调用 failCF
的 getConnection()
方法时,我们将再次尝试使用 clientFactory1
建立连接;如果成功,则会关闭与 clientFactory2
的连接。如果 closeOnRefresh
为 false
,则“旧”的连接将保持打开状态,并且如果第一个工厂再次失败,将来可能会重新使用该连接。
将 refreshSharedInterval
设置为仅在该时间到期后尝试重新连接第一个工厂;如果只希望在当前连接失败时回退到第一个工厂,则将其设置为 Long.MAX_VALUE
(默认值)。
将 closeOnRefresh
设置为在刷新实际创建新连接后关闭“旧”连接。
如果任何委托工厂是 CachingClientConnectionFactory
,这些属性将不适用,因为连接缓存是在那里处理的;在这种情况下,将始终查询连接工厂列表以获取连接。
从 5.3 版本开始,这些默认为 Long.MAX_VALUE
和 true
,因此工厂只会在当前连接失败时尝试回退。要恢复到以前版本的默认行为,请将它们设置为 0
和 false
。
也请参阅 测试连接。
TCP 线程亲和性连接工厂
Spring Integration 5.0 引入了这个连接工厂。它将一个连接绑定到调用线程,每次该线程发送消息时都会重用相同的连接。这会一直持续,直到连接被关闭(由服务器或网络关闭)或者直到线程调用 releaseConnection()
方法。连接本身是由另一个客户端工厂实现提供的,必须配置为提供非共享(单次使用)连接,以便每个线程都能获得一个连接。
以下示例展示了如何配置 TCP 线程亲和性连接工厂:
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}