创建自定义 ItemReader 和 ItemWriter
到目前为止,本章已经讨论了Spring Batch中读写的基本契约以及一些常见的实现方式。然而,这些内容都相当通用,许多潜在场景可能无法通过开箱即用的实现来覆盖。本节将通过一个简单的示例,展示如何创建自定义的ItemReader和ItemWriter实现,并正确实现它们的契约。该ItemReader还实现了ItemStream接口,以说明如何使读取器或写入器具备可重启能力。
自定义 ItemReader 示例
在本示例中,我们创建一个简单的 ItemReader 实现,该实现从提供的列表中读取数据。我们首先实现 ItemReader 最基本的方法 read,如下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类接收一个项目列表,每次返回一个项目,并从列表中移除该项目。当列表为空时,返回 null,从而满足 ItemReader 最基本的要求,如下面的测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使 ItemReader 可重启
最后的挑战是让 ItemReader 具备可重启性。目前,如果处理过程被中断并重新开始,ItemReader 必须从头开始。这在许多场景下实际上是有效的,但有时更希望批处理作业能从上次中断的地方继续。关键的区别通常在于读取器是有状态的还是无状态的。无状态的读取器无需担心可重启性,但有状态的读取器在重启时必须尝试恢复其最后已知的状态。因此,我们建议尽可能保持自定义读取器为无状态,这样你就不必担心可重启性问题。
如果你确实需要存储状态,那么应该使用 ItemStream 接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用 ItemStream 的 update 方法时,ItemReader 的当前索引会以 'current.index' 为键存储在提供的 ExecutionContext 中。当调用 ItemStream 的 open 方法时,系统会检查 ExecutionContext 是否包含该键对应的条目。如果找到该键,则当前索引会移动到该位置。这是一个相当简单的示例,但它仍然符合通用约定:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数 ItemReader 都具备更为复杂的重启逻辑。例如,JdbcCursorItemReader 会在游标中存储最后处理行的行 ID。
同样值得注意的是,ExecutionContext 中使用的键不应过于简单。这是因为同一个 ExecutionContext 会用于 Step 中的所有 ItemStream。在大多数情况下,只需在键前加上类名就足以保证唯一性。然而,在极少数情况下,如果同一个步骤中使用了两个相同类型的 ItemStream(例如需要输出两个文件时),就需要一个更独特的名称。因此,许多 Spring Batch 的 ItemReader 和 ItemWriter 实现都提供了 setName() 属性,允许覆盖此键名。
自定义 ItemWriter 示例
实现自定义的 ItemWriter 在许多方面与上述 ItemReader 示例类似,但存在足够多的差异,因此值得单独举例说明。不过,添加可重启性功能本质上与此相同,故本例不再赘述。与 ItemReader 示例一样,本例使用 List 以保持示例尽可能简洁:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使 ItemWriter 可重启
为了使 ItemWriter 具备可重启能力,我们将采用与 ItemReader 相同的处理方式,即添加并实现 ItemStream 接口以同步执行上下文。在此示例中,我们可能需要统计已处理的条目数量,并将其作为页脚记录添加。若需实现此功能,我们可以在 ItemWriter 中实现 ItemStream 接口,从而在流重新打开时从执行上下文中恢复计数器状态。
在许多实际场景中,自定义的 ItemWriter 也会委托给另一个本身可重启的写入器(例如写入文件时),或者它写入的是事务性资源,因此无需保持可重启性,因为它是无状态的。当你拥有一个有状态的写入器时,可能需要确保同时实现 ItemWriter 和 ItemStream 接口。还需注意,写入器的调用方需要感知 ItemStream,因此你可能需要在配置中将其注册为一个流。