文档处理系统 本节导读:掌握Haystack文档全生命周期处理,从数据加载到存储的完整链条 学习目标 熟练使用各种文档加载器处理不同格式 掌握文档预处理和文本清洗技术 理解不同文档存储器的适用场景 实现企业级文档处理流水线 核心概念 文档处理系统架构 Haystack文档处理系统采用模块化设计,支持多种数据源和输出格式,确保数据质量和处理效率。 组件协同机制 各组件通过标准化接口协作,支持自定义扩展和性能优化,满足企业级应用需求。
本节导读:掌握Haystack文档全生命周期处理,从数据加载到存储的完整链条
Haystack文档处理系统采用模块化设计,支持多种数据源和输出格式,确保数据质量和处理效率。
各组件通过标准化接口协作,支持自定义扩展和性能优化,满足企业级应用需求。
from haystack.document_stores import InMemoryDocumentStore from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter from haystack.components.converters import TextConverter, PDFToTextConverter # 文本文件加载 text_converter = TextConverter() documents = text_converter.run(paths=["data.txt"]) # PDF文件加载 pdf_converter = PDFToTextConverter() pdf_docs = pdf_converter.run(paths=["document.pdf"])
from haystack import Document from haystack.components.converters import BaseConverter class CustomCSVLoader(BaseConverter): def convert(self, path: str) -> list[Document]: # 自定义CSV解析逻辑 pass
from haystack.components.preprocessors import DocumentCleaner cleaner = DocumentCleaner( remove_extra_whitespaces=True, remove_empty_lines=True, remove_substrings=["机密信息"], split_by=None, split_length=500, split_overlap=50 ) cleaned_docs = cleaner.run(documents=raw_documents)
from haystack.components.preprocessors import DocumentSplitter # 按字符分割 char_splitter = DocumentSplitter(split_by="character", split_length=200, split_overlap=20) # 按词分割 word_splitter = DocumentSplitter(split_by="word", split_length=100, split_overlap=10) # 按段落分割 paragraph_splitter = DocumentSplitter(split_by="page", split_length=1, split_overlap=0)
| 存储类型 | 特点 | 适用场景 |
|---|---|---|
| InMemoryDocumentStore | 内存存储,速度快 | 开发调试 |
| FAISSDocumentStore | 向量索引,高相似度检索 | 语义搜索 |
| MilvusDocumentStore | 分布式向量数据库 | 大规模部署 |
| PostgreSQLDocumentStore | 关系型数据库存储 | 事务处理 |
from haystack.document_stores import FAISSDocumentStore # FAISS存储配置 faiss_store = FAISSDocumentStore( embedding_dim=768, faiss_index_factory="Flat", return_embedding=True ) # PostgreSQL存储配置 pg_store = PostgreSQLDocumentStore( table_name="documents", embedding_dim=768, create_index_if_not_exists=True )
from haystack import Pipeline from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter from haystack.components.converters import TextConverter # 创建处理流水线 document_pipeline = Pipeline() # 添加处理组件 document_pipeline.add_component("converter", TextConverter()) document_pipeline.add_component("cleaner", DocumentCleaner()) document_pipeline.add_component("splitter", DocumentSplitter()) # 连接组件 document_pipeline.connect("converter", "cleaner") document_pipeline.connect("cleaner", "splitter") # 执行处理 result = document_pipeline.run({ "converter": {"paths": ["data.txt"]}, "cleaner": {"documents": converter_result["documents"]}, "splitter": {"documents": cleaner_result["documents"]} })
import logging from datetime import datetime # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 处理过程监控 def process_documents_with_monitoring(documents): start_time = datetime.now() logger.info(f"开始处理 {len(documents)} 个文档") # 处理逻辑 processed_docs = [] for doc in documents: try: processed = process_single_document(doc) processed_docs.append(processed) except Exception as e: logger.error(f"处理文档失败: {e}") end_time = datetime.now() duration = (end_time - start_time).total_seconds() logger.info(f"处理完成,耗时 {duration:.2f}秒") return processed_docs
本章节深入讲解了Haystack文档处理系统的核心组件和实现细节,从数据加载到存储的完整链条。下一章我们将聚焦检索引擎,这是RAG系统的核心价值所在。
关键词:文档处理, 文档加载器, 预处理, 文档存储, Haystack, 企业级应用
难度:进阶
预计阅读:60分钟