7.8 近实时搜索 (NRT Near Real-Time Search)


文档摘要

7.8 近实时搜索 (NRT Near Real-Time Search) Elasticsearch 近实时搜索 (NRT Near Real-Time Search) 详解及实践 NRT 的核心原理 Elasticsearch 的 NRT 基于以下几个关键组件和流程: 内存缓冲区 (Memory Buffer): 当文档被索引时,它们首先被添加到节点的内存缓冲区中。这个缓冲区允许快速写入操作,而无需立即刷新到磁盘。 Refresh: Refresh 操作将内存缓冲区中的文档刷新到新的 segment,并将 segment 写入文件系统缓存 (file system cache)。 这些 segment 仍然是临时的,但已经可以被搜索到。Refresh 操作是控制 NRT 延迟的关键。

7.8 近实时搜索 (NRT Near Real-Time Search)

Elasticsearch 近实时搜索 (NRT Near Real-Time Search) 详解及实践

1. NRT 的核心原理

Elasticsearch 的 NRT 基于以下几个关键组件和流程:

  • 内存缓冲区 (Memory Buffer): 当文档被索引时,它们首先被添加到节点的内存缓冲区中。这个缓冲区允许快速写入操作,而无需立即刷新到磁盘。

  • Refresh: Refresh 操作将内存缓冲区中的文档刷新到新的 segment,并将 segment 写入文件系统缓存 (file system cache)。 这些 segment 仍然是临时的,但已经可以被搜索到。Refresh 操作是控制 NRT 延迟的关键。

  • Translog: Translog (transaction log) 是一个持久化的事务日志,用于记录每次索引、更新或删除操作。 它确保了即使在发生故障的情况下,数据也不会丢失。

  • Flush: Flush 操作将内存中的 segment 持久化到磁盘,并将 translog 清空。 Flush 操作通常比 Refresh 操作更耗时。

  • Segment: Segment 是 Lucene (Elasticsearch 底层使用的搜索引擎库) 中的基本存储单元。 每个 segment 都是一个不可变的倒排索引,包含文档的子集。

  • Merge: 随着时间的推移,会创建许多小的 segment。 Merge 操作将这些小的 segment 合并成更大的 segment,以提高搜索效率。

可以用以下 Mermaid 图来表示这个流程:

流程说明:

  1. Index Request: 客户端发送索引请求。

  2. Memory Buffer: 文档首先存储在内存缓冲区中。

  3. Refresh?: 系统定期检查是否需要执行 Refresh 操作。

  4. New Segment in File System Cache: 如果需要 Refresh,则将内存缓冲区中的文档刷新到新的 segment,并写入文件系统缓存。

  5. Search Request: 客户端发送搜索请求。

  6. Flush?: 系统定期检查是否需要执行 Flush 操作。

  7. Segment on Disk: 如果需要 Flush,则将内存中的 segment 持久化到磁盘。

  8. Merge?: 系统定期检查是否需要执行 Merge 操作。

  9. Merged Segment: 如果需要 Merge,则将多个小的 segment 合并成一个更大的 segment。

  10. Translog: 所有索引操作都会记录到 Translog 中,用于数据恢复。

2. 控制 NRT 延迟:refresh_interval

Elasticsearch 提供了 refresh_interval 设置来控制 Refresh 操作的频率,从而控制 NRT 延迟。 refresh_interval 可以在索引级别或动态索引设置中配置。

  • 索引级别设置: 在创建索引时设置 refresh_interval
PUT /my_index { "settings": { "index": { "refresh_interval": "1s" } } }
  • 动态索引设置: 动态更新 refresh_interval
PUT /my_index/_settings { "index": { "refresh_interval": "5s" } }

refresh_interval 的值:

  • 正数 (例如 "1s", "30s"): 指定 Refresh 操作的频率。 值越小,NRT 延迟越低,但会增加 CPU 和 I/O 负载。

  • "-1": 禁用自动 Refresh 操作。 在这种情况下,必须手动执行 Refresh 操作。

  • null (默认值): Elasticsearch 会根据负载自动调整 Refresh 频率。

手动 Refresh:

可以使用 _refresh API 手动触发 Refresh 操作。

POST /my_index/_refresh

3. NRT 相关的代码实践

以下是一些使用 Java High Level REST Client 进行 NRT 相关的代码实践示例。

3.1 索引文档并立即搜索:

import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class NRTExample { public static void main(String[] args) throws IOException, InterruptedException { // 替换成你的 Elasticsearch 连接信息 RestHighLevelClient client = new RestHighLevelClientBuilder().build(); String indexName = "my_nrt_index"; // 1. 索引文档 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "Alice"); jsonMap.put("message", "Hello Elasticsearch!"); IndexRequest indexRequest = new IndexRequest(indexName) .source(jsonMap, XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); System.out.println("Index Response: " + indexResponse.toString()); // 2. 稍微等待,确保 refresh 发生 Thread.sleep(2000); // 等待 2 秒,可以根据 refresh_interval 调整 // 3. 执行搜索 SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("message", "Elasticsearch")); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println("Search Response: " + searchResponse.toString()); client.close(); } }

代码说明:

  1. 索引文档: 使用 IndexRequest 将包含 "user" 和 "message" 字段的文档索引到 "my_nrt_index" 索引中。

  2. 等待: Thread.sleep(2000) 用于等待 2 秒,以确保 Refresh 操作发生。 这个等待时间应该根据 refresh_interval 的设置进行调整。 如果 refresh_interval 设置为 1 秒,那么等待 2 秒就足够了。

  3. 执行搜索: 使用 SearchRequestSearchSourceBuilder 构建搜索请求,并使用 matchQuery 查找包含 "Elasticsearch" 的文档。

  4. 输出结果: 打印索引和搜索响应,以便查看结果。

注意:

  • 需要添加 Elasticsearch Java High Level REST Client 的依赖。

  • 需要替换代码中的 Elasticsearch 连接信息。

  • 根据实际情况调整等待时间。

3.2 手动 Refresh:

import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.RefreshRequest; import org.elasticsearch.client.indices.RefreshResponse; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class ManualRefreshExample { public static void main(String[] args) throws IOException { // 替换成你的 Elasticsearch 连接信息 RestHighLevelClient client = new RestHighLevelClientBuilder().build(); String indexName = "my_nrt_index"; // 1. 索引文档 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "Bob"); jsonMap.put("message", "Manual Refresh Example"); IndexRequest indexRequest = new IndexRequest(indexName) .source(jsonMap, XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); System.out.println("Index Response: " + indexResponse.toString()); // 2. 手动执行 Refresh RefreshRequest refreshRequest = new RefreshRequest(indexName); RefreshResponse refreshResponse = client.indices().refresh(refreshRequest, RequestOptions.DEFAULT); System.out.println("Refresh Response: " + refreshResponse.toString()); // 3. 执行搜索 SearchRequest searchRequest = new SearchRequest(indexName); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("message", "Manual Refresh")); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println("Search Response: " + searchResponse.toString()); client.close(); } }

代码说明:

  1. 索引文档: 与上一个示例类似,索引一个包含 "user" 和 "message" 字段的文档。

  2. 手动 Refresh: 使用 RefreshRequestclient.indices().refresh() 手动触发 Refresh 操作。 这确保了索引的文档可以立即被搜索到,而无需等待 refresh_interval 到期。

  3. 执行搜索: 执行搜索并打印结果。

3.3 批量索引与 Refresh:

在批量索引数据时,频繁的 Refresh 操作会降低性能。 建议在批量索引完成后执行一次 Refresh 操作。

import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.RefreshRequest; import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class BulkIndexWithRefreshExample { public static void main(String[] args) throws IOException { // 替换成你的 Elasticsearch 连接信息 RestHighLevelClient client = new RestHighLevelClientBuilder().build(); String indexName = "my_bulk_index"; // 1. 创建 BulkRequest BulkRequest bulkRequest = new BulkRequest(); // 2. 添加多个索引请求 for (int i = 0; i < 10; i++) { Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("id", i); jsonMap.put("data", "Bulk data " + i); IndexRequest indexRequest = new IndexRequest(indexName) .source(jsonMap, XContentType.JSON); bulkRequest.add(indexRequest); } // 3. 执行 BulkRequest BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); System.out.println("Bulk Response: " + bulkResponse.toString()); // 4. 执行 Refresh RefreshRequest refreshRequest = new RefreshRequest(indexName); client.indices().refresh(refreshRequest, RequestOptions.DEFAULT); System.out.println("Refresh completed after bulk indexing."); client.close(); } }

代码说明:

  1. 创建 BulkRequest: 创建一个 BulkRequest 对象,用于批量执行索引操作。

  2. 添加多个索引请求: 循环创建 10 个 IndexRequest 对象,并将它们添加到 BulkRequest 中。

  3. 执行 BulkRequest: 使用 client.bulk() 执行 BulkRequest

  4. 执行 Refresh: 在批量索引完成后,执行一次 RefreshRequest,以确保所有文档都可以被搜索到。

4. NRT 的权衡

虽然低 NRT 延迟对于某些应用至关重要,但也需要权衡。 更频繁的 Refresh 操作会增加 CPU 和 I/O 负载,并可能影响索引性能。 因此,需要根据实际需求和性能要求来调整 refresh_interval

以下是一些建议:

  • 高吞吐量索引: 如果需要高吞吐量索引,可以适当增加 refresh_interval 或禁用自动 Refresh,并在批量索引完成后手动执行 Refresh。

  • 低延迟搜索: 如果需要低延迟搜索,可以减小 refresh_interval

  • 监控: 监控 Elasticsearch 的 CPU、I/O 和索引性能,以便找到最佳的 refresh_interval 设置。

5. 总结

Elasticsearch 的 NRT 特性提供了快速搜索能力,允许在文档索引后几乎立即搜索到它们。 通过理解 NRT 的工作原理、控制 refresh_interval、以及合理使用手动 Refresh,可以优化 Elasticsearch 的性能并满足不同的应用需求。 在实际应用中,需要根据具体的业务场景和性能要求,权衡 NRT 延迟和系统负载,找到最佳的配置方案。 通过本文提供的代码实践,可以更好地理解和应用 Elasticsearch 的 NRT 特性。


发布者: 作者: 转发
评论区 (0)
U