4.1 Python 客户端使用详解


文档摘要

4.1 Python 客户端使用详解 — Qdrant Python API 实战 本节导读:通过完整的Python客户端代码示例,掌握Qdrant的核心操作流程,从环境配置到高级查询技巧,为实际项目开发提供可复用的代码模板。 学习目标 掌握Qdrant Python客户端的安装和配置方法 理解Collection创建和数据管理的完整流程 学会构建向量搜索应用的核心代码模式 掌握高级查询和优化技术 了解常见问题的解决方案 核心概念 Qdrant Python客户端提供了完整的API接口,支持所有Qdrant的核心功能。客户端基于HTTP协议构建,支持异步操作,适合高性能应用场景。 环境准备 / 前置知识 安装Qdrant Python客户端 版本兼容性 Python 3.

4.1 Python 客户端使用详解 — Qdrant Python API 实战

本节导读:通过完整的Python客户端代码示例,掌握Qdrant的核心操作流程,从环境配置到高级查询技巧,为实际项目开发提供可复用的代码模板。

学习目标

  • 掌握Qdrant Python客户端的安装和配置方法
  • 理解Collection创建和数据管理的完整流程
  • 学会构建向量搜索应用的核心代码模式
  • 掌握高级查询和优化技术
  • 了解常见问题的解决方案

核心概念

Qdrant Python客户端提供了完整的API接口,支持所有Qdrant的核心功能。客户端基于HTTP协议构建,支持异步操作,适合高性能应用场景。

环境准备 / 前置知识

安装Qdrant Python客户端

pip install qdrant-client

版本兼容性

  • Python 3.8+
  • qdrant-client >= 1.7.0
  • Qdrant服务端 >= 1.7.0

基础依赖

import os import asyncio import numpy as np from qdrant_client import QdrantClient from qdrant_client.http import models from qdrant_client.http.models import Distance, VectorParams from typing import List, Dict, Any, Optional

分步实战

步骤 1:连接Qdrant服务

class QdrantConnectionManager: """Qdrant连接管理器""" def __init__(self, host: str = "localhost", port: int = 6333, api_key: Optional[str] = None, https: bool = False): """ 初始化Qdrant连接 Args: host: 服务地址,默认localhost port: 服务端口,默认6333 api_key: API密钥,可选 https: 是否使用HTTPS,默认False """ self.host = host self.port = port self.api_key = api_key self.https = https self.client = None def connect(self) -> QdrantClient: """建立连接""" try: # 构建URL url = f"{self.https and 'https' or 'http'}://{self.host}:{self.port}" # 创建客户端 self.client = QdrantClient( url=url, api_key=self.api_key, timeout=30 ) # 测试连接 self.client.get_collection(collection_name="test_connection") print(f"✅ 成功连接到Qdrant服务:{url}") return self.client except Exception as e: print(f"❌ 连接失败:{e}") raise def disconnect(self): """断开连接""" if self.client: self.client = None print("🔌 已断开Qdrant连接")

步骤 2:创建Collection和配置

class QdrantCollectionManager: """Collection管理器""" def __init__(self, client: QdrantClient): self.client = client def create_collection(self, collection_name: str, vector_size: int = 384, distance: Distance = Distance.COSINE) -> str: """ 创建Collection Args: collection_name: Collection名称 vector_size: 向量维度 distance: 距离度量方式 Returns: Collection创建状态 """ try: # 检查Collection是否已存在 existing_collections = self.client.get_collections() collection_exists = any( col.name == collection_name for col in existing_collections.collections ) if collection_exists: print(f"⚠️ Collection '{collection_name}' 已存在") return "already_exists" # 创建Collection配置 config = models.CreateCollection( vectors=models.VectorParams( size=vector_size, distance=distance ), # 可选:添加其他配置 hnsw_config=models.HnswConfigDiff( ef=200, # 搜索深度 m=16 # 连接数 ) ) # 执行创建 operation_info = self.client.create_collection( collection_name=collection_name, vectors_config=config.vectors, hnsw_config=config.hnsw_config ) print(f"✅ 成功创建Collection:{collection_name}") return "created" except Exception as e: print(f"❌ 创建Collection失败:{e}") raise def delete_collection(self, collection_name: str): """删除Collection""" try: self.client.delete_collection(collection_name) print(f"✅ 已删除Collection:{collection_name}") except Exception as e: print(f"❌ 删除Collection失败:{e}") raise

步骤 3:数据插入和管理

class QdrantDataHandler: """数据处理器""" def __init__(self, client: QdrantClient): self.client = client def generate_sample_data(self, num_points: int = 1000, vector_size: int = 384) -> List[Dict[str, Any]]: """生成示例数据""" data = [] for i in range(num_points): # 生成随机向量(模拟 embeddings) vector = np.random.random(vector_size).tolist() # 构建payload(元数据) payload = { "title": f"文档_{i+1}", "content": f"这是第{i+1}个文档的示例内容", "category": np.random.choice(["技术", "商业", "生活", "教育"]), "created_at": f"2026-{np.random.randint(1,7):02d}-{np.random.randint(1,28):02d}", "tags": [f"tag_{np.random.randint(1,10)}", f"tag_{np.random.randint(11,20)}"] } data.append({ "id": i+1, "vector": vector, "payload": payload }) return data def batch_upsert(self, collection_name: str, data: List[Dict[str, Any]], batch_size: int = 100) -> int: """ 批量插入数据 Args: collection_name: Collection名称 data: 数据列表 batch_size: 批次大小 Returns: 成功插入的数据点数量 """ total_inserted = 0 try: for i in range(0, len(data), batch_size): batch = data[i:i + batch_size] # 构建批量插入请求 batch_points = [] for point in batch: batch_points.append( models.PointStruct( id=point["id"], vector=point["vector"], payload=point["payload"] ) ) # 执行批量插入 self.client.upsert( collection_name=collection_name, points=batch_points ) total_inserted += len(batch) print(f"📝 已插入 {len(batch)} 个数据点,总计 {total_inserted}/{len(data)}") except Exception as e: print(f"❌ 批量插入失败:{e}") raise print(f"✅ 成功插入 {total_inserted} 个数据点到 {collection_name}") return total_inserted def update_point(self, collection_name: str, point_id: int, payload_updates: Dict[str, Any]): """更新单个数据点""" try: self.client.set_payload( collection_name=collection_name, points=[point_id], payload=payload_updates ) print(f"✅ 已更新数据点 {point_id}:{payload_updates}") except Exception as e: print(f"❌ 更新数据点失败:{e}") raise def delete_point(self, collection_name: str, point_id: int): """删除数据点""" try: self.client.delete( collection_name=collection_name, points_selector=models.PointIdsList( points=[point_id] ) ) print(f"✅ 已删除数据点 {point_id}") except Exception as e: print(f"❌ 删除数据点失败:{e}") raise

步骤 4:向量搜索和查询

class QdrantSearchEngine: """搜索引擎""" def __init__(self, client: QdrantClient): self.client = client def similarity_search(self, collection_name: str, query_vector: List[float], limit: int = 10, score_threshold: float = 0.7, filter_conditions: Optional[Dict] = None) -> List[Dict]: """ 相似度搜索 Args: collection_name: Collection名称 query_vector: 查询向量 limit: 返回结果数量 score_threshold: 分数阈值 filter_conditions: 过滤条件 Returns: 搜索结果列表 """ try: # 构建搜索请求 search_request = models.SearchRequest( vector=query_vector, limit=limit, score_threshold=score_threshold, # 可选:过滤器 filter=models.Filter( must=filter_conditions ) if filter_conditions else None ) # 执行搜索 search_result = self.client.search( collection_name=collection_name, query_vector=query_vector, limit=limit, score_threshold=score_threshold, query_filter=search_request.filter ) # 格式化结果 results = [] for hit in search_result: results.append({ "id": hit.id, "score": hit.score, "payload": hit.payload, "vector_distance": hit.vector_distance }) print(f"✅ 搜索完成,找到 {len(results)} 个结果") return results except Exception as e: print(f"❌ 搜索失败:{e}") raise def hybrid_search(self, collection_name: str, query_vector: List[float], text_query: str, limit: int = 10, alpha: float = 0.5) -> List[Dict]: """ 混合搜索(向量 + 文本) Args: collection_name: Collection名称 query_vector: 查询向量 text_query: 文本查询 limit: 返回结果数量 alpha: 向量权重(1-alpha为文本权重) Returns: 混合搜索结果 """ try: # 向量搜索结果 vector_results = self.similarity_search( collection_name, query_vector, limit * 2 ) # 文本搜索结果(基于payload中的content字段) text_filter = models.FieldCondition( key="content", match=models.MatchText( text=text_query ) ) text_results = self.client.search( collection_name=collection_name, query_filter=models.Filter(must=[text_filter]), limit=limit * 2 ) # 合并和重排序(简化版) combined_results = [] for result in vector_results: result["search_type"] = "vector" result["weighted_score"] = result["score"] * alpha combined_results.append(result) for result in text_results: result["search_type"] = "text" result["weighted_score"] = result["score"] * (1 - alpha) combined_results.append(result) # 按加权分数排序 combined_results.sort(key=lambda x: x["weighted_score"], reverse=True) final_results = combined_results[:limit] print(f"✅ 混合搜索完成,返回 {len(final_results)} 个结果") return final_results except Exception as e: print(f"❌ 混合搜索失败:{e}") raise def filter_search(self, collection_name: str, query_vector: List[float], filters: Dict, limit: int = 10) -> List[Dict]: """ 带过滤条件的搜索 Args: collection_name: Collection名称 query_vector: 查询向量 filters: 过滤条件 limit: 返回结果数量 Returns: 搜索结果列表 """ try: # 构建复合过滤器 filter_conditions = [] # 支持多种过滤类型 for key, value in filters.items(): if isinstance(value, list): # 多值匹配 filter_conditions.append( models.FieldCondition( key=key, match=models.MatchAny( any=value ) ) ) else: # 单值匹配 filter_conditions.append( models.FieldCondition( key=key, match=models.MatchValue( value=value ) ) ) search_request = models.SearchRequest( vector=query_vector, limit=limit, filter=models.Filter(must=filter_conditions) ) search_result = self.client.search( collection_name=collection_name, query_vector=query_vector, limit=limit, query_filter=search_request.filter ) results = [] for hit in search_result: results.append({ "id": hit.id, "score": hit.score, "payload": hit.payload }) print(f"✅ 过滤搜索完成,找到 {len(results)} 个结果") return results except Exception as e: print(f"❌ 过滤搜索失败:{e}") raise

步骤 5:完整应用示例

class QdrantApplication: """Qdrant应用示例""" def __init__(self): self.connection_manager = None self.collection_manager = None self.data_handler = None self.search_engine = None self.collection_name = "demo_qdrant" async def setup(self): """应用设置""" print("🚀 开始设置Qdrant应用...") # 1. 连接服务 self.connection_manager = QdrantConnectionManager() self.connection_manager.connect() # 2. 初始化组件 self.collection_manager = QdrantCollectionManager(self.connection_manager.client) self.data_handler = QdrantDataHandler(self.connection_manager.client) self.search_engine = QdrantSearchEngine(self.connection_manager.client) # 3. 创建Collection result = self.collection_manager.create_collection( collection_name=self.collection_name, vector_size=384 ) if result == "created": # 4. 生成和插入数据 sample_data = self.data_handler.generate_sample_data(1000, 384) self.data_handler.batch_upsert(self.collection_name, sample_data) print("✅ 应用设置完成") def search_demo(self): """搜索演示""" print("\n🔍 开始搜索演示...") # 生成查询向量 query_vector = np.random.random(384).tolist() # 1. 简单相似度搜索 print("\n1. 相似度搜索:") results = self.search_engine.similarity_search( self.collection_name, query_vector, limit=5 ) for i, result in enumerate(results[:3]): print(f" {i+1}. ID:{result['id']} 分数:{result['score']:.3f} " f"类别:{result['payload']['category']}") # 2. 带过滤条件的搜索 print("\n2. 过滤搜索(只搜索'技术'类别):") tech_results = self.search_engine.filter_search( self.collection_name, query_vector, filters={"category": "技术"}, limit=5 ) for i, result in enumerate(tech_results[:3]): print(f" {i+1}. ID:{result['id']} 分数:{result['score']:.3f} " f"标题:{result['payload']['title']}") # 3. 混合搜索 print("\n3. 混合搜索(向量+文本):") hybrid_results = self.search_engine.hybrid_search( self.collection_name, query_vector, text_query="机器学习", limit=5, alpha=0.7 ) for i, result in enumerate(hybrid_results[:3]): search_type = result['search_type'] print(f" {i+1}. ID:{result['id']} 分数:{result['score']:.3f} " f"类型:{search_type} 标题:{result['payload']['title']}") def demo_async_operations(self): """异步操作演示""" print("\n⚡ 异步操作演示...") async def async_search(): query_vector = np.random.random(384).tolist() results = await self.connection_manager.client.search( collection_name=self.collection_name, query_vector=query_vector, limit=3 ) return results # 执行异步搜索 import asyncio results = asyncio.run(async_search()) print(f"✅ 异步搜索完成,找到 {len(results)} 个结果") def cleanup(self): """清理资源""" print("\n🧹 清理资源...") try: # 删除测试数据 self.collection_manager.delete_collection(self.collection_name) # 断开连接 self.connection_manager.disconnect() print("✅ 清理完成") except Exception as e: print(f"❌ 清理失败:{e}") # 使用示例 if __name__ == "__main__": # 创建应用实例 app = QdrantApplication() try: # 设置应用 asyncio.run(app.setup()) # 运行演示 app.search_demo() app.demo_async_operations() except Exception as e: print(f"❌ 应用运行失败:{e}") finally: # 清理 app.cleanup()

完整示例

上面的代码提供了一个完整的Qdrant Python客户端应用,包括:

  1. 连接管理:安全的连接建立和断开
  2. Collection操作:创建、删除和配置
  3. 数据操作:批量插入、更新和删除
  4. 搜索功能:相似度搜索、过滤搜索、混合搜索
  5. 异步支持:异步操作和性能优化
  6. 错误处理:完整的异常处理机制

常见问题 FAQ

Q1:如何处理大量数据的插入性能问题?

A:

  • 使用批量插入(batch_upsert),每次插入100-1000条记录
  • 调整hnsw参数(ef和m)以优化索引性能
  • 考虑使用异步操作提高吞吐量
  • 在低峰期进行大规模数据插入

Q2:Qdrant客户端连接失败怎么办?

A:

  • 检查Qdrant服务是否正在运行
  • 验证连接地址和端口是否正确
  • 确认防火墙设置允许6333端口访问
  • 检查API密钥是否正确(如果使用了认证)
  • 使用client.get_collections()测试连接

Q3:如何优化搜索性能?

A:

  • 合理设置score_threshold减少不必要的计算
  • 使用过滤器缩小搜索范围
  • 调整HNSW参数(ef控制搜索深度)
  • 对不同类型的查询使用不同的Collection
  • 考虑使用分片处理大规模数据

Q4:如何处理内存不足的情况?

A:

  • 适当减少向量维度
  • 使用向量量化技术
  • 定期清理不活跃的数据
  • 增加服务器内存配置
  • 考虑使用磁盘存储部分数据

Q5:如何保证数据一致性?

A:

  • 使用事务操作确保数据完整性
  • 定期备份数据
  • 监控Qdrant服务状态
  • 使用多副本保证高可用性
  • 实现适当的错误重试机制

最佳实践与避坑

  • 连接池管理:避免频繁创建和销毁连接
  • 批量操作:优先使用批量API减少网络开销
  • 合理配置:根据数据规模调整HNSW参数
  • 监控和日志:记录关键操作和性能指标
  • 错误恢复:实现重试机制处理网络异常

本节小结

通过本节的详细讲解,我们掌握了Qdrant Python客户端的完整使用方法:

  1. 连接管理:建立稳定的连接并处理各种异常情况
  2. 数据操作:高效的插入、更新、删除操作
  3. 搜索技术:从基础相似度搜索到复杂的混合检索
  4. 性能优化:批量操作、异步处理、参数调优
  5. 问题解决:常见问题的诊断和解决方案

Python客户端为Qdrant提供了强大的编程接口,能够满足从简单应用到复杂系统构建的各种需求。下一节我们将深入探讨Rust客户端的使用和性能优化技巧。

延伸阅读

关键词:Qdrant, Python客户端, API, 向量搜索, 批量操作, 异步处理, 性能优化
难度:进阶
预计阅读:60 分钟


发布者: 作者: 转发
评论区 (0)
U