跳到主要内容

控制步骤流

QWen Plus 中英对照 Controlling Step Flow

随着在所属作业中将步骤分组的能力的提升,随之而来的是需要能够控制作业如何从一个步骤流向另一个步骤。Step 的失败并不一定意味着 Job 应该失败。此外,可能存在多种类型的“成功”,这些类型决定了下一个应该执行的 Step 是哪个。根据一组 Steps 的配置方式,某些步骤甚至可能完全不被处理。

important

流程定义中的步骤 Bean 方法代理

在一个流程定义中,步骤实例必须是唯一的。当一个步骤在流程定义中有多个结果时,确保传递给流程定义方法(如 startfrom 等)的是同一个步骤实例非常重要。否则,流程执行可能会出现意外行为。

在以下示例中,步骤作为参数注入到流程或作业 Bean 定义方法中。这种依赖注入方式可以保证流程定义中步骤的唯一性。但是,如果通过调用标注了 @Bean 的步骤定义方法来定义流程,那么如果禁用了 Bean 方法代理(例如 @Configuration(proxyBeanMethods = false)),步骤可能就不是唯一的。如果更倾向于使用 Bean 间注入的方式,则必须启用 Bean 方法代理。

更多关于 Spring Framework 中 Bean 方法代理的详细信息,请参阅 使用 @Configuration 注解 部分。

顺序流

最简单的流程场景是所有步骤都按顺序执行的作业,如下图所示:

顺序流

图1. 顺序流

这可以通过在 step 中使用 next 来实现。

下面的示例展示了如何在 Java 中使用 next() 方法:

@Bean
public Job job(JobRepository jobRepository, Step stepA, Step stepB, Step stepC) {
return new JobBuilder("job", jobRepository)
.start(stepA)
.next(stepB)
.next(stepC)
.build();
}
java

在上述场景中,stepA 会首先运行,因为它是在列表中的第一个 Step。如果 stepA 正常完成,则执行 stepB,依此类推。但是,如果 step A 失败,整个 Job 将失败,且 stepB 不会执行。

备注

使用 Spring Batch XML 命名空间时,配置中列出的第一个步骤 总是Job 执行的第一个步骤。其他步骤元素的顺序无关紧要,但第一个步骤必须始终出现在 XML 中的第一个位置。

条件流

在前面的例子中,只有两种可能性:

  1. step 执行成功,下一个 step 应该被执行。

  2. step 失败了,因此,job 应该失败。

在许多情况下,这可能已经足够了。但是,如果某个 step 失败应该触发一个不同的 step,而不是导致整个流程失败,这种情况又该如何处理呢?下图展示了一个这样的流程:

Conditional Flow

图 2. 条件流

Java API 提供了一组流畅的方法,允许你指定流程以及在某个步骤失败时要执行的操作。以下示例展示了如何指定一个步骤(stepA),然后根据 stepA 是否成功,转到两个不同的步骤(stepBstepC)之一:

@Bean
public Job job(JobRepository jobRepository, Step stepA, Step stepB, Step stepC) {
return new JobBuilder("job", jobRepository)
.start(stepA)
.on("*").to(stepB)
.from(stepA).on("FAILED").to(stepC)
.end()
.build();
}
java

在使用 Java 配置时,on() 方法使用一种简单的模式匹配方案来匹配由 Step 执行所产生的 ExitStatus

模式中只允许使用两个特殊字符:

  • * 匹配零个或多个字符

  • ? 匹配正好一个字符

例如,c*t 匹配 catcount,而 c?t 匹配 cat 但不匹配 count

虽然在一个 Step 上的转换元素数量没有限制,但如果 Step 的执行结果产生了一个没有被任何元素覆盖的 ExitStatus,框架会抛出异常,并导致 Job 失败。框架会自动将转换按从最具体到最不具体的顺序排列。这意味着,即使在前面的例子中 stepA 的顺序被调换,ExitStatusFAILED 仍然会转到 stepC

批处理状态与退出状态

在为条件流配置 Job 时,了解 BatchStatusExitStatus 之间的区别非常重要。BatchStatus 是一个枚举类型,它是 JobExecutionStepExecution 的属性,框架使用它来记录 JobStep 的状态。它可以是以下值之一:COMPLETEDSTARTINGSTARTEDSTOPPINGSTOPPEDFAILEDABANDONEDUNKNOWN。其中大多数是不言自明的:当某个步骤或作业成功完成时,状态会被设置为 COMPLETED,当它失败时会被设置为 FAILED,依此类推。

下面的示例在使用 Java 配置时包含 on 元素:

...
.from(stepA).on("FAILED").to(stepB)
...
java

乍一看,on 似乎引用了其所属 StepBatchStatus。但实际上,它引用的是该 StepExitStatus。顾名思义,ExitStatus 表示 Step 执行完成后的状态。

在使用 Java 配置时,前面 Java 配置示例中显示的 on() 方法引用了 ExitStatus 的退出代码。

如果退出代码为 FAILED,则“转到 stepB”。 默认情况下,退出代码始终与 StepBatchStatus 相同,这就是为什么前面的条目可行。但是,如果退出代码需要不同怎么办?一个很好的例子来自 samples 项目中的 skip 示例作业:

下面的示例展示了如何在 Java 中使用不同的退出代码:

@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2, Step errorPrint1) {
return new JobBuilder("job", jobRepository)
.start(step1).on("FAILED").end()
.from(step1).on("COMPLETED WITH SKIPS").to(errorPrint1)
.from(step1).on("*").to(step2)
.end()
.build();
}
java

step1 有三种可能性:

  • Step 失败,这种情况下作业应该失败。

  • Step 成功完成。

  • Step 成功完成,但退出代码为 COMPLETED WITH SKIPS。在这种情况下,应运行不同的步骤来处理错误。

前面的配置可以正常工作。然而,某些内容需要根据执行过程中跳过记录的条件来更改退出代码,如下例所示:

public class SkipCheckingListener implements StepExecutionListener {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
} else {
return null;
}
}
}
java

前面的代码是一个 StepExecutionListener,它首先检查 Step 是否成功,然后检查 StepExecution 上的跳过计数是否大于 0。如果两个条件都满足,则返回一个新的 ExitStatus,其退出代码为 COMPLETED WITH SKIPS

配置以停止

在讨论了 BatchStatus 和 ExitStatus 之后,人们可能会想知道 JobBatchStatusExitStatus 是如何确定的。虽然这些状态是由执行的代码为 Step 确定的,但 Job 的状态是根据配置确定的。

到目前为止,所有讨论的作业配置都至少有一个最终的 Step,没有任何转换。

在以下Java示例中,step 执行后,Job 结束:

@Bean
public Job job(JobRepository jobRepository, Step step1) {
return new JobBuilder("job", jobRepository)
.start(step1)
.build();
}
java

如果 Step 未定义任何转换,则 Job 的状态定义如下:

  • 如果 StepExitStatusFAILED 结束,则 JobBatchStatusExitStatus 都是 FAILED

  • 否则,JobBatchStatusExitStatus 都是 COMPLETED

虽然这种终止批处理作业的方法对于某些批处理作业(例如简单的顺序步骤作业)已经足够,但在某些情况下可能需要自定义定义的作业停止场景。为此,Spring Batch 提供了三个用于停止 Job 的转换元素(除了我们之前讨论过的 next element)。这些停止元素中的每一个都会以特定的 BatchStatus 停止一个 Job。需要注意的是,停止转换元素不会影响 Job 中任何 StepsBatchStatusExitStatus。这些元素仅影响 Job 的最终状态。例如,一个作业中的每个步骤的状态可能是 FAILED,但整个作业的状态可以是 COMPLETED

在步骤结束

配置一个步骤结束会指示 JobBatchStatusCOMPLETED 停止。状态为 COMPLETEDJob 无法重启(框架会抛出一个 JobInstanceAlreadyCompleteException)。

在使用 Java 配置时,end 方法用于此任务。end 方法还允许使用可选的 exitStatus 参数,您可以使用它来自定义 JobExitStatus。如果不提供 exitStatus 值,则 ExitStatus 默认为 COMPLETED,以匹配 BatchStatus

考虑以下场景:如果 step2 失败,Job 会以 BatchStatusCOMPLETEDExitStatusCOMPLETED 停止,并且 step3 不会执行。否则,执行将移交给 step3。请注意,如果 step2 失败,Job 将无法重启(因为状态是 COMPLETED)。

下面的示例展示了 Java 中的场景:

@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(step2)
.on("FAILED").end()
.from(step2).on("*").to(step3)
.end()
.build();
}
java

步骤失败

配置一个步骤在特定点失败会指示 Job 停止并带有 BatchStatusFAILED 的状态。与结束不同,Job 的失败并不会阻止该 Job 重新启动。

在使用 XML 配置时,fail 元素还允许使用可选的 exit-code 属性,该属性可用于自定义 JobExitStatus。如果没有提供 exit-code 属性,则 ExitStatus 默认为 FAILED,以匹配 BatchStatus

考虑以下场景:如果 step2 失败,Job 会停止,BatchStatusFAILEDExitStatusEARLY TERMINATION,并且 step3 不会执行。否则,执行将移至 step3。此外,如果 step2 失败且 Job 被重新启动,执行将再次从 step2 开始。

下面的示例展示了 Java 中的场景:

@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(step2).on("FAILED").fail()
.from(step2).on("*").to(step3)
.end()
.build();
}
java

在指定步骤停止作业

将作业配置为在特定步骤停止,指示 JobSTOPPEDBatchStatus 停止。停止一个 Job 可以为处理提供一个临时的中断,以便操作员在重新启动 Job 之前采取一些措施。

在使用 Java 配置时,stopAndRestart 方法需要一个 restart 属性,该属性指定在 Job 重新启动时应从哪个步骤继续执行。

考虑以下场景:如果 step1COMPLETE 结束,则作业停止。重新启动后,执行从 step2 开始。

下面的示例展示了 Java 中的场景:

@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job", jobRepository)
.start(step1).on("COMPLETED").stopAndRestart(step2)
.end()
.build();
}
java

程序化流程决策

在某些情况下,仅靠 ExitStatus 无法提供足够的信息来决定下一步执行哪个步骤。在这种情况下,可以使用 JobExecutionDecider 来辅助决策,如下例所示:

public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
java

在以下示例中,当使用 Java 配置时,实现 JobExecutionDecider 的 bean 被直接传递给 next 调用:

@Bean
public Job job(JobRepository jobRepository, MyDecider decider, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(decider).on("FAILED").to(step2)
.from(decider).on("COMPLETED").to(step3)
.end()
.build();
}
java

Split Flows

分流

到目前为止,所描述的每种场景都涉及以线性方式一次执行一个步骤的 Job。除了这种典型的风格之外,Spring Batch 还允许配置具有并行流的作业。

基于 Java 的配置允许您通过提供的构建器来配置拆分。如以下示例所示,split 元素包含一个或多个 flow 元素,在这些元素中可以定义完全独立的流程。split 元素还可以包含之前讨论过的任何转换元素,例如 next 属性或 nextendfail 元素。

@Bean
public Flow flow1(Step step1, Step step2) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1)
.next(step2)
.build();
}

@Bean
public Flow flow2(Step step3) {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3)
.build();
}

@Bean
public Job job(JobRepository jobRepository, Flow flow1, Flow flow2, Step step4) {
return new JobBuilder("job", jobRepository)
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4)
.end()
.build();
}
java

外部化流程定义和作业之间的依赖关系

作业中的部分流程可以外部化为一个单独的 bean 定义,然后重新使用。有两种方法可以实现。第一种是将流程声明为对其他地方定义的引用。

下面的 Java 示例展示了如何将一个流程声明为对在其他地方定义的流程的引用:

@Bean
public Job job(JobRepository jobRepository, Flow flow1, Step step3) {
return new JobBuilder("job", jobRepository)
.start(flow1)
.next(step3)
.end()
.build();
}

@Bean
public Flow flow1(Step step1, Step step2) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1)
.next(step2)
.build();
}
java

如前面的例子所示,定义外部流程的效果是将外部流程中的步骤插入到作业中,就像它们是内联声明的一样。通过这种方式,许多作业可以引用相同的模板流程,并将这些模板组合成不同的逻辑流程。这也是一种分离各个流程集成测试的好方法。

外部化流的另一种形式是使用 JobStepJobStepFlowStep 类似,但它实际上会为指定流中的步骤创建并启动一个单独的作业执行。

下面的示例展示了 Java 中 JobStep 的一个示例:

@Bean
public Job jobStepJob(JobRepository jobRepository, Step jobStepJobStep1) {
return new JobBuilder("jobStepJob", jobRepository)
.start(jobStepJobStep1)
.build();
}

@Bean
public Step jobStepJobStep1(JobRepository jobRepository, JobLauncher jobLauncher, Job job, JobParametersExtractor jobParametersExtractor) {
return new StepBuilder("jobStepJobStep1", jobRepository)
.job(job)
.launcher(jobLauncher)
.parametersExtractor(jobParametersExtractor)
.build();
}

@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
// ...
.build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

extractor.setKeys(new String[]{"input.file"});

return extractor;
}
java

任务参数提取器是一种策略,它决定了如何将 StepExecutionContext 转换为运行的 JobJobParameters。当您希望对作业和步骤的监控和报告有一些更精细的选项时,JobStep 非常有用。使用 JobStep 也通常是回答“如何在作业之间创建依赖关系?”这个问题的一个好方法。这是一种将大型系统分解为较小模块并控制作业流程的好方式。