跳到主要内容

扩展与并行处理

QWen Plus 中英对照 Scaling and Parallel Processing

许多批处理问题可以通过单线程、单进程的任务来解决,因此在考虑更复杂的实现之前,始终最好先适当检查这种方案是否满足你的需求。测量一个现实任务的性能,并首先看看最简单的实现是否能满足你的需求。即使使用标准硬件,你也可以在一分钟内轻松完成对数百兆字节文件的读取和写入操作。

当你准备开始实现带有一些并行处理的作业时,Spring Batch 提供了一系列选项,这些选项在本章中有描述,尽管某些功能在其他地方有介绍。在高层次上,有两种并行处理模式:

  • 单进程,多线程

  • 多进程

这些可以分解为以下类别的内容:

  • 多线程步骤(单进程)

  • 并行步骤(单进程)

  • 步骤的远程分块(多进程)

  • 分区一个步骤(单进程或多进程)

首先,我们回顾单进程选项。然后我们回顾多进程选项。

多线程步骤

开始并行处理的最简单方法是将 TaskExecutor 添加到您的 Step 配置中。

在使用 Java 配置时,可以将 TaskExecutor 添加到步骤中,如下例所示:

@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
java

在本例中,taskExecutor 是对另一个实现 TaskExecutor 接口的 Bean 定义的引用。TaskExecutor 是标准的 Spring 接口,因此请查阅 Spring 用户指南以了解可用实现的详细信息。最简单的多线程 TaskExecutorSimpleAsyncTaskExecutor

前面配置的结果是,Step 通过在单独的执行线程中读取、处理和写入每个项目的块(每个提交间隔)。请注意,这意味着要处理的项目没有固定的顺序,并且一个块可能包含与单线程情况相比不连续的项目。除了任务执行器施加的任何限制(例如,它是否由线程池支持)之外,任务片段配置还有一个节流限制(默认值:4)。您可能需要增加此限制以确保完全使用线程池。

在使用 Java 配置时,构建器提供了对节流限制的访问,如下所示:

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
java

还要注意,步骤中使用的任何连接池资源(例如 DataSource)可能会对并发性施加限制。确保这些资源中的连接池大小至少与该步骤中所需的并发线程数一样大。

注意

节流限制废弃

从 v5.0 起,节流限制已被废弃且没有替代方案。如果你希望替换默认 TaskExecutorRepeatTemplate 中的当前节流机制,则需要提供一个自定义的 RepeatOperations 实现(基于带有有界任务队列的 TaskExecutor),并通过 StepBuilder#stepOperations 将其设置到步骤中:

@Bean
public Step sampleStep(RepeatOperations customRepeatOperations, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.stepOperations(customRepeatOperations)
.build();
}
java

在某些常见的批处理用例中,使用多线程的 Step 实现存在一些实际限制。Step 中的许多参与者(例如读取器和写入器)是有状态的。如果状态没有按线程进行隔离,那么这些组件将无法在多线程的 Step 中使用。特别是,Spring Batch 中的大多数读取器和写入器都不是为多线程使用而设计的。然而,可以使用无状态或线程安全的读取器和写入器,并且在 Spring Batch Samples 中有一个示例(名为 parallelJob),该示例展示了如何使用进程指示符(参见 防止状态持久化)来跟踪数据库输入表中已处理的项。

Spring Batch 提供了一些 ItemWriterItemReader 的实现。通常,它们会在 Javadoc 中说明是否是线程安全的,或者在并发环境中避免问题需要做什么。如果 Javadoc 中没有相关信息,你可以检查其实现,看看是否存在任何状态。如果一个读取器不是线程安全的,可以使用提供的 SynchronizedItemStreamReader 对其进行装饰,或者在你自己的同步委托器中使用它。你可以对 read() 调用进行同步,并且只要处理和写入是块中最耗时的部分,你的步骤仍然可能比单线程配置完成得更快。

并行步骤

只要可以将需要并行化的应用逻辑拆分为不同的职责并分配给各个步骤,就可以在单个进程中实现并行化。Parallel Step 的执行易于配置和使用。

在使用 Java 配置时,以 step3 并行执行步骤 (step1, step2) 是非常直接的,如下所示:

@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() // 构建 FlowJobBuilder 实例
.build(); // 构建 Job 实例
}

@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}

@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}

@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}

@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
java

可配置的任务执行器用于指定应使用哪个 TaskExecutor 实现来执行各个流程。默认值是 SyncTaskExecutor,但如果要并行运行步骤,则需要使用异步的 TaskExecutor。请注意,作业会确保在汇总退出状态并进行转换之前,拆分中的每个流程都已完成。

请参阅 Split Flows 部分以获取更多详细信息。

远程分块

在远程分块处理中,Step 处理被拆分到多个进程中,通过某些中间件相互通信。下图展示了该模式:

远程分块处理

图1. 远程分块

管理器组件是一个单一进程,而工作进程是多个远程进程。如果管理器不是瓶颈,这种模式效果最好,因此处理必须比读取项目更耗费资源(在实际应用中经常如此)。

管理者是 Spring Batch Step 的一种实现,其中 ItemWriter 被替换为一个通用版本,该版本知道如何将项目块作为消息发送到中间件。工作者是用于所使用中间件的标准监听器(例如,对于 JMS,它们将是 MessageListener 实现),其作用是通过 ChunkProcessor 接口,使用标准的 ItemWriterItemProcessor 加上 ItemWriter 来处理项目块。使用这种模式的一个优势是,读取器、处理器和写入器组件是现成的(与用于步骤本地执行的组件相同)。项目被动态划分,并通过中间件共享工作负载,因此,如果所有监听器都是急切消费者,则负载均衡是自动的。

中间件必须是持久的,具有保证的消息传递和每条消息的单一消费者。 JMS 是显而易见的选择,但在网格计算和共享内存产品领域中也存在其他选项(例如 JavaSpaces)。

请参阅 Spring Batch Integration - Remote Chunking 部分以获取更多详细信息。

分区

Spring Batch 还提供了一个 SPI,用于对 Step 执行进行分区并在远程执行。在这种情况下,远程参与者是 Step 实例,这些实例也可以轻松配置并用于本地处理。下图显示了该模式:

分区概述

图 2. 分区

Job 在左侧以一系列 Step 实例的顺序运行,其中一个 Step 实例被标记为管理器。在此图中,这些工作单元(workers)都是 Step 的相同实例,实际上它们可以替代管理器,并为 Job 产生相同的结果。这些工作单元通常是远程服务,但也可能是本地执行线程。在这种模式下,管理器发送给工作单元的消息不需要持久化或保证送达。JobRepository 中的 Spring Batch 元数据确保了每个工作单元在每次 Job 执行过程中仅被执行一次。

Spring Batch 中的 SPI 由 Step 的一种特殊实现(称为 PartitionStep)以及需要为特定环境实现的两个策略接口组成。这些策略接口是 PartitionHandlerStepExecutionSplitter,以下序列图展示了它们的作用:

分区 SPI

图 3. 分区 SPI

在这种情况下,右侧的 Step 是“远程”工作者,因此,可能有许多对象或进程扮演这个角色,而 PartitionStep 被显示为驱动执行过程。

下面的示例展示了在使用 Java 配置时 PartitionStep 的配置:

@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
java

类似于多线程步骤中的 throttleLimit 方法,gridSize 方法可以防止任务执行器被单个步骤的请求所淹没。

针对 Spring Batch Samples (请参阅 partition*Job.xml 配置)的单元测试套件包含一个简单的示例,您可以复制并扩展该示例。

Spring Batch 为分区创建步骤执行,称为 step1:partition0 等等。许多人为了保持一致性,更喜欢将管理步骤命名为 step1:manager。你可以为步骤使用别名(通过指定 name 属性而不是 id 属性)。

PartitionHandler

PartitionHandler 是了解远程或网格环境结构的组件。它能够将 StepExecution 请求发送到远程的 Step 实例,这些请求通常会被封装在某种特定于结构的格式中,例如 DTO。它不需要知道如何拆分输入数据,也不需要知道如何聚合多个 Step 执行的结果。一般来说,它可能也不需要了解弹性和故障转移,因为在许多情况下这些是结构本身的特性。无论如何,Spring Batch 始终提供独立于结构的可重启性。失败的 Job 可以随时重新启动,在这种情况下,只有失败的 Step 会被重新执行。

PartitionHandler 接口可以有多种织构类型的专用实现,包括简单的 RMI 远程调用、EJB 远程调用、自定义 Web 服务、JMS、Java Spaces、共享内存网格(如 Terracotta 或 Coherence),以及网格执行织构(如 GridGain)。Spring Batch 不包含任何专有网格或远程调用织构的实现。

然而,Spring Batch 提供了一个有用的 PartitionHandler 实现,该实现使用 Spring 中的 TaskExecutor 策略,在单独的执行线程中本地执行 Step 实例。此实现名为 TaskExecutorPartitionHandler

您可以使用 Java 配置显式配置 TaskExecutorPartitionHandler,如下所示:

@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}

@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize( 10 );
return retVal;
}
java

gridSize 属性决定了要创建的单独步骤执行的数量,因此它可以与 TaskExecutor 中的线程池大小相匹配。或者,也可以将其设置为大于可用线程的数量,这会使工作块变得更小。

TaskExecutorPartitionHandler 对于 IO 密集型的 Step 实例非常有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可以通过提供一个作为远程调用代理的 Step 实现来用于远程执行(例如使用 Spring Remoting)。

Partitioner

Partitioner 的职责更为简单:只为新的步骤执行生成执行上下文作为输入参数(无需担心重启问题)。它只有一个方法,如下接口定义所示:

public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
java

此方法的返回值将每个步骤执行的唯一名称(即 String)与形式为 ExecutionContext 的输入参数相关联。这些名称稍后会在批量元数据中作为分区 StepExecutions 中的步骤名称出现。ExecutionContext 只是一组名称-值对,因此它可能包含主键范围、行号或输入文件的位置。远程 Step 通常通过使用 #{…​} 占位符(步骤作用域中的后期绑定)绑定到上下文输入,如下一节所示。

步骤执行的名称(Partitioner 返回的 Map 中的键)需要在一个 Job 的步骤执行中是唯一的,但没有其他特定要求。实现这一点的最简单方法(也是让用户理解名称含义的方法)是使用前缀+后缀的命名约定,其中前缀是正在执行的步骤的名称(其本身在 Job 中是唯一的),而后缀只是一个计数器。框架中有一个 SimplePartitioner 使用了这种约定。

你可以使用一个可选的接口,名为 PartitionNameProvider,来单独提供分区名称,而不必从分区本身获取。如果 Partitioner 实现了此接口,则在重启时只会查询名称。如果分区操作代价较高,这可能是一个有用的优化方式。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称一致。

将输入数据绑定到步骤

PartitionHandler 执行的步骤具有相同的配置,并且其输入参数在运行时从 ExecutionContext 绑定,这是非常高效的。通过 Spring Batch 的 StepScope 功能可以很容易实现这一点(在 Late Binding 部分有更详细的介绍)。例如,如果 Partitioner 创建了带有属性键名为 fileNameExecutionContext 实例,该键指向每个步骤调用的不同文件(或目录),那么 Partitioner 的输出可能类似于下表中的内容:

表 1. 由 Partitioner 提供的示例步骤执行名称到执行上下文,针对目录处理

步骤执行名称(键)执行上下文(值)
filecopy:partition0fileName=/home/data/one
filecopy:partition1fileName=/home/data/two
filecopy:partition2fileName=/home/data/three

然后可以通过使用晚期绑定到执行上下文,将文件名绑定到步骤。

下面的示例展示了如何在 Java 中定义延迟绑定:

@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
java