跳到主要内容
版本:6.0.2

扩展与并行处理

DeepSeek V3 中英对照 Scaling and Parallel Processing

许多批处理问题可以通过单线程、单进程的作业来解决,因此在考虑更复杂的实现之前,先检查这种简单方案是否满足需求总是一个好主意。首先测量实际作业的性能,看看最简单的实现是否满足需求。即使使用标准硬件,读取和写入几百兆字节的文件也完全用不了一分钟。

当你准备开始实现一个包含并行处理的任务时,Spring Batch 提供了多种选择,这些选项将在本章中介绍,尽管某些功能在其他地方有所涉及。从高层次来看,并行处理有两种模式:

  • 单进程,多线程

  • 多进程

这些也分为以下几类:

  • 多线程步骤(单进程)

  • 并行步骤(单进程)

  • 步骤的本地分块(单进程)

  • 步骤的远程分块(多进程)

  • 步骤分区(单进程或多进程)

  • 远程步骤(多进程)

首先,我们回顾单进程选项。然后我们回顾多进程选项。

多线程步骤

启动并行处理的最简单方法是在您的步骤配置中添加一个 TaskExecutor

在使用 Java 配置时,你可以向步骤中添加一个 TaskExecutor,如下例所示:

@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}

在这个例子中,taskExecutor 是一个指向另一个实现了 TaskExecutor 接口的 bean 定义的引用。TaskExecutor 是 Spring 的一个标准接口,因此关于可用实现的详细信息,请查阅 Spring 用户指南。最简单的多线程 TaskExecutorSimpleAsyncTaskExecutor

上述配置的结果是,Step 将使用任务执行器中的多个线程来并发处理条目。因此,ItemProcessor 将同时被多个线程调用。这意味着 ItemProcessor 必须是线程安全的。如果在处理过程中使用了有状态组件,必须确保它们已正确同步以支持并发访问。

项目的读取和写入仍由执行步骤的主线程串行执行,因此 ItemReaderItemWriter 无需具备线程安全性或同步机制。然而,步骤的吞吐量可能会受到读写速度的限制。若出现这种情况,可考虑采用不同的并发技术,例如本地分块(local chunking)或本地分区(local partitioning)。

同时请注意,步骤中使用的任何池化资源(例如 DataSource)可能会对并发性设置限制。请确保将这些资源中的池大小至少设置为步骤中期望的并发线程数。

并行步骤

只要需要并行化的应用逻辑能够被拆分为不同的职责并分配给独立的步骤,就可以在单个进程中实现并行化。并行步骤的执行配置简单,易于使用。

使用 Java 配置时,将步骤 (step1,step2)step3 并行执行非常简单,如下所示:

@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() // 构建 FlowJobBuilder 实例
.build(); // 构建 Job 实例
}

@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}

@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}

@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}

@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}

可配置的任务执行器用于指定哪个 TaskExecutor 实现应执行各个流程。默认情况下是 SyncTaskExecutor,但需要异步 TaskExecutor 来并行执行步骤。请注意,作业确保在聚合退出状态并转换之前,拆分中的每个流程都已完成。

更多详情请参见分流部分。

本地分块

本地分块是 v6.0 中的一项新功能,它允许你在同一个 JVM 内使用多线程并行处理数据块。当你需要处理大量数据项并希望充分利用多核处理器时,这一功能尤其有用。通过本地分块,你可以配置面向块的步骤,使其使用多个线程并发处理数据块。每个线程将独立读取、处理和写入自己的数据块,而步骤将管理整体执行并提交结果。

这一功能通过使用 ChunkMessageChannelItemWriter 实现,该组件是一个项目写入器,负责将分块请求从 TaskExecutor 提交给本地工作节点:

@Bean
public ChunkTaskExecutorItemWriter<Vet> itemWriter(ChunkProcessor<Vet> chunkProcessor) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setThreadNamePrefix("worker-thread-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return new ChunkTaskExecutorItemWriter<>(chunkProcessor, taskExecutor);
}

ChunkMessageChannelItemWriter 需要一个 TaskExecutor 来并发处理数据块,同时还需要一个 ChunkProcessor 来定义如何处理每个数据块。以下是一个将每个数据块中的项目写入关系数据库表的处理器示例:

@Bean
public ChunkProcessor<Vet> chunkProcessor(DataSource dataSource, TransactionTemplate transactionTemplate) {
String sql = "insert into vets (firstname, lastname) values (?, ?)";
JdbcBatchItemWriter<Vet> itemWriter = new JdbcBatchItemWriterBuilder<Vet>().dataSource(dataSource)
.sql(sql)
.itemPreparedStatementSetter((item, ps) -> {
ps.setString(1, item.firstname());
ps.setString(2, item.lastname());
})
.build();

return (chunk, contribution) -> transactionTemplate.executeWithoutResult(transactionStatus -> {
try {
itemWriter.write(chunk);
contribution.incrementWriteCount(chunk.size());
contribution.setExitStatus(ExitStatus.COMPLETED);
}
catch (Exception e) {
transactionStatus.setRollbackOnly();
contribution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
});
}

你可以在 Local Chunking Sample 中找到这种扩展技术的示例。

远程分块

在远程分块中,Step处理被分割到多个进程中,这些进程通过某种中间件相互通信。下图展示了这种模式:

远程分块

图 1. 远程分块

管理器组件是一个单一进程,而工作器是多个远程进程。如果管理器不是瓶颈,这种模式效果最佳,因此处理过程的开销必须大于读取项目的开销(这在实践中通常是常见情况)。

管理器是Spring Batch Step的一个实现,其中ItemWriter被替换为一个通用版本,该版本知道如何将数据块作为消息发送到中间件。工作器是适用于任何中间件的标准监听器(例如,对于JMS,它们将是MesssageListener的实现),其作用是通过ChunkProcessor接口,使用标准的ItemWriterItemProcessor加上ItemWriter来处理数据块。使用此模式的优势之一是,读取器、处理器和写入器组件都是现成的(与本地执行步骤时使用的组件相同)。数据项被动态划分,工作通过中间件共享,因此,如果所有监听器都是积极的消费者,负载均衡将自动实现。

中间件必须具有持久性,确保消息传递的可靠性,并且每条消息只能被一个消费者处理。JMS(Java消息服务)是显而易见的选择,但在网格计算和共享内存产品领域,也存在其他选项(例如 JavaSpaces)。

更多详情请参阅 Spring Batch Integration - 远程分块 章节。

分区

Spring Batch 还为 Step 执行的分区和远程执行提供了一个 SPI。在这种情况下,远程参与者是 Step 实例,这些实例可以像本地处理一样轻松地进行配置和使用。下图展示了这种模式:

分区概述

图 2. 分区

Job 在左侧作为一系列 Step 实例的顺序执行,其中一个 Step 实例被标记为管理器。图中的工作节点都是 Step 的相同实例,实际上它们可以替代管理器,为 Job 产生相同的结果。工作节点通常是远程服务,但也可以是本地执行线程。在此模式中,管理器发送给工作节点的消息不需要持久化或保证送达。JobRepository 中的 Spring Batch 元数据确保每个工作节点在每次 Job 执行中仅执行一次。

Spring Batch中的SPI包含一个特殊的Step实现(称为PartitionStep)以及两个需要针对特定环境实现的策略接口。这两个策略接口分别是PartitionHandlerStepExecutionSplitter,下图展示了它们的作用:

Partitioning SPI

图 3. 分区 SPI

在这种情况下,右侧的 Step 是“远程”工作器,因此可能有多个对象和/或进程扮演这一角色,而 PartitionStep 则负责驱动执行过程。

以下示例展示了使用 Java 配置时的 PartitionStep 配置:

@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}

与多线程步骤的 throttleLimit 方法类似,gridSize 方法可以防止任务执行器被来自单个步骤的请求所饱和。

Spring Batch Samples 的单元测试套件(参见 partition*Job.xml 配置文件)包含一个简单的示例,你可以复制并扩展它。

Spring Batch 为名为 step1:partition0 的分区创建步骤执行,依此类推。为了保持一致性,许多人更倾向于将管理器步骤称为 step1:manager。您可以为步骤使用别名(通过指定 name 属性而非 id 属性)。

PartitionHandler

PartitionHandler 是了解远程或网格环境结构的组件。它能够将 StepExecution 请求发送给远程的 Step 实例,这些请求以某种特定于结构的格式(如 DTO)进行包装。它无需知道如何拆分输入数据,也无需知道如何聚合多个 Step 执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为这些功能在许多情况下是结构本身提供的。无论如何,Spring Batch 始终提供独立于结构的可重启性。失败的 Job 总是可以重新启动,在这种情况下,只有失败的 Steps 会被重新执行。

PartitionHandler 接口可以针对多种分布式架构类型提供专门的实现,包括简单的 RMI 远程调用、EJB 远程调用、自定义 Web 服务、JMS、Java Spaces、共享内存网格(如 Terracotta 或 Coherence)以及网格执行架构(如 GridGain)。Spring Batch 不包含任何专有网格或远程架构的实现。

然而,Spring Batch 确实提供了一个有用的 PartitionHandler 实现,它使用 Spring 的 TaskExecutor 策略,在执行的独立线程中本地运行 Step 实例。该实现称为 TaskExecutorPartitionHandler

你可以使用 Java 配置显式配置 TaskExecutorPartitionHandler,如下所示:

@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}

@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}

gridSize 属性决定了要创建的独立步骤执行数量,因此它可以与 TaskExecutor 中线程池的大小相匹配。或者,也可以将其设置为大于可用线程数,这样会使工作块更小。

TaskExecutorPartitionHandler 适用于IO密集型Step实例,例如复制大量文件或将文件系统复制到内容管理系统中。它也可用于远程执行,通过提供一个作为远程调用代理的Step实现(例如使用Spring Remoting)来实现。

Partitioner

Partitioner 的职责较为简单:仅用于为新的步骤执行生成执行上下文作为输入参数(无需考虑重启情况)。它只有一个方法,如下方接口定义所示:

public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值将每个步骤执行的唯一名称(String)与以 ExecutionContext 形式表示的输入参数关联起来。这些名称稍后会在批处理元数据中显示为分区 StepExecutions 中的步骤名称。ExecutionContext 本质上是一个名称-值对的集合,因此它可以包含一系列主键、行号或输入文件的位置。远程 Step 随后通常通过使用 #{…​} 占位符(步骤作用域内的延迟绑定)绑定到上下文输入,如下一节所示。

步骤执行名称(由 Partitioner 返回的 Map 中的键)需要在 Job 的步骤执行中保持唯一,除此之外没有其他特定要求。实现这一目标(同时让名称对用户有意义)的最简单方法是采用前缀+后缀的命名约定,其中前缀是正在执行的步骤名称(该名称在 Job 中本身是唯一的),后缀则是一个计数器。框架中提供了一个使用此约定的 SimplePartitioner

你可以使用一个名为 PartitionNameProvider 的可选接口来单独提供分区名称,而不依赖于分区本身。如果 Partitioner 实现了这个接口,那么在重启时只会查询名称。如果分区操作开销较大,这将是一个有用的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称相匹配。

绑定输入数据到步骤

对于由 PartitionHandler 执行的步骤而言,采用相同的配置并在运行时通过 ExecutionContext 绑定输入参数是非常高效的。利用 Spring Batch 的 StepScope 特性(在延迟绑定章节中有更详细的介绍)可以轻松实现这一点。例如,如果 Partitioner 创建的 ExecutionContext 实例包含一个名为 fileName 的属性键,该键指向每次步骤调用时不同的文件(或目录),那么 Partitioner 的输出可能类似于下表中的内容:

表 1. 由 Partitioner 提供的针对目录处理的示例步骤执行名称与执行上下文

步骤执行名称(键)执行上下文(值)
filecopy:partition0fileName=/home/data/one
filecopy:partition1fileName=/home/data/two
filecopy:partition2fileName=/home/data/three

然后,可以通过使用执行上下文的后期绑定将文件名绑定到一个步骤。

以下示例展示了如何在 Java 中定义延迟绑定:

@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}

远程步骤执行

自 v6.0 版本起,Spring Batch 提供了远程步骤执行的支持,允许您在远程机器或集群上执行批处理作业的步骤。此功能特别适用于大规模批处理场景,您希望将工作负载分布到多个节点以提高性能和可扩展性。远程步骤执行由 RemoteStep 类提供,它使用 Spring Integration 消息通道来实现本地作业执行环境与远程步骤执行器之间的通信。

RemoteStep 被配置为一个常规步骤,通过提供远程步骤名称和消息模板,将步骤执行请求发送给远程工作者:

@Bean
public Step step(MessagingTemplate messagingTemplate, JobRepository jobRepository) {
return new RemoteStep("step", "workerStep", jobRepository, messagingTemplate);
}

在工作节点端,你需要定义待执行的远程步骤(本例中的workerStep),并配置一个Spring Integration流程来拦截步骤执行请求,同时调用StepExecutionRequestHandler

@Bean
public Step workerStep(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
return new StepBuilder("workerStep", jobRepository)
// define step logic
.build();
}

/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory, JobRepository jobRepository,
StepLocator stepLocator) {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobRepository(jobRepository);
stepExecutionRequestHandler.setStepLocator(stepLocator);
return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.handle(stepExecutionRequestHandler, "handle")
.get();
}

@Bean
public StepLocator stepLocator(BeanFactory beanFactory) {
BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
beanFactoryStepLocator.setBeanFactory(beanFactory);
return beanFactoryStepLocator;
}

你可以在 Remote Step Sample 中找到完整的示例。