跳到主要内容

JDBC 消息存储

QWen Plus 中英对照 JDBC Message Store

Spring Integration 提供了两个 JDBC 特定的消息存储实现。JdbcMessageStore 适用于与聚合器和申领检查模式一起使用。JdbcChannelMessageStore 实现则为消息通道提供了一个更具针对性和可扩展性的实现。

请注意,您可以使用 JdbcMessageStore 来支持一个消息通道,JdbcChannelMessageStore 为此目的进行了优化。

important

从 5.0.11、5.1.2 版本开始,JdbcChannelMessageStore 的索引已进行优化。如果你在这样的存储中有大型消息组,你可能需要更改索引。此外,PriorityChannel 的索引被注释掉了,因为除非你使用由 JDBC 支持的此类通道,否则不需要该索引。

备注

在使用 OracleChannelMessageStoreQueryProvider 时,必须添加优先级通道索引,因为该索引包含在查询的提示中。

初始化数据库

在开始使用 JDBC 消息存储组件之前,您应该使用适当的对象配置目标数据库。

Spring Integration 提供了一些示例脚本,可用于初始化数据库。在 spring-integration-jdbc JAR 文件中,你可以在 org.springframework.integration.jdbc 包中找到这些脚本。它为一系列常见的数据库平台提供了一个示例创建脚本和一个示例删除脚本。使用这些脚本的常见方法是将它们引用到 Spring JDBC 数据源初始化程序 中。请注意,这些脚本作为示例和所需表及列名的规范提供。你可能会发现需要增强它们以用于生产环境(例如,通过添加索引声明)。

从 6.2 版本开始,JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository 实现了 SmartLifecycle,并在各自的表上,在 start() 方法中执行 SELECT COUNT 查询,以确保目标数据库中存在所需的表(根据提供的前缀)。如果所需的表不存在,应用程序上下文将无法启动。可以通过 setCheckDatabaseOnStart(false) 禁用此检查。

通用 JDBC 消息存储

JDBC 模块提供了一个基于数据库的 Spring Integration MessageStore (在声明检查模式中非常重要)和 MessageGroupStore (在有状态模式如聚合器中非常重要)的实现。这两个接口均由 JdbcMessageStore 实现,并且支持在 XML 中配置存储实例,如下例所示:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
xml

你可以指定一个 JdbcTemplate 而不是一个 DataSource

以下示例显示了一些其他可选属性:

<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>
xml

在前面的例子中,我们为存储生成的查询中的表名指定了一个前缀。表名前缀默认为 INT_

支持消息通道

如果你打算用 JDBC 支持消息通道,我们建议使用 JdbcChannelMessageStore 实现。它只与消息通道一起使用。

支持的数据库

JdbcChannelMessageStore 使用特定于数据库的 SQL 查询从数据库中检索消息。因此,您必须设置 JdbcChannelMessageStore 上的 ChannelMessageStoreQueryProvider 属性。这个 channelMessageStoreQueryProvider 为指定的特定数据库提供 SQL 查询。Spring Integration 提供了对以下关系数据库的支持:

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

如果您的数据库未列出,您可以实现 ChannelMessageStoreQueryProvider 接口并提供自己的自定义查询。

版本 4.0 向表中添加了 MESSAGE_SEQUENCE 列,以确保即使消息存储在同一毫秒内也能保证先进先出 (FIFO) 队列。

从 6.2 版开始,ChannelMessageStoreQueryProvider 暴露了一个 isSingleStatementForPoll 标志,其中 PostgresChannelMessageStoreQueryProvider 返回 true,并且其轮询查询现在基于单个 DELETE…​RETURNING 语句。JdbcChannelMessageStore 会咨询 isSingleStatementForPoll 选项,并且如果只支持单个轮询语句,则跳过单独的 DELETE 语句。

自定义消息插入

自从 5.0 版本以来,通过重载 ChannelMessageStorePreparedStatementSetter 类,您可以为 JdbcChannelMessageStore 中的消息插入提供自定义实现。您可以使用它来设置不同的列或更改表结构或序列化策略。例如,您可以将其结构存储为 JSON 字符串,而不是默认的序列化为 byte[]

以下示例使用 setValues 的默认实现来存储通用列,并覆盖行为以将消息有效负载存储为 varchar

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
java
important

通常,我们不建议使用关系数据库进行排队。相反,如果可能的话,考虑使用由 JMS 或 AMQP 支持的通道。有关进一步的参考资料,请参阅以下资源:

如果您仍然计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这在后续章节中有描述。

并发轮询

当轮询消息通道时,你可以选择使用 TaskExecutor 引用来配置关联的 Poller

important

请记住,但是,如果你使用基于 JDBC 的消息通道,并且你计划轮询该通道(因此也是消息存储事务)并使用多个线程,你应该确保使用支持 多版本并发控制 (MVCC) 的关系型数据库。否则,锁定可能会成为一个问题,并且在使用多个线程时性能可能不会如预期那样提升。例如,Apache Derby 在这方面存在问题。

为了实现更好的 JDBC 队列吞吐量并避免不同线程可能从队列中轮询同一 Message 时出现的问题,在使用不支持 MVCC 的数据库时,重要的是JdbcChannelMessageStoreusingIdCache 属性设置为 true。以下示例展示了如何进行设置:

<bean id="queryProvider"
class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
<property name="region" value="TX_TIMEOUT"/>
<property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
<int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
<int:poller fixed-delay="500" receive-timeout="500"
max-messages-per-poll="1" task-executor="pool">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
isolation="READ_COMMITTED" transaction-manager="transactionManager" />
</int:poller>
</int:bridge>

<int:channel id="outputChannel" />
xml

优先级通道

从 4.0 版本开始,JdbcChannelMessageStore 实现了 PriorityCapableChannelMessageStore 并提供了 priorityEnabled 选项,使其可以作为 priority-queue 实例的 message-store 引用。为此,INT_CHANNEL_MESSAGE 表包含一个 MESSAGE_PRIORITY 列以存储 PRIORITY 消息头的值。此外,新增的 MESSAGE_SEQUENCE 列使我们即使在多个消息在同一毫秒内以相同优先级存储的情况下,也能实现强大的先进先出 (FIFO) 轮询机制。消息按照 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 从数据库中轮询(选择)。

备注

我们不建议为优先级和非优先级队列通道使用相同的 JdbcChannelMessageStore bean,因为 priorityEnabled 选项适用于整个存储库,并且队列通道无法保持正确的 FIFO 队列语义。但是,相同的 INT_CHANNEL_MESSAGE 表(甚至 region)可以用于两种 JdbcChannelMessageStore 类型。要配置该场景,可以从另一个消息存储库 bean 扩展一个消息存储库 bean,如下例所示:

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
xml

分区消息存储

通常使用 JdbcMessageStore 作为一组应用程序或同一应用程序中的一组节点的全局存储。为了防止名称冲突并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。一种方法是通过更改前缀来使用不同的表名(如前面所述)。另一种方法是指定一个 region 名称,在单个表内对数据进行分区。第二种方法的一个重要用例是当 MessageStore 管理支持 Spring Integration 消息通道的持久队列时。持久化通道的消息数据在存储中以通道名称为键。因此,如果通道名称不是全局唯一的,通道可能会接收到并非为其准备的数据。为了避免这种风险,可以使用消息存储 region 来保持不同物理通道的数据分离,这些通道具有相同的逻辑名称。

PostgreSQL:接收推送通知

PostgreSQL 提供了一个监听和通知框架,用于在数据库表操作时接收推送通知。Spring Integration 利用了这一机制(从 6.0 版本开始),以允许在向 JdbcChannelMessageStore 添加新消息时接收推送通知。当使用此功能时,必须定义一个数据库触发器,该触发器可以在 Spring Integration 的 JDBC 模块中包含的 schema-postgresql.sql 文件的注释部分找到。

推送通知是通过 PostgresChannelMessageTableSubscriber 类接收的,该类允许其订阅者在任何给定的 regiongroupId 收到新消息时接收回调。即使消息是在不同的 JVM 上追加的,但只要是在同一个数据库中,这些通知仍然会被接收到。PostgresSubscribableChannel 实现使用 PostgresChannelMessageTableSubscriber.Subscription 合约从存储中拉取消息,作为对上述 PostgresChannelMessageTableSubscriber 通知的响应。

例如,可以如下接收 some group 的推送通知:

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
java

事务支持

从 6.0.5 版本开始,在 PostgresSubscribableChannel 上指定一个 PlatformTransactionManager 将会在事务中通知订阅者。订阅者中的异常将导致事务回滚,并且消息会被重新放回消息存储中。事务支持默认情况下不会被激活。

重试

从 6.0.5 版本开始,可以通过向 PostgresSubscribableChannel 提供 RetryTemplate 来指定重试策略。默认情况下,不执行重试。

important

任何活跃的 PostgresChannelMessageTableSubscriber 在其活跃生命周期内会占用一个独占的 JDBC Connection。因此,非常重要的一点是,这个连接不应该来自池化的 DataSource。这类连接池通常期望发出的连接会在预定义的超时窗口内关闭。

为了满足对独占连接的需求,还建议一个 JVM 仅运行一个 PostgresChannelMessageTableSubscriber,该实例可以用于注册任意数量的订阅。