Aggregator
基本上,聚合器是拆分器的镜像对应物,它是一种消息处理器,接收多条消息并将它们合并为一条消息。实际上,在包含拆分器的流水线中,聚合器通常是下游消费者。
从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。它必须持有待聚合的消息,并确定何时可以聚合完整的消息组。为此,它需要一个 MessageStore。
功能
聚合器通过关联和存储一组相关消息,直到该组消息被判定为完整。此时,聚合器通过处理整个消息组创建一个单一消息,并将聚合后的消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即从多条消息中创建单条消息)。两个相关概念是关联和释放。
相关性决定了消息如何被分组以进行聚合。在 Spring Integration 中,默认情况下,相关性是基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头来确定的。具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID 的消息会被分组在一起。然而,您可以自定义相关性策略,以允许通过其他方式指定消息应如何分组。为此,您可以实现一个 CorrelationStrategy(本章稍后将介绍)。
要确定一组消息何时准备好被处理,需要咨询 ReleaseStrategy。聚合器的默认释放策略是:当序列中包含的所有消息都到达时(基于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头部)释放该组。您可以通过提供对自定义 ReleaseStrategy 实现的引用来覆盖此默认策略。
编程模型
聚合 API 由多个类组成:
-
接口
MessageGroupProcessor及其子类:MethodInvokingAggregatingMessageGroupProcessor和ExpressionEvaluatingMessageGroupProcessor -
接口
ReleaseStrategy及其默认实现:SimpleSequenceSizeReleaseStrategy -
接口
CorrelationStrategy及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler(AbstractCorrelatingMessageHandler 的子类)是一个 MessageHandler 实现,它封装了聚合器(以及其他关联用例)的通用功能,具体如下:
- 将消息关联到一个待聚合的群组中
- 在
MessageStore中维护这些消息,直到群组可以释放 - 决定群组何时可以释放
- 将已释放的群组聚合成一条消息
- 识别并响应已过期的群组
决定消息应如何分组的职责被委托给一个 CorrelationStrategy 实例。决定消息组是否可以释放的职责被委托给一个 ReleaseStrategy 实例。
下面的清单展示了基础 AbstractAggregatingMessageGroupProcessor 的简要要点(实现 aggregatePayloads 方法的责任留给了开发者):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅 DefaultAggregatingMessageGroupProcessor、ExpressionEvaluatingMessageGroupProcessor 和 MethodInvokingMessageGroupProcessor,它们作为 AbstractAggregatingMessageGroupProcessor 的开箱即用实现。
从 5.2 版本开始,AbstractAggregatingMessageGroupProcessor 提供了一种 Function<MessageGroup, Map<String, Object>> 策略,用于合并和计算(聚合)输出消息的头部信息。框架提供了 DefaultAggregateHeadersFunction 实现,其逻辑是返回组内所有无冲突的头部;组内一条或多条消息中缺失的头部不被视为冲突。冲突的头部将被省略。除了新引入的 DelegatingMessageGroupProcessor,此函数可用于任何任意的(非 AbstractAggregatingMessageGroupProcessor)MessageGroupProcessor 实现。本质上,框架会将提供的函数注入到 AbstractAggregatingMessageGroupProcessor 实例中,并将所有其他实现包装到 DelegatingMessageGroupProcessor 中。AbstractAggregatingMessageGroupProcessor 与 DelegatingMessageGroupProcessor 在逻辑上的区别在于,后者不会在调用委托策略之前预先计算头部,并且如果委托返回 Message 或 AbstractIntegrationMessageBuilder,则不会调用该函数。在这种情况下,框架假定目标实现已负责生成一组正确的头部并填充到返回的结果中。Function<MessageGroup, Map<String, Object>> 策略在 XML 配置中可作为 headers-function 引用属性使用,在 Java DSL 中可作为 AggregatorSpec.headersFunction() 选项使用,在纯 Java 配置中则通过 AggregatorFactoryBean.setHeadersFunction() 进行设置。
CorrelationStrategy 由 AbstractCorrelatingMessageHandler 持有,其默认值基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头,如下例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是 DefaultAggregatingMessageGroupProcessor。它会创建一个单一的 Message,其有效载荷是给定组接收到的有效载荷的 List。这对于上游有拆分器、发布-订阅通道或收件人列表路由器的简单分散-收集实现来说效果很好。
在此类场景中使用发布-订阅通道或收件人列表路由器时,请确保启用 apply-sequence 标志。这样做会添加必要的头部信息:CORRELATION_ID、SEQUENCE_NUMBER 和 SEQUENCE_SIZE。在 Spring Integration 中,拆分器默认启用此行为,但发布-订阅通道或收件人列表路由器不会默认启用,因为这些组件可能用于各种不需要这些头部信息的场景。
在应用程序中实现特定的聚合器策略时,你可以扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads 方法。然而,对于实现聚合逻辑,存在更好的、与 API 耦合度更低的解决方案,这些方案可以通过 XML 或注解进行配置。
通常来说,任何POJO都可以实现聚合算法,只要它提供一个接受单个java.util.List作为参数的方法(也支持参数化列表)。该方法按以下方式被调用来聚合消息:
-
如果参数是
java.util.Collection<T>且参数类型 T 可分配给Message,则聚合器会将聚合过程中累积的整个消息列表发送给聚合器。 -
如果参数是非参数化的
java.util.Collection或参数类型不可分配给Message,则该方法接收累积消息的有效载荷。 -
如果返回类型不可分配给
Message,则将其视为由框架自动创建的Message的有效载荷。
为了代码简洁性并推广低耦合、可测试性等最佳实践,实现聚合逻辑的首选方式是通过 POJO,并使用 XML 或注解支持在应用程序中配置它。
从版本5.3开始,在处理完一个消息组后,AbstractCorrelatingMessageHandler 会执行 MessageBuilder.popSequenceDetails() 消息头修改,以支持具有多个嵌套级别的正确拆分器-聚合器场景。仅当消息组释放结果不是消息集合时,才会执行此操作。在这种情况下,目标 MessageGroupProcessor 负责在构建这些消息时调用 MessageBuilder.popSequenceDetails()。
如果 MessageGroupProcessor 返回一个 Message,仅当 sequenceDetails 与组中的第一条消息匹配时,才会对输出消息执行 MessageBuilder.popSequenceDetails()。(此前,仅当 MessageGroupProcessor 返回普通负载或 AbstractIntegrationMessageBuilder 时才会执行此操作。)
此功能可通过一个新的 popSequence boolean 属性进行控制,从而在某些场景下(当标准拆分器未填充关联详情时)禁用 MessageBuilder.popSequenceDetails()。该属性本质上会撤销最近的 AbstractMessageSplitter 中 applySequence = true 上游操作所执行的内容。更多信息请参阅拆分器。
SimpleMessageGroup.getMessages() 方法返回一个 unmodifiableCollection。因此,如果一个聚合 POJO 方法有一个 Collection<Message> 参数,传入的参数正是那个 Collection 实例,并且当你为聚合器使用 SimpleMessageStore 时,该原始的 Collection<Message> 在释放组之后会被清空。因此,如果 POJO 中的 Collection<Message> 变量被传递出聚合器,它也会被清空。如果你希望简单地按原样释放该集合以供进一步处理,你必须构建一个新的 Collection(例如,new ArrayList<Message>(messages))。从版本 4.3 开始,框架不再将消息复制到新的集合中,以避免不必要的额外对象创建。
在 4.2 版本之前,无法通过 XML 配置提供 MessageGroupProcessor。只有 POJO 方法可用于聚合。现在,如果框架检测到引用的(或内部的)bean 实现了 MessageProcessor,则会将其用作聚合器的输出处理器。
若希望从自定义的 MessageGroupProcessor 中释放一组对象作为消息的有效载荷,您的类应继承 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads() 方法。
此外,自版本 4.2 起,还提供了一个 SimpleMessageGroupProcessor。它会返回消息组中的消息集合,如前所述,这会导致已释放的消息被单独发送。
这使得聚合器能够充当消息屏障,到达的消息会被暂存,直到释放策略触发并将消息组作为一系列独立消息释放。
从版本 6.0 开始,上述拆分行为仅在分组处理器为 SimpleMessageGroupProcessor 时生效。否则,对于任何其他返回 Collection<Message> 的 MessageGroupProcessor 实现,只会发出单个回复消息,其有效载荷为整个消息集合。这种逻辑由聚合器的规范目的决定——即按某个键收集请求消息并生成单个分组消息。
在6.5版本之前,如果MessageGroupProcessor(通常来自DSL的lambda表达式)返回一个有效负载集合,AbstractCorrelatingMessageHandler会因IllegalArgumentException而失败,提示仅支持消息集合。现在,这一限制已被取消,返回的有效负载集合将作为聚合器的单个回复消息发出,且仅包含最后一个请求消息的头部信息。若需要在返回有效负载集合的同时进行头部聚合,建议使用AbstractAggregatingMessageGroupProcessor实现,而非普通的MessageGroupProcessor函数式接口。
ReleaseStrategy
ReleaseStrategy 接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,任何POJO都可以实现完成决策逻辑,只要它提供一个接受单个java.util.List作为参数(也支持参数化列表)并返回布尔值的方法。该方法在每条新消息到达后被调用,以决定组是否完成,如下所示:
-
如果参数是
java.util.List<T>且参数类型T可分配给Message,则组中累积的所有消息都将发送给该方法。 -
如果参数是非参数化的
java.util.List或参数类型不可分配给Message,则该方法接收累积消息的有效载荷。 -
如果消息组已准备好进行聚合,则该方法必须返回
true,否则返回false。
以下示例展示了如何对 Message 类型的 List 使用 @ReleaseStrategy 注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何对 String 类型的 List 使用 @ReleaseStrategy 注解:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前两个示例中的签名,基于POJO的发布策略会接收一个尚未发布消息的Collection(如果您需要访问整个Message)或一个负载对象的Collection(如果类型参数不是Message)。这满足了大多数使用场景。然而,如果出于某些原因,您需要访问完整的MessageGroup,则应提供ReleaseStrategy接口的实现。
在处理可能较大的分组时,您应当理解这些方法是如何被调用的,因为在分组释放之前,释放策略可能会被多次调用。最高效的方式是实现 ReleaseStrategy,因为聚合器可以直接调用它。次高效的方式是使用带有 Collection<Message<?>> 参数类型的 POJO 方法。最低效的方式是使用带有 Collection<Something> 类型的 POJO 方法。每次调用释放策略时,框架都必须将分组中消息的有效载荷复制到一个新的集合中(并且可能尝试将有效载荷转换为 Something)。使用 Collection<?> 可以避免转换,但仍需要创建新的 Collection。
基于这些原因,对于大型分组,我们建议您实现 ReleaseStrategy。
当分组被释放以进行聚合时,其所有尚未释放的消息都会被处理并从分组中移除。如果该分组也已完成(即序列中的所有消息都已到达,或者未定义序列),则该分组被标记为完成。此分组的任何新消息将被发送到丢弃通道(如果已定义)。将 expire-groups-upon-completion 设置为 true(默认为 false)会移除整个分组,任何新消息(与已移除分组具有相同关联 ID)将形成一个新的分组。您可以通过使用 MessageGroupStoreReaper 并将 send-partial-result-on-expiry 设置为 true 来释放部分序列。
从版本 6.5 开始,关联处理器还可以通过 discardIndividuallyOnExpiry 选项配置为将整个组作为单个消息丢弃。本质上,此消息的有效负载是来自过期组的消息列表。仅当 sendPartialResultOnExpiry 设置为 false(默认值)且提供了 dicardChannel 时才有效。
为便于丢弃延迟到达的消息,聚合器必须在分组释放后仍保留其状态。这最终可能导致内存不足的情况。为避免此类情况,应考虑配置 MessageGroupStoreReaper 来移除分组元数据。过期参数应设置为在达到某个时间点后使分组过期,此后不应再有延迟消息到达。有关配置收割器的信息,请参阅聚合器中的状态管理:MessageGroupStore。
Spring Integration 为 ReleaseStrategy 提供了一个实现:SimpleSequenceSizeReleaseStrategy。该实现通过检查每条到达消息的 SEQUENCE_NUMBER 和 SEQUENCE_SIZE 头部信息,来决定消息组何时完成并准备进行聚合。如前所述,它也是默认策略。
在 5.0 版本之前,默认的发布策略是 SequenceSizeReleaseStrategy,该策略在处理大分组时表现不佳。使用该策略时,重复的序列号会被检测并拒绝。此操作可能开销较大。
若您正在聚合大型分组,且无需释放部分分组,也无需检测/拒绝重复序列,请考虑改用 SimpleSequenceSizeReleaseStrategy——对于此类使用场景,该策略效率更高,并且自 5.0 版本起,在未指定部分分组释放时已成为默认策略。
聚合大型分组
4.3 版本将 SimpleMessageGroup 中消息的默认 Collection 更改为 HashSet(之前是 BlockingQueue)。当从大型组中移除单个消息时,这可能会带来较高的开销(需要 O(n) 的线性扫描)。尽管哈希集在移除操作上通常要快得多,但对于大型消息来说,它可能会比较昂贵,因为必须在插入和移除时都计算哈希值。如果你的消息计算哈希值开销较大,请考虑使用其他集合类型。如使用 MessageGroupFactory 中所述,我们提供了一个 SimpleMessageGroupFactory,以便你可以选择最适合需求的 Collection。你也可以提供自己的工厂实现来创建其他类型的 Collection<Message<?>>。
以下示例展示了如何配置一个聚合器,该聚合器使用先前的实现以及一个 SimpleSequenceSizeReleaseStrategy:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点位于聚合器的上游流程中,序列大小释放策略(固定或基于 sequenceSize 头信息)将无法达到预期效果,因为序列中的某些消息可能会被过滤器丢弃。在这种情况下,建议选择另一种 ReleaseStrategy,或者使用从丢弃子流程发送的补偿消息,这些消息在其内容中携带一些信息,以便在自定义的完整分组函数中跳过。更多信息请参阅 Filter。
关联策略
CorrelationStrategy 接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个表示关联消息与消息组所用相关键的 Object。该键必须满足 Map 中键的准则,涉及 equals() 和 hashCode() 方法的实现。
通常,任何POJO都可以实现关联逻辑,将消息映射到方法参数(或多个参数)的规则与ServiceActivator相同(包括支持@Header注解)。该方法必须返回一个值,且该值不能为null。
Spring Integration 为 CorrelationStrategy 提供了一个实现:HeaderAttributeCorrelationStrategy。该实现将消息头(其名称由构造函数参数指定)的值作为关联键返回。默认情况下,关联策略是一个 HeaderAttributeCorrelationStrategy,它返回 CORRELATION_ID 头属性的值。如果您希望使用自定义的头部名称进行关联,可以在 HeaderAttributeCorrelationStrategy 的实例上进行配置,并将其作为聚合器关联策略的引用提供。
锁注册表
对组的更改是线程安全的。因此,当你并发地为同一关联ID发送消息时,聚合器中只会处理其中一条消息,使其实际上成为每个消息组的单线程处理。系统使用 LockRegistry 来获取已解析关联ID的锁。默认情况下使用 DefaultLockRegistry(内存中)。当跨服务器使用共享的 MessageGroupStore 进行同步更新时,必须配置一个共享锁注册表。
避免死锁
如上所述,当消息组发生变更(添加或释放消息)时,会持有锁。
考虑以下流程:
...->aggregator1-> ... ->aggregator2-> ...
如果存在多个线程,并且聚合器共享一个公共的锁注册表,则可能发生死锁。这将导致线程挂起,此时执行 jstack <pid> 可能会显示类似以下的结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免这个问题:
-
确保每个聚合器拥有独立的锁注册表(该注册表可在应用实例间共享,但流程中的两个或多个聚合器必须各自拥有不同的注册表)
-
使用
ExecutorChannel或QueueChannel作为聚合器的输出通道,以便下游流程在新线程上运行 -
从 5.1.1 版本开始,将聚合器的
releaseLockBeforeSend属性设置为true
如果由于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也可能导致此问题。当然,上述第一种解决方案在这种情况下并不适用。
在 Java DSL 中配置聚合器
请参阅 聚合器和重排序器 了解如何在 Java DSL 中配置聚合器。
使用 XML 配置聚合器
Spring Integration 支持通过 <aggregator/> 元素使用 XML 配置聚合器。以下示例展示了一个聚合器的例子:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" // <1>
auto-startup="true" // <2>
input-channel="inputChannel" // <3>
output-channel="outputChannel" // <4>
discard-channel="throwAwayChannel" // <5>
message-store="persistentMessageStore" // <6>
order="1" // <7>
send-partial-result-on-expiry="false" // <8>
send-timeout="1000" // <9>
correlation-strategy="correlationStrategyBean" // <10>
correlation-strategy-method="correlate" // <11>
correlation-strategy-expression="headers['foo']" // <12>
ref="aggregatorBean" // <13>
method="aggregate" // <14>
release-strategy="releaseStrategyBean" // <15>
release-strategy-method="release" // <16>
release-strategy-expression="size() == 5" // <17>
expire-groups-upon-completion="false" // <18>
empty-group-min-timeout="60000" // <19>
lock-registry="lockRegistry" // <20>
group-timeout="60000" // <21>
group-timeout-expression="size() ge 2 ? 100 : -1" // <22>
expire-groups-upon-timeout="true" // <23>
scheduler="taskScheduler" > // <24>
<expire-transactional/> // <25>
<expire-advice-chain/> // <26>
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
聚合器的 id 是可选的。
生命周期属性,指示聚合器是否应在应用程序上下文启动期间启动。可选(默认为 'true')。
聚合器接收消息的通道。必需。
聚合器发送聚合结果到的通道。可选(因为传入消息本身可以在 'replyChannel' 消息头中指定回复通道)。
聚合器发送超时消息到的通道(如果
send-partial-result-on-expiry为false)。可选。对
MessageGroupStore的引用,用于在关联键下存储消息组,直到它们完成。可选。默认情况下,它是一个易失性的内存存储。更多信息请参见消息存储。当多个处理器订阅同一个
DirectChannel时,此聚合器的顺序(用于负载均衡目的)。可选。指示一旦包含的
MessageGroup过期,过期的消息应被聚合并发送到 'output-channel' 或 'replyChannel'(参见 MessageGroupStore.expireMessageGroups(long))。使MessageGroup过期的一种方法是配置MessageGroupStoreReaper。但是,您也可以通过调用MessageGroupStore.expireMessageGroups(timeout)来使MessageGroup过期。您可以通过控制总线操作来实现这一点,或者,如果您有MessageGroupStore实例的引用,也可以通过调用expireMessageGroups(timeout)来实现。否则,仅凭此属性本身不会执行任何操作。它仅用作一个指示器,指示对于即将过期的MessageGroup中仍然存在的任何消息,是丢弃还是发送到输出或回复通道。可选(默认为false)。注意:此属性可能更恰当地称为send-partial-result-on-timeout,因为如果expire-groups-upon-timeout设置为false,组实际上可能不会过期。向
output-channel或discard-channel发送回复Message时的超时间隔。默认为30秒。仅当输出通道有一些“发送”限制时才应用,例如具有固定“容量”的QueueChannel。在这种情况下,会抛出MessageDeliveryException。对于AbstractSubscribableChannel实现,send-timeout被忽略。对于group-timeout(-expression),来自计划到期任务的MessageDeliveryException会导致此任务被重新调度。可选。对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是
CorrelationStrategy接口的实现或 POJO。在后一种情况下,还必须定义correlation-strategy-method属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID头)。在
correlation-strategy引用的 bean 上定义的方法。它实现关联决策算法。可选,但有约束(必须存在correlation-strategy)。表示关联策略的 SpEL 表达式。例如:
"headers['something']"。只允许correlation-strategy或correlation-strategy-expression中的一个。对应用程序上下文中定义的 bean 的引用。该 bean 必须实现如前所述的聚合逻辑。可选(默认情况下,聚合消息的列表成为输出消息的有效负载)。
在
ref属性引用的 bean 上定义的方法。它实现消息聚合算法。可选(取决于是否定义了ref属性)。对实现释放策略的 bean 的引用。该 bean 可以是
ReleaseStrategy接口的实现或 POJO。在后一种情况下,还必须定义release-strategy-method属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE头属性)。在
release-strategy属性引用的 bean 上定义的方法。它实现完成决策算法。可选,但有约束(必须存在release-strategy)。表示释放策略的 SpEL 表达式。表达式的根对象是
MessageGroup。例如:"size() == 5"。只允许release-strategy或release-strategy-expression中的一个。当设置为
true时(默认为false),已完成的组将从消息存储中移除,允许具有相同关联的后续消息形成新组。默认行为是将具有与已完成组相同关联的消息发送到discard-channel。仅当为
<aggregator>的MessageStore配置了MessageGroupStoreReaper时适用。默认情况下,当配置MessageGroupStoreReaper来使部分组过期时,空组也会被移除。组正常释放后,空组会存在。空组能够检测和丢弃迟到的消息。如果您希望空组在比部分组过期更长的计划上过期,请设置此属性。然后,空组在至少此毫秒数内未被修改之前,不会从MessageStore中移除。请注意,空组过期的实际时间还受到收割者timeout属性的影响,可能高达此值加上超时时间。对
org.springframework.integration.util.LockRegistrybean 的引用。它用于基于groupId获取Lock,以便对MessageGroup进行并发操作。默认情况下,使用内部的DefaultLockRegistry。使用分布式LockRegistry,例如ZookeeperLockRegistry,可以确保只有一个聚合器实例可以同时对组进行操作。更多信息请参见 Redis 锁注册表 或 Zookeeper 锁注册表。当当前消息到达时,如果
ReleaseStrategy没有释放组,则强制完成MessageGroup的超时时间(以毫秒为单位)。当需要发出部分结果(或丢弃组)时,如果MessageGroup在从最后一条消息到达时间开始计时的超时时间内没有新消息到达,此属性为聚合器提供了内置的基于时间的释放策略。要设置从MessageGroup创建时间开始计时的超时,请参见group-timeout-expression信息。当新消息到达聚合器时,其MessageGroup的任何现有ScheduledFuture<?>都会被取消。如果ReleaseStrategy返回false(意味着不释放)且groupTimeout > 0,则计划一个新任务来使组过期。我们不建议将此属性设置为零(或负值)。这样做实际上会禁用聚合器,因为每个消息组都会立即完成。但是,您可以通过使用表达式有条件地将其设置为零(或负值)。有关信息,请参见group-timeout-expression。完成期间采取的操作取决于ReleaseStrategy和send-partial-group-on-expiry属性。更多信息请参见聚合器和组超时。它与group-timeout-expression属性互斥。评估为
groupTimeout的 SpEL 表达式,其中MessageGroup作为#root评估上下文对象。用于调度强制完成MessageGroup。如果表达式评估为null,则不计划完成。如果评估为零,则组在当前线程上立即完成。实际上,这提供了一个动态的group-timeout属性。例如,如果您希望在组创建后经过 10 秒后强制完成MessageGroup,您可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis(),其中timestamp由MessageGroup.getTimestamp()提供,因为这里的MessageGroup是#root评估上下文对象。但请记住,根据其他组过期属性的配置,组的创建时间可能与第一条到达消息的时间不同。更多信息请参见group-timeout。与group-timeout属性互斥。当组由于超时(或由
MessageGroupStoreReaper)完成时,默认情况下组会过期(完全移除)。迟到的消息会启动一个新组。您可以将其设置为false以完成组但保留其元数据,以便丢弃迟到的消息。空组稍后可以使用MessageGroupStoreReaper和empty-group-min-timeout属性过期。默认为 'true'。一个
TaskSchedulerbean 引用,用于调度MessageGroup在groupTimeout内没有新消息到达时强制完成。如果未提供,则使用在ApplicationContext中注册的默认调度器(taskScheduler)(ThreadPoolTaskScheduler)。如果未指定group-timeout或group-timeout-expression,则此属性不适用。自版本 4.1 起。允许为
forceComplete操作启动事务。它由group-timeout(-expression)或MessageGroupStoreReaper发起,不应用于正常的add、release和discard操作。只允许此子元素或<expire-advice-chain/>。自版本 4.1 起。允许为
forceComplete操作配置任何Advice。它由group-timeout(-expression)或MessageGroupStoreReaper发起,不应用于正常的add、release和discard操作。只允许此子元素或<expire-transactional/>。也可以通过使用 Springtx命名空间在此处配置事务Advice。
过期组
有两个属性与过期(完全移除)组相关。当组过期时,不会留下任何记录,如果新消息到达且具有相同的关联性,则会启动一个新组。当组完成(未过期)时,空组会保留,延迟到达的消息将被丢弃。空组稍后可以通过结合使用 MessageGroupStoreReaper 和 empty-group-min-timeout 属性来移除。
expire-groups-upon-completion 涉及当 ReleaseStrategy 释放组时的“正常”完成。默认值为 false。
如果组未正常完成,但因超时而被释放或丢弃,则组通常会过期。自版本 4.1 起,您可以使用 expire-groups-upon-timeout 来控制此行为。为了向后兼容,其默认值为 true。
当组超时时,ReleaseStrategy 会获得一次额外的机会来释放该组。如果它这样做了并且 expire-groups-upon-timeout 为 false,则过期由 expire-groups-upon-completion 控制。如果组在超时期间未被释放策略释放,则过期由 expire-groups-upon-timeout 控制。超时的组要么被丢弃,要么发生部分释放(基于 send-partial-result-on-expiry)。
自 5.0 版本起,空组也会在 empty-group-min-timeout 后计划移除。如果 expireGroupsUponCompletion == false 且 minimumTimeoutForEmptyGroups > 0,则当正常或部分序列释放发生时,会计划执行移除组的任务。
从版本5.4开始,聚合器(和重排序器)可以配置为过期孤立的组(那些在持久消息存储中可能无法被释放的组)。expireTimeout(如果大于0)表示存储中早于此值的组应该被清除。purgeOrphanedGroups() 方法在启动时被调用,并且与提供的 expireDuration 一起,在定时任务中定期调用。此方法也可以在任何时候从外部调用。过期逻辑完全委托给 forceComplete(MessageGroup) 功能,根据上述提供的过期选项。当需要从那些不再通过常规消息到达逻辑释放的旧组中清理消息存储时,这种定期清除功能非常有用。在大多数情况下,这发生在应用程序重启后,当使用持久消息组存储时。该功能类似于带有定时任务的 MessageGroupStoreReaper,但在使用组超时而不是收割器时,提供了一种方便的方式来处理特定组件内的旧组。必须为当前关联端点专门提供 MessageGroupStore。否则,一个聚合器可能会清除另一个聚合器的组。对于聚合器,使用此技术过期的组将被丢弃或作为部分组释放,具体取决于 expireGroupsUponCompletion 属性。
:::
我们通常建议,如果自定义聚合器处理程序实现可能在其他 <aggregator> 定义中被引用,则使用 ref 属性。然而,如果自定义聚合器实现仅被单个 <aggregator> 定义使用,您可以使用内部 bean 定义(从 1.0.3 版本开始)在 <aggregator> 元素内配置聚合 POJO,如下例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一 <aggregator> 配置中同时使用 ref 属性和内部 bean 定义是不允许的,因为这会造成歧义。在这种情况下,会抛出异常。
以下示例展示了聚合器 bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 Bean 实现可能如下所示:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
在合理的情况下,发布策略方法和聚合器方法可以合并为一个单一的 bean。
以上示例中关联策略 Bean 的实现方式如下:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面例子中的聚合器会根据某个标准(本例中是除以十后的余数)对数字进行分组,并保持该分组直到有效载荷提供的数字总和超过某个特定值。
在合理的情况下,发布策略方法、关联策略方法和聚合器方法可以组合在单个 bean 中。(实际上,它们中的全部或任意两个都可以组合。)
聚合器与Spring表达式语言(SpEL)
自 Spring Integration 2.0 起,你可以通过 SpEL 来处理各种策略(关联、释放和聚合)。如果此类释放策略背后的逻辑相对简单,我们推荐使用这种方式。假设你有一个遗留组件,它被设计为接收一个对象数组。我们知道默认的释放策略会将所有聚合的消息组装到 List 中。现在我们面临两个问题:首先,我们需要从列表中提取单个消息;其次,我们需要提取每条消息的有效载荷,并组装成对象数组。以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,使用SpEL,这样的需求实际上可以通过一行表达式相对轻松地处理,从而避免编写自定义类并将其配置为bean。以下示例展示了如何实现:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在上述配置中,我们使用了集合投影表达式,从列表中所有消息的有效载荷中组装出一个新的集合,然后将其转换为数组,从而实现了与之前 Java 代码相同的结果。
在处理自定义发布和关联策略时,您可以采用相同的基于表达式的方法。
与其在 correlation-strategy 属性中为自定义的 CorrelationStrategy 定义 bean,您可以将简单的关联逻辑实现为 SpEL 表达式,并在 correlation-strategy-expression 属性中进行配置,如下例所示:
correlation-strategy-expression="payload.person.id"
在前面的例子中,我们假设负载数据包含一个带有 id 属性的 person 对象,该 id 将用于关联消息。
同样地,对于 ReleaseStrategy,您可以将释放逻辑实现为 SpEL 表达式,并在 release-strategy-expression 属性中进行配置。评估上下文的根对象是 MessageGroup 本身。消息的 List 可以通过在表达式中使用组的 message 属性来引用。
在 5.0 版本之前的发行版中,根对象是 Message<?> 的集合,如之前的示例所示:
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SpEL 评估上下文的根对象是 MessageGroup 本身,这表明一旦该组中存在有效载荷为 5 的消息,该组就应该被释放。
聚合器与组超时
从 4.0 版本开始,引入了两个新的互斥属性:group-timeout 和 group-timeout-expression。请参阅使用 XML 配置聚合器。在某些情况下,如果当前消息到达时 ReleaseStrategy 未触发释放,你可能需要在超时后发出聚合器结果(或丢弃该组)。为此,groupTimeout 选项允许强制调度 MessageGroup 完成,如下例所示:
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在这个示例中,如果聚合器按照 release-strategy-expression 的定义接收到序列中的最后一条消息,则可以进行正常释放。如果该特定消息未到达,只要组中包含至少两条消息,groupTimeout 将在十秒后强制完成该组。
强制完成组的结果取决于 ReleaseStrategy 和 send-partial-result-on-expiry。首先,再次咨询释放策略以确定是否进行正常释放。虽然组未发生变化,但 ReleaseStrategy 可以决定此时释放该组。如果释放策略仍不释放该组,则该组将过期。若 send-partial-result-on-expiry 为 true,则(部分)MessageGroup 中的现有消息将作为正常的聚合器回复消息释放到 output-channel。否则,该消息将被丢弃。
groupTimeout 行为与 MessageGroupStoreReaper 之间存在差异(请参阅使用 XML 配置聚合器)。收割器会定期强制完成 MessageGroupStore 中的所有 MessageGroup。而 groupTimeout 则针对每个 MessageGroup 单独执行,如果在 groupTimeout 期间没有新消息到达,则会触发强制完成。此外,收割器还可用于移除空组(当 expire-groups-upon-completion 为 false 时,为丢弃延迟消息而保留的组)。
从版本 5.5 开始,groupTimeoutExpression 可以解析为 java.util.Date 实例。这在某些场景下非常有用,例如基于消息组创建时间(MessageGroup.getTimestamp())来确定计划任务执行时刻,而不是像 groupTimeoutExpression 解析为 long 类型时那样基于当前消息到达时间进行计算:
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
通过注解配置聚合器
以下示例展示了一个通过注解配置的聚合器:
public class Waiter {
...
@Aggregator 1
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy 2
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy 3
public String correlateBy(OrderItem item) {
...
}
}
一个指示此方法应作为聚合器使用的注解。如果此类被用作聚合器,则必须指定此注解。
一个指示此方法用作聚合器释放策略的注解。如果任何方法上均未出现此注解,则聚合器将使用
SimpleSequenceSizeReleaseStrategy。一个指示此方法应作为聚合器关联策略使用的注解。如果未指定关联策略,则聚合器将使用基于
CORRELATION_ID的HeaderAttributeCorrelationStrategy。
@Aggregator 注解同样支持 XML 元素提供的所有配置选项。
聚合器既可以从XML中显式引用,也可以通过类路径扫描自动检测(前提是类上定义了@MessageEndpoint注解)。
聚合器组件的注解配置(@Aggregator 及其他注解)仅适用于简单场景,此时大多数默认选项已足够。若在使用注解配置时需要更精细地控制这些选项,可考虑为 AggregatingMessageHandler 创建 @Bean 定义,并在其 @Bean 方法上标注 @ServiceActivator,如下例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
更多信息请参见编程模型和@Bean 方法上的注解。
从版本 4.2 开始,AggregatorFactoryBean 可用于简化 AggregatingMessageHandler 的 Java 配置。
管理聚合器中的状态:MessageGroupStore
聚合器(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它需要基于一段时间内到达的、具有相同关联键的一组消息做出决策。有状态模式(例如 ReleaseStrategy)中的接口设计遵循这样一个原则:组件(无论是框架定义的还是用户定义的)应能够保持无状态。所有状态都由 MessageGroup 承载,其管理委托给 MessageGroupStore。MessageGroupStore 接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
更多信息,请参阅 Javadoc。
MessageGroupStore 在等待释放策略触发时,会在 MessageGroup 中累积状态信息,而该事件可能永远不会发生。因此,为了防止陈旧消息滞留,并为易失性存储提供在应用程序关闭时进行清理的钩子,MessageGroupStore 允许您注册回调,以便在其 MessageGroup 过期时应用。该接口非常直接,如下列表所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调函数可以直接访问存储和消息组,从而能够管理持久化状态(例如,通过完全从存储中移除该组)。
MessageGroupStore 维护着一个回调函数列表,它会根据需求将这些回调函数应用于所有时间戳早于指定参数时间的消息(请参阅之前描述的 registerMessageGroupExpiryCallback(..) 和 expireMessageGroups(..) 方法)。
重要的是,当你打算依赖 expireMessageGroups 功能时,不要在不同的聚合器组件中使用同一个 MessageGroupStore 实例。每个 AbstractCorrelatingMessageHandler 都会基于 forceComplete() 回调注册自己的 MessageGroupCallback。这样,每个待过期的组可能会被错误的聚合器完成或丢弃。从版本 5.0.10 开始,AbstractCorrelatingMessageHandler 使用 UniqueExpiryCallback 来注册 MessageGroupStore 中的回调。相应地,MessageGroupStore 会检查是否存在该类的实例,如果回调集合中已存在一个实例,则会记录错误并显示相应的消息。通过这种方式,框架禁止在不同的聚合器/重排序器中使用同一个 MessageGroupStore 实例,以避免由特定关联处理器创建的组在过期时产生上述副作用。
你可以调用 expireMessageGroups 方法并传入一个超时值。任何早于当前时间减去该值的消息组都将过期,并应用相应的回调函数。因此,消息组“过期”的具体含义由存储库的用户来定义。
为了方便用户,Spring Integration 提供了一个包装器来处理消息过期,即 MessageGroupStoreReaper,如下例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割器是一个 Runnable。在前面的例子中,消息组存储的 expire 方法每十秒被调用一次。超时时间本身为 30 秒。
需要理解的是,MessageGroupStoreReaper 的 'timeout' 属性是一个近似值,并且受到任务调度器速率的影响,因为该属性仅在 MessageGroupStoreReaper 任务下一次计划执行时才会被检查。例如,如果超时设置为十分钟,但 MessageGroupStoreReaper 任务计划每小时运行一次,并且 MessageGroupStoreReaper 任务的上一次执行发生在超时前一分钟,那么 MessageGroup 在接下来的 59 分钟内不会过期。因此,我们建议将速率设置为至少等于超时值或更短。
除了收割器,当应用程序通过 AbstractCorrelatingMessageHandler 中的生命周期回调关闭时,也会调用过期回调。
AbstractCorrelatingMessageHandler 会注册自己的过期回调函数,这与聚合器 XML 配置中的布尔标志 send-partial-result-on-expiry 相关联。如果该标志设置为 true,那么当过期回调被调用时,任何尚未释放的组中未标记的消息都可以被发送到输出通道。
由于 MessageGroupStoreReaper 是从一个定时任务中调用的,并且可能会产生消息(取决于 sendPartialResultOnExpiry 选项)发送到下游的集成流,因此建议提供一个自定义的 TaskScheduler,并配置一个 MessagePublishingErrorHandler 来通过 errorChannel 处理异常,这与常规聚合器释放功能的预期处理方式一致。同样的逻辑也适用于同样依赖 TaskScheduler 的组超时功能。更多信息请参阅错误处理。
当共享的 MessageStore 被用于不同的关联端点时,必须配置适当的 CorrelationStrategy 以确保组 ID 的唯一性。否则,当一个关联端点释放或过期来自其他端点的消息时,可能会发生意外行为。具有相同关联键的消息存储在同一个消息组中。
某些 MessageStore 实现允许通过数据分区来使用相同的物理资源。例如,JdbcMessageStore 具有 region 属性,而 MongoDbMessageStore 具有 collectionName 属性。
有关 MessageStore 接口及其实现的更多信息,请参阅消息存储。
Flux 聚合器
在 5.2 版本中,引入了 FluxAggregatorMessageHandler 组件。它基于 Project Reactor 的 Flux.groupBy() 和 Flux.window() 操作符。传入的消息会被发送到由该组件构造函数中 Flux.create() 初始化的 FluxSink 中。如果未提供 outputChannel 或其不是 ReactiveStreamsSubscribableChannel 的实例,则对主 Flux 的订阅会在 Lifecycle.start() 的实现中完成。否则,订阅会推迟到由 ReactiveStreamsSubscribableChannel 实现完成订阅时进行。消息通过 Flux.groupBy() 使用 CorrelationStrategy 作为分组键进行分组。默认情况下,会参考消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID 头部信息。
默认情况下,每个关闭的窗口都会以消息负载中的 Flux 形式发布,用于生成消息。此消息包含窗口中第一条消息的所有头部信息。输出消息负载中的 Flux 必须在下游进行订阅和处理。这种逻辑可以通过 FluxAggregatorMessageHandler 的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) 配置选项进行自定义(或替换)。例如,如果我们希望在最终消息中获得负载的 List,可以像这样配置 Flux.collectList():
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
FluxAggregatorMessageHandler 提供了多种选项来选择适当的窗口策略:
-
setBoundaryTrigger(Predicate<Message<?>>)- 将传播至Flux.windowUntil()操作符。更多信息请参阅其 Javadocs。此选项的优先级高于所有其他窗口选项。 -
setWindowSize(int)和setWindowSizeFunction(Function<Message<?>, Integer>)- 将传播至Flux.window(int)或windowTimeout(int, Duration)。默认情况下,窗口大小是根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE标头计算得出的。 -
setWindowTimespan(Duration)- 将传播至Flux.window(Duration)或windowTimeout(int, Duration),具体取决于窗口大小的配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 一个函数,用于对分组后的 Flux 应用转换,以实现公开选项未涵盖的任何自定义窗口操作。
由于此组件是一个 MessageHandler 实现,它可以简单地与 @ServiceActivator 消息注解一起用作 @Bean 定义。在使用 Java DSL 时,可以通过 .handle() EIP 方法使用它。以下示例演示了如何在运行时注册 IntegrationFlow,以及如何将 FluxAggregatorMessageHandler 与上游的分流器关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
消息组的条件
从版本5.5开始,AbstractCorrelatingMessageHandler(包括其Java和XML DSL)公开了一个BiFunction<Message<?>, String, String>实现的groupConditionSupplier选项。此函数用于添加到组中的每条消息,并将结果条件语句存储到组中以供后续使用。ReleaseStrategy可以查询此条件,而无需遍历组中的所有消息。更多信息请参阅GroupConditionProvider的Java文档以及消息组条件。
另请参阅 文件聚合器。