跳到主要内容

项目处理

QWen Plus 中英对照 Item processing

ItemReader 和 ItemWriter 接口 对于它们特定的任务都非常有用,但如果你希望在写入之前插入业务逻辑怎么办?对于读取和写入,有一种选择是使用组合模式:创建一个包含另一个 ItemWriterItemWriter,或者创建一个包含另一个 ItemReaderItemReader。以下代码展示了一个示例:

public class CompositeItemWriter<T> implements ItemWriter<T> {

ItemWriter<T> itemWriter;

public CompositeItemWriter(ItemWriter<T> itemWriter) {
this.itemWriter = itemWriter;
}

public void write(Chunk<? extends T> items) throws Exception {
//Add business logic here
itemWriter.write(items);
}

public void setDelegate(ItemWriter<T> itemWriter){
this.itemWriter = itemWriter;
}
}
java

前面的类包含另一个 ItemWriter,在提供了一些业务逻辑之后,它会将任务委托给该 ItemWriter。这种模式也很容易用于 ItemReader,也许可以根据主 ItemReader 提供的输入获取更多参考数据。如果你需要自己控制对 write 的调用,这也非常有用。然而,如果你只想在实际写入之前“转换”传递给写入的项,则无需自己调用 write。你只需修改该项即可。针对这种场景,Spring Batch 提供了 ItemProcessor 接口,如下所示的接口定义:

public interface ItemProcessor<I, O> {

O process(I item) throws Exception;
}
java

ItemProcessor 很简单。给定一个对象,对其进行转换并返回另一个对象。提供的对象可能与原始对象类型相同,也可能不同。重点是在处理过程中可以应用业务逻辑,而创建该逻辑则完全由开发人员决定。ItemProcessor 可以直接集成到步骤中。例如,假设 ItemReader 提供了一个类型为 Foo 的类,并且在写入之前需要将其转换为类型 Bar。以下示例展示了一个执行此转换的 ItemProcessor

public class Foo {}

public class Bar {
public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}

public class BarWriter implements ItemWriter<Bar> {
public void write(Chunk<? extends Bar> bars) throws Exception {
//write bars
}
}
java

在前面的例子中,有一个名为 Foo 的类,一个名为 Bar 的类,以及一个遵循 ItemProcessor 接口的名为 FooProcessor 的类。转换过程很简单,但在这里可以执行任何类型的转换。BarWriter 写入 Bar 对象,如果提供其他类型则会抛出异常。类似地,如果提供给 FooProcessor 的不是 Foo,它也会抛出异常。然后,可以将 FooProcessor 注入到 Step 中,如下例所示:

@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Bar>chunk(2, transactionManager)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
java

ItemProcessorItemReaderItemWriter 之间的一个区别是,对于一个 Step 来说,ItemProcessor 是可选的。

链式 ItemProcessors

在许多场景中,执行单一转换非常有用,但如果你想要将多个 ItemProcessor 实现“链式”连接起来该怎么办呢?可以通过使用前面提到的组合模式来实现。以下例所示,更新之前的单一转换示例,Foo 被转换为 Bar,然后 Bar 再被转换为 Foobar 并输出。

public class Foo {}

public class Bar {
public Bar(Foo foo) {}
}

public class Foobar {
public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
public Foobar process(Bar bar) throws Exception {
return new Foobar(bar);
}
}

public class FoobarWriter implements ItemWriter<Foobar>{
public void write(Chunk<? extends Foobar> items) throws Exception {
//write items
}
}
java

一个 FooProcessor 和一个 BarProcessor 可以“链式”连接在一起以生成结果 Foobar,如下例所示:

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
java

与前面的例子一样,您可以将复合处理器配置到 Step 中:

@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Foobar>chunk(2, transactionManager)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
.build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(new FooProcessor());
delegates.add(new BarProcessor());

CompositeItemProcessor processor = new CompositeItemProcessor();

processor.setDelegates(delegates);

return processor;
}
java

筛选记录

项处理器的一个典型用法是在记录传递给 ItemWriter 之前过滤掉它们。过滤与跳过是不同的操作。跳过表示记录无效,而过滤表示记录不应被写入。

例如,考虑一个批处理作业,它读取一个包含三种不同类型记录的文件:需要插入的记录、需要更新的记录和需要删除的记录。如果系统不支持记录删除功能,我们不希望将任何可删除的记录发送到 ItemWriter。但是,由于这些记录实际上并不是错误记录,我们希望将它们过滤掉而不是跳过它们。因此,ItemWriter 将只会接收到可插入和可更新的记录。

要过滤一条记录,你可以从 ItemProcessor 返回 null。框架检测到结果为 null,并将避免将该项添加到传递给 ItemWriter 的记录列表中。从 ItemProcessor 抛出的异常会导致跳过。

验证输入

ItemReaders 和 ItemWriters 章节讨论了多种解析输入的方法。每个主要实现如果数据不是“格式良好”,都会抛出异常。FixedLengthTokenizer 如果缺少一段数据范围,会抛出异常。类似地,在 RowMapperFieldSetMapper 中尝试访问一个不存在的索引或与预期格式不同的索引时,也会抛出异常。所有这些类型的异常都在 read 返回之前被抛出。然而,它们并没有解决返回项是否有效的问题。例如,如果其中一个字段是年龄,它不能为负数。它可能正确解析了,因为它存在且是一个数字,但不会引发异常。由于已经有许多验证框架可供使用,Spring Batch 并不试图再提供一个新的。相反,它提供了一个简单的接口,称为 Validator,你可以通过任意数量的框架来实现它,如下所示的接口定义:

public interface Validator<T> {

void validate(T value) throws ValidationException;

}
java

合同规定,如果对象无效,则 validate 方法抛出异常;如果对象有效,则正常返回。Spring Batch 提供了一个 ValidatingItemProcessor,如下所示的bean定义展示了这一点:

@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();

processor.setValidator(validator());

return processor;
}

@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();

validator.setValidator(new TradeValidator());

return validator;
}
java

你也可以使用 BeanValidatingItemProcessor 来验证带有 Bean Validation API (JSR-303) 注解的项目。例如,考虑以下类型 Person

class Person {

@NotEmpty
private String name;

public Person(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

}
java

你可以通过在应用程序上下文中声明一个 BeanValidatingItemProcessor bean,并将其作为处理器注册到分块处理步骤中来验证项目:

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);

return beanValidatingItemProcessor;
}
java

容错性

当一个块被回滚时,在读取过程中已缓存的项可能会被重新处理。如果一个步骤被配置为容错的(通常通过使用跳过或重试处理),则任何使用的 ItemProcessor 都应以幂等的方式实现。通常,这包括不对 ItemProcessor 的输入项进行任何更改,仅更新结果实例。