跳到主要内容

TCP 消息关联

QWen Plus 中英对照 TCP Message Correlation

IP 端点的一个目标是提供与非 Spring Integration 应用程序系统的通信。因此,默认情况下,只发送和接收消息的有效负载。自 3.0 版本以来,您可以通过使用 JSON、Java 序列化或自定义序列化程序和反序列化程序来传输头信息。有关详细信息,请参阅传输头信息。框架不提供任何消息关联(除非使用网关)或服务器端协作通道适配器。在此文档后面,我们将讨论应用程序可用的各种关联技术。在大多数情况下,这需要特定的应用程序级别的消息关联,即使消息有效负载包含一些自然的关联数据(例如订单号)。

网关

网关自动关联消息。但是,您应该将出站网关用于相对低流量的应用程序。当您将连接工厂配置为对所有消息对使用单个共享连接(single-use="false")时,一次只能处理一条消息。新消息必须等待前一条消息的回复接收完毕。当为每个新消息配置连接工厂以使用新连接(single-use="true")时,此限制不适用。虽然这种设置比共享连接环境提供更高的吞吐量,但它伴随着为每条消息对打开和关闭新连接的开销。

因此,对于高容量的消息,考虑使用协作的一对通道适配器。但是,这样做的话,你需要提供协作逻辑。

另一种解决方案是在 Spring Integration 2.2 中引入的,可以使用 CachingClientConnectionFactory,它允许使用共享连接池。

协作的出站和入站通道适配器

要实现大容量吞吐量(避免使用网关的陷阱,如前面所述),你可以配置一对协作的出站和入站通道适配器。你也可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是带有请求-回复语义)。在服务器端,消息关联由适配器自动处理,因为入站适配器添加了一个标头,使出站适配器能够确定发送回复消息时使用哪个连接。

备注

在服务器端,你必须填充 ip_connectionId 头,因为它是用来将消息与连接关联起来的。来自入站适配器的消息会自动设置该头。如果你希望构造其他消息来发送,则需要设置该头。你可以从传入的消息中获取头的值。

在客户端,应用程序必须根据需要提供自己的关联逻辑。你可以通过多种方式来实现。

如果消息有效载荷有一些自然的关联数据(例如 事务 ID 或 订单号),并且您不需要保留来自原始出站消息的任何信息(例如 回复通道头),那么关联将很简单,并且无论如何都会在应用程序级别进行。

如果消息的有效载荷有一些自然的相关数据(例如 事务 ID 或 订单号),但您需要保留原始出站消息中的一些信息(例如 回复通道标题),您可以保留原始出站消息的副本(可能通过使用发布-订阅通道)并使用聚合器重新组合必要的数据。

对于上述两种场景中的任何一种,如果有效负载没有自然的相关数据,你可以在传出通道适配器的上游提供一个转换器,以增强有效负载中的此类数据。这样的转换器可以将原始有效负载转换为一个新的对象,该对象包含原始有效负载和消息头的一些子集。当然,来自头的活动对象(例如回复通道)不能包含在转换后的有效负载中。

如果你选择了这样的策略,你需要确保连接工厂拥有适当的序列化器-反序列化器对来处理这种负载(例如 DefaultSerializerDefaultDeserializer,它们使用 Java 序列化,或者自定义的序列化器和反序列化器)。在 TCP 连接工厂 中提到的 ByteArray*Serializer 选项,包括默认的 ByteArrayCrLfSerializer,除非转换后的负载是 Stringbyte[],否则不支持此类负载。

备注

在 2.2 版本发布之前,当协作通道适配器使用客户端连接工厂时,so-timeout 属性默认为默认回复超时时间(10 秒)。这意味着,如果在此时间段内入站适配器没有收到任何数据,套接字将被关闭。

这种默认行为在一个真正的异步环境中并不合适,因此现在它默认为无限超时。您可以通过将客户端连接工厂的 so-timeout 属性设置为 10000 毫秒来恢复之前的默认行为。

从 5.4 版本开始,多个出站通道适配器和一个 TcpInboundChannelAdapter 可以共享同一个连接工厂。这允许应用程序同时支持请求/回复和任意服务器 → 客户端消息传递。更多信息请参见 TCP 网关

传输标题

TCP 是一种流协议。SerializersDeserializers 在流中划分消息。在 3.0 之前,只有消息负载(Stringbyte[])可以通过 TCP 传输。从 3.0 开始,你可以传输选定的头部信息以及负载。但是,“实时”对象,例如 replyChannel 头部,无法被序列化。

通过 TCP 发送标头信息需要一些额外的配置。

第一步是为 ConnectionFactory 提供一个使用 mapper 属性的 MessageConvertingTcpMessageMapper。这个映射器委托给任何 MessageConverter 实现,以将消息转换为可以由配置的 serializerdeserializer 进行序列化和反序列化的某些对象。

Spring Integration 提供了一个 MapMessageConverter,它允许指定一个头列表,这些头将被添加到 Map 对象中,连同有效负载一起。生成的 Map 有两个条目:payloadheadersheaders 条目本身是一个 Map,并包含选定的头。

第二步是提供一个序列化器和反序列化器,能够在这两者之间进行转换 Map 和某些线程格式。这可以是一个自定义的 SerializerDeserializer,如果你对等的系统不是一个 Spring Integration 应用程序,通常你需要这样做。

Spring Integration 提供了 MapJsonSerializer 用于将 Map 转换为 JSON 及从 JSON 转换回 Map。它使用 Spring Integration 的 JsonObjectMapper。如果需要,你可以提供一个自定义的 JsonObjectMapper。默认情况下,序列化程序会在对象之间插入一个换行符 (0x0a)。更多信息请参阅 Javadoc

备注

JsonObjectMapper 使用类路径上的 Jackson 版本。

您也可以通过使用 DefaultSerializerDefaultDeserializer 来使用标准的 Java 序列化 Map

以下示例展示了配置连接工厂以使用 JSON 传输 correlationIdsequenceNumbersequenceSize 头:

<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>

<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
xml

带有前述配置发送的消息,其有效负载为 'something',在传输过程中将如下所示:

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}
xml