跳到主要内容

高级元数据使用

QWen Plus 中英对照 Advanced Metadata Usage

到目前为止,已经讨论了 JobLauncherJobRepository 这两个接口。它们共同表示了作业的简单启动以及批量领域对象的基本 CRUD 操作:

Job Repository

图1. 作业存储库

一个 JobLauncher 使用 JobRepository 来创建新的 JobExecution 对象并运行它们。随后,JobStep 的实现会在 Job 运行期间使用相同的 JobRepository 对这些执行进行基本更新。对于简单场景来说,这些基本操作已经足够。然而,在具有数百个批处理作业和复杂调度需求的大型批处理环境中,需要对元数据进行更高级的访问:

Job Repository Advanced

图 2. 高级作业存储库访问

在接下来的部分中讨论的 JobExplorerJobOperator 接口,为查询和控制元数据添加了额外的功能。

查询存储库

在任何高级功能之前,最基本的需求是能够查询存储库以获取现有的执行记录。此功能由 JobExplorer 接口提供:

public interface JobExplorer {

List<JobInstance> getJobInstances(String jobName, int start, int count);

JobExecution getJobExecution(Long executionId);

StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

JobInstance getJobInstance(Long instanceId);

List<JobExecution> getJobExecutions(JobInstance jobInstance);

Set<JobExecution> findRunningJobExecutions(String jobName);
}
java

从其方法签名可以看出,JobExplorerJobRepository 的只读版本,并且,与 JobRepository 一样,它也可以通过使用工厂 Bean 轻松配置。

下面的示例展示了如何在 Java 中配置 JobExplorer

...
// 这将位于您的 DefaultBatchConfiguration 扩展中
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
return factoryBean.getObject();
}
...
java

本章前面部分 提到,您可以修改 JobRepository 的表前缀,以允许不同的版本或模式。由于 JobExplorer 与相同的表一起工作,因此它也需要设置前缀的能力。

下面的示例展示了如何在 Java 中为 JobExplorer 设置表前缀:

...
// 这将位于您的 DefaultBatchConfiguration 扩展中
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
factoryBean.setTablePrefix("SYSTEM.");
return factoryBean.getObject();
}
...
java

JobRegistry

JobRegistry(及其父接口 JobLocator)不是必需的,但如果您想跟踪上下文中可用的任务,则它可能会很有用。当任务在其他地方(例如,在子上下文中)创建时,它也有助于在应用程序上下文中集中收集这些任务。您还可以使用自定义的 JobRegistry 实现来操作已注册任务的名称和其他属性。框架仅提供了一种实现,这种实现是基于从任务名称到任务实例的简单映射。

当使用 @EnableBatchProcessing 时,会为你提供一个 JobRegistry。以下示例展示了如何配置你自己的 JobRegistry

...
// 这是通过 @EnableBatchProcessing 提供的,但可以通过覆盖 DefaultBatchConfiguration 中的 bean 来自定义
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
...
java

你可以通过以下几种方式之一来填充 JobRegistry:使用 bean 后置处理器、使用智能初始化单例或使用注册器生命周期组件。接下来的章节将描述这些机制。

JobRegistryBeanPostProcessor

这是一个可以在所有作业创建时注册它们的 Bean 后置处理器。

下面的示例展示了如何为在 Java 中定义的作业包含 JobRegistryBeanPostProcessor

@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
java

虽然这并不是严格必需的,但示例中的后处理器被赋予了一个 id,这样它就可以被包含在子上下文中(例如,作为父级 Bean 定义),并导致在那里创建的所有任务也会被自动注册。

注意

弃用通知

从 5.2 版本开始,JobRegistryBeanPostProcessor 类已被弃用,建议使用 JobRegistrySmartInitializingSingleton,详见 JobRegistrySmartInitializingSingleton

JobRegistrySmartInitializingSingleton

这是一个 SmartInitializingSingleton,它会在作业注册表中注册所有单例作业。

下面的示例展示了如何在 Java 中定义一个 JobRegistrySmartInitializingSingleton

@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
return new JobRegistrySmartInitializingSingleton(jobRegistry);
}
java

自动任务注册器

这是一个生命周期组件,它创建子上下文,并在这些上下文中的任务被创建时注册。这样做的一个优势是,尽管子上下文中的任务名称仍然需要在注册表中全局唯一,但它们的依赖项可以使用“自然”的名称。例如,您可以创建一组 XML 配置文件,每个文件中只有一个 Job,但它们对具有相同 bean 名称(如 reader)的 ItemReader 有不同的定义。如果所有这些文件都被导入到同一个上下文中,读取器定义会发生冲突并相互覆盖,但是通过自动注册器,这种情况可以避免。这使得从应用程序的不同模块贡献的任务更容易集成。

下面的示例展示了如何为用 Java 定义的任务包含 AutomaticJobRegistrar

@Bean
public AutomaticJobRegistrar registrar() {

AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
registrar.setJobLoader(jobLoader());
registrar.setApplicationContextFactories(applicationContextFactories());
registrar.afterPropertiesSet();
return registrar;

}
java

注册器有两个强制属性:一个 ApplicationContextFactory 数组(在前面的例子中通过便捷的工厂豆创建)和一个 JobLoaderJobLoader 负责管理子上下文的生命周期,并在 JobRegistry 中注册作业。

ApplicationContextFactory 负责创建子上下文。最常见的用法是(如前面的例子所示)使用 ClassPathXmlApplicationContextFactory。该工厂的一个特性是,默认情况下,它会从父上下文复制一些配置到子上下文中。因此,例如,如果子上下文的配置应与父上下文相同,则无需重新定义 PropertyPlaceholderConfigurer 或 AOP 配置。

你可以将 AutomaticJobRegistrarJobRegistryBeanPostProcessor 结合使用(只要你也使用 DefaultJobLoader)。例如,如果在主父上下文中以及子位置中都定义了作业,这可能是一个理想的选择。

JobOperator

如前所述,JobRepository 提供了对元数据的 CRUD 操作,而 JobExplorer 提供了对元数据的只读操作。然而,当这些操作结合使用时,对于执行常见的监控任务最为有用,例如停止、重新启动或汇总 Job,这在批处理操作员中很常见。Spring Batch 在 JobOperator 接口中提供了这些类型的操作:

public interface JobOperator {

List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

List<Long> getJobInstances(String jobName, int start, int count)
throws NoSuchJobException;

Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

String getParameters(long executionId) throws NoSuchJobExecutionException;

Long start(String jobName, String parameters)
throws NoSuchJobException, JobInstanceAlreadyExistsException;

Long restart(long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;

Long startNextInstance(String jobName)
throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

boolean stop(long executionId)
throws NoSuchJobExecutionException, JobExecutionNotRunningException;

String getSummary(long executionId) throws NoSuchJobExecutionException;

Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;

Set<String> getJobNames();

}
java

前面的操作表示来自许多不同接口的方法,例如 JobLauncherJobRepositoryJobExplorerJobRegistry。出于这个原因,提供的 JobOperator 实现(SimpleJobOperator)有许多依赖项。

下面的示例展示了 Java 中 SimpleJobOperator 的典型 Bean 定义:

/**
* 这个 Bean 的所有注入依赖项都由 @EnableBatchProcessing
* 基础设施开箱即用提供。
*/
@Bean
public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
JobRepository jobRepository,
JobRegistry jobRegistry,
JobLauncher jobLauncher) {

SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobLauncher(jobLauncher);

return jobOperator;
}
java

从 5.0 版本开始,@EnableBatchProcessing 注解会自动在应用程序上下文中注册一个 job operator bean。

备注

如果你在作业存储库中设置了表前缀,也不要忘记在作业资源管理器中设置它。

JobParametersIncrementer

JobOperator 上的大多数方法都很直观,并且您可以在 接口的 Javadoc 中找到更详细的说明。然而,startNextInstance 方法值得一提。此方法始终启动一个 Job 的新实例。如果在 JobExecution 中存在严重问题并且需要从头开始重新启动 Job,这将非常有用。与 JobLauncher 不同(JobLauncher 需要一个新的 JobParameters 对象来触发一个新的 JobInstance),如果参数与任何以前的参数集不同,startNextInstance 方法会使用与 Job 绑定的 JobParametersIncrementer 来强制 Job 进入一个新的实例:

public interface JobParametersIncrementer {

JobParameters getNext(JobParameters parameters);

}
java

JobParametersIncrementer 的契约是,给定一个 JobParameters 对象,它通过增加其中可能包含的任何必要值来返回“下一个” JobParameters 对象。这种策略非常有用,因为框架无法知道对 JobParameters 的哪些更改使其成为“下一个”实例。例如,如果 JobParameters 中唯一的值是一个日期,并且需要创建下一个实例,那么该值应该增加一天还是一周(如果作业是每周一次的话)?对于任何有助于标识 Job 的数值,也是如此,如下例所示:

public class SampleIncrementer implements JobParametersIncrementer {

public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
java

在这个例子中,键为 run.id 的值用于区分不同的 JobInstances。如果传入的 JobParameters 为 null,可以假设该 Job 从未运行过,因此可以返回其初始状态。但是,如果不是 null,则获取旧值,将其加一后返回。

对于在 Java 中定义的任务,可以通过构建器中提供的 incrementer 方法将增量器与 Job 关联,如下所示:

@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.incrementer(sampleIncrementer())
...
.build();
}
java

停止作业

JobOperator 最常见的用例之一是优雅地停止一个 Job:

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
java

关闭并非立即生效,因为没有办法强制立即关闭,尤其是当执行当前处于框架无法控制的开发人员代码中时,例如业务服务。然而,一旦控制权返回到框架,它会将当前 StepExecution 的状态设置为 BatchStatus.STOPPED,保存它,并在完成之前对 JobExecution 做同样的操作。

中止作业

一个状态为 FAILED 的作业执行可以重新启动(如果该 Job 是可重启的)。状态为 ABANDONED 的作业执行不能被框架重新启动。ABANDONED 状态还用于步骤执行中,以在重新启动的作业执行中将其标记为可跳过的步骤。如果一个作业正在运行,并且遇到在之前失败的作业执行中被标记为 ABANDONED 的步骤,它将跳过该步骤并进入下一步(由作业流定义和步骤执行退出状态决定)。

如果进程终止了(例如通过 kill -9 或服务器故障),任务当然不会运行,但 JobRepository 无法知道这一点,因为在进程终止之前没有人通知它。你必须手动告知它,确认执行要么已失败,要么应被视为中止(将其状态更改为 FAILEDABANDONED)。这是一个业务决策,无法自动化处理。仅当任务可重启且你知道重启数据有效时,才应将状态更改为 FAILED