项目处理
ItemReader 和 ItemWriter 接口 对于它们各自的任务都非常有用,但如果你想在写入之前插入业务逻辑该怎么办呢?对于读取和写入,一个选择是使用组合模式:创建一个包含另一个 ItemWriter 的 ItemWriter,或者创建一个包含另一个 ItemReader 的 ItemReader。以下代码展示了一个示例:
public class CompositeItemWriter<T> implements ItemWriter<T> {
ItemWriter<T> itemWriter;
public CompositeItemWriter(ItemWriter<T> itemWriter) {
this.itemWriter = itemWriter;
}
public void write(Chunk<? extends T> items) throws Exception {
//Add business logic here
itemWriter.write(items);
}
public void setDelegate(ItemWriter<T> itemWriter){
this.itemWriter = itemWriter;
}
}
前面的类包含另一个 ItemWriter,在提供一些业务逻辑后,它会委托给这个 ItemWriter。这种模式同样可以轻松应用于 ItemReader,或许是为了基于主 ItemReader 提供的输入获取更多参考数据。如果你需要自己控制对 write 的调用,这也很有用。然而,如果你只想在实际写入之前“转换”要写入的传入项,则无需自己调用 write。你可以直接修改该项。针对这种情况,Spring Batch 提供了 ItemProcessor 接口,如下面的接口定义所示:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ItemProcessor 很简单。给定一个对象,将其转换并返回另一个对象。提供的对象可能是也可能不是同一类型。关键在于,在转换过程中可以应用业务逻辑,而创建该逻辑完全取决于开发人员。ItemProcessor 可以直接连接到步骤中。例如,假设 ItemReader 提供了一个类型为 Foo 的类,并且需要在写出之前将其转换为 Bar 类型。以下示例展示了一个执行此转换的 ItemProcessor:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarWriter implements ItemWriter<Bar> {
public void write(Chunk<? extends Bar> bars) throws Exception {
//write bars
}
}
在前面的示例中,有一个名为 Foo 的类、一个名为 Bar 的类,以及一个遵循 ItemProcessor 接口的 FooProcessor 类。这里的转换虽然简单,但可以执行任何类型的转换。BarWriter 负责写入 Bar 对象,如果提供其他类型的对象则会抛出异常。同样地,FooProcessor 在接收到非 Foo 类型的对象时也会抛出异常。随后,FooProcessor 可以被注入到一个 Step 中,如下例所示:
- Java
- XML
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Bar>chunk(2).transactionManager(transactionManager)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
ItemProcessor 与 ItemReader 或 ItemWriter 的一个区别在于,对于 Step 来说,ItemProcessor 是可选的。
链式 ItemProcessor
在许多场景中,执行单一转换非常有用,但如果您希望将多个 ItemProcessor 实现“串联”起来呢?您可以通过使用前面提到的组合模式来实现。为了更新之前的单一转换示例,Foo 被转换为 Bar,而 Bar 又被转换为 Foobar 并写出,如下例所示:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class Foobar {
public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarProcessor implements ItemProcessor<Bar, Foobar> {
public Foobar process(Bar bar) throws Exception {
return new Foobar(bar);
}
}
public class FoobarWriter implements ItemWriter<Foobar>{
public void write(Chunk<? extends Foobar> items) throws Exception {
//write items
}
}
FooProcessor 和 BarProcessor 可以“链式”组合,从而生成最终的 Foobar,如下例所示:
CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
与之前的示例类似,您可以将复合处理器配置到 Step 中:
- Java
- XML
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Foobar>chunk(2).transactionManager(transactionManager)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(new FooProcessor());
delegates.add(new BarProcessor());
CompositeItemProcessor processor = new CompositeItemProcessor();
processor.setDelegates(delegates);
return processor;
}
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
<bean id="compositeItemProcessor"
class="org.springframework.batch.infrastructure.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor" />
<bean class="..BarProcessor" />
</list>
</property>
</bean>
筛选记录
项目处理器的一个典型用途是在记录传递给 ItemWriter 之前将其过滤掉。过滤与跳过是不同的操作。跳过表示记录无效,而过滤表示记录不应被写入。
例如,考虑一个批处理作业,它读取一个包含三种不同类型记录的文件:待插入记录、待更新记录和待删除记录。如果系统不支持记录删除,我们就不希望将任何可删除记录发送给 ItemWriter。然而,由于这些记录实际上并非错误记录,我们希望将其过滤掉而非跳过。因此,ItemWriter 将仅接收可插入和可更新的记录。
要过滤一条记录,您可以从 ItemProcessor 返回 null。框架会检测到结果为 null,从而避免将该条目添加到传递给 ItemWriter 的记录列表中。从 ItemProcessor 抛出的异常会导致跳过该记录。
验证输入
ItemReaders 与 ItemWriters 章节讨论了多种解析输入的方法。如果输入“格式不正确”,每个主要实现都会抛出异常。如果数据范围缺失,FixedLengthTokenizer 会抛出异常。类似地,尝试访问 RowMapper 或 FieldSetMapper 中不存在或格式与预期不同的索引也会导致抛出异常。所有这些类型的异常都会在 read 方法返回之前抛出。然而,它们并未解决返回的条目是否有效的问题。例如,如果某个字段是年龄,它不能为负数。它可能被正确解析,因为它存在且是一个数字,但这并不会导致异常。由于已经存在大量的验证框架,Spring Batch 并不试图提供另一个。相反,它提供了一个简单的接口,称为 Validator,您可以通过任意数量的框架来实现它,如下面的接口定义所示:
public interface Validator<T> {
void validate(T value) throws ValidationException;
}
契约规定,如果对象无效,validate 方法会抛出异常;如果对象有效,则正常返回。Spring Batch 提供了一个 ValidatingItemProcessor,如下面的 Bean 定义所示:
- Java
- XML
@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();
processor.setValidator(validator());
return processor;
}
@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();
validator.setValidator(new TradeValidator());
return validator;
}
<bean class="org.springframework.batch.infrastructure.item.validator.ValidatingItemProcessor">
<property name="validator" ref="validator" />
</bean>
<bean id="validator" class="org.springframework.batch.infrastructure.item.validator.SpringValidator">
<property name="validator">
<bean class="org.springframework.batch.samples.domain.trade.internal.validator.TradeValidator"/>
</property>
</bean>
您也可以使用 BeanValidatingItemProcessor 来验证带有 Bean Validation API (JSR-303) 注解的条目。例如,考虑以下类型 Person:
class Person {
@NotEmpty
private String name;
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
您可以在应用程序上下文中声明一个 BeanValidatingItemProcessor bean,并将其注册为面向块步骤中的处理器,从而验证项目:
@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
}
容错
当一个数据块回滚时,在读取过程中已缓存的条目可能会被重新处理。如果某个步骤被配置为容错(通常通过使用跳过或重试处理),那么所使用的任何 ItemProcessor 都应该以幂等的方式实现。通常,这包括不对 ItemProcessor 的输入条目进行任何更改,并且只更新作为结果的实例。