跳到主要内容
版本:7.0.2

重排序器

DeepSeek V3 中英对照 Resequencer

重排序器与聚合器相关,但用途不同。聚合器负责合并消息,而重排序器则在不改变消息内容的情况下传递消息。

功能

重排序器的工作方式与聚合器类似,都是利用 CORRELATION_ID 将消息分组存储。不同之处在于,重排序器不会对消息进行任何处理,而是按照消息 SEQUENCE_NUMBER 头部的值顺序来释放它们。

因此,你可以选择一次性释放所有消息(在整个序列之后,根据 SEQUENCE_SIZE 和其他可能性),或者在有效序列可用时立即释放。(我们将在本章后面解释“有效序列”的含义。)

important

重排序器旨在对具有较小间隔的相对较短的消息序列进行重新排序。如果您有大量具有许多间隔的不连续序列,可能会遇到性能问题。

配置重排序器

请参阅 聚合器和重排序器 了解如何在 Java DSL 中配置重排序器。

配置重排序器只需在XML中包含相应的元素。

以下示例展示了重新排序器的配置:

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer" // <1>
input-channel="inputChannel" // <2>
output-channel="outputChannel" // <3>
discard-channel="discardChannel" // <4>
release-partial-sequences="true" // <5>
message-store="messageStore" // <6>
send-partial-result-on-expiry="true" // <7>
send-timeout="86420000" // <8>
correlation-strategy="correlationStrategyBean" // <9>
correlation-strategy-method="correlate" // <10>
correlation-strategy-expression="headers['something']" // <11>
release-strategy="releaseStrategyBean" // <12>
release-strategy-method="release" // <13>
release-strategy-expression="size() == 10" // <14>
empty-group-min-timeout="60000" // <15>

lock-registry="lockRegistry" // <16>

group-timeout="60000" // <17>
group-timeout-expression="size() ge 2 ? 100 : -1" // <18>
scheduler="taskScheduler" /> // <19>
expire-group-upon-timeout="false" /> // <20>
  • 重排器的 id 是可选的。

  • 重排器的输入通道。必需。

  • 重排器将重新排序后的消息发送到的通道。可选。

  • 重排器将超时的消息发送到的通道(如果 send-partial-result-on-timeout 设置为 false)。可选。

  • 是否在有序序列可用时立即发送,还是等到整个消息组到达后才发送。可选。(默认值为 false。)

  • MessageGroupStore 的引用,可用于存储按相关键分组的一组消息,直到它们完成。可选。(默认是一个易失性的内存存储。)

  • 在组过期时,是否应发送已排序的组(即使缺少某些消息)。可选。(默认值为 false。)请参阅聚合器中的状态管理:MessageGroupStore

  • output-channeldiscard-channel 发送回复 Message 时的超时间隔。仅当输出通道存在某些“发送”限制时才适用,例如具有固定“容量”的 QueueChannel。在这种情况下,会抛出 MessageDeliveryException。对于 AbstractSubscribableChannel 实现,send-timeout 会被忽略。对于 group-timeout(-expression),来自计划过期任务的 MessageDeliveryException 会导致此任务被重新调度。可选。

  • 对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是 CorrelationStrategy 接口的实现或 POJO。如果是后者,则还必须定义 correlation-strategy-method 属性。可选。(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头。)

  • correlation-strategy 引用的 bean 上定义的方法,用于实现关联决策算法。可选,但有约束(要求存在 correlation-strategy)。

  • 表示关联策略的 SpEL 表达式。例如:"headers['something']"。只允许 correlation-strategycorrelation-strategy-expression 中的一个。

  • 对实现释放策略的 bean 的引用。该 bean 可以是 ReleaseStrategy 接口的实现或 POJO。如果是后者,则还必须定义 release-strategy-method 属性。可选(默认情况下,聚合器将使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头属性)。

  • release-strategy 引用的 bean 上定义的方法,用于实现完成决策算法。可选,但有约束(要求存在 release-strategy)。

  • 表示释放策略的 SpEL 表达式。表达式的根对象是 MessageGroup。例如:"size() == 5"。只允许 release-strategyrelease-strategy-expression 中的一个。

  • 仅当为 <resequencer>MessageStore 配置了 MessageGroupStoreReaper 时适用。默认情况下,当配置 MessageGroupStoreReaper 来使部分组过期时,空组也会被移除。空组在组正常释放后存在。这是为了能够检测和丢弃延迟到达的消息。如果您希望空组比部分组更晚过期,请设置此属性。然后,空组在至少此毫秒数内未被修改之前,不会从 MessageStore 中移除。请注意,空组过期的实际时间还受收割器的超时属性影响,可能为此值加上超时时间。

  • 请参阅使用 XML 配置聚合器

  • 请参阅使用 XML 配置聚合器

  • 请参阅使用 XML 配置聚合器

  • 请参阅使用 XML 配置聚合器

  • 默认情况下,当组因超时(或由 MessageGroupStoreReaper)完成时,空组的元数据会被保留。延迟到达的消息会立即被丢弃。将此设置为 true 以完全移除该组。然后,延迟到达的消息将启动一个新组,并且在该组再次超时之前不会被丢弃。由于导致超时的序列范围中存在“空洞”,新组永远不会正常释放。空组稍后可以通过使用 MessageGroupStoreReaperempty-group-min-timeout 属性来过期(完全移除)。从版本 5.0 开始,空组在 empty-group-min-timeout 时间过后也会被安排移除。默认值为 'false'。

另请参阅 聚合器过期组 了解更多信息。

备注

由于在 Java 类中无需为重新排序器实现自定义行为,因此不提供相关的注解支持。