跳到主要内容

消息存储

QWen Plus 中英对照 Message Store

Enterprise Integration Patterns (EIP) 一书确定了几个具有缓冲消息能力的模式。例如,聚合器会缓冲消息,直到它们可以被释放,QueueChannel 会缓冲消息,直到消费者明确地从该通道接收这些消息。由于在消息流的任何点都可能发生故障,因此 EIP 中缓冲消息的组件也会引入消息可能丢失的点。

为了降低丢失消息的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常是在某种类型的持久化存储(例如 RDBMS)中。

Spring Integration 通过以下方式提供对消息存储模式的支持:

  • 定义 org.springframework.integration.store.MessageStore 策略接口

  • 提供这个接口的多个实现

  • 在所有具有缓冲消息能力的组件上暴露一个 message-store 属性,以便你可以注入任何实现了 MessageStore 接口的实例。

如何配置特定的消息存储实现以及如何将 MessageStore 实现注入到特定的缓冲组件中,在手册中有详细说明(参见特定组件,例如 QueueChannelAggregatorDelayer 等)。以下示例对显示了如何为 QueueChannel 和聚合器添加消息存储引用:

<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
xml
<int:aggregator message-store="refToMessageStore"/>
xml

默认情况下,消息通过使用 o.s.i.store.SimpleMessageStoreMessageStore 的一个实现)存储在内存中。对于开发或简单的低流量环境而言,这可能没问题,在这些环境中,非持久性消息的潜在丢失并不是问题。然而,典型的生产应用程序需要更稳健的选项,不仅为了降低消息丢失的风险,还要避免潜在的内存不足错误。因此,我们还提供了针对多种数据存储的 MessageStore 实现。以下是支持的所有实现的完整列表:

important

然而,在使用 MessageStore 的持久化实现时,需要注意一些限制。

消息数据(有效载荷和头信息)通过不同的序列化策略进行序列化和反序列化,具体取决于 MessageStore 的实现。例如,当使用 JdbcMessageStore 时,默认情况下仅持久化 Serializable 数据。在这种情况下,在序列化发生之前会移除非可序列化的头信息。此外,要注意由传输适配器(如 FTP、HTTP、JMS 等)注入的协议特定头信息。例如,<http:inbound-channel-adapter/> 将 HTTP 头信息映射到消息头信息中,其中一个头信息是一个包含非可序列化的 org.springframework.http.MediaType 实例的 ArrayList。但是,您可以将自己的 SerializerDeserializer 策略接口实现注入某些 MessageStore 实现(如 JdbcMessageStore)中,以改变序列化和反序列化的行为。

特别要注意表示某些类型数据的头信息。例如,如果其中一个头信息包含某个 Spring bean 的实例,在反序列化后,您可能会得到该 bean 的不同实例,这直接影响框架创建的一些隐式头信息(如 REPLY_CHANNELERROR_CHANNEL)。目前,这些头信息是不可序列化的,即使它们可以被序列化,反序列化后的通道也不会表示预期的实例。

从 Spring Integration 版本 3.0 开始,您可以通过配置一个头信息增强器来解决此问题,该增强器会在将通道注册到 HeaderChannelRegistry 后用名称替换这些头信息。

另外,请考虑以下情况:当您按如下方式配置消息流时:网关 → 队列通道(由持久化 Message Store 支持)→ 服务激活器。该网关会创建一个临时回复通道,但在服务激活器的轮询器从队列中读取时,该通道已丢失。同样,您可以使用头信息增强器将头信息替换为 String 表示形式。

有关更多信息,请参见 Header Enricher

Spring Integration 4.0 引入了两个新接口:

  • ChannelMessageStore:实现针对 QueueChannel 实例的具体操作

  • PriorityCapableChannelMessageStore:用于标记要为 PriorityChannel 实例使用的 MessageStore 实现,并为持久化消息提供优先级顺序。

实际行为取决于实现。框架提供了以下实现,可以用作 QueueChannelPriorityChannel 的持久化 MessageStore

注意

关于 SimpleMessageStore 的警告

从 4.1 版本开始,SimpleMessageStore 在调用 getMessageGroup() 时不再复制消息组。对于大型消息组,这曾经是一个显著的性能问题。4.0.1 引入了一个布尔属性 copyOnGet,它允许你控制这种行为。当在聚合器内部使用时,此属性被设置为 false 以提高性能。现在,默认情况下它为 false

在聚合器等组件外部访问组存储的用户现在会直接引用聚合器正在使用的组,而不是获取一个副本。在聚合器外部对组进行操作可能会导致不可预测的结果。

因此,你应该要么不执行此类操作,要么将 copyOnGet 属性设置为 true

使用 MessageGroupFactory

从 4.3 版本开始,一些 MessageGroupStore 实现可以注入自定义的 MessageGroupFactory 策略,以创建和自定义 MessageGroupStore 使用的 MessageGroup 实例。默认情况下,它使用 SimpleMessageGroupFactory,根据 GroupType.HASH_SET (LinkedHashSet) 内部集合生成 SimpleMessageGroup 实例。其他可能的选项是 SYNCHRONISED_SETBLOCKING_QUEUE,其中最后一个可以用于恢复之前的 SimpleMessageGroup 行为。此外,还有 PERSISTENT 选项可用。更多信息请参见下一节。从 5.0.1 版本开始,当组内消息的顺序和唯一性不重要时,还可以使用 LIST 选项。

持久化 MessageGroupStore 和懒加载

从 4.3 版本开始,所有持久化的 MessageGroupStore 实例以懒加载的方式从存储中检索 MessageGroup 实例及其 messages。在大多数情况下,这对于关联的 MessageHandler 实例(参见 AggregatorResequencer)是有用的,因为在每次关联操作时从存储中加载整个 MessageGroup 会增加开销。

你可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false) 选项从配置中关闭懒加载行为。

我们对 MongoDB MessageStore (MongoDB 消息存储) 和 <aggregator> (聚合器) 的懒加载性能测试使用了一个自定义的 release-strategy,类似于以下内容:

<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
xml

它为 1000 条简单消息生成类似以下的结果:

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
none

但是,从 5.5 版本开始,所有持久化的 MessageGroupStore 实现都提供了一个基于目标数据库流式 API 的 streamMessagesForGroup(Object groupId) 合约。这在存储中的组非常大时改善了资源利用率。框架内部,例如在 Delayer 中,当它在启动时重新调度持久化消息时,使用了这个新的 API。返回的 Stream<Message<?>> 在处理结束时必须关闭,例如通过 try-with-resources 进行自动关闭。每当使用 PersistentMessageGroup 时,它的 streamMessages() 会委托给 MessageGroupStore.streamMessagesForGroup()

消息组条件

从 5.5 版本开始,MessageGroup 抽象提供了一个 condition 字符串选项。这个选项的值可以是任何可以在稍后解析以作决策的内容。例如,来自 关联消息处理器ReleaseStrategy 可能会查询此属性,而不是遍历组中的所有消息。MessageGroupStore 暴露了一个 setGroupCondition(Object groupId, String condition) API。为此,在 AbstractCorrelatingMessageHandler 中添加了一个 setGroupConditionSupplier(BiFunction<Message<?>, String, String>) 选项。该函数会在每条消息被添加到组后以及根据组的现有条件进行评估。实现可以根据需要返回新值、现有值或重置目标条件为 nullcondition 的值可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。例如,来自 文件聚合器 组件的 FileMarkerReleaseStrategy 会从 FileSplitter.FileMarker.Mark.END 消息的 FileHeaders.LINE_COUNT 标头中填充条件到组,并通过其 canRelease() 方法与之进行比较,以将组大小与该条件中的值进行比较。这样它就不必遍历组中的所有消息来查找带有 FileHeaders.LINE_COUNT 标头的 FileSplitter.FileMarker.Mark.END 消息。它还允许结束标记在其他所有记录之前到达聚合器;例如在多线程环境中处理文件时。

此外,为了配置方便,引入了一个 GroupConditionProvider 合约。AbstractCorrelatingMessageHandler 会检查提供的 ReleaseStrategy 是否实现了此接口,并提取一个 conditionSupplier 用于组条件评估逻辑。

使用 LockRegistry

从 6.5 版本开始,AbstractMessageGroupStore 抽象类使用一个带有锁的元数据来操作消息组。这个锁通过 LockRegister 获取 groupId 并生成。它的目的是确保消息和消息组的原子性操作。在多个线程中同时添加或删除消息或更新元数据时,如果缺少锁,某些实现可能会出现消息组错误。默认情况下,使用 DefaultLockRegistry,任何 LockRegister 都可以通过 AbstractMessageGroupStore.setLockRegistry() 注入,通常是对同一持久化存储的实现。更多信息请参阅 分布式锁