重排序器
重排序器与聚合器相关,但用途不同。聚合器负责合并消息,而重排序器则在不改变消息内容的情况下传递消息。
功能
重排序器的工作方式与聚合器类似,都是利用 CORRELATION_ID 将消息分组存储。不同之处在于,重排序器不会对消息进行任何处理,而是按照消息 SEQUENCE_NUMBER 头部的值顺序来释放它们。
因此,你可以选择一次性释放所有消息(在整个序列之后,根据 SEQUENCE_SIZE 和其他可能性),或者在有效序列可用时立即释放。(我们将在本章后面解释“有效序列”的含义。)
重排序器旨在对具有较小间隔的相对较短的消息序列进行重新排序。如果您有大量具有许多间隔的不连续序列,可能会遇到性能问题。
配置重排序器
请参阅 聚合器和重排序器 了解如何在 Java DSL 中配置重排序器。
配置重排序器只需在XML中包含相应的元素。
以下示例展示了重新排序器的配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" // <1>
input-channel="inputChannel" // <2>
output-channel="outputChannel" // <3>
discard-channel="discardChannel" // <4>
release-partial-sequences="true" // <5>
message-store="messageStore" // <6>
send-partial-result-on-expiry="true" // <7>
send-timeout="86420000" // <8>
correlation-strategy="correlationStrategyBean" // <9>
correlation-strategy-method="correlate" // <10>
correlation-strategy-expression="headers['something']" // <11>
release-strategy="releaseStrategyBean" // <12>
release-strategy-method="release" // <13>
release-strategy-expression="size() == 10" // <14>
empty-group-min-timeout="60000" // <15>
lock-registry="lockRegistry" // <16>
group-timeout="60000" // <17>
group-timeout-expression="size() ge 2 ? 100 : -1" // <18>
scheduler="taskScheduler" /> // <19>
expire-group-upon-timeout="false" /> // <20>
重排器的 id 是可选的。
重排器的输入通道。必需。
重排器将重新排序后的消息发送到的通道。可选。
重排器将超时的消息发送到的通道(如果
send-partial-result-on-timeout设置为false)。可选。是否在有序序列可用时立即发送,还是等到整个消息组到达后才发送。可选。(默认值为
false。)对
MessageGroupStore的引用,可用于存储按相关键分组的一组消息,直到它们完成。可选。(默认是一个易失性的内存存储。)在组过期时,是否应发送已排序的组(即使缺少某些消息)。可选。(默认值为 false。)请参阅聚合器中的状态管理:MessageGroupStore。
向
output-channel或discard-channel发送回复Message时的超时间隔。仅当输出通道存在某些“发送”限制时才适用,例如具有固定“容量”的QueueChannel。在这种情况下,会抛出MessageDeliveryException。对于AbstractSubscribableChannel实现,send-timeout会被忽略。对于group-timeout(-expression),来自计划过期任务的MessageDeliveryException会导致此任务被重新调度。可选。对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是
CorrelationStrategy接口的实现或 POJO。如果是后者,则还必须定义correlation-strategy-method属性。可选。(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID头。)在
correlation-strategy引用的 bean 上定义的方法,用于实现关联决策算法。可选,但有约束(要求存在correlation-strategy)。表示关联策略的 SpEL 表达式。例如:
"headers['something']"。只允许correlation-strategy或correlation-strategy-expression中的一个。对实现释放策略的 bean 的引用。该 bean 可以是
ReleaseStrategy接口的实现或 POJO。如果是后者,则还必须定义release-strategy-method属性。可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE头属性)。在
release-strategy引用的 bean 上定义的方法,用于实现完成决策算法。可选,但有约束(要求存在release-strategy)。表示释放策略的 SpEL 表达式。表达式的根对象是
MessageGroup。例如:"size() == 5"。只允许release-strategy或release-strategy-expression中的一个。仅当为
<resequencer>的MessageStore配置了MessageGroupStoreReaper时适用。默认情况下,当配置MessageGroupStoreReaper来使部分组过期时,空组也会被移除。空组在组正常释放后存在。这是为了能够检测和丢弃延迟到达的消息。如果您希望空组比部分组更晚过期,请设置此属性。然后,空组在至少此毫秒数内未被修改之前,不会从MessageStore中移除。请注意,空组过期的实际时间还受收割器的超时属性影响,可能为此值加上超时时间。请参阅使用 XML 配置聚合器。
请参阅使用 XML 配置聚合器。
请参阅使用 XML 配置聚合器。
请参阅使用 XML 配置聚合器。
默认情况下,当组因超时(或由
MessageGroupStoreReaper)完成时,空组的元数据会被保留。延迟到达的消息会立即被丢弃。将此设置为true以完全移除该组。然后,延迟到达的消息将启动一个新组,并且在该组再次超时之前不会被丢弃。由于导致超时的序列范围中存在“空洞”,新组永远不会正常释放。空组稍后可以通过使用MessageGroupStoreReaper和empty-group-min-timeout属性来过期(完全移除)。从版本 5.0 开始,空组在empty-group-min-timeout时间过后也会被安排移除。默认值为 'false'。
另请参阅 聚合器过期组 了解更多信息。
由于在 Java 类中无需为重新排序器实现自定义行为,因此不提供相关的注解支持。