重排序器
重组器与聚合器相关,但用途不同。虽然聚合器组合消息,但重组器在不改变消息的情况下传递消息。
功能
重新排序器的工作方式与聚合器类似,因为它使用 CORRELATION_ID
将消息存储在组中。不同之处在于,重新排序器不会以任何方式处理消息。相反,它根据消息的 SEQUENCE_NUMBER
头值的顺序释放它们。
在这方面,你可以选择一次性发布所有消息(在根据 SEQUENCE_SIZE
完整序列之后,以及其他可能性)或是在有效序列可用时立即发布。(我们将在本章后面解释什么是“有效序列”。)
重新排序器旨在重新排列相对较短的消息序列,并且间隙较小。如果您有大量的不连续序列且有许多间隙,您可能会遇到性能问题。
配置 Resequencer
详见 聚合器和重排序器 以了解如何在 Java DSL 中配置重排序器。
配置一个重排序器只需要在 XML 中包含适当的元素。
以下示例显示了一个重新排序器配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" // <1>
input-channel="inputChannel" // <2>
output-channel="outputChannel" // <3>
discard-channel="discardChannel" // <4>
release-partial-sequences="true" // <5>
message-store="messageStore" // <6>
send-partial-result-on-expiry="true" // <7>
send-timeout="86420000" // <8>
correlation-strategy="correlationStrategyBean" // <9>
correlation-strategy-method="correlate" // <10>
correlation-strategy-expression="headers['something']" // <11>
release-strategy="releaseStrategyBean" // <12>
release-strategy-method="release" // <13>
release-strategy-expression="size() == 10" // <14>
empty-group-min-timeout="60000" // <15>
lock-registry="lockRegistry" // <16>
group-timeout="60000" // <17>
group-timeout-expression="size() ge 2 ? 100 : -1" // <18>
scheduler="taskScheduler" /> // <19>
expire-group-upon-timeout="false" /> // <20>
重排序器的 id 是可选的。
重排序器的输入通道。必需。
重排序器发送重新排序的消息的通道。可选。
重排序器发送超时消息的通道(如果
send-partial-result-on-timeout
设置为false
)。可选。是否在有序序列可用时立即发送出去,还是仅在完整的消息组到达后发送。可选。(默认值是
false
。)对
MessageGroupStore
的引用,可用于在它们完成之前根据相关键存储消息组。可选。(默认是一个易失性内存存储。)在组过期时,是否应发送有序的组(即使某些消息丢失)。可选。(默认是 false。)请参阅 聚合器中的状态管理:MessageGroupStore。
在将回复
Message
发送到output-channel
或discard-channel
时等待的超时间隔。仅当输出通道有一些“发送”限制时才会应用此属性,例如具有固定“容量”的QueueChannel
。在这种情况下,会抛出MessageDeliveryException
。对于AbstractSubscribableChannel
实现,send-timeout
将被忽略。对于group-timeout(-expression)
,来自计划过期任务的MessageDeliveryException
将导致此任务重新计划。可选。对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是
CorrelationStrategy
接口的实现或 POJO。在后者的情况下,还必须定义correlation-strategy-method
属性。可选。(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID
标头。)定义在由
correlation-strategy
引用的 bean 上并实现关联决策算法的方法。可选,但有限制(需要存在correlation-strategy
)。表示关联策略的 SpEL 表达式。示例:
"headers['something']"
。只能使用correlation-strategy
或correlation-strategy-expression
其中之一。对实现释放策略的 bean 的引用。该 bean 可以是
ReleaseStrategy
接口的实现或 POJO。在后者的情况下,还必须定义release-strategy-method
属性。可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
标头属性)。定义在由
release-strategy
引用的 bean 上并实现完成决策算法的方法。可选,但有限制(需要存在release-strategy
)。表示释放策略的 SpEL 表达式。表达式的根对象是一个
MessageGroup
。示例:"size() == 5"
。只能使用release-strategy
或release-strategy-expression
其中之一。仅适用于为
<resequencer>
MessageStore
配置了MessageGroupStoreReaper
的情况。默认情况下,当MessageGroupStoreReaper
配置为使部分组过期时,空组也会被移除。空组是在组正常释放后存在的。这是为了启用对迟到消息的检测和丢弃。如果您希望以比部分组过期更长的时间表来过期空组,请设置此属性。然后,空组不会从MessageStore
中移除,直到它们未被修改至少这个毫秒数。请注意,实际过期空组的时间也受收割者的超时属性影响,并且可能最多为此值加上超时。请参阅 使用 XML 配置聚合器。
请参阅 使用 XML 配置聚合器。
请参阅 使用 XML 配置聚合器。
请参阅 使用 XML 配置聚合器。
默认情况下,当由于超时(或通过
MessageGroupStoreReaper
)完成一个组时,保留空组的元数据。迟到的消息将立即被丢弃。将此设置为true
以完全移除该组。然后,迟到的消息将启动一个新组,并且不会被丢弃,直到该组再次超时。由于序列范围中的“空洞”导致超时,新组永远不会正常释放。可以通过使用MessageGroupStoreReaper
并结合empty-group-min-timeout
属性稍后过期(完全移除)空组。从 5.0 版开始,默认情况下,空组在empty-group-min-timeout
到期后也将被安排移除。默认值是 'false'。
也请参阅 Aggregator Expiring Groups 以获取更多信息。
由于重新排序器不需要在 Java 类中实现自定义行为,因此它没有注解支持。