跳到主要内容
版本:7.0.2

分割器

DeepSeek V3 中英对照 Splitter

拆分器是一种组件,其作用是将一条消息分割成若干部分,并将生成的消息发送出去进行独立处理。在包含聚合器的流水线中,拆分器通常是上游生产者。

编程模型

执行拆分的API包含一个基类 AbstractMessageSplitter。它是一个 MessageHandler 实现,封装了拆分器共有的特性,例如为生成的消息填充适当的消息头(CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)。这种填充使得能够追踪消息及其处理结果(在典型场景中,这些头部信息会被复制到各个转换端点生成的消息中)。随后,这些值可以被使用,例如,通过一个组合消息处理器

以下示例展示了 AbstractMessageSplitter 的代码片段:

public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);

}

要在应用程序中实现特定的分割器,你可以扩展 AbstractMessageSplitter 并实现 splitMessage 方法,该方法包含分割消息的逻辑。返回值可以是以下之一:

  • 一个 Collection 或消息数组,或一个遍历消息的 Iterable(或 Iterator)。在这种情况下,消息会作为消息发送(在填充了 CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER 之后)。使用这种方法可以让你拥有更多控制权——例如,在拆分过程中填充自定义消息头。

  • 一个 Collection 或非消息对象数组,或一个遍历非消息对象的 Iterable(或 Iterator)。它的工作方式与前一种情况类似,不同之处在于每个集合元素都被用作消息负载。使用这种方法可以让你专注于领域对象,而不必考虑消息系统,并生成更易于测试的代码。

  • 一个 Message 或非消息对象(但不是集合或数组)。它的工作方式与前面的情况类似,不同之处在于只发送一条消息。

在Spring Integration中,任何POJO都可以实现拆分算法,只要它定义了一个接受单个参数并具有返回值的方法。在这种情况下,该方法的返回值将按照前面描述的方式进行解释。输入参数可以是Message,也可以是一个简单的POJO。在后一种情况下,拆分器接收传入消息的有效载荷。我们推荐这种方法,因为它将代码与Spring Integration API解耦,并且通常更容易测试。

迭代器

从版本 4.1 开始,AbstractMessageSplitter 支持将 Iterator 类型作为待拆分的 value。请注意,对于 Iterator(或 Iterable)类型,我们无法访问底层项目的数量,因此 SEQUENCE_SIZE 标头会被设置为 0。这意味着 <aggregator> 的默认 SequenceSizeReleaseStrategy 将无法工作,并且来自 splitterCORRELATION_ID 对应的分组将不会被释放;它将保持为 incomplete 状态。在这种情况下,您应该使用适当的自定义 ReleaseStrategy,或者依赖 send-partial-result-on-expiry 结合 group-timeoutMessageGroupStoreReaper 来处理。

从版本5.0开始,AbstractMessageSplitter 提供了受保护的 obtainSizeIfPossible() 方法,以便在可能的情况下确定 IterableIterator 对象的大小。例如,XPathMessageSplitter 可以确定底层 NodeList 对象的大小。从版本5.0.9开始,此方法还能正确返回 com.fasterxml.jackson.core.TreeNode 的大小。

Iterator 对象有助于避免在拆分前将整个集合加载到内存中。例如,当底层项通过迭代或流从某些外部系统(如数据库或 FTP MGET)填充时。

流与通量

从 5.0 版本开始,AbstractMessageSplitter 支持将 Java Stream 和 Reactive Streams Publisher 类型作为待拆分的 value。在这种情况下,目标 Iterator 将基于它们的迭代功能构建。

此外,如果分流器的输出通道是 ReactiveStreamsSubscribableChannel 的实例,AbstractMessageSplitter 将生成一个 Flux 结果而非 Iterator,并且输出通道会订阅此 Flux,以便基于下游流需求进行背压驱动的分流。

从 5.2 版本开始,拆分器支持 discardChannel 选项,用于发送那些拆分函数返回空容器(集合、数组、流、Flux 等)的请求消息。在这种情况下,没有可迭代的项发送到 outputChannelnull 拆分结果仍作为流结束的指示符。

使用 Java、Groovy 和 Kotlin DSL 配置拆分器

一个基于 Message 及其可迭代负载的简单分割器示例,使用 DSL 配置:

@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}

有关 DSL 的更多信息,请参阅相应章节:

使用 XML 配置拆分器

可以通过如下 XML 配置分割器:

<int:channel id="inputChannel"/>

<int:splitter id="splitter" // <1>
ref="splitterBean" // <2>
method="split" // <3>
input-channel="inputChannel" // <4>
output-channel="outputChannel" // <5>
discard-channel="discardChannel" /> // <6>

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
  • 拆分器的 ID 是可选的。

  • 对应用程序上下文中定义的 bean 的引用。该 bean 必须实现拆分逻辑,如前面章节所述。可选。如果未提供对 bean 的引用,则假定到达 input-channel 的消息的有效负载是 java.util.Collection 的实现,并将默认拆分逻辑应用于该集合,将每个单独元素合并到一条消息中并发送到 output-channel

  • 实现拆分逻辑的方法(在 bean 上定义)。可选。

  • 拆分器的输入通道。必需。

  • 拆分器将传入消息的拆分结果发送到的通道。可选(因为传入消息可以自行指定回复通道)。

  • 在拆分结果为空的情况下,请求消息被发送到的通道。可选(它们将像 null 结果的情况一样停止)。

我们建议,如果自定义拆分器实现可以在其他 <splitter> 定义中被引用,则使用 ref 属性。然而,如果自定义拆分器处理程序实现应限定在单个 <splitter> 定义范围内,您可以配置一个内部 bean 定义,如下例所示:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
备注

不允许在同一个 <int:splitter> 配置中同时使用 ref 属性和内部处理器定义,因为这会造成歧义条件并导致抛出异常。

important

如果 ref 属性引用的 bean 继承自 AbstractMessageProducingHandler(例如框架本身提供的拆分器),配置将通过直接将输出通道注入到处理器中进行优化。在这种情况下,每个 ref 必须是一个独立的 bean 实例(或一个 prototype 作用域的 bean),或者使用内部的 <bean/> 配置类型。然而,此优化仅适用于在拆分器的 XML 定义中未提供任何特定于拆分器的属性时。如果无意中从多个 bean 引用了同一个消息处理器,将会出现配置异常。

使用注解配置分割器

@Splitter 注解适用于期望接收 Message 类型或消息负载类型的方法,且该方法的返回值应为任意类型的 Collection。如果返回值并非实际的 Message 对象,则每个元素将被包装为 Message 的负载。每个生成的 Message 将被发送到定义了 @Splitter 的端点所指定的输出通道。

以下示例展示了如何使用 @Splitter 注解配置拆分器:

@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}