常见批处理模式
有些批处理作业可以完全由 Spring Batch 中的现成组件组装而成。例如,可以配置 ItemReader 和 ItemWriter 实现来覆盖广泛的场景。然而,在大多数情况下,必须编写自定义代码。应用程序开发人员的主要 API 入口点是 Tasklet、ItemReader、ItemWriter 以及各种监听器接口。大多数简单的批处理作业可以使用 Spring Batch ItemReader 的现成输入,但在处理和写入过程中通常存在需要开发人员实现 ItemWriter 或 ItemProcessor 的自定义关注点。
在本章中,我们提供了一些自定义业务逻辑中常见模式的示例。这些示例主要围绕监听器接口展开。需要注意的是,如果适用,ItemReader 或 ItemWriter 也可以实现监听器接口。
记录项目处理与失败情况
一个常见的用例是需要对步骤中的错误进行逐项特殊处理,例如记录到特殊通道或向数据库插入记录。面向数据块的Step(通过步骤工厂Bean创建)允许用户通过简单的ItemReadListener处理read错误,以及通过ItemWriteListener处理write错误来实现此用例。以下代码片段演示了一个同时记录读取和写入失败的监听器:
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
实现此监听器后,必须将其注册到步骤中。
- Java
- XML
以下示例展示了如何在 Java 中为步骤注册监听器:
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
以下示例展示了如何在 XML 中为步骤注册监听器:
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
如果你的监听器在 onError() 方法中执行任何操作,这些操作必须位于一个即将回滚的事务内。如果你需要在 onError() 方法内部使用事务性资源(例如数据库),请考虑为该方法添加声明式事务(详见 Spring Core 参考指南),并将其传播属性设置为 REQUIRES_NEW。
因业务原因手动停止作业
Spring Batch 通过 JobOperator 接口提供了 stop() 方法,但这实际上是为操作者而非应用程序开发者设计的。有时,在业务逻辑内部停止作业执行会更方便或更合理。
最简单的做法是抛出一个 RuntimeException(这种异常既不会被无限重试,也不会被跳过)。例如,可以使用自定义异常类型,如下例所示:
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
另一种阻止步骤执行的简单方法是从 ItemReader 返回 null,如下例所示:
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
之前的示例实际上依赖于一个事实,即存在一个默认的 CompletionPolicy 策略实现,当待处理项为 null 时,该策略会发出批次完成的信号。我们可以实现一个更复杂的完成策略,并通过 SimpleStepFactoryBean 将其注入到 Step 中。
- Java
- XML
以下示例展示了如何在 Java 中向一个步骤注入完成策略:
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
以下示例展示了如何在 XML 中向一个步骤注入完成策略:
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
另一种方法是在 StepExecution 中设置一个标志,该标志由框架中的 Step 实现在项目处理之间进行检查。要实现这种方法,我们需要访问当前的 StepExecution,这可以通过实现一个 StepListener 并将其注册到 Step 中来实现。以下示例展示了一个设置标志的监听器:
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
当该标志被设置时,默认行为是步骤会抛出一个 JobInterruptedException。此行为可以通过 StepInterruptionPolicy 来控制。然而,唯一的选择是抛出或不抛出异常,因此这始终是作业的异常结束。
添加页脚记录
在编写平面文件时,通常需要在所有处理完成后,向文件末尾追加一条“页脚”记录。这可以通过使用Spring Batch提供的FlatFileFooterCallback接口来实现。FlatFileFooterCallback(及其对应接口FlatFileHeaderCallback)是FlatFileItemWriter的可选属性,可以添加到项目写入器中。
- Java
- XML
以下示例展示了如何在 Java 中使用 FlatFileHeaderCallback 和 FlatFileFooterCallback:
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
以下示例展示了如何在 XML 中使用 FlatFileHeaderCallback 和 FlatFileFooterCallback:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
页脚回调接口仅包含一个方法,该方法在必须写入页脚时被调用,如下方接口定义所示:
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
撰写摘要页脚
涉及页脚记录的一个常见需求是在输出过程中汇总信息,并将这些信息附加到文件末尾。该页脚通常用作文件的摘要或提供校验和。
例如,如果一个批处理作业正在将 Trade 记录写入平面文件,并且要求将所有 Trades 的总金额放在文件尾部,那么可以使用以下 ItemWriter 实现:
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
这个 TradeItemWriter 存储了一个 totalAmount 值,该值会随着每个写入的 Trade 项目的 amount 而增加。在处理完最后一个 Trade 后,框架会调用 writeFooter 方法,将 totalAmount 写入文件。请注意,write 方法使用了一个临时变量 chunkTotal 来存储当前数据块中 Trade 金额的总和。这样做是为了确保,如果在 write 方法中发生跳过操作,totalAmount 将保持不变。只有在 write 方法结束时,当我们确保没有抛出任何异常后,才会更新 totalAmount。
为了让 writeFooter 方法被调用,必须将 TradeItemWriter(它实现了 FlatFileFooterCallback 接口)作为 footerCallback 配置到 FlatFileItemWriter 中。
- Java
- XML
以下示例展示了如何在 Java 中装配 TradeItemWriter:
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
以下示例展示了如何在 XML 中装配 TradeItemWriter:
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
目前编写的 TradeItemWriter 仅在 Step 不可重启时才能正常工作。这是因为该类是有状态的(因为它存储了 totalAmount),但 totalAmount 并未持久化到数据库中。因此,在重启时无法检索到该值。为了使该类可重启,应实现 ItemStream 接口及其 open 和 update 方法,如下例所示:
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
update方法在ExecutionContext对象被持久化到数据库之前,会将totalAmount的最新版本存储到ExecutionContext中。open方法则从ExecutionContext中检索任何已存在的totalAmount,并将其用作处理的起点,这样TradeItemWriter就能在重新启动时,从上一次Step运行中断的地方继续执行。
基于驱动查询的 ItemReaders
在关于读取器和写入器的章节中,我们讨论了使用分页进行数据库输入。许多数据库供应商(例如 DB2)采用极其悲观的锁定策略,如果被读取的表还需要在线应用程序的其他部分使用,则可能会引发问题。此外,在超大型数据集上打开游标可能会在某些供应商的数据库中引发问题。因此,许多项目更倾向于使用“驱动查询”方法来读取数据。这种方法通过迭代键(而不是需要返回的整个对象)来工作,如下图所示:

图 1. 驱动查询作业
如您所见,前图所示的示例使用了与基于游标的示例相同的 'FOO' 表。然而,SQL 语句并未选择整行数据,而仅选择了 ID 字段。因此,read 方法返回的不再是 FOO 对象,而是 Integer 类型的数值。随后,该数值可用于查询包含完整 Foo 对象的“详细信息”,如下图所示:

图 2. 驱动查询示例
ItemProcessor 应被用于将从驱动查询中获取的键转换为完整的 Foo 对象。可以使用现有的 DAO 基于该键查询完整的对象。
多行记录
虽然平面文件通常每条记录都限制在单行内,但常见的情况是文件可能包含跨越多行且具有多种格式的记录。以下文件摘录展示了这种排列方式的示例:
HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34
从以 'HEA' 开头的行到以 'FOT' 开头的行之间的所有内容被视为一条记录。为了正确处理这种情况,需要考虑以下几点:
-
与每次读取一条记录不同,
ItemReader必须将多行记录的每一行作为一个组来读取,以便能够完整地传递给ItemWriter。 -
每种行类型可能需要采用不同的分词方式。
由于单个记录跨越多行,并且我们可能不知道有多少行,ItemReader 必须确保始终读取整个记录。为了实现这一点,应实现一个自定义的 ItemReader 作为 FlatFileItemReader 的包装器。
- Java
- XML
以下示例展示了如何在 Java 中实现自定义的 ItemReader:
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
以下示例展示了如何在 XML 中实现自定义的 ItemReader:
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
为确保每一行都能被正确分词,这对于固定长度输入尤为重要,可以在委托的 FlatFileItemReader 上使用 PatternMatchingCompositeLineTokenizer。更多详情请参阅读取器和写入器章节中的 FlatFileItemReader。随后,委托读取器会使用 PassThroughFieldSetMapper 将每一行的 FieldSet 返回给包装的 ItemReader。
- Java
- XML
以下示例展示了如何在 Java 中确保每一行都被正确分词:
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
以下示例展示了如何在 XML 中确保每一行都被正确分词:
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
该包装器必须能够识别记录的结束,以便持续调用其委托的 read() 方法直至结束。对于读取的每一行,包装器应逐步构建待返回的条目。一旦到达页脚,即可返回该条目以传递给 ItemProcessor 和 ItemWriter,如下例所示:
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
执行系统命令
许多批处理作业需要从作业内部调用外部命令。虽然调度器可以单独启动这样的进程,但这样会丢失关于运行过程的通用元数据。此外,多步骤作业也需要被拆分成多个独立的作业。
由于这种需求非常普遍,Spring Batch 提供了一个 Tasklet 实现来调用系统命令。
- Java
- XML
以下示例展示了如何在 Java 中调用外部命令:
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
以下示例展示了如何在 XML 中调用外部命令:
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 命令完成的超时时间为 5 秒 -->
<property name="timeout" value="5000" />
</bean>
处理未找到输入时的步骤完成情况
在许多批处理场景中,数据库或文件中没有找到要处理的行并不属于异常情况。Step 会被简单地视为未找到工作,并以读取0条记录的状态完成。Spring Batch 默认提供的所有 ItemReader 实现都采用这种方式。如果即使存在输入数据(通常是由于文件名错误或类似问题导致)也没有任何数据被写出,这可能会引起一些困惑。因此,应该检查元数据本身,以确定框架发现了多少待处理的工作。然而,如果认为找不到输入数据属于异常情况呢?在这种情况下,通过编程方式检查元数据中是否有已处理的条目,并据此引发失败,是最佳解决方案。由于这是一个常见用例,Spring Batch 提供了一个监听器来专门实现此功能,如 NoWorkFoundStepExecutionListener 的类定义所示:
public class NoWorkFoundStepExecutionListener implements StepExecutionListener {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
前面的 StepExecutionListener 在 'afterStep' 阶段检查 StepExecution 的 readCount 属性,以判断是否有数据被读取。如果没有数据被读取,则返回退出码 FAILED,表示该 Step 应该失败。否则,返回 null,这不会影响 Step 的状态。
将数据传递给后续步骤
在步骤之间传递信息通常很有用。这可以通过 ExecutionContext 实现。需要注意的是,存在两种 ExecutionContext:一种是 Step 级别的,另一种是 Job 级别的。Step 的 ExecutionContext 仅在步骤执行期间存在,而 Job 的 ExecutionContext 在整个 Job 执行期间都存在。另一方面,Step 的 ExecutionContext 会在每次提交一个数据块时更新,而 Job 的 ExecutionContext 仅在每个 Step 结束时更新。
这种分离的结果是,所有数据必须在 Step 执行期间被放入 Step 的 ExecutionContext 中。这样做可以确保在 Step 运行时数据被正确存储。如果将数据存储到 Job 的 ExecutionContext 中,那么在 Step 执行期间这些数据不会被持久化。如果 Step 失败,这些数据将会丢失。
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
为使数据在后续步骤中可用,必须在步骤完成后将其"提升"至作业的执行上下文中。为此,Spring Batch提供了ExecutionContextPromotionListener。该监听器必须配置需要提升的执行上下文中数据对应的键值,同时也可选择性地配置触发提升操作的退出码模式列表(默认为COMPLETED)。与所有监听器相同,必须将其注册在步骤上。
- Java
- XML
以下示例展示了如何在 Java 中将一个步骤提升到 Job 的 ExecutionContext 中:
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
以下示例展示了如何在 XML 中将一个步骤提升到 Job 的 ExecutionContext 中:
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
最后,必须从 Job ExecutionContext 中检索已保存的值,如下例所示:
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}