通过消息启动批处理作业
当使用核心 Spring Batch API 启动批处理作业时,基本上有两种选择:
-
通过命令行,使用
CommandLineJobOperator -
通过编程方式,使用
JobOperator.start()
例如,当需要通过shell脚本调用批处理作业时,你可能希望使用CommandLineJobOperator。或者,你也可以直接使用JobOperator(例如,在将Spring Batch作为Web应用程序的一部分使用时)。然而,对于更复杂的用例呢?也许你需要轮询远程的(S)FTP服务器来获取批处理作业的数据,或者你的应用程序必须同时支持多个不同的数据源。例如,你可能不仅从Web接收数据文件,还从FTP和其他来源接收。也许在调用Spring Batch之前,还需要对输入文件进行额外的转换。
因此,通过使用Spring Integration及其众多适配器来执行批处理作业将更为强大。例如,你可以使用文件入站通道适配器来监控文件系统中的目录,并在输入文件到达时立即启动批处理作业。此外,你还可以创建Spring Integration流程,利用多个不同的适配器,仅通过配置即可轻松从多个来源同时为批处理作业获取数据。使用Spring Integration实现所有这些场景非常简单,因为它支持JobOperator的解耦、事件驱动执行。
Spring Batch Integration 提供了 JobLaunchingMessageHandler 类,您可以使用它来启动批处理作业。JobLaunchingMessageHandler 的输入由 Spring Integration 消息提供,该消息的有效负载类型为 JobLaunchRequest。此类是对要启动的 Job 以及启动批处理作业所需的 JobParameters 的封装。
下图展示了启动批处理作业所需的典型 Spring Integration 消息流。EIP(企业集成模式)网站提供了消息图标的完整概览及其描述。

图 1. 启动批处理作业
将文件转换为 JobLaunchRequest
以下示例将文件转换为 JobLaunchRequest:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
当批处理作业正在执行时,会返回一个 JobExecution 实例。你可以使用这个实例来确定执行的状态。如果 JobExecution 能够被成功创建,无论实际执行是否成功,它总是会被返回。
JobExecution 实例的返回方式取决于所使用的 TaskExecutor。如果使用 同步(单线程)的 TaskExecutor 实现,则 JobExecution 响应仅在作业 完成后 返回。当使用 异步 的 TaskExecutor 时,JobExecution 实例会立即返回。随后,你可以获取 JobExecution 实例的 id(通过 JobExecution.getJobInstanceId())并使用 JobExplorer 查询 JobRepository 以获取作业的更新状态。更多信息,请参阅查询存储库。
Spring Batch 集成配置
考虑这样一种情况:某人需要创建一个 inbound-channel-adapter 文件,用于监听指定目录中的 CSV 文件,将其传递给转换器 (FileMessageToJobRequest),通过作业启动网关启动作业,并使用 logging-channel-adapter 记录 JobExecution 的输出。
- Java
- XML
以下示例展示了如何在 Java 中配置该常见场景:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例展示了如何在 XML 中配置该常见场景:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
示例 ItemReader 配置
现在我们已经实现了文件轮询和任务启动,接下来需要配置 Spring Batch 的 ItemReader(例如),使其使用通过名为 "input.file.name" 的任务参数所定义路径下的文件。以下 Bean 配置展示了具体实现方式:
- Java
- XML
以下 Java 示例展示了必要的 bean 配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下 XML 示例展示了必要的 bean 配置:
<bean id="itemReader" class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
上述示例中的主要关注点是将 #{jobParameters['input.file.name']} 的值作为 Resource 属性值注入,并将 ItemReader bean 设置为 step 作用域。将 bean 设置为 step 作用域利用了延迟绑定支持,从而允许访问 jobParameters 变量。