跳到主要内容

子元素

QWen Plus 中英对照 Sub-elements

当这个 GatewayPollableChannel 接收消息时,您必须提供一个全局默认的 Poller,或者为 Job Launching Gateway 提供一个 Poller 子元素。

下面的示例展示了如何在 Java 中提供一个轮询器:

@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
jobLaunchingGateway.setOutputChannel(replyChannel());
return jobLaunchingGateway;
}
java

提供带有信息性消息的反馈

由于 Spring Batch 作业可能运行较长时间,提供进度信息通常至关重要。例如,利益相关者可能希望在批处理作业的某些或所有部分失败时收到通知。Spring Batch 提供了通过以下方式收集此类信息的支持:

  • 主动轮询

  • 事件驱动型监听器

在异步启动 Spring Batch 作业时(例如,使用 Job Launching Gateway),会返回一个 JobExecution 实例。因此,你可以通过 JobExecution.getJobId() 方法,使用 JobExplorerJobRepository 中检索更新的 JobExecution 实例,从而持续轮询状态更新。然而,这种方法被认为是次优的,更推荐使用事件驱动的方法。

因此,Spring Batch 提供了监听器,包括以下三种最常用的监听器:

  • StepListener

  • ChunkListener

  • JobExecutionListener

在下图所示的示例中,一个 Spring Batch 作业已配置了 StepExecutionListener。因此,Spring Integration 会在任何步骤之前或之后接收并处理事件。例如,您可以使用 Router 来检查接收到的 StepExecution。根据该检查的结果,可能会发生各种情况(例如将消息路由到邮件输出通道适配器),从而可以根据某些条件发送电子邮件通知。

处理信息性消息

图 1. 处理信息性消息

以下两部分示例展示了如何配置监听器以在 StepExecution 事件中向 Gateway 发送消息,并将其输出记录到 logging-channel-adapter

首先,创建通知集成的bean。

下面的示例展示了如何在 Java 中创建通知集成 Bean:

@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
java
备注

您需要在配置中添加 @IntegrationComponentScan 注解。

第二,修改你的作业以添加一个步骤级监听器。

下面的示例展示了如何在 Java 中添加步骤级别的监听器:

public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("importPayments", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.chunk(200, transactionManager)
.listener(notificationExecutionsListener())
// ...
.build();
)
.build();
}
java

异步处理器

异步处理器可以帮助你扩展项目的处理能力。在异步处理器的使用场景中,AsyncItemProcessor 作为调度器,在新线程上执行 ItemProcessor 的逻辑来处理项目。项目完成后,Future 将传递给 AsyncItemWriter 进行写入。

因此,您可以通过使用异步项处理来提高性能,基本上可以实现 fork-join 场景。AsyncItemWriter 会收集结果,并在所有结果都可用时立即将该块写回。

下面的示例展示了如何在 Java 中配置 AsyncItemProcessor

@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
java

delegate 属性指向你的 ItemProcessor bean,而 taskExecutor 属性指向你选择的 TaskExecutor

下面的示例展示了如何在 Java 中配置 AsyncItemWriter

@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
java

同样,delegate 属性实际上是指向你的 ItemWriter bean 的引用。

外部化批处理过程执行

到目前为止,所讨论的集成方法表明 Spring Integration 可以像外壳一样包装 Spring Batch。然而,Spring Batch 也可以在内部使用 Spring Integration。通过使用这种方法,Spring Batch 用户可以将项目甚至块的处理委托给外部进程。这使您能够卸载复杂的处理任务。Spring Batch Integration 为以下内容提供了专门的支持:

  • 远程分块

  • 远程分区

远程分块

下图展示了一种远程分块处理的工作方式,当您将 Spring Batch 与 Spring Integration 一起使用时:

Remote Chunking

图2. 远程分块

更进一步,您还可以通过使用 ChunkMessageChannelItemWriter(由 Spring Batch Integration 提供)将块处理外部化,该组件会发送项目并收集结果。一旦发送出去,Spring Batch 将继续读取和分组项目的流程,而无需等待结果。相反,收集结果并将它们重新整合回 Spring Batch 流程的责任由 ChunkMessageChannelItemWriter 承担。

使用 Spring Integration,您可以完全控制流程的并发性(例如,通过使用 QueueChannel 而不是 DirectChannel)。此外,通过依赖 Spring Integration 丰富的通道适配器集合(如 JMS 和 AMQP),您可以将批处理作业的块分发到外部系统进行处理。

在 Java 中,一个带有远程分块步骤的作业可能具有类似于以下内容的配置:

public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(200, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
java

ItemReader 引用指向你希望在管理器上用于读取数据的 bean。ItemWriter 引用指向一个特殊的 ItemWriter(称为 ChunkMessageChannelItemWriter),如前所述。处理器(如果有)从管理器配置中省略,因为它是在工作程序上配置的。在实现你的使用场景时,应检查任何其他组件属性,例如限流限制等。

以下 Java 配置提供了一个基本的管理器设置:

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}

/*
* 配置出站流(发送到工作线程的请求)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}

/*
* 配置入站流(来自工作线程的回复)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}

/*
* 配置 ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
java

前面的配置为我们提供了多个bean。我们通过使用 ActiveMQ 和 Spring Integration 提供的入站和出站 JMS 适配器来配置消息中间件。如所示,我们的 itemWriter bean 被作业步骤引用,它使用 ChunkMessageChannelItemWriter 通过配置的中间件写入数据块。

现在我们可以继续进行工作线程配置,如下例所示:

下面的示例展示了用 Java 配置的工作程序:

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}

/*
* 配置传入流程(来自管理器的请求)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}

/*
* 配置传出流程(发送到管理器的回复)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}

/*
* 配置 ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
java

这些配置项中的大多数应该与管理器配置中的内容相似。工作节点(Workers)不需要访问 Spring Batch 的 JobRepository,也不需要访问实际的任务配置文件。主要的感兴趣 Bean 是 chunkProcessorChunkHandlerChunkProcessorChunkHandlerchunkProcessor 属性接受一个已配置的 SimpleChunkProcessor,在这里你可以提供在工作节点运行时所需的 ItemWriter 引用(可选地还包括 ItemProcessor),当工作节点从管理节点接收数据块时,这些组件将会被使用。

如需更多信息,请参阅“可伸缩性”章节中关于 远程分块 的部分。

从 4.1 版本开始,Spring Batch Integration 引入了 @EnableBatchIntegration 注解,该注解可用于简化远程分块设置。此注解提供了两个可以在应用程序上下文中自动注入的 Bean:

  • RemoteChunkingManagerStepBuilderFactory:配置管理步骤

  • RemoteChunkingWorkerBuilder:配置远程工作程序集成流

这些 API 会负责配置多个组件,如下图所示:

Remote Chunking Configuration

图 3. 远程分块配置

在管理器端,RemoteChunkingManagerStepBuilderFactory 允许你通过声明以下内容来配置管理器步骤:

  • 用于读取项目并将其发送给工作者的项读取器

  • 用于向工作者发送请求的输出通道(“传出请求”)

  • 用于从工作者接收回复的输入通道(“传入回复”)

你不需要显式配置 ChunkMessageChannelItemWriterMessagingTemplate。(如果有必要,你仍然可以显式配置它们)。

在工作线程端,RemoteChunkingWorkerBuilder 允许你配置一个工作线程以:

  • 在输入通道(“Incoming requests”)上监听由管理器发送的请求

  • 对每个请求,调用 ChunkProcessorChunkHandlerhandleChunk 方法,并使用配置好的 ItemProcessorItemWriter

  • 在输出通道(“Outgoing replies”)上向管理器发送回复

你不需要显式配置 SimpleChunkProcessorChunkProcessorChunkHandler。(如果你有理由这样做,仍然可以显式配置它们)。

以下示例展示了如何使用这些API:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

@Configuration
public static class ManagerConfiguration {

@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}

// Middleware beans setup omitted

}

@Configuration
public static class WorkerConfiguration {

@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;

@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}

// Middleware beans setup omitted

}

}
java

你可以在这里找到远程分块作业的完整示例 here

远程分区

下图显示了一个典型的远程分区情况:

远程分区

图 4. 远程分区

另一方面,远程分区(Remote Partitioning)在问题瓶颈并非由项目处理引起,而是由相关的 I/O 操作导致时非常有用。通过远程分区,您可以将工作发送给执行完整 Spring Batch 步骤的工作者。因此,每个工作者都有自己的 ItemReaderItemProcessorItemWriter。为此,Spring Batch Integration 提供了 MessageChannelPartitionHandler

PartitionHandler 接口的实现使用 MessageChannel 实例向远程工作程序发送指令并接收它们的响应。这为与远程工作程序通信时所使用的传输方式(例如 JMS 和 AMQP)提供了一个很好的抽象。

在“可扩展性”章节中关于远程分区的部分,提供了配置远程分区所需的概念和组件的概述,并展示了使用默认的 TaskExecutorPartitionHandler 在单独的本地线程中进行分区的示例。为了实现跨多个 JVM 的远程分区,还需要两个额外的组件:

  • 远程处理结构或网格环境

  • 一个支持所需远程处理结构或网格环境的 PartitionHandler 实现

类似于远程分块,你可以使用 JMS 作为“远程传输框架”。在这种情况下,使用 MessageChannelPartitionHandler 实例作为 PartitionHandler 的实现,正如前面所述。

下面的示例假设已经存在一个分区作业,并专注于 Java 中的 MessageChannelPartitionHandler 和 JMS 配置:

/*
* 管理端配置
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// 配置聚合器的其他属性
return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}

/*
* 工作端配置
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}

@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
java

你还必须确保分区的 handler 属性映射到 partitionHandler bean。

下面的示例将分区 handler 属性映射到 Java 中的 partitionHandler

public Job personJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1.manager", jobRepository)
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build())
.build();
}
java

你可以在这里找到远程分区作业的完整示例 here

你可以使用 @EnableBatchIntegration 注解来简化远程分区设置。该注解提供了两个对远程分区有用的 bean:

  • RemotePartitioningManagerStepBuilderFactory:配置管理步骤

  • RemotePartitioningWorkerStepBuilderFactory:配置工作步骤

这些 API 负责配置多个组件,如下图所示:

远程分区配置(带作业存储库轮询)

图 5. 远程分区配置(带作业存储库轮询)

远程分区配置(带回复聚合)

图 6. 远程分区配置(带回复聚合)

在管理器端,RemotePartitioningManagerStepBuilderFactory 通过声明以下内容让你配置管理器步骤:

  • 用于分区数据的 Partitioner

  • 用于向工作者发送请求的输出通道(“传出请求”)

  • 用于从工作者接收回复的输入通道(“传入回复”)(在配置回复聚合时)

  • 轮询间隔和超时参数(在配置作业存储库轮询时)

你不需要显式配置 MessageChannelPartitionHandlerMessagingTemplate。(如果你有理由这样做,仍然可以显式配置它们)。

在工作节点一侧,RemotePartitioningWorkerStepBuilderFactory 允许你配置一个工作节点以:

  • 在输入通道(“Incoming requests”)上监听由管理器发送的请求

  • 调用 StepExecutionRequestHandlerhandle 方法处理每个请求

  • 在输出通道(“Outgoing replies”)上向管理器发送回复

你不需要显式配置 StepExecutionRequestHandler。(如果你有理由这样做,可以显式配置它)。

以下示例展示了如何使用这些API:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

@Configuration
public static class ManagerConfiguration {

@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}

// Middleware beans setup omitted

}

@Configuration
public static class WorkerConfiguration {

@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}

// Middleware beans setup omitted

}

}
java