重复
RepeatTemplate
批处理与重复操作有关,要么作为简单的优化,要么作为作业的一部分。为了对重复操作进行策略化和泛化,并提供相当于迭代器框架的功能,Spring Batch 提供了 RepeatOperations
接口。RepeatOperations
接口的定义如下:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
回调是一个接口,如下所示的定义中,它允许你插入一些可重复的业务逻辑:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
回调函数会被反复执行,直到实现确定迭代应结束为止。这些接口中的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLE
或 RepeatStatus.FINISHED
。RepeatStatus
枚举向重复操作的调用者传达是否还有剩余工作需要处理的信息。一般来说,RepeatOperations
的实现应检查 RepeatStatus
,并将其作为结束迭代决策的一部分。任何希望向调用者发出信号、表明没有剩余工作的回调都可以返回 RepeatStatus.FINISHED
。
最简单的通用 RepeatOperations
实现是 RepeatTemplate
:
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的例子中,我们返回了 RepeatStatus.CONTINUABLE
,以表明还有更多工作需要完成。回调函数也可以返回 RepeatStatus.FINISHED
,以向调用者发出信号,表示没有剩余的工作。一些迭代可以根据回调中执行的工作的内在因素终止。而另一些则实际上是无限循环(从回调的角度来看),其完成决策被委托给外部策略,就像前面例子中展示的那样。
RepeatContext
RepeatCallback
的方法参数是一个 RepeatContext
。许多回调函数会忽略该上下文。然而,如果需要,可以将其用作属性包来存储迭代期间的临时数据。在 iterate
方法返回后,该上下文将不再存在。
如果正在进行嵌套迭代,RepeatContext
会有一个父上下文。 父上下文有时可用于存储需要在多次调用 iterate
之间共享的数据。 例如,如果你想计算迭代中某个事件的发生次数并在后续调用中记住它,就是这种情况。
RepeatStatus
RepeatStatus
是 Spring Batch 中用于表示处理是否完成的枚举类型。它有两个可能的 RepeatStatus
值:
表 1. RepeatStatus 属性
值 | 描述 |
---|---|
CONTINUABLE | 还有更多工作要做。 |
FINISHED | 不应再进行更多重复操作。 |
你可以通过在 RepeatStatus
中使用 and()
方法,用逻辑与操作来组合 RepeatStatus
值。这相当于对可继续标志进行逻辑与操作。换句话说,如果任一状态为 FINISHED
,则结果为 FINISHED
。
完成策略
在 RepeatTemplate
内部,iterate
方法中循环的终止由 CompletionPolicy
决定,该策略同时也是 RepeatContext
的工厂。RepeatTemplate
的职责是使用当前策略在迭代的每个阶段创建一个 RepeatContext
,并将其传递给 RepeatCallback
。在回调完成其 doInIteration
后,RepeatTemplate
必须调用 CompletionPolicy
,要求其更新状态(该状态将存储在 RepeatContext
中)。然后,它会询问策略迭代是否已完成。
Spring Batch 提供了一些简单的通用 CompletionPolicy
实现。SimpleCompletionPolicy
允许执行固定次数(通过 RepeatStatus.FINISHED
可以在任何时间强制提前完成)。
用户可能需要为更复杂的决策实现自己的完成策略。例如,一个防止批处理作业在在线系统使用时执行的批处理窗口需要自定义策略。
异常处理
如果在 RepeatCallback
中抛出异常,RepeatTemplate
会咨询 ExceptionHandler
,它可以决定是否重新抛出该异常。
以下列表显示了 ExceptionHandler
接口的定义:
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一个常见的用例是统计特定类型的异常数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler
和稍微灵活一点的 RethrowOnThresholdExceptionHandler
。SimpleLimitExceptionHandler
拥有一个限制属性(limit property)和一个应与当前异常进行比较的异常类型。提供的类型的所有子类也会被计入。给定类型的异常在达到限制之前会被忽略,之后则会被重新抛出。其他类型的异常则始终会被重新抛出。
SimpleLimitExceptionHandler
的一个重要可选属性是名为 useParent
的布尔标志。 默认情况下,它为 false
,因此限制仅在当前的 RepeatContext
中计算。 当设置为 true
时,限制将在嵌套迭代中的兄弟上下文中保持(例如步骤内的多个块)。
监听器
通常,能够为跨越多个不同迭代的横向关注点接收额外的回调是非常有用的。为此,Spring Batch 提供了 RepeatListener
接口。RepeatTemplate
允许用户注册 RepeatListener
实现,并且在迭代过程中,如果可用,它们会通过 RepeatContext
和 RepeatStatus
接收回调。
RepeatListener
接口有以下定义:
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open
和 close
回调会在整个迭代的前后调用。before
、after
和 onError
则适用于各个单独的 RepeatCallback
调用。
请注意,当有多个监听器时,它们会以列表的形式存在,因此有一个顺序。在这种情况下,open
和 before
会按照相同的顺序被调用,而 after
、onError
和 close
则会以相反的顺序被调用。
并行处理
RepeatOperations
的实现并不局限于按顺序执行回调函数。非常重要的一点是,某些实现能够并行执行其回调函数。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate
,它使用 Spring 的 TaskExecutor
策略来运行 RepeatCallback
。默认情况下,使用的是 SynchronousTaskExecutor
,这会导致整个迭代在同一线程中执行(与普通的 RepeatTemplate
相同)。
声明式迭代
有时,有一些业务处理是你知道每次发生时都需要重复执行的。这种经典的例子是消息管道的优化。如果一批消息频繁到达,处理它们比为每条消息承担单独事务的成本更高效。Spring Batch 提供了一个 AOP 拦截器,它将方法调用包装在一个 RepeatOperations
对象中以实现此目的。RepeatOperationsInterceptor
会根据提供的 RepeatTemplate
中的 CompletionPolicy
执行被拦截的方法并进行重复操作。
- Java
- XML
下面的示例使用 Java 配置来重复调用一个名为 processMessage
的方法的服务(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
下面的示例展示了使用 Spring AOP 命名空间进行声明式迭代,以重复调用一个名为 processMessage
的方法的服务(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
前面的例子在拦截器中使用了默认的 RepeatTemplate
。 若要更改策略、监听器和其他详细信息,可以将 RepeatTemplate
的实例注入到拦截器中。
如果被拦截的方法返回 void
,拦截器始终返回 RepeatStatus.CONTINUABLE
(因此,如果 CompletionPolicy
没有有限的终点,则存在无限循环的风险)。否则,它会一直返回 RepeatStatus.CONTINUABLE
,直到被拦截方法的返回值为 null
。此时,它将返回 RepeatStatus.FINISHED
。因此,目标方法中的业务逻辑可以通过返回 null
或抛出由 RepeatTemplate
中提供的 ExceptionHandler
重新抛出的异常来表明没有更多工作需要处理。