JDBC 通道消息存储 JSON 序列化
版本 7.0 为 JdbcChannelMessageStore 引入了 JSON 序列化支持。默认情况下,Spring Integration 使用 Java 序列化将消息存储在数据库中。新的 JSON 序列化选项提供了一种替代的序列化机制。
安全注意事项: JSON序列化会将消息内容以文本形式存储在数据库中,这可能导致敏感数据暴露。在生产环境中使用JSON序列化前,请确保实施适当的数据库访问控制、静态数据加密,并考虑您组织的数据保护要求。
配置
JSON(反)序列化有两个可用的组件:
-
JsonChannelMessageStorePreparedStatementSetter- 将消息序列化为 JSON -
JsonMessageRowMapper- 将消息从 JSON 反序列化
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());
// Enable JSON serialization
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter());
store.setMessageRowMapper(
new JsonMessageRowMapper("com.example"));
return store;
}
字符串参数 ("com.example") 指定了用于反序列化的额外受信任包。这些包会被追加到默认受信任包之后(参见受信任包章节)。出于安全考虑,只有来自受信任包的类才能被反序列化。
数据库模式修改
JSON 序列化需要修改数据库模式。默认的 BLOB/BYTEA 列类型模式无法用于 JSON 序列化。
MESSAGE_CONTENT 列必须更改为能够存储 JSON 的基于文本的类型。
- PostgreSQL
- MySQL
- H2
- Other Databases
对于 PostgreSQL,可以使用 JSONB 类型。
-- JSONB (启用 JSON 查询)
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT TYPE JSONB
USING MESSAGE_CONTENT::text::jsonb;
对于 MySQL,可以使用 JSON 类型。
-- JSON 类型 (启用 JSON 函数)
ALTER TABLE INT_CHANNEL_MESSAGE
MODIFY COLUMN MESSAGE_CONTENT JSON;
对于 H2 数据库,可以使用 CLOB 类型。
ALTER TABLE INT_CHANNEL_MESSAGE
ALTER COLUMN MESSAGE_CONTENT CLOB;
对于任何支持大文本列(CLOB、TEXT 等)的数据库,可以将 MESSAGE_CONTENT 列修改为适当的文本类型。
JSON 序列化的示例模式
以下示例展示了如何为基于 JSON 的消息存储创建专用表。
- PostgreSQL
- MySQL
- H2
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('JSON_MESSAGE_SEQ'),
MESSAGE_CONTENT JSONB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
MESSAGE_CONTENT JSON, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE TABLE JSON_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXT VALUE FOR JSON_MESSAGE_SEQ,
MESSAGE_CONTENT CLOB, -- JSON message content
REGION VARCHAR(100) NOT NULL,
CONSTRAINT JSON_CHANNEL_MESSAGE_PK
PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
JSON 结构
在使用基于Jackson的序列化时,消息会通过Jackson的多态类型处理功能,以如下JSON结构进行存储:
{
"@class": "org.springframework.messaging.support.GenericMessage",
"payload": {
"@class": "com.example.OrderMessage",
"orderId": "ORDER-12345",
"amount": 1299.99
},
"headers": {
"@class": "java.util.HashMap",
"priority": ["java.lang.String", "HIGH"],
"id": ["java.util.UUID", "a1b2c3d4-..."],
"timestamp": ["java.lang.Long", 1234567890]
}
}
@class 属性为多态类型的正确反序列化提供了必要的类型信息。
查询 JSON 内容(可选)
如果使用原生 JSON 列类型(PostgreSQL JSONB 或 MySQL JSON),可以直接查询消息内容。
PostgreSQL JSONB 查询
-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT @> '{"payload": {"orderId": "ORDER-12345"}}';
-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE MESSAGE_CONTENT -> 'headers' @> '{"priority": ["java.lang.String", "HIGH"]}';
MySQL JSON 函数
-- Find messages by payload field
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.payload.orderId') = 'ORDER-12345';
-- Find high-priority messages
SELECT * FROM JSON_CHANNEL_MESSAGE
WHERE JSON_EXTRACT(MESSAGE_CONTENT, '$.headers.priority[1]') = 'HIGH';
如果使用了 TEXT 或 CLOB 列类型,这些 JSON 特定的查询将不可用。不过,通过 Spring Integration 进行存储和检索时,JSON 序列化仍然有效。
受信任的软件包
JacksonMessagingUtils.messagingAwareMapper() 会依据受信任的包列表对所有反序列化的类进行验证,以防止安全漏洞。
默认受信任的包包括:- java.util - java.lang - org.springframework.messaging.support - org.springframework.integration.support - org.springframework.integration.message - org.springframework.integration.store - org.springframework.integration.history - org.springframework.integration.handler
可以在构造函数中指定额外的包,这些包会被追加到此列表中:
// Trust additional packages for custom payload types
new JsonMessageRowMapper("com.example.orders", "com.example.payments")
自定义 JsonObjectMapper
对于高级场景,可以提供自定义的 JsonObjectMapper:
import org.springframework.integration.support.json.JacksonJsonObjectMapper;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.databind.SerializationFeature;
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
ObjectMapper customMapper = JacksonMessagingUtils.messagingAwareMapper("com.example");
customMapper.enable(SerializationFeature.INDENT_OUTPUT);
customMapper.registerModule(new CustomModule());
JacksonJsonObjectMapper jsonObjectMapper = new JacksonJsonObjectMapper(customMapper);
JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
store.setPreparedStatementSetter(
new JsonChannelMessageStorePreparedStatementSetter(jsonObjectMapper));
store.setMessageRowMapper(
new JsonMessageRowMapper(jsonObjectMapper));
return store;
}
自定义的 JsonObjectMapper 应针对 Spring Integration 消息序列化进行适当配置。建议从 JacksonMessagingUtils.messagingAwareMapper() 开始,并在此基础上进行定制。在 JsonChannelMessageStorePreparedStatementSetter 和 JsonMessageRowMapper 中必须使用相同的配置,以确保序列化和反序列化的一致性。