4.1 向量基础


文档摘要

4.1 向量基础 — LightRAG 向量模块 本节导读:掌握向量嵌入的基本概念和在LightRAG中的核心作用,建立向量计算的基础能力 学习目标 理解向量嵌入的基本概念和意义 掌握LightRAG中向量模块的架构 了解常用的向量嵌入模型 能够实现基本的向量计算和相似度比较 掌握向量化处理的基本流程 核心概念 什么是向量嵌入 向量嵌入(Vector Embedding)是将离散的文本信息映射到连续向量空间的技术。在LightRAG中,向量模块负责将文本转换为数值向量,为后续的语义检索提供基础。

4.1 向量基础 — LightRAG 向量模块

本节导读:掌握向量嵌入的基本概念和在LightRAG中的核心作用,建立向量计算的基础能力

学习目标

  • 理解向量嵌入的基本概念和意义
  • 掌握LightRAG中向量模块的架构
  • 了解常用的向量嵌入模型
  • 能够实现基本的向量计算和相似度比较
  • 掌握向量化处理的基本流程

核心概念

什么是向量嵌入

向量嵌入(Vector Embedding)是将离散的文本信息映射到连续向量空间的技术。在LightRAG中,向量模块负责将文本转换为数值向量,为后续的语义检索提供基础。

向量嵌入的特点

  1. 语义保持:语义相近的文本在向量空间中距离相近
  2. 维度压缩:将高维文本信息压缩为低维稠密向量
  3. 数值化表示:便于计算机进行数学运算和比较
  4. 关系保留:文本间的语义关系在向量中得到体现

向量在LightRAG中的作用

文本输入 → 向量嵌入 → 向量检索 → 结果输出 ↓ ↓ ↓ ↓ 语义理解 量化表示 相似度匹配 答案生成

向量嵌入模型分类

模型类型 维度 特点 适用场景
BERT 768 双向编码,上下文理解 深度语义理解
Sentence-BERT 384-768 句子级嵌入 句子相似度
Word2Vec 300 词级嵌入 词汇语义
GloVe 300-300 全局词向量 词汇分析

环境准备 / 前置知识

技术栈要求

  • Python 3.8+:主要编程语言
  • PyTorch:深度学习框架
  • Transformers:预训练模型库
  • sentence-transformers:句子嵌入库
  • numpy:数值计算库

依赖安装

# 基础依赖 pip install torch transformers sentence-transformers numpy # 中文支持 pip install jieba paddlepaddle # 可视化工具 pip install matplotlib seaborn

前置知识

  • Python编程基础
  • 深度学习基本概念
  • 自然语言处理基础
  • 向量空间基础
  • 线性代数基础

分步实战

步骤 1:基础嵌入模型实现

import torch import numpy as np from typing import List, Tuple from sentence_transformers import SentenceTransformer class VectorEmbedding: """向量嵌入基础类""" def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"): """ 初始化向量嵌入模型 Args: model_name: 模型名称 """ self.model_name = model_name self.model = None self.device = "cuda" if torch.cuda.is_available() else "cpu" self._load_model() def _load_model(self): """加载嵌入模型""" print(f"加载模型: {self.model_name}") print(f"使用设备: {self.device}") try: self.model = SentenceTransformer(self.model_name) self.model.to(self.device) print("模型加载成功") except Exception as e: print(f"模型加载失败: {e}") raise def encode_texts(self, texts: List[str]) -> np.ndarray: """ 编码文本为向量 Args: texts: 文本列表 Returns: 文本向量数组 """ if not texts: return np.array([]) print(f"编码 {len(texts)} 个文本...") try: # 批量编码文本 embeddings = self.model.encode( texts, batch_size=32, show_progress_bar=False, convert_to_numpy=True, normalize_embeddings=True # 归一化以便计算余弦相似度 ) print(f"编码完成,向量维度: {embeddings.shape[1]}") return embeddings except Exception as e: print(f"编码失败: {e}") raise def get_embedding_dimension(self) -> int: """获取嵌入维度""" return self.model.get_sentence_embedding_dimension() def similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float: """ 计算两个向量的余弦相似度 Args: vec1: 第一个向量 vec2: 第二个向量 Returns: 相似度分数 (0-1) """ if self.model.__class__.__name__ == 'SentenceTransformer': # 如果模型已经归一化,直接使用点积 return np.dot(vec1, vec2) else: # 否则计算余弦相似度 norm1 = np.linalg.norm(vec1) norm2 = np.linalg.norm(vec2) if norm1 == 0 or norm2 == 0: return 0.0 return np.dot(vec1, vec2) / (norm1 * norm2)

步骤 2:文本预处理和分段

class TextPreprocessor: """文本预处理器""" def __init__(self, max_length: int = 512): self.max_length = max_length def preprocess_texts(self, texts: List[str]) -> List[str]: """ 预处理文本列表 Args: texts: 原始文本列表 Returns: 预处理后的文本列表 """ processed = [] for text in texts: if not text or not isinstance(text, str): processed.append("") continue # 基本清洗 cleaned = self._clean_text(text) # 长度控制 if len(cleaned) > self.max_length: cleaned = cleaned[:self.max_length] processed.append(cleaned) return processed def _clean_text(self, text: str) -> str: """基本文本清洗""" # 去除首尾空白 text = text.strip() # 去除多余空白字符 text = ' '.join(text.split()) return text def split_long_text(self, text: str, max_length: int = 512) -> List[str]: """ 分割长文本为适当长度 Args: text: 长文本 max_length: 最大长度 Returns: 分割后的文本片段 """ if len(text) <= max_length: return [text] segments = [] sentences = text.split('。') current_segment = "" for sentence in sentences: if len(current_segment) + len(sentence) + 1 <= max_length: current_segment += sentence + "。" else: if current_segment: segments.append(current_segment.strip()) current_segment = sentence + "。" if current_segment: segments.append(current_segment.strip()) return segments

步骤 3:向量相似度计算

class SimilarityCalculator: """相似度计算器""" def __init__(self, embedding_model: VectorEmbedding): self.embedding_model = embedding_model def compute_similarity_matrix(self, texts: List[str]) -> np.ndarray: """ 计算文本相似度矩阵 Args: texts: 文本列表 Returns: 相似度矩阵 """ # 编码文本 embeddings = self.embedding_model.encode_texts(texts) # 计算相似度矩阵 n = len(texts) similarity_matrix = np.zeros((n, n)) for i in range(n): for j in range(n): if i == j: similarity_matrix[i, j] = 1.0 else: similarity_matrix[i, j] = self.embedding_model.similarity( embeddings[i], embeddings[j] ) return similarity_matrix def find_most_similar(self, query: str, candidates: List[str], top_k: int = 5) -> List[Tuple[int, float]]: """ 找到最相似的候选文本 Args: query: 查询文本 candidates: 候选文本列表 top_k: 返回前k个 Returns: (索引, 相似度分数) 的列表 """ # 编码查询和候选文本 all_texts = [query] + candidates embeddings = self.embedding_model.encode_texts(all_texts) query_embedding = embeddings[0] candidate_embeddings = embeddings[1:] # 计算相似度 similarities = [] for i, cand_embedding in enumerate(candidate_embeddings): similarity = self.embedding_model.similarity(query_embedding, cand_embedding) similarities.append((i, similarity)) # 排序并返回top_k similarities.sort(key=lambda x: x[1], reverse=True) return similarities[:top_k]

步骤 4:LightRAG向量模块集成

class LightRAGVectorModule: """LightRAG向量模块""" def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"): """ 初始化LightRAG向量模块 Args: model_name: 嵌入模型名称 """ self.embedding_model = VectorEmbedding(model_name) self.preprocessor = TextPreprocessor() self.similarity_calculator = SimilarityCalculator(self.embedding_model) # 缓存机制 self.text_cache = {} self.cache_limit = 10000 def process_text(self, text: str, use_cache: bool = True) -> np.ndarray: """ 处理单个文本 Args: text: 输入文本 use_cache: 是否使用缓存 Returns: 文本向量 """ # 使用缓存 if use_cache and text in self.text_cache: return self.text_cache[text] # 预处理 processed_text = self.preprocessor.preprocess_texts([text])[0] # 编码 embedding = self.embedding_model.encode_texts([processed_text])[0] # 更新缓存 if use_cache and len(self.text_cache) < self.cache_limit: self.text_cache[text] = embedding return embedding def process_texts_batch(self, texts: List[str]) -> np.ndarray: """ 批量处理文本 Args: texts: 文本列表 Returns: 文本向量数组 """ # 预处理 processed_texts = self.preprocessor.preprocess_texts(texts) # 编码 embeddings = self.embedding_model.encode_texts(processed_texts) return embeddings def semantic_search(self, query: str, documents: List[str], top_k: int = 5) -> List[Tuple[int, float, str]]: """ 语义搜索 Args: query: 查询文本 documents: 文档列表 top_k: 返回前k个 Returns: (索引, 相似度, 文档) 的列表 """ similar_results = self.similarity_calculator.find_most_similar(query, documents, top_k) results = [] for idx, similarity in similar_results: results.append((idx, similarity, documents[idx])) return results def get_cluster_info(self, texts: List[str], threshold: float = 0.7) -> Dict: """ 获取文本聚类信息 Args: texts: 文本列表 threshold: 相似度阈值 Returns: 聚类信息 """ # 计算相似度矩阵 similarity_matrix = self.similarity_calculator.compute_similarity_matrix(texts) # 简单的聚类算法 clusters = [] used_indices = set() for i in range(len(texts)): if i in used_indices: continue # 创建新簇 cluster = [i] used_indices.add(i) # 寻找相似文本 for j in range(len(texts)): if j not in used_indices and similarity_matrix[i, j] >= threshold: cluster.append(j) used_indices.add(j) clusters.append(cluster) return { 'cluster_count': len(clusters), 'clusters': clusters, 'cluster_sizes': [len(cluster) for cluster in clusters], 'avg_similarity': np.mean(similarity_matrix) } def get_module_info(self) -> Dict: """获取模块信息""" return { 'model_name': self.embedding_model.model_name, 'embedding_dimension': self.embedding_model.get_embedding_dimension(), 'cache_size': len(self.text_cache), 'device': self.embedding_model.device, 'total_texts_processed': len(self.text_cache) }

步骤 5:完整应用示例

def create_light_rag_vector_demo(): """创建LightRAG向量模块演示""" print("=== LightRAG向量模块演示 ===\n") # 1. 初始化向量模块 print("1. 初始化向量模块...") vector_module = LightRAGVectorModule() # 2. 获取模块信息 print("\n2. 模块信息:") info = vector_module.get_module_info() for key, value in info.items(): print(f" {key}: {value}") # 3. 示例文本 sample_texts = [ "机器学习是人工智能的一个重要分支", "深度学习基于神经网络", "自然语言处理技术发展迅速", "计算机视觉在图像识别方面应用广泛", "强化学习在游戏AI中表现突出", "数据挖掘可以发现隐藏的模式", "算法优化可以提高程序效率" ] print(f"\n3. 处理 {len(sample_texts)} 个示例文本...") # 4. 处理文本 embeddings = vector_module.process_texts_batch(sample_texts) print(f" 向量维度: {embeddings.shape}") # 5. 相似度搜索 print("\n4. 语义搜索演示:") query = "人工智能技术" print(f" 查询: {query}") search_results = vector_module.semantic_search(query, sample_texts, top_k=3) print(" 搜索结果:") for idx, similarity, text in search_results: print(f" {idx+1}. 相似度: {similarity:.4f} - {text}") # 6. 聚类分析 print("\n5. 文本聚类分析:") cluster_info = vector_module.get_cluster_info(sample_texts, threshold=0.6) print(f" 聚类数量: {cluster_info['cluster_count']}") print(" 聚类大小:", cluster_info['cluster_sizes']) print(f" 平均相似度: {cluster_info['avg_similarity']:.4f}") # 7. 缓存测试 print("\n6. 缓存测试:") test_text = "这是一个测试文本" # 第一次处理 start_time = time.time() embedding1 = vector_module.process_text(test_text) first_time = time.time() - start_time print(f" 第一次处理时间: {first_time:.4f} 秒") # 第二次处理(使用缓存) start_time = time.time() embedding2 = vector_module.process_text(test_text) second_time = time.time() - start_time print(f" 第二次处理时间: {second_time:.4f} 秒") # 验证缓存效果 if first_time > second_time: print(" ✓ 缓存有效,性能提升明显") else: print(" ⚠ 缓存效果不明显") return vector_module # 运行演示 if __name__ == "__main__": # 运行演示 vector_module = create_light_rag_vector_demo()

常见问题 FAQ

Q1:如何选择合适的嵌入模型?

A:选择嵌入模型时需要考虑以下因素:

def select_embedding_model(use_case: str, constraints: Dict) -> str: """根据使用场景选择嵌入模型""" # 不同场景的推荐模型 recommendations = { 'balanced': "sentence-transformers/all-MiniLM-L6-v2", 'fast': "sentence-transformers/all-MiniLM-L6-v2", 'high_quality': "sentence-transformers/all-mpnet-base-v2", 'chinese': "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2", 'semantic_search': "sentence-transformers/msmarco-MiniLM-L-12-v2" } # 根据约束选择 if constraints.get('memory_limit', 0) < 100: # < 100MB return "sentence-transformers/all-MiniLM-L6-v2" elif constraints.get('speed_requirement') == 'fast': return "sentence-transformers/all-MiniLM-L6-v2" elif constraints.get('quality_requirement') == 'high': return "sentence-transformers/all-mpnet-base-v2" # 默认选择平衡的模型 return recommendations['balanced']

Q2:如何处理长文本?

A:长文本处理需要特殊策略:

class LongTextProcessor: """长文本处理器""" def __init__(self, embedding_model, max_length=512): self.embedding_model = embedding_model self.max_length = max_length def process_long_text(self, text: str, strategy="mean") -> np.ndarray: """处理长文本""" # 分割文本 segments = self._split_text(text) # 编码各个段落 segment_embeddings = self.embedding_model.process_texts_batch(segments) # 合并策略 if strategy == "mean": return np.mean(segment_embeddings, axis=0) elif strategy == "max": return np.max(segment_embeddings, axis=0) elif strategy == "weighted": weights = [len(seg) for seg in segments] weights = np.array(weights) / np.sum(weights) return np.average(segment_embeddings, axis=0, weights=weights) else: return np.mean(segment_embeddings, axis=0) def _split_text(self, text: str) -> List[str]: """分割长文本""" sentences = text.split('。') segments = [] current = "" for sentence in sentences: if len(current) + len(sentence) + 1 <= self.max_length: current += sentence + "。" else: if current: segments.append(current.strip()) current = sentence + "。" if current: segments.append(current.strip()) return segments

Q3:如何优化性能?

A:性能优化的几种方法:

class VectorModuleOptimizer: """向量模块优化器""" def __init__(self, vector_module): self.vector_module = vector_module self.performance_stats = { 'total_calls': 0, 'cache_hits': 0, 'avg_processing_time': 0 } def enable_caching(self, max_cache_size=10000): """启用缓存""" self.vector_module.text_cache = {} self.vector_module.cache_limit = max_cache_size print(f"缓存已启用,最大大小: {max_cache_size}") def batch_optimize(self, texts: List[str], batch_size=32) -> np.ndarray: """批量优化处理""" all_embeddings = [] for i in range(0, len(texts), batch_size): batch_texts = texts[i:i + batch_size] batch_embeddings = self.vector_module.process_texts_batch(batch_texts) all_embeddings.append(batch_embeddings) return np.vstack(all_embeddings) def monitor_performance(self): """监控性能""" total = self.performance_stats['total_calls'] cache_hits = self.performance_stats['cache_hits'] if total > 0: cache_ratio = cache_hits / total print(f"缓存命中率: {cache_ratio:.2%}") print(f"总调用次数: {total}")

最佳实践与避坑

最佳实践

  1. 模型选择:根据任务需求选择合适的模型大小
  2. 批量处理:使用批处理提高编码效率
  3. 缓存机制:合理使用缓存提升重复查询性能
  4. 内存管理:监控内存使用,避免内存泄漏
  5. 版本控制:记录模型版本和配置

常见陷阱

  1. 过度依赖默认设置:需要根据具体场景调整参数
  2. 忽略缓存管理:缓存可能导致内存问题
  3. 不处理长文本:长文本可能影响嵌入质量
  4. 忽视性能监控:需要定期监控模型性能
  5. 不进行版本管理:模型更新可能导致结果不一致

本节小结

本节介绍了LightRAG向量模块的基础概念、架构设计和实际应用。通过本节的学习,你应该能够:

  • 理解向量嵌入的基本概念和重要性
  • 掌握LightRAG向量模块的架构设计
  • 能够实现基本的向量计算和相似度比较
  • 了解文本预处理和向量化处理的流程
  • 掌握向量模块的性能优化方法

向量模块是LightRAG系统的核心组件之一,掌握向量嵌入技术对于构建高质量的检索系统至关重要。

延伸阅读

关键词:向量嵌入,语义相似度,LightRAG,预训练模型,向量化
难度:基础
预计阅读:25分钟


发布者: 作者: 转发
评论区 (0)
U