Force depth_first mode execution for terms aggregation under a nested context (#28421)
This commit forces the depth_first mode for `terms` aggregation that contain a sub-aggregation that need to access the score of the document in a nested context (the `terms` aggregation is a child of a `nested` aggregation). The score of children documents is not accessible in breadth_first mode because the `terms` aggregation cannot access the nested context. Close #28394
This commit is contained in:
parent
7dc00ef1f5
commit
e6a8528554
|
@ -47,7 +47,7 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
class NestedAggregator extends BucketsAggregator implements SingleBucketAggregator {
|
||||
public class NestedAggregator extends BucketsAggregator implements SingleBucketAggregator {
|
||||
|
||||
static final ParseField PATH_FIELD = new ParseField("path");
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
|
|||
import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator;
|
||||
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
||||
|
@ -187,7 +188,16 @@ public abstract class TermsAggregator extends DeferableBucketAggregator {
|
|||
this.bucketCountThresholds = bucketCountThresholds;
|
||||
this.order = InternalOrder.validate(order, this);
|
||||
this.format = format;
|
||||
this.collectMode = collectMode;
|
||||
if (subAggsNeedScore() && descendsFromNestedAggregator(parent)) {
|
||||
/**
|
||||
* Force the execution to depth_first because we need to access the score of
|
||||
* nested documents in a sub-aggregation and we are not able to generate this score
|
||||
* while replaying deferred documents.
|
||||
*/
|
||||
this.collectMode = SubAggCollectionMode.DEPTH_FIRST;
|
||||
} else {
|
||||
this.collectMode = collectMode;
|
||||
}
|
||||
// Don't defer any child agg if we are dependent on it for pruning results
|
||||
if (order instanceof Aggregation){
|
||||
AggregationPath path = ((Aggregation) order).path();
|
||||
|
@ -203,6 +213,25 @@ public abstract class TermsAggregator extends DeferableBucketAggregator {
|
|||
}
|
||||
}
|
||||
|
||||
static boolean descendsFromNestedAggregator(Aggregator parent) {
|
||||
while (parent != null) {
|
||||
if (parent.getClass() == NestedAggregator.class) {
|
||||
return true;
|
||||
}
|
||||
parent = parent.parent();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean subAggsNeedScore() {
|
||||
for (Aggregator subAgg : subAggregators) {
|
||||
if (subAgg.needsScores()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal Optimization for ordering {@link InternalTerms.Bucket}s by a sub aggregation.
|
||||
* <p>
|
||||
|
|
|
@ -30,8 +30,11 @@ import org.apache.lucene.index.DirectoryReader;
|
|||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.DocValuesFieldExistsQuery;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.NumericUtils;
|
||||
|
@ -44,6 +47,9 @@ import org.elasticsearch.index.mapper.IpFieldMapper;
|
|||
import org.elasticsearch.index.mapper.KeywordFieldMapper;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.NumberFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.TypeFieldMapper;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
|
@ -59,9 +65,14 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter;
|
|||
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator;
|
||||
import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
|
||||
import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.sort.FieldSortBuilder;
|
||||
import org.elasticsearch.search.sort.ScoreSortBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
|
@ -74,6 +85,7 @@ import java.util.Map;
|
|||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.index.mapper.SeqNoFieldMapper.PRIMARY_TERM_NAME;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -999,6 +1011,81 @@ public class TermsAggregatorTests extends AggregatorTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testWithNestedAggregations() throws IOException {
|
||||
try (Directory directory = newDirectory()) {
|
||||
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
int[] nestedValues = new int[i];
|
||||
for (int j = 0; j < i; j++) {
|
||||
nestedValues[j] = j;
|
||||
}
|
||||
indexWriter.addDocuments(generateDocsWithNested(Integer.toString(i), i, nestedValues));
|
||||
}
|
||||
indexWriter.commit();
|
||||
for (Aggregator.SubAggCollectionMode mode : Aggregator.SubAggCollectionMode.values()) {
|
||||
for (boolean withScore : new boolean[]{true, false}) {
|
||||
NestedAggregationBuilder nested = new NestedAggregationBuilder("nested", "nested_object")
|
||||
.subAggregation(new TermsAggregationBuilder("terms", ValueType.LONG)
|
||||
.field("nested_value")
|
||||
// force the breadth_first mode
|
||||
.collectMode(mode)
|
||||
.order(BucketOrder.key(true))
|
||||
.subAggregation(
|
||||
new TopHitsAggregationBuilder("top_hits")
|
||||
.sort(withScore ? new ScoreSortBuilder() : new FieldSortBuilder("_doc"))
|
||||
.storedField("_none_")
|
||||
)
|
||||
);
|
||||
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
|
||||
fieldType.setHasDocValues(true);
|
||||
fieldType.setName("nested_value");
|
||||
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
|
||||
InternalNested result = search(newSearcher(indexReader, false, true),
|
||||
// match root document only
|
||||
new DocValuesFieldExistsQuery(PRIMARY_TERM_NAME), nested, fieldType);
|
||||
InternalMultiBucketAggregation<?, ?> terms = result.getAggregations().get("terms");
|
||||
assertThat(terms.getBuckets().size(), equalTo(9));
|
||||
int ptr = 9;
|
||||
for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
|
||||
InternalTopHits topHits = bucket.getAggregations().get("top_hits");
|
||||
assertThat(topHits.getHits().totalHits, equalTo((long) ptr));
|
||||
if (withScore) {
|
||||
assertThat(topHits.getHits().getMaxScore(), equalTo(1f));
|
||||
} else {
|
||||
assertThat(topHits.getHits().getMaxScore(), equalTo(Float.NaN));
|
||||
}
|
||||
--ptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
|
||||
private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
|
||||
List<Document> documents = new ArrayList<>();
|
||||
|
||||
for (int nestedValue : nestedValues) {
|
||||
Document document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "docs#" + id, UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "__nested_object", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new SortedNumericDocValuesField("nested_value", nestedValue));
|
||||
documents.add(document);
|
||||
}
|
||||
|
||||
Document document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "docs#" + id, UidFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "docs", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new SortedNumericDocValuesField("value", value));
|
||||
document.add(sequenceIDFields.primaryTerm);
|
||||
documents.add(document);
|
||||
|
||||
return documents;
|
||||
}
|
||||
|
||||
|
||||
private IndexReader createIndexWithLongs() throws IOException {
|
||||
Directory directory = newDirectory();
|
||||
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
|
||||
|
|
Loading…
Reference in New Issue