2.2 数据模型 — Milvus 核心数据结构解析

本节导读:深入理解Milvus的数据模型架构,掌握集合、实体、模式等核心概念,学会设计高效的数据结构和优化存储策略。

学习目标:
- 掌握Milvus数据模型的基本组成(Collection、Entity、Field、Schema)
- 理解向量和元数据的存储机制
- 学会设计适合业务场景的数据模型
- 掌握分区、分片等数据组织策略
- 了解数据类型和索引配置的最佳实践

核心概念

集合(Collection)

Collection是Milvus中组织数据的核心逻辑单元,类似于关系数据库中的表(较新版本中,Collection之上还有Database一层)。
本节导读:深入理解Milvus的数据模型架构,掌握集合、实体、模式等核心概念,学会设计高效的数据结构和优化存储策略。
Collection是Milvus中组织数据的核心逻辑单元,类似于关系数据库中的表(较新版本中,Collection之上还有Database一层)。每个Collection包含:
# Example: create a Collection
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# Open the "default" connection to a local Milvus server.
connections.connect(alias="default", host="localhost", port="19530")

# Field definitions: INT64 primary key, a 128-dim float vector,
# a VARCHAR name, an INT64 timestamp and a JSON metadata field.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="name", dtype=DataType.VARCHAR, max_length=255),
    FieldSchema(name="timestamp", dtype=DataType.INT64),
    FieldSchema(name="metadata", dtype=DataType.JSON),
]

# Wrap the fields in a schema, then create the collection from it.
schema = CollectionSchema(fields=fields, description="Sample collection")
collection = Collection(name="sample_collection", schema=schema)
Entity是Collection中的具体数据行,包含向量字段和元数据字段。每个Entity具有以下特征:
# Example: insert entities.
# Column-based format: one list per field, in schema order
# (id, vector, name, timestamp, metadata). Every column must have the
# same length -- here two entities.
data = [
    [1, 2],  # id
    [[0.1, 0.2] * 64, [0.3, 0.4] * 64],  # vector: 128 floats each, matching dim=128
    ["doc1", "doc2"],  # name
    [1642697600, 1642784000],  # timestamp
    [{"category": "tech", "tags": ["ai", "ml"]}, {"category": "news"}],  # metadata
]

# Insert the data
collection.insert(data)
Schema定义了Collection的结构,包括字段名称、数据类型、约束等信息。Milvus支持以下字段类型:
# Example: a richer schema -- VARCHAR primary key, two vector fields and
# text / array / scalar / JSON metadata fields.
# NOTE(review): more than one vector field per collection requires a Milvus /
# pymilvus release with multi-vector support (2.4+) -- confirm the server version.
complex_fields = [
    FieldSchema(name="uuid", dtype=DataType.VARCHAR, max_length=36, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="image_embedding", dtype=DataType.FLOAT_VECTOR, dim=512),
    FieldSchema(name="text_content", dtype=DataType.VARCHAR, max_length=10000),
    FieldSchema(
        name="tags",
        dtype=DataType.ARRAY,
        max_capacity=10,
        element_type=DataType.VARCHAR,
        max_length=50,
    ),
    FieldSchema(name="price", dtype=DataType.FLOAT),
    FieldSchema(name="is_active", dtype=DataType.BOOL),
    FieldSchema(name="attributes", dtype=DataType.JSON),
    FieldSchema(name="created_at", dtype=DataType.INT64),
    FieldSchema(name="updated_at", dtype=DataType.INT64),
]

complex_schema = CollectionSchema(
    fields=complex_fields,
    description="E-commerce product collection with multi-modal data",
)
# Install the Python client
pip install pymilvus
# Or pin a release series. The schemas in this section declare several
# FLOAT_VECTOR fields per collection, which needs multi-vector support
# (Milvus / pymilvus 2.4 or later); a 2.3.x client cannot create them.
pip install "pymilvus~=2.4.0"
# Data-validation tools
pip install numpy pandas
# Basic connection: register the "default" alias for a local Milvus server.
connections.connect("default", host="localhost", port="19530")

# A second connection registered under the "cluster" alias.
# NOTE(review): `pool` is not a connect() option I can find in pymilvus, so it
# may be ignored rather than configuring a connection pool -- confirm against
# the pymilvus version in use (a `timeout=` keyword may be what was intended).
connections.connect(
    "cluster",
    host="milvus-cluster-1",
    port="19530",
    pool={"wait_timeout": 30},
)
在设计数据模型时,需要考虑以下关键因素:
# E-commerce product data model design
def create_ecommerce_collection(metric_type="L2"):
    """Create the ``ecommerce_products`` collection and index its vector fields.

    The schema has a VARCHAR primary key, two float-vector fields
    (768-dim description embedding, 512-dim image embedding), and scalar,
    array and JSON metadata fields for filtering.

    Args:
        metric_type: distance metric stored in both HNSW indexes. Searches
            on these fields must use the same metric. Defaults to ``"L2"``.

    Returns:
        The created ``Collection``.
    """
    fields = [
        # Primary key
        FieldSchema("product_id", DataType.VARCHAR, max_length=50, is_primary=True),
        # Vector field: semantic embedding of the product description
        FieldSchema("description_vector", DataType.FLOAT_VECTOR, dim=768),
        # Vector field: product image embedding
        FieldSchema("image_vector", DataType.FLOAT_VECTOR, dim=512),
        # Scalar metadata fields
        FieldSchema("name", DataType.VARCHAR, max_length=500),
        FieldSchema("category", DataType.VARCHAR, max_length=100),
        FieldSchema("brand", DataType.VARCHAR, max_length=100),
        FieldSchema("price", DataType.FLOAT),
        FieldSchema("stock", DataType.INT32),
        FieldSchema("rating", DataType.FLOAT),
        # Tag array
        FieldSchema("tags", DataType.ARRAY, max_capacity=20,
                    element_type=DataType.VARCHAR, max_length=50),
        # Free-form (dynamic) attributes
        FieldSchema("attributes", DataType.JSON),
        # Timestamps
        FieldSchema("created_at", DataType.INT64),
        FieldSchema("updated_at", DataType.INT64),
        # Availability flag
        FieldSchema("is_available", DataType.BOOL),
    ]

    schema = CollectionSchema(
        fields=fields,
        description="E-commerce product collection with search and filter capabilities",
    )
    collection = Collection("ecommerce_products", schema)

    # One HNSW index per vector field. The params must actually be passed to
    # create_index -- merely defining them leaves the fields unindexed.
    index_params = {
        "description_vector": {
            "index_type": "HNSW",
            "metric_type": metric_type,
            "params": {"M": 16, "ef_construction": 64},
        },
        "image_vector": {
            "index_type": "HNSW",
            "metric_type": metric_type,
            "params": {"M": 12, "ef_construction": 48},
        },
    }
    for field_name, params in index_params.items():
        collection.create_index(field_name=field_name, index_params=params)

    print("数据模型创建成功")
    return collection
import json
import time
from collections.abc import Mapping
from datetime import datetime


def _iter_batches(data_generator, batch_size):
    """Yield lists of rows, each ready for a single ``insert`` call.

    Single rows (mappings) are grouped into lists of at most ``batch_size``.
    Any other item is treated as an already-built batch and yielded as-is,
    after the rows buffered so far, so input order is preserved.
    """
    pending = []
    for item in data_generator:
        if isinstance(item, Mapping):
            pending.append(item)
            if len(pending) >= batch_size:
                yield pending
                pending = []
        else:
            if pending:
                yield pending
                pending = []
            yield item
    if pending:
        yield pending


def batch_import_data(collection, data_generator, batch_size=1000):
    """Insert data into *collection* batch by batch and return the row count.

    Args:
        collection: object with ``insert(batch)`` and ``flush()`` methods
            (e.g. a pymilvus ``Collection``).
        data_generator: iterable of row dicts, which are grouped into batches
            of ``batch_size``, and/or of pre-built batches (lists of row
            dicts), which are inserted unchanged.
        batch_size: maximum number of rows per ``insert`` call for row input.

    Returns:
        Total number of rows in batches whose ``insert`` call succeeded.

    Raises:
        ValueError: if ``batch_size`` is not positive.

    A failed batch is reported and skipped, and the import continues. The
    collection is flushed after every 10th batch and once more at the end.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    total_imported = 0
    batch_count = 0
    for batch in _iter_batches(data_generator, batch_size):
        # Count every attempt so failure messages report the real batch number.
        batch_count += 1
        try:
            collection.insert(batch)
        except Exception as e:
            # Best-effort import: report the failed batch and keep going.
            print(f"第 {batch_count} 批插入失败: {e}")
            continue
        total_imported += len(batch)

        # Flush every 10 batches
        if batch_count % 10 == 0:
            collection.flush()
            print(f"已导入 {total_imported} 条记录")

    # Final flush so the last (possibly partial) batches are persisted
    collection.flush()
    print(f"批量导入完成: 总计 {total_imported} 条记录")
    return total_imported


# Generate sample data
def generate_product_data(num_products=1000):
    """Return ``num_products`` synthetic product rows (dicts keyed by field name).

    The vectors are constant placeholders; real data should use model embeddings.
    """
    now = int(time.time())  # one clock read shared by every row
    data = []
    for i in range(num_products):
        product = {
            "product_id": f"prod_{i:06d}",
            "description_vector": [0.1] * 768,  # placeholder; should be a real embedding
            "image_vector": [0.2] * 512,
            "name": f"产品 {i+1}",
            "category": ["电子产品", "服装", "家居"][i % 3],
            "brand": "品牌A",
            "price": 99.99 + i * 10,
            "stock": 100 - (i % 50),
            "rating": 4.5 + (i % 5) * 0.1,
            "tags": ["新品", "热销", "推荐"][:i % 3],
            "attributes": {
                "color": ["红色", "蓝色", "绿色"][i % 3],
                "size": ["S", "M", "L"][i % 3],
                "weight": 0.5 + i * 0.1,
            },
            "created_at": now - i * 86400,
            "updated_at": now,
            "is_available": True,
        }
        data.append(product)
    return data


# Run the batch import
if __name__ == "__main__":
    # Create the collection
    collection = create_ecommerce_collection()
    # Generate data
    data = generate_product_data(5000)
    # Import in batches
    batch_import_data(collection, data, batch_size=1000)
def validate_data_model(collection):
    """Print a summary of *collection* and run a small smoke-test query.

    Prints the collection name, entity count and each field's name and dtype,
    plus ``dim`` / ``max_length`` when the field declares them. It then loads
    the collection and queries up to five rows with ``is_available == true``.
    A failure in the load/query step is printed, not raised.
    """
    # Summary statistics
    print(f"集合名称: {collection.name}")
    print(f"实体数量: {collection.num_entities}")
    print(f"模式信息:")

    # Inspect the schema
    for field in collection.schema.fields:
        print(f" - {field.name}: {field.dtype}")
        # pymilvus keeps type parameters such as dim / max_length in
        # FieldSchema.params rather than as attributes; fall back to a
        # same-named attribute for other field-like objects.
        params = getattr(field, "params", None) or {}
        dim = params.get("dim", getattr(field, "dim", None))
        if dim is not None:
            print(f" 维度: {dim}")
        max_length = params.get("max_length", getattr(field, "max_length", None))
        if max_length is not None:
            print(f" 最大长度: {max_length}")

    # Simple query test
    try:
        # A collection must be loaded before it can be queried.
        collection.load()
        # Fetch the first 5 matching records
        results = collection.query(
            expr="is_available == true",
            output_fields=["name", "category", "price"],
            limit=5,
        )
        print("查询测试成功:")
        for result in results:
            print(f" - {result['name']} ({result['category']}): ¥{result['price']}")
    except Exception as e:
        print(f"查询测试失败: {e}")


# Run the validation
validate_data_model(collection)
class DocumentSearchCollection:
    """Manager for a document-search collection.

    Each document stores three float-vector representations (384-dim title,
    768-dim content, 512-dim hybrid) alongside text, classification,
    statistics, tag/keyword arrays, JSON metadata and status fields.
    """

    def __init__(self, name="document_search"):
        self.name = name
        self.collection = None  # set by create_collection()

    def create_collection(self):
        """Create the collection with the document-search schema and return it."""
        fields = [
            # Primary key
            FieldSchema("doc_id", DataType.VARCHAR, max_length=36, is_primary=True),
            # Vector field: title embedding
            FieldSchema("title_vector", DataType.FLOAT_VECTOR, dim=384),
            # Vector field: content embedding
            FieldSchema("content_vector", DataType.FLOAT_VECTOR, dim=768),
            # Vector field: hybrid embedding
            FieldSchema("hybrid_vector", DataType.FLOAT_VECTOR, dim=512),
            # Text fields
            FieldSchema("title", DataType.VARCHAR, max_length=500),
            FieldSchema("content", DataType.VARCHAR, max_length=50000),
            FieldSchema("summary", DataType.VARCHAR, max_length=2000),
            # Classification
            FieldSchema("category", DataType.VARCHAR, max_length=100),
            FieldSchema("language", DataType.VARCHAR, max_length=10),
            FieldSchema("source", DataType.VARCHAR, max_length=200),
            # Time and statistics
            FieldSchema("publish_date", DataType.INT64),
            FieldSchema("word_count", DataType.INT32),
            FieldSchema("view_count", DataType.INT64),
            # Tags and keywords
            FieldSchema("tags", DataType.ARRAY, max_capacity=50,
                        element_type=DataType.VARCHAR, max_length=50),
            FieldSchema("keywords", DataType.ARRAY, max_capacity=100,
                        element_type=DataType.VARCHAR, max_length=100),
            # Metadata
            FieldSchema("metadata", DataType.JSON),
            # Status
            FieldSchema("status", DataType.VARCHAR, max_length=20),
            FieldSchema("is_public", DataType.BOOL),
            # Timestamps
            FieldSchema("created_at", DataType.INT64),
            FieldSchema("updated_at", DataType.INT64),
        ]
        schema = CollectionSchema(
            fields=fields,
            description="Document search collection with vector embeddings",
        )
        self.collection = Collection(self.name, schema)
        print(f"集合 {self.name} 创建成功")
        return self.collection

    def insert_document(self, doc_data):
        """Insert a single document and return its ``doc_id``.

        Args:
            doc_data: dict with a required ``"doc_id"`` key (``KeyError``
                otherwise). All other keys are optional and fall back to the
                defaults below. The three vectors default to ``[]``, which
                the server will presumably reject because the schema fixes
                their dimensions, so callers should supply them.

        Returns:
            ``doc_data["doc_id"]``.
        """
        now = int(time.time())  # one clock read shared by all time defaults

        # Column-based insert: one single-element list per field, in schema order.
        data = [
            [doc_data["doc_id"]],
            [doc_data.get("title_vector", [])],
            [doc_data.get("content_vector", [])],
            [doc_data.get("hybrid_vector", [])],
            [doc_data.get("title", "")],
            [doc_data.get("content", "")],
            [doc_data.get("summary", "")],
            [doc_data.get("category", "")],
            [doc_data.get("language", "zh")],
            [doc_data.get("source", "")],
            [doc_data.get("publish_date", now)],
            [doc_data.get("word_count", 0)],
            [doc_data.get("view_count", 0)],
            [doc_data.get("tags", [])],
            [doc_data.get("keywords", [])],
            [doc_data.get("metadata", {})],
            [doc_data.get("status", "published")],
            [doc_data.get("is_public", True)],
            [doc_data.get("created_at", now)],
            [doc_data.get("updated_at", now)],
        ]
        self.collection.insert(data)
        return doc_data["doc_id"]

    def search_by_content(self, query_vector, limit=10):
        """Search ``content_vector`` and return the hits for *query_vector*.

        NOTE(review): the L2 metric must match the index on ``content_vector``.
        """
        search_params = {
            "metric_type": "L2",
            # HNSW requires ef >= top-k, so grow ef with limit (64 minimum).
            "params": {"ef": max(64, limit)},
        }
        results = self.collection.search(
            data=[query_vector],
            anns_field="content_vector",
            param=search_params,
            limit=limit,
            output_fields=["title", "summary", "category", "tags"],
        )
        return results[0]

    def search_by_hybrid(self, query_vector, filters=None, limit=10):
        """Search ``hybrid_vector`` with a boolean filter expression.

        When *filters* is falsy, only public, published documents are
        searched. A caller-supplied *filters* replaces that default rather
        than being combined with it.

        NOTE(review): the IP metric must match the index on ``hybrid_vector``.
        """
        search_params = {
            "metric_type": "IP",  # Inner Product
            # HNSW requires ef >= top-k, so grow ef with limit (32 minimum).
            "params": {"ef": max(32, limit)},
        }
        expr = filters or "is_public == true and status == 'published'"
        results = self.collection.search(
            data=[query_vector],
            anns_field="hybrid_vector",
            param=search_params,
            limit=limit,
            expr=expr,
            output_fields=["title", "content", "category", "view_count"],
        )
        return results[0]
class MultiModalCollection:
    """Holds a separate image collection and text collection for multi-modal data."""

    def __init__(self):
        self.image_collection = None  # set by create_image_collection()
        self.text_collection = None  # set by create_text_collection()

    def create_image_collection(self):
        """Create ``image_collection`` (512-dim image + 128-dim thumbnail vectors) and return it."""
        fields = [
            FieldSchema("image_id", DataType.VARCHAR, max_length=36, is_primary=True),
            FieldSchema("image_vector", DataType.FLOAT_VECTOR, dim=512),
            FieldSchema("thumbnail_vector", DataType.FLOAT_VECTOR, dim=128),
            FieldSchema("file_path", DataType.VARCHAR, max_length=500),
            FieldSchema("file_size", DataType.INT64),
            FieldSchema("resolution", DataType.JSON),
            FieldSchema("format", DataType.VARCHAR, max_length=10),
            FieldSchema("color_palette", DataType.ARRAY, max_capacity=10,
                        element_type=DataType.VARCHAR, max_length=20),
            FieldSchema("tags", DataType.ARRAY, max_capacity=50,
                        element_type=DataType.VARCHAR, max_length=50),
            FieldSchema("description", DataType.VARCHAR, max_length=2000),
            FieldSchema("uploaded_at", DataType.INT64),
        ]
        schema = CollectionSchema(
            fields=fields,
            description="Image collection with vector embeddings",
        )
        self.image_collection = Collection("image_collection", schema)
        return self.image_collection

    def create_text_collection(self):
        """Create ``text_collection`` (768-dim text vector) and return it."""
        fields = [
            FieldSchema("text_id", DataType.VARCHAR, max_length=36, is_primary=True),
            FieldSchema("text_vector", DataType.FLOAT_VECTOR, dim=768),
            FieldSchema("content", DataType.VARCHAR, max_length=10000),
            FieldSchema("title", DataType.VARCHAR, max_length=500),
            FieldSchema("author", DataType.VARCHAR, max_length=100),
            FieldSchema("category", DataType.VARCHAR, max_length=100),
            FieldSchema("word_count", DataType.INT32),
            FieldSchema("sentiment", DataType.VARCHAR, max_length=20),
            FieldSchema("keywords", DataType.ARRAY, max_capacity=100,
                        element_type=DataType.VARCHAR, max_length=50),
            FieldSchema("created_at", DataType.INT64),
        ]
        schema = CollectionSchema(
            fields=fields,
            description="Text collection with semantic embeddings",
        )
        self.text_collection = Collection("text_collection", schema)
        return self.text_collection

    def cross_modal_search(self, text_query, image_candidates,
                           text_search_params=None, image_search_params=None):
        """Run a text search and an image search and return both result sets.

        Args:
            text_query: query vector for ``text_vector`` (768-dim per the schema).
            image_candidates: list of query vectors for ``image_vector`` (512-dim).
            text_search_params: search ``param`` for the text search.
            image_search_params: search ``param`` for the image search.
                Both default to ``{"metric_type": "L2", "params": {}}``. The
                metric must match the index built on each vector field.

        Returns:
            dict with ``"related_texts"`` (top 5 per query) and
            ``"related_images"`` (top 10 per query).

        The two searches are independent. The text hits are not used to pick
        images, because these schemas store no link between text and image ids.
        """
        # Fresh dicts per call (no shared mutable defaults).
        if text_search_params is None:
            text_search_params = {"metric_type": "L2", "params": {}}
        if image_search_params is None:
            image_search_params = {"metric_type": "L2", "params": {}}

        # 1. Semantic search over the text collection.
        text_results = self.text_collection.search(
            data=[text_query],
            anns_field="text_vector",
            param=text_search_params,
            limit=5,
            output_fields=["title", "content", "keywords"],
        )

        # 2. Vector search over the image collection with the caller's candidates.
        image_results = self.image_collection.search(
            data=image_candidates,
            anns_field="image_vector",
            param=image_search_params,
            limit=10,
            output_fields=["file_path", "description", "tags"],
        )

        return {
            "related_texts": text_results,
            "related_images": image_results,
        }
A: 向量维度主要取决于预训练模型和业务需求:
选择原则:维度越高语义表达越丰富,但存储和计算成本也越高。建议在性能和精度之间找到平衡点。
A: JSON字段的注意事项:
限制:
最佳实践:
# 推荐的JSON结构(按语义分组嵌套) {"user_id": "123", "preferences": {"theme": "dark", "language": "zh"}, "stats": {"login_count": 50, "last_login": 1642697600}} # 避免的结构(所有属性平铺在同一层、缺少分组;若某些键需要频繁过滤,应将其提取为独立的标量字段,而不是放在JSON中) {"user_id": "123", "theme": "dark", "language": "zh", "login_count": 50, "last_login": 1642697600}
A: 大批量数据导入的优化策略:
批量处理:
# Recommended batch size: 1000-5000 rows per batch
def bulk_import(collection, data_generator, batch_size=2000):
    """Insert rows from *data_generator* in batches of *batch_size*.

    Flushes after every 10th batch and once more at the end, so the final
    partial batch is persisted too.

    Args:
        collection: object with ``insert(batch)`` and ``flush()`` methods
            (e.g. a pymilvus ``Collection``).
        data_generator: iterable of row records (e.g. dicts keyed by field name).
        batch_size: maximum number of rows per ``insert`` call; must be positive.

    Returns:
        Number of rows passed to ``insert``.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    from itertools import islice

    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    rows = iter(data_generator)
    total = 0
    batch_count = 0
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        collection.insert(batch)
        total += len(batch)
        batch_count += 1
        # Flush every 10 batches
        if batch_count % 10 == 0:
            collection.flush()

    # Final flush for the remaining batches
    collection.flush()
    return total
异步导入:
# Asynchronous insert: with pymilvus's `_async=True`, insert returns a future
# immediately instead of blocking until the write completes.
insert_future = collection.insert(data, _async=True)
# ... other work can run here while the insert is in flight ...
# Wait for the insert to actually finish instead of guessing with sleep().
insert_result = insert_future.result()
内存优化:
A: 多模态数据管理的几种模式:
方案1:分离存储(推荐)
# Image collection
image_collection = Collection(name="images", schema=image_schema)

# Text collection
text_collection = Collection(name="texts", schema=text_schema)

# Relation collection linking image and text ids.
# NOTE(review): Milvus collections generally need at least one vector field;
# confirm that relation_schema satisfies this.
relation_collection = Collection(name="relations", schema=relation_schema)
方案2:统一存储
# Store all modalities in a single collection
fields = [
    # VARCHAR fields must declare max_length, otherwise the schema is rejected;
    # 36 fits a UUID string.
    FieldSchema("id", DataType.VARCHAR, max_length=36, is_primary=True),
    FieldSchema("image_vector", DataType.FLOAT_VECTOR, dim=512),
    FieldSchema("text_vector", DataType.FLOAT_VECTOR, dim=768),
    FieldSchema("image_data", DataType.JSON),
    FieldSchema("text_data", DataType.JSON),
    # Which modality a row primarily represents
    FieldSchema("modality_type", DataType.VARCHAR, max_length=20),
]
A: Milvus的数据更新和删除策略:
更新策略:
# Option 1: delete the old entity, then insert the replacement
# (Milvus has no in-place field update). These are two separate requests,
# not one atomic step.
collection.delete("id == 'old_id'")
collection.insert(new_data)

# Option 2: upsert -- inserts new_data, replacing any entity whose primary key
# already exists. pymilvus exposes this as Collection.upsert, not as an
# `upsert=` flag on insert().
collection.upsert(new_data)
批量删除:
# Delete every entity matching a boolean filter expression
collection.delete(expr="status == 'deleted'")

# Delete by a list of primary keys. The expression embeds the list's repr,
# producing: id in ['id1', 'id2', 'id3']
ids = ["id1", "id2", "id3"]
collection.delete(expr=f"id in {ids!r}")
通过本节的学习,我们深入理解了Milvus的数据模型架构,掌握了集合、实体、模式等核心概念。关键要点包括:
下一节我们将继续学习2.3「索引技术」,深入了解Milvus如何通过索引技术实现高效的向量搜索。
关键词:数据模型, Collection, Entity, Schema, 向量数据库, 数据结构, 多模态, 性能优化
难度:进阶
预计阅读:45 分钟