From 2fc8acd40956537cc790d6f96cf944bcb9d5217c Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 1 Feb 2022 12:19:10 -0800 Subject: [PATCH] Fix composite aggregation failed test cases introduce by missing_order parameter (#1942) (#2005) Fix composite aggregation failed test cases introduce by missing_order parameter by using MissingOrder to decide null values's order in InternalBucket. Signed-off-by: Peng Huo --- .../bucket/composite/CompositeAggregator.java | 27 ++++++- .../bucket/composite/InternalComposite.java | 74 ++++++++++++++++--- .../composite/InternalCompositeTests.java | 21 +++++- 3 files changed, 107 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index dd399e40617..1d48850bee1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -70,6 +70,7 @@ import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.MultiBucketCollector; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.searchafter.SearchAfterBuilder; import org.opensearch.search.sort.SortAndFormats; @@ -89,6 +90,7 @@ final class CompositeAggregator extends BucketsAggregator { private final int size; private final List sourceNames; private final int[] reverseMuls; + private final MissingOrder[] missingOrders; private final List formats; private final CompositeKey rawAfterKey; @@ -117,6 +119,7 @@ final class CompositeAggregator extends BucketsAggregator { this.size = size; this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList()); this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray(); + this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new); this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList()); this.sources = new SingleDimensionValuesSource[sourceConfigs.length]; // check that the provided size is not greater than the search.max_buckets setting @@ -189,7 +192,15 @@ final class CompositeAggregator extends BucketsAggregator { CompositeKey key = queue.toCompositeKey(slot); InternalAggregations aggs = subAggsForBuckets[slot]; int docCount = queue.getDocCount(slot); - buckets[queue.size()] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs); + buckets[queue.size()] = new InternalComposite.InternalBucket( + sourceNames, + formats, + key, + reverseMuls, + missingOrders, + docCount, + aggs + ); } CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null; return new InternalAggregation[] { @@ -201,6 +212,7 @@ final class CompositeAggregator extends BucketsAggregator { Arrays.asList(buckets), lastBucket, reverseMuls, + missingOrders, earlyTerminated, metadata() ) }; @@ -208,7 +220,18 @@ final class CompositeAggregator extends BucketsAggregator { @Override public InternalAggregation buildEmptyAggregation() { - return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls, false, metadata()); + return new InternalComposite( + name, + size, + sourceNames, + formats, + Collections.emptyList(), + null, + reverseMuls, + missingOrders, + false, + metadata() + ); } private void finishLeaf() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java index 64eb4371475..2cfd9d508c1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java @@ -34,6 +34,7 @@ package org.opensearch.search.aggregations.bucket.composite; import org.apache.lucene.util.BytesRef; import org.opensearch.LegacyESVersion; +import org.opensearch.Version; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.xcontent.XContentBuilder; @@ -43,6 +44,7 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.KeyComparable; +import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import java.io.IOException; import java.util.AbstractMap; @@ -64,6 +66,7 @@ public class InternalComposite extends InternalMultiBucketAggregation buckets; private final CompositeKey afterKey; private final int[] reverseMuls; + private final MissingOrder[] missingOrders; private final List sourceNames; private final List formats; @@ -77,6 +80,7 @@ public class InternalComposite extends InternalMultiBucketAggregation buckets, CompositeKey afterKey, int[] reverseMuls, + MissingOrder[] missingOrders, boolean earlyTerminated, Map metadata ) { @@ -87,6 +91,7 @@ public class InternalComposite extends InternalMultiBucketAggregation new InternalBucket(input, sourceNames, formats, reverseMuls)); + if (in.getVersion().onOrAfter(Version.V_2_0_0)) { + this.missingOrders = in.readArray(MissingOrder::readFromStream, MissingOrder[]::new); + } else { + this.missingOrders = new MissingOrder[reverseMuls.length]; + Arrays.fill(this.missingOrders, MissingOrder.DEFAULT); + } + this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls, missingOrders)); this.afterKey = in.readBoolean() ? new CompositeKey(in) : null; this.earlyTerminated = in.getVersion().onOrAfter(LegacyESVersion.V_7_6_0) ? in.readBoolean() : false; } @@ -112,6 +123,9 @@ public class InternalComposite extends InternalMultiBucketAggregation order.writeTo(output), missingOrders); + } out.writeList(buckets); out.writeBoolean(afterKey != null); if (afterKey != null) { @@ -140,7 +154,18 @@ public class InternalComposite extends InternalMultiBucketAggregationafterKey of the original aggregation in order * to be able to retrieve the next page even if all buckets have been filtered. */ - return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey, reverseMuls, earlyTerminated, getMetadata()); + return new InternalComposite( + name, + size, + sourceNames, + formats, + newBuckets, + afterKey, + reverseMuls, + missingOrders, + earlyTerminated, + getMetadata() + ); } @Override @@ -150,6 +175,7 @@ public class InternalComposite extends InternalMultiBucketAggregation reducedFormats = buckets.get(0).formats; - return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, docCount, aggs); + return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, missingOrders, docCount, aggs); } @Override @@ -266,12 +303,13 @@ public class InternalComposite extends InternalMultiBucketAggregation { @@ -301,6 +339,7 @@ public class InternalComposite extends InternalMultiBucketAggregation sourceNames; private final transient List formats; @@ -309,6 +348,7 @@ public class InternalComposite extends InternalMultiBucketAggregation formats, CompositeKey key, int[] reverseMuls, + MissingOrder[] missingOrders, long docCount, InternalAggregations aggregations ) { @@ -316,15 +356,23 @@ public class InternalComposite extends InternalMultiBucketAggregation sourceNames, List formats, int[] reverseMuls) throws IOException { + InternalBucket( + StreamInput in, + List sourceNames, + List formats, + int[] reverseMuls, + MissingOrder[] missingOrders + ) throws IOException { this.key = new CompositeKey(in); this.docCount = in.readVLong(); this.aggregations = InternalAggregations.readFrom(in); this.reverseMuls = reverseMuls; + this.missingOrders = missingOrders; this.sourceNames = sourceNames; this.formats = formats; } @@ -400,13 +448,15 @@ public class InternalComposite extends InternalMultiBucketAggregation key.get(index) == null, () -> other.key.get(index) == null, reverseMuls[i]); + if (MissingOrder.unknownOrder(result) == false) { + if (result == 0) { continue; + } else { + return result; } - return -1 * reverseMuls[i]; - } else if (other.key.get(i) == null) { - return reverseMuls[i]; } assert key.get(i).getClass() == other.key.get(i).getClass(); @SuppressWarnings("unchecked") diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/InternalCompositeTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/InternalCompositeTests.java index 311c688f23f..4121954c1ed 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/InternalCompositeTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/InternalCompositeTests.java @@ -39,6 +39,7 @@ import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.ParsedAggregation; +import org.opensearch.search.aggregations.bucket.missing.MissingOrder; import org.opensearch.test.InternalMultiBucketAggregationTestCase; import org.junit.After; @@ -71,6 +72,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa private List sourceNames; private List formats; private int[] reverseMuls; + private MissingOrder[] missingOrders; private int[] types; private int size; @@ -100,10 +102,12 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa sourceNames = new ArrayList<>(); formats = new ArrayList<>(); reverseMuls = new int[numFields]; + missingOrders = new MissingOrder[numFields]; types = new int[numFields]; for (int i = 0; i < numFields; i++) { sourceNames.add("field_" + i); reverseMuls[i] = randomBoolean() ? 1 : -1; + missingOrders[i] = randomFrom(MissingOrder.values()); int type = randomIntBetween(0, 2); types[i] = type; formats.add(randomDocValueFormat(type == 0)); @@ -182,6 +186,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa formats, key, reverseMuls, + missingOrders, 1L, aggregations ); @@ -189,7 +194,18 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa } Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2)); CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size() - 1).getRawKey() : null; - return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, randomBoolean(), metadata); + return new InternalComposite( + name, + size, + sourceNames, + formats, + buckets, + lastBucket, + reverseMuls, + missingOrders, + randomBoolean(), + metadata + ); } @Override @@ -214,6 +230,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa formats, createCompositeKey(), reverseMuls, + missingOrders, randomLongBetween(1, 100), InternalAggregations.EMPTY ) @@ -239,6 +256,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa buckets, lastBucket, reverseMuls, + missingOrders, randomBoolean(), metadata ); @@ -295,6 +313,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa emptyList(), null, reverseMuls, + missingOrders, true, emptyMap() );