milvus pdf 嵌入实战 一、环境准备 推荐Python 3.8+环境(网页8) 确保Milvus服务已启动(默认端口19530) 建议配置8GB+内存(处理大规模PDF时) 二、PDF文本处理 文本提取 文本重构增强 注意事项: 添加异常降级处理逻辑 三、向量化处理 智能分块策略 最佳实践: 采用滑动窗口重叠分块/单独一个句子也是可以的 添加文本清洗步骤:去除特殊符号/乱码 对短文本进行合并处理 四、Milvus集成 集合配置优化 索引策略: 批量插入 五、完整工作流
# 基础依赖 !pip install pymupdf openai sentence-transformers pymilvus
def extract_pdf_text(path): """ 从PDF文件中提取文本内容 参数: path: PDF文件路径 返回: 提取的文本字符串 """ try: doc = pymupdf.open(path) return " ".join([page.get_text() for page in doc]) except Exception as e: raise RuntimeError(f"PDF解析失败: {str(e)}")
def reload_text(path): """ 使用大模型重构PDF提取的文本,提高文本质量 参数: path: PDF文件路径 返回: 重构后的文本字符串 """ text = extract_pdf_text(path) # 初始化OpenAI客户端,连接到阿里云DashScope client = OpenAI( api_key = api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1") try: # 调用DeepSeek-v3模型重构文本 response = client.chat.completions.create( model="deepseek-v3", messages=[ {"role": "user", "content": "这是pdf 解析出来的文档 请你仔细阅读,将内容调整为完整的句子。同时需要确保不能丢失原文的信息。解析文档内容如下:{}".format(text) } ] ) reload_text=response.choices[0].message.content return reload_text except Exception as e: print(f"API调用失败,启用降级处理: {str(e)}") # 降级处理:清理特殊字符并合并空格 cleaned_text = re.sub(r'[^\w\u4e00-\u9fff\s。?!;,]', '', text) return re.sub(r'\s+', ' ', cleaned_text)
注意事项:
def split_text(path): """ 对重构后的文本进行分段并生成向量表示 参数: path: PDF文件路径 返回: tuple: (分段文本列表, 对应的向量表示) """ text=reload_text(path) # 使用正则表达式按照标点符号分割文本 split_pattern = r'(?<=[。!?;.?!!;\n])\s*' chunks = [c.strip() for c in re.split(split_pattern, text) if len(c.strip()) > 20] # 滑动窗口合并短文本,每5个句子为一组 final_chunks = [] for i in range(0, len(chunks), 3): window = chunks[i:i+5] final_chunks.append(" ".join(window)) # 打印分段结果 for index,value in enumerate(final_chunks): print(f"第{index}个句子-->{value}") # 加载本地预训练的向量模型 model = SentenceTransformer('C:/Users/Administrator/Desktop/easy-vectordb/model/bge-small-zh-v1.5') # 生成文本向量并进行L2归一化 embeddings = model.encode(final_chunks) embeddings = normalize(embeddings, norm='l2') return final_chunks, embeddings
最佳实践:
# 增强型schema配置(网页3) fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), # 主键字段 FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=512), # 512维浮点向量 FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000) # 原始文本内容 ] schema = CollectionSchema(fields, description="PDF知识库", enable_dynamic_field=True) # 开启动态字段 collection = Collection("pdf_test", schema)
索引策略:
# 复合索引配置(网页5) vector_index = { "index_type": "IVF_PQ", "metric_type": "IP", "params": {"nlist": 2048, "m": 32} } collection.create_index("vector", vector_index) collection.create_index("content", {"index_type": "TRIE"})
data = [ [i for i in range(len(texts_list))], # 主键ID列表 embeddings.tolist(), # 向量列表(转换为Python列表) texts_list # 原始文本列表 ] # 执行插入并刷新内存 collection.insert(data) collection.flush() collection.load() # 将索引加载到内存
""" PDF向量存储完整工作流 功能:解析PDF->文本重构->分块处理->向量生成->Milvus存储 环境要求:Python 3.10+,Milvus 2.4.x """ import pymupdf from openai import OpenAI from sklearn.preprocessing import normalize from sentence_transformers import SentenceTransformer from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection import re # API配置 - 使用兼容模式连接到阿里云DashScope api_key="sk-XXXXXXXX" api_base="https://dashscope.aliyuncs.com/compatible-mode/v1" def extract_pdf_text(path): """ 从PDF文件中提取文本内容 参数: path: PDF文件路径 返回: 提取的文本字符串 """ try: doc = pymupdf.open(path) return " ".join([page.get_text() for page in doc]) except Exception as e: raise RuntimeError(f"PDF解析失败: {str(e)}") def reload_text(path): """ 使用大模型重构PDF提取的文本,提高文本质量 参数: path: PDF文件路径 返回: 重构后的文本字符串 """ text = extract_pdf_text(path) # 初始化OpenAI客户端,连接到阿里云DashScope client = OpenAI( api_key = api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1") try: # 调用DeepSeek-v3模型重构文本 response = client.chat.completions.create( model="deepseek-v3", messages=[ {"role": "user", "content": "这是pdf 解析出来的文档 请你仔细阅读,将内容调整为完整的句子。同时需要确保不能丢失原文的信息。解析文档内容如下:{}".format(text) } ] ) reload_text=response.choices[0].message.content return reload_text except Exception as e: print(f"API调用失败,启用降级处理: {str(e)}") # 降级处理:清理特殊字符并合并空格 cleaned_text = re.sub(r'[^\w\u4e00-\u9fff\s。?!;,]', '', text) return re.sub(r'\s+', ' ', cleaned_text) def split_text(path): """ 对重构后的文本进行分段并生成向量表示 参数: path: PDF文件路径 返回: tuple: (分段文本列表, 对应的向量表示) """ text=reload_text(path) # 使用正则表达式按照标点符号分割文本 split_pattern = r'(?<=[。!?;.?!!;\n])\s*' chunks = [c.strip() for c in re.split(split_pattern, text) if len(c.strip()) > 20] # 滑动窗口合并短文本,每5个句子为一组 # 这里有很多处理的办法 文本随便演示了一种 final_chunks = [] for i in range(0, len(chunks), 3): window = chunks[i:i+5] final_chunks.append(" ".join(window)) # 打印分段结果 for index,value in enumerate(final_chunks): print(f"第{index}个句子-->{value}") # 加载本地预训练的向量模型 model = SentenceTransformer('C:/Users/Administrator/Desktop/easy-vectordb/model/bge-small-zh-v1.5') # 生成文本向量并进行L2归一化 embeddings = model.encode(final_chunks) embeddings = normalize(embeddings, norm='l2') return final_chunks, embeddings # 主程序入口 if __name__ == "__main__": path="C:/Users/Administrator/Desktop/easy-vectordb/code/Datawhale社区介绍.pdf" texts_list,embeddings=split_text(path) # 连接到Milvus向量数据库 connections.connect(alias="default", uri="http://127.0.0.1:19530") # 定义向量表结构 fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), # 主键字段 FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=512), # 512维浮点向量 FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000) # 原始文本内容 ] # 创建集合(表) schema = CollectionSchema( fields, description="PDF知识库", enable_dynamic_field=True ) collection = Collection("pdf_test", schema) # 创建混合索引 vector_index = { "index_type": "IVF_FLAT", "metric_type": "IP", "params": {"nlist": 1024} } collection.create_index("vector", vector_index) collection.create_index("content", {"index_type": "TRIE"}) # 批量插入(含动态字段) data = [ [i for i in range(len(texts_list))], # 主键ID列表 embeddings.tolist(), # 向量列表(转换为Python列表) texts_list # 原始文本列表 ] # 执行插入并刷新内存 collection.insert(data) collection.flush() collection.load() # 将索引加载到内存