入站通道适配器
入站通道适配器的主要功能是执行 SQL SELECT
查询,并将结果集转换为消息。消息的有效负载是整个结果集(表示为 List
),列表中的项目类型取决于行映射策略。默认策略是一个通用映射器,它为查询结果中的每一行返回一个 Map
。可选地,您可以通过添加对 RowMapper
实例的引用来更改此设置(有关行映射的更详细信息,请参阅 Spring JDBC 文档)。
如果你想将 SELECT
查询结果中的行转换为单独的消息,可以使用下游拆分器。
入站适配器还需要引用 JdbcTemplate
实例或 DataSource
之一。
除了生成消息的 SELECT
语句外,适配器还有一个将记录标记为已处理的 UPDATE
语句,这样它们就不会出现在下一次轮询中。更新可以通过来自原始选择的 ID 列表进行参数化。默认情况下,这是通过命名约定完成的(输入结果集中名为 id
的列会被转换为更新参数映射中名为 id
的列表)。以下示例定义了一个带有更新查询和 DataSource
引用的入站通道适配器。
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
channel="target" data-source="dataSource"
update="update item set status=10 where id in (:id)" />
更新查询中的参数以前缀冒号 (:
) 加参数名称的方式指定(在前面的例子中,这是一个应用于轮询结果集中每一行的表达式)。这是 Spring JDBC 中命名参数 JDBC 支持的一个标准特性,结合了 Spring Integration 采用的惯例(投影到轮询结果列表上)。底层的 Spring JDBC 功能限制了可用的表达式(例如,大多数特殊字符,除了句号之外,都是不允许的),但由于目标通常是一个可以通过 bean 路径寻址的对象列表(可能是一个对象的列表),所以这种限制并不过分。
要更改参数生成策略,您可以注入一个 SqlParameterSourceFactory
到适配器中以覆盖默认行为(适配器有一个 sql-parameter-source-factory
属性)。Spring Integration 提供了 ExpressionEvaluatingSqlParameterSourceFactory
,它创建了一个基于 SpEL 的参数源,查询的结果作为 #root
对象。(如果 update-per-row
为 true,则根对象是行)。如果相同的参数名称在更新查询中多次出现,它只会被评估一次,并且其结果会被缓存。
你也可以为 select 查询使用参数源。在这种情况下,由于没有“结果”对象可以进行评估,因此每次使用单个参数源(而不是使用参数源工厂)。从 4.0 版开始,你可以使用 Spring 创建基于 SpEL 的参数源,如下例所示:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="status" value="@statusBean.which()" />
</map>
</property>
</bean>
<bean id="statusBean" class="foo.StatusDetermination" />
每个参数表达式中的 value
可以是任何有效的 SpEL 表达式。表达式评估的 #root
对象是在 parameterSource
bean 上定义的构造函数参数。它对所有评估都是静态的(在前面的例子中,是一个空的 String
)。
从 5.0 版开始,您可以使用 sqlParameterTypes
为 ExpressionEvaluatingSqlParameterSourceFactory
指定特定参数的目标 SQL 类型。
以下示例为查询中使用的参数提供了 SQL 类型:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="sqlParameterTypes">
<map>
<entry key="status" value="#{ T(java.sql.Types).BINARY}" />
</map>
</property>
</bean>
使用 createParameterSourceNoCache
工厂方法。否则,参数源会缓存评估结果。还请注意,由于禁用了缓存,如果相同的参数名称在 select 查询中多次出现,则每次出现都会重新评估。
轮询和事务
入站适配器接受一个常规的 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。轮询器在 JDBC 使用中的一个重要特性是可以在事务中包装轮询操作的选项,如下例所示:
<int-jdbc:inbound-channel-adapter query="..."
channel="target" data-source="dataSource" update="...">
<int:poller fixed-rate="1000">
<int:transactional/>
</int:poller>
</int-jdbc:inbound-channel-adapter>
如果您没有明确指定轮询器,将使用默认值。与 Spring Integration 通常情况一样,它可以定义为顶级 bean)。
在前面的例子中,数据库每 1000 毫秒(或每一秒)轮询一次,更新和选择查询都在同一事务中执行。未显示事务管理器的配置。但是,只要它知道数据源,轮询就是事务性的。一个常见的用例是下游通道为直接通道(默认值),这样端点将在同一线程中调用,因此也在同一事务中。这样,如果其中任何一个失败,事务将回滚,输入数据将恢复到其原始状态。
max-rows
与 max-messages-per-poll
JDBC 入站通道适配器定义了一个名为 max-rows
的属性。当您指定适配器的轮询器时,也可以定义一个名为 max-messages-per-poll
的属性。虽然这两个属性看起来相似,但它们的含义却大不相同。
max-messages-per-poll
指定了每次轮询间隔查询执行的次数,而 max-rows
指定了每次执行返回的行数。
在正常情况下,当你使用 JDBC 入站通道适配器时,你可能不希望设置轮询器的 max-messages-per-poll
属性。其默认值是 1
,这意味着 JDBC 入站通道适配器的 receive() 方法会在每个轮询间隔内精确执行一次。
将 max-messages-per-poll
属性设置为更大的值意味着查询会连续执行那么多次数。有关 max-messages-per-poll
属性的更多信息,请参阅 配置入站通道适配器。
相比之下,max-rows
属性,如果大于 0
,则指定从由 receive()
方法创建的查询结果集中使用的最大行数。如果该属性设置为 0
,则所有行都将包含在结果消息中。该属性默认值为 0
。
建议使用供应商特定的查询选项来限制结果集,例如 MySQL LIMIT
或 SQL Server TOP
或 Oracle 的 ROWNUM
。有关更多信息,请参阅特定供应商的文档。