聚合器
基本上是拆分器的镜像,聚合器是一种消息处理程序,它接收多条消息并将它们组合成一条消息。实际上,聚合器经常是在包含拆分器的管道中的下游消费者。
技术上来说,聚合器比拆分器更复杂,因为它具有状态。它必须保存要聚合的消息,并确定完整的消息组何时准备好进行聚合。为了实现这一点,它需要一个 MessageStore
。
功能
聚合器通过关联和存储一组相关消息,直到这组消息被认为完整为止。在这一点上,聚合器通过处理整个组来创建一条单一的消息,并将聚合后的消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即,从多个消息创建一个单一的消息)。两个相关概念是关联和释放。
相关性确定了消息如何分组进行聚合。在 Spring Integration 中,默认情况下,相关性是基于 IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头来实现的。具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID
的消息会被分组在一起。但是,您可以自定义相关性策略,以允许其他方式来指定消息应该如何分组在一起。为此,您可以实现一个 CorrelationStrategy
(本章后面会介绍)。
要确定一组消息何时准备好被处理,会咨询 ReleaseStrategy
。聚合器的默认释放策略是在序列中所有消息都存在时释放一组消息,这是基于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
头信息。你可以通过提供自定义 ReleaseStrategy
实现的引用来覆盖此默认策略。
编程模型
聚合 API 由多个类组成:
-
接口
MessageGroupProcessor
及其子类:MethodInvokingAggregatingMessageGroupProcessor
和ExpressionEvaluatingMessageGroupProcessor
-
接口
ReleaseStrategy
及其默认实现:SimpleSequenceSizeReleaseStrategy
-
接口
CorrelationStrategy
及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler
( AbstractCorrelatingMessageHandler
的一个子类)是 MessageHandler
的一个实现,封装了聚合器(及其他相关用例)的通用功能,具体如下:
-
将消息关联到一个组中进行聚合
-
在
MessageStore
中维护这些消息,直到组可以被释放 -
决定组何时可以被释放
-
将释放的组聚合成一条消息
-
识别并响应过期的组
决定消息应该如何分组的职责被委托给一个 CorrelationStrategy
实例。决定消息组是否可以释放的职责被委托给一个 ReleaseStrategy
实例。
以下列表简要介绍了 AbstractAggregatingMessageGroupProcessor
的主要特点(实现 aggregatePayloads
方法的责任留给开发人员):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅 DefaultAggregatingMessageGroupProcessor
、ExpressionEvaluatingMessageGroupProcessor
和 MethodInvokingMessageGroupProcessor
,它们是 AbstractAggregatingMessageGroupProcessor
的开箱即用实现。
从 5.2 版本开始,AbstractAggregatingMessageGroupProcessor
提供了 Function<MessageGroup, Map<String, Object>>
策略来合并和计算(聚合)输出消息的头信息。提供了 DefaultAggregateHeadersFunction
实现,其逻辑是返回在组中没有冲突的所有头信息;在组内一个或多个消息中缺少的头信息不被视为冲突。冲突的头信息将被省略。随着新引入的 DelegatingMessageGroupProcessor
,此函数用于任何任意的(非 AbstractAggregatingMessageGroupProcessor
的)MessageGroupProcessor
实现。实际上,框架会将提供的函数注入到 AbstractAggregatingMessageGroupProcessor
实例中,并将所有其他实现包装到 DelegatingMessageGroupProcessor
中。AbstractAggregatingMessageGroupProcessor
和 DelegatingMessageGroupProcessor
之间的逻辑差异在于后者不会提前计算头信息,在调用委托策略之前,并且如果委托返回 Message
或 AbstractIntegrationMessageBuilder
则不会调用该函数。在这种情况下,框架假定目标实现已经负责生成一组适当的头信息并填充到返回的结果中。Function<MessageGroup, Map<String, Object>>
策略可通过 XML 配置中的 headers-function
引用属性、Java DSL 中的 AggregatorSpec.headersFunction()
选项以及纯 Java 配置中的 AggregatorFactoryBean.setHeadersFunction()
使用。
CorrelationStrategy
由 AbstractCorrelatingMessageHandler
拥有,并且默认值基于 IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头,如下例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是 DefaultAggregatingMessageGroupProcessor
。它创建一个单一的 Message
,其有效负载是一个 List
,包含为给定组接收到的有效负载。这在上游有拆分器、发布订阅通道或接收者列表路由器的简单散射收集(scatter-gather)实现中工作得很好。
在此类场景中使用发布-订阅通道或接收者列表路由器时,请确保启用 apply-sequence
标志。这样做会添加必要的标头:CORRELATION_ID
、SEQUENCE_NUMBER
和 SEQUENCE_SIZE
。在 Spring Integration 中,这种行为默认为拆分器启用,但不为发布-订阅通道或接收者列表路由器启用,因为这些组件可能在各种上下文中使用,而这些上下文中不需要这些标头。
当为应用程序实现特定的聚合策略时,你可以扩展 AbstractAggregatingMessageGroupProcessor
并实现 aggregatePayloads
方法。然而,存在更好的解决方案,这些方案与 API 的耦合度较低,可以通过 XML 或注解来配置聚合逻辑。
一般来说,任何 POJO 只要提供了一个接受单个 java.util.List
作为参数的方法(也支持参数化列表),就可以实现聚合算法。此方法按如下方式调用以聚合消息:
-
如果参数是一个
java.util.Collection<T>
并且参数类型 T 可以赋值给Message
,则将累积用于聚合的整个消息列表发送给聚合器。 -
如果参数是一个非参数化的
java.util.Collection
或者参数类型不能赋值给Message
,则该方法接收累积消息的有效负载。 -
如果返回类型不能赋值给
Message
,它将被视为框架自动创建的Message
的有效负载。
为了代码的简洁性以及促进低耦合、可测试性等最佳实践,实现聚合逻辑的首选方式是通过一个 POJO,并使用 XML 或注解支持在应用程序中对其进行配置。
从 5.3 版本开始,在处理消息组之后,AbstractCorrelatingMessageHandler
会执行 MessageBuilder.popSequenceDetails()
消息头修改,以确保在多层嵌套情况下正确实现拆分器-聚合器场景。这仅在消息组释放结果不是消息集合时完成。在这种情况下,目标 MessageGroupProcessor
负责在构建这些消息时调用 MessageBuilder.popSequenceDetails()
。
如果 MessageGroupProcessor
返回一个 Message
,只有当 sequenceDetails
与组中的第一个消息匹配时,才会对输出消息执行 MessageBuilder.popSequenceDetails()
。 (以前仅在从 MessageGroupProcessor
返回纯负载或 AbstractIntegrationMessageBuilder
时执行此操作。)
此功能可以通过一个新的 popSequence
boolean
属性来控制,因此可以在某些场景中禁用 MessageBuilder.popSequenceDetails()
,当标准拆分器未填充相关细节时。此属性实际上撤销了最近的上游 applySequence = true
在 AbstractMessageSplitter
中所做的操作。更多信息请参见 拆分器。
SimpleMessageGroup.getMessages()
方法返回一个 unmodifiableCollection
。因此,如果聚合 POJO 方法有一个 Collection<Message>
参数,传递的参数正是那个 Collection
实例,并且当您使用 SimpleMessageStore
作为聚合器时,在释放组之后,该原始 Collection<Message>
将被清空。因此,如果将它从聚合器中传出,POJO 中的 Collection<Message>
变量也会被清空。如果您希望直接释放该集合以进行进一步处理,则必须构建一个新的 Collection
(例如,new ArrayList<Message>(messages)
)。从 4.3 版开始,框架不再将消息复制到新的集合中,以避免不必要的额外对象创建。
在 4.2 版本之前,无法通过 XML 配置提供 MessageGroupProcessor
。只能使用 POJO 方法进行聚合。现在,如果框架检测到引用的(或内部)bean 实现了 MessageProcessor
,它将被用作聚合器的输出处理器。
如果你想释放一组对象作为消息的有效负载,你的类应该继承 AbstractAggregatingMessageGroupProcessor
并实现 aggregatePayloads()
。
此外,自 4.2 版本以来,提供了一个 SimpleMessageGroupProcessor
。它返回组中的消息集合,如前所述,这会导致释放的消息被单独发送。
这使得聚合器可以作为消息屏障工作,到达的消息会被持有,直到释放策略触发,然后组作为一系列单独的消息被释放。
从 6.0 版本开始,上述拆分行为仅在组处理器是 SimpleMessageGroupProcessor
时有效。否则,对于任何其他返回 Collection<Message>
的 MessageGroupProcessor
实现,只会发出一个回复消息,其有效负载是整个消息集合。这样的逻辑是由聚合器的规范目的决定的 - 根据某些键收集请求消息并生成单个分组消息。
在 6.5 版本之前,如果 MessageGroupProcessor
(通常是来自 DSL 的 lambda)返回一个有效负载集合,则 AbstractCorrelatingMessageHandler
会因 IllegalArgumentException
而失败,该异常指出只能返回消息集合。从现在起,这种限制已被取消,返回的有效负载集合将作为单个回复消息从聚合器发出,仅带有最后一个请求消息的头信息。如果需要与有效负载集合一起进行头信息聚合,则建议使用 AbstractAggregatingMessageGroupProcessor
实现,而不是简单的 MessageGroupProcessor
函数式接口。
ReleaseStrategy
ReleaseStrategy
接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
一般来说,任何 POJO 只要提供了一个接受单一 java.util.List
作为参数(也支持参数化列表)并返回一个布尔值的方法,就可以实现完成决策逻辑。此方法在每个新消息到达后被调用,以决定组是否已完成,如下所示:
-
如果参数是
java.util.List<T>
且参数类型T
可以赋值给Message
,则将组中累积的整个消息列表发送到该方法。 -
如果参数是一个非参数化的
java.util.List
或者参数类型不能赋值给Message
,则该方法接收累积消息的有效负载。 -
如果消息组已准备好进行聚合,则该方法必须返回
true
,否则返回 false。
以下示例展示了如何为类型为 Message
的 List
使用 @ReleaseStrategy
注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何为类型为 String
的 List
使用 @ReleaseStrategy
注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前面两个例子中的签名,POJO 为基础的发布策略会传递一个尚未发布的消息的 Collection
(如果你需要访问整个 Message
)或者一个负载对象的 Collection
(如果类型参数不是 Message
)。这满足了大多数用例。然而,如果由于某种原因,你需要访问完整的 MessageGroup
,你应该提供 ReleaseStrategy
接口的一个实现。
在处理潜在的大型组时,您应该了解这些方法是如何被调用的,因为释放策略可能在组被释放之前被调用多次。最有效的是 ReleaseStrategy
的实现,因为聚合器可以直接调用它。第二有效的是具有 Collection<Message<?>>
参数类型的 POJO 方法。效率最低的是具有 Collection<Something>
类型的 POJO 方法。框架必须每次在调用释放策略时将消息中的有效负载复制到新的集合中(并且可能尝试将有效负载转换为 Something
)。使用 Collection<?>
可以避免转换,但仍需要创建新的 Collection
。
基于这些原因,对于大型组,我们建议您实现 ReleaseStrategy
。
当组被释放以进行聚合时,所有尚未释放的消息都将被处理并从组中移除。如果组也已完成(即,如果来自序列的所有消息都已到达或没有定义序列),则该组将被标记为完成。此组的任何新消息都将发送到丢弃通道(如果已定义)。将 expire-groups-upon-completion
设置为 true
(默认值为 false
)会移除整个组,任何新消息(与已移除组具有相同相关 ID 的消息)将形成一个新组。您可以通过使用 MessageGroupStoreReaper
并将 send-partial-result-on-expiry
设置为 true
来释放部分序列。
从 6.5 版本开始,相关处理器还可以使用 discardIndividuallyOnExpiry
选项配置为将整个组作为单个消息丢弃。实际上,此消息的有效负载是来自过期组的消息列表。仅在 sendPartialResultOnExpiry
设置为 false
(默认值)且提供了 dicardChannel
时有效。
为了便于丢弃迟到的消息,聚合器必须在组释放后仍保持其状态。这最终可能会导致内存不足的情况。为了避免此类情况,您应该考虑配置一个 MessageGroupStoreReaper
来移除组元数据。过期参数应设置为在达到某个时间点后过期,该时间点之后不期望有迟到的消息到达。有关配置收割机的信息,请参阅 聚合器中的状态管理:MessageGroupStore。
Spring Integration 提供了 ReleaseStrategy
的一个实现:SimpleSequenceSizeReleaseStrategy
。此实现会参考每个到达的消息的 SEQUENCE_NUMBER
和 SEQUENCE_SIZE
标头,以决定消息组何时完成并准备好进行聚合。如前所述,它也是默认策略。
在 5.0 版本之前,默认的发布策略是 SequenceSizeReleaseStrategy
,该策略在处理大型组时表现不佳。使用该策略时,会检测并拒绝重复的序列号。此操作可能代价高昂。
如果你正在聚合大型组,你不需要发布部分组,也不需要检测/拒绝重复的序列,则可以考虑使用 SimpleSequenceSizeReleaseStrategy
- 在这些用例中它要高效得多,并且自 version 5.0 起,在未指定部分组发布时它是默认选项。
聚合大组
4.3 版本将 SimpleMessageGroup
中消息的默认 Collection
更改为 HashSet
(之前是 BlockingQueue
)。当从大组中删除单个消息时,这种变化较为昂贵(需要 O(n) 的线性扫描)。尽管哈希集合通常更快地进行删除操作,但对于大消息而言,由于在插入和删除时都需要计算哈希值,这也可能变得昂贵。如果你的消息计算哈希值的成本很高,请考虑使用其他集合类型。正如在 使用 MessageGroupFactory 中所讨论的,提供了一个 SimpleMessageGroupFactory
,以便你可以选择最适合你需求的 Collection
。你也可以提供自己的工厂实现来创建其他类型的 Collection<Message<?>>
。
以下示例展示了如何使用之前的实现和 SimpleSequenceSizeReleaseStrategy
配置聚合器:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点涉及聚合器上游的流程,序列大小释放策略(固定或基于 sequenceSize
头)将无法发挥作用,因为序列中的一些消息可能会被过滤器丢弃。在这种情况下,建议选择另一种 ReleaseStrategy
,或者使用从丢弃子流发送的补偿消息,在其内容中携带一些信息以在自定义完成组函数中跳过。更多信息请参见 Filter。
相关性策略
CorrelationStrategy
接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个表示用于将消息与消息组关联的 correlation key 的 Object
。该键必须满足用作 Map
中键的标准,即关于 equals()
和 hashCode()
的实现。
一般来说,任何 POJO 都可以实现相关逻辑,将消息映射到方法参数(或多个参数)的规则与 ServiceActivator
相同(包括支持 @Header
注解)。该方法必须返回一个值,并且该值不能为 null
。
Spring Integration 提供了 CorrelationStrategy
的一个实现:HeaderAttributeCorrelationStrategy
。此实现将消息头之一的值(其名称由构造函数参数指定)作为相关键返回。默认情况下,相关策略是返回 CORRELATION_ID
头属性值的 HeaderAttributeCorrelationStrategy
。如果您有一个自定义的头名称希望用于相关,可以在 HeaderAttributeCorrelationStrategy
的实例上进行配置,并将其作为聚合器相关策略的引用提供。
锁注册表
对组的更改是线程安全的。因此,当您并发发送相同关联 ID 的消息时,聚合器中只会处理其中一个,这实际上使得它成为 每个消息组单线程。LockRegistry
用于为解析后的关联 ID 获取锁。默认使用 DefaultLockRegistry
(内存中)。为了在跨服务器同步更新时使用共享 MessageGroupStore
,您必须配置一个共享锁注册表。
避免死锁
如上所述,当消息组发生变更(消息被添加或释放)时,会持有 一个锁。
考虑以下流程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,并且聚合器共享一个公共的锁注册表,则可能会发生死锁。这将导致线程挂起,并且 jstack <pid>
可能会呈现如下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免这个问题:
-
确保每个聚合器都有自己的锁注册表(这可以在应用程序实例之间共享,但在流中的两个或多个聚合器必须各自拥有一个不同的注册表)
-
使用
ExecutorChannel
或QueueChannel
作为聚合器的输出通道,以便下游流在新线程上运行 -
从版本 5.1.1 开始,将聚合器的
releaseLockBeforeSend
属性设置为true
如果由于某种原因,单个聚合器的输出最终路由回同一个聚合器,这也可能导致此问题。当然,在这种情况下,上述第一个解决方案不适用。
在 Java DSL 中配置 Aggregator
详见 聚合器和重排序器 ,了解如何在 Java DSL 中配置聚合器。
使用 XML 配置聚合器
Spring Integration 支持通过 <aggregator/>
元素使用 XML 配置聚合器。以下示例显示了一个聚合器的示例:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" // <1>
auto-startup="true" // <2>
input-channel="inputChannel" // <3>
output-channel="outputChannel" // <4>
discard-channel="throwAwayChannel" // <5>
message-store="persistentMessageStore" // <6>
order="1" // <7>
send-partial-result-on-expiry="false" // <8>
send-timeout="1000" // <9>
correlation-strategy="correlationStrategyBean" // <10>
correlation-strategy-method="correlate" // <11>
correlation-strategy-expression="headers['foo']" // <12>
ref="aggregatorBean" // <13>
method="aggregate" // <14>
release-strategy="releaseStrategyBean" // <15>
release-strategy-method="release" // <16>
release-strategy-expression="size() == 5" // <17>
expire-groups-upon-completion="false" // <18>
empty-group-min-timeout="60000" // <19>
lock-registry="lockRegistry" // <20>
group-timeout="60000" // <21>
group-timeout-expression="size() ge 2 ? 100 : -1" // <22>
expire-groups-upon-timeout="true" // <23>
scheduler="taskScheduler" > // <24>
<expire-transactional/> // <25>
<expire-advice-chain/> // <26>
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
聚合器的 id 是可选的。
生命周期属性,用于指示聚合器是否应在应用程序上下文启动时启动。可选(默认值为 'true')。
聚合器接收消息的通道。必需。
聚合器发送聚合结果的通道。可选(因为传入的消息可以在
replyChannel
消息头中指定回复通道)。聚合器发送超时消息的通道(如果
send-partial-result-on-expiry
为false
)。可选。引用一个
MessageGroupStore
,用于在它们的相关键下存储消息组,直到它们完成。可选。默认情况下,它是一个易失性内存存储。更多信息请参见 消息存储。当多个处理器订阅同一个
DirectChannel
时,此聚合器的顺序(用于负载均衡目的)。可选。表明过期的消息应该被聚合并发送到 'output-channel' 或 'replyChannel' 一旦包含它们的
MessageGroup
过期(参见 MessageGroupStore.expireMessageGroups(long))。一种使MessageGroup
过期的方法是配置一个MessageGroupStoreReaper
。然而,您也可以通过调用MessageGroupStore.expireMessageGroups(timeout)
来使MessageGroup
过期。可以通过控制总线操作或如果您有MessageGroupStore
实例的引用,调用expireMessageGroups(timeout)
来实现这一点。否则,该属性本身什么也不做。它仅作为指示符,表示是否丢弃或发送到输出或回复通道任何即将过期的MessageGroup
中仍然存在的消息。可选(默认值为false
)。注意:此属性可能更恰当地称为send-partial-result-on-timeout
,因为如果设置了expire-groups-upon-timeout
为false
,则组实际上可能不会过期。在将回复
Message
发送到output-channel
或discard-channel
时等待的超时间隔。默认为30
秒。只有当输出通道有一些“发送”限制时才会应用,例如具有固定 'capacity' 的QueueChannel
。在这种情况下,会抛出MessageDeliveryException
。对于AbstractSubscribableChannel
实现,send-timeout
将被忽略。对于group-timeout(-expression)
,来自计划过期任务的MessageDeliveryException
将导致此任务重新安排。可选。引用一个实现消息相关(分组)算法的 bean。该 bean 可以是
CorrelationStrategy
接口的实现或 POJO。如果是后者,则必须定义correlation-strategy-method
属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID
头)。定义在
correlation-strategy
引用的 bean 上的方法。它实现了相关决策算法。可选,但有限制(correlation-strategy
必须存在)。表示相关策略的 SpEL 表达式。示例:
"headers['something']"
。correlation-strategy
和correlation-strategy-expression
两者只能选择其一。引用在应用程序上下文中定义的 bean。该 bean 必须实现如前所述的聚合逻辑。可选(默认情况下,聚合消息列表成为输出消息的有效载荷)。
定义在
ref
属性引用的 bean 上的方法。它实现了消息聚合算法。可选(取决于ref
属性是否已定义)。引用一个实现释放策略的 bean。该 bean 可以是
ReleaseStrategy
接口的实现或 POJO。如果是后者,则必须定义release-strategy-method
属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
头属性)。定义在
release-strategy
属性引用的 bean 上的方法。它实现了完成决策算法。可选,但有限制(release-strategy
必须存在)。表示释放策略的 SpEL 表达式。表达式的根对象是
MessageGroup
。示例:"size() == 5"
。release-strategy
和release-strategy-expression
两者只能选择其一。设置为
true
时(默认为false
),已完成的组将从消息存储中移除,允许后续具有相同相关性的消息形成新的组。默认行为是将具有相同相关性的消息作为已完成组发送到discard-channel
。仅适用于为
<aggregator>
的MessageStore
配置了MessageGroupStoreReaper
的情况。默认情况下,当MessageGroupStoreReaper
配置为过期部分组时,空组也会被移除。空组是在组正常释放后存在的。空组使得能够检测和丢弃迟到的消息。如果您希望以比过期间隔更长的时间表过期空组,请设置此属性。空组在未修改至少此数量的毫秒之前不会从MessageStore
中移除。请注意,实际过期空组的时间还受到收割者的timeout
属性的影响,可能会达到此值加上超时时间。引用一个
org.springframework.integration.util.LockRegistry
bean。它用于根据groupId
获取MessageGroup
并发操作的Lock
。默认情况下,使用内部DefaultLockRegistry
。使用分布式LockRegistry
(如ZookeeperLockRegistry
)确保只有一个聚合器实例可以并发地对组进行操作。有关更多信息,请参阅 Redis 锁注册表 或 Zookeeper 锁注册表。一个超时(以毫秒为单位),当当前消息到达时
ReleaseStrategy
不释放组时强制MessageGroup
完成。此属性为聚合器提供了一个内置的时间基础释放策略,当需要在MessageGroup
内没有新消息到达的超时时间内发出部分结果(或丢弃组)时。要设置从MessageGroup
创建时开始计算的超时,请参阅group-timeout-expression
信息。当新消息到达聚合器时,任何现有的ScheduledFuture<?>
对于其MessageGroup
都会被取消。如果ReleaseStrategy
返回false
(意味着不释放)且groupTimeout > 0
,则会安排一个新任务来过期该组。我们不建议将此属性设置为零(或负值)。这样做实际上会禁用聚合器,因为每个消息组都会立即完成。但是,您可以使用表达式有条件地将其设置为零(或负值)。有关更多信息,请参阅group-timeout-expression
。完成时采取的操作取决于ReleaseStrategy
和send-partial-group-on-expiry
属性。有关更多信息,请参阅 聚合器和组超时。它与 'group-timeout-expression' 属性互斥。评估为
groupTimeout
的 SpEL 表达式,MessageGroup
作为#root
评估上下文对象。用于调度MessageGroup
强制完成。如果表达式评估为null
,则不会安排完成。如果评估为零,则组会在当前线程上立即完成。实际上,这提供了动态的group-timeout
属性。例如,如果您希望在创建组后的 10 秒内强制完成MessageGroup
,可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis()
,其中timestamp
由MessageGroup.getTimestamp()
提供,MessageGroup
是#root
评估上下文对象。请记住,组创建时间和第一个到达消息的时间可能会因其他组过期属性的配置而有所不同。有关更多信息,请参阅group-timeout
。它与 'group-timeout' 属性互斥。当由于超时(或由
MessageGroupStoreReaper
)完成组时,默认情况下组会过期(完全删除)。迟到的消息会启动一个新组。您可以将此设置为false
以完成组但保留其元数据,以便迟到的消息被丢弃。空组可以使用MessageGroupStoreReaper
和empty-group-min-timeout
属性稍后过期。默认为 'true'。引用一个
TaskScheduler
bean,用于调度MessageGroup
在groupTimeout
内没有新消息到达时强制完成。如果没有提供,将使用默认调度器(taskScheduler
),即在ApplicationContext
中注册的ThreadPoolTaskScheduler
。如果未指定group-timeout
或group-timeout-expression
,此属性不适用。自 版本 4.1 起。它允许为
forceComplete
操作启动一个事务。它由group-timeout(-expression)
或MessageGroupStoreReaper
发起,并不应用于正常的add
、release
和discard
操作。只允许此子元素或<expire-advice-chain/>
。自 版本 4.1 起。它允许为
forceComplete
操作配置任何Advice
。它由group-timeout(-expression)
或MessageGroupStoreReaper
发起,并不应用于正常的add
、release
和discard
操作。只允许此子元素或<expire-transactional/>
。还可以在这里使用 Springtx
命名空间配置事务Advice
。
过期的组
有两个属性与过期(完全移除)组有关。当一个组过期时,不会保留其记录,并且如果带有相同关联的新消息到达,则会开始一个新的组。当一个组完成(不过期),空组将保留,并且晚到的消息会被丢弃。稍后可以通过使用 MessageGroupStoreReaper
并结合 empty-group-min-timeout
属性来删除空组。
expire-groups-upon-completion
与“正常”完成有关,即当 ReleaseStrategy
释放该组时。这默认为 false
。
如果一个组没有正常完成而是由于超时而被释放或丢弃,则该组通常会过期。从 4.1 版本开始,您可以使用 expire-groups-upon-timeout
来控制这种行为。它默认为 true
以保持向后兼容性。
当一个组超时时,ReleaseStrategy
会再获得一次机会来释放该组。如果它这样做并且 expire-groups-upon-timeout
为 false,则过期由 expire-groups-upon-completion
控制。如果在超时期间,该组未被释放策略释放,则过期由 expire-groups-upon-timeout
控制。超时的组要么被丢弃,要么发生部分释放(基于 send-partial-result-on-expiry
)。
自从 5.0 版本以来,空组也将在 empty-group-min-timeout
后被安排移除。如果 expireGroupsUponCompletion == false
且 minimumTimeoutForEmptyGroups > 0
,则在正常或部分序列释放时,会安排任务来移除该组。
从 5.4 版本开始,聚合器(和重排序器)可以配置为使孤立组过期(那些在持久消息存储中可能不会被释放的)。expireTimeout
(如果大于 0
)表示存储中比此值更旧的组应该被清除。purgeOrphanedGroups()
方法在启动时被调用,并且与提供的 expireDuration
一起定期在一个计划任务内执行。此方法也可以随时从外部调用。过期逻辑完全委托给根据上述提供的过期选项的 forceComplete(MessageGroup)
功能。这种周期性清除功能在需要从不再通过常规消息到达逻辑释放的老组中清理消息存储时非常有用。在大多数情况下,这发生在应用程序重启后,当使用持久消息组存储时。该功能类似于带有计划任务的 MessageGroupStoreReaper
,但提供了一种方便的方法来处理特定组件中的老组,当使用组超时而不是收割者时。MessageGroupStore
必须专门为当前相关端点提供。否则,一个聚合器可能会清除来自另一个的组。对于聚合器,使用此技术过期的组将根据 expireGroupsUponCompletion
属性被丢弃或作为部分组释放。
:::
我们通常建议使用 ref
属性,如果自定义聚合处理器实现可能在其他 <aggregator>
定义中被引用。然而,如果自定义聚合器实现仅在一个 <aggregator>
的单个定义中使用,则可以从版本 1.0.3 开始使用内部 bean 定义,在 <aggregator>
元素内配置聚合 POJO,如下例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一 <aggregator>
配置中使用 ref
属性和内部 bean 定义是不允许的,因为它会创建一个模糊的条件。在这种情况下,会抛出一个 Exception。
以下示例展示了聚合器 bean 的一种实现方式:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 bean 的实现可能如下:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
在任何合适的地方,可以将发布策略方法和聚合器方法组合成一个单一的 bean。
上述示例的相关策略bean的实现可能如下:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器会根据某些标准(在本例中,是除以十后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字总和超过某个值。
在合理的情况下,可以将发布策略方法、相关策略方法和聚合器方法组合在一个 bean 中。(实际上,可以组合所有这些方法或其中的任意两个。)
聚合器和Spring表达式语言 (SpEL)
自从 Spring Integration 2.0 以来,你可以使用 SpEL 来处理各种策略(相关性、释放和聚合),如果此类释放策略背后的逻辑相对简单,我们建议使用 SpEL。假设你有一个旧组件,它被设计为接收对象数组。我们知道,默认的释放策略会在 List
中组装所有已聚合的消息。现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每个消息的有效负载并组装对象数组。以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
但是,使用 SpEL,这样的需求实际上可以通过一个一行表达式相对容易地处理,从而 spared you from writing a custom class and configuring it as a bean。以下示例展示了如何做到这一点:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在上述配置中,我们使用了一个 集合投影 表达式,从列表中的所有消息的有效负载组装一个新的集合,然后将其转换为数组,从而实现了与前面的 Java 代码相同的结果。
在处理自定义发布和相关策略时,可以采用相同的基于表达式的方法。
与其在 correlation-strategy
属性中定义一个自定义 CorrelationStrategy
的 bean,您可以将简单的相关逻辑实现为 SpEL 表达式,并在 correlation-strategy-expression
属性中进行配置,如下例所示:
correlation-strategy-expression="payload.person.id"
在前面的例子中,我们假设有效载荷有一个 person
属性,其中包含一个 id
,它将被用来关联消息。
同样地,对于 ReleaseStrategy
,你可以将发布逻辑实现为 SpEL 表达式,并在 release-strategy-expression
属性中进行配置。评估上下文的根对象是 MessageGroup
本身。可以通过表达式中的 message
属性引用消息组的消息 List
。
在 5.0 版本之前,根对象是 Message<?>
的集合,如前面的例子所示:
release-strategy-expression="!messages.?[payload==5].empty"
在前面的例子中,SpEL 评估上下文的根对象是 MessageGroup
本身,你正在说明,一旦这个组中有一条消息的有效载荷为 5 ,该组就应该被释放。
聚合器和组超时
从 4.0 版本开始,引入了两个新的互斥属性:group-timeout
和 group-timeout-expression
。请参阅 使用 XML 配置聚合器。在某些情况下,如果 ReleaseStrategy
在当前消息到达时不释放,则您可能需要在超时后发出聚合器结果(或丢弃组)。为此,groupTimeout
选项允许调度 MessageGroup
强制完成,如下例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在这个例子中,如果聚合器接收到由 release-strategy-expression
定义的最后一个顺序消息,则可以进行正常释放。如果那个特定的消息没有到达,groupTimeout
会在十秒后强制组完成,只要组中至少包含两个消息。
强制组完成的结果取决于 ReleaseStrategy
和 send-partial-result-on-expiry
。首先,再次咨询释放策略,以确定是否进行正常释放。虽然组没有变化,但 ReleaseStrategy
可以决定在此时释放该组。如果释放策略仍然不释放该组,则认为它已过期。如果 send-partial-result-on-expiry
为 true
,则现有的消息在(部分)MessageGroup
中作为正常的聚合器回复消息释放到 output-channel
。否则,它将被丢弃。
groupTimeout
行为和 MessageGroupStoreReaper
之间存在差异(参见 使用 XML 配置聚合器)。收割者会定期为 MessageGroupStore
中的所有 MessageGroup
发起强制完成。groupTimeout
则是在每个 MessageGroup
上单独进行操作,如果在 groupTimeout
期间没有新的消息到达,则会触发该操作。此外,收割者可以用于移除空组(那些为了在 expire-groups-upon-completion
为 false 时丢弃迟到的消息而保留的组)。
从 5.5 版本开始,groupTimeoutExpression
可以被评估为 java.util.Date
实例。这在根据组创建时间(MessageGroup.getTimestamp()
)确定计划任务时刻的情况下很有用,而不是基于当前消息到达的时间,因为在 groupTimeoutExpression
被评估为 long
时是按后者计算的:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注解配置聚合器
以下示例展示了一个使用注解配置的聚合器:
public class Waiter {
...
@Aggregator 1
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy 2
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy 3
public String correlateBy(OrderItem item) {
...
}
}
一个注解,表明该方法应作为聚合器使用。如果此类用作聚合器,则必须指定此注解。
一个注解,表明该方法用作聚合器的释放策略。如果没有在任何方法上出现,聚合器将使用
SimpleSequenceSizeReleaseStrategy
。一个注解,表明该方法应作为聚合器的相关策略。如果没有指定相关策略,聚合器将使用基于
CORRELATION_ID
的HeaderAttributeCorrelationStrategy
。
XML 元素提供的所有配置选项也适用于 @Aggregator
注解。
聚合器可以显式地从 XML 中引用,或者如果类上定义了 @MessageEndpoint
,则可以通过类路径扫描自动检测。
注解配置(@Aggregator
等)仅适用于聚合器组件的简单使用场景,在这些场景中,大多数默认选项就足够了。如果您在使用注解配置时需要对这些选项进行更多控制,请考虑为 AggregatingMessageHandler
使用 @Bean
定义,并在其 @Bean
方法上标注 @ServiceActivator
,如下例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
更多信息请参见 编程模型 和 关于 @Bean 方法的注解。
从 4.2 版本开始,提供了 AggregatorFactoryBean
以简化 AggregatingMessageHandler
的 Java 配置。
在聚合器中管理状态:MessageGroupStore
聚合器(和 Spring Integration 中的一些其他模式)是一种有状态的模式,它需要根据在一段时间内到达的消息组做出决策,所有消息都具有相同的关联键。有状态模式接口(如 ReleaseStrategy
)的设计原则是,组件(无论是框架定义的还是用户定义的)应能够保持无状态。所有状态都由 MessageGroup
携带,并将其管理委托给 MessageGroupStore
。MessageGroupStore
接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
MessageGroupStore
在等待发布策略被触发时会在 MessageGroups
中累积状态信息,而该事件可能永远不会发生。因此,为了防止陈旧消息滞留,以及为了让易失存储在应用程序关闭时提供清理的钩子,MessageGroupStore
允许你注册回调,在其 MessageGroups
过期时应用这些回调。接口非常简单,如下所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,因此它可以管理持久状态(例如,通过从存储中完全删除该组)。
MessageGroupStore
维护着这些回调函数的列表,根据需要,将其应用于所有时间戳早于所提供参数时间的消息(参见前面描述的 registerMessageGroupExpiryCallback(..)
和 expireMessageGroups(..)
方法)。
在不同的聚合器组件中,重要的是不要使用相同的 MessageGroupStore
实例,当你打算依赖于 expireMessageGroups
功能时。每个 AbstractCorrelatingMessageHandler
都会根据 forceComplete()
回调注册其自己的 MessageGroupCallback
。这样,每个过期的组可能会被错误的聚合器完成或丢弃。从 5.0.10 版本开始,AbstractCorrelatingMessageHandler
使用 UniqueExpiryCallback
作为 MessageGroupStore
中的注册回调。反过来,MessageGroupStore
检查此类的实例是否存在,并在回调集中已经存在此类实例时记录带有适当消息的错误。通过这种方式,框架禁止在不同的聚合器/重排序器中使用 MessageGroupStore
实例,以避免上述由特定相关处理程序未创建的组过期所带来的副作用。
您可以使用超时值调用 expireMessageGroups
方法。任何比当前时间减去该值还旧的消息都会过期并应用回调。因此,消息组“过期”的定义由存储的使用者决定。
为了方便用户,Spring Integration 提供了 MessageGroupStoreReaper
形式的消息过期包装器,如下例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是一个 Runnable
。在前面的例子中,消息组存储的过期方法每十秒调用一次。超时时间本身是 30 秒。
重要的是要理解 MessageGroupStoreReaper
的 'timeout' 属性是一个近似值,并且会受到任务调度器速率的影响,因为此属性仅在下一次计划执行 MessageGroupStoreReaper
任务时进行检查。例如,如果超时设置为十分钟,但 MessageGroupStoreReaper
任务每小时运行一次,并且 MessageGroupStoreReaper
任务的上次执行是在超时前一分钟发生的,则 MessageGroup
在接下来的 59 分钟内不会过期。因此,我们建议将速率至少设置为等于超时值或更短。
除了收割者之外,当应用程序通过 AbstractCorrelatingMessageHandler
中的生命周期回调关闭时,过期回调也会被调用。
AbstractCorrelatingMessageHandler
注册了自己的过期回调,这是与聚合器 XML 配置中的布尔标志 send-partial-result-on-expiry
的关联。如果该标志设置为 true
,那么当过期回调被调用时,任何未标记的消息在尚未释放的组中都可以发送到输出通道。
由于 MessageGroupStoreReaper
是从计划任务中调用的,并且可能会根据 sendPartialResultOnExpiry
选项产生一条消息到下游集成流,因此建议提供一个带有 MessagePublishingErrorHandler
的自定义 TaskScheduler
以通过 errorChannel
处理异常,这可能是常规聚合器发布功能所期望的。相同的逻辑也适用于依赖于 TaskScheduler
的组超时功能。有关更多信息,请参阅 错误处理。
当为不同的关联端点使用共享的 MessageStore
时,必须配置适当的 CorrelationStrategy
以确保组 ID 的唯一性。否则,当一个关联端点释放或使其他消息过期时,可能会发生意外行为。具有相同关联键的消息存储在相同的消息组中。
一些 MessageStore
实现允许通过分区数据来使用相同的物理资源。例如,JdbcMessageStore
有一个 region
属性,而 MongoDbMessageStore
有一个 collectionName
属性。
有关 MessageStore
接口及其实现的更多信息,请参阅消息存储。
Flux 聚合器
在 5.2 版本中,引入了 FluxAggregatorMessageHandler
组件。它是基于 Project Reactor 的 Flux.groupBy()
和 Flux.window()
操作符。传入的消息会通过 Flux.create()
在该组件的构造函数中初始化的 FluxSink
发出。如果未提供 outputChannel
或它不是 ReactiveStreamsSubscribableChannel
的实例,则从 Lifecycle.start()
实现中对主 Flux
进行订阅。否则,订阅将推迟到由 ReactiveStreamsSubscribableChannel
实现完成的订阅。消息通过 Flux.groupBy()
使用 CorrelationStrategy
作为分组键进行分组。默认情况下,会咨询消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID
头。
默认情况下,每个关闭的窗口都会作为一个 Flux
释放到要生成的消息的有效负载中。此消息包含窗口中第一个消息的所有头信息。输出消息有效负载中的这个 Flux
必须被订阅并在下游进行处理。这种逻辑可以通过 FluxAggregatorMessageHandler
的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
配置选项进行自定义(或覆盖)。例如,如果我们希望在最终消息中有一个有效负载的 List
,我们可以像这样配置一个 Flux.collectList()
:
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
在 FluxAggregatorMessageHandler
中有几种选项可以选择合适的窗口策略:
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到Flux.windowUntil()
操作符。更多信息请参见其 JavaDocs。它优先于所有其他窗口选项。 -
setWindowSize(int)
和setWindowSizeFunction(Function<Message<?>, Integer>)
- 传播到Flux.window(int)
或windowTimeout(int, Duration)
。默认情况下,窗口大小是从组中的第一个消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
头计算得出的。 -
setWindowTimespan(Duration)
- 根据窗口大小配置传播到Flux.window(Duration)
或windowTimeout(int, Duration)
。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一个函数,用于对分组后的 flux 进行转换,以执行任何未被公开选项涵盖的自定义窗口操作。
由于此组件是 MessageHandler
的实现,因此可以简单地作为 @Bean
定义与 @ServiceActivator
消息注解一起使用。使用 Java DSL 时,可以从 .handle()
EIP 方法中使用它。下面的示例演示了如何在运行时注册一个 IntegrationFlow
,以及如何将 FluxAggregatorMessageHandler
与上游的拆分器相关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
消息组的条件
从 5.5 版本开始,AbstractCorrelatingMessageHandler
(包括其 Java 和 XML DSL)提供了一个 groupConditionSupplier
选项,该选项是 BiFunction<Message<?>, String, String>
接口的实现。此函数会在每条消息添加到组时使用,并将结果条件语句存储在组中以供将来参考。ReleaseStrategy
可以咨询这个条件,而不需要遍历组中的所有消息。有关更多信息,请参阅 GroupConditionProvider
的 Java 文档和消息组条件。
另请参阅 文件聚合器。