重复
RepeatTemplate
批处理涉及重复性操作,无论是作为简单优化还是作为作业的一部分。为了对重复操作进行策略化和泛化,并提供相当于迭代器框架的功能,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口的定义如下:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
回调是一个接口,其定义如下所示,它允许您插入一些需要重复执行的业务逻辑:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
回调会重复执行,直到实现确定迭代应当结束。在这些接口中,返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLE 或 RepeatStatus.FINISHED。RepeatStatus 枚举向重复操作的调用者传递关于是否仍有工作待处理的信息。一般来说,RepeatOperations 的实现应当检查 RepeatStatus 并将其作为决定结束迭代的一部分。任何希望向调用者表明没有剩余工作的回调都可以返回 RepeatStatus.FINISHED。
RepeatOperations 最简单的通用实现是 RepeatTemplate:
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的例子中,我们返回 RepeatStatus.CONTINUABLE,以表明还有更多工作要做。回调函数也可以返回 RepeatStatus.FINISHED,向调用者发出信号,表示没有剩余工作。某些迭代可以通过回调函数内部执行工作的内在考量来终止。其他迭代实际上是无限循环(就回调函数而言),完成决策被委托给外部策略,如前面例子所示。
RepeatContext
RepeatCallback 的方法参数是一个 RepeatContext。许多回调会忽略这个上下文。然而,如有必要,你可以将其用作一个属性包,在迭代期间存储临时数据。当 iterate 方法返回后,该上下文将不复存在。
如果存在嵌套迭代正在进行中,RepeatContext 会有一个父级上下文。父级上下文偶尔可用于存储需要在多次调用 iterate 之间共享的数据。例如,如果您想要统计迭代中某个事件的发生次数,并在后续调用中记住这个计数,这种情况就会用到父级上下文。
重复状态
RepeatStatus 是 Spring Batch 中用于指示处理是否已完成的枚举。它有两个可能的 RepeatStatus 值:
表 1. RepeatStatus 属性
| 值 | 描述 |
|---|---|
CONTINUABLE | 还有更多工作要做。 |
FINISHED | 不应再进行更多重复。 |
你可以通过使用 RepeatStatus 中的 and() 方法,将 RepeatStatus 值与逻辑 AND 操作结合。这样做的效果是对可继续标志执行逻辑 AND 操作。换句话说,如果任一状态为 FINISHED,则结果为 FINISHED。
完成策略
在 RepeatTemplate 内部,iterate 方法中循环的终止由 CompletionPolicy 决定,它同时也是 RepeatContext 的工厂。RepeatTemplate 的职责是使用当前策略创建 RepeatContext,并在迭代的每个阶段将其传递给 RepeatCallback。当回调完成其 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy 以更新其状态(该状态将存储在 RepeatContext 中)。随后,它会询问策略迭代是否已完成。
Spring Batch 提供了一些简单的通用 CompletionPolicy 实现。SimpleCompletionPolicy 允许执行达到固定次数(同时 RepeatStatus.FINISHED 可随时强制提前完成)。
用户可能需要为更复杂的决策实现自己的完成策略。例如,一个防止在线系统在使用时执行批处理作业的批处理窗口就需要自定义策略。
异常处理
如果在 RepeatCallback 内部抛出异常,RepeatTemplate 会咨询 ExceptionHandler,由它决定是否重新抛出该异常。
以下清单展示了 ExceptionHandler 接口的定义:
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一个常见的用例是统计给定类型的异常数量,并在达到限制时使任务失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和稍显灵活的 RethrowOnThresholdExceptionHandler。SimpleLimitExceptionHandler 具有一个限制属性和一个异常类型,该异常类型将与当前异常进行比较。所提供的类型的所有子类也会被计数。在达到限制之前,给定类型的异常会被忽略,然后它们会被重新抛出。其他类型的异常则总是被重新抛出。
SimpleLimitExceptionHandler 的一个重要可选属性是名为 useParent 的布尔标志。默认情况下,该标志为 false,因此限制仅在当前的 RepeatContext 中生效。当设置为 true 时,限制会在嵌套迭代(例如步骤内的一组块)中的同级上下文之间保持。
监听器
通常,能够接收针对多个不同迭代的横切关注点的额外回调是非常有用的。为此,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,并在迭代过程中,在可用时通过 RepeatContext 和 RepeatStatus 为它们提供回调。
RepeatListener 接口的定义如下:
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open 和 close 回调在整个迭代开始前和结束后执行。before、after 和 onError 则应用于每次单独的 RepeatCallback 调用。
请注意,当存在多个监听器时,它们会以列表形式存在,因此存在顺序。在这种情况下,open 和 before 会按照相同的顺序调用,而 after、onError 和 close 则会按照相反的顺序调用。
并行处理
RepeatOperations 的实现并不局限于顺序执行回调。某些实现能够并行执行其回调,这一点非常重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring 的 TaskExecutor 策略来运行 RepeatCallback。默认情况下,它使用 SynchronousTaskExecutor,这会导致整个迭代在同一个线程中执行(与普通的 RepeatTemplate 相同)。
声明式迭代
有时,某些业务处理需要在每次发生时重复执行。一个典型的例子是消息管道的优化。如果一批消息频繁到达,批量处理它们比为每条消息承担单独的事务开销更为高效。为此,Spring Batch 提供了一个 AOP 拦截器,它可以将方法调用包装在 RepeatOperations 对象中。RepeatOperationsInterceptor 会执行被拦截的方法,并根据提供的 RepeatTemplate 中的 CompletionPolicy 进行重复操作。
- Java
- XML
以下示例使用 Java 配置来重复调用名为 processMessage 的方法(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
以下示例展示了使用 Spring AOP 命名空间进行声明式迭代,以重复调用名为 processMessage 的方法(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
前面的示例在拦截器中使用了默认的 RepeatTemplate。若要更改策略、监听器及其他细节,可以向拦截器中注入一个 RepeatTemplate 实例。
如果被拦截的方法返回 void,拦截器总是返回 RepeatStatus.CONTINUABLE(因此,如果 CompletionPolicy 没有明确的终止点,存在无限循环的风险)。否则,它会持续返回 RepeatStatus.CONTINUABLE,直到被拦截方法的返回值为 null。此时,它返回 RepeatStatus.FINISHED。因此,目标方法内部的业务逻辑可以通过返回 null,或者通过抛出由提供的 RepeatTemplate 中的 ExceptionHandler 重新抛出的异常,来表明没有更多工作需要处理。