Fix bug in deep pipeline agg serialization (#59984)
In #54716 I removed pipeline aggregators from the aggregation result tree and caused us to read them from the request. This saves a bunch of round trip bytes, which is neat. But there was a bug in the backwards compatibility logic. You see, we still have to give the pipeline aggregations to nodes older than 7.8 over the wire because that is how they know what pipelines to run. They have the pipelines in the request but they don't read them. They use the ones in the response tree. Anyway, we had a bug where we were never sending pipelines defined two levels down. So while you are upgrading the pipeline wouldn't run. Sometimes. If the data node of the "first" result was post-7.8 and the coordinating node was pre-7.8. This fixes the bug.
This commit is contained in:
parent
b3363cf8f9
commit
49f365ddfd
|
@ -121,3 +121,72 @@ setup:
|
||||||
bucket_sort:
|
bucket_sort:
|
||||||
sort:
|
sort:
|
||||||
- the_terms>the_max
|
- the_terms>the_max
|
||||||
|
|
||||||
|
---
|
||||||
|
"deep cumulative sum":
|
||||||
|
- do:
|
||||||
|
indices.create:
|
||||||
|
index: test
|
||||||
|
body:
|
||||||
|
settings:
|
||||||
|
number_of_replicas: 0
|
||||||
|
number_of_shards: 3
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
index: test
|
||||||
|
refresh: true
|
||||||
|
body:
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"t": "a", "int" : 1, "@timestamp" : "2020-07-16T00:00:00.000Z"}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"t": "a", "int" : 50, "@timestamp" : "2020-07-17T00:00:00.000Z"}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"t": "a", "int" : 99, "@timestamp" : "2020-07-18T00:00:00.000Z"}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"t": "b", "int" : 1, "@timestamp" : "2020-07-16T00:00:00.000Z"}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"t": "b", "int" : 99, "@timestamp" : "2020-07-17T00:00:00.000Z"}'
|
||||||
|
|
||||||
|
- do:
|
||||||
|
search:
|
||||||
|
rest_total_hits_as_int: true
|
||||||
|
index: test
|
||||||
|
body:
|
||||||
|
size: 0
|
||||||
|
aggs:
|
||||||
|
t:
|
||||||
|
terms:
|
||||||
|
field: t.keyword
|
||||||
|
aggs:
|
||||||
|
by_date:
|
||||||
|
date_histogram:
|
||||||
|
field: "@timestamp"
|
||||||
|
fixed_interval: 1d
|
||||||
|
aggs:
|
||||||
|
avg:
|
||||||
|
avg:
|
||||||
|
field: int
|
||||||
|
sum:
|
||||||
|
cumulative_sum:
|
||||||
|
buckets_path: avg.value
|
||||||
|
- match: { hits.total: 5 }
|
||||||
|
- length: { aggregations.t.buckets: 2 }
|
||||||
|
- match: { aggregations.t.buckets.0.key: a }
|
||||||
|
- match: { aggregations.t.buckets.1.key: b }
|
||||||
|
- length: { aggregations.t.buckets.0.by_date.buckets: 3 }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.0.key_as_string: "2020-07-16T00:00:00.000Z" }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.0.avg.value: 1 }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.0.sum.value: 1 }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.1.key_as_string: "2020-07-17T00:00:00.000Z" }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.1.avg.value: 50 }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.1.sum.value: 51 }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.2.key_as_string: "2020-07-18T00:00:00.000Z" }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.2.avg.value: 99 }
|
||||||
|
- match: { aggregations.t.buckets.0.by_date.buckets.2.sum.value: 150 }
|
||||||
|
- length: { aggregations.t.buckets.1.by_date.buckets: 2 }
|
||||||
|
- match: { aggregations.t.buckets.1.by_date.buckets.0.key_as_string: "2020-07-16T00:00:00.000Z" }
|
||||||
|
- match: { aggregations.t.buckets.1.by_date.buckets.0.avg.value: 1 }
|
||||||
|
- match: { aggregations.t.buckets.1.by_date.buckets.0.sum.value: 1 }
|
||||||
|
- match: { aggregations.t.buckets.1.by_date.buckets.1.key_as_string: "2020-07-17T00:00:00.000Z" }
|
||||||
|
- match: { aggregations.t.buckets.1.by_date.buckets.1.avg.value: 99 }
|
||||||
|
- match: { aggregations.t.buckets.1.by_date.buckets.1.sum.value: 100 }
|
||||||
|
|
|
@ -164,6 +164,16 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
|
||||||
* 7.8.0.
|
* 7.8.0.
|
||||||
*/
|
*/
|
||||||
public final void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) {
|
public final void mergePipelineTreeForBWCSerialization(PipelineAggregator.PipelineTree pipelineTree) {
|
||||||
|
if (pipelineAggregatorsForBwcSerialization != null) {
|
||||||
|
/*
|
||||||
|
* This method is called once per level on the results but only
|
||||||
|
* has useful pipeline aggregations on the top level. Every level
|
||||||
|
* below the top will always be empty. So if we've already been
|
||||||
|
* called we should bail. This is pretty messy but it is the kind
|
||||||
|
* of weird thing we have to do to deal with bwc serialization....
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
pipelineAggregatorsForBwcSerialization = pipelineTree.aggregators();
|
pipelineAggregatorsForBwcSerialization = pipelineTree.aggregators();
|
||||||
forEachBucket(bucketAggs -> bucketAggs.mergePipelineTreeForBWCSerialization(pipelineTree));
|
forEachBucket(bucketAggs -> bucketAggs.mergePipelineTreeForBWCSerialization(pipelineTree));
|
||||||
}
|
}
|
||||||
|
|
|
@ -460,6 +460,14 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
|
||||||
assertMergedPipelineTreeForBWCSerialization(agg, pipelineTree);
|
assertMergedPipelineTreeForBWCSerialization(agg, pipelineTree);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMergePipelineTreeTwice() {
|
||||||
|
T agg = createTestInstance();
|
||||||
|
PipelineAggregator.PipelineTree pipelineTree = randomPipelineTree(agg);
|
||||||
|
agg.mergePipelineTreeForBWCSerialization(pipelineTree);
|
||||||
|
agg.mergePipelineTreeForBWCSerialization(randomPipelineTree(agg)); // This should be ignored
|
||||||
|
assertMergedPipelineTreeForBWCSerialization(agg, pipelineTree);
|
||||||
|
}
|
||||||
|
|
||||||
public static PipelineAggregator.PipelineTree randomPipelineTree(InternalAggregation aggregation) {
|
public static PipelineAggregator.PipelineTree randomPipelineTree(InternalAggregation aggregation) {
|
||||||
Map<String, PipelineTree> subTree = new HashMap<>();
|
Map<String, PipelineTree> subTree = new HashMap<>();
|
||||||
aggregation.forEachBucket(bucketAggs -> {
|
aggregation.forEachBucket(bucketAggs -> {
|
||||||
|
|
Loading…
Reference in New Issue