From 3e2fa0966608fb7d33b0325da95a6cd856c6fa24 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Thu, 5 Nov 2020 11:29:47 +0100 Subject: [PATCH] Fix merging of terms aggregation with compound order (#64469) This change fixes a bug introduced in #61779 that uses a compound order to compare buckets when merging. The bug is triggered when the compound order uses a primary sort ordered by key (asc or desc). This commit ensures that we always extract the primary sort when comparing keys during merging. The PR is marked as no-issue since the bug has not been released in any official version. --- .../bucket/terms/InternalTerms.java | 18 +- .../bucket/terms/InternalTermsTestCase.java | 4 +- .../bucket/terms/StringTermsTests.java | 181 +++++++++++------- .../test/InternalAggregationTestCase.java | 8 +- ...nternalMultiBucketAggregationTestCase.java | 4 + 5 files changed, 133 insertions(+), 82 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index cd6f3b0afb9..12b89f122e3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import static org.elasticsearch.search.aggregations.InternalOrder.isKeyAsc; import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder; public abstract class InternalTerms, B extends InternalTerms.Bucket> @@ -257,9 +258,9 @@ public abstract class InternalTerms, B extends Int } private List reduceMergeSort(List aggregations, - BucketOrder reduceOrder, ReduceContext reduceContext) { - assert isKeyOrder(reduceOrder); - final Comparator cmp = reduceOrder.comparator(); + BucketOrder thisReduceOrder, ReduceContext reduceContext) { + assert isKeyOrder(thisReduceOrder); + final Comparator cmp = thisReduceOrder.comparator(); final PriorityQueue> pq = new PriorityQueue>(aggregations.size()) { @Override protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { @@ -369,6 +370,8 @@ public abstract class InternalTerms, B extends Int bucket.docCountError -= thisAggDocCountError; } } + + final List reducedBuckets; /** * Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}. * That allows to perform a merge sort when reducing multiple aggregations together. @@ -376,8 +379,13 @@ public abstract class InternalTerms, B extends Int * the provided aggregations use a different {@link InternalTerms#reduceOrder}. */ BucketOrder thisReduceOrder = getReduceOrder(aggregations); - List reducedBuckets = isKeyOrder(thisReduceOrder) ? - reduceMergeSort(aggregations, thisReduceOrder, reduceContext) : reduceLegacy(aggregations, reduceContext); + if (isKeyOrder(thisReduceOrder)) { + // extract the primary sort in case this is a compound order. + thisReduceOrder = InternalOrder.key(isKeyAsc(thisReduceOrder) ? true : false); + reducedBuckets = reduceMergeSort(aggregations, thisReduceOrder, reduceContext); + } else { + reducedBuckets = reduceLegacy(aggregations, reduceContext); + } final B[] list; if (reduceContext.isFinalReduce()) { final int size = Math.min(requiredSize, reducedBuckets.size()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTermsTestCase.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTermsTestCase.java index 6c1063ffbe3..b3e96a5769c 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTermsTestCase.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTermsTestCase.java @@ -35,8 +35,8 @@ import static org.hamcrest.Matchers.equalTo; public abstract class InternalTermsTestCase extends InternalMultiBucketAggregationTestCase> { - private boolean showDocCount; - private long docCountError; + protected boolean showDocCount; + protected long docCountError; @Before public void init() { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java index 867df8a666a..63cf1c8cdef 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java @@ -34,31 +34,24 @@ import java.util.Map; import java.util.Set; public class StringTermsTests extends InternalTermsTestCase { - @Override protected InternalTerms createTestInstance(String name, Map metadata, InternalAggregations aggregations, boolean showTermDocCountError, long docCountError) { - BucketOrder order = BucketOrder.count(false); - long minDocCount = 1; - int requiredSize = 3; - int shardSize = requiredSize + 2; - DocValueFormat format = DocValueFormat.RAW; - long otherDocCount = 0; - List buckets = new ArrayList<>(); - final int numBuckets = randomNumberOfBuckets(); - Set terms = new HashSet<>(); - for (int i = 0; i < numBuckets; ++i) { - BytesRef term = randomValueOtherThanMany(b -> terms.add(b) == false, () -> new BytesRef(randomAlphaOfLength(10))); - int docCount = randomIntBetween(1, 100); - buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format)); + return createTestInstance(generateRandomDict(), name, metadata, aggregations, showTermDocCountError, docCountError); + } + + @Override + protected List> randomResultsToReduce(String name, int size) { + List> inputs = new ArrayList<>(); + BytesRef[] dict = generateRandomDict(); + for (int i = 0; i < size; i++) { + InternalTerms t = randomBoolean() ? createUnmappedInstance(name) : createTestInstance(dict, name); + inputs.add(t); } - BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true); - Collections.sort(buckets, reduceOrder.comparator()); - return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, - metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); + return inputs; } @Override @@ -82,46 +75,46 @@ public class StringTermsTests extends InternalTermsTestCase { long docCountError = stringTerms.getDocCountError(); Map metadata = stringTerms.getMetadata(); switch (between(0, 8)) { - case 0: - name += randomAlphaOfLength(5); - break; - case 1: - requiredSize += between(1, 100); - break; - case 2: - minDocCount += between(1, 100); - break; - case 3: - shardSize += between(1, 100); - break; - case 4: - showTermDocCountError = showTermDocCountError == false; - break; - case 5: - otherDocCount += between(1, 100); - break; - case 6: - docCountError += between(1, 100); - break; - case 7: - buckets = new ArrayList<>(buckets); - buckets.add(new StringTerms.Bucket(new BytesRef(randomAlphaOfLengthBetween(1, 10)), randomNonNegativeLong(), + case 0: + name += randomAlphaOfLength(5); + break; + case 1: + requiredSize += between(1, 100); + break; + case 2: + minDocCount += between(1, 100); + break; + case 3: + shardSize += between(1, 100); + break; + case 4: + showTermDocCountError = showTermDocCountError == false; + break; + case 5: + otherDocCount += between(1, 100); + break; + case 6: + docCountError += between(1, 100); + break; + case 7: + buckets = new ArrayList<>(buckets); + buckets.add(new StringTerms.Bucket(new BytesRef(randomAlphaOfLengthBetween(1, 10)), randomNonNegativeLong(), InternalAggregations.EMPTY, showTermDocCountError, docCountError, format)); - break; - case 8: - if (metadata == null) { - metadata = new HashMap<>(1); - } else { - metadata = new HashMap<>(instance.getMetadata()); - } - metadata.put(randomAlphaOfLength(15), randomInt()); - break; - default: - throw new AssertionError("Illegal randomisation branch"); + break; + case 8: + if (metadata == null) { + metadata = new HashMap<>(1); + } else { + metadata = new HashMap<>(instance.getMetadata()); + } + metadata.put(randomAlphaOfLength(15), randomInt()); + break; + default: + throw new AssertionError("Illegal randomisation branch"); } Collections.sort(buckets, stringTerms.reduceOrder.comparator()); return new StringTerms(name, stringTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, - showTermDocCountError, otherDocCount, buckets, docCountError); + showTermDocCountError, otherDocCount, buckets, docCountError); } else { String name = instance.getName(); BucketOrder order = instance.order; @@ -129,27 +122,69 @@ public class StringTermsTests extends InternalTermsTestCase { long minDocCount = instance.minDocCount; Map metadata = instance.getMetadata(); switch (between(0, 3)) { - case 0: - name += randomAlphaOfLength(5); - break; - case 1: - requiredSize += between(1, 100); - break; - case 2: - minDocCount += between(1, 100); - break; - case 3: - if (metadata == null) { - metadata = new HashMap<>(1); - } else { - metadata = new HashMap<>(instance.getMetadata()); - } - metadata.put(randomAlphaOfLength(15), randomInt()); - break; - default: - throw new AssertionError("Illegal randomisation branch"); + case 0: + name += randomAlphaOfLength(5); + break; + case 1: + requiredSize += between(1, 100); + break; + case 2: + minDocCount += between(1, 100); + break; + case 3: + if (metadata == null) { + metadata = new HashMap<>(1); + } else { + metadata = new HashMap<>(instance.getMetadata()); + } + metadata.put(randomAlphaOfLength(15), randomInt()); + break; + default: + throw new AssertionError("Illegal randomisation branch"); } return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); } } + + private BytesRef[] generateRandomDict() { + Set terms = new HashSet<>(); + int numTerms = randomIntBetween(2, 100); + for (int i = 0; i < numTerms; i++) { + terms.add(new BytesRef(randomAlphaOfLength(10))); + } + return terms.stream().toArray(BytesRef[]::new); + } + + private InternalTerms createTestInstance(BytesRef[] dict, String name) { + return createTestInstance(dict, name, createTestMetadata(), createSubAggregations(), showDocCount, docCountError); + } + + private InternalTerms createTestInstance(BytesRef[] dict, + String name, + Map metadata, + InternalAggregations aggregations, + boolean showTermDocCountError, + long docCountError) { + BucketOrder order = BucketOrder.count(false); + long minDocCount = 1; + int requiredSize = 3; + int shardSize = requiredSize + 2; + DocValueFormat format = DocValueFormat.RAW; + long otherDocCount = 0; + List buckets = new ArrayList<>(); + final int numBuckets = randomNumberOfBuckets(); + Set terms = new HashSet<>(); + for (int i = 0; i < numBuckets; ++i) { + BytesRef term = dict[randomIntBetween(0, dict.length-1)]; + if (terms.add(term)) { + int docCount = randomIntBetween(1, 100); + buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format)); + } + } + BucketOrder reduceOrder = randomBoolean() ? + BucketOrder.compound(BucketOrder.key(true), BucketOrder.count(false)) : BucketOrder.key(true); + Collections.sort(buckets, reduceOrder.comparator()); + return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, + metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index f27f6e7e468..54e0e9fa8b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -419,7 +419,7 @@ public abstract class InternalAggregationTestCase return createTestInstance(randomAlphaOfLength(5)); } - private T createTestInstance(String name) { + public final Map createTestMetadata() { Map metadata = null; if (randomBoolean()) { metadata = new HashMap<>(); @@ -428,7 +428,11 @@ public abstract class InternalAggregationTestCase metadata.put(randomAlphaOfLength(5), randomAlphaOfLength(5)); } } - return createTestInstance(name, metadata); + return metadata; + } + + private T createTestInstance(String name) { + return createTestInstance(name, createTestMetadata()); } /** Return an instance on an unmapped field. */ diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java index 44358c666fa..1b784c6191d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java @@ -66,6 +66,10 @@ public abstract class InternalMultiBucketAggregationTestCase