跳到主要内容
版本:6.0.2

通过消息启动批处理作业

DeepSeek V3 中英对照 Launching Batch Jobs through Messages

当使用核心 Spring Batch API 启动批处理作业时,基本上有两种选择:

  • 通过命令行,使用 CommandLineJobOperator

  • 通过编程方式,使用 JobOperator.start()

例如,当需要通过shell脚本调用批处理作业时,你可能希望使用CommandLineJobOperator。或者,你也可以直接使用JobOperator(例如,在将Spring Batch作为Web应用程序的一部分使用时)。然而,对于更复杂的用例呢?也许你需要轮询远程的(S)FTP服务器来获取批处理作业的数据,或者你的应用程序必须同时支持多个不同的数据源。例如,你可能不仅从Web接收数据文件,还从FTP和其他来源接收。也许在调用Spring Batch之前,还需要对输入文件进行额外的转换。

因此,通过使用Spring Integration及其众多适配器来执行批处理作业将更为强大。例如,你可以使用文件入站通道适配器来监控文件系统中的目录,并在输入文件到达时立即启动批处理作业。此外,你还可以创建Spring Integration流程,利用多个不同的适配器,仅通过配置即可轻松从多个来源同时为批处理作业获取数据。使用Spring Integration实现所有这些场景非常简单,因为它支持JobOperator的解耦、事件驱动执行。

Spring Batch Integration 提供了 JobLaunchingMessageHandler 类,您可以使用它来启动批处理作业。JobLaunchingMessageHandler 的输入由 Spring Integration 消息提供,该消息的有效负载类型为 JobLaunchRequest。此类是对要启动的 Job 以及启动批处理作业所需的 JobParameters 的封装。

下图展示了启动批处理作业所需的典型 Spring Integration 消息流。EIP(企业集成模式)网站提供了消息图标的完整概览及其描述。

启动批处理作业

图 1. 启动批处理作业

将文件转换为 JobLaunchRequest

以下示例将文件转换为 JobLaunchRequest

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;

public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}

public void setJob(Job job) {
this.job = job;
}

@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();

jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());

return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}

JobExecution 响应

当批处理作业正在执行时,会返回一个 JobExecution 实例。你可以使用这个实例来确定执行的状态。如果 JobExecution 能够被成功创建,无论实际执行是否成功,它总是会被返回。

JobExecution 实例的返回方式取决于所使用的 TaskExecutor。如果使用 同步(单线程)的 TaskExecutor 实现,则 JobExecution 响应仅在作业 完成后 返回。当使用 异步TaskExecutor 时,JobExecution 实例会立即返回。随后,你可以获取 JobExecution 实例的 id(通过 JobExecution.getJobInstanceId())并使用 JobExplorer 查询 JobRepository 以获取作业的更新状态。更多信息,请参阅查询存储库

Spring Batch 集成配置

考虑这样一种情况:某人需要创建一个 inbound-channel-adapter 文件,用于监听指定目录中的 CSV 文件,将其传递给转换器 (FileMessageToJobRequest),通过作业启动网关启动作业,并使用 logging-channel-adapter 记录 JobExecution 的输出。

以下示例展示了如何在 Java 中配置该常见场景:

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}

示例 ItemReader 配置

现在我们已经实现了文件轮询和任务启动,接下来需要配置 Spring Batch 的 ItemReader(例如),使其使用通过名为 "input.file.name" 的任务参数所定义路径下的文件。以下 Bean 配置展示了具体实现方式:

以下 Java 示例展示了必要的 bean 配置:

@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}

上述示例中的主要关注点是将 #{jobParameters['input.file.name']} 的值作为 Resource 属性值注入,并将 ItemReader bean 设置为 step 作用域。将 bean 设置为 step 作用域利用了延迟绑定支持,从而允许访问 jobParameters 变量。