跳到主要内容

拦截 Step 执行

QWen Plus 中英对照 Intercepting Step Execution Intercepting Step Execution

Job 一样,在 Step 的执行过程中有许多事件可能发生,用户可能需要在这些事件中执行某些功能。例如,对于需要写入带有尾部信息的平面文件的情况,ItemWriter 需要在 Step 完成时收到通知,以便写入尾部信息。这可以通过多个 Step 范围的监听器之一来实现。

你可以通过 listeners 元素将实现 StepListener (但不是该接口本身,因为它为空)的任何类应用到一个步骤中。listeners 元素在步骤、任务片段(tasklet)或块(chunk)声明中是有效的。我们建议你在其功能适用的层级声明监听器,或者如果它是多功能的(例如 StepExecutionListenerItemReadListener),则应在它适用的最细粒度层级进行声明。

下面的示例展示了在 Java 中应用于块(chunk)级别的监听器:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
java

如果使用命名空间的 <step> 元素或其中一个 *StepFactoryBean 工厂,实现 StepListener 接口之一的 ItemReaderItemWriterItemProcessor 会自动注册到 Step 中。这仅适用于直接注入到 Step 中的组件。如果监听器嵌套在另一个组件内部,则需要显式注册它(如前所述,在 Registering ItemStream with a Step 中描述)。

除了 StepListener 接口之外,还提供了注解来解决相同的问题。普通的 Java 对象可以拥有带有这些注解的方法,然后这些方法会被转换为相应的 StepListener 类型。为 chunk 组件的自定义实现(例如 ItemReaderItemWriterTasklet)添加注解也是一种常见做法。这些注解会被 XML 解析器分析以用于 <listener/> 元素,并且还会注册到构建器中的 listener 方法,因此你只需要使用 XML 命名空间或构建器将监听器注册到步骤中即可。

StepExecutionListener

StepExecutionListener 表示 Step 执行的最通用的监听器。它允许在 Step 开始之前和结束之后(无论它是正常结束还是失败)进行通知,如下例所示:

public interface StepExecutionListener extends StepListener {

void beforeStep(StepExecution stepExecution);

ExitStatus afterStep(StepExecution stepExecution);

}
java

ExitStatus 的返回类型为 afterStep,以便监听器有机会在 Step 完成后修改返回的退出代码。

与此接口相对应的注解为:

  • @BeforeStep

  • @AfterStep

ChunkListener

一个“chunk” 被定义为在事务范围内处理的项目。在每个提交间隔时提交事务,就会提交一个 chunk。你可以使用 ChunkListener 在 chunk 开始处理之前或 chunk 成功完成之后执行逻辑,如下所示的接口定义:

public interface ChunkListener extends StepListener {

void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);

}
java

beforeChunk 方法在事务开始后但在 ItemReader 开始读取之前被调用。相反,afterChunk 在 chunk 已提交后被调用(如果发生回滚,则可能根本不调用)。

与此接口相对应的注解为:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

你可以在没有分块声明的情况下应用 ChunkListenerTaskletStep 负责调用 ChunkListener,因此它也适用于非面向项的 tasklet(它会在 tasklet 之前和之后被调用)。

ChunkListener 不会被设计为抛出受检异常。必须在实现中处理错误,否则步骤将终止。

ItemReadListener

在之前讨论跳过逻辑时,提到过记录被跳过的记录可能会比较有用,这样可以稍后处理这些记录。对于读取错误的情况,可以使用 ItemReaderListener 来实现这一点,如下所示的接口定义所展示:

public interface ItemReadListener<T> extends StepListener {

void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);

}
java

beforeRead 方法在每次调用 ItemReader 的读取操作之前被调用。afterRead 方法在每次成功调用读取操作之后被调用,并传递所读取的项。如果在读取过程中发生错误,则会调用 onReadError 方法。所提供的异常会被记录。

与此接口对应的注解为:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 一样,可以“监听”一项的处理过程,如下所示的接口定义所展示的那样:

public interface ItemProcessListener<T, S> extends StepListener {

void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);

}
java

beforeProcess 方法在 ItemProcessorprocess 方法之前被调用,并传入将要处理的项。afterProcess 方法在该项成功处理后被调用。如果在处理过程中发生错误,则会调用 onProcessError 方法。会提供遇到的异常以及尝试处理的项,以便可以对其进行记录。

与此接口对应的注解有:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

你可以通过 ItemWriteListener “监听” 项目写入的过程,如下接口定义所示:

public interface ItemWriteListener<S> extends StepListener {

void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);

}
java

beforeWrite 方法在 ItemWriterwrite 方法之前被调用,并接收将要写入的项目列表。afterWrite 方法在项目成功写入后、但在与块处理相关联的事务提交之前被调用。如果在写入过程中发生错误,则会调用 onWriteError 方法。会提供遇到的异常以及尝试写入的项目,以便可以对它们进行记录。

与此接口相对应的注解为:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了在发生错误时通知的机制,但没有一个能告知你某条记录已被跳过。例如,即使某个项被重试并最终成功,onWriteError 仍然会被调用。因此,为了跟踪被跳过的项,有一个单独的接口,如下所示的接口定义所展示:

public interface SkipListener<T,S> extends StepListener {

void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);

}
java

onSkipInRead 在读取时每当跳过一个项目时会被调用。需要注意的是,回滚可能会导致同一个项目被多次标记为已跳过。onSkipInWrite 在写入时跳过一个项目时会被调用。由于该项目已成功读取(且未被跳过),因此还会将该项目本身作为参数提供。

与此接口对应的注解为:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

跳过监听器和事务

SkipListener 最常见的用例之一是记录被跳过的项目,这样其他批处理过程甚至人工处理可以用来评估并修复导致跳过的问题。由于在许多情况下,原始事务可能会回滚,Spring Batch 做出了两项保证:

  • 适当的跳过方法(根据错误发生的时间)每个项目只调用一次。

  • SkipListener 总是在事务提交之前被调用。这是为了确保监听器调用的任何事务性资源不会因为 ItemWriter 内部的失败而回滚。