Use List instead of priority queue for stable sorting in bucket sort aggregator (#36748)
Update BucketSortPipelineAggregator to collect the buckets in a List and sort them with Collections.sort() instead of using a priority queue. Because Collections.sort() is a stable sort, buckets that compare as equal keep their original relative order. Closes #36322.
This commit is contained in:
parent
eacc63b032
commit
85a603ee61
|
@ -19,7 +19,6 @@
|
||||||
package org.elasticsearch.search.aggregations.pipeline;
|
package org.elasticsearch.search.aggregations.pipeline;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.lucene.util.PriorityQueue;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||||
|
@ -34,7 +33,6 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -95,22 +93,22 @@ public class BucketSortPipelineAggregator extends PipelineAggregator {
|
||||||
return originalAgg.create(new ArrayList<>(buckets.subList(from, Math.min(from + currentSize, bucketsCount))));
|
return originalAgg.create(new ArrayList<>(buckets.subList(from, Math.min(from + currentSize, bucketsCount))));
|
||||||
}
|
}
|
||||||
|
|
||||||
int queueSize = Math.min(from + currentSize, bucketsCount);
|
List<ComparableBucket> ordered = new ArrayList<>();
|
||||||
PriorityQueue<ComparableBucket> ordered = new TopNPriorityQueue(queueSize);
|
|
||||||
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
||||||
ComparableBucket comparableBucket = new ComparableBucket(originalAgg, bucket);
|
ComparableBucket comparableBucket = new ComparableBucket(originalAgg, bucket);
|
||||||
if (comparableBucket.skip() == false) {
|
if (comparableBucket.skip() == false) {
|
||||||
ordered.insertWithOverflow(new ComparableBucket(originalAgg, bucket));
|
ordered.add(comparableBucket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int resultSize = Math.max(ordered.size() - from, 0);
|
Collections.sort(ordered);
|
||||||
|
|
||||||
// Popping from the priority queue returns the least element. The elements we want to skip due to offset would pop last.
|
// We just have to get as many elements as we expect in results and store them in the same order starting from
|
||||||
// Thus, we just have to pop as many elements as we expect in results and store them in reverse order.
|
// the specified offset and taking currentSize into consideration.
|
||||||
LinkedList<InternalMultiBucketAggregation.InternalBucket> newBuckets = new LinkedList<>();
|
int limit = Math.min(from + currentSize, ordered.size());
|
||||||
for (int i = 0; i < resultSize; ++i) {
|
List<InternalMultiBucketAggregation.InternalBucket> newBuckets = new ArrayList<>();
|
||||||
newBuckets.addFirst(ordered.pop().internalBucket);
|
for (int i = from; i < limit; ++i) {
|
||||||
|
newBuckets.add(ordered.get(i).internalBucket);
|
||||||
}
|
}
|
||||||
return originalAgg.create(newBuckets);
|
return originalAgg.create(newBuckets);
|
||||||
}
|
}
|
||||||
|
@ -160,11 +158,11 @@ public class BucketSortPipelineAggregator extends PipelineAggregator {
|
||||||
if (thisValue == null && thatValue == null) {
|
if (thisValue == null && thatValue == null) {
|
||||||
continue;
|
continue;
|
||||||
} else if (thisValue == null) {
|
} else if (thisValue == null) {
|
||||||
return -1;
|
|
||||||
} else if (thatValue == null) {
|
|
||||||
return 1;
|
return 1;
|
||||||
|
} else if (thatValue == null) {
|
||||||
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
compareResult = sort.order() == SortOrder.DESC ? thisValue.compareTo(thatValue) : -thisValue.compareTo(thatValue);
|
compareResult = sort.order() == SortOrder.DESC ? -thisValue.compareTo(thatValue) : thisValue.compareTo(thatValue);
|
||||||
}
|
}
|
||||||
if (compareResult != 0) {
|
if (compareResult != 0) {
|
||||||
break;
|
break;
|
||||||
|
@ -173,17 +171,4 @@ public class BucketSortPipelineAggregator extends PipelineAggregator {
|
||||||
return compareResult;
|
return compareResult;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class TopNPriorityQueue extends PriorityQueue<ComparableBucket> {
|
|
||||||
|
|
||||||
private TopNPriorityQueue(int n) {
|
|
||||||
super(n);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean lessThan(ComparableBucket a, ComparableBucket b) {
|
|
||||||
return a.compareTo(b) < 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
|
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
|
||||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
|
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
|
||||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
|
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
|
||||||
|
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
|
||||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
|
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
|
||||||
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketSort;
|
import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.bucketSort;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
||||||
|
@ -191,6 +192,26 @@ public class BucketSortIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSortTermsOnKeyWithSize() {
|
||||||
|
SearchResponse response = client().prepareSearch(INDEX)
|
||||||
|
.setSize(0)
|
||||||
|
.addAggregation(terms("foos").field(TERM_FIELD)
|
||||||
|
.subAggregation(bucketSort("bucketSort", Arrays.asList(new FieldSortBuilder("_key"))).size(3)))
|
||||||
|
.get();
|
||||||
|
|
||||||
|
assertSearchResponse(response);
|
||||||
|
|
||||||
|
Terms terms = response.getAggregations().get("foos");
|
||||||
|
assertThat(terms, notNullValue());
|
||||||
|
List<? extends Terms.Bucket> termsBuckets = terms.getBuckets();
|
||||||
|
assertEquals(3, termsBuckets.size());
|
||||||
|
String previousKey = (String) termsBuckets.get(0).getKey();
|
||||||
|
for (Terms.Bucket termBucket : termsBuckets) {
|
||||||
|
assertThat(previousKey, lessThanOrEqualTo((String) termBucket.getKey()));
|
||||||
|
previousKey = (String) termBucket.getKey();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testSortTermsOnSubAggregation() {
|
public void testSortTermsOnSubAggregation() {
|
||||||
SearchResponse response = client().prepareSearch(INDEX)
|
SearchResponse response = client().prepareSearch(INDEX)
|
||||||
.setSize(0)
|
.setSize(0)
|
||||||
|
@ -231,6 +252,29 @@ public class BucketSortIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSortTermsOnSubAggregationPreservesOrderOnEquals() {
|
||||||
|
SearchResponse response = client().prepareSearch(INDEX)
|
||||||
|
.setSize(0)
|
||||||
|
.addAggregation(terms("foos").field(TERM_FIELD)
|
||||||
|
.subAggregation(bucketSort("keyBucketSort", Arrays.asList(new FieldSortBuilder("_key"))))
|
||||||
|
.subAggregation(max("max").field("missingValue").missing(1))
|
||||||
|
.subAggregation(bucketSort("maxBucketSort", Arrays.asList(new FieldSortBuilder("max")))))
|
||||||
|
.get();
|
||||||
|
|
||||||
|
assertSearchResponse(response);
|
||||||
|
|
||||||
|
Terms terms = response.getAggregations().get("foos");
|
||||||
|
assertThat(terms, notNullValue());
|
||||||
|
List<? extends Terms.Bucket> termsBuckets = terms.getBuckets();
|
||||||
|
|
||||||
|
// Since all max values are equal, we expect the order of keyBucketSort to have been preserved
|
||||||
|
String previousKey = (String) termsBuckets.get(0).getKey();
|
||||||
|
for (Terms.Bucket termBucket : termsBuckets) {
|
||||||
|
assertThat(previousKey, lessThanOrEqualTo((String) termBucket.getKey()));
|
||||||
|
previousKey = (String) termBucket.getKey();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testSortTermsOnCountWithSecondarySort() {
|
public void testSortTermsOnCountWithSecondarySort() {
|
||||||
SearchResponse response = client().prepareSearch(INDEX)
|
SearchResponse response = client().prepareSearch(INDEX)
|
||||||
.setSize(0)
|
.setSize(0)
|
||||||
|
|
Loading…
Reference in New Issue