跳到主要内容
版本:6.0.2

拦截 Step 执行

DeepSeek V3 中英对照 Intercepting Step Execution Intercepting Step Execution

Job 类似,在 Step 的执行过程中也存在许多事件,用户可能需要在这些事件中执行某些功能。例如,当需要写入包含页脚的平面文件时,ItemWriter 需要在 Step 完成时收到通知,以便写入页脚。这可以通过多个 Step 作用域的监听器之一来实现。

您可以将任何实现了 StepListener 扩展(而非该接口本身,因为它是空的)的类,通过 listeners 元素应用于一个步骤。listeners 元素在步骤、任务或块声明内部是有效的。我们建议您在适用其功能的层级上声明监听器,或者如果它是多功能的(例如 StepExecutionListenerItemReadListener),则在最细粒度的适用层级上声明它。

以下示例展示了在 Java 中应用于块(chunk)级别的监听器:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}

如果一个 ItemReaderItemWriterItemProcessor 本身实现了 StepListener 接口之一,那么在使用命名空间 <step> 元素或 *StepFactoryBean 工厂时,它会自动注册到 Step 中。这仅适用于直接注入到 Step 的组件。如果监听器嵌套在另一个组件内部,则需要显式注册它(如先前在将 ItemStream 注册到 Step 中所述)。

除了 StepListener 接口外,还提供了注解来处理相同的问题。普通的 Java 对象可以包含带有这些注解的方法,这些方法随后会被转换为相应的 StepListener 类型。通常也会对块组件的自定义实现(如 ItemReaderItemWriterTasklet)进行注解。这些注解会被 XML 解析器用于解析 <listener/> 元素,并在构建器的 listener 方法中注册,因此您只需使用 XML 命名空间或构建器将监听器注册到步骤中即可。

StepExecutionListener

StepExecutionListenerStep 执行过程中最通用的监听器。它允许在 Step 开始之前和结束之后(无论正常结束还是失败)发送通知,如下例所示:

public interface StepExecutionListener extends StepListener {

void beforeStep(StepExecution stepExecution);

ExitStatus afterStep(StepExecution stepExecution);

}

afterStep 的返回类型为 ExitStatus,以便让监听器有机会修改 Step 完成时返回的退出码。

该接口对应的注解为:

  • @BeforeStep

  • @AfterStep

ChunkListener

"块"(chunk)是指在事务范围内处理的一组数据项。在每个提交间隔提交事务时,会提交一个块。您可以使用 ChunkListener 在块开始处理之前、块成功完成之后或块处理失败之后执行逻辑,如下面的接口定义所示:

public interface ChunkListener<I, O> extends StepListener {

void beforeChunk(Chunk<I> chunk);
void afterChunk(Chunk<O> chunk);
void afterChunkError(Exception exception, Chunk<O> chunk);

}

beforeChunk 方法在读取一批数据项后、事务启动后、处理开始前被调用。相反地,afterChunk 方法在数据块写入后、事务提交或回滚前被调用。

备注

ChunkListener 监听器接口在并发步骤中不会被调用

该接口对应的注解为:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

ChunkListener 并非设计用于抛出受检异常。错误必须在实现中进行处理,否则步骤将终止。

ItemReadListener

在之前讨论跳过逻辑时,曾提到记录被跳过的记录可能是有益的,以便后续处理。对于读取错误的情况,可以通过 ItemReaderListener 来实现,如下面的接口定义所示:

public interface ItemReadListener<T> extends StepListener {

void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);

}

beforeRead 方法在每次调用 ItemReader 的读取操作之前被调用。afterRead 方法在每次成功读取之后被调用,并接收已读取的项作为参数。如果在读取过程中发生错误,则会调用 onReadError 方法。该方法会接收到遇到的异常,以便进行日志记录。

该接口对应的注解为:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 类似,项目的处理过程也可以被“监听”,如下面的接口定义所示:

public interface ItemProcessListener<T, S> extends StepListener {

void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);

}

beforeProcess 方法会在 ItemProcessorprocess 方法之前被调用,并接收待处理的条目。afterProcess 方法则在条目成功处理后调用。若处理过程中出现错误,则会调用 onProcessError 方法。该方法会接收到遇到的异常以及尝试处理的条目,以便进行日志记录。

该接口对应的注解为:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

你可以通过 ItemWriteListener 来“监听”项目的写入过程,如下面的接口定义所示:

public interface ItemWriteListener<S> extends StepListener {

void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法会在 ItemWriterwrite 方法之前被调用,并接收将要写入的项目列表。afterWrite 方法则在项目成功写入之后、但在提交与该块处理相关的事务之前被调用。如果在写入过程中发生错误,则会调用 onWriteError 方法。该方法会接收到遇到的异常以及尝试写入的项目,以便进行日志记录。

该接口对应的注解为:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了在发生错误时接收通知的机制,但它们均不会告知你某条记录实际上已被跳过。例如,即使某个条目经过重试后成功,onWriteError 仍会被调用。因此,为了跟踪被跳过的条目,我们提供了一个单独的接口,如下方接口定义所示:

public interface SkipListener<T,S> extends StepListener {

void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);

}

onSkipInRead 在读取过程中跳过某个项目时被调用。需要注意的是,回滚操作可能导致同一项目被多次注册为已跳过。onSkipInWrite 在写入过程中跳过某个项目时被调用。由于该项目已成功读取(且未被跳过),因此也会将项目本身作为参数提供。

该接口对应的注解为:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

跳过监听器与事务

SkipListener 最常见的用途之一是记录跳过的条目,以便通过另一个批处理流程甚至人工流程来评估和修复导致跳过的错误。由于在许多情况下原始事务可能被回滚,Spring Batch 提供了两项保证:

  • 每个条目仅调用一次适当的跳过方法(取决于错误发生的时间)。

  • SkipListener 总是在事务提交之前被调用。这是为了确保监听器调用的任何事务性资源不会因 ItemWriter 内部的故障而回滚。