From fec882a45781fd4216f1012b6edae80f5947f6cc Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 5 Dec 2019 16:11:27 -0500 Subject: [PATCH] Decouple pipeline reductions from final agg reduction (#45796) Historically only two things happened in the final reduction: empty buckets were filled, and pipeline aggs were reduced (since it was the final reduction, this was safe). Usage of the final reduction is growing however. Auto-date-histo might need to perform many reductions on final-reduce to merge down buckets, CCS may need to side-step the final reduction if sending to a different cluster, etc Having pipelines generate their output in the final reduce was convenient, but is becoming increasingly difficult to manage as the rest of the agg framework advances. This commit decouples pipeline aggs from the final reduction by introducing a new "top level" reduce, which should be called at the beginning of the reduce cycle (e.g. from the SearchPhaseController). This will only reduce pipeline aggs on the final reduce after the non-pipeline agg tree has been fully reduced. By separating pipeline reduction into their own set of methods, aggregations are free to use the final reduction for whatever purpose without worrying about generating pipeline results which are non-reducible --- .../matrix/stats/InternalMatrixStats.java | 2 +- .../action/search/SearchPhaseController.java | 4 +- .../action/search/SearchResponseMerger.java | 2 +- .../aggregations/InternalAggregation.java | 24 +++++---- .../aggregations/InternalAggregations.java | 47 ++++++++++++++--- .../InternalMultiBucketAggregation.java | 27 +++++++++- .../InternalSingleBucketAggregation.java | 2 +- .../adjacency/InternalAdjacencyMatrix.java | 2 +- .../bucket/composite/InternalComposite.java | 2 +- .../bucket/filter/InternalFilters.java | 2 +- .../bucket/geogrid/InternalGeoGrid.java | 2 +- .../histogram/InternalAutoDateHistogram.java | 2 +- .../histogram/InternalDateHistogram.java | 2 +- .../bucket/histogram/InternalHistogram.java | 2 +- .../bucket/range/InternalBinaryRange.java | 2 +- .../bucket/range/InternalRange.java | 2 +- .../bucket/sampler/UnmappedSampler.java | 2 +- ...balOrdinalsSignificantTermsAggregator.java | 2 +- .../significant/InternalSignificantTerms.java | 5 +- .../significant/SignificantLongTerms.java | 11 ++-- .../SignificantLongTermsAggregator.java | 2 +- .../significant/SignificantStringTerms.java | 13 ++--- .../SignificantStringTermsAggregator.java | 2 +- .../SignificantTextAggregator.java | 2 +- .../significant/UnmappedSignificantTerms.java | 2 +- .../bucket/terms/DoubleTerms.java | 6 +-- .../bucket/terms/InternalMappedRareTerms.java | 2 +- .../bucket/terms/InternalRareTerms.java | 2 +- .../bucket/terms/InternalTerms.java | 2 +- .../aggregations/bucket/terms/LongTerms.java | 6 +-- .../bucket/terms/UnmappedRareTerms.java | 2 +- .../bucket/terms/UnmappedTerms.java | 2 +- .../AbstractInternalHDRPercentiles.java | 2 +- .../AbstractInternalTDigestPercentiles.java | 2 +- .../aggregations/metrics/InternalAvg.java | 2 +- .../metrics/InternalCardinality.java | 2 +- .../metrics/InternalExtendedStats.java | 4 +- .../metrics/InternalGeoBounds.java | 2 +- .../metrics/InternalGeoCentroid.java | 2 +- .../aggregations/metrics/InternalMax.java | 2 +- .../InternalMedianAbsoluteDeviation.java | 2 +- .../aggregations/metrics/InternalMin.java | 2 +- .../metrics/InternalScriptedMetric.java | 2 +- .../aggregations/metrics/InternalStats.java | 2 +- .../aggregations/metrics/InternalSum.java | 2 +- .../aggregations/metrics/InternalTopHits.java | 2 +- .../metrics/InternalValueCount.java | 2 +- .../metrics/InternalWeightedAvg.java | 2 +- .../pipeline/InternalBucketMetricValue.java | 2 +- .../pipeline/InternalExtendedStatsBucket.java | 2 +- .../pipeline/InternalPercentilesBucket.java | 2 +- .../pipeline/InternalSimpleValue.java | 2 +- .../pipeline/InternalStatsBucket.java | 2 +- .../InternalAggregationsTests.java | 6 +-- .../AutoDateHistogramAggregatorTests.java | 52 ++++++++++++++++++- .../InternalAutoDateHistogramTests.java | 1 - .../histogram/InternalHistogramTests.java | 2 +- .../SignificanceHeuristicTests.java | 10 ++-- .../SignificantLongTermsTests.java | 4 +- .../SignificantStringTermsTests.java | 4 +- .../bucket/terms/TermsAggregatorTests.java | 2 +- .../metrics/InternalAvgTests.java | 4 +- .../metrics/InternalExtendedStatsTests.java | 2 +- .../metrics/InternalStatsTests.java | 2 +- .../metrics/InternalSumTests.java | 2 +- .../pipeline/AvgBucketAggregatorTests.java | 2 +- .../aggregations/AggregatorTestCase.java | 9 +++- .../InternalSimpleLongValue.java | 2 +- .../stringstats/InternalStringStats.java | 2 +- .../RollupResponseTranslationTests.java | 2 +- .../extractor/TestMultiValueAggregation.java | 2 +- .../extractor/TestSingleValueAggregation.java | 2 +- 72 files changed, 223 insertions(+), 122 deletions(-) diff --git a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java index c70ddea7e3b..5fda5af7418 100644 --- a/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java +++ b/modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java @@ -233,7 +233,7 @@ public class InternalMatrixStats extends InternalAggregation implements MatrixSt } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { // merge stats across all shards List aggs = new ArrayList<>(aggregations); aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 0bd7afa7f61..27b5c9cf3b2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -487,7 +487,7 @@ public final class SearchPhaseController { } ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce); final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : - InternalAggregations.reduce(aggregationsList, reduceContext); + InternalAggregations.topLevelReduce(aggregationsList, reduceContext); final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults); final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size, reducedCompletionSuggestions); @@ -625,7 +625,7 @@ public final class SearchPhaseController { if (index == bufferSize) { if (hasAggs) { ReduceContext reduceContext = controller.reduceContextFunction.apply(false); - InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); Arrays.fill(aggsBuffer, null); aggsBuffer[0] = reducedAggs; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index 53afda59841..16092434ab1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -196,7 +196,7 @@ final class SearchResponseMerger { SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats); setSuggestShardIndex(shards, groupedSuggestions); Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); - InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true)); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContextFunction.apply(true)); ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY); SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults); //make failures ordering consistent between ordinary search and CCS by looking at the shard they come from diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 4b929c692ed..a7fe4635183 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -126,23 +126,25 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable return name; } + /** + * Creates the output from all pipeline aggs that this aggregation is associated with. Should only + * be called after all aggregations have been fully reduced + */ + public InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) { + assert reduceContext.isFinalReduce(); + for (PipelineAggregator pipelineAggregator : pipelineAggregators) { + reducedAggs = pipelineAggregator.reduce(reducedAggs, reduceContext); + } + return reducedAggs; + } + /** * Reduces the given aggregations to a single one and returns it. In most cases, the assumption will be the all given * aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing, * try reusing an existing instance (typically the first in the given list) to save on redundant object * construction. */ - public final InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - InternalAggregation aggResult = doReduce(aggregations, reduceContext); - if (reduceContext.isFinalReduce()) { - for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - aggResult = pipelineAggregator.reduce(aggResult, reduceContext); - } - } - return aggResult; - } - - public abstract InternalAggregation doReduce(List aggregations, ReduceContext reduceContext); + public abstract InternalAggregation reduce(List aggregations, ReduceContext reduceContext); /** * Return true if this aggregation is mapped, and can lead a reduction. If this agg returns diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index f076f40b32e..cdf7bdea9ec 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * An internal implementation of {@link Aggregations}. @@ -98,10 +99,47 @@ public final class InternalAggregations extends Aggregations implements Writeabl return topLevelPipelineAggregators; } + @SuppressWarnings("unchecked") + private List getInternalAggregations() { + return (List) aggregations; + } + + /** + * Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by + * SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called + * as an intermediate reduction step (e.g. in the middle of an aggregation tree). + * + * This method first reduces the aggregations, and if it is the final reduce, then reduce the pipeline + * aggregations (both embedded parent/sibling as well as top-level sibling pipelines) + */ + public static InternalAggregations topLevelReduce(List aggregationsList, ReduceContext context) { + InternalAggregations reduced = reduce(aggregationsList, context); + if (reduced == null) { + return null; + } + + if (context.isFinalReduce()) { + List reducedInternalAggs = reduced.getInternalAggregations(); + reducedInternalAggs = reducedInternalAggs.stream() + .map(agg -> agg.reducePipelines(agg, context)) + .collect(Collectors.toList()); + + List topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators(); + for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) { + InternalAggregation newAgg + = pipelineAggregator.doReduce(new InternalAggregations(reducedInternalAggs), context); + reducedInternalAggs.add(newAgg); + } + return new InternalAggregations(reducedInternalAggs); + } + return reduced; + } + /** * Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first * {@link InternalAggregations} object found in the list. - * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. + * Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled + * separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)} */ public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { if (aggregationsList.isEmpty()) { @@ -130,13 +168,6 @@ public final class InternalAggregations extends Aggregations implements Writeabl reducedAggregations.add(first.reduce(aggregations, context)); } - if (context.isFinalReduce()) { - for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) { - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); - reducedAggregations.add(newAgg); - } - return new InternalAggregations(reducedAggregations); - } return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java index 41b1a9aef62..cfad39166e7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -73,7 +74,7 @@ public abstract class InternalMultiBucketAggregation buckets, ReduceContext context); @Override - public abstract List getBuckets(); + public abstract List getBuckets(); @Override public Object getProperty(List path) { @@ -141,6 +142,30 @@ public abstract class InternalMultiBucketAggregation materializedBuckets = reducePipelineBuckets(reduceContext); + return super.reducePipelines(create(materializedBuckets), reduceContext); + } + + private List reducePipelineBuckets(ReduceContext reduceContext) { + List reducedBuckets = new ArrayList<>(); + for (B bucket : getBuckets()) { + List aggs = new ArrayList<>(); + for (Aggregation agg : bucket.getAggregations()) { + aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext)); + } + reducedBuckets.add(createBucket(new InternalAggregations(aggs), bucket)); + } + return reducedBuckets; + } + public abstract static class InternalBucket implements Bucket, Writeable { public Object getProperty(String containingAggName, List path) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index cf5ddc54884..0a34e7a92b8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -97,7 +97,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations); @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { long docCount = 0L; List subAggregationsList = new ArrayList<>(aggregations.size()); for (InternalAggregation aggregation : aggregations) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index 57c7d703cbd..78181e3a336 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -181,7 +181,7 @@ public class InternalAdjacencyMatrix } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { Map> bucketsMap = new HashMap<>(); for (InternalAggregation aggregation : aggregations) { InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index 9bfe0e7090e..cd4699642ad 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -156,7 +156,7 @@ public class InternalComposite } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { PriorityQueue pq = new PriorityQueue<>(aggregations.size()); for (InternalAggregation agg : aggregations) { InternalComposite sortedAgg = (InternalComposite) agg; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java index 271d1c54d58..d99da0187f7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java @@ -189,7 +189,7 @@ public class InternalFilters extends InternalMultiBucketAggregation aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List> bucketsList = null; for (InternalAggregation aggregation : aggregations) { InternalFilters filters = (InternalFilters) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java index 61c06a062cc..1760fc19728 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java @@ -81,7 +81,7 @@ public abstract class InternalGeoGrid } @Override - public InternalGeoGrid doReduce(List aggregations, ReduceContext reduceContext) { + public InternalGeoGrid reduce(List aggregations, ReduceContext reduceContext) { LongObjectPagedHashMap> buckets = null; for (InternalAggregation aggregation : aggregations) { InternalGeoGrid grid = (InternalGeoGrid) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index c776d764637..0d81188c8ca 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -498,7 +498,7 @@ public final class InternalAutoDateHistogram extends } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 69c8552af32..b4e0ba659af 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -448,7 +448,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation< } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { if (minDocCount == 0) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index e814fbca290..a7b16e89499 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -421,7 +421,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { if (minDocCount == 0) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java index 4b3b29d6016..c0a92d6c7bc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java @@ -238,7 +238,7 @@ public final class InternalBinaryRange } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { reduceContext.consumeBucketsAndMaybeBreak(buckets.size()); long[] docCounts = new long[buckets.size()]; InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][]; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index 5788c9ba736..2d8f0357e87 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -295,7 +295,7 @@ public class InternalRange aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { reduceContext.consumeBucketsAndMaybeBreak(ranges.size()); List[] rangeList = new List[ranges.size()]; for (int i = 0; i < rangeList.length; ++i) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java index 5f5f557ffd5..6f1a4cc6a4a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java @@ -49,7 +49,7 @@ public class UnmappedSampler extends InternalSampler { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { return new UnmappedSampler(name, pipelineAggregators(), metaData); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java index d641a2773e6..47926760d3b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java @@ -126,7 +126,7 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri } if (spare == null) { - spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format); + spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0); } spare.bucketOrd = bucketOrd; copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java index 49c2718baaf..789ced9cd03 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java @@ -105,6 +105,9 @@ public abstract class InternalSignificantTerms getBuckets(); @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { long globalSubsetSize = 0; long globalSupersetSize = 0; // Compute the overall result set size and the corpus size using the diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java index 582346f529a..d9f4ac7e1f1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java @@ -42,14 +42,9 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index 8bc0e83c8d6..9672136cf5b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -144,7 +144,7 @@ public class DoubleTerms extends InternalMappedTerms aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { boolean promoteToDouble = false; for (InternalAggregation agg : aggregations) { if (agg instanceof LongTerms && ((LongTerms) agg).format == DocValueFormat.RAW) { @@ -157,7 +157,7 @@ public class DoubleTerms extends InternalMappedTerms newAggs = new ArrayList<>(aggregations.size()); for (InternalAggregation agg : aggregations) { @@ -168,7 +168,7 @@ public class DoubleTerms extends InternalMappedTerms, } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { Map> buckets = new HashMap<>(); InternalRareTerms referenceTerms = null; SetBackedScalingCuckooFilter filter = null; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java index ae9f8e27ec6..dee3424621c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java @@ -154,7 +154,7 @@ public abstract class InternalRareTerms, B ext public abstract B getBucketByKey(String term); @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 3eefc9bee01..8f45749a363 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -192,7 +192,7 @@ public abstract class InternalTerms, B extends Int public abstract B getBucketByKey(String term); @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { Map> buckets = new HashMap<>(); long sumDocCountError = 0; long otherDocCount = 0; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index 6a0fcde1fa0..90ebe7af36a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -144,13 +144,13 @@ public class LongTerms extends InternalMappedTerms } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { for (InternalAggregation agg : aggregations) { if (agg instanceof DoubleTerms) { - return agg.doReduce(aggregations, reduceContext); + return agg.reduce(aggregations, reduceContext); } } - return super.doReduce(aggregations, reduceContext); + return super.reduce(aggregations, reduceContext); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java index c4a019e6fe9..58dac3ece2b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedRareTerms.java @@ -93,7 +93,7 @@ public class UnmappedRareTerms extends InternalRareTerms aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { return new UnmappedRareTerms(name, pipelineAggregators(), metaData); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 8096366f6d6..2a1baff9ded 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -99,7 +99,7 @@ public class UnmappedTerms extends InternalTerms aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalHDRPercentiles.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalHDRPercentiles.java index d8e043ee9b5..692ea1761fe 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalHDRPercentiles.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalHDRPercentiles.java @@ -103,7 +103,7 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr } @Override - public AbstractInternalHDRPercentiles doReduce(List aggregations, ReduceContext reduceContext) { + public AbstractInternalHDRPercentiles reduce(List aggregations, ReduceContext reduceContext) { DoubleHistogram merged = null; for (InternalAggregation aggregation : aggregations) { final AbstractInternalHDRPercentiles percentiles = (AbstractInternalHDRPercentiles) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalTDigestPercentiles.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalTDigestPercentiles.java index ca03e2aa2b1..f691a438d0d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalTDigestPercentiles.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AbstractInternalTDigestPercentiles.java @@ -87,7 +87,7 @@ abstract class AbstractInternalTDigestPercentiles extends InternalNumericMetrics } @Override - public AbstractInternalTDigestPercentiles doReduce(List aggregations, ReduceContext reduceContext) { + public AbstractInternalTDigestPercentiles reduce(List aggregations, ReduceContext reduceContext) { TDigestState merged = null; for (InternalAggregation aggregation : aggregations) { final AbstractInternalTDigestPercentiles percentiles = (AbstractInternalTDigestPercentiles) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java index 3e3b2ae03ea..199f3a7ccc8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java @@ -87,7 +87,7 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i } @Override - public InternalAvg doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAvg reduce(List aggregations, ReduceContext reduceContext) { CompensatedSum kahanSummation = new CompensatedSum(0, 0); long count = 0; // Compute the sum of double values with Kahan summation algorithm which is more diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java index c3132a29904..bc2c4d88c46 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalCardinality.java @@ -85,7 +85,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { InternalCardinality reduced = null; for (InternalAggregation aggregation : aggregations) { final InternalCardinality cardinality = (InternalCardinality) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java index 3fe2e75576a..53859605794 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java @@ -140,7 +140,7 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat } @Override - public InternalExtendedStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalExtendedStats reduce(List aggregations, ReduceContext reduceContext) { double sumOfSqrs = 0; double compensationOfSqrs = 0; for (InternalAggregation aggregation : aggregations) { @@ -158,7 +158,7 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat sumOfSqrs = newSumOfSqrs; } } - final InternalStats stats = super.doReduce(aggregations, reduceContext); + final InternalStats stats = super.reduce(aggregations, reduceContext); return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs, sigma, format, pipelineAggregators(), getMetaData()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java index 4d48e4ab896..91007ab2f8f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoBounds.java @@ -93,7 +93,7 @@ public class InternalGeoBounds extends InternalAggregation implements GeoBounds } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { double top = Double.NEGATIVE_INFINITY; double bottom = Double.POSITIVE_INFINITY; double posLeft = Double.POSITIVE_INFINITY; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java index 2172d15259b..24493273aa5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalGeoCentroid.java @@ -114,7 +114,7 @@ public class InternalGeoCentroid extends InternalAggregation implements GeoCentr } @Override - public InternalGeoCentroid doReduce(List aggregations, ReduceContext reduceContext) { + public InternalGeoCentroid reduce(List aggregations, ReduceContext reduceContext) { double lonSum = Double.NaN; double latSum = Double.NaN; int totalCount = 0; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMax.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMax.java index 9a8458c85a6..6abb0d3a51d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMax.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMax.java @@ -71,7 +71,7 @@ public class InternalMax extends InternalNumericMetricsAggregation.SingleValue i } @Override - public InternalMax doReduce(List aggregations, ReduceContext reduceContext) { + public InternalMax reduce(List aggregations, ReduceContext reduceContext) { double max = Double.NEGATIVE_INFINITY; for (InternalAggregation aggregation : aggregations) { max = Math.max(max, ((InternalMax) aggregation).max); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java index 871f387638d..b228c95c0dc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java @@ -80,7 +80,7 @@ public class InternalMedianAbsoluteDeviation extends InternalNumericMetricsAggre } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { final TDigestState valueMerged = new TDigestState(valuesSketch.compression()); for (InternalAggregation aggregation : aggregations) { final InternalMedianAbsoluteDeviation madAggregation = (InternalMedianAbsoluteDeviation) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMin.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMin.java index f68d5a46860..6912f55ecaf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMin.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMin.java @@ -71,7 +71,7 @@ public class InternalMin extends InternalNumericMetricsAggregation.SingleValue i } @Override - public InternalMin doReduce(List aggregations, ReduceContext reduceContext) { + public InternalMin reduce(List aggregations, ReduceContext reduceContext) { double min = Double.POSITIVE_INFINITY; for (InternalAggregation aggregation : aggregations) { min = Math.min(min, ((InternalMin) aggregation).min); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java index ce051a1691b..e6113f3763c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java @@ -85,7 +85,7 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { List aggregationObjects = new ArrayList<>(); for (InternalAggregation aggregation : aggregations) { InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java index adb879999a4..661457e1c1f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java @@ -145,7 +145,7 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue } @Override - public InternalStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalStats reduce(List aggregations, ReduceContext reduceContext) { long count = 0; double min = Double.POSITIVE_INFINITY; double max = Double.NEGATIVE_INFINITY; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java index 6e6315eded1..5778edb4da1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java @@ -71,7 +71,7 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue i } @Override - public InternalSum doReduce(List aggregations, ReduceContext reduceContext) { + public InternalSum reduce(List aggregations, ReduceContext reduceContext) { // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. CompensatedSum kahanSummation = new CompensatedSum(0, 0); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java index 348e98302d2..4f922bd39d9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java @@ -99,7 +99,7 @@ public class InternalTopHits extends InternalAggregation implements TopHits { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { final SearchHits[] shardHits = new SearchHits[aggregations.size()]; final int from; final int size; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalValueCount.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalValueCount.java index 32ee8bd36d1..0c942e1afbc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalValueCount.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalValueCount.java @@ -70,7 +70,7 @@ public class InternalValueCount extends InternalNumericMetricsAggregation.Single } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { long valueCount = 0; for (InternalAggregation aggregation : aggregations) { valueCount += ((InternalValueCount) aggregation).value; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java index 4b3523b03ac..e4c79f7f899 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java @@ -87,7 +87,7 @@ public class InternalWeightedAvg extends InternalNumericMetricsAggregation.Singl } @Override - public InternalWeightedAvg doReduce(List aggregations, ReduceContext reduceContext) { + public InternalWeightedAvg reduce(List aggregations, ReduceContext reduceContext) { CompensatedSum sumCompensation = new CompensatedSum(0, 0); CompensatedSum weightCompensation = new CompensatedSum(0, 0); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalBucketMetricValue.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalBucketMetricValue.java index 1acdb540806..84b4c3b305a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalBucketMetricValue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalBucketMetricValue.java @@ -85,7 +85,7 @@ public class InternalBucketMetricValue extends InternalNumericMetricsAggregation } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalExtendedStatsBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalExtendedStatsBucket.java index b0b78eb0120..d1f60fe30e5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalExtendedStatsBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalExtendedStatsBucket.java @@ -48,7 +48,7 @@ public class InternalExtendedStatsBucket extends InternalExtendedStats implement } @Override - public InternalExtendedStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalExtendedStats reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalPercentilesBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalPercentilesBucket.java index e633f934013..dbb1113cd77 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalPercentilesBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalPercentilesBucket.java @@ -126,7 +126,7 @@ public class InternalPercentilesBucket extends InternalNumericMetricsAggregation } @Override - public InternalMax doReduce(List aggregations, ReduceContext reduceContext) { + public InternalMax reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java index 4f7b51b6e3b..29c25b11746 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java @@ -76,7 +76,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl } @Override - public InternalSimpleValue doReduce(List aggregations, ReduceContext reduceContext) { + public InternalSimpleValue reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalStatsBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalStatsBucket.java index 51d3cfc060f..c8beef459b8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalStatsBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalStatsBucket.java @@ -47,7 +47,7 @@ public class InternalStatsBucket extends InternalStats implements StatsBucket { } @Override - public InternalStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalStats reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index f56137c8d00..c19727041dd 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -62,7 +62,7 @@ public class InternalAggregationsTests extends ESTestCase { List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms), topLevelPipelineAggs)); InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false); - InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContext); assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(1, reducedAggs.aggregations.size()); } @@ -78,11 +78,11 @@ public class InternalAggregationsTests extends ESTestCase { if (randomBoolean()) { InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), Collections.singletonList(siblingPipelineAggregator)); - reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); + reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext); } else { InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), Collections.singletonList(siblingPipelineAggregator)); - reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); + reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext); } assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(2, reducedAggs.aggregations.size()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index 9293b33e22f..c6bb10fa6c9 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -36,10 +36,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.metrics.InternalMax; import org.elasticsearch.search.aggregations.metrics.InternalStats; +import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import org.hamcrest.Matchers; import org.junit.Assert; @@ -58,9 +63,12 @@ import java.util.Map; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.equalTo; + public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { private static final String DATE_FIELD = "date"; private static final String INSTANT_FIELD = "instant"; + private static final String NUMERIC_FIELD = "numeric"; private static final List DATES_WITH_TIME = Arrays.asList( ZonedDateTime.of(2010, 3, 12, 1, 7, 45, 0, ZoneOffset.UTC), @@ -718,6 +726,35 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { ); } + public void testWithPipelineReductions() throws IOException { + testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME, + aggregation -> aggregation.setNumBuckets(1).field(DATE_FIELD) + .subAggregation(AggregationBuilders.histogram("histo").field(NUMERIC_FIELD).interval(1) + .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD)) + .subAggregation(new DerivativePipelineAggregationBuilder("deriv", "max"))), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + final List buckets = histogram.getBuckets(); + assertEquals(1, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(10, bucket.getDocCount()); + assertThat(bucket.getAggregations().asList().size(), equalTo(1)); + InternalHistogram histo = (InternalHistogram) bucket.getAggregations().asList().get(0); + assertThat(histo.getBuckets().size(), equalTo(10)); + for (int i = 0; i < 10; i++) { + assertThat(histo.getBuckets().get(i).key, equalTo((double)i)); + assertThat(((InternalMax)histo.getBuckets().get(i).aggregations.get("max")).getValue(), equalTo((double)i)); + if (i > 0) { + assertThat(((InternalSimpleValue)histo.getBuckets().get(i).aggregations.get("deriv")).getValue(), equalTo(1.0)); + } + } + + + }); + } + private void testSearchCase(final Query query, final List dataset, final Consumer configure, final Consumer verify) throws IOException { @@ -757,6 +794,7 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { final Document document = new Document(); + int i = 0; for (final ZonedDateTime date : dataset) { if (frequently()) { indexWriter.commit(); @@ -765,8 +803,10 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { final long instant = date.toInstant().toEpochMilli(); document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); document.add(new LongPoint(INSTANT_FIELD, instant)); + document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i)); indexWriter.addDocument(document); document.clear(); + i += 1; } } @@ -783,11 +823,19 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { fieldType.setHasDocValues(true); fieldType.setName(aggregationBuilder.field()); + MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + instantFieldType.setName(INSTANT_FIELD); + instantFieldType.setHasDocValues(true); + + MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + numericFieldType.setName(NUMERIC_FIELD); + numericFieldType.setHasDocValues(true); + final InternalAutoDateHistogram histogram; if (reduced) { - histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType); + histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType); } else { - histogram = search(indexSearcher, query, aggregationBuilder, fieldType); + histogram = search(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType); } verify.accept(histogram); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java index fa4bce9a4e9..4ada5e349de 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java @@ -108,7 +108,6 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati assertThat(result, equalTo(2)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/39497") public void testReduceRandom() { super.testReduceRandom(); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java index fb9f6dd29c7..5b8264b9e71 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java @@ -109,7 +109,7 @@ public class InternalHistogramTests extends InternalMultiBucketAggregationTestCa newBuckets.add(new InternalHistogram.Bucket(Double.NaN, b.docCount, keyed, b.format, b.aggregations)); InternalHistogram newHistogram = histogram.create(newBuckets); - newHistogram.doReduce(Arrays.asList(newHistogram, histogram2), new InternalAggregation.ReduceContext(null, null, false)); + newHistogram.reduce(Arrays.asList(newHistogram, histogram2), new InternalAggregation.ReduceContext(null, null, false)); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java index 63415d981e6..a1d5d498d7b 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java @@ -126,12 +126,12 @@ public class SignificanceHeuristicTests extends ESTestCase { InternalMappedSignificantTerms getRandomSignificantTerms(SignificanceHeuristic heuristic) { if (randomBoolean()) { SignificantLongTerms.Bucket bucket = new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY, - DocValueFormat.RAW); + DocValueFormat.RAW, randomDoubleBetween(0, 100, true)); return new SignificantLongTerms("some_name", 1, 1, emptyList(), null, DocValueFormat.RAW, 10, 20, heuristic, singletonList(bucket)); } else { SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket(new BytesRef("someterm"), 1, 2, 3, 4, - InternalAggregations.EMPTY, DocValueFormat.RAW); + InternalAggregations.EMPTY, DocValueFormat.RAW, randomDoubleBetween(0, 100, true)); return new SignificantStringTerms("some_name", 1, 1, emptyList(), null, DocValueFormat.RAW, 10, 20, heuristic, singletonList(bucket)); } @@ -149,7 +149,7 @@ public class SignificanceHeuristicTests extends ESTestCase { public void testReduce() { List aggs = createInternalAggregations(); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(null, null, true); - SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).doReduce(aggs, context); + SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).reduce(aggs, context); assertThat(reducedAgg.getBuckets().size(), equalTo(2)); assertThat(reducedAgg.getBuckets().get(0).getSubsetDf(), equalTo(8L)); assertThat(reducedAgg.getBuckets().get(0).getSubsetSize(), equalTo(16L)); @@ -196,7 +196,7 @@ public class SignificanceHeuristicTests extends ESTestCase { @Override SignificantStringTerms.Bucket createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) { return new SignificantStringTerms.Bucket(new BytesRef(Long.toString(label).getBytes(StandardCharsets.UTF_8)), subsetDF, - subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY, DocValueFormat.RAW); + subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY, DocValueFormat.RAW, 0); } } private class LongTestAggFactory extends TestAggFactory { @@ -210,7 +210,7 @@ public class SignificanceHeuristicTests extends ESTestCase { @Override SignificantLongTerms.Bucket createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) { return new SignificantLongTerms.Bucket(subsetDF, subsetSize, supersetDF, supersetSize, label, InternalAggregations.EMPTY, - DocValueFormat.RAW); + DocValueFormat.RAW, 0); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsTests.java index 755cb6e8529..3a9684d3051 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsTests.java @@ -58,7 +58,7 @@ public class SignificantLongTermsTests extends InternalSignificantTermsTestCase for (int i = 0; i < numBuckets; ++i) { long term = randomValueOtherThanMany(l -> terms.add(l) == false, random()::nextLong); SignificantLongTerms.Bucket bucket = new SignificantLongTerms.Bucket(subsetDfs[i], subsetSize, - supersetDfs[i], supersetSize, term, aggs, format); + supersetDfs[i], supersetSize, term, aggs, format, 0); bucket.updateScore(significanceHeuristic); buckets.add(bucket); } @@ -109,7 +109,7 @@ public class SignificantLongTermsTests extends InternalSignificantTermsTestCase case 5: buckets = new ArrayList<>(buckets); buckets.add(new SignificantLongTerms.Bucket(randomLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong(), InternalAggregations.EMPTY, format)); + randomNonNegativeLong(), randomNonNegativeLong(), InternalAggregations.EMPTY, format, 0)); break; case 8: if (metaData == null) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsTests.java index 2255373fd34..d230b681cbe 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsTests.java @@ -51,7 +51,7 @@ public class SignificantStringTermsTests extends InternalSignificantTermsTestCas for (int i = 0; i < numBuckets; ++i) { BytesRef term = randomValueOtherThanMany(b -> terms.add(b) == false, () -> new BytesRef(randomAlphaOfLength(10))); SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket(term, subsetDfs[i], subsetSize, - supersetDfs[i], supersetSize, aggs, format); + supersetDfs[i], supersetSize, aggs, format, 0); bucket.updateScore(significanceHeuristic); buckets.add(bucket); } @@ -103,7 +103,7 @@ public class SignificantStringTermsTests extends InternalSignificantTermsTestCas buckets = new ArrayList<>(buckets); buckets.add(new SignificantStringTerms.Bucket(new BytesRef(randomAlphaOfLengthBetween(1, 10)), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - InternalAggregations.EMPTY, format)); + InternalAggregations.EMPTY, format, 0)); break; case 8: if (metaData == null) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index 727c3ea3a87..611e7d916c9 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -1073,7 +1073,7 @@ public class TermsAggregatorTests extends AggregatorTestCase { new InternalAggregation.ReduceContext(new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), null, true); for (InternalAggregation internalAgg : aggs) { - InternalAggregation mergedAggs = internalAgg.doReduce(aggs, ctx); + InternalAggregation mergedAggs = internalAgg.reduce(aggs, ctx); assertTrue(mergedAggs instanceof DoubleTerms); long expected = numLongs + numDoubles; List buckets = ((DoubleTerms) mergedAggs).getBuckets(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalAvgTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalAvgTests.java index 10ae10a9af1..5582af4ced6 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalAvgTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalAvgTests.java @@ -23,8 +23,6 @@ import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.ParsedAggregation; -import org.elasticsearch.search.aggregations.metrics.InternalAvg; -import org.elasticsearch.search.aggregations.metrics.ParsedAvg; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.test.InternalAggregationTestCase; @@ -95,7 +93,7 @@ public class InternalAvgTests extends InternalAggregationTestCase { aggregations.add(new InternalAvg("dummy1", value, 1, null, null, null)); } InternalAvg internalAvg = new InternalAvg("dummy2", 0, 0, null, null, null); - InternalAvg reduced = internalAvg.doReduce(aggregations, null); + InternalAvg reduced = internalAvg.reduce(aggregations, null); assertEquals(expected, reduced.getValue(), delta); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java index 3c5201bfa8a..2f53902bb53 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java @@ -225,7 +225,7 @@ public class InternalExtendedStatsTests extends InternalAggregationTestCase { aggregations.add(new InternalSum("dummy1", value, null, null, null)); } InternalSum internalSum = new InternalSum("dummy", 0, null, null, null); - InternalSum reduced = internalSum.doReduce(aggregations, null); + InternalSum reduced = internalSum.reduce(aggregations, null); assertEquals(expected, reduced.value(), delta); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java index afea0f13bd7..b8e9f1444dd 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java @@ -77,7 +77,7 @@ public class AvgBucketAggregatorTests extends AggregatorTestCase { * it is fixed. * * Note: we have this test inside of the `avg_bucket` package so that we can get access to the package-private - * `doReduce()` needed for testing this + * `reduce()` needed for testing this */ public void testSameAggNames() throws IOException { Query query = new MatchAllDocsQuery(); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index c90d65ba0a0..5d361fa1c5b 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -385,7 +385,7 @@ public abstract class AggregatorTestCase extends ESTestCase { InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, false); - A reduced = (A) aggs.get(0).doReduce(toReduce, context); + A reduced = (A) aggs.get(0).reduce(toReduce, context); doAssertReducedMultiBucketConsumer(reduced, reduceBucketConsumer); aggs = new ArrayList<>(aggs.subList(r, toReduceSize)); aggs.add(reduced); @@ -396,7 +396,12 @@ public abstract class AggregatorTestCase extends ESTestCase { new InternalAggregation.ReduceContext(root.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, true); @SuppressWarnings("unchecked") - A internalAgg = (A) aggs.get(0).doReduce(aggs, context); + A internalAgg = (A) aggs.get(0).reduce(aggs, context); + + // materialize any parent pipelines + internalAgg = (A) internalAgg.reducePipelines(internalAgg, context); + + // materialize any sibling pipelines at top level if (internalAgg.pipelineAggregators().size() > 0) { for (PipelineAggregator pipelineAggregator : internalAgg.pipelineAggregators()) { internalAgg = (A) pipelineAggregator.reduce(internalAgg, context); diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/InternalSimpleLongValue.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/InternalSimpleLongValue.java index e8db75edad5..a6b8aad6998 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/InternalSimpleLongValue.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/InternalSimpleLongValue.java @@ -64,7 +64,7 @@ public class InternalSimpleLongValue extends InternalNumericMetricsAggregation.S } @Override - public InternalSimpleLongValue doReduce(List aggregations, ReduceContext reduceContext) { + public InternalSimpleLongValue reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException("Not supported"); } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java index 88cb505615c..007fbcea902 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStats.java @@ -182,7 +182,7 @@ public class InternalStringStats extends InternalAggregation { } @Override - public InternalStringStats doReduce(List aggregations, ReduceContext reduceContext) { + public InternalStringStats reduce(List aggregations, ReduceContext reduceContext) { long count = 0; long totalLength = 0; int minLength = Integer.MAX_VALUE; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java index 25fe2f51b2f..91953295bd5 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java @@ -610,7 +610,7 @@ public class RollupResponseTranslationTests extends AggregatorTestCase { ScriptService scriptService = mock(ScriptService.class); InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, scriptService, true); - InternalAggregation reduced = ((InternalDateHistogram)unrolled).doReduce(Collections.singletonList(unrolled), context); + InternalAggregation reduced = ((InternalDateHistogram)unrolled).reduce(Collections.singletonList(unrolled), context); assertThat(reduced.toString(), equalTo("{\"histo\":{\"buckets\":[{\"key_as_string\":\"1970-01-01T00:00:00.100Z\",\"key\":100," + "\"doc_count\":1},{\"key_as_string\":\"1970-01-01T00:00:00.200Z\",\"key\":200,\"doc_count\":1}," + "{\"key_as_string\":\"1970-01-01T00:00:00.300Z\",\"key\":300,\"doc_count\":0,\"histo._count\":{\"value\":0.0}}," + diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestMultiValueAggregation.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestMultiValueAggregation.java index 7a24a5515b5..8061ce31ef8 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestMultiValueAggregation.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestMultiValueAggregation.java @@ -42,7 +42,7 @@ class TestMultiValueAggregation extends InternalNumericMetricsAggregation.MultiV } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException(); } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestSingleValueAggregation.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestSingleValueAggregation.java index 08f4edda3cb..56183414aa0 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestSingleValueAggregation.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TestSingleValueAggregation.java @@ -37,7 +37,7 @@ public class TestSingleValueAggregation extends InternalAggregation { } @Override - public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { throw new UnsupportedOperationException(); }