JDBC 消息存储
Spring Integration 提供了两种特定于 JDBC 的消息存储实现。JdbcMessageStore 适用于聚合器和声明检查模式。JdbcChannelMessageStore 实现则提供了一个更专注且可扩展的实现,专门用于消息通道。
请注意,您可以使用 JdbcMessageStore 来支持消息通道,而 JdbcChannelMessageStore 正是为此目的进行了优化。
从版本 5.0.11 和 5.1.2 开始,JdbcChannelMessageStore 的索引已得到优化。如果您的此类存储中存在大型消息组,您可能需要调整索引。此外,PriorityChannel 的索引已被注释掉,因为除非您使用由 JDBC 支持的此类通道,否则不需要该索引。
使用 OracleChannelMessageStoreQueryProvider 时,必须添加优先级通道索引,因为查询提示中包含了该索引。
初始化数据库
在使用 JDBC 消息存储组件之前,您需要准备一个包含适当对象的目标数据库。
Spring Integration 附带了一些可用于初始化数据库的示例脚本。在 spring-integration-jdbc JAR 文件中,你可以在 org.springframework.integration.jdbc 包中找到这些脚本。它为一系列常见的数据库平台提供了示例创建脚本和示例删除脚本。使用这些脚本的常见方法是在 Spring JDBC 数据源初始化器 中引用它们。请注意,这些脚本是作为示例以及所需表和列名的规范提供的。你可能会发现,为了在生产环境中使用,需要对其进行增强,例如添加索引声明。
从版本6.2开始,JdbcMessageStore、JdbcChannelMessageStore、JdbcMetadataStore 和 DefaultLockRepository 实现了 SmartLifecycle,并在其 start() 方法中执行 SELECT COUNT 查询,以确认目标数据库中是否存在所需的表(根据提供的前缀)。如果所需的表不存在,应用程序上下文将无法启动。可以通过 setCheckDatabaseOnStart(false) 来禁用此检查。
通用 JDBC 消息存储
JDBC模块提供了Spring Integration MessageStore(在Claim Check模式中很重要)和MessageGroupStore(在有状态模式如聚合器中很重要)的实现,这些实现由数据库支持。这两个接口都由JdbcMessageStore实现,并且支持在XML中配置存储实例,如下例所示:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
你可以指定一个 JdbcTemplate 而不是 DataSource。
以下示例展示了其他一些可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>
在前面的示例中,我们为存储生成的查询中的表名指定了前缀。表名前缀默认为 INT_。
支持消息通道
如果您打算使用 JDBC 作为消息通道的存储后端,我们推荐使用 JdbcChannelMessageStore 实现。它仅与消息通道配合使用。
支持的数据库
JdbcChannelMessageStore 使用特定于数据库的 SQL 查询来从数据库中检索消息。因此,您必须在 JdbcChannelMessageStore 上设置 ChannelMessageStoreQueryProvider 属性。这个 channelMessageStoreQueryProvider 为您指定的特定数据库提供 SQL 查询。Spring Integration 为以下关系型数据库提供支持:
-
PostgreSQL
-
HSQLDB
-
MySQL
-
Oracle
-
Derby
-
H2
-
SqlServer
-
Sybase
-
DB2
如果你的数据库不在列表中,你可以实现 ChannelMessageStoreQueryProvider 接口并提供自己的自定义查询。
Version 4.0 在表中添加了 MESSAGE_SEQUENCE 列,以确保即使在消息存储于同一毫秒的情况下也能实现先进先出(FIFO)队列。
从版本 6.2 开始,ChannelMessageStoreQueryProvider 公开了一个 isSingleStatementForPoll 标志,其中 PostgresChannelMessageStoreQueryProvider 返回 true,并且其轮询查询现在基于单个 DELETE…RETURNING 语句。JdbcChannelMessageStore 会参考 isSingleStatementForPoll 选项,如果仅支持单个轮询语句,则跳过单独的 DELETE 语句。
自定义消息插入
自 5.0 版本起,通过重载 ChannelMessageStorePreparedStatementSetter 类,您可以为 JdbcChannelMessageStore 中的消息插入提供自定义实现。您可以使用它来设置不同的列、更改表结构或序列化策略。例如,您可以将消息结构存储为 JSON 字符串,而非默认的序列化为 byte[]。
以下示例使用 setValues 的默认实现来存储通用列,并重写其行为以将消息负载存储为 varchar:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议使用关系型数据库作为队列。如果可能,请考虑使用基于 JMS 或 AMQP 的通道。更多参考资料请参见:
如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,该机制在后续章节中有详细说明。
并发轮询
轮询消息通道时,可以选择通过引用 TaskExecutor 来配置关联的 Poller。
请注意,如果你使用 JDBC 支持的消息通道,并计划使用多个线程轮询该通道以及事务性消息存储,应确保使用支持多版本并发控制(MVCC)的关系型数据库。否则,可能会出现锁定问题,并且在使用多线程时,性能可能无法达到预期。例如,Apache Derby 在这方面存在问题。
为了获得更好的 JDBC 队列吞吐量,并避免不同线程可能从队列中轮询到相同 Message 的问题,在使用不支持 MVCC 的数据库时,必须将 JdbcChannelMessageStore 的 usingIdCache 属性设置为 true。以下示例展示了如何操作:
<bean id="queryProvider"
class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>
<task:executor id="pool" pool-size="10"
queue-capacity="10" rejection-policy="CALLER_RUNS" />
<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
<property name="region" value="TX_TIMEOUT"/>
<property name="usingIdCache" value="true"/>
</bean>
<int:channel id="inputChannel">
<int:queue message-store="store"/>
</int:channel>
<int:bridge input-channel="inputChannel" output-channel="outputChannel">
<int:poller fixed-delay="500" receive-timeout="500"
max-messages-per-poll="1" task-executor="pool">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
isolation="READ_COMMITTED" transaction-manager="transactionManager" />
</int:poller>
</int:bridge>
<int:channel id="outputChannel" />
优先级通道
从 4.0 版本开始,JdbcChannelMessageStore 实现了 PriorityCapableChannelMessageStore 接口,并提供了 priorityEnabled 选项,使其能够作为 priority-queue 实例的 message-store 引用。为此,INT_CHANNEL_MESSAGE 表新增了 MESSAGE_PRIORITY 列,用于存储 PRIORITY 消息头的值。此外,新增的 MESSAGE_SEQUENCE 列使我们能够实现稳健的先进先出 (FIFO) 轮询机制,即使在同一毫秒内存储了多个具有相同优先级的消息。消息从数据库中轮询(选择)时,会按照 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 的顺序进行。
我们不建议在优先级和非优先级队列通道中使用相同的 JdbcChannelMessageStore bean,因为 priorityEnabled 选项适用于整个存储,并且队列通道无法保留正确的 FIFO 队列语义。然而,相同的 INT_CHANNEL_MESSAGE 表(甚至 region)可以用于两种 JdbcChannelMessageStore 类型。要配置该场景,您可以从另一个消息存储 bean 扩展一个消息存储 bean,如下例所示:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
消息存储分区
通常,我们会将 JdbcMessageStore 用作一组应用程序或同一应用程序中多个节点的全局存储。为了防止名称冲突并提供对数据库元数据配置的控制,消息存储允许通过两种方式对表进行分区。一种方式是通过更改前缀来使用不同的表名(如之前所述)。另一种方式是指定一个 region 名称,在单个表内对数据进行分区。第二种方法的一个重要应用场景是当 MessageStore 管理支持 Spring Integration 消息通道的持久队列时。持久通道的消息数据在存储中通过通道名称进行键控。因此,如果通道名称不是全局唯一的,通道可能会接收到非预期的数据。为了避免这种风险,您可以使用消息存储的 region 来为具有相同逻辑名称的不同物理通道保持数据分离。
PostgreSQL:接收推送通知
PostgreSQL 提供了一套监听与通知框架,用于在数据库表发生变更时接收推送通知。Spring Integration(自 6.0 版本起)利用这一机制,使得当新消息被添加到 JdbcChannelMessageStore 时能够接收推送通知。使用此功能时,必须定义一个数据库触发器,该触发器可在 Spring Integration JDBC 模块中包含的 schema-postgresql.sql 文件的注释部分找到。
推送通知通过 PostgresChannelMessageTableSubscriber 类接收,该类允许其订阅者在任何给定 region 和 groupId 的新消息到达时接收回调。即使消息是在不同的 JVM 上追加到同一数据库,这些通知也能被接收。PostgresSubscribableChannel 实现使用 PostgresChannelMessageTableSubscriber.Subscription 契约从存储中拉取消息,作为对上述 PostgresChannelMessageTableSubscriber 通知的响应。
例如,some group 的推送通知可以按如下方式接收:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事务支持
从 6.0.5 版本开始,在 PostgresSubscribableChannel 上指定 PlatformTransactionManager 将在事务中通知订阅者。订阅者中的异常将导致事务回滚,并将消息放回消息存储中。事务支持默认未激活。
重试
从版本 6.0.5 开始,可以通过向 PostgresSubscribableChannel 提供 RetryTemplate 来指定重试策略。默认情况下,不执行重试。
任何处于活动状态的 PostgresChannelMessageTableSubscriber 在其整个活动生命周期内都会占用一个独占的 JDBC Connection。因此,确保此连接不来自连接池 DataSource 至关重要。这类连接池通常期望已分配的连接在预定义的时间窗口内被关闭。
由于需要独占连接,还建议每个 JVM 仅运行一个 PostgresChannelMessageTableSubscriber,该订阅者可用于注册任意数量的订阅。