跳到主要内容

重排序器

QWen Plus 中英对照 Resequencer

重组器与聚合器相关,但用途不同。虽然聚合器组合消息,但重组器在不改变消息的情况下传递消息。

功能

重新排序器的工作方式与聚合器类似,因为它使用 CORRELATION_ID 将消息存储在组中。不同之处在于,重新排序器不会以任何方式处理消息。相反,它根据消息的 SEQUENCE_NUMBER 头值的顺序释放它们。

在这方面,你可以选择一次性发布所有消息(在根据 SEQUENCE_SIZE 完整序列之后,以及其他可能性)或是在有效序列可用时立即发布。(我们将在本章后面解释什么是“有效序列”。)

important

重新排序器旨在重新排列相对较短的消息序列,并且间隙较小。如果您有大量的不连续序列且有许多间隙,您可能会遇到性能问题。

配置 Resequencer

详见 聚合器和重排序器 以了解如何在 Java DSL 中配置重排序器。

配置一个重排序器只需要在 XML 中包含适当的元素。

以下示例显示了一个重新排序器配置:

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer" // <1>
input-channel="inputChannel" // <2>
output-channel="outputChannel" // <3>
discard-channel="discardChannel" // <4>
release-partial-sequences="true" // <5>
message-store="messageStore" // <6>
send-partial-result-on-expiry="true" // <7>
send-timeout="86420000" // <8>
correlation-strategy="correlationStrategyBean" // <9>
correlation-strategy-method="correlate" // <10>
correlation-strategy-expression="headers['something']" // <11>
release-strategy="releaseStrategyBean" // <12>
release-strategy-method="release" // <13>
release-strategy-expression="size() == 10" // <14>
empty-group-min-timeout="60000" // <15>

lock-registry="lockRegistry" // <16>

group-timeout="60000" // <17>
group-timeout-expression="size() ge 2 ? 100 : -1" // <18>
scheduler="taskScheduler" /> // <19>
expire-group-upon-timeout="false" /> // <20>
xml
  • 重排序器的 id 是可选的。

  • 重排序器的输入通道。必需。

  • 重排序器发送重新排序的消息的通道。可选。

  • 重排序器发送超时消息的通道(如果 send-partial-result-on-timeout 设置为 false)。可选。

  • 是否在有序序列可用时立即发送出去,还是仅在完整的消息组到达后发送。可选。(默认值是 false。)

  • MessageGroupStore 的引用,可用于在它们完成之前根据相关键存储消息组。可选。(默认是一个易失性内存存储。)

  • 在组过期时,是否应发送有序的组(即使某些消息丢失)。可选。(默认是 false。)请参阅 聚合器中的状态管理:MessageGroupStore

  • 在将回复 Message 发送到 output-channeldiscard-channel 时等待的超时间隔。仅当输出通道有一些“发送”限制时才会应用此属性,例如具有固定“容量”的 QueueChannel。在这种情况下,会抛出 MessageDeliveryException。对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。对于 group-timeout(-expression),来自计划过期任务的 MessageDeliveryException 将导致此任务重新计划。可选。

  • 对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是 CorrelationStrategy 接口的实现或 POJO。在后者的情况下,还必须定义 correlation-strategy-method 属性。可选。(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。)

  • 定义在由 correlation-strategy 引用的 bean 上并实现关联决策算法的方法。可选,但有限制(需要存在 correlation-strategy)。

  • 表示关联策略的 SpEL 表达式。示例:"headers['something']"。只能使用 correlation-strategycorrelation-strategy-expression 其中之一。

  • 对实现释放策略的 bean 的引用。该 bean 可以是 ReleaseStrategy 接口的实现或 POJO。在后者的情况下,还必须定义 release-strategy-method 属性。可选(默认情况下,聚合器将使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 标头属性)。

  • 定义在由 release-strategy 引用的 bean 上并实现完成决策算法的方法。可选,但有限制(需要存在 release-strategy)。

  • 表示释放策略的 SpEL 表达式。表达式的根对象是一个 MessageGroup。示例:"size() == 5"。只能使用 release-strategyrelease-strategy-expression 其中之一。

  • 仅适用于为 <resequencer> MessageStore 配置了 MessageGroupStoreReaper 的情况。默认情况下,当 MessageGroupStoreReaper 配置为使部分组过期时,空组也会被移除。空组是在组正常释放后存在的。这是为了启用对迟到消息的检测和丢弃。如果您希望以比部分组过期更长的时间表来过期空组,请设置此属性。然后,空组不会从 MessageStore 中移除,直到它们未被修改至少这个毫秒数。请注意,实际过期空组的时间也受收割者的超时属性影响,并且可能最多为此值加上超时。

  • 请参阅 使用 XML 配置聚合器

  • 请参阅 使用 XML 配置聚合器

  • 请参阅 使用 XML 配置聚合器

  • 请参阅 使用 XML 配置聚合器

  • 默认情况下,当由于超时(或通过 MessageGroupStoreReaper)完成一个组时,保留空组的元数据。迟到的消息将立即被丢弃。将此设置为 true 以完全移除该组。然后,迟到的消息将启动一个新组,并且不会被丢弃,直到该组再次超时。由于序列范围中的“空洞”导致超时,新组永远不会正常释放。可以通过使用 MessageGroupStoreReaper 并结合 empty-group-min-timeout 属性稍后过期(完全移除)空组。从 5.0 版开始,默认情况下,空组在 empty-group-min-timeout 到期后也将被安排移除。默认值是 'false'。

也请参阅 Aggregator Expiring Groups 以获取更多信息。

备注

由于重新排序器不需要在 Java 类中实现自定义行为,因此它没有注解支持。