Aggregations: reduce histogram buckets on the fly using a priority queue.

This commit makes histogram reduction a bit cleaner by expecting buckets
returned from shards to be sorted by key and merging them on-the-fly on the
coordinating node using a priority queue.

Close #8797
This commit is contained in:
Adrien Grand 2014-12-11 10:27:47 +01:00
parent 5059d6fe1c
commit 5694626f79
2 changed files with 117 additions and 65 deletions

View File

@ -118,7 +118,8 @@ public class HistogramAggregator extends BucketsAggregator {
buckets.add(histogramFactory.createBucket(rounding.valueForKey(bucketOrds.get(i)), bucketDocCount(i), bucketAggregations(i), keyed, formatter));
}
CollectionUtil.introSort(buckets, order.comparator());
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, InternalOrder.KEY_ASC.comparator());
// value source will be null for unmapped fields
InternalHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0 ? new InternalHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds) : null;

View File

@ -20,7 +20,9 @@ package org.elasticsearch.search.aggregations.bucket.histogram;
import com.carrotsearch.hppc.LongObjectOpenHashMap;
import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
@ -38,6 +40,7 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatterStream
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@ -289,96 +292,144 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
return FACTORY;
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
private static class IteratorAndCurrent<B> {
private final Iterator<B> iterator;
private B current;
IteratorAndCurrent(Iterator<B> iterator) {
this.iterator = iterator;
current = iterator.next();
}
}
private List<B> reduceBuckets(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
LongObjectPagedHashMap<List<B>> bucketsByKey = new LongObjectPagedHashMap<>(reduceContext.bigArrays());
final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<IteratorAndCurrent<B>>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
return a.current.key < b.current.key;
}
};
for (InternalAggregation aggregation : aggregations) {
InternalHistogram<B> histogram = (InternalHistogram) aggregation;
for (B bucket : histogram.buckets) {
List<B> bucketList = bucketsByKey.get(bucket.key);
if (bucketList == null) {
bucketList = new ArrayList<>(aggregations.size());
bucketsByKey.put(bucket.key, bucketList);
if (histogram.buckets.isEmpty() == false) {
pq.add(new IteratorAndCurrent<>(histogram.buckets.iterator()));
}
}
List<B> reducedBuckets = new ArrayList<>();
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
List<B> currentBuckets = new ArrayList<>();
long key = pq.top().current.key;
do {
final IteratorAndCurrent<B> top = pq.top();
if (top.current.key != key) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
reducedBuckets.add(currentBuckets.get(0).reduce(currentBuckets, reduceContext));
currentBuckets.clear();
key = top.current.key;
}
bucketList.add(bucket);
currentBuckets.add(top.current);
if (top.iterator.hasNext()) {
final B next = top.iterator.next();
assert next.key > top.current.key : "shards must return data sorted by key";
top.current = next;
pq.updateTop();
} else {
pq.pop();
}
} while (pq.size() > 0);
if (currentBuckets.isEmpty() == false) {
reducedBuckets.add(currentBuckets.get(0).reduce(currentBuckets, reduceContext));
}
}
List<B> reducedBuckets = new ArrayList<>((int) bucketsByKey.size());
for (LongObjectPagedHashMap.Cursor<List<B>> cursor : bucketsByKey) {
List<B> sameTermBuckets = cursor.value;
B bucket = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
if (bucket.getDocCount() >= minDocCount) {
reducedBuckets.add(bucket);
}
}
bucketsByKey.close();
return reducedBuckets;
}
// adding empty buckets in needed
if (minDocCount == 0) {
CollectionUtil.introSort(reducedBuckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
List<B> list = order.asc ? reducedBuckets : Lists.reverse(reducedBuckets);
B lastBucket = null;
ExtendedBounds bounds = emptyBucketInfo.bounds;
ListIterator<B> iter = list.listIterator();
private void addEmptyBuckets(List<B> list) {
B lastBucket = null;
ExtendedBounds bounds = emptyBucketInfo.bounds;
ListIterator<B> iter = list.listIterator();
// first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested)
if (bounds != null) {
B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
if (firstBucket == null) {
if (bounds.min != null && bounds.max != null) {
long key = bounds.min;
long max = bounds.max;
while (key <= max) {
// first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested)
if (bounds != null) {
B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
if (firstBucket == null) {
if (bounds.min != null && bounds.max != null) {
long key = bounds.min;
long max = bounds.max;
while (key <= max) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
} else {
if (bounds.min != null) {
long key = bounds.min;
if (key < firstBucket.key) {
while (key < firstBucket.key) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
} else {
if (bounds.min != null) {
long key = bounds.min;
if (key < firstBucket.key) {
while (key < firstBucket.key) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
}
}
}
}
// now adding the empty buckets within the actual data,
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
while (iter.hasNext()) {
B nextBucket = list.get(iter.nextIndex());
if (lastBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
while (key < nextBucket.key) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
assert key == nextBucket.key;
}
lastBucket = iter.next();
}
// finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) {
// now adding the empty buckets within the actual data,
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6
while (iter.hasNext()) {
B nextBucket = list.get(iter.nextIndex());
if (lastBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
long max = bounds.max;
while (key <= max) {
while (key < nextBucket.key) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
assert key == nextBucket.key;
}
lastBucket = iter.next();
}
if (order != InternalOrder.KEY_ASC && order != InternalOrder.KEY_DESC) {
CollectionUtil.introSort(reducedBuckets, order.comparator());
// finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) {
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
long max = bounds.max;
while (key <= max) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<B> reducedBuckets = reduceBuckets(reduceContext);
// adding empty buckets if needed
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets);
}
if (order == InternalOrder.KEY_ASC) {
// nothing to do, data are already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets
// maintains order
} else if (order == InternalOrder.KEY_DESC) {
// we just need to reverse here...
reducedBuckets = Lists.reverse(reducedBuckets);
} else {
// sorted by sub-aggregation, need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator());
}