跳到主要内容

重复

QWen Plus 中英对照 Repeat

RepeatTemplate

批处理与重复操作有关,要么作为简单的优化,要么作为作业的一部分。为了对重复操作进行策略化和泛化,并提供相当于迭代器框架的功能,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口的定义如下:

public interface RepeatOperations {

RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}
java

回调是一个接口,如下所示的定义中,它允许你插入一些可重复的业务逻辑:

public interface RepeatCallback {

RepeatStatus doInIteration(RepeatContext context) throws Exception;

}
java

回调函数会被反复执行,直到实现确定迭代应结束为止。这些接口中的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 枚举向重复操作的调用者传达是否还有剩余工作需要处理的信息。一般来说,RepeatOperations 的实现应检查 RepeatStatus,并将其作为结束迭代决策的一部分。任何希望向调用者发出信号、表明没有剩余工作的回调都可以返回 RepeatStatus.FINISHED

最简单的通用 RepeatOperations 实现是 RepeatTemplate

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}

});
java

在前面的例子中,我们返回了 RepeatStatus.CONTINUABLE,以表明还有更多工作需要完成。回调函数也可以返回 RepeatStatus.FINISHED,以向调用者发出信号,表示没有剩余的工作。一些迭代可以根据回调中执行的工作的内在因素终止。而另一些则实际上是无限循环(从回调的角度来看),其完成决策被委托给外部策略,就像前面例子中展示的那样。

RepeatContext

RepeatCallback 的方法参数是一个 RepeatContext。许多回调函数会忽略该上下文。然而,如果需要,可以将其用作属性包来存储迭代期间的临时数据。在 iterate 方法返回后,该上下文将不再存在。

如果正在进行嵌套迭代,RepeatContext 会有一个父上下文。 父上下文有时可用于存储需要在多次调用 iterate 之间共享的数据。 例如,如果你想计算迭代中某个事件的发生次数并在后续调用中记住它,就是这种情况。

RepeatStatus

RepeatStatus 是 Spring Batch 中用于表示处理是否完成的枚举类型。它有两个可能的 RepeatStatus 值:

表 1. RepeatStatus 属性

描述
CONTINUABLE还有更多工作要做。
FINISHED不应再进行更多重复操作。

你可以通过在 RepeatStatus 中使用 and() 方法,用逻辑与操作来组合 RepeatStatus 值。这相当于对可继续标志进行逻辑与操作。换句话说,如果任一状态为 FINISHED,则结果为 FINISHED

完成策略

RepeatTemplate 内部,iterate 方法中循环的终止由 CompletionPolicy 决定,该策略同时也是 RepeatContext 的工厂。RepeatTemplate 的职责是使用当前策略在迭代的每个阶段创建一个 RepeatContext,并将其传递给 RepeatCallback。在回调完成其 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy,要求其更新状态(该状态将存储在 RepeatContext 中)。然后,它会询问策略迭代是否已完成。

Spring Batch 提供了一些简单的通用 CompletionPolicy 实现。SimpleCompletionPolicy 允许执行固定次数(通过 RepeatStatus.FINISHED 可以在任何时间强制提前完成)。

用户可能需要为更复杂的决策实现自己的完成策略。例如,一个防止批处理作业在在线系统使用时执行的批处理窗口需要自定义策略。

异常处理

如果在 RepeatCallback 中抛出异常,RepeatTemplate 会咨询 ExceptionHandler,它可以决定是否重新抛出该异常。

以下列表显示了 ExceptionHandler 接口的定义:

public interface ExceptionHandler {

void handleException(RepeatContext context, Throwable throwable)
throws Throwable;

}
java

一个常见的用例是统计特定类型的异常数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和稍微灵活一点的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 拥有一个限制属性(limit property)和一个应与当前异常进行比较的异常类型。提供的类型的所有子类也会被计入。给定类型的异常在达到限制之前会被忽略,之后则会被重新抛出。其他类型的异常则始终会被重新抛出。

SimpleLimitExceptionHandler 的一个重要可选属性是名为 useParent 的布尔标志。 默认情况下,它为 false,因此限制仅在当前的 RepeatContext 中计算。 当设置为 true 时,限制将在嵌套迭代中的兄弟上下文中保持(例如步骤内的多个块)。

监听器

通常,能够为跨越多个不同迭代的横向关注点接收额外的回调是非常有用的。为此,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,并且在迭代过程中,如果可用,它们会通过 RepeatContextRepeatStatus 接收回调。

RepeatListener 接口有以下定义:

public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
java

openclose 回调会在整个迭代的前后调用。beforeafteronError 则适用于各个单独的 RepeatCallback 调用。

请注意,当有多个监听器时,它们会以列表的形式存在,因此有一个顺序。在这种情况下,openbefore 会按照相同的顺序被调用,而 afteronErrorclose 则会以相反的顺序被调用。

并行处理

RepeatOperations 的实现并不局限于按顺序执行回调函数。非常重要的一点是,某些实现能够并行执行其回调函数。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring 的 TaskExecutor 策略来运行 RepeatCallback。默认情况下,使用的是 SynchronousTaskExecutor,这会导致整个迭代在同一线程中执行(与普通的 RepeatTemplate 相同)。

声明式迭代

有时,有一些业务处理是你知道每次发生时都需要重复执行的。这种经典的例子是消息管道的优化。如果一批消息频繁到达,处理它们比为每条消息承担单独事务的成本更高效。Spring Batch 提供了一个 AOP 拦截器,它将方法调用包装在一个 RepeatOperations 对象中以实现此目的。RepeatOperationsInterceptor 会根据提供的 RepeatTemplate 中的 CompletionPolicy 执行被拦截的方法并进行重复操作。

下面的示例使用 Java 配置来重复调用一个名为 processMessage 的方法的服务(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):

@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());

MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");

RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

return service;
}
java

前面的例子在拦截器中使用了默认的 RepeatTemplate。 若要更改策略、监听器和其他详细信息,可以将 RepeatTemplate 的实例注入到拦截器中。

如果被拦截的方法返回 void,拦截器始终返回 RepeatStatus.CONTINUABLE(因此,如果 CompletionPolicy 没有有限的终点,则存在无限循环的风险)。否则,它会一直返回 RepeatStatus.CONTINUABLE,直到被拦截方法的返回值为 null。此时,它将返回 RepeatStatus.FINISHED。因此,目标方法中的业务逻辑可以通过返回 null 或抛出由 RepeatTemplate 中提供的 ExceptionHandler 重新抛出的异常来表明没有更多工作需要处理。