跳到主要内容
版本:7.0.2

入站通道适配器

DeepSeek V3 中英对照 Inbound Channel Adapter

入站通道适配器的主要功能是执行SQL SELECT查询,并将结果集转换为消息。消息的有效载荷是整个结果集(表示为List),列表中项的类型取决于行映射策略。默认策略是通用映射器,它为查询结果中的每一行返回一个Map。可选地,您可以通过添加对RowMapper实例的引用来更改此策略(有关行映射的更多详细信息,请参阅Spring JDBC文档)。

备注

如果你想将 SELECT 查询结果中的行转换为独立的消息,可以使用下游分割器。

入站适配器还需要引用一个 JdbcTemplate 实例或一个 DataSource

除了使用 SELECT 语句生成消息外,适配器还包含一个 UPDATE 语句,用于将记录标记为已处理,以便它们不会出现在下一次轮询中。该更新操作可以通过原始查询中的 ID 列表进行参数化。默认情况下,这是通过命名约定实现的(输入结果集中名为 id 的列会被转换为更新参数映射中名为 id 的列表)。以下示例定义了一个包含更新查询和 DataSource 引用的入站通道适配器。

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
channel="target" data-source="dataSource"
update="update item set status=10 where id in (:id)" />
备注

更新查询中的参数通过在参数名称前加上冒号(:)前缀来指定(在前面的示例中,这是一个应用于轮询结果集中每一行的表达式)。这是Spring JDBC中命名参数JDBC支持的标准特性,结合了Spring Integration采用的一种约定(投影到轮询结果列表上)。底层的Spring JDBC特性限制了可用的表达式(例如,除了句点之外的大多数特殊字符都不允许使用),但由于目标通常是一个对象列表(可能是一个对象的列表),这些对象可以通过bean路径进行寻址,因此这并不具有过度的限制性。

要更改参数生成策略,你可以向适配器注入一个 SqlParameterSourceFactory 来覆盖默认行为(适配器有一个 sql-parameter-source-factory 属性)。Spring Integration 提供了 ExpressionEvaluatingSqlParameterSourceFactory,它会创建一个基于 SpEL 的参数源,并将查询结果作为 #root 对象。(如果 update-per-row 为 true,则根对象是行)。如果同一个参数名在更新查询中出现多次,它只会被求值一次,并且其结果会被缓存。

你也可以为选择查询使用参数源。在这种情况下,由于没有“结果”对象可供评估,每次都会使用单个参数源(而不是使用参数源工厂)。从版本 4.0 开始,你可以使用 Spring 创建一个基于 SpEL 的参数源,如下例所示:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="status" value="@statusBean.which()" />
</map>
</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

每个参数表达式中的 value 可以是任何有效的 SpEL 表达式。表达式求值的 #root 对象是定义在 parameterSource bean 上的构造函数参数。对于所有求值过程而言,该对象是静态的(在前面的示例中,是一个空的 String)。

从 5.0 版本开始,你可以为 ExpressionEvaluatingSqlParameterSourceFactory 提供 sqlParameterTypes 来为特定参数指定目标 SQL 类型。

以下示例为查询中使用的参数提供了 SQL 类型:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="sqlParameterTypes">
<map>
<entry key="status" value="#{ T(java.sql.Types).BINARY}" />
</map>
</property>
</bean>
important

请使用 createParameterSourceNoCache 工厂方法。否则,参数源会缓存评估结果。同时请注意,由于禁用了缓存,如果同一个参数名在选择查询中出现多次,每次出现都会重新进行评估。

轮询与事务

入站适配器接受一个常规的 Spring Integration 轮询器作为子元素。因此,轮询频率(以及其他用途)可以得到控制。对于 JDBC 使用场景,轮询器的一个重要特性是能够将轮询操作包装在事务中,如下例所示:

<int-jdbc:inbound-channel-adapter query="..."
channel="target" data-source="dataSource" update="...">
<int:poller fixed-rate="1000">
<int:transactional/>
</int:poller>
</int-jdbc:inbound-channel-adapter>
备注

如果您没有明确指定轮询器,将使用默认值。与 Spring Integration 的常规做法一样,它可以被定义为一个顶级 bean。

在前面的示例中,数据库每1000毫秒(或每秒一次)被轮询一次,更新和选择查询都在同一个事务中执行。事务管理器配置未显示。然而,只要它知道数据源,轮询就是事务性的。一个常见的用例是下游通道为直接通道(默认情况),这样端点就在同一个线程中被调用,因此也在同一个事务中。这样,如果其中任何一个失败,事务将回滚,输入数据将恢复到其原始状态。

max-rowsmax-messages-per-poll

JDBC入站通道适配器定义了一个名为max-rows的属性。当您指定适配器的轮询器时,还可以定义一个名为max-messages-per-poll的属性。虽然这两个属性看起来很相似,但它们的含义却大不相同。

max-messages-per-poll 指定每个轮询间隔内查询执行的次数,而 max-rows 指定每次执行返回的行数。

在正常情况下,使用JDBC入站通道适配器时,您可能不希望设置轮询器的max-messages-per-poll属性。其默认值为1,这意味着JDBC入站通道适配器的receive()方法在每个轮询间隔内仅执行一次。

max-messages-per-poll 属性设置为更大的值意味着查询会连续执行相应次数。有关 max-messages-per-poll 属性的更多信息,请参阅配置入站通道适配器

相比之下,max-rows 属性如果大于 0,则指定从 receive() 方法创建的查询结果集中使用的最大行数。如果该属性设置为 0,则所有行都将包含在结果消息中。该属性默认值为 0

备注

建议通过特定供应商的查询选项来限制结果集,例如 MySQL 的 LIMIT、SQL Server 的 TOP 或 Oracle 的 ROWNUM。更多信息请参阅相应供应商的文档。