数据库
像大多数企业应用风格一样,数据库是批处理的核心存储机制。然而,批处理与其他应用风格的不同之处在于系统必须处理的数据集的规模之大。如果一条 SQL 语句返回 100 万行,那么结果集可能会将所有返回的结果保存在内存中,直到所有行都被读取完毕。Spring Batch 为此问题提供了两种类型的解决方案:
基于游标的 ItemReader
实现
使用数据库游标通常是大多数批处理开发人员的默认方法,因为这是数据库对“流式传输”关系数据问题的解决方案。Java ResultSet
类本质上是一种面向对象的操作游标机制。ResultSet
维护一个指向当前数据行的游标。在 ResultSet
上调用 next
方法会将此游标移动到下一行。Spring Batch 基于游标的 ItemReader
实现会在初始化时打开一个游标,并在每次调用 read
时将游标向前移动一行,返回一个可被用于处理的映射对象。然后调用 close
方法以确保释放所有资源。Spring 核心中的 JdbcTemplate
通过使用回调模式来映射 ResultSet
中的所有行并在返回控制权给方法调用者之前关闭它,从而绕过此问题。然而,在批处理中,必须等到步骤完成才能进行下一步。以下图像展示了一个基于游标的 ItemReader
的通用工作原理图。请注意,虽然示例使用了 SQL(因为 SQL 广为人知),但任何技术都可以实现这种基本方法。
图 1. 游标示例
此示例说明了基本模式。假设有一个名为 'FOO' 的表,该表包含三列:ID
、NAME
和 BAR
,选择所有 ID
大于 1 但小于 7 的行。这将把游标的起始位置(第 1 行)定位到 ID
为 2 的位置。该行的结果应该是一个完全映射的 Foo
对象。再次调用 read()
方法会将游标移动到下一行,即 ID
为 3 的 Foo
对象。这些读取的结果在每次 read
操作后被输出,从而允许对象被垃圾回收(假定没有实例变量保留对它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的 JDBC 实现。它直接与 ResultSet
一起工作,并需要一个 SQL 语句来针对从 DataSource
获取的连接执行。以下数据库模式作为示例使用:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人更喜欢为每一行使用一个域对象,因此以下示例使用了 RowMapper
接口的实现来映射一个 CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因为 JdbcCursorItemReader
与 JdbcTemplate
共享关键接口,所以查看如何使用 JdbcTemplate
读取此数据的示例对于将其与 ItemReader
进行对比很有帮助。在本示例中,假设 CUSTOMER
数据库中有 1,000 行数据。第一个示例使用了 JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行上述代码片段后,customerCredits
列表包含 1,000 个 CustomerCredit
对象。在查询方法中,从 DataSource
获取连接,执行提供的 SQL,并为 ResultSet
中的每一行调用 mapRow
方法。这与 JdbcCursorItemReader
的方法形成对比,如下例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行上述代码片段后,计数器的值等于 1,000。如果上面的代码将返回的 customerCredit
放入一个列表中,结果将与使用 JdbcTemplate
示例的结果完全相同。然而,ItemReader
的主要优势在于它允许以 '流' 的方式处理数据项。read
方法可以被调用一次,数据项可以通过 ItemWriter
写出,然后通过再次调用 read
获取下一个数据项。这使得数据项的读取和写入可以分块进行,并且可以定期提交,这是高性能批处理的本质。此外,ItemReader
很容易配置为注入到 Spring Batch 的 Step
中。
- Java
- XML
下面的示例展示了如何在 Java 中将 ItemReader
注入到 Step
中:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
下面的示例展示了如何在 XML 中将 ItemReader
注入到 Step
中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
额外属性
由于在 Java 中打开游标有多种不同的选项,因此 JdbcCursorItemReader
上可以设置许多属性,如下表所述:
表 1. JdbcCursorItemReader 属性
ignoreWarnings | 确定是否记录 SQLWarnings 或导致异常。默认值为 true (表示会记录警告)。 |
fetchSize | 为 JDBC 驱动程序提供一个提示,即当 ResultSet 对象(由 ItemReader 使用)需要更多行时,应从数据库中获取的行数。默认情况下,不提供任何提示。 |
maxRows | 设置底层 ResultSet 在任何时候可以容纳的最大行数限制。 |
queryTimeout | 设置驱动程序等待 Statement 对象执行的时间(以秒为单位)。如果超出此限制,则抛出 DataAccessException 。(请参考您的驱动程序供应商文档以获取详细信息)。 |
verifyCursorPosition | 由于 ItemReader 持有的同一个 ResultSet 会被传递给 RowMapper ,用户可能会自行调用 ResultSet.next() ,这可能导致读取器的内部计数出现问题。将此值设置为 true 会在 RowMapper 调用后检测游标位置是否与调用前相同,如果不相同则抛出异常。 |
saveState | 指示是否应在 ItemStream#update(ExecutionContext) 提供的 ExecutionContext 中保存读取器的状态。默认值为 true 。 |
driverSupportsAbsolute | 指示 JDBC 驱动程序是否支持在 ResultSet 上设置绝对行。建议对于支持 ResultSet.absolute() 的 JDBC 驱动程序将其设置为 true ,因为它可能提高性能,尤其是在处理大数据集时步骤失败的情况下。默认值为 false 。 |
setUseSharedExtendedConnection | 指示用于游标的连接是否应被所有其他处理共享,从而使用相同的事务。如果将其设置为 false ,则游标将使用其自己的连接打开,并不参与为其余步骤处理启动的任何事务。如果将此标志设置为 true ,则必须将 DataSource 包装在 ExtendedConnectionDataSourceProxy 中,以防止连接在每次提交后关闭和释放。当将此选项设置为 true 时,用于打开游标的语句将以 'READ_ONLY' 和 'HOLD_CURSORS_OVER_COMMIT' 选项创建。这允许在步骤处理中进行事务开始和提交时保持游标打开。要使用此功能,您需要一个支持该功能的数据库以及支持 JDBC 3.0 或更高版本的 JDBC 驱动程序。默认值为 false 。 |
StoredProcedureItemReader
有时有必要通过存储过程来获取游标数据。StoredProcedureItemReader
的工作方式类似于 JdbcCursorItemReader
,不同之处在于,它不是通过执行查询来获取游标,而是运行一个返回游标的存储过程。存储过程可以通过三种不同的方式返回游标:
-
作为返回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作为以输出参数形式返回的 ref-cursor (由 Oracle 和 PostgreSQL 使用)。
-
作为存储函数调用的返回值。
- Java
- XML
下面的 Java 示例配置使用了与前面示例相同的“客户信用”示例:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
下面的 XML 示例配置使用了与前面示例相同的“客户信用”示例:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依赖存储过程提供一个 ResultSet
作为返回结果(之前提到的选项 1)。
如果存储过程返回了一个 ref-cursor
(选项 2),那么我们需要提供返回的 ref-cursor
所在的输出参数的位置。
- Java
- XML
下面的示例展示了如何在 Java 中处理第一个参数为 ref-cursor 的情况:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
下面的示例展示了如何在 XML 中处理第一个参数为 ref-cursor 的情况:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游标是从存储函数返回的(选项 3),我们需要将属性 function
设置为 true
。它的默认值是 false
。
- Java
- XML
下面的示例展示了在 Java 中将属性设置为 true
:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
下面的示例展示了在 XML 中将属性设置为 true
:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有这些情况下,我们需要定义一个 RowMapper
以及一个 DataSource
和实际的存储过程名称。
如果存储过程或函数接受参数,那么必须使用 parameters
属性来声明和设置这些参数。以下是一个针对 Oracle 的示例,它声明了三个参数。第一个是返回 ref-cursor 的 out
参数,而第二个和第三个是接收类型为 INTEGER
的值的 in
参数。
- Java
- XML
下面的示例展示了如何在 Java 中处理参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
下面的示例展示了如何在 XML 中处理参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了参数声明外,我们还需要指定一个 PreparedStatementSetter
实现,用于为调用设置参数值。这与上面提到的 JdbcCursorItemReader
的工作方式相同。在 Additional Properties 中列出的所有附加属性同样适用于 StoredProcedureItemReader
。
分页 ItemReader
实现
使用数据库游标的替代方案是运行多个查询,每个查询获取结果的一部分。我们称这部分为一页(page)。每个查询都必须指定起始行号以及我们希望在该页中返回的行数。
JdbcPagingItemReader
ItemReader
的一种分页实现是 JdbcPagingItemReader
。JdbcPagingItemReader
需要一个 PagingQueryProvider
,它负责提供用于检索构成一页的行的 SQL 查询。由于每个数据库都有自己的分页支持策略,因此我们需要为每种支持的数据库类型使用不同的 PagingQueryProvider
。此外,还有 SqlPagingQueryProviderFactoryBean
,它可以自动检测正在使用的数据库并确定合适的 PagingQueryProvider
实现。这简化了配置,也是推荐的最佳实践。
SqlPagingQueryProviderFactoryBean
要求你指定一个 select
子句和一个 from
子句。你还可以提供一个可选的 where
子句。这些子句以及必需的 sortKey
用于构建 SQL 语句。
在 sortKey
上有一个唯一的键约束非常重要,以保证在执行之间不会丢失数据。
在读取器被打开后,它每次调用 read
时会以与任何其他 ItemReader
相同的基本方式返回一个项目。当需要更多行时,分页在后台进行。
- Java
- XML
以下 Java 示例配置使用了与之前展示的基于游标的 ItemReaders
类似的“客户信用”示例:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
以下 XML 示例配置使用了与之前展示的基于游标的 ItemReaders
类似的“客户信用”示例:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
这个配置的 ItemReader
使用 RowMapper
返回 CustomerCredit
对象,其中 RowMapper
必须指定。pageSize
属性决定了每次查询从数据库中读取的实体数量。
parameterValues
属性可用于为查询指定一个参数值的 Map
。如果你在 where
子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果你使用传统的 '?' 占位符,则每个条目的键应是占位符的编号,从 1 开始。
JpaPagingItemReader
JpaPagingItemReader
是 ItemReader
分页的另一种实现。JPA 没有类似于 Hibernate StatelessSession
的概念,因此我们必须使用 JPA 规范提供的其他功能。由于 JPA 支持分页,因此在使用 JPA 进行批处理时,这是一个自然的选择。在每一页读取后,实体会脱离关联,并且持久化上下文会被清除,以便在页面处理完成后允许实体被垃圾回收。
JpaPagingItemReader
允许你声明一个 JPQL 语句并传入一个 EntityManagerFactory
。然后它会以与其他任何 ItemReader
类似的基本方式,每次调用 read 时返回一个项目。当需要更多实体时,分页会在后台进行。
- Java
- XML
下面的 Java 示例配置使用了与之前显示的 JDBC 读取器相同的“客户信用”示例:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
下面的 XML 示例配置使用了与之前显示的 JDBC 读取器相同的“客户信用”示例:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
这个配置好的 ItemReader
以与上述 JdbcPagingItemReader
描述的完全相同的方式返回 CustomerCredit
对象,前提是 CustomerCredit
对象具有正确的 JPA 注解或 ORM 映射文件。pageSize
属性决定了每次查询执行时从数据库中读取的实体数量。
数据库 ItemWriters
虽然平面文件和XML文件都有特定的 ItemWriter
实例,但在数据库领域没有完全等效的东西。这是因为事务已经提供了所有所需的功能。文件需要 ItemWriter
实现,因为它们必须表现得像事务一样,跟踪已写入的项并在适当的时候刷新或清除。数据库不需要这种功能,因为写操作本身已经包含在一个事务中。用户可以创建自己的实现 ItemWriter
接口的DAO,或者使用为通用处理需求编写的自定义 ItemWriter
。无论哪种方式,它们都应该能正常工作。需要注意的一点是,批量处理输出时提供的性能和错误处理能力。这在使用 Hibernate 作为 ItemWriter
时最为常见,但在使用 JDBC 批量模式时也可能遇到相同的问题。假设我们小心地进行刷新并且数据没有错误,批量处理数据库输出本身并没有固有的缺陷。然而,在写入过程中出现任何错误都可能引起混淆,因为无法知道是哪个具体的项导致了异常,甚至无法确定是否有某个具体的项负责,如下图所示:
图 2. 刷新时的错误
如果在写入之前对项进行缓冲,则在提交之前刷新缓冲区时不会抛出任何错误。例如,假设每块写入 20 个项,而第 15 个项抛出一个 DataIntegrityViolationException
。就 Step
而言,所有 20 个项都被成功写入,因为在实际写入之前无法知道发生错误。一旦调用 Session#flush()
,缓冲区被清空并且异常被捕获。此时,Step
无能为力,事务必须回滚。通常情况下,此异常可能会导致该项被跳过(具体取决于跳过/重试策略),然后该项将不再被写入。然而,在批量处理场景中,无法知道是哪个项导致了问题。失败发生时,整个缓冲区正在被写入。解决此问题的唯一方法是在每个项之后刷新,如下图所示:
图 3. 写入时的错误
这是一个常见的用例,尤其是在使用 Hibernate 时,ItemWriter
实现的一个简单指导原则是在每次调用 write()
时进行刷新。这样做可以确保在发生错误后,Spring Batch 内部能够可靠地跳过某些项目,并处理对 ItemWriter
调用的粒度。