From fbd7c9aa5d6c1c730ab73887ca275b319a588804 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 12 Jun 2014 14:54:08 +0200 Subject: [PATCH] Aggregations: Fix reducing of range aggregations. Under some rare circumstances: - local transport, - the range aggregation has both a parent and a child aggregation, - the range aggregation got no documents on one shard or more and several documents on one shard or more. the range aggregation could return incorrect counts and sub aggregations. The root cause is that since the reduce happens in-place and since the range aggregation uses the same instance for all sub-aggregation in case of an empty bucket, sometimes non-empty buckets would have been reduced into this shared instance. In order to avoid similar bugs in the future, aggregations have been updated to return a new instance when reducing instead of doing it in-place. Close #6435 --- .../aggregations/InternalAggregations.java | 21 +-- .../InternalSingleBucketAggregation.java | 27 ++-- .../bucket/filter/InternalFilter.java | 4 + .../bucket/geogrid/InternalGeoHashGrid.java | 56 +------- .../bucket/global/InternalGlobal.java | 5 + .../histogram/InternalDateHistogram.java | 10 ++ .../bucket/histogram/InternalHistogram.java | 125 +++--------------- .../bucket/missing/InternalMissing.java | 4 + .../bucket/nested/InternalNested.java | 4 + .../bucket/nested/InternalReverseNested.java | 4 + .../bucket/range/InternalRange.java | 97 +++++--------- .../bucket/range/RangeAggregator.java | 6 +- .../bucket/range/date/InternalDateRange.java | 17 ++- .../geodistance/InternalGeoDistance.java | 16 ++- .../bucket/range/ipv4/InternalIPv4Range.java | 18 ++- .../significant/InternalSignificantTerms.java | 70 ++-------- .../significant/SignificantLongTerms.java | 11 ++ .../significant/SignificantStringTerms.java | 10 ++ .../significant/UnmappedSignificantTerms.java | 17 +++ .../bucket/terms/DoubleTerms.java | 68 ++-------- .../bucket/terms/InternalTerms.java | 78 +++-------- .../aggregations/bucket/terms/LongTerms.java | 69 ++-------- .../bucket/terms/StringTerms.java | 15 +++ .../bucket/terms/UnmappedTerms.java | 17 +++ .../aggregations/metrics/avg/InternalAvg.java | 20 +-- .../cardinality/HyperLogLogPlusPlus.java | 4 + .../cardinality/InternalCardinality.java | 11 +- .../metrics/geobounds/InternalGeoBounds.java | 45 +++---- .../aggregations/metrics/max/InternalMax.java | 20 +-- .../aggregations/metrics/min/InternalMin.java | 20 +-- .../percentiles/InternalPercentiles.java | 13 +- .../percentiles/tdigest/TDigestState.java | 4 + .../metrics/stats/InternalStats.java | 38 ++---- .../stats/extended/InternalExtendedStats.java | 10 +- .../aggregations/metrics/sum/InternalSum.java | 20 +-- .../metrics/tophits/InternalTopHits.java | 4 - .../valuecount/InternalValueCount.java | 17 +-- .../search/aggregations/RandomTests.java | 46 +++++++ 38 files changed, 385 insertions(+), 656 deletions(-) diff --git a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 88fd3806dab..85720c37133 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -61,12 +61,6 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl this.aggregations = aggregations; } - /** Resets the internal addAggregation */ - void reset(List aggregations) { - this.aggregations = aggregations; - this.aggregationsAsMap = null; - } - /** * Iterates over the {@link Aggregation}s. */ @@ -145,20 +139,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, bigArrays))); } - InternalAggregations result = aggregationsList.get(0); - result.reset(reducedAggregations); - return result; - } - - /** - * Reduces this aggregations, effectively propagates the reduce to all the sub aggregations - * @param cacheRecycler - */ - public void reduce(BigArrays bigArrays) { - for (int i = 0; i < aggregations.size(); i++) { - InternalAggregation aggregation = aggregations.get(i); - aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), bigArrays))); - } + return new InternalAggregations(reducedAggregations); } /** The fields required to write this addAggregation to xcontent */ diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index 96a5a0fcdac..e00bb327e59 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -33,8 +33,8 @@ import java.util.List; */ public abstract class InternalSingleBucketAggregation extends InternalAggregation implements SingleBucketAggregation { - protected long docCount; - protected InternalAggregations aggregations; + private long docCount; + private InternalAggregations aggregations; protected InternalSingleBucketAggregation() {} // for serialization @@ -61,26 +61,23 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio return aggregations; } + /** + * Create a new empty sub aggregation. This must be a new instance on each call. + */ + protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations); + @Override public InternalAggregation reduce(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - InternalSingleBucketAggregation reduced = ((InternalSingleBucketAggregation) aggregations.get(0)); - reduced.aggregations.reduce(reduceContext.bigArrays()); - return reduced; - } - InternalSingleBucketAggregation reduced = null; + long docCount = 0L; List subAggregationsList = new ArrayList<>(aggregations.size()); for (InternalAggregation aggregation : aggregations) { - if (reduced == null) { - reduced = (InternalSingleBucketAggregation) aggregation; - } else { - this.docCount += ((InternalSingleBucketAggregation) aggregation).docCount; - } + assert aggregation.getName().equals(getName()); + docCount += ((InternalSingleBucketAggregation) aggregation).docCount; subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).aggregations); } - reduced.aggregations = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays()); - return reduced; + final InternalAggregations aggs = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays()); + return newAggregation(getName(), docCount, aggs); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilter.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilter.java index a5db72a34de..c6509ad8588 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilter.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilter.java @@ -56,4 +56,8 @@ public class InternalFilter extends InternalSingleBucketAggregation implements F return TYPE; } + @Override + protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { + return new InternalFilter(name, docCount, subAggregations); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java index caba75a5881..5921184afcb 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java @@ -106,24 +106,14 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG } public Bucket reduce(List buckets, BigArrays bigArrays) { - if (buckets.size() == 1) { - // we still need to reduce the sub aggs - Bucket bucket = buckets.get(0); - bucket.aggregations.reduce(bigArrays); - return bucket; - } - Bucket reduced = null; List aggregationsList = new ArrayList<>(buckets.size()); + long docCount = 0; for (Bucket bucket : buckets) { - if (reduced == null) { - reduced = bucket; - } else { - reduced.docCount += bucket.docCount; - } + docCount += bucket.docCount; aggregationsList.add(bucket.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); - return reduced; + final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays); + return new Bucket(geohashAsLong, docCount, aggs); } @Override @@ -181,19 +171,10 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG @Override public InternalGeoHashGrid reduce(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0); - grid.reduceAndTrimBuckets(reduceContext.bigArrays()); - return grid; - } - InternalGeoHashGrid reduced = null; LongObjectPagedHashMap> buckets = null; for (InternalAggregation aggregation : aggregations) { InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation; - if (reduced == null) { - reduced = grid; - } if (buckets == null) { buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays()); } @@ -207,12 +188,6 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG } } - if (reduced == null) { - // there are only unmapped terms, so we just return the first one (no need to reduce) - return (InternalGeoHashGrid) aggregations.get(0); - } - - // TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ? final int size = (int) Math.min(requiredSize, buckets.size()); BucketPriorityQueue ordered = new BucketPriorityQueue(size); for (LongObjectPagedHashMap.Cursor> cursor : buckets) { @@ -224,28 +199,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG for (int i = ordered.size() - 1; i >= 0; i--) { list[i] = ordered.pop(); } - reduced.buckets = Arrays.asList(list); - return reduced; - } - - protected void reduceAndTrimBuckets(BigArrays bigArrays) { - - if (requiredSize > buckets.size()) { // nothing to trim - for (Bucket bucket : buckets) { - bucket.aggregations.reduce(bigArrays); - } - return; - } - - List trimmedBuckets = new ArrayList<>(requiredSize); - for (Bucket bucket : buckets) { - if (trimmedBuckets.size() >= requiredSize) { - break; - } - bucket.aggregations.reduce(bigArrays); - trimmedBuckets.add(bucket); - } - buckets = trimmedBuckets; + return new InternalGeoHashGrid(getName(), requiredSize, Arrays.asList(list)); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/global/InternalGlobal.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/global/InternalGlobal.java index b71e922d8b7..7e52d2ae1c0 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/global/InternalGlobal.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/global/InternalGlobal.java @@ -56,4 +56,9 @@ public class InternalGlobal extends InternalSingleBucketAggregation implements G public Type type() { return TYPE; } + + @Override + protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { + return new InternalGlobal(name, docCount, subAggregations); + } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index bb093261e91..2eca156a6f9 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -57,6 +57,11 @@ public class InternalDateHistogram extends InternalHistogram getFactory() { + return FACTORY; + } + @Override public String getKey() { return formatter != null ? formatter.format(key) : ValueFormatter.DateTime.DEFAULT.format(key); @@ -109,6 +114,11 @@ public class InternalDateHistogram extends InternalHistogram getFactory() { + return FACTORY; + } + @Override public Bucket getBucketByKey(String key) { try { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 49035f2840e..b6d6b7a266f 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -67,10 +67,10 @@ public class InternalHistogram extends Inter public static class Bucket implements Histogram.Bucket { - long key; - long docCount; + final long key; + final long docCount; protected transient final @Nullable ValueFormatter formatter; - InternalAggregations aggregations; + final InternalAggregations aggregations; public Bucket(long key, long docCount, @Nullable ValueFormatter formatter, InternalAggregations aggregations) { this.key = key; @@ -79,6 +79,10 @@ public class InternalHistogram extends Inter this.aggregations = aggregations; } + protected Factory getFactory() { + return FACTORY; + } + @Override public String getKey() { return formatter != null ? formatter.format(key) : ValueFormatter.RAW.format(key); @@ -105,24 +109,14 @@ public class InternalHistogram extends Inter } B reduce(List buckets, BigArrays bigArrays) { - if (buckets.size() == 1) { - // we only need to reduce the sub aggregations - Bucket bucket = buckets.get(0); - bucket.aggregations.reduce(bigArrays); - return (B) bucket; - } List aggregations = new ArrayList<>(buckets.size()); - Bucket reduced = null; + long docCount = 0; for (Bucket bucket : buckets) { - if (reduced == null) { - reduced = bucket; - } else { - reduced.docCount += bucket.docCount; - } + docCount += bucket.docCount; aggregations.add((InternalAggregations) bucket.getAggregations()); } - reduced.aggregations = InternalAggregations.reduce(aggregations, bigArrays); - return (B) reduced; + InternalAggregations aggs = InternalAggregations.reduce(aggregations, bigArrays); + return (B) getFactory().createBucket(key, docCount, aggs, formatter); } void toXContent(XContentBuilder builder, Params params, boolean keyed, @Nullable ValueFormatter formatter) throws IOException { @@ -256,98 +250,13 @@ public class InternalHistogram extends Inter return bucketsMap.get(key.longValue()); } + protected Factory getFactory() { + return FACTORY; + } + @Override public InternalAggregation reduce(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - - InternalHistogram histo = (InternalHistogram) aggregations.get(0); - - if (minDocCount == 1) { - for (B bucket : histo.buckets) { - bucket.aggregations.reduce(reduceContext.bigArrays()); - } - return histo; - } - - CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator()); - List list = order.asc ? histo.buckets : Lists.reverse(histo.buckets); - B lastBucket = null; - ListIterator iter = list.listIterator(); - - // we need to fill the gaps with empty buckets - if (minDocCount == 0) { - ExtendedBounds bounds = emptyBucketInfo.bounds; - - // first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested) - if (bounds != null) { - B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null; - if (firstBucket == null) { - if (bounds.min != null && bounds.max != null) { - long key = bounds.min; - long max = bounds.max; - while (key <= max) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter)); - key = emptyBucketInfo.rounding.nextRoundingValue(key); - } - } - } else { - if (bounds.min != null) { - long key = bounds.min; - while (key < firstBucket.key) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter)); - key = emptyBucketInfo.rounding.nextRoundingValue(key); - } - } - } - } - - // now adding the empty buckets within the actual data, - // e.g. if the data series is [1,2,3,7] there are 3 empty buckets that will be created for 4,5,6 - while (iter.hasNext()) { - // look ahead on the next bucket without advancing the iter - // so we'll be able to insert elements at the right position - B nextBucket = list.get(iter.nextIndex()); - nextBucket.aggregations.reduce(reduceContext.bigArrays()); - if (lastBucket != null) { - long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); - while (key != nextBucket.key) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter)); - key = emptyBucketInfo.rounding.nextRoundingValue(key); - } - } - lastBucket = iter.next(); - } - - // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user) - if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) { - long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); - long max = bounds.max; - while (key <= max) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter)); - key = emptyBucketInfo.rounding.nextRoundingValue(key); - } - } - - } else { - while (iter.hasNext()) { - InternalHistogram.Bucket bucket = iter.next(); - if (bucket.getDocCount() < minDocCount) { - iter.remove(); - } else { - bucket.aggregations.reduce(reduceContext.bigArrays()); - } - } - } - - if (order != InternalOrder.KEY_ASC && order != InternalOrder.KEY_DESC) { - CollectionUtil.introSort(histo.buckets, order.comparator()); - } - - return histo; - } - - InternalHistogram reduced = (InternalHistogram) aggregations.get(0); LongObjectPagedHashMap> bucketsByKey = new LongObjectPagedHashMap<>(reduceContext.bigArrays()); for (InternalAggregation aggregation : aggregations) { @@ -437,9 +346,7 @@ public class InternalHistogram extends Inter CollectionUtil.introSort(reducedBuckets, order.comparator()); } - - reduced.buckets = reducedBuckets; - return reduced; + return getFactory().create(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, formatter, keyed); } protected B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/InternalMissing.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/InternalMissing.java index 7e4198c89fb..8975336a2c3 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/InternalMissing.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/InternalMissing.java @@ -58,4 +58,8 @@ public class InternalMissing extends InternalSingleBucketAggregation implements return TYPE; } + @Override + protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { + return new InternalMissing(name, docCount, subAggregations); + } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/InternalNested.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/InternalNested.java index ed968ad6bbe..f9718185212 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/InternalNested.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/InternalNested.java @@ -57,4 +57,8 @@ public class InternalNested extends InternalSingleBucketAggregation implements N return TYPE; } + @Override + protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { + return new InternalNested(name, docCount, subAggregations); + } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/InternalReverseNested.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/InternalReverseNested.java index 29b35720d12..eed2a52ce01 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/InternalReverseNested.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/InternalReverseNested.java @@ -57,4 +57,8 @@ public class InternalReverseNested extends InternalSingleBucketAggregation imple return TYPE; } + @Override + protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { + return new InternalReverseNested(name, docCount, subAggregations); + } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index cea46cd8585..4695e4a06c6 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -60,11 +60,12 @@ public class InternalRange extends InternalAggre public static class Bucket implements Range.Bucket { - private double from = Double.NEGATIVE_INFINITY; - private double to = Double.POSITIVE_INFINITY; - private long docCount; - InternalAggregations aggregations; - private String key; + private final ValueFormatter formatter; + private final double from; + private final double to; + private final long docCount; + final InternalAggregations aggregations; + private final String key; public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) { this.key = key != null ? key : generateKey(from, to, formatter); @@ -72,6 +73,7 @@ public class InternalRange extends InternalAggre this.to = to; this.docCount = docCount; this.aggregations = aggregations; + this.formatter = formatter; } public String getKey() { @@ -103,25 +105,19 @@ public class InternalRange extends InternalAggre return aggregations; } + protected Factory getFactory() { + return FACTORY; + } + Bucket reduce(List ranges, BigArrays bigArrays) { - if (ranges.size() == 1) { - // we stil need to call reduce on all the sub aggregations - Bucket bucket = ranges.get(0); - bucket.aggregations.reduce(bigArrays); - return bucket; - } - Bucket reduced = null; + long docCount = 0; List aggregationsList = Lists.newArrayListWithCapacity(ranges.size()); for (Bucket range : ranges) { - if (reduced == null) { - reduced = range; - } else { - reduced.docCount += range.docCount; - } + docCount += range.docCount; aggregationsList.add(range.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); - return reduced; + final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays); + return getFactory().createBucket(key, from, to, docCount, aggs, formatter); } void toXContent(XContentBuilder builder, Params params, @Nullable ValueFormatter formatter, boolean keyed) throws IOException { @@ -164,8 +160,8 @@ public class InternalRange extends InternalAggre return TYPE.name(); } - public R create(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) { - return (R) new InternalRange<>(name, ranges, formatter, keyed, unmapped); + public R create(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed) { + return (R) new InternalRange<>(name, ranges, formatter, keyed); } @@ -178,16 +174,14 @@ public class InternalRange extends InternalAggre private Map rangeMap; private @Nullable ValueFormatter formatter; private boolean keyed; - private boolean unmapped; public InternalRange() {} // for serialization - public InternalRange(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) { + public InternalRange(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed) { super(name); this.ranges = ranges; this.formatter = formatter; this.keyed = keyed; - this.unmapped = unmapped; } @Override @@ -211,52 +205,31 @@ public class InternalRange extends InternalAggre return rangeMap.get(key); } + protected Factory getFactory() { + return FACTORY; + } + @Override public InternalAggregation reduce(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - InternalRange reduced = (InternalRange) aggregations.get(0); - for (B bucket : reduced.ranges) { - bucket.aggregations.reduce(reduceContext.bigArrays()); - } - return reduced; + @SuppressWarnings("unchecked") + List[] rangeList = new List[ranges.size()]; + for (int i = 0; i < rangeList.length; ++i) { + rangeList[i] = new ArrayList(); } - List> rangesList = null; for (InternalAggregation aggregation : aggregations) { - InternalRange ranges = (InternalRange) aggregation; - if (ranges.unmapped) { - continue; - } - if (rangesList == null) { - rangesList = new ArrayList<>(ranges.ranges.size()); - for (Bucket bucket : ranges.ranges) { - List sameRangeList = new ArrayList<>(aggregations.size()); - sameRangeList.add(bucket); - rangesList.add(sameRangeList); - } - } else { - int i = 0; - for (Bucket range : ranges.ranges) { - rangesList.get(i++).add(range); - } + InternalRange ranges = (InternalRange) aggregation; + int i = 0; + for (Bucket range : ranges.ranges) { + rangeList[i++].add(range); } } - if (rangesList == null) { - // unmapped, we can just take the first one - return aggregations.get(0); + final List ranges = new ArrayList<>(); + for (int i = 0; i < this.ranges.size(); ++i) { + ranges.add((B) rangeList[i].get(0).reduce(rangeList[i], reduceContext.bigArrays())); } - - InternalRange reduced = (InternalRange) aggregations.get(0); - int i = 0; - for (List sameRangeList : rangesList) { - reduced.ranges.set(i++, (sameRangeList.get(0)).reduce(sameRangeList, reduceContext.bigArrays())); - } - return reduced; - } - - protected B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) { - return (B) new Bucket(key, from, to, docCount, aggregations, formatter); + return getFactory().create(name, ranges, formatter, keyed); } @Override @@ -268,7 +241,7 @@ public class InternalRange extends InternalAggre List ranges = Lists.newArrayListWithCapacity(size); for (int i = 0; i < size; i++) { String key = in.readOptionalString(); - ranges.add(createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter)); + ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter)); } this.ranges = ranges; this.rangeMap = null; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java index d9c8af6c679..dfcf41ef991 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java @@ -203,7 +203,7 @@ public class RangeAggregator extends BucketsAggregator { buckets.add(bucket); } // value source can be null in the case of unmapped fields - return rangeFactory.create(name, buckets, formatter, keyed, false); + return rangeFactory.create(name, buckets, formatter, keyed); } @Override @@ -217,7 +217,7 @@ public class RangeAggregator extends BucketsAggregator { buckets.add(bucket); } // value source can be null in the case of unmapped fields - return rangeFactory.create(name, buckets, formatter, keyed, false); + return rangeFactory.create(name, buckets, formatter, keyed); } private static final void sortRanges(final Range[] ranges) { @@ -274,7 +274,7 @@ public class RangeAggregator extends BucketsAggregator { for (RangeAggregator.Range range : ranges) { buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter)); } - return factory.create(name, buckets, formatter, keyed, true); + return factory.create(name, buckets, formatter, keyed); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/InternalDateRange.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/InternalDateRange.java index 622d250fc28..68e5c86da80 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/InternalDateRange.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/date/InternalDateRange.java @@ -72,6 +72,11 @@ public class InternalDateRange extends InternalRange i public DateTime getToAsDate() { return Double.isInfinite(getTo().doubleValue()) ? null : new DateTime(getTo().longValue(), DateTimeZone.UTC); } + + @Override + protected InternalRange.Factory getFactory() { + return FACTORY; + } } private static class Factory extends InternalRange.Factory { @@ -82,8 +87,8 @@ public class InternalDateRange extends InternalRange i } @Override - public InternalDateRange create(String name, List ranges, ValueFormatter formatter, boolean keyed, boolean unmapped) { - return new InternalDateRange(name, ranges, formatter, keyed, unmapped); + public InternalDateRange create(String name, List ranges, ValueFormatter formatter, boolean keyed) { + return new InternalDateRange(name, ranges, formatter, keyed); } @Override @@ -94,8 +99,8 @@ public class InternalDateRange extends InternalRange i InternalDateRange() {} // for serialization - InternalDateRange(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) { - super(name, ranges, formatter, keyed, unmapped); + InternalDateRange(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed) { + super(name, ranges, formatter, keyed); } @Override @@ -104,7 +109,7 @@ public class InternalDateRange extends InternalRange i } @Override - protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) { - return new Bucket(key, from, to, docCount, aggregations, formatter); + protected InternalRange.Factory getFactory() { + return FACTORY; } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/InternalGeoDistance.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/InternalGeoDistance.java index 102486d26a1..5eefc6ae66f 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/InternalGeoDistance.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/geodistance/InternalGeoDistance.java @@ -61,6 +61,10 @@ public class InternalGeoDistance extends InternalRange getFactory() { + return FACTORY; + } } private static class Factory extends InternalRange.Factory { @@ -71,8 +75,8 @@ public class InternalGeoDistance extends InternalRange ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) { - return new InternalGeoDistance(name, ranges, formatter, keyed, unmapped); + public InternalGeoDistance create(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed) { + return new InternalGeoDistance(name, ranges, formatter, keyed); } @Override @@ -83,8 +87,8 @@ public class InternalGeoDistance extends InternalRange ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) { - super(name, ranges, formatter, keyed, unmapped); + public InternalGeoDistance(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed) { + super(name, ranges, formatter, keyed); } @Override @@ -93,7 +97,7 @@ public class InternalGeoDistance extends InternalRange getFactory() { + return FACTORY; } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java index bd0141e291f..c34766775dc 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/range/ipv4/InternalIPv4Range.java @@ -74,6 +74,11 @@ public class InternalIPv4Range extends InternalRange i double to = getTo().doubleValue(); return Double.isInfinite(to) ? null : MAX_IP == to ? null : ValueFormatter.IPv4.format(to); } + + @Override + protected InternalRange.Factory getFactory() { + return FACTORY; + } } private static class Factory extends InternalRange.Factory { @@ -84,8 +89,8 @@ public class InternalIPv4Range extends InternalRange i } @Override - public InternalIPv4Range create(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) { - return new InternalIPv4Range(name, ranges, keyed, unmapped); + public InternalIPv4Range create(String name, List ranges, @Nullable ValueFormatter formatter, boolean keyed) { + return new InternalIPv4Range(name, ranges, keyed); } @Override @@ -96,8 +101,8 @@ public class InternalIPv4Range extends InternalRange i public InternalIPv4Range() {} // for serialization - public InternalIPv4Range(String name, List ranges, boolean keyed, boolean unmapped) { - super(name, ranges, ValueFormatter.IPv4, keyed, unmapped); + public InternalIPv4Range(String name, List ranges, boolean keyed) { + super(name, ranges, ValueFormatter.IPv4, keyed); } @Override @@ -106,8 +111,7 @@ public class InternalIPv4Range extends InternalRange i } @Override - protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter ) { - return new Bucket(key, from, to, docCount, aggregations); + protected InternalRange.Factory getFactory() { + return FACTORY; } - } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java index 6526f3a5693..e2db41c3617 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.search.aggregations.bucket.significant; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.util.BigArrays; @@ -143,25 +142,20 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple } public Bucket reduce(List buckets, BigArrays bigArrays) { - if (buckets.size() == 1) { - return buckets.get(0); - } - Bucket reduced = null; + long subsetDf = 0; + long supersetDf = 0; List aggregationsList = new ArrayList<>(buckets.size()); for (Bucket bucket : buckets) { - if (reduced == null) { - reduced = bucket; - } else { - reduced.subsetDf += bucket.subsetDf; - reduced.supersetDf += bucket.supersetDf; - reduced.updateScore(); - } + subsetDf += bucket.subsetDf; + supersetDf += bucket.supersetDf; aggregationsList.add(bucket.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); - return reduced; + InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays); + return newBucket(subsetDf, subsetSize, supersetDf, supersetSize, aggs); } + abstract Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations); + @Override public double getSignificanceScore() { return score; @@ -201,14 +195,8 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple } @Override - public InternalSignificantTerms reduce(ReduceContext reduceContext) { + public InternalAggregation reduce(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - InternalSignificantTerms terms = (InternalSignificantTerms) aggregations.get(0); - terms.trimExcessEntries(); - return terms; - } - InternalSignificantTerms reduced = null; long globalSubsetSize = 0; long globalSupersetSize = 0; @@ -219,18 +207,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple globalSubsetSize += terms.subsetSize; globalSupersetSize += terms.supersetSize; } - Map> buckets = null; + Map> buckets = new HashMap<>(); for (InternalAggregation aggregation : aggregations) { InternalSignificantTerms terms = (InternalSignificantTerms) aggregation; - if (terms instanceof UnmappedSignificantTerms) { - continue; - } - if (reduced == null) { - reduced = terms; - } - if (buckets == null) { - buckets = new HashMap<>(terms.buckets.size()); - } for (Bucket bucket : terms.buckets) { List existingBuckets = buckets.get(bucket.getKey()); if (existingBuckets == null) { @@ -239,19 +218,10 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple } // Adjust the buckets with the global stats representing the // total size of the pots from which the stats are drawn - bucket.subsetSize = globalSubsetSize; - bucket.supersetSize = globalSupersetSize; - bucket.updateScore(); - existingBuckets.add(bucket); + existingBuckets.add(bucket.newBucket(bucket.getSubsetDf(), globalSubsetSize, bucket.getSupersetDf(), globalSupersetSize, bucket.aggregations)); } } - if (reduced == null) { - // there are only unmapped terms, so we just return the first one - // (no need to reduce) - return (UnmappedSignificantTerms) aggregations.get(0); - } - final int size = Math.min(requiredSize, buckets.size()); BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); for (Map.Entry> entry : buckets.entrySet()) { @@ -265,23 +235,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple for (int i = ordered.size() - 1; i >= 0; i--) { list[i] = (Bucket) ordered.pop(); } - reduced.buckets = Arrays.asList(list); - reduced.subsetSize = globalSubsetSize; - reduced.supersetSize = globalSupersetSize; - return reduced; + return newAggregation(globalSubsetSize, globalSupersetSize, Arrays.asList(list)); } - final void trimExcessEntries() { - final List newBuckets = Lists.newArrayList(); - for (Bucket b : buckets) { - if (newBuckets.size() >= requiredSize) { - break; - } - if (b.subsetDf >= minDocCount) { - newBuckets.add(b); - } - } - buckets = newBuckets; - } + abstract InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, List buckets); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java index 371ea4e1639..7766629a775 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java @@ -83,6 +83,11 @@ public class SignificantLongTerms extends InternalSignificantTerms { return Long.toString(term); } + @Override + Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) { + return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations); + } + } private ValueFormatter formatter; @@ -101,6 +106,12 @@ public class SignificantLongTerms extends InternalSignificantTerms { return TYPE; } + @Override + InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, + List buckets) { + return new SignificantLongTerms(subsetSize, supersetSize, getName(), formatter, requiredSize, supersetSize, buckets); + } + @Override public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java index 619c904b751..5f5bd2288a6 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java @@ -85,6 +85,10 @@ public class SignificantStringTerms extends InternalSignificantTerms { return termBytes.utf8ToString(); } + @Override + Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) { + return new Bucket(termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations); + } } SignificantStringTerms() {} // for serialization @@ -99,6 +103,12 @@ public class SignificantStringTerms extends InternalSignificantTerms { return TYPE; } + @Override + InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, + List buckets) { + return new SignificantStringTerms(subsetSize, supersetSize, getName(), requiredSize, supersetSize, buckets); + } + @Override public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index 33708d8e10a..f2564283fd2 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -64,6 +66,21 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms { return TYPE; } + @Override + public InternalAggregation reduce(ReduceContext reduceContext) { + for (InternalAggregation aggregation : reduceContext.aggregations()) { + if (!(aggregation instanceof UnmappedSignificantTerms)) { + return aggregation.reduce(reduceContext); + } + } + return this; + } + + @Override + InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, List buckets) { + throw new UnsupportedOperationException("How did you get there?"); + } + @Override public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index d41e912a6cd..7ae8c12a503 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -23,18 +23,14 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; -import org.elasticsearch.common.util.DoubleObjectPagedHashMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; -import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -87,6 +83,15 @@ public class DoubleTerms extends InternalTerms { return Double.compare(term, other.getKeyAsNumber().doubleValue()); } + @Override + Object getKeyAsObject() { + return getKeyAsNumber(); + } + + @Override + Bucket newBucket(long docCount, InternalAggregations aggs) { + return new Bucket(term, docCount, aggs); + } } private @Nullable ValueFormatter formatter; @@ -104,59 +109,8 @@ public class DoubleTerms extends InternalTerms { } @Override - public InternalTerms reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - InternalTerms terms = (InternalTerms) aggregations.get(0); - terms.trimExcessEntries(reduceContext.bigArrays()); - return terms; - } - InternalTerms reduced = null; - - DoubleObjectPagedHashMap> buckets = null; - for (InternalAggregation aggregation : aggregations) { - InternalTerms terms = (InternalTerms) aggregation; - if (terms instanceof UnmappedTerms) { - continue; - } - if (reduced == null) { - reduced = terms; - } - if (buckets == null) { - buckets = new DoubleObjectPagedHashMap<>(terms.buckets.size(), reduceContext.bigArrays()); - } - for (Terms.Bucket bucket : terms.buckets) { - List existingBuckets = buckets.get(((Bucket) bucket).term); - if (existingBuckets == null) { - existingBuckets = new ArrayList<>(aggregations.size()); - buckets.put(((Bucket) bucket).term, existingBuckets); - } - existingBuckets.add((Bucket) bucket); - } - } - - if (reduced == null) { - // there are only unmapped terms, so we just return the first one (no need to reduce) - return (UnmappedTerms) aggregations.get(0); - } - - // TODO: would it be better to sort the backing array buffer of hppc map directly instead of using a PQ? - final int size = (int) Math.min(requiredSize, buckets.size()); - BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); - for (DoubleObjectPagedHashMap.Cursor> cursor : buckets) { - List sameTermBuckets = cursor.value; - final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays()); - if (b.getDocCount() >= minDocCount) { - ordered.insertWithOverflow(b); - } - } - buckets.close(); - InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()]; - for (int i = ordered.size() - 1; i >= 0; i--) { - list[i] = (Bucket) ordered.pop(); - } - reduced.buckets = Arrays.asList(list); - return reduced; + protected InternalTerms newAggregation(String name, List buckets) { + return new DoubleTerms(name, order, formatter, requiredSize, minDocCount, buckets); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 6a937aee4bc..3e016ad597e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -18,10 +18,10 @@ */ package org.elasticsearch.search.aggregations.bucket.terms; -import com.google.common.collect.Lists; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.text.Text; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.search.aggregations.Aggregations; @@ -58,24 +58,19 @@ public abstract class InternalTerms extends InternalAggregation implements Terms return aggregations; } + abstract Object getKeyAsObject(); + + abstract Bucket newBucket(long docCount, InternalAggregations aggs); + public Bucket reduce(List buckets, BigArrays bigArrays) { - if (buckets.size() == 1) { - Bucket bucket = buckets.get(0); - bucket.aggregations.reduce(bigArrays); - return bucket; - } - Bucket reduced = null; + long docCount = 0; List aggregationsList = new ArrayList<>(buckets.size()); for (Bucket bucket : buckets) { - if (reduced == null) { - reduced = bucket; - } else { - reduced.docCount += bucket.docCount; - } + docCount += bucket.docCount; aggregationsList.add(bucket.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays); - return reduced; + InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays); + return newBucket(docCount, aggs); } } @@ -113,47 +108,21 @@ public abstract class InternalTerms extends InternalAggregation implements Terms } @Override - public InternalTerms reduce(ReduceContext reduceContext) { + public InternalAggregation reduce(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - InternalTerms terms = (InternalTerms) aggregations.get(0); - terms.trimExcessEntries(reduceContext.bigArrays()); - return terms; - } - InternalTerms reduced = null; - - Map> buckets = null; + Multimap buckets = ArrayListMultimap.create(); for (InternalAggregation aggregation : aggregations) { InternalTerms terms = (InternalTerms) aggregation; - if (terms instanceof UnmappedTerms) { - continue; - } - if (reduced == null) { - reduced = terms; - } - if (buckets == null) { - buckets = new HashMap<>(terms.buckets.size()); - } for (Bucket bucket : terms.buckets) { - List existingBuckets = buckets.get(bucket.getKeyAsText()); - if (existingBuckets == null) { - existingBuckets = new ArrayList<>(aggregations.size()); - buckets.put(bucket.getKeyAsText(), existingBuckets); - } - existingBuckets.add(bucket); + buckets.put(bucket.getKeyAsObject(), bucket); } } - if (reduced == null) { - // there are only unmapped terms, so we just return the first one (no need to reduce) - return (UnmappedTerms) aggregations.get(0); - } - final int size = Math.min(requiredSize, buckets.size()); BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); - for (Map.Entry> entry : buckets.entrySet()) { - List sameTermBuckets = entry.getValue(); + for (Collection l : buckets.asMap().values()) { + List sameTermBuckets = (List) l; // cast is ok according to javadocs final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays()); if (b.docCount >= minDocCount) { ordered.insertWithOverflow(b); @@ -163,22 +132,9 @@ public abstract class InternalTerms extends InternalAggregation implements Terms for (int i = ordered.size() - 1; i >= 0; i--) { list[i] = (Bucket) ordered.pop(); } - reduced.buckets = Arrays.asList(list); - return reduced; + return newAggregation(name, Arrays.asList(list)); } - final void trimExcessEntries(BigArrays bigArrays) { - final List newBuckets = Lists.newArrayList(); - for (Bucket b : buckets) { - if (newBuckets.size() >= requiredSize) { - break; - } - if (b.docCount >= minDocCount) { - newBuckets.add(b); - b.aggregations.reduce(bigArrays); - } - } - buckets = newBuckets; - } + protected abstract InternalTerms newAggregation(String name, List buckets); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index eb642d05f4f..cbbf86d56e7 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -23,18 +23,14 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.Text; -import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; -import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -87,6 +83,16 @@ public class LongTerms extends InternalTerms { int compareTerm(Terms.Bucket other) { return Long.compare(term, other.getKeyAsNumber().longValue()); } + + @Override + Object getKeyAsObject() { + return getKeyAsNumber(); + } + + @Override + Bucket newBucket(long docCount, InternalAggregations aggs) { + return new Bucket(term, docCount, aggs); + } } private @Nullable ValueFormatter formatter; @@ -104,59 +110,8 @@ public class LongTerms extends InternalTerms { } @Override - public InternalTerms reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - InternalTerms terms = (InternalTerms) aggregations.get(0); - terms.trimExcessEntries(reduceContext.bigArrays()); - return terms; - } - InternalTerms reduced = null; - - LongObjectPagedHashMap> buckets = null; - for (InternalAggregation aggregation : aggregations) { - InternalTerms terms = (InternalTerms) aggregation; - if (terms instanceof UnmappedTerms) { - continue; - } - if (reduced == null) { - reduced = terms; - } - if (buckets == null) { - buckets = new LongObjectPagedHashMap<>(terms.buckets.size(), reduceContext.bigArrays()); - } - for (Terms.Bucket bucket : terms.buckets) { - List existingBuckets = buckets.get(((Bucket) bucket).term); - if (existingBuckets == null) { - existingBuckets = new ArrayList<>(aggregations.size()); - buckets.put(((Bucket) bucket).term, existingBuckets); - } - existingBuckets.add((Bucket) bucket); - } - } - - if (reduced == null) { - // there are only unmapped terms, so we just return the first one (no need to reduce) - return (UnmappedTerms) aggregations.get(0); - } - - // TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ? - final int size = (int) Math.min(requiredSize, buckets.size()); - BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null)); - for (LongObjectPagedHashMap.Cursor> cursor : buckets) { - List sameTermBuckets = cursor.value; - final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays()); - if (b.getDocCount() >= minDocCount) { - ordered.insertWithOverflow(b); - } - } - buckets.close(); - InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()]; - for (int i = ordered.size() - 1; i >= 0; i--) { - list[i] = (Bucket) ordered.pop(); - } - reduced.buckets = Arrays.asList(list); - return reduced; + protected InternalTerms newAggregation(String name, List buckets) { + return new LongTerms(name, order, formatter, requiredSize, minDocCount, buckets); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java index 33db4e89664..d104ef5faba 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java @@ -84,6 +84,16 @@ public class StringTerms extends InternalTerms { int compareTerm(Terms.Bucket other) { return BytesRef.getUTF8SortedAsUnicodeComparator().compare(termBytes, ((Bucket) other).termBytes); } + + @Override + Object getKeyAsObject() { + return getKeyAsText(); + } + + @Override + Bucket newBucket(long docCount, InternalAggregations aggs) { + return new Bucket(termBytes, docCount, aggs); + } } StringTerms() {} // for serialization @@ -97,6 +107,11 @@ public class StringTerms extends InternalTerms { return TYPE; } + @Override + protected InternalTerms newAggregation(String name, List buckets) { + return new StringTerms(name, order, requiredSize, minDocCount, buckets); + } + @Override public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index ae58b68a901..fb1b4999fcd 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -80,6 +82,21 @@ public class UnmappedTerms extends InternalTerms { out.writeVLong(minDocCount); } + @Override + public InternalAggregation reduce(ReduceContext reduceContext) { + for (InternalAggregation agg : reduceContext.aggregations()) { + if (!(agg instanceof UnmappedTerms)) { + return agg.reduce(reduceContext); + } + } + return this; + } + + @Override + protected InternalTerms newAggregation(String name, List buckets) { + throw new UnsupportedOperationException("How did you get there?"); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(name); diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/avg/InternalAvg.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/avg/InternalAvg.java index 255c1a7b47f..f30a4c9a157 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/avg/InternalAvg.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/avg/InternalAvg.java @@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; -import java.util.List; /** * @@ -76,20 +75,13 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i @Override public InternalAvg reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - return (InternalAvg) aggregations.get(0); + long count = 0; + double sum = 0; + for (InternalAggregation aggregation : reduceContext.aggregations()) { + count += ((InternalAvg) aggregation).count; + sum += ((InternalAvg) aggregation).sum; } - InternalAvg reduced = null; - for (InternalAggregation aggregation : aggregations) { - if (reduced == null) { - reduced = (InternalAvg) aggregation; - } else { - reduced.count += ((InternalAvg) aggregation).count; - reduced.sum += ((InternalAvg) aggregation).sum; - } - } - return reduced; + return new InternalAvg(getName(), sum, count); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/HyperLogLogPlusPlus.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/HyperLogLogPlusPlus.java index 6f4a873ec67..c4980dc8cf9 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/HyperLogLogPlusPlus.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/HyperLogLogPlusPlus.java @@ -184,6 +184,10 @@ public final class HyperLogLogPlusPlus implements Releasable { alphaMM = alpha * m * m; } + public int precision() { + return p; + } + public long maxBucket() { return runLens.size() >>> p; } diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java index 3df55f4104c..046d35193b8 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/cardinality/InternalCardinality.java @@ -104,18 +104,17 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation final InternalCardinality cardinality = (InternalCardinality) aggregation; if (cardinality.counts != null) { if (reduced == null) { - reduced = cardinality; - } else { - reduced.merge(cardinality); + reduced = new InternalCardinality(name, new HyperLogLogPlusPlus(cardinality.counts.precision(), BigArrays.NON_RECYCLING_INSTANCE, 1)); } + reduced.merge(cardinality); } } if (reduced == null) { // all empty - reduced = (InternalCardinality) aggregations.get(0); + return aggregations.get(0); + } else { + return reduced; } - - return reduced; } public void merge(InternalCardinality other) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/geobounds/InternalGeoBounds.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/geobounds/InternalGeoBounds.java index df667312856..5df5cd1db86 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/geobounds/InternalGeoBounds.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/geobounds/InternalGeoBounds.java @@ -28,7 +28,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; import java.io.IOException; -import java.util.List; public class InternalGeoBounds extends InternalMetricsAggregation implements GeoBounds { @@ -72,36 +71,36 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo @Override public InternalAggregation reduce(ReduceContext reduceContext) { - InternalGeoBounds reduced = null; - List aggregations = reduceContext.aggregations(); - for (InternalAggregation aggregation : aggregations) { - InternalGeoBounds bounds = (InternalGeoBounds) aggregation; - - if (reduced == null) { - reduced = bounds; - continue; - } + double top = Double.NEGATIVE_INFINITY; + double bottom = Double.POSITIVE_INFINITY; + double posLeft = Double.POSITIVE_INFINITY; + double posRight = Double.NEGATIVE_INFINITY; + double negLeft = Double.POSITIVE_INFINITY; + double negRight = Double.NEGATIVE_INFINITY; - if (bounds.top > reduced.top) { - reduced.top = bounds.top; + for (InternalAggregation aggregation : reduceContext.aggregations()) { + InternalGeoBounds bounds = (InternalGeoBounds) aggregation; + + if (bounds.top > top) { + top = bounds.top; } - if (bounds.bottom < reduced.bottom) { - reduced.bottom = bounds.bottom; + if (bounds.bottom < bottom) { + bottom = bounds.bottom; } - if (bounds.posLeft < reduced.posLeft) { - reduced.posLeft = bounds.posLeft; + if (bounds.posLeft < posLeft) { + posLeft = bounds.posLeft; } - if (bounds.posRight > reduced.posRight) { - reduced.posRight = bounds.posRight; + if (bounds.posRight > posRight) { + posRight = bounds.posRight; } - if (bounds.negLeft < reduced.negLeft) { - reduced.negLeft = bounds.negLeft; + if (bounds.negLeft < negLeft) { + negLeft = bounds.negLeft; } - if (bounds.negRight > reduced.negRight) { - reduced.negRight = bounds.negRight; + if (bounds.negRight > negRight) { + negRight = bounds.negRight; } } - return reduced; + return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/max/InternalMax.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/max/InternalMax.java index e153580db46..c7dffa16cc6 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/max/InternalMax.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/max/InternalMax.java @@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; -import java.util.List; /** * @@ -74,22 +73,11 @@ public class InternalMax extends InternalNumericMetricsAggregation.SingleValue i @Override public InternalMax reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - return (InternalMax) aggregations.get(0); + double max = Double.NEGATIVE_INFINITY; + for (InternalAggregation aggregation : reduceContext.aggregations()) { + max = Math.max(max, ((InternalMax) aggregation).max); } - InternalMax reduced = null; - for (InternalAggregation aggregation : aggregations) { - if (reduced == null) { - reduced = (InternalMax) aggregation; - } else { - reduced.max = Math.max(reduced.max, ((InternalMax) aggregation).max); - } - } - if (reduced != null) { - return reduced; - } - return (InternalMax) aggregations.get(0); + return new InternalMax(name, max); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/min/InternalMin.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/min/InternalMin.java index 0ce5d7561d5..1cf8f94dd4e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/min/InternalMin.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/min/InternalMin.java @@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; -import java.util.List; /** * @@ -75,22 +74,11 @@ public class InternalMin extends InternalNumericMetricsAggregation.SingleValue i @Override public InternalMin reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - return (InternalMin) aggregations.get(0); + double min = Double.POSITIVE_INFINITY; + for (InternalAggregation aggregation : reduceContext.aggregations()) { + min = Math.min(min, ((InternalMin) aggregation).min); } - InternalMin reduced = null; - for (InternalAggregation aggregation : aggregations) { - if (reduced == null) { - reduced = (InternalMin) aggregation; - } else { - reduced.min = Math.min(reduced.min, ((InternalMin) aggregation).min); - } - } - if (reduced != null) { - return reduced; - } - return (InternalMin) aggregations.get(0); + return new InternalMin(getName(), min); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/InternalPercentiles.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/InternalPercentiles.java index 060e8916128..47472efa60b 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/InternalPercentiles.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/InternalPercentiles.java @@ -32,7 +32,6 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatterStream import java.io.IOException; import java.util.Iterator; -import java.util.List; /** * @@ -89,17 +88,15 @@ public class InternalPercentiles extends InternalNumericMetricsAggregation.Multi @Override public InternalPercentiles reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - InternalPercentiles merged = null; - for (InternalAggregation aggregation : aggregations) { + TDigestState merged = null; + for (InternalAggregation aggregation : reduceContext.aggregations()) { final InternalPercentiles percentiles = (InternalPercentiles) aggregation; if (merged == null) { - merged = percentiles; - } else { - merged.state.add(percentiles.state); + merged = new TDigestState(percentiles.state.compression()); } + merged.add(percentiles.state); } - return merged; + return new InternalPercentiles(getName(), percents, merged, keyed); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java index 3eeb56ae921..57bdc35c432 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java @@ -36,6 +36,10 @@ public class TDigestState extends AVLTreeDigest { this.compression = compression; } + public double compression() { + return compression; + } + public static void write(TDigestState state, StreamOutput out) throws IOException { out.writeDouble(state.compression); out.writeVInt(state.centroidCount()); diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java index 0bbc9bbbac8..527050deb23 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/InternalStats.java @@ -28,7 +28,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; -import java.util.List; /** * @@ -120,33 +119,18 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue @Override public InternalStats reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - return (InternalStats) aggregations.get(0); + long count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sum = 0; + for (InternalAggregation aggregation : reduceContext.aggregations()) { + InternalStats stats = (InternalStats) aggregation; + count += stats.getCount(); + min = Math.min(min, stats.getMin()); + max = Math.max(max, stats.getMax()); + sum += stats.getSum(); } - InternalStats reduced = null; - for (InternalAggregation aggregation : aggregations) { - if (reduced == null) { - if (((InternalStats) aggregation).count != 0) { - reduced = (InternalStats) aggregation; - } - } else { - if (((InternalStats) aggregation).count != 0) { - reduced.count += ((InternalStats) aggregation).count; - reduced.min = Math.min(reduced.min, ((InternalStats) aggregation).min); - reduced.max = Math.max(reduced.max, ((InternalStats) aggregation).max); - reduced.sum += ((InternalStats) aggregation).sum; - mergeOtherStats(reduced, aggregation); - } - } - } - if (reduced != null) { - return reduced; - } - return (InternalStats) aggregations.get(0); - } - - protected void mergeOtherStats(InternalStats to, InternalAggregation from) { + return new InternalStats(name, count, sum, min, max); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java index c5552434d0f..eed0f565c38 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/stats/extended/InternalExtendedStats.java @@ -101,8 +101,14 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat } @Override - protected void mergeOtherStats(InternalStats to, InternalAggregation from) { - ((InternalExtendedStats) to).sumOfSqrs += ((InternalExtendedStats) from).sumOfSqrs; + public InternalExtendedStats reduce(ReduceContext reduceContext) { + double sumOfSqrs = 0; + for (InternalAggregation aggregation : reduceContext.aggregations()) { + InternalExtendedStats stats = (InternalExtendedStats) aggregation; + sumOfSqrs += stats.getSumOfSquares(); + } + final InternalStats stats = super.reduce(reduceContext); + return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/sum/InternalSum.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/sum/InternalSum.java index 27d16106f22..1b89ecddd57 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/sum/InternalSum.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/sum/InternalSum.java @@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import java.io.IOException; -import java.util.List; /** * @@ -74,22 +73,11 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue i @Override public InternalSum reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - return (InternalSum) aggregations.get(0); + double sum = 0; + for (InternalAggregation aggregation : reduceContext.aggregations()) { + sum += ((InternalSum) aggregation).sum; } - InternalSum reduced = null; - for (InternalAggregation aggregation : aggregations) { - if (reduced == null) { - reduced = (InternalSum) aggregation; - } else { - reduced.sum += ((InternalSum) aggregation).sum; - } - } - if (reduced != null) { - return reduced; - } - return (InternalSum) aggregations.get(0); + return new InternalSum(name, sum); } @Override diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java index a1ce92490ef..5c049c2e9d5 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java @@ -94,10 +94,6 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi @Override public InternalAggregation reduce(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1 && from == 0) { - return aggregations.get(0); - } - TopDocs[] shardDocs = new TopDocs[aggregations.size()]; InternalSearchHits[] shardHits = new InternalSearchHits[aggregations.size()]; for (int i = 0; i < shardDocs.length; i++) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/valuecount/InternalValueCount.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/valuecount/InternalValueCount.java index 08d05b2078d..81e40159306 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/valuecount/InternalValueCount.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/valuecount/InternalValueCount.java @@ -26,7 +26,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import java.io.IOException; -import java.util.List; /** * An internal implementation of {@link ValueCount}. @@ -69,19 +68,11 @@ public class InternalValueCount extends InternalNumericMetricsAggregation implem @Override public InternalAggregation reduce(ReduceContext reduceContext) { - List aggregations = reduceContext.aggregations(); - if (aggregations.size() == 1) { - return aggregations.get(0); + long valueCount = 0; + for (InternalAggregation aggregation : reduceContext.aggregations()) { + valueCount += ((InternalValueCount) aggregation).value; } - InternalValueCount reduced = null; - for (InternalAggregation aggregation : aggregations) { - if (reduced == null) { - reduced = (InternalValueCount) aggregation; - } else { - reduced.value += ((InternalValueCount) aggregation).value; - } - } - return reduced; + return new InternalValueCount(name, valueCount); } @Override diff --git a/src/test/java/org/elasticsearch/search/aggregations/RandomTests.java b/src/test/java/org/elasticsearch/search/aggregations/RandomTests.java index a0e6a0ca262..80519bd2ae1 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/RandomTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/RandomTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.search.aggregations.bucket.range.Range; import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.test.ElasticsearchIntegrationTest; import java.util.List; @@ -310,4 +311,49 @@ public class RandomTests extends ElasticsearchIntegrationTest { assertEquals(numDocs, response.getHits().getTotalHits()); } + // https://github.com/elasticsearch/elasticsearch/issues/6435 + public void testReduce() throws Exception { + createIndex("idx"); + final int value = randomIntBetween(0, 10); + indexRandom(true, client().prepareIndex("idx", "type").setSource("f", value)); + + SearchResponse response = client().prepareSearch("idx") + .addAggregation(filter("filter").filter(FilterBuilders.matchAllFilter()) + .subAggregation(range("range") + .field("f") + .addUnboundedTo(6) + .addUnboundedFrom(6) + .subAggregation(sum("sum").field("f")))) + .execute().actionGet(); + + assertSearchResponse(response);System.out.println(response); + + Filter filter = response.getAggregations().get("filter"); + assertNotNull(filter); + assertEquals(1, filter.getDocCount()); + + Range range = filter.getAggregations().get("range"); + assertThat(range, notNullValue()); + assertThat(range.getName(), equalTo("range")); + assertThat(range.getBuckets().size(), equalTo(2)); + + Range.Bucket bucket = range.getBucketByKey("*-6.0"); + assertThat(bucket, notNullValue()); + assertThat(bucket.getKey(), equalTo("*-6.0")); + assertThat(bucket.getFrom().doubleValue(), equalTo(Double.NEGATIVE_INFINITY)); + assertThat(bucket.getTo().doubleValue(), equalTo(6.0)); + assertThat(bucket.getDocCount(), equalTo(value < 6 ? 1L : 0L)); + Sum sum = bucket.getAggregations().get("sum"); + assertEquals(value < 6 ? value : 0, sum.getValue(), 0d); + + bucket = range.getBucketByKey("6.0-*"); + assertThat(bucket, notNullValue()); + assertThat(bucket.getKey(), equalTo("6.0-*")); + assertThat(bucket.getFrom().doubleValue(), equalTo(6.0)); + assertThat(bucket.getTo().doubleValue(), equalTo(Double.POSITIVE_INFINITY)); + assertThat(bucket.getDocCount(), equalTo(value >= 6 ? 1L : 0L)); + sum = bucket.getAggregations().get("sum"); + assertEquals(value >= 6 ? value : 0, sum.getValue(), 0d); + } + }