跳到主要内容
版本:6.0.2

配置 Step 以支持重启

DeepSeek V3 中英对照 Configuring a Step for Restart Configuring a Step for Restart

在“配置和运行作业”部分,我们讨论了如何重启一个 Job。重启对步骤有诸多影响,因此可能需要一些特定的配置。

设置启动限制

在许多场景中,你可能需要控制一个 Step 可以被启动的次数。例如,你可能需要配置某个特定的 Step,使其仅运行一次,因为它会使某些资源失效,而这些资源必须手动修复后才能再次运行。这是在步骤级别可配置的,因为不同的步骤可能有不同的要求。一个只能执行一次的 Step 可以与一个可以无限次运行的 Step 作为同一个 Job 的一部分共存。

以下代码片段展示了 Java 中启动限制配置的一个示例:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)
.build();
}

前面示例中展示的步骤只能运行一次。尝试再次运行会导致抛出 StartLimitExceededException 异常。请注意,启动限制的默认值为 Integer.MAX_VALUE

重启已完成的 Step

对于可重启的作业,可能存在一个或多个步骤,无论它们首次执行是否成功,都应始终运行。例如,验证步骤或在处理前清理资源的Step。在重启作业的正常处理过程中,任何状态为COMPLETED(表示已成功完成)的步骤都会被跳过。将allow-start-if-complete设置为true可以覆盖此行为,使该步骤始终运行。

以下代码片段展示了如何在 Java 中定义一个可重启的作业:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}

Step 重启配置示例

以下 Java 示例展示了如何配置一个包含可重启步骤的作业:

@Bean
public Job footballJob(JobRepository jobRepository, Step playerLoad, Step gameLoad, Step playerSummarization) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad)
.next(gameLoad)
.next(playerSummarization)
.build();
}

@Bean
public Step playerLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("playerLoad", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(playerFileItemReader())
.writer(playerWriter())
.build();
}

@Bean
public Step gameLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("gameLoad", jobRepository)
.allowStartIfComplete(true)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(gameFileItemReader())
.writer(gameWriter())
.build();
}

@Bean
public Step playerSummarization(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("playerSummarization", jobRepository)
.startLimit(2)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(playerSummarizationSource())
.writer(summaryWriter())
.build();
}

上述示例配置是针对一个加载足球比赛信息并进行汇总的作业。该作业包含三个步骤:playerLoadgameLoadplayerSummarizationplayerLoad 步骤从平面文件加载球员信息,而 gameLoad 步骤对比赛信息执行相同的操作。最后一步 playerSummarization 则根据提供的比赛数据汇总每位球员的统计数据。假设 playerLoad 加载的文件只需加载一次,而 gameLoad 可以加载特定目录中找到的任何比赛文件,并在成功加载到数据库后将其删除。因此,playerLoad 步骤不包含额外的配置。该步骤可以启动任意次数,如果已完成则会被跳过。然而,gameLoad 步骤每次都需要运行,以防自上次运行以来添加了额外的文件。它设置了 allow-start-if-completetrue,以确保始终启动。(假设加载比赛数据的数据库表上有一个处理指示器,以确保汇总步骤能够正确找到新的比赛数据)。汇总步骤作为作业中最重要的部分,其启动限制被配置为 2。这很有用,因为如果该步骤持续失败,将向控制作业执行的操作员返回一个新的退出码,并且在手动干预之前无法再次启动。

备注

本文档中的作业示例与示例项目中的 footballJob 不同。

本节剩余部分将描述 footballJob 示例中三次运行各自的情况。

运行 1:

  1. playerLoad 运行并成功完成,向 PLAYERS 表中添加了 400 名球员。

  2. gameLoad 运行并处理了 11 个游戏数据文件,将其内容加载到 GAMES 表中。

  3. playerSummarization 开始处理,并在 5 分钟后失败。

运行 2:

  1. playerLoad 不会运行,因为它已经成功完成,且 allow-start-if-completefalse(默认值)。

  2. gameLoad 再次运行并处理另外 2 个文件,将其内容也加载到 GAMES 表中(使用进程指示器标记这些数据尚未被处理)。

  3. playerSummarization 开始处理所有剩余的游戏数据(使用进程指示器进行过滤),并在 30 分钟后再次失败。

运行 3:

  1. playerLoad 不会运行,因为它已经成功完成,且 allow-start-if-completefalse(默认值)。

  2. gameLoad 再次运行并处理另外 2 个文件,将其内容也加载到 GAMES 表中(附带一个表示尚未处理的处理指示器)。

  3. playerSummarization 不会启动,作业会立即终止,因为这是 playerSummarization 的第三次执行,而它的限制仅为 2 次。必须提高限制,或者必须将 Job 作为新的 JobInstance 执行。