Apache Cassandra 向量存储
本节将引导您设置 CassandraVectorStore 来存储文档嵌入向量并执行相似性搜索。
什么是 Apache Cassandra?
Apache Cassandra® 是一个真正的开源分布式数据库,以其线性可扩展性、久经考验的容错能力和低延迟而闻名,使其成为关键任务事务数据的理想平台。
其向量相似性搜索(VSS)基于JVector库实现,确保了顶级的性能表现与相关性。
在 Apache Cassandra 中进行向量搜索的操作非常简单:
SELECT content FROM table ORDER BY content_vector ANN OF query_embedding;
关于此功能的更多文档可在此处阅读:这里。
这款Spring AI向量存储库旨在同时支持全新的RAG应用,并能够无缝集成到现有数据和表格之上。
该存储库也可用于现有数据库中的非RAG应用场景,例如语义搜索、地理邻近搜索等。
存储库会根据其配置自动创建或增强所需的架构。如果您不希望修改架构,请使用 initializeSchema 配置存储库。
在使用 spring-boot-autoconfigure 时,根据 Spring Boot 标准,initializeSchema 默认值为 false,你必须通过在 application.properties 文件中设置 …initialize-schema=true 来选择启用模式创建/修改。
什么是 JVector?
JVector 是一个纯 Java 嵌入式向量搜索引擎。
它与其他HNSW向量相似性搜索实现的不同之处在于:
-
算法快。JVector 采用了受 DiskANN 及相关研究启发的前沿图算法,实现了高召回率和低延迟。
-
实现快。JVector 利用 Panama SIMD API 加速索引构建和查询。
-
内存高效。JVector 使用乘积量化技术压缩向量,使其在搜索期间能常驻内存。
-
磁盘感知。JVector 的磁盘布局设计旨在查询时执行最少的必要 IOPS。
-
并发处理。索引构建至少可线性扩展到 32 个线程。线程数加倍,构建时间减半。
-
增量式。边构建索引边查询。向量添加后,即可在搜索结果中找到,无延迟。
-
易于集成。API 专为生产环境中的用户设计,便于集成。
先决条件
-
一个用于计算文档嵌入的
EmbeddingModel实例。这通常被配置为 Spring Bean。有以下几种选项: -
一个 Apache Cassandra 实例,版本需为 5.0-beta1 或更高
依赖关系
Spring AI 的自动配置和 starter 模块的 artifact 名称发生了重大变化。更多信息请参阅升级说明。
对于依赖管理,我们建议使用 Spring AI BOM,具体说明请参见依赖管理章节。
将这些依赖项添加到你的项目中:
- 仅针对 Cassandra 向量存储:
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-cassandra-store</artifactId>
</dependency>
- 或者,如果你需要一个 RAG 应用程序所需的一切(使用默认的 ONNX 嵌入模型):
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-vector-store-cassandra</artifactId>
</dependency>
配置属性
你可以在 Spring Boot 配置中使用以下属性来定制 Apache Cassandra 向量存储。
| 属性 | 默认值 |
|---|---|
spring.ai.vectorstore.cassandra.keyspace | springframework |
spring.ai.vectorstore.cassandra.table | ai_vector_store |
spring.ai.vectorstore.cassandra.initialize-schema | false |
spring.ai.vectorstore.cassandra.index-name | |
spring.ai.vectorstore.cassandra.content-column-name | content |
spring.ai.vectorstore.cassandra.embedding-column-name | embedding |
spring.ai.vectorstore.cassandra.fixed-thread-pool-executor-size | 16 |
用法
基本用法
创建一个CassandraVectorStore实例作为Spring Bean:
@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
return CassandraVectorStore.builder(embeddingModel)
.session(session)
.keyspace("my_keyspace")
.table("my_vectors")
.build();
}
获取向量存储实例后,即可添加文档并执行搜索:
// Add documents
vectorStore.add(List.of(
new Document("1", "content1", Map.of("key1", "value1")),
new Document("2", "content2", Map.of("key2", "value2"))
));
// Search with filters
List<Document> results = vectorStore.similaritySearch(
SearchRequest.query("search text")
.withTopK(5)
.withSimilarityThreshold(0.7f)
.withFilterExpression("metadata.key1 == 'value1'")
);
高级配置
对于更复杂的使用场景,您可以在Spring Bean中配置其他设置:
@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
return CassandraVectorStore.builder(embeddingModel)
.session(session)
.keyspace("my_keyspace")
.table("my_vectors")
// Configure primary keys
.partitionKeys(List.of(
new SchemaColumn("id", DataTypes.TEXT),
new SchemaColumn("category", DataTypes.TEXT)
))
.clusteringKeys(List.of(
new SchemaColumn("timestamp", DataTypes.TIMESTAMP)
))
// Add metadata columns with optional indexing
.addMetadataColumns(
new SchemaColumn("category", DataTypes.TEXT, SchemaColumnTags.INDEXED),
new SchemaColumn("score", DataTypes.DOUBLE)
)
// Customize column names
.contentColumnName("text")
.embeddingColumnName("vector")
// Performance tuning
.fixedThreadPoolExecutorSize(32)
// Schema management
.initializeSchema(true)
// Custom batching strategy
.batchingStrategy(new TokenCountBatchingStrategy())
.build();
}
连接配置
有两种配置连接到Cassandra的方式:
- 使用注入的 CqlSession(推荐):
@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
return CassandraVectorStore.builder(embeddingModel)
.session(session)
.keyspace("my_keyspace")
.table("my_vectors")
.build();
}
- 在构建器中直接使用连接详情:
@Bean
public VectorStore vectorStore(EmbeddingModel embeddingModel) {
return CassandraVectorStore.builder(embeddingModel)
.contactPoint(new InetSocketAddress("localhost", 9042))
.localDatacenter("datacenter1")
.keyspace("my_keyspace")
.build();
}
元数据筛选
你可以利用CassandraVectorStore的通用、可移植元数据过滤器。要使元数据列可搜索,这些列必须是主键或经过SAI索引。若要让非主键列建立索引,请使用SchemaColumnTags.INDEXED配置元数据列。
例如,您可以使用文本表达式语言:
vectorStore.similaritySearch(
SearchRequest.builder().query("The World")
.topK(5)
.filterExpression("country in ['UK', 'NL'] && year >= 2020").build());
或通过表达式DSL以编程方式实现:
Filter.Expression f = new FilterExpressionBuilder()
.and(
f.in("country", "UK", "NL"),
f.gte("year", 2020)
).build();
vectorStore.similaritySearch(
SearchRequest.builder().query("The World")
.topK(5)
.filterExpression(f).build());
便携式过滤器表达式会自动转换为 CQL 查询。
进阶示例:基于维基百科数据集的向量存储
以下示例演示了如何在现有模式中使用存储。此处我们采用 github.com/datastax-labs/colbert-wikipedia-data 项目中的模式,该项目附带完整的维基百科数据集,并已为您完成向量化处理。
首先,在 Cassandra 数据库中创建架构:
wget https://s.apache.org/colbert-wikipedia-schema-cql -O colbert-wikipedia-schema.cql
cqlsh -f colbert-wikipedia-schema.cql
然后使用构建器模式来配置存储:
@Bean
public VectorStore vectorStore(CqlSession session, EmbeddingModel embeddingModel) {
List<SchemaColumn> partitionColumns = List.of(
new SchemaColumn("wiki", DataTypes.TEXT),
new SchemaColumn("language", DataTypes.TEXT),
new SchemaColumn("title", DataTypes.TEXT)
);
List<SchemaColumn> clusteringColumns = List.of(
new SchemaColumn("chunk_no", DataTypes.INT),
new SchemaColumn("bert_embedding_no", DataTypes.INT)
);
List<SchemaColumn> extraColumns = List.of(
new SchemaColumn("revision", DataTypes.INT),
new SchemaColumn("id", DataTypes.INT)
);
return CassandraVectorStore.builder()
.session(session)
.embeddingModel(embeddingModel)
.keyspace("wikidata")
.table("articles")
.partitionKeys(partitionColumns)
.clusteringKeys(clusteringColumns)
.contentColumnName("body")
.embeddingColumnName("all_minilm_l6_v2_embedding")
.indexName("all_minilm_l6_v2_ann")
.initializeSchema(false)
.addMetadataColumns(extraColumns)
.primaryKeyTranslator((List<Object> primaryKeys) -> {
if (primaryKeys.isEmpty()) {
return "test§¶0";
}
return String.format("%s§¶%s", primaryKeys.get(2), primaryKeys.get(3));
})
.documentIdTranslator((id) -> {
String[] parts = id.split("§¶");
String title = parts[0];
int chunk_no = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
return List.of("simplewiki", "en", title, chunk_no, 0);
})
.build();
}
@Bean
public EmbeddingModel embeddingModel() {
// default is ONNX all-MiniLM-L6-v2 which is what we want
return new TransformersEmbeddingModel();
}
加载完整的维基百科数据集
要加载完整的维基百科数据集:
-
从 s.apache.org/simplewiki-sstable-tar 下载
simplewiki-sstable.tar(这需要一段时间,文件大小为数十 GB) -
加载数据:
tar -xf simplewiki-sstable.tar -C ${CASSANDRA_DATA}/data/wikidata/articles-*/
nodetool import wikidata articles ${CASSANDRA_DATA}/data/wikidata/articles-*/
-
如果表中已有现有数据,在执行
tar命令时,请检查 tarball 中的文件不会覆盖现有的 sstables。 -
nodetool import的替代方法是直接重启 Cassandra。 -
如果索引存在任何故障,它们将自动重建。
访问 Native Client
Cassandra Vector Store 实现通过 getNativeClient() 方法提供了对底层原生 Cassandra 客户端(CqlSession)的访问:
CassandraVectorStore vectorStore = context.getBean(CassandraVectorStore.class);
Optional<CqlSession> nativeClient = vectorStore.getNativeClient();
if (nativeClient.isPresent()) {
CqlSession session = nativeClient.get();
// Use the native client for Cassandra-specific operations
}
原生客户端让您能够访问那些可能未通过VectorStore接口公开的Cassandra特定功能和操作。