# 5.1 流式数据处理与实时索引 — Qdrant实时数据流处理

本节导读：深入掌握Qdrant的流式数据处理能力，通过实时索引技术和增量更新机制，构建高性能的实时向量搜索应用，满足现代AI应用对实时性的严格要求。

## 学习目标

- 理解Qdrant的流式数据处理架构
- 掌握实时索引的创建和管理方法
- 学会实现增量数据更新和同步机制
- 了解流式处理的性能优化策略
- 掌握实时搜索的应用场景和实现

## 核心概念

Qdrant的流式数据处理支持实时数据索引和查询，通过增量更新和优化的索引结构，为AI应用提供低延迟的实时搜索能力。

## 环境准备 / 前置知识

### 系统要求

- Qdrant 1.7 或更高版本
本节导读:深入掌握Qdrant的流式数据处理能力,通过实时索引技术和增量更新机制,构建高性能的实时向量搜索应用,满足现代AI应用对实时性的严格要求。
Qdrant的流式数据处理支持实时数据索引和查询,通过增量更新和优化的索引结构,为AI应用提供低延迟的实时搜索能力。
# asyncio ships with the Python 3 standard library; the PyPI "asyncio" package is an
# obsolete 3.4-era backport and must not be installed.
pip install qdrant-client kafka-python redis aiohttp
import asyncio
import json
import logging
import time
from dataclasses import dataclass
from itertools import groupby
from typing import Any, Dict, List, Optional, Union

import redis
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, PointStruct, VectorParams

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class StreamEvent:
    """A single change event flowing through the stream."""

    id: str
    vector: List[float]
    payload: Dict[str, Any]
    timestamp: float
    event_type: str  # 'insert', 'update', 'delete'


class StreamDataCollector:
    """Event queue backed by a Redis list.

    Producers LPUSH JSON-encoded events onto the head of the list and
    consumers pop from the tail, so events are consumed in FIFO order.
    """

    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.event_queue = "qdrant_stream_events"
        self.batch_size = 100
        self.batch_timeout = 5.0  # seconds to block waiting for the first event

    def add_event(self, event: StreamEvent):
        """Serialize *event* to JSON and push it onto the queue (errors are logged, not raised)."""
        try:
            event_data = {
                'id': event.id,
                'vector': event.vector,
                'payload': event.payload,
                'timestamp': event.timestamp,
                'event_type': event.event_type
            }
            self.redis_client.lpush(self.event_queue, json.dumps(event_data))
            logger.info(f"✅ 事件已添加到队列: {event.id}")
        except Exception as e:
            logger.error(f"❌ 添加事件失败: {e}")

    @staticmethod
    def _decode_event(raw: str) -> Optional[StreamEvent]:
        """Parse one queued JSON entry; return None (and log) if it is malformed."""
        try:
            event_dict = json.loads(raw)
            return StreamEvent(
                id=event_dict['id'],
                vector=event_dict['vector'],
                payload=event_dict['payload'],
                timestamp=event_dict['timestamp'],
                event_type=event_dict['event_type']
            )
        except (ValueError, KeyError, TypeError) as e:
            logger.error(f"❌ 解析事件失败: {e}")
            return None

    def get_batch(self) -> List[StreamEvent]:
        """Pop up to ``batch_size`` events from the queue.

        Blocks for at most ``batch_timeout`` seconds waiting for the first
        event, then drains up to ``batch_size - 1`` more without blocking.
        Returns an empty list on timeout or Redis error. Malformed entries
        are logged and skipped rather than discarding the whole batch.
        """
        try:
            first = self.redis_client.brpop(self.event_queue, timeout=self.batch_timeout)
        except Exception as e:
            logger.error(f"❌ 获取批量事件失败: {e}")
            return []
        if not first:
            return []

        # BRPOP returns a (queue_name, value) pair and only ever pops ONE element,
        # so the rest of the batch must be drained explicitly.
        raw_events = [first[1]]
        try:
            while len(raw_events) < self.batch_size:
                raw = self.redis_client.rpop(self.event_queue)
                if raw is None:
                    break
                raw_events.append(raw)
        except Exception as e:
            # Entries already popped are no longer in Redis; keep and process them.
            logger.error(f"❌ 获取批量事件失败: {e}")

        events = []
        for raw in raw_events:
            event = self._decode_event(raw)
            if event is not None:
                events.append(event)
        return events

    def get_event_count(self) -> int:
        """Return the number of queued events, or 0 if Redis cannot be reached."""
        try:
            return self.redis_client.llen(self.event_queue)
        except Exception as e:
            logger.error(f"❌ 获取事件数量失败: {e}")
            return 0

    def clear_events(self):
        """Delete the whole event queue (errors are logged, not raised)."""
        try:
            self.redis_client.delete(self.event_queue)
            logger.info("✅ 事件队列已清空")
        except Exception as e:
            logger.error(f"❌ 清空事件队列失败: {e}")


class StreamProcessor:
    """Apply batches of StreamEvents from a StreamDataCollector to a Qdrant collection."""

    def __init__(self, qdrant_client: QdrantClient, collector: StreamDataCollector):
        self.qdrant_client = qdrant_client
        self.collector = collector
        self.collection_name = "real_time_streaming"
        self.processed_events = 0
        self.start_time = time.time()

    def create_stream_collection(self):
        """Create the streaming collection unless it already exists.

        Raises:
            Exception: any client error is logged and re-raised.
        """
        try:
            existing_collections = self.qdrant_client.get_collections()
            if any(col.name == self.collection_name for col in existing_collections.collections):
                logger.info(f"⚠️ Collection '{self.collection_name}' 已存在")
                return

            self.qdrant_client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=384,  # vector dimension
                    distance=Distance.COSINE
                ),
                # ef_construct is the candidate-list size used while BUILDING the
                # HNSW graph. HnswConfigDiff has no `ef` field; the search-time ef
                # is passed per query via models.SearchParams(hnsw_ef=...).
                hnsw_config=models.HnswConfigDiff(
                    ef_construct=200,
                    m=16  # edges per node
                ),
                optimizers_config=models.OptimizersConfigDiff(
                    deleted_threshold=0.2,
                    vacuum_min_vector_number=1000,
                )
            )
            logger.info(f"✅ 流式处理Collection已创建: {self.collection_name}")
        except Exception as e:
            logger.error(f"❌ 创建Collection失败: {e}")
            raise

    @staticmethod
    def _to_point_id(raw_id: str) -> Union[int, str]:
        """Convert an event id to a Qdrant point id.

        Qdrant accepts unsigned integers or UUID strings: plain ASCII digit
        strings become ints, anything else is passed through unchanged (and
        must then be a valid UUID, or Qdrant will reject it).
        """
        text = str(raw_id)
        return int(text) if text.isascii() and text.isdigit() else text

    @staticmethod
    def _operation_kind(event: StreamEvent) -> Optional[str]:
        """Map an event type to the Qdrant operation it needs ('upsert'/'delete'/None)."""
        if event.event_type in ('insert', 'update'):
            return 'upsert'
        if event.event_type == 'delete':
            return 'delete'
        return None

    def process_events_batch(self, events: List[StreamEvent]):
        """Apply *events* to Qdrant in their original order.

        Consecutive events of the same kind are sent as one request. Order
        across kinds is preserved, so e.g. a delete followed by a re-insert
        of the same id leaves the point present. Inserts and updates are
        both upserts, so they share one request.

        Raises:
            Exception: any client error is logged and re-raised.
        """
        if not events:
            return

        try:
            for kind, group in groupby(events, key=self._operation_kind):
                run = list(group)
                if kind == 'upsert':
                    # Last event per id wins; the dict keeps first-seen order.
                    latest = {}
                    for event in run:
                        point_id = self._to_point_id(event.id)
                        latest[point_id] = PointStruct(
                            id=point_id,
                            vector=event.vector,
                            payload=event.payload
                        )
                    self.qdrant_client.upsert(
                        collection_name=self.collection_name,
                        points=list(latest.values())
                    )
                    logger.info(f"✅ 批量写入(插入/更新) {len(latest)} 个点")
                elif kind == 'delete':
                    delete_ids = [self._to_point_id(event.id) for event in run]
                    self.qdrant_client.delete(
                        collection_name=self.collection_name,
                        points_selector=models.PointIdsList(points=delete_ids)
                    )
                    logger.info(f"✅ 批量删除 {len(delete_ids)} 个点")
                else:
                    logger.warning(f"⚠️ 忽略 {len(run)} 个未知类型的事件")

            self.processed_events += len(events)

            # Average throughput since this processor was created.
            processing_time = time.time() - self.start_time
            qps = self.processed_events / processing_time if processing_time > 0 else 0
            logger.info(f"📊 已处理 {self.processed_events} 个事件,QPS: {qps:.2f}")
        except Exception as e:
            logger.error(f"❌ 批量处理事件失败: {e}")
            raise

    async def start_stream_processing(self):
        """Consume and apply event batches forever.

        NOTE: events are popped from Redis before they are applied. If a
        batch fails, it is logged and dropped (at-most-once delivery).
        """
        logger.info("🚀 开始流式数据处理...")
        loop = asyncio.get_running_loop()
        while True:
            try:
                # Both BRPOP (up to batch_timeout seconds) and the synchronous Qdrant
                # client block; run them in the default executor so the event loop
                # stays responsive for other tasks.
                events = await loop.run_in_executor(None, self.collector.get_batch)
                if events:
                    await loop.run_in_executor(None, self.process_events_batch, events)
                else:
                    await asyncio.sleep(0.1)
            except Exception as e:
                logger.error(f"❌ 流式处理错误: {e}")
                await asyncio.sleep(1)  # back off for 1 second before retrying

    def get_collection_stats(self):
        """Return a summary dict for the collection, or None on error."""
        try:
            collection_info = self.qdrant_client.get_collection(self.collection_name)
            return {
                'name': self.collection_name,
                'status': collection_info.status,
                # vectors_count is deprecated in newer clients; may be None or absent.
                'vector_count': getattr(collection_info, 'vectors_count', None),
                'points_count': collection_info.points_count,
                'vector_config': collection_info.config.params.vectors,
                'segments_count': collection_info.segments_count,
                'optimizers_config': collection_info.config.optimizer_config
            }
        except Exception as e:
            logger.error(f"❌ 获取Collection统计失败: {e}")
            return None
class RealTimeIndexManager:
    """Manage a collection with payload indexes for filtered real-time search."""

    def __init__(self, qdrant_client: QdrantClient):
        self.qdrant_client = qdrant_client
        self.collection_name = "real_time_indexing"

    def create_index_with_filtering(self):
        """Create the collection plus payload indexes used by filters.

        Raises:
            Exception: any client error is logged and re-raised.
        """
        try:
            self.qdrant_client.create_collection(
                collection_name=self.collection_name,
                vectors_config=models.VectorParams(
                    size=384,
                    distance=Distance.COSINE
                )
            )

            # Payload indexes are created with create_payload_index and a
            # PayloadSchemaType; there is no create_field_index/FieldSchema API.
            payload_indexes = {
                "category": models.PayloadSchemaType.KEYWORD,
                "priority": models.PayloadSchemaType.INTEGER,
                # rebuild_index_incremental range-filters on this field.
                "timestamp": models.PayloadSchemaType.FLOAT,
            }
            for field_name, field_schema in payload_indexes.items():
                self.qdrant_client.create_payload_index(
                    collection_name=self.collection_name,
                    field_name=field_name,
                    field_schema=field_schema
                )

            logger.info(f"✅ 实时索引已创建: {self.collection_name}")
        except Exception as e:
            logger.error(f"❌ 创建实时索引失败: {e}")
            raise

    def rebuild_index_incremental(self, from_timestamp: float, batch_size: int = 1000):
        """Stamp `_index_version` onto every point whose payload `timestamp` >= *from_timestamp*.

        Pages through all matching points (not just the first page). Only
        payload is touched via set_payload, so stored vectors are left intact.
        All points in one run receive the same version value.

        Raises:
            Exception: any client error is logged and re-raised.
        """
        try:
            index_version = time.time()
            scroll_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="timestamp",
                        range=models.Range(gte=from_timestamp)
                    )
                ]
            )

            updated_count = 0
            offset = None
            while True:
                # scroll() returns (records, next_page_offset); ids are all we need.
                records, offset = self.qdrant_client.scroll(
                    collection_name=self.collection_name,
                    scroll_filter=scroll_filter,
                    limit=batch_size,
                    offset=offset,
                    with_payload=False,
                    with_vectors=False
                )
                if records:
                    self.qdrant_client.set_payload(
                        collection_name=self.collection_name,
                        payload={'_index_version': index_version},
                        points=[record.id for record in records]
                    )
                    updated_count += len(records)
                if offset is None:
                    break

            logger.info(f"✅ 增量重建索引完成,更新了 {updated_count} 个点")
        except Exception as e:
            logger.error(f"❌ 增量重建索引失败: {e}")
            raise

    def optimize_index_performance(self):
        """Raise HNSW build quality and tighten optimizer thresholds in one update.

        Raises:
            Exception: any client error is logged and re-raised.
        """
        try:
            self.qdrant_client.update_collection(
                collection_name=self.collection_name,
                # ef_construct (build-time candidate list); HnswConfigDiff has no
                # `ef` field. Search-time ef is set per query via SearchParams.
                hnsw_config=models.HnswConfigDiff(
                    ef_construct=300,
                    m=32  # more edges per node
                ),
                optimizers_config=models.OptimizersConfigDiff(
                    deleted_threshold=0.1,          # lower deletion threshold
                    vacuum_min_vector_number=500,   # lower minimum vector count
                    default_segment_number=4        # target number of segments
                )
            )
            logger.info("✅ 索引性能优化完成")
        except Exception as e:
            logger.error(f"❌ 索引性能优化失败: {e}")
            raise

    def monitor_index_health(self):
        """Return a dict of health metrics with a 0-100 score, or None on error."""
        try:
            collection_info = self.qdrant_client.get_collection(self.collection_name)
            hnsw_config = collection_info.config.hnsw_config
            optimizer_config = collection_info.config.optimizer_config
            points_count = collection_info.points_count or 0
            segments_count = collection_info.segments_count or 0

            health_metrics = {
                'status': collection_info.status,
                # vectors_count is deprecated in newer clients; may be None or absent.
                'vector_count': getattr(collection_info, 'vectors_count', None),
                'points_count': points_count,
                'segments_count': segments_count,
                'optimization_status': collection_info.optimizer_status,
                'config': {
                    # Key name kept for compatibility; the value is the build-time ef_construct.
                    'hnsw_ef': hnsw_config.ef_construct,
                    'hnsw_m': hnsw_config.m,
                    'deleted_threshold': optimizer_config.deleted_threshold,
                    'vacuum_threshold': optimizer_config.vacuum_min_vector_number
                }
            }

            health_score = 100
            # status is a CollectionStatus enum (green/yellow/red/grey), not an int.
            if collection_info.status != models.CollectionStatus.GREEN:
                health_score -= 30
            if points_count == 0:
                health_score -= 20
            if segments_count > 10:
                health_score -= (segments_count - 10) * 5
            health_score = max(health_score, 0)

            health_metrics['health_score'] = health_score
            health_metrics['health_status'] = 'healthy' if health_score > 70 else 'warning' if health_score > 40 else 'critical'

            return health_metrics
        except Exception as e:
            logger.error(f"❌ 监控索引健康失败: {e}")
            return None
A:
A:
A:
A:
A:
通过本节的详细讲解,我们掌握了Qdrant流式数据处理的完整方法:
流式处理为Qdrant提供了强大的实时数据能力,适合现代AI应用对实时性的严格要求。下一节我们将探讨内存优化与性能调优策略。