项目读取器与写入器实现
在本节中,我们将向您介绍之前章节中尚未讨论过的读取器和写入器。
装饰器
在某些情况下,您可能需要为已有的 ItemReader 或 ItemWriter 实现附加特定的行为。为此,Spring Batch 提供了以下开箱即用的装饰器:
SynchronizedItemStreamReader
当使用非线程安全的 ItemReader 时,Spring Batch 提供了 SynchronizedItemStreamReader 装饰器,可用于使 ItemReader 变为线程安全。Spring Batch 提供了 SynchronizedItemStreamReaderBuilder 来构建 SynchronizedItemStreamReader 的实例。
例如,FlatFileItemReader 并非线程安全,无法在多线程步骤中使用。可以通过使用 SynchronizedItemStreamReader 包装该读取器,以使其能够安全地用于多线程步骤。以下是如何包装此类读取器的示例:
@Bean
public SynchronizedItemStreamReader<Person> itemReader() {
FlatFileItemReader<Person> flatFileItemReader = new FlatFileItemReaderBuilder<Person>()
// set reader properties
.build();
return new SynchronizedItemStreamReaderBuilder<Person>()
.delegate(flatFileItemReader)
.build();
}
SingleItemPeekableItemReader
Spring Batch 包含一个装饰器,用于向 ItemReader 添加一个 peek 方法。该 peek 方法允许用户提前查看一个项目。重复调用 peek 会返回相同的项目,这也是 read 方法将返回的下一个项目。Spring Batch 提供了 SingleItemPeekableItemReaderBuilder 来构建 SingleItemPeekableItemReader 的实例。
SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为无法在多个线程中同时保证 peek 操作的有效性。在多个线程中调用 peek 后,只有一个线程能在下一次调用 read 时获取到该项。
SynchronizedItemStreamWriter
在使用非线程安全的 ItemWriter 时,Spring Batch 提供了 SynchronizedItemStreamWriter 装饰器,可用于使 ItemWriter 变为线程安全。Spring Batch 提供了 SynchronizedItemStreamWriterBuilder 来构建 SynchronizedItemStreamWriter 的实例。
例如,FlatFileItemWriter 不是线程安全的,无法在多线程步骤中使用。可以通过使用 SynchronizedItemStreamWriter 包装该写入器,以在多线程步骤中安全使用。以下是如何包装此类写入器的示例:
@Bean
public SynchronizedItemStreamWriter<Person> itemWriter() {
FlatFileItemWriter<Person> flatFileItemWriter = new FlatFileItemWriterBuilder<Person>()
// set writer properties
.build();
return new SynchronizedItemStreamWriterBuilder<Person>()
.delegate(flatFileItemWriter)
.build();
}
MultiResourceItemWriter
MultiResourceItemWriter 包装了一个 ResourceAwareItemWriterItemStream,并在当前资源中写入的项目数量超过 itemCountLimitPerResource 时创建一个新的输出资源。Spring Batch 提供了一个 MultiResourceItemWriterBuilder 来构造 MultiResourceItemWriter 的实例。
ClassifierCompositeItemWriter
ClassifierCompositeItemWriter 根据通过提供的 Classifier 实现的路由模式,为每个项目调用 ItemWriter 实现集合中的一个。如果所有委托都是线程安全的,则该实现也是线程安全的。Spring Batch 提供了 ClassifierCompositeItemWriterBuilder 来构造 ClassifierCompositeItemWriter 的实例。
ClassifierCompositeItemProcessor
ClassifierCompositeItemProcessor 是一个 ItemProcessor,它根据通过提供的 Classifier 实现的路由模式,调用一组 ItemProcessor 实现中的一个。Spring Batch 提供了 ClassifierCompositeItemProcessorBuilder 来构建 ClassifierCompositeItemProcessor 的实例。
MappingItemWriter
MappingItemWriter 通过在执行写入操作前对每个项目应用映射函数,将接受特定类型项目的 ItemWriter 适配为接受另一种类型项目的写入器。只要下游项目写入器是线程安全的,即可保证线程安全性;若下游项目写入器实现了 ItemStream 接口,状态管理也将得到遵循。
当与 CompositeItemWriter 结合使用时,此项目写入器最为有用。在这种情况下,下游写入器前的映射函数可以是输入项的获取器,也可以是更复杂的转换逻辑,从而有效地支持解构模式。
消息读写器
Spring Batch 为常用的消息系统提供了以下读取器和写入器:
AmqpItemReader
AmqpItemReader 是一个使用 AmqpTemplate 从交换器接收或转换消息的 ItemReader。Spring Batch 提供了 AmqpItemReaderBuilder 来构建 AmqpItemReader 的实例。
AmqpItemWriter
AmqpItemWriter 是一个使用 AmqpTemplate 向 AMQP 交换机发送消息的 ItemWriter。如果提供的 AmqpTemplate 中未指定交换机名称,消息将被发送到无名交换机。Spring Batch 提供了 AmqpItemWriterBuilder 来构建 AmqpItemWriter 的实例。
JmsItemReader
JmsItemReader 是一个用于 JMS 的 ItemReader,它使用 JmsTemplate。该模板应具有一个默认目的地,用于为 read() 方法提供数据项。Spring Batch 提供了 JmsItemReaderBuilder 来构建 JmsItemReader 的实例。
JmsItemWriter
JmsItemWriter 是一个用于 JMS 的 ItemWriter,它使用 JmsTemplate。该模板应具有默认目的地,用于在 write(List) 方法中发送条目。Spring Batch 提供了 JmsItemWriterBuilder 来构建 JmsItemWriter 的实例。
KafkaItemReader
KafkaItemReader 是用于读取 Apache Kafka 主题的 ItemReader。它可以配置为从同一主题的多个分区读取消息。该组件将消息偏移量存储在执行上下文中,以支持重启功能。Spring Batch 提供了 KafkaItemReaderBuilder 来构建 KafkaItemReader 的实例。
KafkaItemWriter
KafkaItemWriter 是用于 Apache Kafka 的 ItemWriter,它使用 KafkaTemplate 将事件发送到默认主题。Spring Batch 提供了 KafkaItemWriterBuilder 来构建 KafkaItemWriter 的实例。
数据库读取器
Spring Batch 提供了以下数据库读取器:
MongoPagingItemReader
MongoPagingItemReader 是一个通过分页技术从 MongoDB 读取文档的 ItemReader。Spring Batch 提供了 MongoPagingItemReaderBuilder 来构建 MongoPagingItemReader 的实例。
MongoCursorItemReader
MongoCursorItemReader 是一个通过流式技术从 MongoDB 读取文档的 ItemReader。Spring Batch 提供了 MongoCursorItemReaderBuilder 来构建 MongoCursorItemReader 的实例。
RepositoryItemReader
RepositoryItemReader 是一个通过 PagingAndSortingRepository 读取记录的 ItemReader。Spring Batch 提供了 RepositoryItemReaderBuilder 来构建 RepositoryItemReader 的实例。
数据库写入器
Spring Batch 提供了以下数据库写入器:
MongoItemWriter
MongoItemWriter 是一个 ItemWriter 实现,它使用 Spring Data 的 MongoOperations 实现来写入 MongoDB 存储。Spring Batch 提供了 MongoItemWriterBuilder 来构建 MongoItemWriter 的实例。
RepositoryItemWriter
RepositoryItemWriter 是 Spring Data 中 CrudRepository 的 ItemWriter 包装器。Spring Batch 提供了 RepositoryItemWriterBuilder 来构建 RepositoryItemWriter 的实例。
JdbcBatchItemWriter
JdbcBatchItemWriter 是一个 ItemWriter,它利用 NamedParameterJdbcTemplate 的批处理功能来执行针对所有提供项的批量语句。Spring Batch 提供了 JdbcBatchItemWriterBuilder 来构建 JdbcBatchItemWriter 的实例。
JpaItemWriter
JpaItemWriter 是一个 ItemWriter,它使用 JPA EntityManagerFactory 来合并不属于持久化上下文的任何实体。Spring Batch 提供了 JpaItemWriterBuilder 来构造 JpaItemWriter 的实例。
专业读者
Spring Batch 提供了以下专用读取器:
LdifReader
LdifReader 从 Resource 中读取 LDIF(LDAP 数据交换格式)记录,解析它们,并在每次执行 read 时返回一个 LdapAttribute 对象。Spring Batch 提供了 LdifReaderBuilder 来构建 LdifReader 的实例。
MappingLdifReader
MappingLdifReader 从 Resource 中读取 LDIF(LDAP 数据交换格式)记录,解析它们,然后将每个 LDIF 记录映射到一个 POJO(普通旧 Java 对象)。每次读取都会返回一个 POJO。Spring Batch 提供了 MappingLdifReaderBuilder 来构建 MappingLdifReader 的实例。
AvroItemReader
AvroItemReader 从资源中读取序列化的 Avro 数据。每次读取返回一个由 Java 类或 Avro Schema 指定类型的实例。该读取器可以配置为处理嵌入 Avro 模式的输入,也可以不配置。Spring Batch 提供了 AvroItemReaderBuilder 来构建 AvroItemReader 的实例。
专业编写器
Spring Batch 提供了以下专用写入器:
SimpleMailMessageItemWriter
SimpleMailMessageItemWriter 是一个能够发送邮件消息的 ItemWriter。它将消息的实际发送委托给 MailSender 实例。Spring Batch 提供了 SimpleMailMessageItemWriterBuilder 来构建 SimpleMailMessageItemWriter 的实例。
AvroItemWriter
AvroItemWrite 将 Java 对象根据给定的类型或模式(Schema)序列化到可写资源(WriteableResource)中。写入器(writer)可以选择配置为在输出中嵌入 Avro 模式或不嵌入。Spring Batch 提供了 AvroItemWriterBuilder 来构建 AvroItemWriter 的实例。
专用处理器
Spring Batch 提供了以下专用处理器:
ScriptItemProcessor
ScriptItemProcessor 是一种 ItemProcessor,它将当前待处理项传递给指定的脚本,并返回脚本执行的结果。Spring Batch 提供了 ScriptItemProcessorBuilder 来构建 ScriptItemProcessor 的实例。