跳到主要内容

分割器

QWen Plus 中英对照 Splitter

拆分器是一个组件,其作用是将消息分成多个部分,并发送这些 resulting messages 以独立处理。很多时候,它们是在包含聚合器的管道中的上游生产者。

编程模型

用于执行拆分的 API 包含一个基础类 AbstractMessageSplitter。它是一个 MessageHandler 实现,封装了拆分器的通用特性,例如在生成的消息上填充适当的消息头(CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)。这种填充使得可以跟踪消息及其处理结果(在典型场景中,这些消息头会被复制到由各种转换端点生成的消息中)。然后可以使用这些值,例如,由一个 组合消息处理器 使用。

以下示例显示了 AbstractMessageSplitter 的一部分:

public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);

}
java

要在应用程序中实现特定的拆分器,你可以扩展 AbstractMessageSplitter 并实现 splitMessage 方法,该方法包含拆分消息的逻辑。返回值可以是以下之一:

  • 一个 Collection 或消息数组或一个迭代消息的 Iterable (或 Iterator)。在这种情况下,消息在发送时会包含 CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER。使用这种方法可以给你更多的控制权——例如,在拆分过程中填充自定义消息头。

  • 一个 Collection 或非消息对象数组或一个迭代非消息对象的 Iterable (或 Iterator)。它的工作方式与前一种情况类似,只是每个集合元素被用作消息的有效负载。使用这种方法可以让您专注于域对象,而无需考虑消息系统,并且生成的代码更容易测试。

  • 一个 Message 或非消息对象(但不是一个集合或数组)。它的工作方式与前面的情况类似,只是发送单个消息。

在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值如前所述进行解释。输入参数可以是一个 Message 或一个简单的 POJO。在后一种情况下,拆分器接收传入消息的有效负载。我们建议采用这种方法,因为它使代码与 Spring Integration API 解耦,并且通常更容易测试。

迭代器

从 4.1 版本开始,AbstractMessageSplitter 支持 Iterator 类型用于要拆分的 value。请注意,在 Iterator(或 Iterable)的情况下,我们无法访问底层项目的数量,SEQUENCE_SIZE 标头被设置为 0。这意味着 <aggregator> 的默认 SequenceSizeReleaseStrategy 将不起作用,并且来自 splitterCORRELATION_ID 的组不会被释放;它将保持为 incomplete。在这种情况下,你应该使用适当的自定义 ReleaseStrategy 或依赖 send-partial-result-on-expiry 结合 group-timeoutMessageGroupStoreReaper

从 5.0 版本开始,AbstractMessageSplitter 提供了 protected obtainSizeIfPossible() 方法,以允许在可能的情况下确定 IterableIterator 对象的大小。例如,XPathMessageSplitter 可以确定底层 NodeList 对象的大小。从 5.0.9 版本开始,此方法也正确返回 com.fasterxml.jackson.core.TreeNode 的大小。

一个 Iterator 对象在分割之前避免了在内存中构建整个集合的需求。例如,当底层项目是从某些外部系统(例如 DataBase 或 FTP MGET)通过迭代或流式传输填充时。

Stream 和 Flux

从 5.0 版本开始,AbstractMessageSplitter 支持 Java Stream 和 Reactive Streams Publisher 类型用于分割 value。在这种情况下,目标 Iterator 是基于它们的迭代功能构建的。

此外,如果拆分器的输出通道是 ReactiveStreamsSubscribableChannel 的一个实例,那么 AbstractMessageSplitter 会产生一个 Flux 结果而不是 Iterator,并且输出通道会订阅这个 Flux,以根据下游流需求进行基于背压的拆分。

从 5.2 版本开始,拆分器支持 discardChannel 选项,用于发送那些被拆分函数返回为空容器(集合、数组、流、Flux 等)的请求消息。在这种情况下,只是没有项目可以迭代以发送到 outputChannelnull 拆分结果仍然作为流程结束的指示符。

使用 Java、Groovy 和 Kotlin DSL 配置 Splitter

基于 Message 和其可迭代有效负载的简单拆分器示例,带有 DSL 配置:

@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
java

有关DSL的更多信息,请参阅相关章节:

使用 XML 配置 Splitter

分隔器可以通过 XML 进行配置,如下所示:

<int:channel id="inputChannel"/>

<int:splitter id="splitter" // <1>
ref="splitterBean" // <2>
method="split" // <3>
input-channel="inputChannel" // <4>
output-channel="outputChannel" // <5>
discard-channel="discardChannel" /> // <6>

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
xml
  • 拆分器的 ID 是可选的。

  • 引用在应用程序上下文中定义的 bean。该 bean 必须实现拆分逻辑,如前面的部分所述。可选。如果没有提供对 bean 的引用,则假定到达 input-channel 的消息的有效负载是 java.util.Collection 的实现,并将默认拆分逻辑应用于集合,将每个单独的元素纳入消息并发送到 output-channel

  • 实现拆分逻辑的方法(在 bean 上定义)。可选。

  • 拆分器的输入通道。必需。

  • 拆分器向其发送拆分传入消息结果的通道。可选(因为传入的消息可以自行指定回复通道)。

  • 在拆分结果为空的情况下发送请求消息的通道。可选(它们将在结果为 null 时停止)。

我们建议使用 ref 属性,如果自定义分割器实现可以在其他 <splitter> 定义中被引用。然而,如果自定义分割器处理器实现应该仅作用于单个 <splitter> 定义的作用域,你可以配置一个内部的 bean 定义,如下例所示:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
xml
备注

在同一 <int:splitter> 配置中使用 ref 属性和内部处理器定义是不允许的,因为它会创建一个模棱两可的条件,并导致抛出异常。

important

如果 ref 属性引用了一个扩展 AbstractMessageProducingHandler 的 bean(例如框架本身提供的拆分器),则通过直接将输出通道注入处理程序来优化配置。在这种情况下,每个 ref 必须是一个单独的 bean 实例(或 prototype 范围的 bean)或使用内部 <bean/> 配置类型。但是,只有在您不在拆分器 XML 定义中提供任何拆分器特定属性时,此优化才适用。如果您无意中从多个 bean 引用了相同的消息处理程序,则会抛出配置异常。

使用注解配置 Splitter

@Splitter 注解适用于期望 Message 类型或消息有效载荷类型的 方法,且方法的返回值应该是一个任何类型的 Collection。如果返回的值不是实际的 Message 对象,则每个项会被包装在一个 Message 中作为 Message 的有效载荷。每个结果 Message 都会发送到定义了 @Splitter 的端点的指定输出通道。

以下示例展示了如何使用 @Splitter 注解配置分隔符:

@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
java