Nested aggregator: Fix handling of multiple buckets being emitted for the same parent doc id.
This bug was introduced by #8454, which changed the nested aggregator so that the childFilter is consumed only once. By buffering the child doc ids, multiple buckets can now be emitted for the same parent doc id. This buffering happens only in the scope of the current root document, so the number of child doc ids buffered at any time stays small.

Closes #9317
Closes #9346
commit 7ca2ef9b93 (parent f8294352f7)
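To see why the buffering is needed (an illustration, not part of the commit): Lucene's DocIdSetIterator is forward-only, so once the aggregator has advanced childDocs past a parent's children, a second collect() call for the same parent doc id finds nothing left to collect. Below is a minimal sketch of the buffering pattern using plain java.util collections; the class and parameter names are made up and only stand in for the aggregator's internals.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;

// Sketch only: buffer child doc ids per parent, scoped to the current root doc,
// so repeated lookups don't try to re-consume a forward-only iterator.
class ChildDocBufferSketch {

    private final Map<Integer, List<Integer>> childBuffers = new HashMap<>();
    private int currentRootDoc = -1;

    // 'childDocs' stands in for the forward-only child iterator, already
    // positioned at the first child of 'parentDoc'.
    List<Integer> getChildren(int rootDoc, int parentDoc, PrimitiveIterator.OfInt childDocs) {
        if (rootDoc != currentRootDoc) {
            currentRootDoc = rootDoc;
            childBuffers.clear(); // buffers are only valid for the current root doc
        }
        return childBuffers.computeIfAbsent(parentDoc, p -> {
            // First visit: drain the iterator into a buffer. Doc ids below the
            // parent id are its children (nested docs precede their parent).
            List<Integer> buffer = new ArrayList<>();
            while (childDocs.hasNext()) {
                int docId = childDocs.nextInt();
                if (docId >= parentDoc) {
                    break; // reached the parent itself; stop
                }
                buffer.add(docId);
            }
            return buffer;
        });
    }
}

Every later request for the same parent doc id within the same root doc is served from the buffer instead of the spent iterator, which is the essence of the fix below.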
NestedAggregator.java:

@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.search.aggregations.bucket.nested;
 
+import com.carrotsearch.hppc.IntArrayList;
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.DocIdSet;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -49,9 +51,12 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
 
     private DocIdSetIterator childDocs;
     private BitSet parentDocs;
 
     private LeafReaderContext reader;
+    private BitSet rootDocs;
+    private int currentRootDoc = -1;
+    private final IntObjectOpenHashMap<IntArrayList> childDocIdBuffers = new IntObjectOpenHashMap<>();
 
     public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator, Map<String, Object> metaData, FilterCachingPolicy filterCachingPolicy) throws IOException {
         super(name, factories, aggregationContext, parentAggregator, metaData);
         this.parentAggregator = parentAggregator;
@@ -71,6 +76,9 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
             } else {
                 childDocs = childDocIdSet.iterator();
             }
+            BitDocIdSetFilter rootDocsFilter = context.searchContext().bitsetFilterCache().getBitDocIdSetFilter(NonNestedDocsFilter.INSTANCE);
+            BitDocIdSet rootDocIdSet = rootDocsFilter.getDocIdSet(reader);
+            rootDocs = rootDocIdSet.bits();
         } catch (IOException ioe) {
             throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
         }
@@ -108,22 +116,22 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
             }
         }
 
-        int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
-        int childDocId;
-        if (childDocs.docID() > prevParentDoc) {
-            childDocId = childDocs.docID();
-        } else {
-            childDocId = childDocs.advance(prevParentDoc + 1);
-        }
-
         int numChildren = 0;
-        for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
+        IntArrayList iterator = getChildren(parentDoc);
+        final int[] buffer = iterator.buffer;
+        final int size = iterator.size();
+        for (int i = 0; i < size; i++) {
             numChildren++;
-            collectBucketNoCounts(childDocId, bucketOrd);
+            collectBucketNoCounts(buffer[i], bucketOrd);
         }
         incrementBucketDocCount(bucketOrd, numChildren);
     }
 
+    @Override
+    protected void doClose() {
+        childDocIdBuffers.clear();
+    }
+
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
         return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal), metaData());
@@ -185,5 +193,43 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
         }
     }
 
+    // The aggs framework can collect buckets for the same parent doc id more than once and, because the child docs
+    // can only be consumed once, we need to buffer the child docs. We only need to buffer child docs in the scope
+    // of the current root doc.
+
+    // Examples:
+    // 1) The nested agg is wrapped by a terms agg and multiple buckets per document are emitted.
+    // 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg.
+    //    For each child in the first nested agg the second nested agg gets invoked with the same buckets / doc ids.
+    private IntArrayList getChildren(final int parentDocId) throws IOException {
+        int rootDocId = rootDocs.nextSetBit(parentDocId);
+        if (currentRootDoc == rootDocId) {
+            final IntArrayList childDocIdBuffer = childDocIdBuffers.get(parentDocId);
+            if (childDocIdBuffer != null) {
+                return childDocIdBuffer;
+            } else {
+                // here we translate the parent doc to a list of its nested docs,
+                // and then collect buckets for every one of them so they'll be collected
+                final IntArrayList newChildDocIdBuffer = new IntArrayList();
+                childDocIdBuffers.put(parentDocId, newChildDocIdBuffer);
+                int prevParentDoc = parentDocs.prevSetBit(parentDocId - 1);
+                int childDocId;
+                if (childDocs.docID() > prevParentDoc) {
+                    childDocId = childDocs.docID();
+                } else {
+                    childDocId = childDocs.advance(prevParentDoc + 1);
+                }
+                for (; childDocId < parentDocId; childDocId = childDocs.nextDoc()) {
+                    newChildDocIdBuffer.add(childDocId);
+                }
+                return newChildDocIdBuffer;
+            }
+        } else {
+            this.currentRootDoc = rootDocId;
+            childDocIdBuffers.clear();
+            return getChildren(parentDocId);
+        }
+    }
+
 }
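The second scenario in the comment above can be made concrete. A request shaped roughly like the following, using the same client builder API as the test below, makes the inner nested agg see the same parent doc ids once per child of the outer nested agg; the index and path names here are hypothetical, not from the commit.

import static org.elasticsearch.search.aggregations.AggregationBuilders.nested;
import static org.elasticsearch.search.aggregations.AggregationBuilders.reverseNested;

// Hypothetical shape for example (2): nested -> reverse_nested -> nested.
SearchResponse response = client().prepareSearch("idx")
        .addAggregation(nested("n1").path("comments").subAggregation(
                reverseNested("to_root").subAggregation(
                        nested("n2").path("tags"))))
        .get();

Without the buffering, the second nested agg's child iterator would already be exhausted by the time it is asked to collect for the same doc id again.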
NestedTests.java:

@@ -451,4 +451,90 @@ public class NestedTests extends ElasticsearchIntegrationTest {
         tags = nestedTags.getAggregations().get("tag");
         assertThat(tags.getBuckets().size(), equalTo(0)); // and this must be empty
     }
+
+    @Test
+    public void nestedSameDocIdProcessedMultipleTime() throws Exception {
+        assertAcked(
+                prepareCreate("idx4")
+                        .setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
+                        .addMapping("product", "categories", "type=string", "name", "type=string", "property", "type=nested")
+        );
+
+        client().prepareIndex("idx4", "product", "1").setSource(jsonBuilder().startObject()
+                .field("name", "product1")
+                .field("categories", "1", "2", "3", "4")
+                .startArray("property")
+                .startObject().field("id", 1).endObject()
+                .startObject().field("id", 2).endObject()
+                .startObject().field("id", 3).endObject()
+                .endArray()
+                .endObject()).get();
+        client().prepareIndex("idx4", "product", "2").setSource(jsonBuilder().startObject()
+                .field("name", "product2")
+                .field("categories", "1", "2")
+                .startArray("property")
+                .startObject().field("id", 1).endObject()
+                .startObject().field("id", 5).endObject()
+                .startObject().field("id", 4).endObject()
+                .endArray()
+                .endObject()).get();
+        refresh();
+
+        SearchResponse response = client().prepareSearch("idx4").setTypes("product")
+                .addAggregation(terms("category").field("categories").subAggregation(
+                        nested("property").path("property").subAggregation(
+                                terms("property_id").field("property.id")
+                        )
+                ))
+                .get();
+        assertNoFailures(response);
+        assertHitCount(response, 2);
+
+        Terms category = response.getAggregations().get("category");
+        assertThat(category.getBuckets().size(), equalTo(4));
+
+        Terms.Bucket bucket = category.getBucketByKey("1");
+        assertThat(bucket.getDocCount(), equalTo(2l));
+        Nested property = bucket.getAggregations().get("property");
+        assertThat(property.getDocCount(), equalTo(6l));
+        Terms propertyId = property.getAggregations().get("property_id");
+        assertThat(propertyId.getBuckets().size(), equalTo(5));
+        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l));
+        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l));
+
+        bucket = category.getBucketByKey("2");
+        assertThat(bucket.getDocCount(), equalTo(2l));
+        property = bucket.getAggregations().get("property");
+        assertThat(property.getDocCount(), equalTo(6l));
+        propertyId = property.getAggregations().get("property_id");
+        assertThat(propertyId.getBuckets().size(), equalTo(5));
+        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l));
+        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l));
+
+        bucket = category.getBucketByKey("3");
+        assertThat(bucket.getDocCount(), equalTo(1l));
+        property = bucket.getAggregations().get("property");
+        assertThat(property.getDocCount(), equalTo(3l));
+        propertyId = property.getAggregations().get("property_id");
+        assertThat(propertyId.getBuckets().size(), equalTo(3));
+        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
+
+        bucket = category.getBucketByKey("4");
+        assertThat(bucket.getDocCount(), equalTo(1l));
+        property = bucket.getAggregations().get("property");
+        assertThat(property.getDocCount(), equalTo(3l));
+        propertyId = property.getAggregations().get("property_id");
+        assertThat(propertyId.getBuckets().size(), equalTo(3));
+        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
+    }
 }
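A sanity check on the expected counts (my arithmetic, not part of the commit): products 1 and 2 both carry category "1", and each has three nested property docs, so the nested agg under that bucket sees 2 parent docs and 2 × 3 = 6 nested docs; property id 1 occurs in both products (doc count 2) while ids 2 through 5 occur once each. Category "2" matches the same two products and yields the same counts. Categories "3" and "4" match only product 1, giving 1 parent doc, 3 nested docs, and one occurrence each of ids 1, 2, and 3. Before the fix, revisiting the same parent doc id for each terms bucket would have found the child iterator already consumed.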