2.2 数据模型


文档摘要

2.2 数据模型 — Milvus 核心数据结构解析 本节导读:深入理解Milvus的数据模型架构,掌握集合、实体、模式等核心概念,学会设计高效的数据结构和优化存储策略。 学习目标 掌握Milvus数据模型的基本组成(Collection、Entity、Field、Schema) 理解向量和元数据的存储机制 学会设计适合业务场景的数据模型 掌握分区、分片等数据组织策略 了解数据类型和索引配置的最佳实践 核心概念 集合(Collection) Collection是Milvus中最高级别的逻辑数据单元,类似于关系数据库中的表。

2.2 数据模型 — Milvus 核心数据结构解析

本节导读:深入理解Milvus的数据模型架构,掌握集合、实体、模式等核心概念,学会设计高效的数据结构和优化存储策略。

学习目标

  • 掌握Milvus数据模型的基本组成(Collection、Entity、Field、Schema)
  • 理解向量和元数据的存储机制
  • 学会设计适合业务场景的数据模型
  • 掌握分区、分片等数据组织策略
  • 了解数据类型和索引配置的最佳实践

核心概念

集合(Collection)

Collection是Milvus中最高级别的逻辑数据单元,类似于关系数据库中的表。每个Collection包含:

  • Schema定义:描述数据的结构和字段类型
  • 数据实体:实际的数据记录
  • 索引配置:优化查询性能的索引结构
# 创建Collection的示例 from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType # 连接到Milvus connections.connect("default", host="localhost", port="19530") # 定义字段 fields = [ FieldSchema("id", DataType.INT64, is_primary=True), FieldSchema("vector", DataType.FLOAT_VECTOR, dim=128), FieldSchema("name", DataType.VARCHAR, max_length=255), FieldSchema("timestamp", DataType.INT64), FieldSchema("metadata", DataType.JSON) ] # 创建Schema schema = CollectionSchema(fields=fields, description="Sample collection") # 创建Collection collection = Collection("sample_collection", schema)

实体(Entity)

Entity是Collection中的具体数据行,包含向量字段和元数据字段。每个Entity具有以下特征:

  • 唯一标识:主键字段确保数据唯一性
  • 向量字段:用于相似性搜索的高维数据
  • 元数据字段:描述性信息和过滤条件
  • 系统字段:自动生成的创建时间、更新时间等
# 插入实体的示例 data = [ [1, 2, 3, 4, 5], # id [[0.1, 0.2, ...], [0.3, 0.4, ...]], # vector ["doc1", "doc2"], # name [1642697600, 1642784000], # timestamp [{"category": "tech", "tags": ["ai", "ml"]}, {"category": "news"}] # metadata ] # 插入数据 collection.insert(data)

模式(Schema)

Schema定义了Collection的结构,包括字段名称、数据类型、约束等信息。Milvus支持以下字段类型:

数值类型

  • INT8/INT16/INT32/INT64:整数类型
  • FLOAT/DOUBLE:浮点数类型
  • BOOL:布尔类型

向量类型

  • FLOAT_VECTOR:32位浮点向量
  • BINARY_VECTOR:二进制向量(压缩存储)
  • FLOAT16_VECTOR:16位浮点向量(节省内存)

特殊类型

  • VARCHAR:可变长度字符串
  • JSON:JSON格式数据
  • ARRAY:数组类型
# 复杂Schema示例 complex_fields = [ FieldSchema("uuid", DataType.VARCHAR, max_length=36, is_primary=True), FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=768), FieldSchema("image_embedding", DataType.FLOAT_VECTOR, dim=512), FieldSchema("text_content", DataType.VARCHAR, max_length=10000), FieldSchema("tags", DataType.ARRAY, max_capacity=10, element_type=DataType.VARCHAR, max_length=50), FieldSchema("price", DataType.FLOAT), FieldSchema("is_active", DataType.BOOL), FieldSchema("attributes", DataType.JSON), FieldSchema("created_at", DataType.INT64), FieldSchema("updated_at", DataType.INT64) ] complex_schema = CollectionSchema( fields=complex_fields, description="E-commerce product collection with multi-modal data" )

环境准备 / 前置知识

基础依赖

# Python客户端安装 pip install pymilvus # 或者安装特定版本 pip install pymilvus==2.3.7 # 数据验证工具 pip install numpy pandas

系统要求

  • Milvus 2.x 环境(推荐2.3.0+)
  • Python 3.7+
  • 内存:根据数据量配置(推荐8GB+)
  • 存储:SSD硬盘(推荐NVMe)

连接配置

# 基本连接配置 connections.connect( alias="default", host="localhost", port="19530" ) # 连接池配置 connections.connect( alias="cluster", host="milvus-cluster-1", port="19530", pool={"wait_timeout": 30} )

分步实战

步骤1:设计数据模型

在设计数据模型时,需要考虑以下关键因素:

  1. 业务需求分析
  2. 查询模式识别
  3. 性能要求确定
  4. 存储成本优化
# 电商产品数据模型设计 def create_ecommerce_collection(): """电商产品集合设计""" # 根据产品特征定义字段 fields = [ # 主键 FieldSchema("product_id", DataType.VARCHAR, max_length=50, is_primary=True), # 向量字段 - 产品描述语义向量 FieldSchema("description_vector", DataType.FLOAT_VECTOR, dim=768), # 向量字段 - 产品图像向量 FieldSchema("image_vector", DataType.FLOAT_VECTOR, dim=512), # 元数据字段 FieldSchema("name", DataType.VARCHAR, max_length=500), FieldSchema("category", DataType.VARCHAR, max_length=100), FieldSchema("brand", DataType.VARCHAR, max_length=100), FieldSchema("price", DataType.FLOAT), FieldSchema("stock", DataType.INT32), FieldSchema("rating", DataType.FLOAT), # 标签数组 FieldSchema("tags", DataType.ARRAY, max_capacity=20, element_type=DataType.VARCHAR, max_length=50), # 动态属性 FieldSchema("attributes", DataType.JSON), # 时间戳 FieldSchema("created_at", DataType.INT64), FieldSchema("updated_at", DataType.INT64), # 状态字段 FieldSchema("is_available", DataType.BOOL) ] # 创建Schema schema = CollectionSchema( fields=fields, description="E-commerce product collection with search and filter capabilities" ) # 创建Collection collection = Collection("ecommerce_products", schema) # 配置索引 index_params = { "description_vector": {"index_type": "HNSW", "params": {"M": 16, "ef_construction": 64}}, "image_vector": {"index_type": "HNSW", "params": {"M": 12, "ef_construction": 48}} } # 等待索引构建完成 print("数据模型创建成功") return collection

步骤2:批量数据导入

import json import time from datetime import datetime def batch_import_data(collection, data_generator, batch_size=1000): """批量导入数据""" total_imported = 0 batch_count = 0 for batch in data_generator: try: # 插入数据 collection.insert(batch) total_imported += len(batch) batch_count += 1 # 每批次后刷新 if batch_count % 10 == 0: collection.flush() print(f"已导入 {total_imported} 条记录") except Exception as e: print(f"第 {batch_count} 批插入失败: {e}") continue # 最终刷新 collection.flush() print(f"批量导入完成: 总计 {total_imported} 条记录") return total_imported # 生成示例数据 def generate_product_data(num_products=1000): """生成示例产品数据""" data = [] for i in range(num_products): product = { "product_id": f"prod_{i:06d}", "description_vector": [0.1] * 768, # 实际应该是embedding向量 "image_vector": [0.2] * 512, "name": f"产品 {i+1}", "category": ["电子产品", "服装", "家居"][i % 3], "brand": "品牌A", "price": 99.99 + i * 10, "stock": 100 - (i % 50), "rating": 4.5 + (i % 5) * 0.1, "tags": ["新品", "热销", "推荐"][:i % 3], "attributes": { "color": ["红色", "蓝色", "绿色"][i % 3], "size": ["S", "M", "L"][i % 3], "weight": 0.5 + i * 0.1 }, "created_at": int(time.time()) - i * 86400, "updated_at": int(time.time()), "is_available": True } data.append(product) return data # 批量导入执行 if __name__ == "__main__": # 创建集合 collection = create_ecommerce_collection() # 生成数据 data = generate_product_data(5000) # 批量导入 batch_import_data(collection, data, batch_size=1000)

步骤3:数据查询验证

def validate_data_model(collection): """验证数据模型完整性""" # 统计信息 print(f"集合名称: {collection.name}") print(f"实体数量: {collection.num_entities}") print(f"模式信息:") # 查看Schema schema = collection.schema for field in schema.fields: print(f" - {field.name}: {field.dtype}") if hasattr(field, 'dim'): print(f" 维度: {field.dim}") if hasattr(field, 'max_length'): print(f" 最大长度: {field.max_length}") # 简单查询测试 try: # 获取前5条记录 results = collection.query( expr="is_available == true", output_fields=["name", "category", "price"], limit=5 ) print("查询测试成功:") for result in results: print(f" - {result['name']} ({result['category']}): ¥{result['price']}") except Exception as e: print(f"查询测试失败: {e}") # 执行验证 validate_data_model(collection)

完整示例

文档搜索数据模型

class DocumentSearchCollection: """文档搜索集合管理器""" def __init__(self, name="document_search"): self.name = name self.collection = None def create_collection(self): """创建文档搜索集合""" fields = [ # 主键 FieldSchema("doc_id", DataType.VARCHAR, max_length=36, is_primary=True), # 向量字段 - 标题向量 FieldSchema("title_vector", DataType.FLOAT_VECTOR, dim=384), # 向量字段 - 内容向量 FieldSchema("content_vector", DataType.FLOAT_VECTOR, dim=768), # 向量字段 - 混合向量 FieldSchema("hybrid_vector", DataType.FLOAT_VECTOR, dim=512), # 文本字段 FieldSchema("title", DataType.VARCHAR, max_length=500), FieldSchema("content", DataType.VARCHAR, max_length=50000), FieldSchema("summary", DataType.VARCHAR, max_length=2000), # 分类信息 FieldSchema("category", DataType.VARCHAR, max_length=100), FieldSchema("language", DataType.VARCHAR, max_length=10), FieldSchema("source", DataType.VARCHAR, max_length=200), # 时间和统计信息 FieldSchema("publish_date", DataType.INT64), FieldSchema("word_count", DataType.INT32), FieldSchema("view_count", DataType.INT64), # 标签和关键词 FieldSchema("tags", DataType.ARRAY, max_capacity=50, element_type=DataType.VARCHAR, max_length=50), FieldSchema("keywords", DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=100), # 元数据 FieldSchema("metadata", DataType.JSON), # 状态 FieldSchema("status", DataType.VARCHAR, max_length=20), FieldSchema("is_public", DataType.BOOL), # 时间戳 FieldSchema("created_at", DataType.INT64), FieldSchema("updated_at", DataType.INT64) ] schema = CollectionSchema( fields=fields, description="Document search collection with vector embeddings" ) self.collection = Collection(self.name, schema) print(f"集合 {self.name} 创建成功") return self.collection def insert_document(self, doc_data): """插入单个文档""" vectors = { "title_vector": doc_data.get("title_vector", []), "content_vector": doc_data.get("content_vector", []), "hybrid_vector": doc_data.get("hybrid_vector", []) } data = [ [doc_data["doc_id"]], [vectors["title_vector"]], [vectors["content_vector"]], [vectors["hybrid_vector"]], [doc_data.get("title", "")], [doc_data.get("content", "")], [doc_data.get("summary", "")], [doc_data.get("category", "")], [doc_data.get("language", "zh")], [doc_data.get("source", "")], [doc_data.get("publish_date", int(time.time()))], [doc_data.get("word_count", 0)], [doc_data.get("view_count", 0)], [doc_data.get("tags", [])], [doc_data.get("keywords", [])], [doc_data.get("metadata", {})], [doc_data.get("status", "published")], [doc_data.get("is_public", True)], [doc_data.get("created_at", int(time.time()))], [doc_data.get("updated_at", int(time.time()))] ] self.collection.insert(data) return doc_data["doc_id"] def search_by_content(self, query_vector, limit=10): """基于内容向量搜索""" search_params = { "metric_type": "L2", "params": {"ef": 64} } results = self.collection.search( data=[query_vector], anns_field="content_vector", param=search_params, limit=limit, output_fields=["title", "summary", "category", "tags"] ) return results[0] def search_by_hybrid(self, query_vector, filters=None, limit=10): """混合搜索(向量+过滤)""" search_params = { "metric_type": "IP", # Inner Product "params": {"ef": 32} } expr = filters or "is_public == true and status == 'published'" results = self.collection.search( data=[query_vector], anns_field="hybrid_vector", param=search_params, limit=limit, expr=expr, output_fields=["title", "content", "category", "view_count"] ) return results[0]

多模态数据模型

class MultiModalCollection: """多模态数据集合""" def __init__(self): self.image_collection = None self.text_collection = None def create_image_collection(self): """创建图像集合""" fields = [ FieldSchema("image_id", DataType.VARCHAR, max_length=36, is_primary=True), FieldSchema("image_vector", DataType.FLOAT_VECTOR, dim=512), FieldSchema("thumbnail_vector", DataType.FLOAT_VECTOR, dim=128), FieldSchema("file_path", DataType.VARCHAR, max_length=500), FieldSchema("file_size", DataType.INT64), FieldSchema("resolution", DataType.JSON), FieldSchema("format", DataType.VARCHAR, max_length=10), FieldSchema("color_palette", DataType.ARRAY, max_capacity=10, element_type=DataType.VARCHAR, max_length=20), FieldSchema("tags", DataType.ARRAY, max_capacity=50, element_type=DataType.VARCHAR, max_length=50), FieldSchema("description", DataType.VARCHAR, max_length=2000), FieldSchema("uploaded_at", DataType.INT64) ] schema = CollectionSchema( fields=fields, description="Image collection with vector embeddings" ) self.image_collection = Collection("image_collection", schema) return self.image_collection def create_text_collection(self): """创建文本集合""" fields = [ FieldSchema("text_id", DataType.VARCHAR, max_length=36, is_primary=True), FieldSchema("text_vector", DataType.FLOAT_VECTOR, dim=768), FieldSchema("content", DataType.VARCHAR, max_length=10000), FieldSchema("title", DataType.VARCHAR, max_length=500), FieldSchema("author", DataType.VARCHAR, max_length=100), FieldSchema("category", DataType.VARCHAR, max_length=100), FieldSchema("word_count", DataType.INT32), FieldSchema("sentiment", DataType.VARCHAR, max_length=20), FieldSchema("keywords", DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR, max_length=50), FieldSchema("created_at", DataType.INT64) ] schema = CollectionSchema( fields=fields, description="Text collection with semantic embeddings" ) self.text_collection = Collection("text_collection", schema) return self.text_collection def cross_modal_search(self, text_query, image_candidates): """跨模态搜索""" # 1. 文本搜索相关文本 text_results = self.text_collection.search( data=[text_query], anns_field="text_vector", limit=5, output_fields=["title", "content", "keywords"] ) # 2. 基于文本结果的相关图像搜索 image_ids = [res.id for res in text_results[0]] # 3. 图像搜索 image_results = self.image_collection.search( data=image_candidates, anns_field="image_vector", limit=10, output_fields=["file_path", "description", "tags"] ) return { "related_texts": text_results, "related_images": image_results }

常见问题 FAQ

Q1:如何选择合适的向量维度?

A: 向量维度主要取决于预训练模型和业务需求:

  • 语义搜索:推荐使用768维(如BERT、SBERT)
  • 图像特征:推荐使用512-2048维(如ResNet、ViT)
  • 语音特征:推荐使用256-512维(如Wav2Vec2)
  • 业务定制:根据具体场景选择,一般256-1024维较为平衡

选择原则:维度越高语义表达越丰富,但存储和计算成本也越高。建议在性能和精度之间找到平衡点。

Q2:JSON字段有哪些限制和最佳实践?

A: JSON字段的注意事项:

限制:

  • 单个JSON字段大小限制:1MB
  • 嵌套深度限制:5层
  • 查询性能:JSON字段不适合高频过滤查询
  • 索引支持:无法直接对JSON内部字段建立索引

最佳实践:

# 推荐的JSON结构 {"user_id": "123", "preferences": {"theme": "dark", "language": "zh"}, "stats": {"login_count": 50, "last_login": 1642697600}} # 避免的结构(扁平化关键字段更好) {"user_id": "123", "theme": "dark", "language": "zh", "login_count": 50, "last_login": 1642697600}

Q3:如何处理大量数据的导入性能?

A: 大批量数据导入的优化策略:

批量处理:

# 最佳批量大小:1000-5000条/批 def bulk_import(collection, data_generator, batch_size=2000): for batch in data_generator: collection.insert(batch) # 每10批刷新一次 if batch_count % 10 == 0: collection.flush()

异步导入:

# 使用Milvus的批量插入功能 collection.insert(data, async_=True) time.sleep(10) # 等待插入完成

内存优化:

  • 避免在内存中保存大量未插入数据
  • 使用生成器而非列表
  • 分批次处理和刷新

Q4:多模态数据如何统一存储和查询?

A: 多模态数据管理的几种模式:

方案1:分离存储(推荐)

# 图像集合 image_collection = Collection("images", image_schema) # 文本集合 text_collection = Collection("texts", text_schema) # 关系表 relation_collection = Collection("relations", relation_schema)

方案2:统一存储

# 在一个集合中存储多模态数据 fields = [ FieldSchema("id", DataType.VARCHAR, is_primary=True), FieldSchema("image_vector", DataType.FLOAT_VECTOR, dim=512), FieldSchema("text_vector", DataType.FLOAT_VECTOR, dim=768), FieldSchema("image_data", DataType.JSON), FieldSchema("text_data", DataType.JSON), FieldSchema("modality_type", DataType.VARCHAR, max_length=20) ]

Q5:如何处理数据的更新和删除?

A: Milvus的数据更新和删除策略:

更新策略:

# 删除旧数据,插入新数据(Milvus不支持直接更新) collection.delete("id == 'old_id'") collection.insert(new_data) # 使用upsert(如果主键冲突) collection.insert(new_data, upsert=True)

批量删除:

# 按条件删除 collection.delete("status == 'deleted'") # 按ID列表删除 ids = ["id1", "id2", "id3"] collection.delete(f"id in {ids}")

最佳实践与避坑

数据模型设计原则

  1. 范式化设计:避免数据冗余,确保数据一致性
  2. 查询导向:基于查询需求设计字段结构
  3. 性能优化:合理设置索引和分区策略
  4. 可扩展性:预留字段和扩展空间

存储优化策略

  • 向量压缩:根据精度要求选择FLOAT32或FLOAT16
  • 字段类型:选择合适的数据类型(如VARCHAR长度限制)
  • 分区设计:按时间、类别等维度进行分区
  • 索引配置:为常用查询字段建立合适的索引

查询性能优化

  • 向量维度:避免过高的向量维度
  • 过滤条件:尽量使用索引字段进行过滤
  • 批量查询:合并多个查询请求
  • 连接池:合理配置连接池参数

本节小结

通过本节的学习,我们深入理解了Milvus的数据模型架构,掌握了集合、实体、模式等核心概念。关键要点包括:

  1. 数据结构:Collection作为逻辑容器,包含Schema定义和实体数据
  2. 字段类型:支持数值、向量、文本、JSON等多种数据类型
  3. 设计原则:根据业务需求设计合适的数据模型,平衡性能和功能
  4. 实践技巧:批量导入、多模态处理、查询优化等实际应用经验

下一节我们将继续学习2.3「索引技术」,深入了解Milvus如何通过索引技术实现高效的向量搜索。

延伸阅读

关键词:数据模型, Collection, Entity, Schema, 向量数据库, 数据结构, 多模态, 性能优化
难度:进阶
预计阅读:45 分钟


发布者: 作者: 转发
评论区 (0)
U