高级元数据使用
到目前为止,已经讨论了 JobLauncher
和 JobRepository
这两个接口。它们共同表示了作业的简单启动以及批量领域对象的基本 CRUD 操作:
图1. 作业存储库
一个 JobLauncher
使用 JobRepository
来创建新的 JobExecution
对象并运行它们。随后,Job
和 Step
的实现会在 Job
运行期间使用相同的 JobRepository
对这些执行进行基本更新。对于简单场景来说,这些基本操作已经足够。然而,在具有数百个批处理作业和复杂调度需求的大型批处理环境中,需要对元数据进行更高级的访问:
图 2. 高级作业存储库访问
在接下来的部分中讨论的 JobExplorer
和 JobOperator
接口,为查询和控制元数据添加了额外的功能。
查询存储库
在任何高级功能之前,最基本的需求是能够查询存储库以获取现有的执行记录。此功能由 JobExplorer
接口提供:
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
从其方法签名可以看出,JobExplorer
是 JobRepository
的只读版本,并且,与 JobRepository
一样,它也可以通过使用工厂 Bean 轻松配置。
- Java
- XML
下面的示例展示了如何在 Java 中配置 JobExplorer
:
...
// 这将位于您的 DefaultBatchConfiguration 扩展中
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
return factoryBean.getObject();
}
...
下面的示例展示了如何在 XML 中配置 JobExplorer
:
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:dataSource-ref="dataSource" />
本章前面部分 提到,您可以修改 JobRepository
的表前缀,以允许不同的版本或模式。由于 JobExplorer
与相同的表一起工作,因此它也需要设置前缀的能力。
- Java
- XML
下面的示例展示了如何在 Java 中为 JobExplorer
设置表前缀:
...
// 这将位于您的 DefaultBatchConfiguration 扩展中
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
factoryBean.setTablePrefix("SYSTEM.");
return factoryBean.getObject();
}
...
下面的示例展示了如何在 XML 中为 JobExplorer
设置表前缀:
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:tablePrefix="SYSTEM."/>
JobRegistry
JobRegistry
(及其父接口 JobLocator
)不是必需的,但如果您想跟踪上下文中可用的任务,则它可能会很有用。当任务在其他地方(例如,在子上下文中)创建时,它也有助于在应用程序上下文中集中收集这些任务。您还可以使用自定义的 JobRegistry
实现来操作已注册任务的名称和其他属性。框架仅提供了一种实现,这种实现是基于从任务名称到任务实例的简单映射。
- Java
- XML
当使用 @EnableBatchProcessing
时,会为你提供一个 JobRegistry
。以下示例展示了如何配置你自己的 JobRegistry
:
...
// 这是通过 @EnableBatchProcessing 提供的,但可以通过覆盖 DefaultBatchConfiguration 中的 bean 来自定义
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
...
以下示例展示了如何为在 XML 中定义的作业包含一个 JobRegistry
:
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
你可以通过以下几种方式之一来填充 JobRegistry
:使用 bean 后置处理器、使用智能初始化单例或使用注册器生命周期组件。接下来的章节将描述这些机制。
JobRegistryBeanPostProcessor
这是一个可以在所有作业创建时注册它们的 Bean 后置处理器。
- Java
- XML
下面的示例展示了如何为在 Java 中定义的作业包含 JobRegistryBeanPostProcessor
:
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
下面的示例展示了如何为在 XML 中定义的作业包含 JobRegistryBeanPostProcessor
:
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
虽然这并不是严格必需的,但示例中的后处理器被赋予了一个 id
,这样它就可以被包含在子上下文中(例如,作为父级 Bean 定义),并导致在那里创建的所有任务也会被自动注册。
弃用通知
从 5.2 版本开始,JobRegistryBeanPostProcessor
类已被弃用,建议使用 JobRegistrySmartInitializingSingleton
,详见 JobRegistrySmartInitializingSingleton。
JobRegistrySmartInitializingSingleton
这是一个 SmartInitializingSingleton
,它会在作业注册表中注册所有单例作业。
- Java
- XML
下面的示例展示了如何在 Java 中定义一个 JobRegistrySmartInitializingSingleton
:
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
return new JobRegistrySmartInitializingSingleton(jobRegistry);
}
下面的示例展示了如何在 XML 中定义一个 JobRegistrySmartInitializingSingleton
:
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
自动任务注册器
这是一个生命周期组件,它创建子上下文,并在这些上下文中的任务被创建时注册。这样做的一个优势是,尽管子上下文中的任务名称仍然需要在注册表中全局唯一,但它们的依赖项可以使用“自然”的名称。例如,您可以创建一组 XML 配置文件,每个文件中只有一个 Job,但它们对具有相同 bean 名称(如 reader
)的 ItemReader
有不同的定义。如果所有这些文件都被导入到同一个上下文中,读取器定义会发生冲突并相互覆盖,但是通过自动注册器,这种情况可以避免。这使得从应用程序的不同模块贡献的任务更容易集成。
- Java
- XML
下面的示例展示了如何为用 Java 定义的任务包含 AutomaticJobRegistrar
:
@Bean
public AutomaticJobRegistrar registrar() {
AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
registrar.setJobLoader(jobLoader());
registrar.setApplicationContextFactories(applicationContextFactories());
registrar.afterPropertiesSet();
return registrar;
}
下面的示例展示了如何为用 XML 定义的任务包含 AutomaticJobRegistrar
:
<bean class="org.spr...AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/config/job*.xml" />
</bean>
</property>
<property name="jobLoader">
<bean class="org.spr...DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
</property>
</bean>
注册器有两个强制属性:一个 ApplicationContextFactory
数组(在前面的例子中通过便捷的工厂豆创建)和一个 JobLoader
。JobLoader
负责管理子上下文的生命周期,并在 JobRegistry
中注册作业。
ApplicationContextFactory
负责创建子上下文。最常见的用法是(如前面的例子所示)使用 ClassPathXmlApplicationContextFactory
。该工厂的一个特性是,默认情况下,它会从父上下文复制一些配置到子上下文中。因此,例如,如果子上下文的配置应与父上下文相同,则无需重新定义 PropertyPlaceholderConfigurer
或 AOP 配置。
你可以将 AutomaticJobRegistrar
与 JobRegistryBeanPostProcessor
结合使用(只要你也使用 DefaultJobLoader
)。例如,如果在主父上下文中以及子位置中都定义了作业,这可能是一个理想的选择。
JobOperator
如前所述,JobRepository
提供了对元数据的 CRUD 操作,而 JobExplorer
提供了对元数据的只读操作。然而,当这些操作结合使用时,对于执行常见的监控任务最为有用,例如停止、重新启动或汇总 Job,这在批处理操作员中很常见。Spring Batch 在 JobOperator
接口中提供了这些类型的操作:
public interface JobOperator {
List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
List<Long> getJobInstances(String jobName, int start, int count)
throws NoSuchJobException;
Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
String getParameters(long executionId) throws NoSuchJobExecutionException;
Long start(String jobName, String parameters)
throws NoSuchJobException, JobInstanceAlreadyExistsException;
Long restart(long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;
Long startNextInstance(String jobName)
throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
boolean stop(long executionId)
throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long executionId) throws NoSuchJobExecutionException;
Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;
Set<String> getJobNames();
}
前面的操作表示来自许多不同接口的方法,例如 JobLauncher
、JobRepository
、JobExplorer
和 JobRegistry
。出于这个原因,提供的 JobOperator
实现(SimpleJobOperator
)有许多依赖项。
- Java
- XML
下面的示例展示了 Java 中 SimpleJobOperator
的典型 Bean 定义:
/**
* 这个 Bean 的所有注入依赖项都由 @EnableBatchProcessing
* 基础设施开箱即用提供。
*/
@Bean
public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
JobRepository jobRepository,
JobRegistry jobRegistry,
JobLauncher jobLauncher) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobLauncher(jobLauncher);
return jobOperator;
}
下面的示例展示了 XML 中 SimpleJobOperator
的典型 Bean 定义:
<bean id="jobOperator" class="org.spr...SimpleJobOperator">
<property name="jobExplorer">
<bean class="org.spr...JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="jobLauncher" />
</bean>
从 5.0 版本开始,@EnableBatchProcessing
注解会自动在应用程序上下文中注册一个 job operator bean。
如果你在作业存储库中设置了表前缀,也不要忘记在作业资源管理器中设置它。
JobParametersIncrementer
JobOperator
上的大多数方法都很直观,并且您可以在 接口的 Javadoc 中找到更详细的说明。然而,startNextInstance
方法值得一提。此方法始终启动一个 Job
的新实例。如果在 JobExecution
中存在严重问题并且需要从头开始重新启动 Job
,这将非常有用。与 JobLauncher
不同(JobLauncher
需要一个新的 JobParameters
对象来触发一个新的 JobInstance
),如果参数与任何以前的参数集不同,startNextInstance
方法会使用与 Job
绑定的 JobParametersIncrementer
来强制 Job
进入一个新的实例:
public interface JobParametersIncrementer {
JobParameters getNext(JobParameters parameters);
}
JobParametersIncrementer
的契约是,给定一个 JobParameters 对象,它通过增加其中可能包含的任何必要值来返回“下一个” JobParameters
对象。这种策略非常有用,因为框架无法知道对 JobParameters
的哪些更改使其成为“下一个”实例。例如,如果 JobParameters
中唯一的值是一个日期,并且需要创建下一个实例,那么该值应该增加一天还是一周(如果作业是每周一次的话)?对于任何有助于标识 Job
的数值,也是如此,如下例所示:
public class SampleIncrementer implements JobParametersIncrementer {
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
在这个例子中,键为 run.id
的值用于区分不同的 JobInstances
。如果传入的 JobParameters
为 null,可以假设该 Job
从未运行过,因此可以返回其初始状态。但是,如果不是 null,则获取旧值,将其加一后返回。
- Java
- XML
对于在 Java 中定义的任务,可以通过构建器中提供的 incrementer
方法将增量器与 Job
关联,如下所示:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.incrementer(sampleIncrementer())
...
.build();
}
对于在 XML 中定义的任务,可以通过命名空间中的 incrementer
属性将增量器与 Job
关联,如下所示:
<job id="footballJob" incrementer="sampleIncrementer">
...
</job>
停止作业
JobOperator
最常见的用例之一是优雅地停止一个 Job:
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
关闭并非立即生效,因为没有办法强制立即关闭,尤其是当执行当前处于框架无法控制的开发人员代码中时,例如业务服务。然而,一旦控制权返回到框架,它会将当前 StepExecution
的状态设置为 BatchStatus.STOPPED
,保存它,并在完成之前对 JobExecution
做同样的操作。
中止作业
一个状态为 FAILED
的作业执行可以重新启动(如果该 Job
是可重启的)。状态为 ABANDONED
的作业执行不能被框架重新启动。ABANDONED
状态还用于步骤执行中,以在重新启动的作业执行中将其标记为可跳过的步骤。如果一个作业正在运行,并且遇到在之前失败的作业执行中被标记为 ABANDONED
的步骤,它将跳过该步骤并进入下一步(由作业流定义和步骤执行退出状态决定)。
如果进程终止了(例如通过 kill -9
或服务器故障),任务当然不会运行,但 JobRepository
无法知道这一点,因为在进程终止之前没有人通知它。你必须手动告知它,确认执行要么已失败,要么应被视为中止(将其状态更改为 FAILED
或 ABANDONED
)。这是一个业务决策,无法自动化处理。仅当任务可重启且你知道重启数据有效时,才应将状态更改为 FAILED
。