diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index d8f74b1b8e4..ba786f94147 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -255,30 +255,48 @@ public abstract class InternalTerms, B extends Int } } - final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()); - final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); - for (List sameTermBuckets : buckets.values()) { - final B b = reduceBucket(sameTermBuckets, reduceContext); - if (sumDocCountError == -1) { - b.docCountError = -1; - } else { - b.docCountError += sumDocCountError; - } - if (b.docCount >= minDocCount || reduceContext.isFinalReduce() == false) { - B removed = ordered.insertWithOverflow(b); - if (removed != null) { - otherDocCount += removed.getDocCount(); - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); + final B[] list; + if (reduceContext.isFinalReduce()) { + final int size = Math.min(requiredSize, buckets.size()); + final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); + for (List sameTermBuckets : buckets.values()) { + final B b = reduceBucket(sameTermBuckets, reduceContext); + if (sumDocCountError == -1) { + b.docCountError = -1; } else { - reduceContext.consumeBucketsAndMaybeBreak(1); + b.docCountError += sumDocCountError; + } + if (b.docCount >= minDocCount) { + B removed = ordered.insertWithOverflow(b); + if (removed != null) { + otherDocCount += removed.getDocCount(); + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); + } else { + reduceContext.consumeBucketsAndMaybeBreak(1); + } + } else { + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b)); } - } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b)); } - } - B[] list = createBucketsArray(ordered.size()); - for (int i = ordered.size() - 1; i >= 0; i--) { - list[i] = ordered.pop(); + list = createBucketsArray(ordered.size()); + for (int i = ordered.size() - 1; i >= 0; i--) { + list[i] = ordered.pop(); + } + } else { + // keep all buckets on partial reduce + // TODO: we could prune the buckets when sorting by key + list = createBucketsArray(buckets.size()); + int pos = 0; + for (List sameTermBuckets : buckets.values()) { + final B b = reduceBucket(sameTermBuckets, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + if (sumDocCountError == -1) { + b.docCountError = -1; + } else { + b.docCountError += sumDocCountError; + } + list[pos++] = b; + } } long docCountError; if (sumDocCountError == -1) {