跳到主要内容
版本:7.0.2

TCP 消息关联

DeepSeek V3 中英对照 TCP Message Correlation

IP端点的一个目标是提供与Spring Integration应用之外的其他系统进行通信。为此,默认情况下仅发送和接收消息的有效载荷。自3.0版本起,您可以通过使用JSON、Java序列化或自定义序列化器与反序列化器来传输消息头。更多信息请参见传输消息头。框架本身不提供消息关联功能(使用网关时除外),服务器端的协作通道适配器也不提供。本文档后续部分将讨论应用程序可用的各种关联技术。在大多数情况下,这需要特定的应用级消息关联,即使消息有效载荷包含某些自然关联数据(例如订单号)也是如此。

网关

网关会自动关联消息。然而,对于相对低流量的应用,您应该使用出站网关。当您将连接工厂配置为对所有消息对使用单个共享连接(single-use="false")时,一次只能处理一条消息。新消息必须等到前一条消息的回复被接收后才能处理。当连接工厂配置为每条新消息都使用新连接(single-use="true")时,此限制不适用。虽然此设置比共享连接环境能提供更高的吞吐量,但它会带来为每个消息对打开和关闭新连接的开销。

因此,对于高吞吐量的消息,请考虑使用一对协作的通道适配器。然而,这样做需要您提供协作逻辑。

另一种解决方案,在Spring Integration 2.2中引入,是使用CachingClientConnectionFactory,它允许使用共享连接池。

协作出站与入站通道适配器

为实现高吞吐量(避免使用网关的缺陷,如前文所述),您可以配置一对协作的出站和入站通道适配器。您还可以使用协作适配器(服务器端或客户端)进行完全异步的通信(而非采用请求-应答语义)。在服务器端,消息关联由适配器自动处理,因为入站适配器会添加一个头部,使得出站适配器在发送回复消息时能够确定使用哪个连接。

备注

在服务器端,你必须填充 ip_connectionId 头部,因为它用于将消息与连接关联起来。源自入站适配器的消息会自动设置此头部。如果你希望构造其他消息进行发送,则需要设置该头部。你可以从传入的消息中获取该头部的值。

在客户端,应用程序必须根据需要提供自己的关联逻辑。您可以通过多种方式实现这一点。

如果消息负载本身包含一些自然关联数据(例如交易ID或订单号),并且您无需保留原始出站消息中的任何信息(例如回复通道头部),那么关联过程将非常简单,并且无论如何都会在应用层进行处理。

如果消息负载本身包含某些自然关联数据(例如交易ID或订单号),但您需要保留原始出站消息中的某些信息(例如回复通道头部),您可以通过保留原始出站消息的副本(可能通过使用发布-订阅通道),并利用聚合器来重新组合必要的数据。

对于前两种场景中的任意一种,如果负载本身没有自然的关联数据,您可以在出站通道适配器的上游提供一个转换器,用此类数据增强负载。这种转换器可以将原始负载转换为一个新对象,该对象既包含原始负载,也包含消息头中的部分子集。当然,消息头中的动态对象(例如回复通道)不能包含在转换后的负载中。

如果你选择这样的策略,需要确保连接工厂配备有合适的序列化器-反序列化器对来处理此类负载(例如使用 Java 序列化的 DefaultSerializerDefaultDeserializer,或自定义的序列化器与反序列化器)。在 TCP 连接工厂 中提到的 ByteArray*Serializer 选项(包括默认的 ByteArrayCrLfSerializer)不支持此类负载,除非转换后的负载是 Stringbyte[] 类型。

备注

在 2.2 版本发布之前,当协作通道适配器使用客户端连接工厂时,so-timeout 属性默认值为默认回复超时时间(10 秒)。这意味着,如果入站适配器在此期间未接收到任何数据,套接字将被关闭。

这种默认行为在真正的异步环境中并不合适,因此现在它默认设置为无限超时。您可以通过将客户端连接工厂上的 so-timeout 属性设置为 10000 毫秒来恢复之前的默认行为。

从 5.4 版本开始,多个出站通道适配器和一个 TcpInboundChannelAdapter 可以共享同一个连接工厂。这使得应用程序能够同时支持请求/回复和任意的服务器 → 客户端消息传递。更多信息请参阅 TCP 网关

传输头部

TCP是一种流式协议。SerializersDeserializers在流中划分消息边界。在 3.0 版本之前,只有消息负载(Stringbyte[])可以通过 TCP 传输。从 3.0 版本开始,除了负载之外,您还可以传输选定的消息头。然而,诸如 replyChannel 消息头之类的“活动”对象无法被序列化。

通过TCP发送头部信息需要一些额外的配置。

第一步是为 ConnectionFactory 配置一个使用 mapper 属性的 MessageConvertingTcpMessageMapper。该映射器会委托给任何 MessageConverter 实现,将消息转换为可由配置的 serializerdeserializer 进行序列化和反序列化的对象,或从该对象转换回消息。

Spring Integration 提供了一个 MapMessageConverter,它允许指定一个头部列表,这些头部会与有效负载一起添加到 Map 对象中。生成的 Map 包含两个条目:payloadheadersheaders 条目本身也是一个 Map,其中包含了选定的头部信息。

第二步是提供一个序列化器和反序列化器,用于在 Map 和某种传输格式之间进行转换。这可以是一个自定义的 SerializerDeserializer,通常在对方系统不是 Spring Integration 应用程序时需要。

Spring Integration 提供了一个 MapJsonSerializer,用于在 Map 与 JSON 之间进行转换。它使用 Spring Integration 的 JsonObjectMapper。如果需要,您可以提供一个自定义的 JsonObjectMapper。默认情况下,序列化器会在对象之间插入换行符 (0x0a)。更多信息请参阅 Javadoc

备注

JsonObjectMapper 使用类路径上存在的任何版本的 Jackson

你也可以通过使用 DefaultSerializerDefaultDeserializer 来对 Map 进行标准的 Java 序列化。

以下示例展示了使用 JSON 传输 correlationIdsequenceNumbersequenceSize 标头的连接工厂配置:

<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>

<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

使用上述配置发送的消息,若负载为 'something',在网络上传输时将呈现如下形式:

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}