消息存储
企业集成模式 (EIP) 一书指出了几种能够缓冲消息的模式。例如,聚合器会缓冲消息直到它们可以被释放,而 QueueChannel 会缓冲消息直到消费者显式地从该通道接收这些消息。由于消息流中的任何一点都可能发生故障,因此缓冲消息的 EIP 组件也引入了消息可能丢失的风险点。
为了降低消息丢失的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常使用某种类型的持久化存储(例如关系型数据库)。
Spring Integration 通过以下方式支持消息存储模式:
-
定义
org.springframework.integration.store.MessageStore策略接口 -
提供该接口的多种实现
-
在所有具备缓冲消息能力的组件上公开
message-store属性,以便您可以注入任何实现MessageStore接口的实例。
有关如何配置特定消息存储实现以及如何将 MessageStore 实现注入特定缓冲组件的详细信息,请参阅手册中的相关章节(参见具体组件,如 QueueChannel、Aggregator、Delayer 等)。以下两个示例展示了如何为 QueueChannel 和聚合器添加对消息存储的引用:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
默认情况下,消息通过使用 o.s.i.store.SimpleMessageStore(MessageStore 的一个实现)存储在内存中。这对于开发环境或简单的低负载环境可能适用,因为在这些环境中,非持久化消息的潜在丢失不是问题。然而,典型的生产应用程序需要一个更健壮的选项,不仅是为了降低消息丢失的风险,也是为了避免潜在的内存溢出错误。因此,我们还为多种数据存储提供了 MessageStore 实现。以下是支持的实现的完整列表:
-
Hazelcast 消息存储: 使用 Hazelcast 分布式缓存存储消息
-
JDBC 消息存储: 使用关系型数据库管理系统 (RDBMS) 存储消息
-
Redis 消息存储: 使用 Redis 键/值数据存储存储消息
-
MongoDB 消息存储: 使用 MongoDB 文档存储存储消息
然而,在使用 MessageStore 的持久化实现时,请注意一些限制。
消息数据(负载和头部)会通过不同的序列化策略进行序列化和反序列化,具体取决于 MessageStore 的实现。例如,在使用 JdbcMessageStore 时,默认情况下只有 Serializable 数据会被持久化。在这种情况下,不可序列化的头部会在序列化之前被移除。同时,请注意由传输适配器(如 FTP、HTTP、JMS 等)注入的协议特定头部。例如,<http:inbound-channel-adapter/> 会将 HTTP 头部映射为消息头部,其中一个是包含不可序列化的 org.springframework.http.MediaType 实例的 ArrayList。不过,你可以将自定义的 Serializer 和 Deserializer 策略接口实现注入到某些 MessageStore 实现(如 JdbcMessageStore)中,以改变序列化和反序列化的行为。
请特别注意那些代表特定类型数据的头部。例如,如果某个头部包含某个 Spring bean 的实例,在反序列化后,你可能会得到该 bean 的不同实例,这会直接影响框架创建的某些隐式头部(如 REPLY_CHANNEL 或 ERROR_CHANNEL)。目前,这些头部是不可序列化的,但即使它们可以序列化,反序列化后的通道也不会代表预期的实例。
从 Spring Integration 3.0 版本开始,你可以在将通道注册到 HeaderChannelRegistry 后,通过配置一个头部增强器(header enricher)将这些头部替换为名称来解决此问题。
另外,请考虑当你配置如下消息流时会发生什么:gateway → queue-channel(由持久化 Message Store 支持)→ service-activator。该网关会创建一个临时的回复通道,该通道在 service-activator 的轮询器从队列中读取时已经丢失。同样,你可以使用头部增强器将这些头部替换为 String 表示形式。
更多信息,请参阅头部增强器。
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore:用于实现对QueueChannel实例的特定操作 -
PriorityCapableChannelMessageStore:用于标记供PriorityChannel实例使用的MessageStore实现,并为持久化消息提供优先级排序。
实际行为取决于具体实现。框架提供了以下实现,可作为 QueueChannel 和 PriorityChannel 的持久化 MessageStore:
关于 SimpleMessageStore 的注意事项
从 4.1 版本开始,SimpleMessageStore 在调用 getMessageGroup() 时不再复制消息组。对于大型消息组,这曾是一个显著的性能问题。4.0.1 版本引入了一个布尔属性 copyOnGet,允许您控制此行为。当聚合器等组件内部使用时,此属性被设置为 false 以提高性能。现在它默认是 false。
在聚合器等组件之外访问组存储的用户现在会获得聚合器正在使用的组的直接引用,而不是副本。在聚合器之外操作组可能会导致不可预测的结果。
因此,您应该避免进行此类操作,或者将 copyOnGet 属性设置为 true。
使用 MessageGroupFactory
从版本 4.3 开始,部分 MessageGroupStore 实现可以注入自定义的 MessageGroupFactory 策略,以创建和定制 MessageGroupStore 所使用的 MessageGroup 实例。默认情况下,它使用 SimpleMessageGroupFactory,该工厂基于 GroupType.HASH_SET (LinkedHashSet) 内部集合生成 SimpleMessageGroup 实例。其他可能的选项包括 SYNCHRONISED_SET 和 BLOCKING_QUEUE,其中后者可用于恢复先前 SimpleMessageGroup 的行为。此外,还提供了 PERSISTENT 选项。更多信息请参阅下一节。从版本 5.0.1 开始,当组内消息的顺序和唯一性不重要时,还可以使用 LIST 选项。
持久化 MessageGroupStore 与延迟加载
您可以通过配置选项 AbstractMessageGroupStore.setLazyLoadMessageGroups(false) 来关闭延迟加载行为。
我们对 MongoDB MessageStore(MongoDB 消息存储)和 <aggregator>(聚合器)的延迟加载性能测试使用了自定义的 release-strategy,其配置类似于以下示例:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
对于1000条简单消息,它产生的结果类似于以下内容:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,从版本5.5开始,所有持久化的MessageGroupStore实现都提供了一个基于目标数据库流式API的streamMessagesForGroup(Object groupId)契约。当存储中的消息组非常大时,这提高了资源利用率。在框架内部,这个新API被用于Delayer中,例如在启动时重新调度持久化消息的场景。返回的Stream<Message<?>>必须在处理结束时关闭,例如通过try-with-resources自动关闭。每当使用PersistentMessageGroup时,其streamMessages()方法会委托给MessageGroupStore.streamMessagesForGroup()。
消息组条件
从版本 5.5 开始,MessageGroup 抽象提供了一个 condition 字符串选项。该选项的值可以是任何内容,以便后续出于任何原因解析该值来为组做出决策。例如,来自关联消息处理器的 ReleaseStrategy 可以查询组的此属性,而无需遍历组中的所有消息。MessageGroupStore 公开了一个 setGroupCondition(Object groupId, String condition) API。为此,AbstractCorrelatingMessageHandler 中添加了一个 setGroupConditionSupplier(BiFunction<Message<?>, String, String>) 选项。此函数在每条消息添加到组后,针对该消息以及组的现有条件进行评估。实现可以决定返回新值、现有值,或将目标条件重置为 null。condition 的值可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。例如,来自文件聚合器组件的 FileMarkerReleaseStrategy,从 FileSplitter.FileMarker.Mark.END 消息的 FileHeaders.LINE_COUNT 标头中为组填充一个条件,并在其 canRelease() 中通过比较组大小与此条件中的值来使用它。这样,它就不需要遍历组中的所有消息来查找带有 FileHeaders.LINE_COUNT 标头的 FileSplitter.FileMarker.Mark.END 消息。它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。
此外,为便于配置,系统引入了 GroupConditionProvider 合约。AbstractCorrelatingMessageHandler 会检查提供的 ReleaseStrategy 是否实现了此接口,并提取用于组条件评估逻辑的 conditionSupplier。
使用 LockRegistry
从版本 6.5 开始,AbstractMessageGroupStore 抽象类使用锁来操作消息组的元数据。该锁通过 LockRegister 获取 groupId 并生成。其目的是确保消息和消息组操作的原子性。在多线程环境中,若同时添加或移除消息或更新元数据,缺少锁可能导致某些实现出现消息组错误。默认使用 DefaultLockRegistry,任何 LockRegister 均可通过 AbstractMessageGroupStore.setLockRegistry() 注入,通常为同一持久化存储的实现。更多信息请参阅分布式锁。