From ae6c25b749ec706678b8da06a88a63c67ec4c0f5 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Tue, 10 Mar 2020 13:24:21 +0100 Subject: [PATCH] Speed up partial reduce of terms aggregations (#53216) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change optimizes the merge of terms aggregations by removing the priority queue that was used to collect all the buckets during a non-final reduction. We don't need to keep the result sorted since the merge of buckets in a subsequent reduce can modify the order. I wrote a small micro-benchmark to test the change and the speed ups are significative for small merge buffer sizes: ```` ########## Master: Benchmark (bufferSize) (cardinality) (numShards) (topNSize) Mode Cnt Score Error Units TermsReduceBenchmark.reduceTopHits 5 10000 1000 1000 avgt 10 2459,690 ± 198,682 ms/op TermsReduceBenchmark.reduceTopHits 16 10000 1000 1000 avgt 10 1030,620 ± 91,544 ms/op TermsReduceBenchmark.reduceTopHits 32 10000 1000 1000 avgt 10 558,608 ± 44,915 ms/op TermsReduceBenchmark.reduceTopHits 128 10000 1000 1000 avgt 10 287,333 ± 8,342 ms/op TermsReduceBenchmark.reduceTopHits 512 10000 1000 1000 avgt 10 257,325 ± 54,515 ms/op ########## Patch: Benchmark (bufferSize) (cardinality) (numShards) (topNSize) Mode Cnt Score Error Units TermsReduceBenchmark.reduceTopHits 5 10000 1000 1000 avgt 10 805,611 ± 14,630 ms/op TermsReduceBenchmark.reduceTopHits 16 10000 1000 1000 avgt 10 378,851 ± 17,929 ms/op TermsReduceBenchmark.reduceTopHits 32 10000 1000 1000 avgt 10 261,094 ± 10,176 ms/op TermsReduceBenchmark.reduceTopHits 128 10000 1000 1000 avgt 10 241,051 ± 19,558 ms/op TermsReduceBenchmark.reduceTopHits 512 10000 1000 1000 avgt 10 231,643 ± 6,170 ms/op ```` The code for the benchmark can be found [here](). It seems to be up to 3x faster for terms aggregations that return 10,000 unique terms (1000 terms per shard). For a cardinality of 100,000 terms, this patch is up to 5x faster: ```` ########## Patch: Benchmark (bufferSize) (cardinality) (numShards) (topNSize) Mode Cnt Score Error Units TermsReduceBenchmark.reduceTopHits 5 100000 1000 1000 avgt 10 12791,083 ± 397,128 ms/op TermsReduceBenchmark.reduceTopHits 16 100000 1000 1000 avgt 10 3974,939 ± 324,617 ms/op TermsReduceBenchmark.reduceTopHits 32 100000 1000 1000 avgt 10 2186,285 ± 267,124 ms/op TermsReduceBenchmark.reduceTopHits 128 100000 1000 1000 avgt 10 914,657 ± 160,784 ms/op TermsReduceBenchmark.reduceTopHits 512 100000 1000 1000 avgt 10 604,198 ± 145,457 ms/op ########## Master: Benchmark (bufferSize) (cardinality) (numShards) (topNSize) Mode Cnt Score Error Units TermsReduceBenchmark.reduceTopHits 5 100000 1000 1000 avgt 10 60696,107 ± 929,944 ms/op TermsReduceBenchmark.reduceTopHits 16 100000 1000 1000 avgt 10 16292,894 ± 783,398 ms/op TermsReduceBenchmark.reduceTopHits 32 100000 1000 1000 avgt 10 7705,444 ± 77,588 ms/op TermsReduceBenchmark.reduceTopHits 128 100000 1000 1000 avgt 10 2156,685 ± 88,795 ms/op TermsReduceBenchmark.reduceTopHits 512 100000 1000 1000 avgt 10 760,273 ± 53,738 ms/op ```` The merge of buckets can also be optimized. Currently we use an hash map to merge buckets coming from different shards so this can be costly if the number of unique terms is high. Instead, we could always sort the shard terms result by key and perform a merge sort to reduce the results. This would save memory and make the merge more linear in terms of complexity in the coordinating node at the expense of an additional sort in the shards. I plan to test this possible optimization in a follow up. Relates #51857 --- .../bucket/terms/InternalTerms.java | 60 ++++++++++++------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index d8f74b1b8e4..ba786f94147 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -255,30 +255,48 @@ public abstract class InternalTerms, B extends Int } } - final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()); - final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); - for (List sameTermBuckets : buckets.values()) { - final B b = reduceBucket(sameTermBuckets, reduceContext); - if (sumDocCountError == -1) { - b.docCountError = -1; - } else { - b.docCountError += sumDocCountError; - } - if (b.docCount >= minDocCount || reduceContext.isFinalReduce() == false) { - B removed = ordered.insertWithOverflow(b); - if (removed != null) { - otherDocCount += removed.getDocCount(); - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); + final B[] list; + if (reduceContext.isFinalReduce()) { + final int size = Math.min(requiredSize, buckets.size()); + final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); + for (List sameTermBuckets : buckets.values()) { + final B b = reduceBucket(sameTermBuckets, reduceContext); + if (sumDocCountError == -1) { + b.docCountError = -1; } else { - reduceContext.consumeBucketsAndMaybeBreak(1); + b.docCountError += sumDocCountError; + } + if (b.docCount >= minDocCount) { + B removed = ordered.insertWithOverflow(b); + if (removed != null) { + otherDocCount += removed.getDocCount(); + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); + } else { + reduceContext.consumeBucketsAndMaybeBreak(1); + } + } else { + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b)); } - } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b)); } - } - B[] list = createBucketsArray(ordered.size()); - for (int i = ordered.size() - 1; i >= 0; i--) { - list[i] = ordered.pop(); + list = createBucketsArray(ordered.size()); + for (int i = ordered.size() - 1; i >= 0; i--) { + list[i] = ordered.pop(); + } + } else { + // keep all buckets on partial reduce + // TODO: we could prune the buckets when sorting by key + list = createBucketsArray(buckets.size()); + int pos = 0; + for (List sameTermBuckets : buckets.values()) { + final B b = reduceBucket(sameTermBuckets, reduceContext); + reduceContext.consumeBucketsAndMaybeBreak(1); + if (sumDocCountError == -1) { + b.docCountError = -1; + } else { + b.docCountError += sumDocCountError; + } + list[pos++] = b; + } } long docCountError; if (sumDocCountError == -1) {