第4章 检索引擎实现


文档摘要

第4章 检索引擎实现 本章将详细介绍记忆系统的检索引擎实现,从基础的搜索算法到高级的语义检索技术,为读者提供完整的检索解决方案。 4.1 检索引擎概述 4.1.1 检索需求分析 记忆系统的检索引擎需要满足以下核心需求: 多模态检索:支持文本、图像、音频等多种数据类型的检索 语义理解:能够理解查询的语义含义,而不仅仅是关键词匹配 实时响应:在毫秒级时间内返回检索结果 高准确性:确保检索结果的相关性和准确性 可扩展性:能够处理大规模数据和高并发请求 个性化:根据用户的历史和偏好优化检索结果 4.1.2 检索技术栈 现代记忆系统检索引擎通常采用以下技术栈: 4.1.3 检索架构设计 基于需求分析,我们设计分层检索架构: 4.2 关键词检索实现……

第4章 检索引擎实现

本章将详细介绍记忆系统的检索引擎实现,从基础的搜索算法到高级的语义检索技术,为读者提供完整的检索解决方案。

4.1 检索引擎概述

4.1.1 检索需求分析

记忆系统的检索引擎需要满足以下核心需求:

  1. 多模态检索:支持文本、图像、音频等多种数据类型的检索
  2. 语义理解:能够理解查询的语义含义,而不仅仅是关键词匹配
  3. 实时响应:在毫秒级时间内返回检索结果
  4. 高准确性:确保检索结果的相关性和准确性
  5. 可扩展性:能够处理大规模数据和高并发请求
  6. 个性化:根据用户的历史和偏好优化检索结果

4.1.2 检索技术栈

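现代记忆系统检索引擎通常采用以下技术栈(也是本章示例代码所使用的组件):

  1. 中文分词:jieba
  2. 全文索引与关键词检索:Whoosh
  3. 文本向量嵌入:Sentence-Transformers
  4. 向量相似度检索:FAISS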

4.1.3 检索架构设计

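基于需求分析,我们设计分层检索架构,按查询的处理顺序依次为:

  1. 查询优化与缓存层:查询改写、结果缓存与热点查询预加载(见 4.2.3)
  2. 关键词检索层:基于 jieba 分词与 Whoosh 全文索引的关键词匹配检索(见 4.2)
  3. 语义检索层:基于句向量与 FAISS 向量索引的相似度检索(见 4.3)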

4.2 关键词检索实现

4.2.1 关键词索引构建

关键词检索是传统检索技术的基础,主要依赖于全文索引:

import os
from datetime import datetime

import jieba
import jieba.posseg as pseg
# whoosh.analysis has no ChineseAnalyzer; jieba ships one built on whoosh.
from jieba.analyse import ChineseAnalyzer
from whoosh.analysis import StandardAnalyzer
from whoosh.fields import Schema, TEXT, ID, KEYWORD, NUMERIC, DATETIME
from whoosh.index import create_in, exists_in, open_dir
from whoosh.qparser import QueryParser, MultifieldParser
from whoosh.query import And, Or, Term, Every
from whoosh.sorting import FieldFacet


class KeywordIndexBuilder:
    """Build a Whoosh full-text index over memory records.

    Free-text fields (title, content) are segmented with jieba's
    ChineseAnalyzer at index time. Fields that the search side filters with
    exact ``Term`` queries (type, category, tags) are indexed un-analyzed.
    """

    # Stop words dropped by ``_tokenize_text``; built once instead of per token.
    STOP_WORDS = frozenset({
        '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一',
        '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有',
        '看', '好', '自己', '这', '那', '现在', '可以', '但是', '还是', '因为',
        '什么', '如果',
    })

    def __init__(self, index_dir: str):
        """Prepare a builder that writes its index into ``index_dir``.

        The directory is created if it does not exist.
        """
        self.index_dir = index_dir
        # The analyzer must exist before the schema is built, because the
        # schema's free-text fields are configured with it.
        self.analyzer = ChineseAnalyzer()
        self.schema = self._get_schema()
        os.makedirs(index_dir, exist_ok=True)

    def _get_schema(self) -> Schema:
        """Return the index schema (定义索引 schema)."""
        return Schema(
            id=ID(stored=True, unique=True),
            # ID rather than TEXT: these are matched with exact Term filters,
            # so each value must be indexed as a single un-analyzed token.
            type=ID(stored=True),
            category=ID(stored=True),
            title=TEXT(stored=True, analyzer=self.analyzer, field_boost=2.0),
            content=TEXT(stored=True, analyzer=self.analyzer),
            # Comma-separated keywords: every tag (even a multi-word one) is
            # one exact token that ``Term('tags', tag)`` can match.
            tags=KEYWORD(stored=True, commas=True, scorable=True),
            # NUMERIC defaults to numtype=int, which would truncate a 0.5
            # importance to 0 in the index and in the sort column.
            importance=NUMERIC(float, stored=True, sortable=True),
            created_at=DATETIME(stored=True, sortable=True),
            updated_at=DATETIME(stored=True, sortable=True),
            access_count=NUMERIC(stored=True, sortable=True),
            metadata=TEXT(stored=True),
        )

    def create_index(self):
        """Create a new index in ``index_dir`` and return a writer for it.

        Uses ``whoosh.index.create_in``, which replaces an existing index of
        the same name in that directory.
        """
        ix = create_in(self.index_dir, self.schema)
        return ix.writer()

    def add_document(self, writer, memory_data: dict):
        """Add one memory record to the index through ``writer``.

        Args:
            writer: A Whoosh index writer, e.g. the one returned by
                ``create_index``.
            memory_data: Record with required keys ``id``, ``type``,
                ``category``, ``title`` and ``content``. Optional keys:
                ``tags`` (list of str), ``importance`` (default 0.5),
                ``created_at`` / ``updated_at`` (default: now),
                ``access_count`` (default 0) and ``metadata`` (default {}).
        """
        now = datetime.now()
        # Word segmentation happens inside the schema's analyzer, so the raw
        # title/content are passed through unchanged (and stored as-is).
        doc = {
            'id': memory_data['id'],
            'type': memory_data['type'],
            'category': memory_data['category'],
            'title': memory_data['title'],
            'content': memory_data['content'],
            'tags': ','.join(memory_data.get('tags', [])),
            'importance': memory_data.get('importance', 0.5),
            'created_at': memory_data.get('created_at', now),
            'updated_at': memory_data.get('updated_at', now),
            'access_count': memory_data.get('access_count', 0),
            'metadata': str(memory_data.get('metadata', {})),
        }
        writer.add_document(**doc)

    def _tokenize_text(self, text: str) -> str:
        """Segment ``text`` with jieba and return the tokens joined by spaces.

        Single-character tokens and stop words are dropped.
        """
        stop_words = self._get_stop_words()
        return ' '.join(
            word for word in jieba.cut(text)
            if len(word) > 1 and word not in stop_words
        )

    def _get_stop_words(self) -> set:
        """Return a fresh, mutable copy of the stop-word list."""
        return set(self.STOP_WORDS)

    def commit_writer(self, writer):
        """Commit everything written through ``writer`` (提交索引写入)."""
        writer.commit()

4.2.2 关键词检索实现

import re

from whoosh.highlight import highlight
from whoosh.index import exists_in, open_dir
from whoosh.qparser import MultifieldParser
# ``Query`` is needed by the ``_parse_query`` return annotation, which is
# evaluated when the class is defined.
from whoosh.query import And, DateRange, Every, Or, Query, Term
from whoosh.searching import Results
from whoosh.sorting import Count, FieldFacet


class KeywordSearchEngine:
    """Keyword search, prefix suggestions and facet counts over a Whoosh index."""

    # Stored fields that results may be re-ordered by via ``sort_by``.
    SORTABLE_FIELDS = ('importance', 'access_count', 'created_at', 'updated_at')

    def __init__(self, index_dir: str):
        """Open the existing index stored in ``index_dir``.

        Raises:
            FileNotFoundError: if ``index_dir`` holds no index.
        """
        self.index_dir = index_dir
        self.index = None
        self._open_index()

    def _open_index(self):
        """Open the index in ``index_dir`` or raise FileNotFoundError."""
        if not exists_in(self.index_dir):
            raise FileNotFoundError(f"索引目录不存在: {self.index_dir}")
        self.index = open_dir(self.index_dir)

    def search(self, query: str, **kwargs) -> dict:
        """Run a keyword search and return a result envelope.

        Args:
            query: Whoosh query syntax, parsed against ``fields`` (default
                title/content/tags). A blank query matches every document.

        Keyword Args:
            fields, type, category, tags, start_date, end_date: query fields
                and filters, see ``_parse_query``.
            highlight: if true, wrap query words in ``<mark>`` instead of
                truncating content to 200 characters.
            sort_by, sort_order: re-order by one of ``SORTABLE_FIELDS``;
                order is 'desc' unless ``sort_order`` says otherwise.
            offset, limit: pagination window (defaults 0 and 10;
                ``limit=None`` returns everything after ``offset``).

        Returns:
            ``{'success': True, 'total_hits', 'results', 'query',
            'execution_time'}``, or ``{'success': False, 'error', 'query'}``
            if anything raises.
        """
        offset = kwargs.get('offset', 0)
        limit = kwargs.get('limit', 10)
        try:
            with self.index.searcher() as searcher:
                query_obj = self._parse_query(query, kwargs)
                # Whoosh returns only the top-N hits by relevance. Fetch
                # everything before the end of the pagination window, and every
                # hit when re-sorting by a field; otherwise only the top-N
                # would be sorted and later pages would be empty.
                if limit is None or kwargs.get('sort_by') in self.SORTABLE_FIELDS:
                    fetch_limit = None
                else:
                    fetch_limit = offset + limit
                results = searcher.search(query_obj, limit=fetch_limit)
                # The query text travels with the options so highlighting sees it.
                processed_results = self._process_results(results, dict(kwargs, query=query))
                return {
                    'success': True,
                    'total_hits': len(results),
                    'results': processed_results,
                    'query': query,
                    'execution_time': results.runtime,
                }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'query': query,
            }

    def _parse_query(self, query: str, kwargs: dict) -> Query:
        """Build the Whoosh query: parsed text AND-ed with optional filters.

        Recognized ``kwargs``: ``fields`` (fields searched by the text),
        ``type`` and ``category`` (exact Term filters), ``tags`` (match any of
        the given tags), ``start_date`` / ``end_date`` (``created_at`` range).
        Exact Term filters assume those fields are indexed un-analyzed.
        """
        search_fields = kwargs.get('fields', ['title', 'content', 'tags'])
        if query and query.strip():
            parser = MultifieldParser(search_fields, self.index.schema)
            clauses = [parser.parse(query)]
        else:
            # A blank query means "filters only".
            clauses = [Every()]

        if 'type' in kwargs:
            clauses.append(Term('type', kwargs['type']))
        if 'category' in kwargs:
            clauses.append(Term('category', kwargs['category']))
        if kwargs.get('tags'):
            clauses.append(Or([Term('tags', tag) for tag in kwargs['tags']]))

        start_date = kwargs.get('start_date')
        end_date = kwargs.get('end_date')
        if start_date is not None or end_date is not None:
            clauses.append(DateRange('created_at', start_date, end_date))

        return clauses[0] if len(clauses) == 1 else And(clauses)

    def _process_results(self, results, kwargs) -> list:
        """Convert Whoosh hits into dicts, then sort and paginate them.

        ``kwargs`` carries the search options plus the ``query`` text used for
        highlighting. ``offset`` and ``limit`` (default 10) are applied last.
        """
        want_highlight = kwargs.get('highlight', False)
        query_text = kwargs.get('query', '')

        processed = []
        for hit in results:
            content = hit['content']
            if want_highlight:
                shown = self._highlight_text(content, query_text)
            else:
                shown = content[:200] + '...' if len(content) > 200 else content
            processed.append({
                'id': hit['id'],
                'type': hit['type'],
                'category': hit['category'],
                'title': hit['title'],
                'content': shown,
                'importance': hit['importance'],
                'created_at': hit['created_at'],
                'updated_at': hit['updated_at'],
                'access_count': hit['access_count'],
                'score': hit.score,
                # Relevance rank (1-based), independent of any sort_by order.
                'rank': hit.rank + 1,
            })

        if kwargs.get('sort_by'):
            processed = self._sort_results(processed, kwargs)

        offset = kwargs.get('offset', 0)
        limit = kwargs.get('limit', 10)
        end = None if limit is None else offset + limit
        return processed[offset:end]

    def _highlight_text(self, text: str, query: str) -> str:
        """Wrap every occurrence of each whitespace-separated query word in ``<mark>``.

        A single regex pass is used, so a word is never matched inside a tag
        inserted for an earlier word. Longer words win over their prefixes.
        """
        words = sorted({word for word in query.split() if word}, key=len, reverse=True)
        if not words:
            return text
        pattern = re.compile('|'.join(re.escape(word) for word in words))
        return pattern.sub(lambda match: f'<mark>{match.group(0)}</mark>', text)

    def _sort_results(self, results: list, kwargs: dict) -> list:
        """Sort result dicts in place by ``sort_by`` if it is a sortable field."""
        sort_field = kwargs['sort_by']
        reverse = kwargs.get('sort_order', 'desc').lower() == 'desc'
        if sort_field in self.SORTABLE_FIELDS:
            results.sort(key=lambda doc: doc[sort_field], reverse=reverse)
        return results

    def get_suggestions(self, query: str, limit: int = 5) -> list:
        """Return up to ``limit`` indexed ``content`` terms starting with ``query``.

        Best-effort autocomplete: returns [] for an empty prefix or on any error.
        """
        if not query or limit <= 0:
            return []
        try:
            with self.index.searcher() as searcher:
                suggestions = []
                for term in searcher.reader().expand_prefix('content', query):
                    # The reader yields raw index terms, which may be bytes.
                    if isinstance(term, bytes):
                        term = term.decode('utf-8')
                    suggestions.append(term)
                    if len(suggestions) >= limit:
                        break
                return suggestions
        except Exception:
            return []

    def get_facets(self, query: str, facet_fields: list = None) -> dict:
        """Count documents matching ``query`` per value of each facet field.

        Args:
            query: Same syntax as ``search``; blank counts over all documents.
            facet_fields: Fields to group by (default ``['type', 'category']``).

        Returns:
            ``{'success': True, 'facets': {field: {value: count}}}`` or
            ``{'success': False, 'error': ...}``.
        """
        if facet_fields is None:
            facet_fields = ['type', 'category']
        try:
            with self.index.searcher() as searcher:
                query_obj = self._parse_query(query, {})
                groupedby = {field: FieldFacet(field, maptype=Count) for field in facet_fields}
                # limit=None so the groups cover every matching document.
                results = searcher.search(query_obj, groupedby=groupedby, limit=None)
                facets = {field: dict(results.groups(field)) for field in facet_fields}
            return {
                'success': True,
                'facets': facets,
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
            }

4.2.3 性能优化策略

import threading
import time
from queue import Queue
from typing import Optional


class KeywordSearchOptimizer:
    """Query-result cache with a TTL, background preloading and query rewriting.

    Preloaded results come from ``KeywordSearchEngine`` (section 4.2.2).
    """

    # Stop words removed from whitespace-separated queries by ``optimize_query``.
    QUERY_STOP_WORDS = frozenset({'的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一'})

    def __init__(self, index_dir: str, cache_size: int = 1000, cache_ttl: float = 3600):
        """Create the optimizer and start its preload worker thread.

        Args:
            index_dir: Index directory passed to ``KeywordSearchEngine``.
            cache_size: Maximum number of cached queries (oldest evicted first).
            cache_ttl: Seconds a cached result stays valid (default 1 hour).
        """
        self.index_dir = index_dir
        self.query_cache = {}
        self.cache_size = cache_size
        self.cache_ttl = cache_ttl
        self.lock = threading.Lock()
        self.preload_queue = Queue()
        # Opened lazily by the preload worker and reused across queries.
        self._engine = None
        # 预加载热数据
        self.start_preload_thread()

    def start_preload_thread(self):
        """Start a daemon thread that preloads queued queries into the cache."""
        def preload_worker():
            while True:
                # Blocking get: no busy polling and no one-query-per-second cap.
                query = self.preload_queue.get()
                try:
                    self._preload_query_results(query)
                finally:
                    self.preload_queue.task_done()

        thread = threading.Thread(target=preload_worker, daemon=True)
        thread.start()

    def _preload_query_results(self, query: str):
        """Run ``query`` (top 10) and cache the result if the search succeeded."""
        try:
            if self._engine is None:
                self._engine = KeywordSearchEngine(self.index_dir)
            results = self._engine.search(query, limit=10)
        except Exception as e:
            print(f"预加载失败: {e}")
            return
        # search() reports errors in-band; caching them would serve the
        # failure for a whole TTL.
        if results.get('success'):
            self._store_in_cache(query, results)

    def _store_in_cache(self, query: str, results: dict):
        """Cache ``results`` for ``query``, evicting the oldest entries when full."""
        with self.lock:
            if self.cache_size <= 0:
                return
            # Refreshing a key moves it to the newest position and never evicts
            # an unrelated entry just to overwrite an existing one.
            self.query_cache.pop(query, None)
            while len(self.query_cache) >= self.cache_size:
                # Dicts keep insertion order, so the first key is the oldest.
                del self.query_cache[next(iter(self.query_cache))]
            self.query_cache[query] = {
                'results': results,
                'timestamp': time.time(),
            }

    def get_cached_results(self, query: str) -> Optional[dict]:
        """Return cached results for ``query``, or None if absent or expired.

        Expired entries are removed on lookup.
        """
        with self.lock:
            entry = self.query_cache.get(query)
            if entry is None:
                return None
            if time.time() - entry['timestamp'] < self.cache_ttl:
                return entry['results']
            del self.query_cache[query]
            return None

    def add_to_preload_queue(self, query: str):
        """Schedule ``query`` for background preloading."""
        self.preload_queue.put(query)

    def optimize_query(self, query: str) -> str:
        """Rewrite a whitespace-separated query.

        Stop words are removed. If fewer than two words remain, the words
        '记忆' and '智能' are appended.
        """
        words = [word for word in query.split() if word not in self.QUERY_STOP_WORDS]
        if len(words) < 2:
            words += ['记忆', '智能']
        return ' '.join(words)

4.3 语义检索实现

4.3.1 向量嵌入生成

语义检索基于向量的相似度计算,首先需要生成高质量的嵌入向量:

import os
import pickle
from typing import Dict, List, Tuple

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer


class SemanticEmbeddingService:
    """Sentence embeddings plus a FAISS IVF index searched by cosine similarity.

    Vectors are L2-normalized before they enter or query the index, so the
    inner-product scores it returns are cosine similarities. Document ids are
    the sequential FAISS ids assigned by ``add_texts``.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", nlist: int = 100, nprobe: int = 10):
        """Load the embedding model and create an untrained index.

        Args:
            model_name: SentenceTransformer model to load.
            nlist: Number of IVF clusters (reduced at training time if fewer
                training texts are supplied).
            nprobe: Clusters visited per search.
        """
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()
        self.nlist = nlist
        self.nprobe = nprobe
        self.index = None
        self.text_cache = {}
        self.is_trained = False
        self._init_index()

    def _init_index(self, nlist: int = None):
        """(Re)create an empty, untrained IVF index (初始化FAISS索引)."""
        nlist = self.nlist if nlist is None else nlist
        # Keep a reference to the quantizer for as long as the index uses it.
        self._quantizer = faiss.IndexFlatIP(self.dimension)
        # The metric must be passed explicitly: IndexIVFFlat defaults to L2
        # (smaller is closer) even when the quantizer is an IP index.
        self.index = faiss.IndexIVFFlat(
            self._quantizer, self.dimension, nlist, faiss.METRIC_INNER_PRODUCT
        )
        self.is_trained = False

    @staticmethod
    def _as_unit_rows(vectors) -> np.ndarray:
        """Return a float32, C-contiguous, L2-normalized 2-D copy of ``vectors``."""
        rows = np.ascontiguousarray(np.atleast_2d(np.array(vectors, dtype=np.float32)))
        # normalize_L2 works in place, hence the copy above (cached embeddings
        # must not be modified).
        faiss.normalize_L2(rows)
        return rows

    def train_index(self, texts: List[str]):
        """Train the IVF clustering on ``texts`` (no-op if already trained or empty)."""
        if self.is_trained or not texts:
            return
        training_vectors = self._as_unit_rows(self.model.encode(texts))
        # IVF training needs at least one training point per cluster.
        if len(texts) < self.index.nlist:
            self._init_index(nlist=len(texts))
        self.index.train(training_vectors)
        self.index.nprobe = min(self.nprobe, self.index.nlist)
        self.is_trained = True
        print(f"索引训练完成,向量数量: {len(texts)}")

    def add_texts(self, texts: List[str]) -> List[int]:
        """Embed ``texts``, add them to the trained index and return their ids.

        ``search_similar`` reports these ids as ``doc_{id}``.

        Raises:
            RuntimeError: if the index has not been trained yet.
        """
        if not self.is_trained:
            raise RuntimeError("索引尚未训练,请先调用 train_index")
        if not texts:
            return []
        first_id = self.index.ntotal
        self.index.add(self._as_unit_rows(self.model.encode(texts)))
        return list(range(first_id, self.index.ntotal))

    def encode_text(self, text: str) -> np.ndarray:
        """Return the (memoized) model embedding of ``text``."""
        if text in self.text_cache:
            return self.text_cache[text]
        embedding = self.model.encode(text)
        self.text_cache[text] = embedding
        return embedding

    def encode_texts_batch(self, texts: List[str]) -> np.ndarray:
        """Embed ``texts`` in one model call (no caching)."""
        return self.model.encode(texts)

    def search_similar(self, query: str, k: int = 10, threshold: float = 0.7) -> List[Dict]:
        """Return up to ``k`` indexed texts whose cosine similarity is >= ``threshold``.

        Each result has ``doc_id``, ``score`` (cosine similarity), ``rank``
        (1-based position in FAISS' answer) and ``distance`` (same value as
        ``score``, kept for compatibility).
        """
        if not self.is_trained or self.index.ntotal == 0 or k <= 0:
            return []

        query_vector = self._as_unit_rows(self.encode_text(query))
        scores, indices = self.index.search(query_vector, k)

        results = []
        for rank, (score, idx) in enumerate(zip(scores[0], indices[0]), start=1):
            # idx == -1 marks an empty slot when fewer than k neighbours exist.
            if idx >= 0 and score >= threshold:
                results.append({
                    'doc_id': f"doc_{idx}",
                    'score': float(score),
                    'rank': rank,
                    'distance': float(score),
                })
        return results

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Return the cosine similarity of two texts' embeddings (0.0 for a zero vector)."""
        vector1 = self.encode_text(text1)
        vector2 = self.encode_text(text2)
        denominator = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if denominator == 0:
            return 0.0
        return float(np.dot(vector1, vector2) / denominator)

    def get_statistics(self) -> Dict:
        """Return embedding dimension, training state, cache size and index size."""
        return {
            'dimension': self.dimension,
            'is_trained': self.is_trained,
            'cache_size': len(self.text_cache),
            'index_size': self.index.ntotal if self.index else 0,
        }

4.4 小结与展望

本章详细介绍了记忆系统的检索引擎实现,包括关键词检索、语义检索和混合检索等核心技术。通过合理的技术选型和配置,可以构建一个高效、准确的记忆检索系统。

在下一章中,我们将探讨记忆系统的冲突解决机制,包括冲突检测、冲突分析和冲突处理等关键技术方案。


发布者: 作者: 转发
评论区 (0)
U