Fix composite aggregation failed test cases introduce by missing_order parameter (#1942) (#2005)

Fix composite aggregation failed test cases introduce by missing_order parameter by using MissingOrder to decide null values's order in InternalBucket.

Signed-off-by: Peng Huo <penghuo@gmail.com>
This commit is contained in:
Peng Huo 2022-02-01 12:19:10 -08:00 committed by GitHub
parent 6c2f01a045
commit 2fc8acd409
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 15 deletions

View File

@ -70,6 +70,7 @@ import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.MultiBucketCollector; import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator; import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.searchafter.SearchAfterBuilder; import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats; import org.opensearch.search.sort.SortAndFormats;
@ -89,6 +90,7 @@ final class CompositeAggregator extends BucketsAggregator {
private final int size; private final int size;
private final List<String> sourceNames; private final List<String> sourceNames;
private final int[] reverseMuls; private final int[] reverseMuls;
private final MissingOrder[] missingOrders;
private final List<DocValueFormat> formats; private final List<DocValueFormat> formats;
private final CompositeKey rawAfterKey; private final CompositeKey rawAfterKey;
@ -117,6 +119,7 @@ final class CompositeAggregator extends BucketsAggregator {
this.size = size; this.size = size;
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList()); this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray(); this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new);
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList()); this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
this.sources = new SingleDimensionValuesSource[sourceConfigs.length]; this.sources = new SingleDimensionValuesSource[sourceConfigs.length];
// check that the provided size is not greater than the search.max_buckets setting // check that the provided size is not greater than the search.max_buckets setting
@ -189,7 +192,15 @@ final class CompositeAggregator extends BucketsAggregator {
CompositeKey key = queue.toCompositeKey(slot); CompositeKey key = queue.toCompositeKey(slot);
InternalAggregations aggs = subAggsForBuckets[slot]; InternalAggregations aggs = subAggsForBuckets[slot];
int docCount = queue.getDocCount(slot); int docCount = queue.getDocCount(slot);
buckets[queue.size()] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs); buckets[queue.size()] = new InternalComposite.InternalBucket(
sourceNames,
formats,
key,
reverseMuls,
missingOrders,
docCount,
aggs
);
} }
CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null; CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
return new InternalAggregation[] { return new InternalAggregation[] {
@ -201,6 +212,7 @@ final class CompositeAggregator extends BucketsAggregator {
Arrays.asList(buckets), Arrays.asList(buckets),
lastBucket, lastBucket,
reverseMuls, reverseMuls,
missingOrders,
earlyTerminated, earlyTerminated,
metadata() metadata()
) }; ) };
@ -208,7 +220,18 @@ final class CompositeAggregator extends BucketsAggregator {
@Override @Override
public InternalAggregation buildEmptyAggregation() { public InternalAggregation buildEmptyAggregation() {
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls, false, metadata()); return new InternalComposite(
name,
size,
sourceNames,
formats,
Collections.emptyList(),
null,
reverseMuls,
missingOrders,
false,
metadata()
);
} }
private void finishLeaf() { private void finishLeaf() {

View File

@ -34,6 +34,7 @@ package org.opensearch.search.aggregations.bucket.composite;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.opensearch.LegacyESVersion; import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentBuilder;
@ -43,6 +44,7 @@ import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.InternalMultiBucketAggregation; import org.opensearch.search.aggregations.InternalMultiBucketAggregation;
import org.opensearch.search.aggregations.KeyComparable; import org.opensearch.search.aggregations.KeyComparable;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import java.io.IOException; import java.io.IOException;
import java.util.AbstractMap; import java.util.AbstractMap;
@ -64,6 +66,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
private final List<InternalBucket> buckets; private final List<InternalBucket> buckets;
private final CompositeKey afterKey; private final CompositeKey afterKey;
private final int[] reverseMuls; private final int[] reverseMuls;
private final MissingOrder[] missingOrders;
private final List<String> sourceNames; private final List<String> sourceNames;
private final List<DocValueFormat> formats; private final List<DocValueFormat> formats;
@ -77,6 +80,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
List<InternalBucket> buckets, List<InternalBucket> buckets,
CompositeKey afterKey, CompositeKey afterKey,
int[] reverseMuls, int[] reverseMuls,
MissingOrder[] missingOrders,
boolean earlyTerminated, boolean earlyTerminated,
Map<String, Object> metadata Map<String, Object> metadata
) { ) {
@ -87,6 +91,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
this.afterKey = afterKey; this.afterKey = afterKey;
this.size = size; this.size = size;
this.reverseMuls = reverseMuls; this.reverseMuls = reverseMuls;
this.missingOrders = missingOrders;
this.earlyTerminated = earlyTerminated; this.earlyTerminated = earlyTerminated;
} }
@ -99,7 +104,13 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
formats.add(in.readNamedWriteable(DocValueFormat.class)); formats.add(in.readNamedWriteable(DocValueFormat.class));
} }
this.reverseMuls = in.readIntArray(); this.reverseMuls = in.readIntArray();
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls)); if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
this.missingOrders = in.readArray(MissingOrder::readFromStream, MissingOrder[]::new);
} else {
this.missingOrders = new MissingOrder[reverseMuls.length];
Arrays.fill(this.missingOrders, MissingOrder.DEFAULT);
}
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls, missingOrders));
this.afterKey = in.readBoolean() ? new CompositeKey(in) : null; this.afterKey = in.readBoolean() ? new CompositeKey(in) : null;
this.earlyTerminated = in.getVersion().onOrAfter(LegacyESVersion.V_7_6_0) ? in.readBoolean() : false; this.earlyTerminated = in.getVersion().onOrAfter(LegacyESVersion.V_7_6_0) ? in.readBoolean() : false;
} }
@ -112,6 +123,9 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
out.writeNamedWriteable(format); out.writeNamedWriteable(format);
} }
out.writeIntArray(reverseMuls); out.writeIntArray(reverseMuls);
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
out.writeArray((output, order) -> order.writeTo(output), missingOrders);
}
out.writeList(buckets); out.writeList(buckets);
out.writeBoolean(afterKey != null); out.writeBoolean(afterKey != null);
if (afterKey != null) { if (afterKey != null) {
@ -140,7 +154,18 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
* keep the <code>afterKey</code> of the original aggregation in order * keep the <code>afterKey</code> of the original aggregation in order
* to be able to retrieve the next page even if all buckets have been filtered. * to be able to retrieve the next page even if all buckets have been filtered.
*/ */
return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey, reverseMuls, earlyTerminated, getMetadata()); return new InternalComposite(
name,
size,
sourceNames,
formats,
newBuckets,
afterKey,
reverseMuls,
missingOrders,
earlyTerminated,
getMetadata()
);
} }
@Override @Override
@ -150,6 +175,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
prototype.formats, prototype.formats,
prototype.key, prototype.key,
prototype.reverseMuls, prototype.reverseMuls,
prototype.missingOrders,
prototype.docCount, prototype.docCount,
aggregations aggregations
); );
@ -235,7 +261,18 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
lastKey = lastBucket.getRawKey(); lastKey = lastBucket.getRawKey();
} }
reduceContext.consumeBucketsAndMaybeBreak(result.size()); reduceContext.consumeBucketsAndMaybeBreak(result.size());
return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls, earlyTerminated, metadata); return new InternalComposite(
name,
size,
sourceNames,
reducedFormats,
result,
lastKey,
reverseMuls,
missingOrders,
earlyTerminated,
metadata
);
} }
@Override @Override
@ -253,7 +290,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
* just whatever formats make sense for *its* index. This can be real * just whatever formats make sense for *its* index. This can be real
* trouble when the index doing the reducing is unmapped. */ * trouble when the index doing the reducing is unmapped. */
List<DocValueFormat> reducedFormats = buckets.get(0).formats; List<DocValueFormat> reducedFormats = buckets.get(0).formats;
return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, docCount, aggs); return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, missingOrders, docCount, aggs);
} }
@Override @Override
@ -266,12 +303,13 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
return Objects.equals(size, that.size) return Objects.equals(size, that.size)
&& Objects.equals(buckets, that.buckets) && Objects.equals(buckets, that.buckets)
&& Objects.equals(afterKey, that.afterKey) && Objects.equals(afterKey, that.afterKey)
&& Arrays.equals(reverseMuls, that.reverseMuls); && Arrays.equals(reverseMuls, that.reverseMuls)
&& Arrays.equals(missingOrders, that.missingOrders);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls)); return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls), Arrays.hashCode(missingOrders));
} }
private static class BucketIterator implements Comparable<BucketIterator> { private static class BucketIterator implements Comparable<BucketIterator> {
@ -301,6 +339,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
private final long docCount; private final long docCount;
private final InternalAggregations aggregations; private final InternalAggregations aggregations;
private final transient int[] reverseMuls; private final transient int[] reverseMuls;
private final transient MissingOrder[] missingOrders;
private final transient List<String> sourceNames; private final transient List<String> sourceNames;
private final transient List<DocValueFormat> formats; private final transient List<DocValueFormat> formats;
@ -309,6 +348,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
List<DocValueFormat> formats, List<DocValueFormat> formats,
CompositeKey key, CompositeKey key,
int[] reverseMuls, int[] reverseMuls,
MissingOrder[] missingOrders,
long docCount, long docCount,
InternalAggregations aggregations InternalAggregations aggregations
) { ) {
@ -316,15 +356,23 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
this.docCount = docCount; this.docCount = docCount;
this.aggregations = aggregations; this.aggregations = aggregations;
this.reverseMuls = reverseMuls; this.reverseMuls = reverseMuls;
this.missingOrders = missingOrders;
this.sourceNames = sourceNames; this.sourceNames = sourceNames;
this.formats = formats; this.formats = formats;
} }
InternalBucket(StreamInput in, List<String> sourceNames, List<DocValueFormat> formats, int[] reverseMuls) throws IOException { InternalBucket(
StreamInput in,
List<String> sourceNames,
List<DocValueFormat> formats,
int[] reverseMuls,
MissingOrder[] missingOrders
) throws IOException {
this.key = new CompositeKey(in); this.key = new CompositeKey(in);
this.docCount = in.readVLong(); this.docCount = in.readVLong();
this.aggregations = InternalAggregations.readFrom(in); this.aggregations = InternalAggregations.readFrom(in);
this.reverseMuls = reverseMuls; this.reverseMuls = reverseMuls;
this.missingOrders = missingOrders;
this.sourceNames = sourceNames; this.sourceNames = sourceNames;
this.formats = formats; this.formats = formats;
} }
@ -400,13 +448,15 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
@Override @Override
public int compareKey(InternalBucket other) { public int compareKey(InternalBucket other) {
for (int i = 0; i < key.size(); i++) { for (int i = 0; i < key.size(); i++) {
if (key.get(i) == null) { // lambda function require final variable.
if (other.key.get(i) == null) { final int index = i;
int result = missingOrders[i].compare(() -> key.get(index) == null, () -> other.key.get(index) == null, reverseMuls[i]);
if (MissingOrder.unknownOrder(result) == false) {
if (result == 0) {
continue; continue;
} else {
return result;
} }
return -1 * reverseMuls[i];
} else if (other.key.get(i) == null) {
return reverseMuls[i];
} }
assert key.get(i).getClass() == other.key.get(i).getClass(); assert key.get(i).getClass() == other.key.get(i).getClass();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -39,6 +39,7 @@ import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.ParsedAggregation; import org.opensearch.search.aggregations.ParsedAggregation;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.test.InternalMultiBucketAggregationTestCase; import org.opensearch.test.InternalMultiBucketAggregationTestCase;
import org.junit.After; import org.junit.After;
@ -71,6 +72,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
private List<String> sourceNames; private List<String> sourceNames;
private List<DocValueFormat> formats; private List<DocValueFormat> formats;
private int[] reverseMuls; private int[] reverseMuls;
private MissingOrder[] missingOrders;
private int[] types; private int[] types;
private int size; private int size;
@ -100,10 +102,12 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
sourceNames = new ArrayList<>(); sourceNames = new ArrayList<>();
formats = new ArrayList<>(); formats = new ArrayList<>();
reverseMuls = new int[numFields]; reverseMuls = new int[numFields];
missingOrders = new MissingOrder[numFields];
types = new int[numFields]; types = new int[numFields];
for (int i = 0; i < numFields; i++) { for (int i = 0; i < numFields; i++) {
sourceNames.add("field_" + i); sourceNames.add("field_" + i);
reverseMuls[i] = randomBoolean() ? 1 : -1; reverseMuls[i] = randomBoolean() ? 1 : -1;
missingOrders[i] = randomFrom(MissingOrder.values());
int type = randomIntBetween(0, 2); int type = randomIntBetween(0, 2);
types[i] = type; types[i] = type;
formats.add(randomDocValueFormat(type == 0)); formats.add(randomDocValueFormat(type == 0));
@ -182,6 +186,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
formats, formats,
key, key,
reverseMuls, reverseMuls,
missingOrders,
1L, 1L,
aggregations aggregations
); );
@ -189,7 +194,18 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
} }
Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2)); Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2));
CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size() - 1).getRawKey() : null; CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size() - 1).getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, randomBoolean(), metadata); return new InternalComposite(
name,
size,
sourceNames,
formats,
buckets,
lastBucket,
reverseMuls,
missingOrders,
randomBoolean(),
metadata
);
} }
@Override @Override
@ -214,6 +230,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
formats, formats,
createCompositeKey(), createCompositeKey(),
reverseMuls, reverseMuls,
missingOrders,
randomLongBetween(1, 100), randomLongBetween(1, 100),
InternalAggregations.EMPTY InternalAggregations.EMPTY
) )
@ -239,6 +256,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
buckets, buckets,
lastBucket, lastBucket,
reverseMuls, reverseMuls,
missingOrders,
randomBoolean(), randomBoolean(),
metadata metadata
); );
@ -295,6 +313,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
emptyList(), emptyList(),
null, null,
reverseMuls, reverseMuls,
missingOrders,
true, true,
emptyMap() emptyMap()
); );