From 85a603ee6192055332e7b002b2a170a38bb48774 Mon Sep 17 00:00:00 2001 From: Evangelos Chatzikalymnios Date: Wed, 9 Jan 2019 17:01:39 +0100 Subject: [PATCH] Use List instead of priority queue for stable sorting in bucket sort aggregator (#36748) Update BucketSortPipelineAggregator to use a List and Collections.sort() for sorting instead of a priority queue. This preserves the order for equal values. Closes #36322. --- .../BucketSortPipelineAggregator.java | 39 +++++----------- .../aggregations/pipeline/BucketSortIT.java | 44 +++++++++++++++++++ 2 files changed, 56 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java index e98fdec9927..b639a384c76 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java @@ -19,7 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -34,7 +33,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -95,22 +93,22 @@ public class BucketSortPipelineAggregator extends PipelineAggregator { return originalAgg.create(new ArrayList<>(buckets.subList(from, Math.min(from + currentSize, bucketsCount)))); } - int queueSize = Math.min(from + currentSize, bucketsCount); - PriorityQueue ordered = new TopNPriorityQueue(queueSize); + List ordered = new ArrayList<>(); for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) { ComparableBucket comparableBucket = new ComparableBucket(originalAgg, bucket); if (comparableBucket.skip() == false) { - ordered.insertWithOverflow(new ComparableBucket(originalAgg, bucket)); + ordered.add(comparableBucket); } } - int resultSize = Math.max(ordered.size() - from, 0); + Collections.sort(ordered); - // Popping from the priority queue returns the least element. The elements we want to skip due to offset would pop last. - // Thus, we just have to pop as many elements as we expect in results and store them in reverse order. - LinkedList newBuckets = new LinkedList<>(); - for (int i = 0; i < resultSize; ++i) { - newBuckets.addFirst(ordered.pop().internalBucket); + // We just have to get as many elements as we expect in results and store them in the same order starting from + // the specified offset and taking currentSize into consideration. + int limit = Math.min(from + currentSize, ordered.size()); + List newBuckets = new ArrayList<>(); + for (int i = from; i < limit; ++i) { + newBuckets.add(ordered.get(i).internalBucket); } return originalAgg.create(newBuckets); } @@ -160,11 +158,11 @@ public class BucketSortPipelineAggregator extends PipelineAggregator { if (thisValue == null && thatValue == null) { continue; } else if (thisValue == null) { - return -1; - } else if (thatValue == null) { return 1; + } else if (thatValue == null) { + return -1; } else { - compareResult = sort.order() == SortOrder.DESC ? thisValue.compareTo(thatValue) : -thisValue.compareTo(thatValue); + compareResult = sort.order() == SortOrder.DESC ? -thisValue.compareTo(thatValue) : thisValue.compareTo(thatValue); } if (compareResult != 0) { break; @@ -173,17 +171,4 @@ public class BucketSortPipelineAggregator extends PipelineAggregator { return compareResult; } } - - - private static class TopNPriorityQueue extends PriorityQueue { - - private TopNPriorityQueue(int n) { - super(n); - } - - @Override - protected boolean lessThan(ComparableBucket a, ComparableBucket b) { - return a.compareTo(b) < 0; - } - } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/BucketSortIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/BucketSortIT.java index 8618d5a34ba..7cb4371354c 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/BucketSortIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/BucketSortIT.java @@ -43,6 +43,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.search.aggregations.AggregationBuilders.avg; import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram; import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.max; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketSort; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; @@ -191,6 +192,26 @@ public class BucketSortIT extends ESIntegTestCase { } } + public void testSortTermsOnKeyWithSize() { + SearchResponse response = client().prepareSearch(INDEX) + .setSize(0) + .addAggregation(terms("foos").field(TERM_FIELD) + .subAggregation(bucketSort("bucketSort", Arrays.asList(new FieldSortBuilder("_key"))).size(3))) + .get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("foos"); + assertThat(terms, notNullValue()); + List termsBuckets = terms.getBuckets(); + assertEquals(3, termsBuckets.size()); + String previousKey = (String) termsBuckets.get(0).getKey(); + for (Terms.Bucket termBucket : termsBuckets) { + assertThat(previousKey, lessThanOrEqualTo((String) termBucket.getKey())); + previousKey = (String) termBucket.getKey(); + } + } + public void testSortTermsOnSubAggregation() { SearchResponse response = client().prepareSearch(INDEX) .setSize(0) @@ -231,6 +252,29 @@ public class BucketSortIT extends ESIntegTestCase { } } + public void testSortTermsOnSubAggregationPreservesOrderOnEquals() { + SearchResponse response = client().prepareSearch(INDEX) + .setSize(0) + .addAggregation(terms("foos").field(TERM_FIELD) + .subAggregation(bucketSort("keyBucketSort", Arrays.asList(new FieldSortBuilder("_key")))) + .subAggregation(max("max").field("missingValue").missing(1)) + .subAggregation(bucketSort("maxBucketSort", Arrays.asList(new FieldSortBuilder("max"))))) + .get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("foos"); + assertThat(terms, notNullValue()); + List termsBuckets = terms.getBuckets(); + + // Since all max values are equal, we expect the order of keyBucketSort to have been preserved + String previousKey = (String) termsBuckets.get(0).getKey(); + for (Terms.Bucket termBucket : termsBuckets) { + assertThat(previousKey, lessThanOrEqualTo((String) termBucket.getKey())); + previousKey = (String) termBucket.getKey(); + } + } + public void testSortTermsOnCountWithSecondarySort() { SearchResponse response = client().prepareSearch(INDEX) .setSize(0)