跳到主要内容

常见的批处理模式

QWen Plus 中英对照 Common Batch Patterns

一些批处理任务可以完全通过 Spring Batch 中的现成组件来组装。例如,ItemReaderItemWriter 的实现可以配置以覆盖广泛的场景。然而,在大多数情况下,必须编写自定义代码。应用开发者的主要 API 入口是 TaskletItemReaderItemWriter 以及各种监听器接口。大多数简单的批处理任务可以使用 Spring Batch ItemReader 提供的现成输入,但在处理和写入过程中通常存在自定义需求,这需要开发者实现一个 ItemWriterItemProcessor

在本章中,我们提供了一些自定义业务逻辑中常见模式的示例。这些示例主要涉及监听器接口。需要注意的是,如果合适,ItemReaderItemWriter 也可以实现监听器接口。

日志记录项处理和失败

一个常见的用例是在步骤中需要对每个项的错误进行特殊处理,例如记录到特殊的通道或向数据库插入一条记录。基于块的 Step(由步骤工厂 Bean 创建)允许用户通过简单的 ItemReadListener 来处理 read 时的错误,以及通过 ItemWriteListener 来处理 write 时的错误。以下代码片段展示了一个记录 readwrite 失败的监听器:

public class ItemFailureLoggerListener extends ItemListenerSupport {

private static Log logger = LogFactory.getLog("item.error");

public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}

public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
java

实现此监听器后,必须将其注册到一个步骤中。

下面的示例展示了如何在 Java 中为一个步骤注册监听器:

@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
java
important

如果您的监听器在 onError() 方法中执行任何操作,该操作必须位于将要回滚的事务内。如果您需要在 onError() 方法中使用事务性资源(例如数据库),请考虑为此方法添加一个声明式事务(具体细节参见 Spring Core 参考指南),并将其传播属性 propagation 的值设置为 REQUIRES_NEW

因商业原因手动停止作业

Spring Batch 通过 JobOperator 接口提供了一个 stop() 方法,但这主要是供操作员使用,而不是应用程序开发人员使用。有时,在业务逻辑内部停止作业执行会更加方便或更具意义。

最简单的方法是抛出一个 RuntimeException(既不会被无限重试也不会被跳过的异常)。例如,可以使用自定义异常类型,如下例所示:

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
java

另一种阻止步骤执行的简单方法是让 ItemReader 返回 null,如下例所示:

public class EarlyCompletionItemReader implements ItemReader<T> {

private ItemReader<T> delegate;

public void setDelegate(ItemReader<T> delegate) { ... }

public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}

}
java

上一个示例实际上依赖于这样一个事实,即存在 CompletionPolicy 策略的默认实现,当要处理的项为 null 时,它会发出一个完整的批处理信号。可以通过 SimpleStepFactoryBean 将更复杂的完成策略实现并注入到 Step 中。

下面的示例展示了如何在 Java 中将完成策略注入到步骤中:

@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
java

另一种方法是在 StepExecution 中设置一个标志位,该标志位会在框架中的 Step 实现类在处理项目之间进行检查。为了实现这种替代方案,我们需要访问当前的 StepExecution,这可以通过实现一个 StepListener 并将其注册到 Step 来实现。以下示例展示了一个设置该标志位的监听器:

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

private StepExecution stepExecution;

public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}

public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}

}
java

当标志被设置时,默认行为是该步骤会抛出一个 JobInterruptedException。此行为可以通过 StepInterruptionPolicy 进行控制。然而,唯一的选择是抛出或不抛出异常,因此这总是会导致作业出现异常终止。

添加页脚记录

通常,在写入平面文件时,需要在文件末尾追加一个“页脚”记录,此操作需在所有处理完成之后进行。这可以通过 Spring Batch 提供的 FlatFileFooterCallback 接口来实现。FlatFileFooterCallback(及其对应的部分 FlatFileHeaderCallback)是 FlatFileItemWriter 的可选属性,可以添加到项写入器中。

下面的示例展示了如何在 Java 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
java

页脚回调接口只有一个方法,当需要写入页脚时会被调用,如下所示的接口定义:

public interface FlatFileFooterCallback {

void writeFooter(Writer writer) throws IOException;

}
java

编写摘要页脚

一个涉及页脚记录的常见需求是在输出过程中汇总信息,并将这些信息附加到文件的末尾。这个页脚通常作为文件的总结,或者提供一个校验和。

例如,如果批处理作业正在将 Trade 记录写入平面文件,并且有这样一个要求:即将所有 Trades 的总金额放在文件尾部,那么可以使用以下的 ItemWriter 实现:

public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {

private ItemWriter<Trade> delegate;

private BigDecimal totalAmount = BigDecimal.ZERO;

public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}

delegate.write(items);

// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}

public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}

public void setDelegate(ItemWriter delegate) {...}
}
java

这个 TradeItemWriter 存储了一个 totalAmount 值,该值会随着每个写入的 Trade 项的 amount 而增加。在最后一个 Trade 处理完成后,框架会调用 writeFooter,将 totalAmount 写入文件。需要注意的是,write 方法使用了一个临时变量 chunkTotal,用于存储块中 Trade 金额的总计。这样做是为了确保如果在 write 方法中发生跳过操作,totalAmount 不会被改变。只有在 write 方法的末尾,在确保不会抛出异常的情况下,才会更新 totalAmount

为了让 writeFooter 方法被调用,必须将实现 FlatFileFooterCallbackTradeItemWriter 配置到 FlatFileItemWriter 中作为 footerCallback

下面的示例展示了如何在 Java 中配置 TradeItemWriter

@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();

itemWriter.setDelegate(flatFileItemWriter(null));

return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
java

到目前为止,TradeItemWriter 的实现方式只有在 Step 不可重启的情况下才能正确工作。这是因为该类是有状态的(因为它存储了 totalAmount),但 totalAmount 没有持久化到数据库中。因此,在重启时无法恢复该值。为了使这个类具备重启能力,应该实现 ItemStream 接口及其 openupdate 方法,如下例所示:

public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}

public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
java

更新方法会在该对象持久化到数据库之前,将 totalAmount 的最新版本存储到 ExecutionContext 中。而 open 方法会从 ExecutionContext 中检索任何已存在的 totalAmount,并将其作为处理的起点,从而使得 TradeItemWriterStep 重新启动时可以从上次停止的地方继续执行。

驱动基于查询的 ItemReaders

关于读取器和写入器的章节 中,讨论了使用分页进行数据库输入的内容。许多数据库供应商(例如 DB2)具有极其悲观的锁定策略,如果正在读取的表还需要被在线应用程序的其他部分使用,这可能会导致问题。此外,在某些供应商的数据库上,对极大数据集打开游标也可能引发问题。因此,许多项目更倾向于使用“驱动查询 (Driving Query)”的方式来读取数据。这种方法通过迭代键而不是需要返回的整个对象来工作,如下图所示:

Driving Query Job

图 1. 驱动查询作业

正如您所见,前面图像中显示的示例使用了与基于游标的示例中相同的 'FOO' 表。然而,与选择整行不同,SQL 语句中仅选择了 ID。因此,从 read 返回的不是一个 FOO 对象,而是一个 Integer。然后可以使用这个数字查询 'details',这是一个完整的 Foo 对象,如下图所示:

Driving Query Example

图 2. 驱动查询示例

一个 ItemProcessor 应用于将从驱动查询中获取的键转换为完整的 Foo 对象。可以使用现有的 DAO 根据该键查询完整的对象。

多行记录

虽然在平面文件中通常每条记录都限制在一行内,但常见的情况是,一个文件可能包含跨越多行且具有多种格式的记录。以下文件摘录展示了一种这样的排列示例:

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

从以“HEA”开头的行到以“FOT”开头的行之间的所有内容被视为一条记录。为了正确处理这种情况,必须考虑以下几点:

  • 与其一次读取一条记录,ItemReader 必须将多行记录的每一行作为一组读取,以便能够完整地传递给 ItemWriter

  • 每种行类型可能需要以不同的方式分词。

因为单条记录跨越多行,并且我们可能不知道有多少行,所以 ItemReader 必须小心确保始终读取整个记录。为了实现这一点,应实现一个自定义的 ItemReader 作为 FlatFileItemReader 的包装器。

下面的示例展示了如何在 Java 中实现自定义的 ItemReader

@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

itemReader.setDelegate(flatFileItemReader());

return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
java

为了确保每一行都能正确分词,这对于固定长度的输入尤其重要,可以在委托的 FlatFileItemReader 上使用 PatternMatchingCompositeLineTokenizer。更多详情请参见读者与作家章节中的 FlatFileItemReader。然后,委托读取器使用 PassThroughFieldSetMapper 为每一行传递一个 FieldSet 回到包装的 ItemReader

下面的示例展示了如何确保 Java 中的每一行正确地进行标记化:

@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();

Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());

tokenizer.setTokenizers(tokenizers);

return tokenizer;
}
java

这个包装器必须能够识别记录的结束位置,以便它可以持续对其委托对象调用 read(),直到到达结束位置。对于读取的每一行,包装器应构建要返回的项。一旦到达页脚,该项就可以被返回,以传递给 ItemProcessorItemWriter,如下例所示:

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
Trade t = null;

for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
java

执行系统命令

许多批处理作业需要在批处理作业内部调用外部命令。这样的进程可以由调度程序单独启动,但关于运行的通用元数据的优势将会丢失。此外,多步骤的作业也需要被拆分为多个作业。

由于需求非常普遍,Spring Batch 提供了一个用于调用系统命令的 Tasklet 实现。

下面的示例展示了如何在 Java 中调用外部命令:

@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();

tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);

return tasklet;
}
java

当未找到输入时处理步骤完成

在许多批处理场景中,在数据库或文件中找不到需要处理的行并不是异常情况。Step 仅仅被视为没有找到需要处理的任务,并以读取 0 项完成。Spring Batch 提供的所有 ItemReader 实现默认采用这种方法。如果即使存在输入也没有输出,这可能会导致一些混淆(通常发生在文件被误命名或其他类似问题时)。因此,应检查元数据本身,以确定框架找到了多少需要处理的任务。然而,如果找不到输入被视为异常情况呢?在这种情况下,通过编程方式检查元数据中未处理的项目并触发失败是最佳解决方案。由于这是一个常见的用例,Spring Batch 提供了一个具有此功能的监听器,如 NoWorkFoundStepExecutionListener 的类定义所示:

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}

}
java

前面的 StepExecutionListener 在 'afterStep' 阶段检查 StepExecutionreadCount 属性,以确定是否没有读取任何项。如果是这种情况,则返回退出代码 FAILED,表示该 Step 应该失败。否则,返回 null,这不会影响 Step 的状态。

传递数据到未来步骤

在很多情况下,将信息从一个步骤传递到另一个步骤是非常有用的。这可以通过 ExecutionContext 来实现。需要注意的是,存在两个 ExecutionContext:一个在 Step 级别,另一个在 Job 级别。StepExecutionContext 仅在步骤执行期间存在,而 JobExecutionContext 则在整个 Job 执行期间都存在。另一方面,每当 Step 提交一个 chunk 时,StepExecutionContext 都会更新,而 JobExecutionContext 仅在每个 Step 结束时更新。

这种分离的后果是,所有数据必须在 Step 执行时放置在 ExecutionContext 中。这样做可以确保数据在 Step 运行期间正确存储。如果将数据存储到 JobExecutionContext 中,则在 Step 执行期间不会持久化。如果 Step 失败,那么这些数据将会丢失。

public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;

public void write(Chunk<? extends Object> items) throws Exception {
// ...

ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
java

为了使数据能够被后续的 Steps 使用,在步骤完成后,必须将其“提升”到 JobExecutionContext 中。Spring Batch 提供了 ExecutionContextPromotionListener 来实现这一目的。该监听器必须配置与需要提升的数据相关的 ExecutionContext 中的键。此外,还可以选择性地配置一组退出代码模式,以指定在哪些情况下发生提升(默认为 COMPLETED)。和所有监听器一样,它必须注册到 Step 上。

下面的示例展示了如何在 Java 中将步骤提升到 JobExecutionContext

@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

listener.setKeys(new String[] {"someKey"});

return listener;
}
java

最后,必须从 Job ExecutionContext 中检索保存的值,如下例所示:

public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;

public void write(Chunk<? extends Object> items) throws Exception {
// ...
}

@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}
java