aggs: Add a method that is invoked before the `getLeafCollector(...)` of children aggregators is invoked.
In the case of nested aggregator this allows it to push down buffered child docs down to children aggregator. Before this was done as part of the `NestedAggregator#getLeafCollector(...)`, but by then the children aggregators have already moved on to the next segment and this causes incorrect results to be produced. Closes #27912
This commit is contained in:
parent
cbd271e497
commit
791c5ddd7e
|
@ -105,7 +105,7 @@ public abstract class AggregatorBase extends Aggregator {
|
|||
};
|
||||
addRequestCircuitBreakerBytes(DEFAULT_WEIGHT);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Increment or decrement the number of bytes that have been allocated to service
|
||||
* this request and potentially trigger a {@link CircuitBreakingException}. The
|
||||
|
@ -114,7 +114,7 @@ public abstract class AggregatorBase extends Aggregator {
|
|||
* If memory has been returned, decrement it without tripping the breaker.
|
||||
* For performance reasons subclasses should not call this millions of times
|
||||
* each with small increments and instead batch up into larger allocations.
|
||||
*
|
||||
*
|
||||
* @param bytes the number of bytes to register or negative to deregister the bytes
|
||||
* @return the cumulative size in bytes allocated by this aggregator to service this request
|
||||
*/
|
||||
|
@ -162,10 +162,18 @@ public abstract class AggregatorBase extends Aggregator {
|
|||
|
||||
@Override
|
||||
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
|
||||
preGetSubLeafCollectors();
|
||||
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
|
||||
return getLeafCollector(ctx, sub);
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be overridden by aggregator implementations that like the perform an operation before the leaf collectors
|
||||
* of children aggregators are instantiated for the next segment.
|
||||
*/
|
||||
protected void preGetSubLeafCollectors() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Can be overridden by aggregator implementation to be called back when the collection phase starts.
|
||||
*/
|
||||
|
|
|
@ -102,13 +102,21 @@ class NestedAggregator extends BucketsAggregator implements SingleBucketAggregat
|
|||
}
|
||||
};
|
||||
} else {
|
||||
doPostCollection();
|
||||
return bufferingNestedLeafBucketCollector = new BufferingNestedLeafBucketCollector(sub, parentDocs, childDocs);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void preGetSubLeafCollectors() throws IOException {
|
||||
processBufferedDocs();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPostCollection() throws IOException {
|
||||
processBufferedDocs();
|
||||
}
|
||||
|
||||
private void processBufferedDocs() throws IOException {
|
||||
if (bufferingNestedLeafBucketCollector != null) {
|
||||
bufferingNestedLeafBucketCollector.postCollect();
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.nested;
|
|||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.SortedDocValuesField;
|
||||
import org.apache.lucene.document.SortedNumericDocValuesField;
|
||||
import org.apache.lucene.document.SortedSetDocValuesField;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
|
@ -45,9 +46,13 @@ import org.elasticsearch.index.mapper.NumberFieldMapper;
|
|||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.TypeFieldMapper;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorTestCase;
|
||||
import org.elasticsearch.search.aggregations.BucketOrder;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
|
||||
|
@ -523,6 +528,118 @@ public class NestedAggregatorTests extends AggregatorTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testPreGetChildLeafCollectors() throws IOException {
|
||||
try (Directory directory = newDirectory()) {
|
||||
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
|
||||
List<Document> documents = new ArrayList<>();
|
||||
Document document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#1", UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "__nested_field", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new SortedDocValuesField("key", new BytesRef("key1")));
|
||||
document.add(new SortedDocValuesField("value", new BytesRef("a1")));
|
||||
documents.add(document);
|
||||
document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#1", UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "__nested_field", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new SortedDocValuesField("key", new BytesRef("key2")));
|
||||
document.add(new SortedDocValuesField("value", new BytesRef("b1")));
|
||||
documents.add(document);
|
||||
document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#1", UidFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "_doc", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(sequenceIDFields.primaryTerm);
|
||||
documents.add(document);
|
||||
iw.addDocuments(documents);
|
||||
iw.commit();
|
||||
documents.clear();
|
||||
|
||||
document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#2", UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "__nested_field", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new SortedDocValuesField("key", new BytesRef("key1")));
|
||||
document.add(new SortedDocValuesField("value", new BytesRef("a2")));
|
||||
documents.add(document);
|
||||
document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#2", UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "__nested_field", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new SortedDocValuesField("key", new BytesRef("key2")));
|
||||
document.add(new SortedDocValuesField("value", new BytesRef("b2")));
|
||||
documents.add(document);
|
||||
document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#2", UidFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "_doc", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(sequenceIDFields.primaryTerm);
|
||||
documents.add(document);
|
||||
iw.addDocuments(documents);
|
||||
iw.commit();
|
||||
documents.clear();
|
||||
|
||||
document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#3", UidFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "__nested_field", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new SortedDocValuesField("key", new BytesRef("key1")));
|
||||
document.add(new SortedDocValuesField("value", new BytesRef("a3")));
|
||||
documents.add(document);
|
||||
document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#3", UidFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "__nested_field", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new SortedDocValuesField("key", new BytesRef("key2")));
|
||||
document.add(new SortedDocValuesField("value", new BytesRef("b3")));
|
||||
documents.add(document);
|
||||
document = new Document();
|
||||
document.add(new Field(UidFieldMapper.NAME, "_doc#1", UidFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(new Field(TypeFieldMapper.NAME, "_doc", TypeFieldMapper.Defaults.FIELD_TYPE));
|
||||
document.add(sequenceIDFields.primaryTerm);
|
||||
documents.add(document);
|
||||
iw.addDocuments(documents);
|
||||
iw.commit();
|
||||
}
|
||||
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
|
||||
TermsAggregationBuilder valueBuilder = new TermsAggregationBuilder("value", ValueType.STRING).field("value");
|
||||
TermsAggregationBuilder keyBuilder = new TermsAggregationBuilder("key", ValueType.STRING).field("key");
|
||||
keyBuilder.subAggregation(valueBuilder);
|
||||
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, "nested_field");
|
||||
nestedBuilder.subAggregation(keyBuilder);
|
||||
FilterAggregationBuilder filterAggregationBuilder = new FilterAggregationBuilder("filterAgg", new MatchAllQueryBuilder());
|
||||
filterAggregationBuilder.subAggregation(nestedBuilder);
|
||||
|
||||
MappedFieldType fieldType1 = new KeywordFieldMapper.KeywordFieldType();
|
||||
fieldType1.setName("key");
|
||||
fieldType1.setHasDocValues(true);
|
||||
MappedFieldType fieldType2 = new KeywordFieldMapper.KeywordFieldType();
|
||||
fieldType2.setName("value");
|
||||
fieldType2.setHasDocValues(true);
|
||||
|
||||
Filter filter = search(newSearcher(indexReader, false, true),
|
||||
Queries.newNonNestedFilter(Version.CURRENT), filterAggregationBuilder, fieldType1, fieldType2);
|
||||
|
||||
assertEquals("filterAgg", filter.getName());
|
||||
assertEquals(3L, filter.getDocCount());
|
||||
|
||||
Nested nested = filter.getAggregations().get(NESTED_AGG);
|
||||
assertEquals(6L, nested.getDocCount());
|
||||
|
||||
StringTerms keyAgg = nested.getAggregations().get("key");
|
||||
assertEquals(2, keyAgg.getBuckets().size());
|
||||
Terms.Bucket key1 = keyAgg.getBuckets().get(0);
|
||||
assertEquals("key1", key1.getKey());
|
||||
StringTerms valueAgg = key1.getAggregations().get("value");
|
||||
assertEquals(3, valueAgg.getBuckets().size());
|
||||
assertEquals("a1", valueAgg.getBuckets().get(0).getKey());
|
||||
assertEquals("a2", valueAgg.getBuckets().get(1).getKey());
|
||||
assertEquals("a3", valueAgg.getBuckets().get(2).getKey());
|
||||
|
||||
Terms.Bucket key2 = keyAgg.getBuckets().get(1);
|
||||
assertEquals("key2", key2.getKey());
|
||||
valueAgg = key2.getAggregations().get("value");
|
||||
assertEquals(3, valueAgg.getBuckets().size());
|
||||
assertEquals("b1", valueAgg.getBuckets().get(0).getKey());
|
||||
assertEquals("b2", valueAgg.getBuckets().get(1).getKey());
|
||||
assertEquals("b3", valueAgg.getBuckets().get(2).getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private double generateMaxDocs(List<Document> documents, int numNestedDocs, int id, String path, String fieldName) {
|
||||
return DoubleStream.of(generateDocuments(documents, numNestedDocs, id, path, fieldName))
|
||||
.max().orElse(Double.NEGATIVE_INFINITY);
|
||||
|
|
Loading…
Reference in New Issue