diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java index 03bb3150408..5453c1e1f08 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregator.java @@ -118,7 +118,8 @@ public class HistogramAggregator extends BucketsAggregator { buckets.add(histogramFactory.createBucket(rounding.valueForKey(bucketOrds.get(i)), bucketDocCount(i), bucketAggregations(i), keyed, formatter)); } - CollectionUtil.introSort(buckets, order.comparator()); + // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order + CollectionUtil.introSort(buckets, InternalOrder.KEY_ASC.comparator()); // value source will be null for unmapped fields InternalHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0 ? new InternalHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds) : null; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 7c6b17546e1..b0ac9ce877e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -20,7 +20,9 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import com.carrotsearch.hppc.LongObjectOpenHashMap; import com.google.common.collect.Lists; + import org.apache.lucene.util.CollectionUtil; +import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -38,6 +40,7 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatterStream import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -289,96 +292,150 @@ public class InternalHistogram extends Inter return FACTORY; } - @Override - public InternalAggregation reduce(ReduceContext reduceContext) { + private static class IteratorAndCurrent { + + private final Iterator iterator; + private B current; + + IteratorAndCurrent(Iterator iterator) { + this.iterator = iterator; + current = iterator.next(); + } + + } + + private List reduceBuckets(ReduceContext reduceContext) { List aggregations = reduceContext.aggregations(); - LongObjectPagedHashMap> bucketsByKey = new LongObjectPagedHashMap<>(reduceContext.bigArrays()); + final PriorityQueue> pq = new PriorityQueue>(aggregations.size()) { + @Override + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return a.current.key < b.current.key; + } + }; for (InternalAggregation aggregation : aggregations) { InternalHistogram histogram = (InternalHistogram) aggregation; - for (B bucket : histogram.buckets) { - List bucketList = bucketsByKey.get(bucket.key); - if (bucketList == null) { - bucketList = new ArrayList<>(aggregations.size()); - bucketsByKey.put(bucket.key, bucketList); + if (histogram.buckets.isEmpty() == false) { + pq.add(new IteratorAndCurrent<>(histogram.buckets.iterator())); + } + } + + List reducedBuckets = new ArrayList<>(); + if (pq.size() > 0) { + // list of buckets coming from different shards that have the same key + List currentBuckets = new ArrayList<>(); + long key = pq.top().current.key; + + do { + final IteratorAndCurrent top = pq.top(); + + if (top.current.key != key) { + // the key changes, reduce what we already buffered and reset the buffer for current buckets + final B reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); + if (reduced.getDocCount() >= minDocCount) { + reducedBuckets.add(reduced); + } + currentBuckets.clear(); + key = top.current.key; + } + + currentBuckets.add(top.current); + + if (top.iterator.hasNext()) { + final B next = top.iterator.next(); + assert next.key > top.current.key : "shards must return data sorted by key"; + top.current = next; + pq.updateTop(); + } else { + pq.pop(); + } + } while (pq.size() > 0); + + if (currentBuckets.isEmpty() == false) { + final B reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); + if (reduced.getDocCount() >= minDocCount) { + reducedBuckets.add(reduced); } - bucketList.add(bucket); } } - List reducedBuckets = new ArrayList<>((int) bucketsByKey.size()); - for (LongObjectPagedHashMap.Cursor> cursor : bucketsByKey) { - List sameTermBuckets = cursor.value; - B bucket = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext); - if (bucket.getDocCount() >= minDocCount) { - reducedBuckets.add(bucket); - } - } - bucketsByKey.close(); + return reducedBuckets; + } - // adding empty buckets in needed - if (minDocCount == 0) { - CollectionUtil.introSort(reducedBuckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator()); - List list = order.asc ? reducedBuckets : Lists.reverse(reducedBuckets); - B lastBucket = null; - ExtendedBounds bounds = emptyBucketInfo.bounds; - ListIterator iter = list.listIterator(); + private void addEmptyBuckets(List list) { + B lastBucket = null; + ExtendedBounds bounds = emptyBucketInfo.bounds; + ListIterator iter = list.listIterator(); - // first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested) - if (bounds != null) { - B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null; - if (firstBucket == null) { - if (bounds.min != null && bounds.max != null) { - long key = bounds.min; - long max = bounds.max; - while (key <= max) { + // first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested) + if (bounds != null) { + B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null; + if (firstBucket == null) { + if (bounds.min != null && bounds.max != null) { + long key = bounds.min; + long max = bounds.max; + while (key <= max) { + iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); + key = emptyBucketInfo.rounding.nextRoundingValue(key); + } + } + } else { + if (bounds.min != null) { + long key = bounds.min; + if (key < firstBucket.key) { + while (key < firstBucket.key) { iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } } - } else { - if (bounds.min != null) { - long key = bounds.min; - if (key < firstBucket.key) { - while (key < firstBucket.key) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); - key = emptyBucketInfo.rounding.nextRoundingValue(key); - } - } - } } } + } - // now adding the empty buckets within the actual data, - // e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6 - while (iter.hasNext()) { - B nextBucket = list.get(iter.nextIndex()); - if (lastBucket != null) { - long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); - while (key < nextBucket.key) { - iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); - key = emptyBucketInfo.rounding.nextRoundingValue(key); - } - assert key == nextBucket.key; - } - lastBucket = iter.next(); - } - - // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user) - if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) { + // now adding the empty buckets within the actual data, + // e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6 + while (iter.hasNext()) { + B nextBucket = list.get(iter.nextIndex()); + if (lastBucket != null) { long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); - long max = bounds.max; - while (key <= max) { + while (key < nextBucket.key) { iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); key = emptyBucketInfo.rounding.nextRoundingValue(key); } + assert key == nextBucket.key; } + lastBucket = iter.next(); + } - if (order != InternalOrder.KEY_ASC && order != InternalOrder.KEY_DESC) { - CollectionUtil.introSort(reducedBuckets, order.comparator()); + // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user) + if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) { + long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key); + long max = bounds.max; + while (key <= max) { + iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter)); + key = emptyBucketInfo.rounding.nextRoundingValue(key); } + } + } + @Override + public InternalAggregation reduce(ReduceContext reduceContext) { + List reducedBuckets = reduceBuckets(reduceContext); + + // adding empty buckets if needed + if (minDocCount == 0) { + addEmptyBuckets(reducedBuckets); + } + + if (order == InternalOrder.KEY_ASC) { + // nothing to do, data are already sorted since shards return + // sorted buckets and the merge-sort performed by reduceBuckets + // maintains order + } else if (order == InternalOrder.KEY_DESC) { + // we just need to reverse here... + reducedBuckets = Lists.reverse(reducedBuckets); } else { + // sorted by sub-aggregation, need to fall back to a costly n*log(n) sort CollectionUtil.introSort(reducedBuckets, order.comparator()); }