基础API使用:掌握Annoy核心功能 AnnoyIndex类详解 AnnoyIndex是Annoy库的核心类,提供了构建和查询向量索引的所有功能。理解这个类的设计和使用方法是掌握Annoy的关键。 类初始化 参数说明: :向量维度,必须是正整数,一旦设定不可更改 :距离度量方式,支持以下选项: :角度距离(余弦相似度),适合文本和推荐场景 :欧氏距离,适合图像和几何数据 :曼哈顿距离,适合稀疏数据 :汉明距离,适合二进制数据 :点积距离,适合特定的机器学习场景 初始化示例: 核心API方法详解 添加向量数据 additem()方法: 这是构建索引的基础方法,用于向索引中添加向量数据。
AnnoyIndex是Annoy库的核心类,提供了构建和查询向量索引的所有功能。理解这个类的设计和使用方法是掌握Annoy的关键。
from annoy import AnnoyIndex # 基本初始化 index = AnnoyIndex(f, metric)
参数说明:
f:向量维度,必须是正整数,一旦设定不可更改metric:距离度量方式,支持以下选项:
'angular':角度距离(余弦相似度),适合文本和推荐场景'euclidean':欧氏距离,适合图像和几何数据'manhattan':曼哈顿距离,适合稀疏数据'hamming':汉明距离,适合二进制数据'dot':点积距离,适合特定的机器学习场景初始化示例:
# 创建128维向量的角度距离索引 text_index = AnnoyIndex(128, 'angular') # 创建512维向量的欧氏距离索引 image_index = AnnoyIndex(512, 'euclidean') # 创建64位二进制向量的汉明距离索引 binary_index = AnnoyIndex(64, 'hamming')
add_item()方法:
index.add_item(i, vector)
这是构建索引的基础方法,用于向索引中添加向量数据。
参数详解:
i:向量的唯一标识符,必须是非负整数vector:向量数据,可以是list、numpy数组或其他可迭代对象使用示例:
import numpy as np from annoy import AnnoyIndex # 创建索引 f = 10 # 向量维度 index = AnnoyIndex(f, 'angular') # 方法1:使用Python列表 vector1 = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] index.add_item(0, vector1) # 方法2:使用NumPy数组 vector2 = np.random.normal(size=f) index.add_item(1, vector2) # 方法3:批量添加 for i in range(1000): vector = np.random.normal(size=f) index.add_item(i, vector)
注意事项:
build()方法:
index.build(n_trees, n_jobs=-1)
构建索引是Annoy使用流程中的关键步骤,将添加的向量数据组织成可搜索的树结构。
参数详解:
n_trees:构建的树的数量,影响查询精度和构建时间n_jobs:并行构建的线程数,-1表示使用所有可用CPU核心参数选择指南:
# 根据数据规模选择树的数量 data_size = 100000 if data_size < 10000: n_trees = 10 elif data_size < 100000: n_trees = 20 else: n_trees = 50 # 构建索引 index.build(n_trees)
构建过程监控:
import time print("Building index...") start_time = time.time() index.build(20) build_time = time.time() - start_time print(f"Index built in {build_time:.2f} seconds")
Annoy提供了多种查询方法来满足不同的使用需求。
get_nns_by_item()方法:
neighbors = index.get_nns_by_item(item_id, n, search_k=-1, include_distances=False)
根据已存在的向量ID查找最近邻。
参数说明:
item_id:查询向量的IDn:返回的最近邻数量search_k:搜索的候选数量,-1表示使用默认值include_distances:是否返回距离信息使用示例:
# 基本查询 neighbors = index.get_nns_by_item(0, 10) print(f"Item 0's 10 nearest neighbors: {neighbors}") # 包含距离信息的查询 neighbors, distances = index.get_nns_by_item(0, 10, include_distances=True) print(f"Neighbors: {neighbors}") print(f"Distances: {distances}")
get_nns_by_vector()方法:
neighbors = index.get_nns_by_vector(vector, n, search_k=-1, include_distances=False)
根据给定的向量查找最近邻,适用于查询不在索引中的新向量。
使用示例:
# 查询新向量 query_vector = np.random.normal(size=f) neighbors = index.get_nns_by_vector(query_vector, 5) print(f"Query vector's 5 nearest neighbors: {neighbors}") # 实时查询示例 def real_time_search(query_vector, top_k=10): """实时搜索函数""" start_time = time.time() neighbors, distances = index.get_nns_by_vector( query_vector, top_k, include_distances=True ) query_time = time.time() - start_time return { 'neighbors': neighbors, 'distances': distances, 'query_time': query_time }
get_item_vector()方法:
vector = index.get_item_vector(item_id)
根据ID获取存储在索引中的向量数据。
使用示例:
# 获取特定向量 original_vector = index.get_item_vector(0) print(f"Vector 0: {original_vector}") # 验证向量完整性 def verify_vectors(index, sample_ids): """验证向量数据完整性""" for item_id in sample_ids: try: vector = index.get_item_vector(item_id) print(f"✓ Vector {item_id}: {len(vector)} dimensions") except Exception as e: print(f"✗ Vector {item_id}: Error - {e}")
get_distance()方法:
distance = index.get_distance(i, j)
计算索引中两个向量之间的距离。
使用示例:
# 计算两个向量间的距离 dist = index.get_distance(0, 1) print(f"Distance between item 0 and 1: {dist}") # 构建距离矩阵 def build_distance_matrix(index, item_ids): """构建距离矩阵""" n = len(item_ids) distance_matrix = np.zeros((n, n)) for i in range(n): for j in range(i+1, n): dist = index.get_distance(item_ids[i], item_ids[j]) distance_matrix[i][j] = dist distance_matrix[j][i] = dist return distance_matrix
save()方法:
index.save(filename)
将构建好的索引保存到磁盘文件。
使用示例:
# 保存索引 index_file = "my_index.ann" index.save(index_file) print(f"Index saved to {index_file}") # 检查文件大小 import os file_size = os.path.getsize(index_file) print(f"Index file size: {file_size / (1024*1024):.2f} MB")
load()方法:
index.load(filename, prefault=False)
从磁盘文件加载已保存的索引。
参数说明:
filename:索引文件路径prefault:是否预加载所有页面到内存使用示例:
# 加载索引 new_index = AnnoyIndex(f, 'angular') new_index.load(index_file) # 验证加载成功 print(f"Loaded index with {new_index.get_n_items()} items") # 预加载优化 large_index = AnnoyIndex(f, 'angular') large_index.load(index_file, prefault=True) # 适合频繁查询的场景
# 获取向量数量 n_items = index.get_n_items() print(f"Number of items: {n_items}") # 获取向量维度 n_dims = index.f print(f"Vector dimension: {n_dims}") # 获取树的数量(仅在构建后可用) n_trees = index.get_n_trees() print(f"Number of trees: {n_trees}")
def check_index_status(index): """检查索引状态""" try: n_items = index.get_n_items() if n_items == 0: return "Empty index" # 尝试查询以检查是否已构建 try: index.get_nns_by_item(0, 1) return f"Ready - {n_items} items indexed" except: return f"Not built - {n_items} items added but not indexed" except: return "Invalid index" print(f"Index status: {check_index_status(index)}")
以下是一个完整的Annoy使用示例,展示了从数据准备到查询的完整流程:
#!/usr/bin/env python3 """Annoy基础API完整使用示例""" import numpy as np import time from annoy import AnnoyIndex def create_sample_data(n_items=10000, n_dims=128): """创建示例数据""" print(f"Creating {n_items} random {n_dims}-dimensional vectors...") # 生成随机向量数据 vectors = [] for i in range(n_items): # 生成正态分布的随机向量 vector = np.random.normal(0, 1, n_dims) # 归一化向量(对angular距离很重要) vector = vector / np.linalg.norm(vector) vectors.append(vector) return vectors def build_index(vectors, metric='angular', n_trees=10): """构建Annoy索引""" n_dims = len(vectors[0]) print(f"Building index with {len(vectors)} vectors, {n_dims} dimensions...") # 创建索引 index = AnnoyIndex(n_dims, metric) # 添加向量 for i, vector in enumerate(vectors): index.add_item(i, vector) # 构建索引 start_time = time.time() index.build(n_trees) build_time = time.time() - start_time print(f"Index built in {build_time:.2f} seconds with {n_trees} trees") return index def query_examples(index, vectors): """查询示例""" print("\nRunning query examples...") # 示例1:根据ID查询 item_id = 0 neighbors = index.get_nns_by_item(item_id, 5, include_distances=True) print(f"Top 5 neighbors of item {item_id}:") for i, (neighbor, distance) in enumerate(zip(neighbors[0], neighbors[1])): print(f" {i+1}. Item {neighbor} (distance: {distance:.4f})") # 示例2:根据向量查询 query_vector = vectors[100] # 使用已知向量作为查询 neighbors = index.get_nns_by_vector(query_vector, 3, include_distances=True) print(f"\nTop 3 neighbors of query vector:") for i, (neighbor, distance) in enumerate(zip(neighbors[0], neighbors[1])): print(f" {i+1}. Item {neighbor} (distance: {distance:.4f})") # 示例3:性能测试 print(f"\nPerformance test:") query_times = [] for _ in range(100): start = time.time() index.get_nns_by_item(np.random.randint(0, index.get_n_items()), 10) query_times.append(time.time() - start) avg_time = np.mean(query_times) print(f"Average query time: {avg_time*1000:.2f}ms") print(f"Queries per second: {1/avg_time:.0f}") def main(): """主函数""" # 参数设置 n_items = 10000 n_dims = 128 n_trees = 20 # 创建数据 vectors = create_sample_data(n_items, n_dims) # 构建索引 index = build_index(vectors, 'angular', n_trees) # 保存索引 index_file = "example_index.ann" index.save(index_file) print(f"Index saved to {index_file}") # 加载索引(演示) new_index = AnnoyIndex(n_dims, 'angular') new_index.load(index_file) print(f"Index loaded successfully") # 查询示例 query_examples(new_index, vectors) # 索引信息 print(f"\nIndex information:") print(f" Items: {new_index.get_n_items()}") print(f" Dimensions: {n_dims}") print(f" Trees: {n_trees}") print(f" Metric: angular") if __name__ == "__main__": main()
这个完整示例展示了Annoy的核心API使用方法,包括数据准备、索引构建、持久化和查询等所有关键步骤。通过运行这个示例,您可以快速了解Annoy的基本使用流程和性能特征。
在下一章节中,我们将深入学习索引管理的高级技巧,包括索引优化、版本控制和生产环境的最佳实践。