跳到主要内容
版本:7.0.2

Apache Cassandra 支持

DeepSeek V3 中英对照 Apache Cassandra Support

Spring Integration 提供了通道适配器(从版本 6.0 开始),用于对 Apache Cassandra 集群执行数据库操作。它完全基于 Spring Data for Apache Cassandra 项目。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-cassandra</artifactId>
<version>7.0.2</version>
</dependency>

Cassandra 出站组件

CassandraMessageHandler 是一个 AbstractReplyProducingMessageHandler 实现,可以在单向(默认)和请求-应答模式(producesReply 选项)下工作。它默认是异步的(可通过 setAsync(false) 重置),并针对提供的 ReactiveCassandraOperations 执行响应式的 INSERTUPDATEDELETESTATEMENT 操作。操作类型可以通过 CassandraMessageHandler.Type 选项进行配置。ingestQuery 将模式设置为 INSERTquerystatementExpressionstatementProcessor 将模式设置为 STATEMENT

以下代码片段展示了此通道适配器或网关的各种配置:

@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
return flow -> flow
.handle(Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author limit :size")
.parameter("author", "payload")
.parameter("size", m -> m.getHeaders().get("limit")))
.channel(c -> c.flux("resultChannel"));
}

如果 CassandraMessageHandler 在默认异步模式下作为网关使用,会产生一个 Mono<WriteResult>,该结果会根据提供的 MessageChannel 实现进行处理。对于真正的响应式处理,建议在输出通道配置中使用 FluxMessageChannel。在同步模式下,会调用 Mono.block() 来获取回复值。

如果执行 INSERTUPDATEDELETE 操作,请求消息负载中应包含一个实体(标记为 org.springframework.data.cassandra.core.mapping.Table)。如果负载是实体列表,则将执行相应的批量操作。

ingestQuery 模式期望有效载荷以要插入的值矩阵形式存在 - List<List<?>>。例如,如果实体如下所示:

@Table("book")
public record Book(@PrimaryKey String isbn,
String title,
@Indexed String author,
int pages,
LocalDate saleDate,
boolean isInStock) {

}

而通道适配器具有以下配置:

@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}

请求消息的有效载荷必须像这样进行转换:

List<List<Object>> ingestBooks =
payload.stream()
.map(book ->
List.<Object>of(
book.isbn(),
book.title(),
book.author(),
book.pages(),
book.saleDate(),
book.isInStock()))
.toList();

对于更复杂的用例,有效载荷可以是 com.datastax.oss.driver.api.core.cql.Statement 的实例。推荐使用 com.datastax.oss.driver.api.querybuilder.QueryBuilder API 来构建各种语句以在 Apache Cassandra 上执行。例如,要从 Book 表中删除所有数据,可以向 CassandraMessageHandler 发送一个包含如下有效载荷的消息:QueryBuilder.truncate("book").build()。或者,对于基于请求消息的逻辑,可以为 CassandraMessageHandler 提供 statementExpressionstatementProcessor,以基于该消息构建 Statement。为了方便起见,com.datastax.oss.driver.api.querybuilder 被注册为 SpEL 评估上下文中的 import,因此目标表达式可以像这样简单:

statement-expression="T(QueryBuilder).selectFrom("book").all()"

setParameterExpressions(Map<String, Expression> parameterExpressions) 表示可绑定的命名查询参数,仅在与 setQuery(String query) 选项一起使用时生效。请参考上文提到的 Java 和 XML 示例。