跳到主要内容

创建自定义 ItemReaders 和 ItemWriters

QWen Plus 中英对照 Creating Custom ItemReaders and ItemWriters

到目前为止,本章讨论了Spring Batch中读取和写入的基本契约(contracts)以及一些常见的实现方式。然而,这些都相对通用,许多潜在场景可能无法通过开箱即用的实现来覆盖。本节通过一个简单示例展示如何创建自定义的 ItemReaderItemWriter 实现,并正确实现它们的契约。此外,ItemReader 还实现了 ItemStream,以此说明如何使读取器或写入器具备可重启性。

自定义 ItemReader 示例

为了这个例子的目的,我们创建一个简单的 ItemReader 实现,它从给定的列表中读取数据。我们首先实现 ItemReader 最基本的契约,即 read 方法,如下代码所示:

public class CustomItemReader<T> implements ItemReader<T> {

List<T> items;

public CustomItemReader(List<T> items) {
this.items = items;
}

public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {

if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
java

前面的类接收一个项目列表,并逐一返回它们,同时从列表中移除每个项目。当列表为空时,它将返回 null,从而满足了 ItemReader 的最基本要求,如下测试代码所示:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
java

使 ItemReader 可重启

最后一个挑战是使 ItemReader 可重启。目前,如果处理被中断并重新开始,ItemReader 必须从头开始。这在许多场景下实际上是有效的,但有时更希望批处理作业从它中断的地方继续。关键的区别通常在于读取器是有状态的还是无状态的。一个无状态的读取器不需要担心可重启性,而一个有状态的读取器在重启时必须尝试恢复其最后已知的状态。基于此原因,我们建议您尽可能保持自定义读取器为无状态,这样就无需担心可重启性。

如果你确实需要存储状态,则应使用 ItemStream 接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";

public CustomItemReader(List<T> items) {
this.items = items;
}

public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {

if (currentIndex < items.size()) {
return items.get(currentIndex++);
}

return null;
}

public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}

public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}

public void close() throws ItemStreamException {}
}
java

在每次调用 ItemStreamupdate 方法时,ItemReader 的当前索引会以键名为 'current.index' 存储在提供的 ExecutionContext 中。当调用 ItemStreamopen 方法时,会检查 ExecutionContext 是否包含该键的条目。如果找到该键,则将当前索引移动到该位置。这是一个相对简单的例子,但它仍然符合通用契约:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
java

大多数 ItemReaders 具有更复杂的重启逻辑。例如,JdbcCursorItemReader 会将游标中最后处理的行的行 ID 存储起来。

还值得注意的是,在 ExecutionContext 中使用的键不应过于简单。这是因为相同的 ExecutionContext 会被用于一个 Step 中的所有 ItemStreams。在大多数情况下,只需将键以前缀类名的方式进行补充,就足以保证其唯一性。然而,在极少数情况下,如果在同一步骤中使用了两个相同类型的 ItemStream(例如需要输出两个文件时),则需要一个更具唯一性的名称。因此,许多 Spring Batch 的 ItemReaderItemWriter 实现都提供了一个 setName() 属性,允许覆盖此键名。

自定义 ItemWriter 示例

实现自定义 ItemWriter 在许多方面与上面的 ItemReader 示例类似,但也有足够的不同之处,因此需要单独举例说明。然而,添加可重启性本质上是相同的,因此在本示例中不再赘述。与 ItemReader 示例一样,为了使示例尽可能简单,这里使用了 List

public class CustomItemWriter<T> implements ItemWriter<T> {

List<T> output = TransactionAwareProxyFactory.createTransactionalList();

public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}

public List<T> getOutput() {
return output;
}
}
java

使 ItemWriter 可重启

为了使 ItemWriter 可重启,我们将遵循与 ItemReader 相同的过程,添加并实现 ItemStream 接口以同步执行上下文。在示例中,我们可能需要计算已处理的项目数量,并将该数量作为页脚记录添加。如果需要这样做,我们可以在我们的 ItemWriter 中实现 ItemStream,以便在流重新打开时,计数器可以从执行上下文中重建。

在许多现实情况中,自定义的 ItemWriter 也会委托给另一个本身可重启的写入器(例如,当写入文件时),或者它写入的是一个事务性资源,因此不需要可重启能力,因为它本身是无状态的。当你使用有状态的写入器时,你应该确保同时实现 ItemStreamItemWriter。还要记住,写入器的客户端需要了解 ItemStream,因此你可能需要在配置中将其注册为流。