6.4 Pipeline Aggregations (管道聚合 依赖其他聚合结果) 6.4 Pipeline Aggregations (管道聚合): 依赖其他聚合结果的强大分析工具 Pipeline Aggregations(管道聚合)是 Elasticsearch 中一类特殊的聚合,它们不直接基于文档数据进行计算,而是依赖于其他聚合的结果。这种特性使得管道聚合能够执行更复杂、更高级的数据分析,例如计算移动平均值、差值、百分比变化等。 6.4.1 核心概念 依赖性: 管道聚合必须依赖于另一个聚合的结果。这个被依赖的聚合可以是桶聚合(Bucket Aggregation)或度量聚合(Metric Aggregation)。
Pipeline Aggregations(管道聚合)是 Elasticsearch 中一类特殊的聚合,它们不直接基于文档数据进行计算,而是依赖于其他聚合的结果。这种特性使得管道聚合能够执行更复杂、更高级的数据分析,例如计算移动平均值、差值、百分比变化等。
依赖性: 管道聚合必须依赖于另一个聚合的结果。这个被依赖的聚合可以是桶聚合(Bucket Aggregation)或度量聚合(Metric Aggregation)。
执行阶段: 管道聚合在所有其他聚合完成后执行,处理的是其他聚合的输出结果。
应用场景: 适用于需要基于聚合结果进行二次计算和分析的场景,例如趋势分析、异常检测、比率计算等。
Elasticsearch 提供了多种管道聚合,可以分为以下几类:
Parent Aggregations: 依赖于父级聚合的结果。
cumulative_sum: 计算累积总和。
moving_avg: 计算移动平均值。
Sibling Aggregations: 依赖于同级聚合的结果。
avg_bucket: 计算桶平均值。
max_bucket: 计算桶最大值。
min_bucket: 计算桶最小值。
sum_bucket: 计算桶总和。
stats_bucket: 计算桶统计信息(平均值、最大值、最小值、总和、方差、标准差)。
percentiles_bucket: 计算桶百分位数。
Derivative Aggregations: 计算导数(变化率)。
derivative: 计算导数。
cumulative_cardinality:计算累积基数。
为了更好地理解管道聚合,我们将通过一些实际的代码示例进行演示。
示例数据
假设我们有一个名为 sales 的索引,其中包含以下文档结构:
{ "date": "2023-10-26", "product": "A", "revenue": 100 }
1. cumulative_sum 聚合 (累积总和)
此聚合计算指定度量的累积总和。
GET sales/_search { "size": 0, "aggs": { "sales_by_date": { "date_histogram": { "field": "date", "calendar_interval": "day" }, "aggs": { "total_revenue": { "sum": { "field": "revenue" } }, "cumulative_revenue": { "cumulative_sum": { "buckets_path": "total_revenue" } } } } } }
sales_by_date: 按日期进行桶聚合。
total_revenue: 计算每个日期的总收入。
cumulative_revenue: cumulative_sum 管道聚合,buckets_path 指定了依赖的度量聚合 total_revenue。它会计算每天的累积总收入。
结果示例:
{ "aggregations": { "sales_by_date": { "buckets": [ { "key_as_string": "2023-10-26", "key": 1698278400000, "doc_count": 1, "total_revenue": { "value": 100 }, "cumulative_revenue": { "value": 100 } }, { "key_as_string": "2023-10-27", "key": 1698364800000, "doc_count": 1, "total_revenue": { "value": 150 }, "cumulative_revenue": { "value": 250 } }, { "key_as_string": "2023-10-28", "key": 1698451200000, "doc_count": 1, "total_revenue": { "value": 200 }, "cumulative_revenue": { "value": 450 } } ] } } }
2. moving_avg 聚合 (移动平均值)
此聚合计算指定度量的移动平均值。
GET sales/_search { "size": 0, "aggs": { "sales_by_date": { "date_histogram": { "field": "date", "calendar_interval": "day" }, "aggs": { "total_revenue": { "sum": { "field": "revenue" } }, "moving_average_revenue": { "moving_avg": { "buckets_path": "total_revenue", "window": 3, "model": "linear", "gap_policy": "skip" } } } } } }
window: 指定移动平均的窗口大小(例如,3 天)。
model: 指定移动平均的模型(例如,linear、simple、ewma)。
gap_policy: 指定如何处理缺失值(例如,skip、insert_zeros)。
结果示例:
{ "aggregations": { "sales_by_date": { "buckets": [ { "key_as_string": "2023-10-26", "key": 1698278400000, "doc_count": 1, "total_revenue": { "value": 100 }, "moving_average_revenue": { "value": 100 } }, { "key_as_string": "2023-10-27", "key": 1698364800000, "doc_count": 1, "total_revenue": { "value": 150 }, "moving_average_revenue": { "value": 125 } }, { "key_as_string": "2023-10-28", "key": 1698451200000, "doc_count": 1, "total_revenue": { "value": 200 }, "moving_average_revenue": { "value": 150 } }, { "key_as_string": "2023-10-29", "key": 1698537600000, "doc_count": 1, "total_revenue": { "value": 250 }, "moving_average_revenue": { "value": 200 } } ] } } }
3. derivative 聚合 (导数)
此聚合计算指定度量的一阶导数(变化率)。
GET sales/_search { "size": 0, "aggs": { "sales_by_date": { "date_histogram": { "field": "date", "calendar_interval": "day" }, "aggs": { "total_revenue": { "sum": { "field": "revenue" } }, "revenue_derivative": { "derivative": { "buckets_path": "total_revenue", "unit": "1d" } } } } } }
unit: 指定时间单位(例如,1d 表示一天)。结果示例:
{ "aggregations": { "sales_by_date": { "buckets": [ { "key_as_string": "2023-10-26", "key": 1698278400000, "doc_count": 1, "total_revenue": { "value": 100 }, "revenue_derivative": { "value": null } }, { "key_as_string": "2023-10-27", "key": 1698364800000, "doc_count": 1, "total_revenue": { "value": 150 }, "revenue_derivative": { "value": 50 } }, { "key_as_string": "2023-10-28", "key": 1698451200000, "doc_count": 1, "total_revenue": { "value": 200 }, "revenue_derivative": { "value": 50 } } ] } } }
4. Sibling Pipeline Aggregations 示例 (avg_bucket)
GET sales/_search { "size": 0, "aggs": { "product_sales": { "terms": { "field": "product" }, "aggs": { "total_revenue": { "sum": { "field": "revenue" } } } }, "average_product_revenue": { "avg_bucket": { "buckets_path": "product_sales>total_revenue" } } } }
product_sales: 按产品进行桶聚合,并计算每个产品的总收入。
average_product_revenue: avg_bucket 管道聚合,计算所有产品桶的平均收入。buckets_path 使用 > 符号来指定嵌套的聚合路径。
结果示例:
{ "aggregations": { "product_sales": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "A", "doc_count": 3, "total_revenue": { "value": 450 } }, { "key": "B", "doc_count": 2, "total_revenue": { "value": 300 } } ] }, "average_product_revenue": { "value": 375 } } }
buckets_path 语法buckets_path 是管道聚合中最重要的参数之一。它用于指定依赖的聚合路径。 语法格式如下:
aggregation_name: 引用顶层聚合的名称。
aggregation_name>aggregation_name: 引用嵌套聚合的名称,使用 > 分隔每一层聚合。
在使用管道聚合时,需要注意一些潜在的错误情况:
依赖的聚合不存在: 如果 buckets_path 指定的聚合不存在,Elasticsearch 会抛出异常。
数据类型不匹配: 管道聚合要求依赖的聚合返回的数据类型与自身期望的数据类型匹配。例如,cumulative_sum 期望依赖的聚合返回数值类型。
空桶: 如果依赖的聚合返回空桶,管道聚合可能会返回 null 或抛出异常,具体取决于管道聚合的类型和配置。
Pipeline Aggregations 是 Elasticsearch 中强大的数据分析工具,能够基于其他聚合的结果进行二次计算和分析。 通过掌握不同类型的管道聚合和 buckets_path 语法,可以实现更复杂、更高级的数据分析需求。 在实际应用中,需要根据具体的业务场景选择合适的管道聚合类型,并注意处理潜在的错误情况。