7.8 近实时搜索 (NRT Near Real-Time Search) Elasticsearch 近实时搜索 (NRT Near Real-Time Search) 详解及实践 NRT 的核心原理 Elasticsearch 的 NRT 基于以下几个关键组件和流程: 内存缓冲区 (Memory Buffer): 当文档被索引时,它们首先被添加到节点的内存缓冲区中。这个缓冲区允许快速写入操作,而无需立即刷新到磁盘。 Refresh: Refresh 操作将内存缓冲区中的文档刷新到新的 segment,并将 segment 写入文件系统缓存 (file system cache)。 这些 segment 仍然是临时的,但已经可以被搜索到。Refresh 操作是控制 NRT 延迟的关键。
Elasticsearch 的 NRT 基于以下几个关键组件和流程:
内存缓冲区 (Memory Buffer): 当文档被索引时,它们首先被添加到节点的内存缓冲区中。这个缓冲区允许快速写入操作,而无需立即刷新到磁盘。
Refresh: Refresh 操作将内存缓冲区中的文档刷新到新的 segment,并将 segment 写入文件系统缓存 (file system cache)。 这些 segment 仍然是临时的,但已经可以被搜索到。Refresh 操作是控制 NRT 延迟的关键。
Translog: Translog (transaction log) 是一个持久化的事务日志,用于记录每次索引、更新或删除操作。 它确保了即使在发生故障的情况下,数据也不会丢失。
Flush: Flush 操作将内存中的 segment 持久化到磁盘,并将 translog 清空。 Flush 操作通常比 Refresh 操作更耗时。
Segment: Segment 是 Lucene (Elasticsearch 底层使用的搜索引擎库) 中的基本存储单元。 每个 segment 都是一个不可变的倒排索引,包含文档的子集。
Merge: 随着时间的推移,会创建许多小的 segment。 Merge 操作将这些小的 segment 合并成更大的 segment,以提高搜索效率。
可以用以下 Mermaid 图来表示这个流程:
流程说明:
Index Request: 客户端发送索引请求。
Memory Buffer: 文档首先存储在内存缓冲区中。
Refresh?: 系统定期检查是否需要执行 Refresh 操作。
New Segment in File System Cache: 如果需要 Refresh,则将内存缓冲区中的文档刷新到新的 segment,并写入文件系统缓存。
Search Request: 客户端发送搜索请求。
Flush?: 系统定期检查是否需要执行 Flush 操作。
Segment on Disk: 如果需要 Flush,则将内存中的 segment 持久化到磁盘。
Merge?: 系统定期检查是否需要执行 Merge 操作。
Merged Segment: 如果需要 Merge,则将多个小的 segment 合并成一个更大的 segment。
Translog: 所有索引操作都会记录到 Translog 中,用于数据恢复。
refresh_intervalElasticsearch 提供了 refresh_interval 设置来控制 Refresh 操作的频率,从而控制 NRT 延迟。 refresh_interval 可以在索引级别或动态索引设置中配置。
refresh_interval。PUT /my_index { "settings": { "index": { "refresh_interval": "1s" } } }
refresh_interval。PUT /my_index/_settings { "index": { "refresh_interval": "5s" } }
refresh_interval 的值:
正数 (例如 "1s", "30s"): 指定 Refresh 操作的频率。 值越小,NRT 延迟越低,但会增加 CPU 和 I/O 负载。
"-1": 禁用自动 Refresh 操作。 在这种情况下,必须手动执行 Refresh 操作。
null (默认值): Elasticsearch 会根据负载自动调整 Refresh 频率。
手动 Refresh:
可以使用 _refresh API 手动触发 Refresh 操作。
POST /my_index/_refresh
以下是一些使用 Java High Level REST Client 进行 NRT 相关的代码实践示例。
3.1 索引文档并立即搜索:
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class NRTExample { public static void main(String[] args) throws IOException, InterruptedException { // 替换成你的 Elasticsearch 连接信息 RestHighLevelClient client = new RestHighLevelClientBuilder().build(); String indexName = "my_nrt_index"; // 1. 索引文档 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "Alice"); jsonMap.put("message", "Hello Elasticsearch!"); IndexRequest indexRequest = new IndexRequest(indexName) .source(jsonMap, XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); System.out.println("Index Response: " + indexResponse.toString()); // 2. 稍微等待,确保 refresh 发生 Thread.sleep(2000); // 等待 2 秒,可以根据 refresh_interval 调整 // 3. 执行搜索 SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("message", "Elasticsearch")); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println("Search Response: " + searchResponse.toString()); client.close(); } }
代码说明:
索引文档: 使用 IndexRequest 将包含 "user" 和 "message" 字段的文档索引到 "my_nrt_index" 索引中。
等待: Thread.sleep(2000) 用于等待 2 秒,以确保 Refresh 操作发生。 这个等待时间应该根据 refresh_interval 的设置进行调整。 如果 refresh_interval 设置为 1 秒,那么等待 2 秒就足够了。
执行搜索: 使用 SearchRequest 和 SearchSourceBuilder 构建搜索请求,并使用 matchQuery 查找包含 "Elasticsearch" 的文档。
输出结果: 打印索引和搜索响应,以便查看结果。
注意:
需要添加 Elasticsearch Java High Level REST Client 的依赖。
需要替换代码中的 Elasticsearch 连接信息。
根据实际情况调整等待时间。
3.2 手动 Refresh:
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.RefreshRequest; import org.elasticsearch.client.indices.RefreshResponse; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class ManualRefreshExample { public static void main(String[] args) throws IOException { // 替换成你的 Elasticsearch 连接信息 RestHighLevelClient client = new RestHighLevelClientBuilder().build(); String indexName = "my_nrt_index"; // 1. 索引文档 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "Bob"); jsonMap.put("message", "Manual Refresh Example"); IndexRequest indexRequest = new IndexRequest(indexName) .source(jsonMap, XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); System.out.println("Index Response: " + indexResponse.toString()); // 2. 手动执行 Refresh RefreshRequest refreshRequest = new RefreshRequest(indexName); RefreshResponse refreshResponse = client.indices().refresh(refreshRequest, RequestOptions.DEFAULT); System.out.println("Refresh Response: " + refreshResponse.toString()); // 3. 执行搜索 SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("message", "Manual Refresh")); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println("Search Response: " + searchResponse.toString()); client.close(); } }
代码说明:
索引文档: 与上一个示例类似,索引一个包含 "user" 和 "message" 字段的文档。
手动 Refresh: 使用 RefreshRequest 和 client.indices().refresh() 手动触发 Refresh 操作。 这确保了索引的文档可以立即被搜索到,而无需等待 refresh_interval 到期。
执行搜索: 执行搜索并打印结果。
3.3 批量索引与 Refresh:
在批量索引数据时,频繁的 Refresh 操作会降低性能。 建议在批量索引完成后执行一次 Refresh 操作。
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.RefreshRequest; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class BulkIndexWithRefreshExample { public static void main(String[] args) throws IOException { // 替换成你的 Elasticsearch 连接信息 RestHighLevelClient client = new RestHighLevelClientBuilder().build(); String indexName = "my_bulk_index"; // 1. 创建 BulkRequest BulkRequest bulkRequest = new BulkRequest(); // 2. 添加多个索引请求 for (int i = 0; i < 10; i++) { Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("id", i); jsonMap.put("data", "Bulk data " + i); IndexRequest indexRequest = new IndexRequest(indexName) .source(jsonMap, XContentType.JSON); bulkRequest.add(indexRequest); } // 3. 执行 BulkRequest BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); System.out.println("Bulk Response: " + bulkResponse.toString()); // 4. 执行 Refresh RefreshRequest refreshRequest = new RefreshRequest(indexName); client.indices().refresh(refreshRequest, RequestOptions.DEFAULT); System.out.println("Refresh completed after bulk indexing."); client.close(); } }
代码说明:
创建 BulkRequest: 创建一个 BulkRequest 对象,用于批量执行索引操作。
添加多个索引请求: 循环创建 10 个 IndexRequest 对象,并将它们添加到 BulkRequest 中。
执行 BulkRequest: 使用 client.bulk() 执行 BulkRequest。
执行 Refresh: 在批量索引完成后,执行一次 RefreshRequest,以确保所有文档都可以被搜索到。
虽然低 NRT 延迟对于某些应用至关重要,但也需要权衡。 更频繁的 Refresh 操作会增加 CPU 和 I/O 负载,并可能影响索引性能。 因此,需要根据实际需求和性能要求来调整 refresh_interval。
以下是一些建议:
高吞吐量索引: 如果需要高吞吐量索引,可以适当增加 refresh_interval 或禁用自动 Refresh,并在批量索引完成后手动执行 Refresh。
低延迟搜索: 如果需要低延迟搜索,可以减小 refresh_interval。
监控: 监控 Elasticsearch 的 CPU、I/O 和索引性能,以便找到最佳的 refresh_interval 设置。
Elasticsearch 的 NRT 特性提供了快速搜索能力,允许在文档索引后几乎立即搜索到它们。 通过理解 NRT 的工作原理、控制 refresh_interval、以及合理使用手动 Refresh,可以优化 Elasticsearch 的性能并满足不同的应用需求。 在实际应用中,需要根据具体的业务场景和性能要求,权衡 NRT 延迟和系统负载,找到最佳的配置方案。 通过本文提供的代码实践,可以更好地理解和应用 Elasticsearch 的 NRT 特性。