Apache Cassandra 支持
Spring Integration 提供了通道适配器(从 6.0 版开始)用于对 Apache Cassandra 集群执行数据库操作。它完全基于 Spring Data for Apache Cassandra 项目。
你需要将这个依赖添加到你的项目中:
- Maven
- Gradle
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-cassandra</artifactId>
    <version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-cassandra:6.4.2"
Cassandra 外发组件
CassandraMessageHandler 是 AbstractReplyProducingMessageHandler 的一个实现,可以工作在单向(默认)和请求-回复模式(producesReply 选项)。它默认是异步的(可以通过 setAsync(false) 来重置),并针对提供的 ReactiveCassandraOperations 执行反应式的 INSERT、UPDATE、DELETE 或 STATEMENT 操作。操作类型可以通过 CassandraMessageHandler.Type 选项进行配置。ingestQuery 将模式设置为 INSERT;而 query 或 statementExpression,或 statementProcessor 将模式设置为 STATEMENT。
以下代码片段演示了此通道适配器或网关的各种配置:
- Java DSL
- Kotlin DSL
- Java
- XML
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
    return flow -> flow
            .handle(Cassandra.outboundGateway(cassandraOperations)
                    .query("SELECT * FROM book WHERE author = :author limit :size")
                    .parameter("author", "payload")
                    .parameter("size", m -> m.getHeaders().get("limit")))
            .channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
    integrationFlow {
        handle(
            Cassandra.outboundChannelAdapter(cassandraOperations)
                              .statementExpression("T(QueryBuilder).truncate('book').build()")
        ) { async(false) }
    }
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");
    Map<String, Expression> params = new HashMap<>();
    params.put("author", PARSER.parseExpression("payload"));
    params.put("size", PARSER.parseExpression("headers.limit"));
    cassandraMessageHandler.setParameterExpressions(params);
    cassandraMessageHandler.setOutputChannel(resultChannel());
    cassandraMessageHandler.setProducesReply(true);
    return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
                                        cassandra-template="cassandraTemplate"
                                        write-options="writeOptions"
                                        auto-startup="false"
                                        async="false"/>
<int-cassandra:outbound-gateway id="outgateway"
                                request-channel="input"
                                cassandra-template="cassandraTemplate"
                                mode="STATEMENT"
                                write-options="writeOptions"
                                query="SELECT * FROM book limit :size"
                                reply-channel="resultChannel"
                                auto-startup="true">
    <int-cassandra:parameter-expression name="author" expression="payload"/>
    <int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
如果 CassandraMessageHandler 用作默认异步模式下的网关,则会生成一个 Mono<WriteResult>,它将根据提供的 MessageChannel 实现进行处理。对于真正的反应式处理,建议在输出通道配置中使用 FluxMessageChannel。在同步模式下,将调用 Mono.block() 来获取回复值。
如果执行 INSERT、UPDATE 或 DELETE 操作,则请求消息的有效负载中需要一个实体(标记为 org.springframework.data.cassandra.core.mapping.Table)。如果有效负载是实体列表,则执行相应的批量操作。
ingestQuery 模式期望有效负载以要插入的值矩阵形式存在 — List<List<?>>。例如,如果实体是这样的:
@Table("book")
public record Book(@PrimaryKey String isbn,
                   String title,
                   @Indexed String author,
                   int pages,
                   LocalDate saleDate,
                   boolean isInStock) {
}
并且通道适配器有以下配置:
@Bean
public MessageHandler cassandraMessageHandler3() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
    cassandraMessageHandler.setIngestQuery(cqlIngest);
    cassandraMessageHandler.setAsync(false);
    return cassandraMessageHandler;
}
请求消息有效负载必须像这样转换:
List<List<Object>> ingestBooks =
    payload.stream()
            .map(book ->
                    List.<Object>of(
                            book.isbn(),
                            book.title(),
                            book.author(),
                            book.pages(),
                            book.saleDate(),
                            book.isInStock()))
            .toList();
对于更复杂的用例,有效负载可以是 com.datastax.oss.driver.api.core.cql.Statement 的一个实例。建议使用 com.datastax.oss.driver.api.querybuilder.QueryBuilder API 来构建各种语句以执行针对 Apache Cassandra 的操作。例如,要删除 Book 表中的所有数据,可以向 CassandraMessageHandler 发送带有如下有效负载的消息:QueryBuilder.truncate("book").build()。或者,基于请求消息的逻辑,可以为 CassandraMessageHandler 提供一个 statementExpression 或 statementProcessor,以便根据该消息构建一个 Statement。为了方便起见,com.datastax.oss.driver.api.querybuilder 已作为 import 注册到 SpEL 评估上下文中,因此目标表达式可以像这样简单:
queryBuilder.truncate("book").build()
statement-expression="T(QueryBuilder).selectFrom("book").all()"
setParameterExpressions(Map<String, Expression> parameterExpressions) 表示可绑定的命名查询参数,仅与 setQuery(String query) 选项一起使用。请参阅上述提到的 Java 和 XML 示例。