Fix merging of terms aggregation with compound order (#64469)

This change fixes a bug, introduced in #61779, where the full compound order was used to
compare buckets when merging. The bug is triggered when the compound order
uses a primary sort ordered by key (ascending or descending).
This commit ensures that we always extract the primary key sort when comparing keys
during merging.
The PR is marked as no-issue since the bug has not been released in any official version.
Authored by Jim Ferenczi on 2020-11-05 11:29:47 +01:00; committed by jimczi
parent a3d9408fda, commit 3e2fa09666
5 changed files with 133 additions and 82 deletions
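For context, a minimal, self-contained sketch of the failure mode, using simplified stand-in types rather than the actual Elasticsearch classes (Bucket here is hypothetical): under a compound comparator whose primary sort is the key, two buckets with equal keys but different doc counts compare as non-equal, so a merge step that detects duplicate keys via compare(...) == 0 never folds them together. Extracting the primary key sort restores the equal-keys-compare-as-zero invariant.

    import java.util.Comparator;

    public class CompoundOrderBugSketch {
        // Simplified stand-in for a terms bucket.
        record Bucket(String key, long docCount) {}

        public static void main(String[] args) {
            // Compound order: primary sort by key asc, secondary by doc count desc.
            Comparator<Bucket> compound = Comparator.comparing(Bucket::key)
                .thenComparing(Comparator.comparingLong(Bucket::docCount).reversed());
            // The primary key sort alone, which is what the fix extracts.
            Comparator<Bucket> keyOnly = Comparator.comparing(Bucket::key);

            Bucket a = new Bucket("foo", 10);
            Bucket b = new Bucket("foo", 3);

            // Equal keys, different doc counts: the compound comparator reports
            // a difference, so a merge relying on compare(...) == 0 to combine
            // same-key buckets keeps them separate.
            System.out.println(compound.compare(a, b)); // non-zero
            System.out.println(keyOnly.compare(a, b));  // 0 -> buckets get merged
        }
    }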

InternalTerms.java

@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.search.aggregations.InternalOrder.isKeyAsc;
 import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
 
 public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>>
@@ -257,9 +258,9 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>>
     }
 
     private List<B> reduceMergeSort(List<InternalAggregation> aggregations,
-                                    BucketOrder reduceOrder, ReduceContext reduceContext) {
-        assert isKeyOrder(reduceOrder);
-        final Comparator<MultiBucketsAggregation.Bucket> cmp = reduceOrder.comparator();
+                                    BucketOrder thisReduceOrder, ReduceContext reduceContext) {
+        assert isKeyOrder(thisReduceOrder);
+        final Comparator<MultiBucketsAggregation.Bucket> cmp = thisReduceOrder.comparator();
         final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<IteratorAndCurrent<B>>(aggregations.size()) {
             @Override
             protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
@@ -369,6 +370,8 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>>
                 bucket.docCountError -= thisAggDocCountError;
             }
         }
+
+        final List<B> reducedBuckets;
         /**
          * Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}.
          * That allows to perform a merge sort when reducing multiple aggregations together.
@@ -376,8 +379,13 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>>
          * the provided aggregations use a different {@link InternalTerms#reduceOrder}.
          */
         BucketOrder thisReduceOrder = getReduceOrder(aggregations);
-        List<B> reducedBuckets = isKeyOrder(thisReduceOrder) ?
-            reduceMergeSort(aggregations, thisReduceOrder, reduceContext) : reduceLegacy(aggregations, reduceContext);
+        if (isKeyOrder(thisReduceOrder)) {
+            // extract the primary sort in case this is a compound order.
+            thisReduceOrder = InternalOrder.key(isKeyAsc(thisReduceOrder) ? true : false);
+            reducedBuckets = reduceMergeSort(aggregations, thisReduceOrder, reduceContext);
+        } else {
+            reducedBuckets = reduceLegacy(aggregations, reduceContext);
+        }
         final B[] list;
         if (reduceContext.isFinalReduce()) {
             final int size = Math.min(requiredSize, reducedBuckets.size());
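The reduceMergeSort path above depends on every input bucket list already being sorted by key. Below is a rough sketch of that merge strategy in plain JDK types; Cursor stands in for IteratorAndCurrent, and the names and shapes are illustrative rather than the actual implementation (which uses Lucene's PriorityQueue):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MergeSortedBucketsSketch {
        record Bucket(String key, long docCount) {}

        // Wraps an iterator and exposes its current element, in the spirit of
        // IteratorAndCurrent from the patch above.
        static final class Cursor {
            final Iterator<Bucket> it;
            Bucket current;
            Cursor(Iterator<Bucket> it) { this.it = it; this.current = it.next(); }
            boolean advance() {
                if (it.hasNext()) { current = it.next(); return true; }
                return false;
            }
        }

        // Each input list is assumed to be sorted by key; merge them in a single
        // linear pass, folding buckets that share a key into one bucket.
        static List<Bucket> merge(List<List<Bucket>> inputs) {
            Comparator<Bucket> byKey = Comparator.comparing(Bucket::key);
            PriorityQueue<Cursor> pq =
                new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.current, byKey));
            for (List<Bucket> list : inputs) {
                if (list.isEmpty() == false) {
                    pq.add(new Cursor(list.iterator()));
                }
            }
            List<Bucket> reduced = new ArrayList<>();
            while (pq.isEmpty() == false) {
                Cursor top = pq.poll();
                Bucket next = top.current;
                Bucket last = reduced.isEmpty() ? null : reduced.get(reduced.size() - 1);
                if (last != null && byKey.compare(last, next) == 0) {
                    // Same key as the previous bucket: fold the doc counts. This
                    // is the comparison a compound comparator would get wrong.
                    reduced.set(reduced.size() - 1,
                        new Bucket(last.key(), last.docCount() + next.docCount()));
                } else {
                    reduced.add(next);
                }
                if (top.advance()) {
                    pq.add(top);
                }
            }
            return reduced;
        }

        public static void main(String[] args) {
            System.out.println(merge(List.of(
                List.of(new Bucket("a", 2), new Bucket("c", 1)),
                List.of(new Bucket("a", 3), new Bucket("b", 5)))));
            // -> [Bucket[key=a, docCount=5], Bucket[key=b, docCount=5], Bucket[key=c, docCount=1]]
        }
    }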

InternalTermsTestCase.java

@@ -35,8 +35,8 @@ import static org.hamcrest.Matchers.equalTo;
 public abstract class InternalTermsTestCase extends InternalMultiBucketAggregationTestCase<InternalTerms<?, ?>> {
 
-    private boolean showDocCount;
-    private long docCountError;
+    protected boolean showDocCount;
+    protected long docCountError;
 
     @Before
     public void init() {

StringTermsTests.java

@@ -34,31 +34,24 @@ import java.util.Map;
 import java.util.Set;
 
 public class StringTermsTests extends InternalTermsTestCase {
 
     @Override
     protected InternalTerms<?, ?> createTestInstance(String name,
                                                      Map<String, Object> metadata,
                                                      InternalAggregations aggregations,
                                                      boolean showTermDocCountError,
                                                      long docCountError) {
-        BucketOrder order = BucketOrder.count(false);
-        long minDocCount = 1;
-        int requiredSize = 3;
-        int shardSize = requiredSize + 2;
-        DocValueFormat format = DocValueFormat.RAW;
-        long otherDocCount = 0;
-        List<StringTerms.Bucket> buckets = new ArrayList<>();
-        final int numBuckets = randomNumberOfBuckets();
-        Set<BytesRef> terms = new HashSet<>();
-        for (int i = 0; i < numBuckets; ++i) {
-            BytesRef term = randomValueOtherThanMany(b -> terms.add(b) == false, () -> new BytesRef(randomAlphaOfLength(10)));
-            int docCount = randomIntBetween(1, 100);
-            buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
-        }
-        BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true);
-        Collections.sort(buckets, reduceOrder.comparator());
-        return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount,
-            metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
+        return createTestInstance(generateRandomDict(), name, metadata, aggregations, showTermDocCountError, docCountError);
+    }
+
+    @Override
+    protected List<InternalTerms<?, ?>> randomResultsToReduce(String name, int size) {
+        List<InternalTerms<?, ?>> inputs = new ArrayList<>();
+        BytesRef[] dict = generateRandomDict();
+        for (int i = 0; i < size; i++) {
+            InternalTerms<?, ?> t = randomBoolean() ? createUnmappedInstance(name) : createTestInstance(dict, name);
+            inputs.add(t);
+        }
+        return inputs;
     }
 
     @Override
@@ -152,4 +145,46 @@ public class StringTermsTests extends InternalTermsTestCase {
         return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata);
     }
+
+    private BytesRef[] generateRandomDict() {
+        Set<BytesRef> terms = new HashSet<>();
+        int numTerms = randomIntBetween(2, 100);
+        for (int i = 0; i < numTerms; i++) {
+            terms.add(new BytesRef(randomAlphaOfLength(10)));
+        }
+        return terms.stream().toArray(BytesRef[]::new);
+    }
+
+    private InternalTerms<?, ?> createTestInstance(BytesRef[] dict, String name) {
+        return createTestInstance(dict, name, createTestMetadata(), createSubAggregations(), showDocCount, docCountError);
+    }
+
+    private InternalTerms<?, ?> createTestInstance(BytesRef[] dict,
+                                                   String name,
+                                                   Map<String, Object> metadata,
+                                                   InternalAggregations aggregations,
+                                                   boolean showTermDocCountError,
+                                                   long docCountError) {
+        BucketOrder order = BucketOrder.count(false);
+        long minDocCount = 1;
+        int requiredSize = 3;
+        int shardSize = requiredSize + 2;
+        DocValueFormat format = DocValueFormat.RAW;
+        long otherDocCount = 0;
+        List<StringTerms.Bucket> buckets = new ArrayList<>();
+        final int numBuckets = randomNumberOfBuckets();
+        Set<BytesRef> terms = new HashSet<>();
+        for (int i = 0; i < numBuckets; ++i) {
+            BytesRef term = dict[randomIntBetween(0, dict.length - 1)];
+            if (terms.add(term)) {
+                int docCount = randomIntBetween(1, 100);
+                buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
+            }
+        }
+        BucketOrder reduceOrder = randomBoolean() ?
+            BucketOrder.compound(BucketOrder.key(true), BucketOrder.count(false)) : BucketOrder.key(true);
+        Collections.sort(buckets, reduceOrder.comparator());
+        return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount,
+            metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
+    }
 }
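A note on the test changes: the old generator drew a fresh random term for every bucket, so instances being reduced together rarely shared keys, and the reduce order was never a compound one. Sampling from one shared dictionary guarantees key overlap across the reduced instances, and randomly choosing between a pure key order and a compound order (key ascending, then count descending) exercises exactly the code path fixed above. A toy illustration of the shared-dictionary idea, with hypothetical names rather than the test framework API:

    import java.util.Random;
    import java.util.Set;
    import java.util.TreeSet;

    public class SharedDictSamplingSketch {
        public static void main(String[] args) {
            Random random = new Random(42);
            // One dictionary shared by every generated aggregation instance.
            String[] dict = {"alpha", "beta", "gamma", "delta"};
            for (int instance = 0; instance < 3; instance++) {
                Set<String> terms = new TreeSet<>(); // deduplicated and key-ordered
                int numBuckets = 1 + random.nextInt(dict.length);
                for (int i = 0; i < numBuckets; i++) {
                    terms.add(dict[random.nextInt(dict.length)]);
                }
                System.out.println("instance " + instance + " -> " + terms);
            }
            // Because all instances sample from the same small pool, their key
            // sets overlap, forcing the reduce phase to actually merge buckets.
        }
    }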

InternalAggregationTestCase.java

@@ -419,7 +419,7 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
         return createTestInstance(randomAlphaOfLength(5));
     }
 
-    private T createTestInstance(String name) {
+    public final Map<String, Object> createTestMetadata() {
         Map<String, Object> metadata = null;
         if (randomBoolean()) {
             metadata = new HashMap<>();
@@ -428,7 +428,11 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
                 metadata.put(randomAlphaOfLength(5), randomAlphaOfLength(5));
             }
         }
-        return createTestInstance(name, metadata);
+        return metadata;
+    }
+
+    private T createTestInstance(String name) {
+        return createTestInstance(name, createTestMetadata());
     }
 
     /** Return an instance on an unmapped field. */

InternalMultiBucketAggregationTestCase.java

@@ -66,6 +66,10 @@ public abstract class InternalMultiBucketAggregationTestCase<T extends InternalAggregation>
         this.subAggregationsSupplier = subAggregationsSupplier;
     }
 
+    public final InternalAggregations createSubAggregations() {
+        return subAggregationsSupplier.get();
+    }
+
     @Override
     public void setUp() throws Exception {
         super.setUp();