跳到主要内容
版本:6.0.2

控制步骤流程

DeepSeek V3 中英对照 Controlling Step Flow

随着能够将步骤分组到一个所属作业中,就需要能够控制作业如何从一个步骤“流转”到另一个步骤。一个Step的失败并不一定意味着Job应该失败。此外,可能存在多种“成功”类型,这些类型决定了接下来应该执行哪个Step。根据一组Step的配置方式,某些步骤甚至可能完全不被处理。

important

流程定义中的步骤 Bean 方法代理

在流程定义中,步骤实例必须是唯一的。当一个步骤在流程定义中有多个结果时,确保将同一个步骤实例传递给流程定义方法(如 startfrom 等)非常重要。否则,流程执行可能会出现意外行为。

在以下示例中,步骤作为参数注入到流程或作业 Bean 定义方法中。这种依赖注入风格保证了步骤在流程定义中的唯一性。然而,如果流程是通过调用带有 @Bean 注解的步骤定义方法来定义的,那么当 Bean 方法代理被禁用时(即使用 @Configuration(proxyBeanMethods = false)),步骤可能不是唯一的。如果更倾向于使用 Bean 间注入风格,则必须启用 Bean 方法代理。

有关 Spring Framework 中 Bean 方法代理的更多详细信息,请参阅 使用 @Configuration 注解 部分。

顺序流程

最简单的流程场景是一个作业,其中所有步骤按顺序执行,如下图所示:

顺序流程

图 1. 顺序流

这可以通过在 step 中使用 next 来实现。

以下示例展示了如何在 Java 中使用 next() 方法:

@Bean
public Job job(JobRepository jobRepository, Step stepA, Step stepB, Step stepC) {
return new JobBuilder("job", jobRepository)
.start(stepA)
.next(stepB)
.next(stepC)
.build();
}

在上述场景中,stepA 首先运行,因为它是列出的第一个 Step。如果 stepA 正常完成,stepB 将运行,依此类推。然而,如果 stepA 失败,整个 Job 将失败,stepB 不会执行。

备注

在使用 Spring Batch XML 命名空间时,配置中列出的第一个步骤 总是Job 运行的第一个步骤。其他步骤元素的顺序无关紧要,但第一个步骤必须始终在 XML 中首先出现。

条件流

在前面的例子中,只有两种可能性:

  1. step 成功,应执行下一个 step

  2. step 失败,因此该 job 应失败。

在许多情况下,这可能已经足够了。但是,如果某个 step 的失败应该触发另一个不同的 step,而不是导致整个流程失败,这种情况该如何处理呢?下图展示了这样的流程:

条件流

图 2. 条件流

Java API 提供了一套流畅的方法,允许您指定流程以及步骤失败时的处理方式。以下示例展示了如何指定一个步骤(stepA),然后根据 stepA 是否成功,继续执行两个不同步骤(stepBstepC)中的任意一个:

@Bean
public Job job(JobRepository jobRepository, Step stepA, Step stepB, Step stepC) {
return new JobBuilder("job", jobRepository)
.start(stepA)
.on("*").to(stepB)
.from(stepA).on("FAILED").to(stepC)
.end()
.build();
}

使用 Java 配置时,on() 方法采用简单的模式匹配方案来匹配 Step 执行后产生的 ExitStatus

模式中只允许使用两个特殊字符:

  • * 匹配零个或多个字符

  • ? 匹配恰好一个字符

例如,c*t 匹配 catcount,而 c?t 匹配 cat 但不匹配 count

虽然 Step 上的过渡元素数量没有限制,但如果 Step 执行产生的 ExitStatus 未被任何元素覆盖,框架将抛出异常并导致 Job 失败。框架会自动按从最具体到最不具体的顺序排列过渡。这意味着,即使在前述示例中 stepA 的顺序被调换,FAILEDExitStatus 仍会跳转到 stepC

批处理状态与退出状态

在配置条件流 Job 时,理解 BatchStatusExitStatus 之间的区别非常重要。BatchStatus 是一个枚举类型,它是 JobExecutionStepExecution 的属性,框架使用它来记录 JobStep 的状态。它可以是以下值之一:COMPLETEDSTARTINGSTARTEDSTOPPINGSTOPPEDFAILEDABANDONEDUNKNOWN。其中大多数值含义不言自明:COMPLETED 是步骤或作业成功完成时设置的状态,FAILED 是失败时设置的状态,依此类推。

以下示例展示了使用 Java 配置时包含 on 元素的情况:

...
.from(stepA).on("FAILED").to(stepB)
...

乍一看,on 似乎引用了它所属 StepBatchStatus。然而,它实际上引用的是 StepExitStatus。顾名思义,ExitStatus 表示 Step 执行完成后的状态。

在使用 Java 配置时,前面 Java 配置示例中展示的 on() 方法引用了 ExitStatus 的退出码。

在英文中,它表示:“如果退出码为 FAILED,则转到 stepB”。默认情况下,退出码始终与 StepBatchStatus 相同,这就是为什么前面的条目能够生效。然而,如果需要退出码不同,该怎么办呢?一个很好的例子来自示例项目中的 skip 示例作业:

以下示例展示了如何在 Java 中处理不同的退出码:

@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2, Step errorPrint1) {
return new JobBuilder("job", jobRepository)
.start(step1).on("FAILED").end()
.from(step1).on("COMPLETED WITH SKIPS").to(errorPrint1)
.from(step1).on("*").to(step2)
.end()
.build();
}

step1 有三种可能性:

  • Step 失败,此时作业应标记为失败。

  • Step 成功完成。

  • Step 成功完成,但退出码为 COMPLETED WITH SKIPS。此时,应运行另一个步骤来处理错误。

上述配置可以正常工作。然而,需要根据执行是否跳过记录的情况来改变退出代码,如下例所示:

public class SkipCheckingListener implements StepExecutionListener {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
} else {
return null;
}
}
}

上述代码是一个 StepExecutionListener,它首先检查确保 Step 执行成功,然后检查 StepExecution 中的跳过计数是否大于 0。如果两个条件都满足,则返回一个退出码为 COMPLETED WITH SKIPS 的新 ExitStatus

配置停止功能

在讨论了BatchStatus 和 ExitStatus之后,你可能会想知道 JobBatchStatusExitStatus 是如何确定的。虽然这些状态对于 Step 是由执行的代码决定的,但对于 Job 来说,这些状态是基于配置来确定的。

到目前为止,讨论的所有作业配置都至少有一个没有转换的最终 Step

在下面的 Java 示例中,step 执行完毕后,Job 结束:

@Bean
public Job job(JobRepository jobRepository, Step step1) {
return new JobBuilder("job", jobRepository)
.start(step1)
.build();
}

如果未为 Step 定义任何转换,则 Job 的状态定义如下:

  • 如果 StepFAILEDExitStatus 结束,则 JobBatchStatusExitStatus 均为 FAILED

  • 否则,JobBatchStatusExitStatus 均为 COMPLETED

虽然这种终止批处理作业的方法对于某些批处理作业(例如简单的顺序步骤作业)已经足够,但可能需要自定义的作业停止场景。为此,Spring Batch 提供了三个转换元素来停止 Job(除了我们之前讨论过的 next 元素)。每个停止元素都会以特定的 BatchStatus 停止 Job。需要注意的是,停止转换元素对 Job 中任何 StepsBatchStatusExitStatus 都没有影响。这些元素仅影响 Job 的最终状态。例如,一个作业中的每个步骤都可能具有 FAILED 状态,但作业本身却具有 COMPLETED 状态。

结束于某个步骤

配置步骤结束会指示JobCOMPLETEDBatchStatus停止。状态为COMPLETED的已结束Job无法重新启动(框架会抛出JobInstanceAlreadyCompleteException异常)。

在使用 Java 配置时,end 方法用于此任务。end 方法还允许一个可选的 exitStatus 参数,您可以使用它来自定义 JobExitStatus。如果未提供 exitStatus 值,则 ExitStatus 默认为 COMPLETED,以匹配 BatchStatus

考虑以下场景:如果 step2 失败,Job 将以 BatchStatusCOMPLETEDExitStatusCOMPLETED 的状态停止,并且 step3 不会运行。否则,执行将移至 step3。请注意,如果 step2 失败,Job 将不可重启(因为状态为 COMPLETED)。

以下示例展示了 Java 中的场景:

@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(step2)
.on("FAILED").end()
.from(step2).on("*").to(step3)
.end()
.build();
}

步骤失败

将步骤配置为在特定点失败,会指示一个 JobFAILEDBatchStatus 停止。与结束不同,Job 的失败并不会阻止该 Job 被重新启动。

在使用 XML 配置时,fail 元素还允许一个可选的 exit-code 属性,该属性可用于自定义 JobExitStatus。如果未提供 exit-code 属性,则 ExitStatus 默认为 FAILED,以匹配 BatchStatus

考虑以下场景:如果 step2 失败,Job 会停止,其 BatchStatusFAILEDExitStatusEARLY TERMINATION,并且 step3 不会执行。否则,执行会继续到 step3。此外,如果 step2 失败并且 Job 被重启,执行会再次从 step2 开始。

以下示例展示了 Java 中的场景:

@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(step2).on("FAILED").fail()
.from(step2).on("*").to(step3)
.end()
.build();
}

在指定步骤停止作业

配置一个作业在特定步骤停止,会指示一个JobSTOPPEDBatchStatus状态停止。停止一个Job可以在处理过程中提供临时中断,以便操作员在重新启动Job之前采取某些操作。

在使用 Java 配置时,stopAndRestart 方法需要一个 restart 属性,该属性指定了当 Job 重启时执行应从哪个步骤恢复。

考虑以下场景:如果 step1COMPLETE 状态结束,作业随即停止。一旦重新启动,执行将从 step2 开始。

以下示例展示了 Java 中的场景:

@Bean
public Job job(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job", jobRepository)
.start(step1).on("COMPLETED").stopAndRestart(step2)
.end()
.build();
}

程序化流程决策

在某些情况下,可能需要比 ExitStatus 更详细的信息来决定下一步执行哪个步骤。此时,可以使用 JobExecutionDecider 来辅助决策,如下例所示:

public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}

在以下示例中,当使用 Java 配置时,一个实现了 JobExecutionDecider 的 bean 被直接传递给 next 调用:

@Bean
public Job job(JobRepository jobRepository, MyDecider decider, Step step1, Step step2, Step step3) {
return new JobBuilder("job", jobRepository)
.start(step1)
.next(decider).on("FAILED").to(step2)
.from(decider).on("COMPLETED").to(step3)
.end()
.build();
}

分流

到目前为止,描述的每个场景都涉及一个按线性方式依次执行其步骤的 Job。除了这种典型风格外,Spring Batch 还允许配置具有并行流的作业。

基于 Java 的配置允许您通过提供的构建器来配置拆分。如下例所示,split 元素包含一个或多个 flow 元素,其中可以定义完全独立的流程。split 元素也可以包含任何先前讨论过的转换元素,例如 next 属性或 nextendfail 元素。

@Bean
public Flow flow1(Step step1, Step step2) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1)
.next(step2)
.build();
}

@Bean
public Flow flow2(Step step3) {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3)
.build();
}

@Bean
public Job job(JobRepository jobRepository, Flow flow1, Flow flow2, Step step4) {
return new JobBuilder("job", jobRepository)
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4)
.end()
.build();
}

外部化作业间的流程定义与依赖关系

作业中的部分流程可以外部化为单独的 bean 定义,然后重复使用。有两种方法可以实现这一点。第一种是将流程声明为对其他地方定义的流程的引用。

以下 Java 示例展示了如何声明一个流,以引用在其他地方定义的流:

@Bean
public Job job(JobRepository jobRepository, Flow flow1, Step step3) {
return new JobBuilder("job", jobRepository)
.start(flow1)
.next(step3)
.end()
.build();
}

@Bean
public Flow flow1(Step step1, Step step2) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1)
.next(step2)
.build();
}

如前面示例所示,定义外部流程的效果是将外部流程中的步骤插入到作业中,就像它们被内联声明一样。通过这种方式,多个作业可以引用相同的模板流程,并将这些模板组合成不同的逻辑流程。这也是分离各个流程集成测试的好方法。

外部化流程的另一种形式是使用 JobStepJobStep 类似于 FlowStep,但实际上会为指定流程中的步骤创建并启动一个独立的作业执行。

以下示例展示了 Java 中 JobStep 的一个例子:

@Bean
public Job jobStepJob(JobRepository jobRepository, Step jobStepJobStep1) {
return new JobBuilder("jobStepJob", jobRepository)
.start(jobStepJobStep1)
.build();
}

@Bean
public Step jobStepJobStep1(JobRepository jobRepository, JobLauncher jobLauncher, Job job, JobParametersExtractor jobParametersExtractor) {
return new StepBuilder("jobStepJobStep1", jobRepository)
.job(job)
.launcher(jobLauncher)
.parametersExtractor(jobParametersExtractor)
.build();
}

@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
// ...
.build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

extractor.setKeys(new String[]{"input.file"});

return extractor;
}

作业参数提取器是一种策略,用于确定如何将StepExecutionContext转换为所运行JobJobParameters。当您希望对作业和步骤的监控与报告拥有更细粒度的控制选项时,JobStep会非常有用。使用JobStep也常常是回答“如何在作业之间创建依赖关系?”这个问题的好方法。它是将大型系统拆分为更小模块并控制作业流程的有效方式。