跳到主要内容
版本:7.0.2

JDBC 通道消息存储 JSON 序列化

DeepSeek V3 中英对照 JDBC Channel Message Store JSON Serialization

版本 7.0 为 JdbcChannelMessageStore 引入了 JSON 序列化支持。默认情况下,Spring Integration 使用 Java 序列化将消息存储在数据库中。新的 JSON 序列化选项提供了一种替代的序列化机制。

important

安全注意事项: JSON序列化会将消息内容以文本形式存储在数据库中,这可能导致敏感数据暴露。在生产环境中使用JSON序列化前,请确保实施适当的数据库访问控制、静态数据加密,并考虑您组织的数据保护要求。

配置

JSON(反)序列化有两个可用的组件:

  • JsonChannelMessageStorePreparedStatementSetter - 将消息序列化为 JSON

  • JsonMessageRowMapper - 将消息从 JSON 反序列化

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());

// Enable JSON serialization
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter());
store.setMessageRowMapper(
new JsonMessageRowMapper("com.example"));

return store;
}

字符串参数 ("com.example") 指定了用于反序列化的额外受信任包。这些包会被追加到默认受信任包之后(参见受信任包章节)。出于安全考虑,只有来自受信任包的类才能被反序列化。

数据库模式修改

important

JSON 序列化需要修改数据库模式。默认的 BLOB/BYTEA 列类型模式无法用于 JSON 序列化。

MESSAGE_CONTENT 列必须更改为能够存储 JSON 的基于文本的类型。

对于 PostgreSQL,可以使用 JSONB 类型。

-- JSONB (启用 JSON 查询)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;

JSON 序列化的示例模式

以下示例展示了如何为基于 JSON 的消息存储创建专用表。

CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
MESSAGE_CONTENT JSONB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);

JSON 结构

在使用基于Jackson的序列化时,消息会通过Jackson的多态类型处理功能,以如下JSON结构进行存储:

{
"@class": "org.springframework.messaging.support.GenericMessage",
"payload": {
"@class": "com.example.OrderMessage",
"orderId": "ORDER-12345",
"amount": 1299.99
},
"headers": {
"@class": "java.util.HashMap",
"priority": ["java.lang.String", "HIGH"],
"id": ["java.util.UUID", "a1b2c3d4-..."],
"timestamp": ["java.lang.Long", 1234567890]
}
}

@class 属性为多态类型的正确反序列化提供了必要的类型信息。

查询 JSON 内容(可选)

如果使用原生 JSON 列类型(PostgreSQL JSONB 或 MySQL JSON),可以直接查询消息内容。

PostgreSQL JSONB 查询

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';

MySQL JSON 函数

-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';

-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';
备注

如果使用了 TEXTCLOB 列类型,这些 JSON 特定的查询将不可用。不过,通过 Spring Integration 进行存储和检索时,JSON 序列化仍然有效。

受信任的软件包

JacksonMessagingUtils.messagingAwareMapper() 会依据受信任的包列表对所有反序列化的类进行验证,以防止安全漏洞。

默认受信任的包包括:- java.util - java.lang - org.springframework.messaging.support - org.springframework.integration.support - org.springframework.integration.message - org.springframework.integration.store - org.springframework.integration.history - org.springframework.integration.handler

可以在构造函数中指定额外的包,这些包会被追加到此列表中:

// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")

自定义 JsonObjectMapper

对于高级场景,可以提供自定义的 JsonObjectMapper

import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
customMapper.enable(SerializationFeature.INDENT_OUTPUT);
customMapper.registerModule(new CustomModule());

JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);

JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
store.setMessageRowMapper(
new JsonMessageRowMapper(jsonObjectMapper));

return store;
}
important

自定义的 JsonObjectMapper 应针对 Spring Integration 消息序列化进行适当配置。建议从 JacksonMessagingUtils.messagingAwareMapper() 开始,并在此基础上进行定制。在 JsonChannelMessageStorePreparedStatementSetterJsonMessageRowMapper 中必须使用相同的配置,以确保序列化和反序列化的一致性。