Fix rounding composite aggs on sorted index (#57867)

This commit fixes a bug on the composite aggregation when the index
is sorted and the primary composite source needs to round values (date_histo).
In such case, we cannot take into account the subsequent sources even if they
match the index sort because the rounding of the primary sort value may break
the original index order.

Fixes #57849
This commit is contained in:
Jim Ferenczi 2020-06-09 20:29:46 +02:00 committed by jimczi
parent 44a79d1739
commit ea696198e9
3 changed files with 145 additions and 6 deletions

View File

@ -23,6 +23,7 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.queries.SearchAfterSortedDocQuery;
@ -31,12 +32,15 @@ import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSelector;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.common.lease.Releasables;
@ -231,6 +235,11 @@ final class CompositeAggregator extends BucketsAggregator {
break;
}
sortFields.add(indexSortField);
if (sourceConfig.valuesSource() instanceof RoundingValuesSource) {
// the rounding "squashes" many values together, that breaks the ordering of sub-values
// so we ignore subsequent source even if they match the index sort.
break;
}
}
return sortFields.isEmpty() ? null : new Sort(sortFields.toArray(new SortField[0]));
}
@ -256,6 +265,76 @@ final class CompositeAggregator extends BucketsAggregator {
}
}
/**
* Rewrites the provided {@link Sort} to apply rounding on {@link SortField} that target
* {@link RoundingValuesSource}.
*/
private Sort applySortFieldRounding(Sort sort) {
SortField[] sortFields = new SortField[sort.getSort().length];
for (int i = 0; i < sort.getSort().length; i++) {
if (sourceConfigs[i].valuesSource() instanceof RoundingValuesSource) {
LongUnaryOperator round = ((RoundingValuesSource) sourceConfigs[i].valuesSource())::round;
final SortedNumericSortField delegate = (SortedNumericSortField) sort.getSort()[i];
sortFields[i] = new SortedNumericSortField(delegate.getField(), delegate.getNumericType(), delegate.getReverse()) {
@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
@Override
public FieldComparator<?> getComparator(int numHits, int sortPos) {
return new FieldComparator.LongComparator(1, delegate.getField(), (Long) missingValue) {
@Override
protected NumericDocValues getNumericDocValues(LeafReaderContext context, String field) throws IOException {
NumericDocValues dvs = SortedNumericSelector.wrap(DocValues.getSortedNumeric(context.reader(), field),
delegate.getSelector(), delegate.getNumericType());
return new NumericDocValues() {
@Override
public long longValue() throws IOException {
return round.applyAsLong(dvs.longValue());
}
@Override
public boolean advanceExact(int target) throws IOException {
return dvs.advanceExact(target);
}
@Override
public int docID() {
return dvs.docID();
}
@Override
public int nextDoc() throws IOException {
return dvs.nextDoc();
}
@Override
public int advance(int target) throws IOException {
return dvs.advance(target);
}
@Override
public long cost() {
return dvs.cost();
}
};
}
};
}
};
} else {
sortFields[i] = sort.getSort()[i];
}
}
return new Sort(sortFields);
}
private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) throws IOException {
DocValueFormat[] formats = new DocValueFormat[indexSortPrefix.getSort().length];
for (int i = 0; i < formats.length; i++) {
@ -265,11 +344,11 @@ final class CompositeAggregator extends BucketsAggregator {
Arrays.copyOfRange(rawAfterKey.values(), 0, formats.length));
if (indexSortPrefix.getSort().length < sources.length) {
// include all docs that belong to the partial bucket
fieldDoc.doc = 0;
fieldDoc.doc = -1;
}
BooleanQuery newQuery = new BooleanQuery.Builder()
.add(context.query(), BooleanClause.Occur.MUST)
.add(new SearchAfterSortedDocQuery(indexSortPrefix, fieldDoc), BooleanClause.Occur.FILTER)
.add(new SearchAfterSortedDocQuery(applySortFieldRounding(indexSortPrefix), fieldDoc), BooleanClause.Occur.FILTER)
.build();
Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer scorer = weight.scorer(ctx);

View File

@ -198,7 +198,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue<Integer> impleme
for (int i = 0; i < arrays.length; i++) {
int cmp = arrays[i].compareCurrentWithAfter();
if (cmp != 0) {
return cmp;
return cmp > 0 ? i+1 : -(i+1);
}
}
return 0;

View File

@ -1963,6 +1963,69 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
);
}
public void testIndexSortWithDuplicate() throws Exception {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
Arrays.asList(
createDocument("date", asLong("2020-06-03T00:53:10"), "keyword", "37640"),
createDocument("date", asLong("2020-06-03T00:55:10"), "keyword", "90640"),
createDocument("date", asLong("2020-06-03T01:10:10"), "keyword", "22640"),
createDocument("date", asLong("2020-06-03T01:15:10"), "keyword", "91640"),
createDocument("date", asLong("2020-06-03T01:21:10"), "keyword", "11640"),
createDocument("date", asLong("2020-06-03T01:22:10"), "keyword", "90640"),
createDocument("date", asLong("2020-06-03T01:54:10"), "keyword", "31640")
)
);
for (SortOrder order : SortOrder.values()) {
executeTestCase(true, false, new MatchAllDocsQuery(),
dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
new DateHistogramValuesSourceBuilder("date")
.field("date")
.order(order)
.calendarInterval(DateHistogramInterval.days(1)),
new TermsValuesSourceBuilder("keyword").field("keyword")
)).size(3),
(result) -> {
assertEquals(3, result.getBuckets().size());
assertEquals("{date=1591142400000, keyword=31640}", result.afterKey().toString());
assertEquals("{date=1591142400000, keyword=11640}", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
assertEquals("{date=1591142400000, keyword=22640}", result.getBuckets().get(1).getKeyAsString());
assertEquals(1L, result.getBuckets().get(1).getDocCount());
assertEquals("{date=1591142400000, keyword=31640}", result.getBuckets().get(2).getKeyAsString());
assertEquals(1L, result.getBuckets().get(2).getDocCount());
}
);
executeTestCase(true, false, new MatchAllDocsQuery(),
dataset,
() ->
new CompositeAggregationBuilder("name",
Arrays.asList(
new DateHistogramValuesSourceBuilder("date")
.field("date")
.order(order)
.calendarInterval(DateHistogramInterval.days(1)),
new TermsValuesSourceBuilder("keyword").field("keyword")
)).aggregateAfter(createAfterKey("date", 1591142400000L, "keyword", "31640")).size(3),
(result) -> {
assertEquals(3, result.getBuckets().size());
assertEquals("{date=1591142400000, keyword=91640}", result.afterKey().toString());
assertEquals("{date=1591142400000, keyword=37640}", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
assertEquals("{date=1591142400000, keyword=90640}", result.getBuckets().get(1).getKeyAsString());
assertEquals(2L, result.getBuckets().get(1).getDocCount());
assertEquals("{date=1591142400000, keyword=91640}", result.getBuckets().get(2).getKeyAsString());
assertEquals(1L, result.getBuckets().get(2).getDocCount());
}
);
}
}
private void testSearchCase(List<Query> queries,
List<Map<String, List<Object>>> dataset,
Supplier<CompositeAggregationBuilder> create,
@ -2101,9 +2164,6 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
break;
}
sortFields.add(sortField);
if (sortField instanceof SortedNumericSortField && ((SortedNumericSortField) sortField).getType() == SortField.Type.LONG) {
break;
}
}
while (remainingFieldTypes.size() > 0 && randomBoolean()) {
// Add extra unused sorts