跳到主要内容
版本:6.0.2

Untitled :: Spring Batch 参考文档

外部化批处理执行

到目前为止讨论的集成方法表明,Spring Integration 可以像外壳一样包裹 Spring Batch。然而,Spring Batch 也可以在内部使用 Spring Integration。通过这种方法,Spring Batch 用户可以将项目甚至数据块的处理委托给外部进程。这使您能够卸载复杂的处理任务。Spring Batch Integration 专门为以下场景提供支持:

  • 远程分块 (Remote Chunking)
  • 远程分区 (Remote Partitioning)

远程分块

下图展示了在使用 Spring Batch 与 Spring Integration 时,远程分块的一种工作方式:

Remote Chunking

图 1. 远程分块

更进一步,你还可以通过使用 ChunkMessageChannelItemWriter(由 Spring Batch Integration 提供)来外部化分块处理,该组件负责发送数据项并收集结果。一旦发送完成,Spring Batch 会继续执行读取和分组数据项的过程,而无需等待结果。相反,ChunkMessageChannelItemWriter 负责收集结果并将其重新集成到 Spring Batch 流程中。

借助Spring Integration,您可以完全控制流程的并发性(例如,通过使用QueueChannel而非DirectChannel)。此外,通过依赖Spring Integration丰富的通道适配器集合(如JMS和AMQP),您可以将批处理作业的块分发到外部系统进行处理。

一个包含待远程分块步骤的作业,在 Java 中可能具有类似以下配置:

public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(200, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}

ItemReader 引用指向您希望在管理器上用于读取数据的 bean。ItemWriter 引用指向一个特殊的 ItemWriter(称为 ChunkMessageChannelItemWriter),如前所述。处理器(如果有)未包含在管理器配置中,因为它是在工作器上配置的。在实现您的用例时,应检查任何其他组件属性,例如节流限制等。

以下 Java 配置提供了一个基本的管理器设置:

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}

/*
* 配置出站流(发送给工作者的请求)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}

/*
* 配置入站流(来自工作者的回复)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}

/*
* 配置 ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}

上述配置为我们提供了多个bean。我们通过使用ActiveMQ以及Spring Integration提供的入站和出站JMS适配器来配置消息中间件。如图所示,被作业步骤引用的itemWriter bean使用ChunkMessageChannelItemWriter通过已配置的中间件写入数据块。

现在我们可以继续配置 worker,如下例所示:

以下示例展示了 Java 中的工作节点配置:

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}

/*
* 配置入站流(来自管理器的请求)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}

/*
* 配置出站流(发送给管理器的回复)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}

/*
* 配置 ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}

这些配置项大多与管理器配置相似。工作节点不需要访问 Spring Batch 的 JobRepository,也不需要实际的作业配置文件。核心关注点是 chunkProcessorChunkHandler 这个 bean。ChunkProcessorChunkRequestHandlerchunkProcessor 属性接收一个已配置的 SimpleChunkProcessor,您需要在此处提供将在工作节点上运行的 ItemWriter(以及可选的 ItemProcessor)引用,当工作节点从管理器接收到数据块时就会执行这些组件。

更多信息,请参阅“可扩展性”章节中关于远程分块的部分。

自 4.1 版本起,Spring Batch Integration 引入了 @EnableBatchIntegration 注解,可用于简化远程分块(remote chunking)的配置。该注解提供了两个可在应用上下文中自动装配的 Bean:

  • RemoteChunkingManagerStepBuilderFactory: 配置管理器步骤

  • RemoteChunkingWorkerBuilder: 配置远程工作器集成流程

这些 API 负责配置多个组件,如下图所示:

远程分块配置

图 2. 远程分块配置

在管理器端,RemoteChunkingManagerStepBuilderFactory 允许您通过声明以下内容来配置管理器步骤:

  • 用于读取项目并将其发送给工作线程的项目读取器

  • 用于向工作线程发送请求的输出通道("传出请求")

  • 用于接收工作线程回复的输入通道("传入回复")

您无需显式配置 ChunkMessageChannelItemWriterMessagingTemplate。(如果发现有明确理由,您仍然可以显式配置它们)。

在工作端,RemoteChunkingWorkerBuilder 允许您配置一个工作器以:

  • 监听输入通道(“传入请求”)上由管理器发送的请求

  • 针对每个请求,使用配置的 ItemProcessorItemWriter 调用 ChunkProcessorChunkRequestHandlerhandleChunk 方法

  • 在输出通道(“传出回复”)上向管理器发送回复

你无需显式配置 SimpleChunkProcessorChunkProcessorChunkRequestHandler。(如果发现有理由需要这样做,你仍然可以显式配置它们)。

以下示例展示了如何使用这些 API:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

@Configuration
public static class ManagerConfiguration {

@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}

// Middleware beans setup omitted

}

@Configuration
public static class WorkerConfiguration {

@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;

@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}

// Middleware beans setup omitted

}

}

你可以在这里找到远程分块作业的完整示例。

远程分区

下图展示了一个典型远程分区场景:

Remote Partitioning

图 3. 远程分区

另一方面,当瓶颈并非源于数据处理本身,而是由相关的I/O操作引起时,远程分区(Remote Partitioning)便显得尤为有用。通过远程分区,您可以将任务分发给执行完整Spring Batch步骤的工作节点。因此,每个工作节点都拥有自己的ItemReaderItemProcessorItemWriter。为此,Spring Batch Integration提供了MessageChannelPartitionHandler

PartitionHandler 接口的实现使用 MessageChannel 实例向远程工作节点发送指令并接收其响应。这为与远程工作节点通信所使用的传输机制(如 JMS 和 AMQP)提供了良好的抽象层。

“可扩展性”章节中关于远程分区的部分概述了配置远程分区所需的概念和组件,并展示了使用默认的 TaskExecutorPartitionHandler 在独立的本地执行线程中进行分区的示例。若要在多个 JVM 中实现远程分区,还需要两个额外的组件:

  • 远程通信框架或网格环境

  • 支持所需远程通信框架或网格环境的 PartitionHandler 实现

与远程分块类似,您可以使用JMS作为"远程通信框架"。在这种情况下,请使用 MessageChannelPartitionHandler 实例作为 PartitionHandler 实现,如前所述。

以下示例假设已存在一个分区作业,并重点展示 Java 中 MessageChannelPartitionHandler 和 JMS 的配置:

/*
* 管理器端配置
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// 配置 aggregatorFactoryBean 的其他属性
return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}

/*
* 工作器端配置
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}

@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}

您还必须确保分区 handler 属性映射到 partitionHandler bean。

以下示例将分区 handler 属性映射到 Java 中的 partitionHandler

public Job personJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1.manager", jobRepository)
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build())
.build();
}

你可以在这里找到一个远程分区作业的完整示例。

您可以使用 @EnableBatchIntegration 注解来简化远程分区设置。该注解提供了两个对远程分区非常有用的 Bean:

  • RemotePartitioningManagerStepBuilderFactory: 配置管理器步骤

  • RemotePartitioningWorkerStepBuilderFactory: 配置工作器步骤

这些 API 负责配置多个组件,如下图所示:

远程分区配置(带作业仓库轮询)

图 4. 远程分区配置(带作业仓库轮询)

远程分区配置(含回复聚合)

图 5. 远程分区配置(含回复聚合)

在管理器端,RemotePartitioningManagerStepBuilderFactory 允许您通过声明以下内容来配置管理器步骤:

  • 用于分区数据的 Partitioner

  • 用于向工作节点发送请求的输出通道(“传出请求”)

  • 用于接收工作节点回复的输入通道(“传入回复”,当配置回复聚合时)

  • 轮询间隔和超时参数(当配置作业存储库轮询时)

您无需显式配置 MessageChannelPartitionHandlerMessagingTemplate。(如果发现有理由这样做,您仍然可以显式配置它们)。

在工作端,RemotePartitioningWorkerStepBuilderFactory 允许您配置一个工作器以:

  • 监听输入通道(“传入请求”)上由管理器发送的请求

  • 为每个请求调用 StepExecutionRequestHandlerhandle 方法

  • 在输出通道(“传出回复”)上向管理器发送回复

您无需显式配置 StepExecutionRequestHandler。(如果您有理由这样做,也可以显式配置它)。

以下示例展示了如何使用这些 API:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

@Configuration
public static class ManagerConfiguration {

@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}

// Middleware beans setup omitted

}

@Configuration
public static class WorkerConfiguration {

@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}

// Middleware beans setup omitted

}

}