通过消息启动批处理作业
当使用核心 Spring Batch API 启动批处理作业时,您基本上有两种选择:
-
从命令行,使用
CommandLineJobRunner
-
以编程方式,使用
JobOperator.start()
或JobLauncher.run()
例如,当通过 shell 脚本调用批处理作业时,你可能希望使用 CommandLineJobRunner
。或者,你可以直接使用 JobOperator
(例如,在将 Spring Batch 作为 Web 应用程序的一部分时)。然而,对于更复杂的使用场景该怎么办呢?也许你需要轮询远程 (S)FTP 服务器以获取批处理作业的数据,或者你的应用程序需要同时支持多个不同的数据源。例如,你可能不仅从 Web,还从 FTP 和其他来源接收数据文件。在调用 Spring Batch 之前,可能还需要对输入文件进行额外的转换。
因此,使用 Spring Integration 及其众多适配器来执行批处理任务将更加强大。例如,你可以使用 File Inbound Channel Adapter 来监控文件系统中的一个目录,并在输入文件到达时立即启动批处理任务。此外,你可以创建 Spring Integration 流程,利用多个不同的适配器,通过仅使用配置即可轻松地从多个来源同时为你的批处理任务导入数据。使用 Spring Integration 实现所有这些场景非常简单,因为它允许以解耦、事件驱动的方式执行 JobLauncher
。
Spring Batch Integration 提供了 JobLaunchingMessageHandler
类,可用于启动批处理作业。JobLaunchingMessageHandler
的输入由 Spring Integration 消息提供,该消息的有效载荷类型为 JobLaunchRequest
。此类是围绕要启动的 Job
和启动批处理作业所需的 JobParameters
的包装器。
下图显示了启动批处理任务所需的典型 Spring Integration 消息流。EIP (企业集成模式) 网站 提供了消息图标及其描述的完整概述。
图 1. 启动批处理作业
将文件转换为 JobLaunchRequest
以下示例将文件转换为 JobLaunchRequest
:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
作业执行响应
当批处理作业被执行时,会返回一个 JobExecution
实例。你可以使用该实例来确定执行的状态。如果 JobExecution
能够成功创建,它总是会被返回,无论实际执行是否成功。
JobExecution
实例的返回行为取决于提供的 TaskExecutor
。如果使用的是 同步
(单线程)的 TaskExecutor
实现,那么只有在作业完成后才会返回 JobExecution
响应。而在使用 异步
的 TaskExecutor
时,JobExecution
实例会立即返回。之后,你可以通过 JobExecution.getJobId()
获取 JobExecution
实例的 id
,并使用 JobExplorer
查询 JobRepository
以获取作业的更新状态。更多详细信息,请参见 查询存储库。
Spring Batch 集成配置
考虑这样一种情况:有人需要创建一个文件 inbound-channel-adapter
,用于监听指定目录中的 CSV 文件,将它们传递给转换器(FileMessageToJobRequest
),通过作业启动网关启动作业,并使用 logging-channel-adapter
记录 JobExecution
的输出。
- Java
- XML
下面的示例展示了如何在 Java 中配置这种常见的情况:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
下面的示例展示了如何在 XML 中配置这种常见的情况:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
示例 ItemReader 配置
既然我们已经在轮询文件并启动作业,那么我们需要配置 Spring Batch 的 ItemReader
(例如),以使用由作业参数 "input.file.name" 指定位置找到的文件。以下豆定义配置展示了这一点:
- Java
- XML
以下 Java 示例展示了必要的 bean 配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下 XML 示例展示了必要的 bean 配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前面示例中的主要关注点是将 #{jobParameters['input.file.name']}
的值注入为 Resource 属性值,以及将 ItemReader
bean 设置为具有步骤作用域。 将 bean 设置为具有步骤作用域利用了延迟绑定支持,这允许访问 jobParameters
变量。