跳到主要内容

批处理的领域语言

QWen Plus 中英对照 The Domain Language of Batch

对于任何有经验的批处理架构师来说,Spring Batch 中使用的批处理总体概念应该是熟悉且舒适的。这里有“作业 (Jobs)”和“步骤 (Steps)”,以及由开发人员提供的处理单元,称为 ItemReaderItemWriter。然而,由于 Spring 的模式、操作、模板、回调和惯用法,以下机会得以出现:

  • 显著提高了对清晰关注点分离的遵守。

  • 清晰划分的架构层和服务,以接口形式提供。

  • 简单且默认的实现,允许快速采用和开箱即用的易用性。

  • 极大增强了可扩展性。

下图是几十年来使用的批量参考架构的简化版本。它提供了对构成批量处理领域语言的组件的概述。该架构框架是一个蓝图,通过过去几代平台(大型机上的COBOL、Unix上的C,以及现在随处可用的Java)的数十年实现得到了验证。JCL 和 COBOL 开发人员很可能与 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 提供了 robust 且可维护系统中常见的各层、组件和技术服务的物理实现,这些系统用于解决从简单到复杂的批量应用程序的创建,并且具有应对非常复杂处理需求的基础设施和扩展功能。

图 2.1:批处理刻板印象

图1. 批处理刻板印象

前面的图表突出了构成 Spring Batch 领域语言的关键概念。一个 Job 包含一个或多个步骤,每个步骤恰好有一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。作业需要通过 JobLauncher 启动,并且需要在 JobRepository 中存储有关当前运行进程的元数据。

任务

本节描述了与批处理作业概念相关的刻板印象。Job 是一个封装整个批处理过程的实体。与其他 Spring 项目常见的情况一样,Job 可以通过 XML 配置文件或基于 Java 的配置进行组装。这种配置可以被称为“作业配置”。然而,Job 仅仅是整体层次结构的顶层,如下图所示:

Job Hierarchy

图 2. 作业层次结构

在 Spring Batch 中,Job 仅仅是 Step 实例的容器。它将多个在流程中逻辑上属于一起的步骤组合在一起,并允许对所有步骤全局适用的属性进行配置,例如可重启性。作业配置包含:

  • 作业的名称。

  • Step 实例的定义和排序。

  • 作业是否可重启。

对于使用 Java 配置的用户,Spring Batch 提供了 Job 接口的默认实现,即 SimpleJob 类,它在 Job 的基础上创建了一些标准功能。在使用基于 Java 的配置时,提供了一组构建器用于实例化 Job,如下例所示:

@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
java

JobInstance

JobInstance 指的是逻辑上的一次作业运行的概念。考虑一个应该在一天结束时运行一次的批处理作业,例如前面图表中的 EndOfDay Job。虽然只有一个 EndOfDay 作业,但每次单独运行该作业都需要分别跟踪。在这种情况下,每天有一个逻辑上的 JobInstance。例如,有1月1日的运行、1月2日的运行,等等。如果1月1日的运行第一次失败了,并且在第二天重新运行,它仍然属于1月1日的运行。(通常,这也与它正在处理的数据相对应,也就是说,1月1日的运行处理的是1月1日的数据)。因此,每个 JobInstance 可以有多个执行(JobExecution 在本章后面会有更详细的讨论),并且在任何给定时间,只能运行一个 JobInstance(它对应于特定的 Job 和标识性的 JobParameters)。

JobInstance 的定义与要加载的数据完全没有关系。数据如何加载完全由 ItemReader 的实现来决定。例如,在 EndOfDay 场景中,数据可能包含一个表示 生效日期计划日期 的列,该列指定了数据所属的日期。因此,1月1日的运行只会加载1日的数据,而1月2日的运行只会使用2日的数据。由于这种判断很可能是业务决策的一部分,因此它被留给 ItemReader 来决定。然而,使用相同的 JobInstance 决定了是否使用之前执行的“状态”(即本章稍后讨论的 ExecutionContext)。使用一个新的 JobInstance 意味着“从头开始”,而使用现有的实例通常意味着“从上次中断的地方继续”。

JobParameters

在讨论了 JobInstance 以及它与 Job 的区别之后,自然而然会提出一个问题:“如何区分一个 JobInstance 和另一个?” 答案是:JobParametersJobParameters 对象持有一组用于启动批处理作业的参数。这些参数可以用于标识,甚至在运行期间作为参考数据使用,如下图所示:

Job Parameters

图 3. 作业参数

在前面的例子中,存在两个实例,一个是在1月1日,另一个是在1月2日。实际上只有一个 Job,但它有两个 JobParameter 对象:一个是使用参数 01-01-2017 启动的,另一个是使用参数 01-02-2017 启动的。因此,可以将契约定义为:JobInstance = Job + 可标识的 JobParameters。这使得开发人员能够有效地控制 JobInstance 的定义方式,因为他们掌控着传递哪些参数。

备注

并非所有作业参数都必须参与 JobInstance 的标识。默认情况下,它们确实如此。但是,框架还允许提交带有不参与 JobInstance 标识的参数的 Job

作业执行

JobExecution 指的是运行一个 Job 的单次尝试所涉及的技术概念。一次执行可能会以失败或成功结束,但只有当执行成功完成时,与给定执行对应的 JobInstance 才会被认为是已完成的。以之前描述的 EndOfDay Job 为例,假设 2017 年 1 月 1 日 的 JobInstance 第一次运行时失败了。如果使用与第一次运行相同的标识性 Job 参数(即 2017 年 1 月 1 日)再次运行该任务,则会创建一个新的 JobExecution。然而,仍然只有一个 JobInstance

一个 Job 定义了作业是什么以及它如何被执行,而 JobInstance 是一个纯粹的组织对象,用于将执行分组在一起,主要是为了启用正确的重启语义。然而,JobExecution 是实际运行过程中发生情况的主要存储机制,并且包含许多必须被控制和持久化的属性,如下表所示:

表 1. JobExecution 属性

属性定义
Status一个 BatchStatus 对象,用于指示执行的状态。在运行时,它是 BatchStatus#STARTED。如果失败,则为 BatchStatus#FAILED。如果成功完成,则为 BatchStatus#COMPLETED
startTime一个 java.time.LocalDateTime,表示执行开始时的当前系统时间。如果作业尚未开始,此字段为空。
endTime一个 java.time.LocalDateTime,表示执行结束时的当前系统时间,无论是否成功。如果作业尚未完成,此字段为空。
exitStatusExitStatus,指示运行的结果。它非常重要,因为它包含返回给调用者的退出代码。有关更多详细信息,请参阅第 5 章。如果作业尚未完成,此字段为空。
createTime一个 java.time.LocalDateTime,表示 JobExecution 首次被持久化时的当前系统时间。作业可能尚未开始(因此没有开始时间),但它始终具有 createTime,这是框架管理作业级别 ExecutionContexts 所必需的。
lastUpdated一个 java.time.LocalDateTime,表示 JobExecution 最后一次被持久化的时间。如果作业尚未开始,此字段为空。
executionContext包含任何需要在执行之间持久化的用户数据的“属性包”。
failureExceptions在执行 Job 过程中遇到的异常列表。如果在作业失败期间遇到多个异常,这些可能会很有用。

这些属性很重要,因为它们会被持久化,并且可以用于完全确定一个执行的状态。例如,如果 01-01 的 EndOfDay 作业在晚上 9:00 执行,并在 9:30 失败,则会在批处理元数据表中记录以下条目:

表 2. BATCH_JOB_INSTANCE

JOB_INST_IDJOB_NAME
1EndOfDayJob

表 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_IDTYPE_CDKEY_NAMEDATE_VALIDENTIFYING
1DATEschedule.Date2017-01-01TRUE

表 4. BATCH_JOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTART_TIMEEND_TIMESTATUS
112017-01-01 21:002017-01-01 21:30FAILED
备注

为了清晰和格式化的需要,列名可能已被缩写或删除。

既然作业已经失败,假设花了整个晚上才确定问题所在,因此“批处理窗口”现在已经关闭。进一步假设窗口从晚上9:00开始,作业再次为 01-01 启动,从上次中断的地方继续,并于 9:30 成功完成。由于现在已经是第二天,因此还需要运行 01-02 的作业,它在 9:31 立即启动,并在其正常的1小时运行时间内于 10:30 完成。除非存在两个作业可能尝试访问相同数据的风险(这会在数据库级别引发锁定问题),否则一个 JobInstance 并不要求必须在另一个之后启动。完全由调度器决定何时运行某个 Job。由于它们是独立的 JobInstances,Spring Batch 不会尝试阻止它们并发运行。(尝试在另一个实例已经在运行时启动相同的 JobInstance 将导致抛出 JobExecutionAlreadyRunningException)。此时,在 JobInstanceJobParameters 表中应各有额外的一条记录,在 JobExecution 表中则应有两条额外的记录,如下表所示:

表 5. BATCH_JOB_INSTANCE

JOB_INST_IDJOB_NAME
1EndOfDayJob
2EndOfDayJob

表 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_IDTYPE_CDKEY_NAMEDATE_VALIDENTIFYING
1DATEschedule.Date2017-01-01 00:00:00TRUE
2DATEschedule.Date2017-01-01 00:00:00TRUE
3DATEschedule.Date2017-01-02 00:00:00TRUE

表 7. BATCH_JOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTART_TIMEEND_TIMESTATUS
112017-01-01 21:002017-01-01 21:30FAILED
212017-01-02 21:002017-01-02 21:30COMPLETED
322017-01-02 21:312017-01-02 22:29COMPLETED
备注

为了清晰和格式化的需要,列名可能已被缩写或删除。

步骤

一个 Step 是一个域对象,它封装了批处理作业中独立且顺序的阶段。因此,每个 Job 完全由一个或多个步骤组成。Step 包含定义和控制实际批处理过程所需的所有信息。这是一个必然模糊的描述,因为任何给定 Step 的内容由编写 Job 的开发人员自行决定。Step 可以根据开发人员的需求变得简单或复杂。一个简单的 Step 可能会从文件中加载数据到数据库中,这可能需要很少或不需要代码(具体取决于所使用的实现)。更复杂的 Step 可能在处理过程中应用复杂的业务规则。与 Job 类似,Step 有一个单独的 StepExecution,它与唯一的 JobExecution 相关联,如下图所示:

图 2.1:带步骤的作业层次结构

图 4. 带有步骤的作业层次结构

StepExecution

StepExecution 表示对 Step 的一次执行尝试。每次运行一个 Step 时,都会创建一个新的 StepExecution,这与 JobExecution 类似。但是,如果一个步骤由于其前一个步骤失败而未能执行,则不会为其持久化任何执行记录。只有当 Step 实际启动时,才会创建 StepExecution

Step 执行由 StepExecution 类的对象表示。每次执行都包含对其对应步骤和 JobExecution 的引用,以及与事务相关的数据,例如提交和回滚次数以及开始和结束时间。此外,每个步骤执行包含一个 ExecutionContext,其中包含开发人员需要在批处理运行之间持久化的任何数据,例如用于重启所需的统计信息或状态信息。下表列出了 StepExecution 的属性:

表 8. StepExecution 属性

属性定义
Status一个 BatchStatus 对象,表示执行的状态。在运行期间,状态为 BatchStatus.STARTED。如果失败,状态为 BatchStatus.FAILED。如果成功完成,状态为 BatchStatus.COMPLETED
startTime一个 java.time.LocalDateTime,表示执行开始时的当前系统时间。如果步骤尚未开始,此字段为空。
endTime一个 java.time.LocalDateTime,表示执行结束时的当前系统时间,无论是否成功。如果步骤尚未退出,此字段为空。
exitStatus表示执行结果的 ExitStatus。它非常重要,因为它包含返回给调用者的退出代码。更多详细信息请参阅第 5 章。如果作业尚未退出,此字段为空。
executionContext包含任何需要在执行之间持久化的用户数据的“属性包”。
readCount成功读取的项目数量。
writeCount成功写入的项目数量。
commitCount为此执行提交的事务数量。
rollbackCountStep 控制的业务事务回滚的次数。
readSkipCountread 失败导致跳过项目的次数。
processSkipCountprocess 失败导致跳过项目的次数。
filterCountItemProcessor “过滤”掉的项目数量。
writeSkipCountwrite 失败导致跳过项目的次数。

ExecutionContext

ExecutionContext 表示由框架持久化和控制的一组键值对,它为开发人员提供了一个存储空间,用于保存与 StepExecution 对象或 JobExecution 对象作用域相关的持久状态。 (对于熟悉 Quartz 的用户来说,它与 JobDataMap 非常相似。)最佳使用案例是支持重启功能。以处理平面文件输入为例,在处理每一行时,框架会在提交点周期性地持久化 ExecutionContext。这样做可以让 ItemReader 在运行过程中发生致命错误甚至断电的情况下保存其状态。只需要将当前已读取的行数放入上下文中,如下例所示,框架会完成其余的工作:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
java

使用 Job 类型部分中的 EndOfDay 示例作为示例,假设有一个步骤 loadData,它将文件加载到数据库中。在第一次运行失败后,元数据表将如下例所示:

表 9. BATCH_JOB_INSTANCE

JOB_INST_IDJOB_NAME
1EndOfDayJob

表 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_IDTYPE_CDKEY_NAMEDATE_VAL
1DATEschedule.Date2017-01-01

表 11. BATCH_JOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTART_TIMEEND_TIMESTATUS
112017-01-01 21:002017-01-01 21:30FAILED

表 12. BATCH_STEP_EXECUTION

STEP_EXEC_IDJOB_EXEC_IDSTEP_NAMESTART_TIMEEND_TIMESTATUS
11loadData2017-01-01 21:002017-01-01 21:30FAILED

表 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_IDSHORT_CONTEXT
1{piece.count=40321}

在前面的情况下,Step 运行了 30 分钟并处理了 40,321 个“片段”,在这个场景中,这些片段代表文件中的行。此值会在每次提交之前由框架更新,并且可能包含与 ExecutionContext 中的条目相对应的多行数据。要在提交之前获得通知,需要使用多种 StepListener 实现之一(或 ItemStream),这些内容将在本指南的后续部分详细讨论。与前面的例子一样,假设该 Job 在第二天重启。当它重启时,上次运行的 ExecutionContext 中的值将从数据库中重新构建。当 ItemReader 打开时,它可以检查上下文中是否有任何存储的状态,并从中初始化自身,如下例所示:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);

long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

LineReader reader = getReader();

Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
java

在这种情况下,前面的代码运行后,当前行是 40,322,这使得 Step 可以从上次中断的地方重新开始。你还可以使用 ExecutionContext 来保存关于运行本身需要持久化的统计信息。例如,如果一个平面文件包含分布在多行上的订单用于处理,可能有必要存储已处理的订单数量(这与读取的行数有很大不同),以便在 Step 结束时发送一封包含处理订单总数的电子邮件。框架会为开发人员处理这些存储问题,并正确地将其限定在一个单独的 JobInstance 范围内。判断是否应该使用现有的 ExecutionContext 可能非常困难。例如,使用上面提到的 EndOfDay 示例,当 01-01 运行第二次开始时,框架识别到这是相同的 JobInstance,并且在每个 Step 的基础上,从数据库中提取 ExecutionContext 并通过 StepExecution 将其传递给 Step 本身。相反,对于 01-02 运行,框架识别到这是一个不同的实例,因此必须向 Step 提供一个空的上下文。框架为开发人员做出了许多类似的判断,以确保状态在正确的时间提供给他们。还需要注意的是,在任何给定时间,每个 StepExecution 都恰好存在一个 ExecutionContextExecutionContext 的使用者应小心谨慎,因为这会创建一个共享的键空间。因此,在放置值时应小心,以确保不会覆盖数据。然而,Step 在上下文中绝对不存储任何数据,因此不可能对框架产生不利影响。

请注意,每个 JobExecution 至少有一个 ExecutionContext,每个 StepExecution 也有一个。例如,考虑以下代码片段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
java

正如评论中所述,ecStep 不等于 ecJob。它们是两个不同的 ExecutionContext。作用域为 Step 的那个在 Step 的每个提交点保存,而作用域为 Job 的那个在每次 Step 执行之间保存。

备注

ExecutionContext 中,所有非瞬态条目都必须是 Serializable。执行上下文的正确序列化是步骤和作业重启能力的基础。如果你使用了非本地可序列化的键或值,则需要采用定制的序列化方法。未能序列化执行上下文可能会危及状态持久化过程,导致失败的作业无法正确恢复。

JobRepository

JobRepository 是前面提到的所有刻板印象的持久化机制。它为 JobLauncherJobStep 的实现提供了 CRUD 操作。当一个 Job 首次启动时,从仓库中获取一个 JobExecution。此外,在执行过程中,通过将 StepExecutionJobExecution 的实现传递给仓库来持久化它们。

在使用 Java 配置时,@EnableBatchProcessing 注解提供了一个 JobRepository,它是自动配置的组件之一。

JobLauncher

JobLauncher 表示一个简单的接口,用于通过给定的一组 JobParameters 启动一个 Job,如下例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
java

预计实现方案将从 JobRepository 获取有效的 JobExecution 并执行该 Job

ItemReader

ItemReader 是一种抽象,表示为 Step 检索输入,一次一个项目。当 ItemReader 已经耗尽它可以提供的项目时,它通过返回 null 来指示这一点。您可以在此处找到有关 ItemReader 接口及其各种实现的更多详细信息:Readers And Writers

ItemWriter

ItemWriter 是一种抽象,表示 Step 的输出,一次写入一批或一块项目。通常,ItemWriter 对其接下来应接收的输入没有任何了解,仅知道在其当前调用中传递的项目。您可以从 Readers And Writers 中找到有关 ItemWriter 接口及其各种实现的更多详细信息。

ItemProcessor

ItemProcessor 是一个表示项业务处理的抽象。虽然 ItemReader 读取一个项,ItemWriter 写入一个项,但 ItemProcessor 提供了一个用于转换或应用其他业务处理的接入点。如果在处理项的过程中发现该项无效,返回 null 表示该项不应被写入。您可以在 Readers And Writers 中找到有关 ItemProcessor 接口的更多详细信息。

批量命名空间

前面列出的许多域概念需要在 Spring ApplicationContext 中进行配置。虽然可以使用上述接口的实现来进行标准的 Bean 定义,但为了简化配置,提供了一个命名空间,如下例所示:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>

</beans:beans>
xml

只要已经声明了批处理命名空间,就可以使用其任何元素。有关配置作业的更多信息,请参见 配置和运行作业 。有关配置 Step 的更多信息,请参见 配置步骤