Aggs: Make the nested aggregation call sub aggregators with doc IDs in order.

Close #9547
This commit is contained in:
Adrien Grand 2015-02-03 16:40:49 +01:00
parent ebb7ecb00e
commit 13b64cc362
2 changed files with 13 additions and 63 deletions

View File

@ -18,8 +18,6 @@
*/
package org.elasticsearch.search.aggregations.bucket.nested;
import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
@ -53,10 +51,6 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
private BitSet parentDocs;
private LeafReaderContext reader;
private BitSet rootDocs;
private int currentRootDoc = -1;
private final IntObjectOpenHashMap<IntArrayList> childDocIdBuffers = new IntObjectOpenHashMap<>();
public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator, Map<String, Object> metaData, FilterCachingPolicy filterCachingPolicy) throws IOException {
super(name, factories, aggregationContext, parentAggregator, metaData);
this.parentAggregator = parentAggregator;
@ -76,12 +70,6 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
} else {
childDocs = childDocIdSet.iterator();
}
BitDocIdSetFilter rootDocsFilter = context.searchContext().bitsetFilterCache().getBitDocIdSetFilter(NonNestedDocsFilter.INSTANCE);
BitDocIdSet rootDocIdSet = rootDocsFilter.getDocIdSet(reader);
rootDocs = rootDocIdSet.bits();
// We need to reset the current root doc, otherwise we may emit incorrect child docs if the next segment happen to start with the same root doc id value
currentRootDoc = -1;
childDocIdBuffers.clear();
} catch (IOException ioe) {
throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
}
@ -89,6 +77,7 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
@Override
public void collect(int parentDoc, long bucketOrd) throws IOException {
assert bucketOrd == 0;
// here we translate the parent doc to a list of its nested docs, and then call super.collect for evey one of them so they'll be collected
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent doc), so we can skip:
@ -119,21 +108,19 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
}
}
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
int numChildren = 0;
IntArrayList iterator = getChildren(parentDoc);
final int[] buffer = iterator.buffer;
final int size = iterator.size();
for (int i = 0; i < size; i++) {
numChildren++;
collectBucketNoCounts(buffer[i], bucketOrd);
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
collectBucketNoCounts(childDocId, bucketOrd);
numChildren += 1;
}
incrementBucketDocCount(bucketOrd, numChildren);
}
@Override
protected void doClose() {
childDocIdBuffers.clear();
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
@ -169,6 +156,9 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
@Override
public Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metaData) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
MapperService.SmartNameObjectMapper mapper = context.searchContext().smartNameObjectMapper(path);
if (mapper == null) {
return new Unmapped(name, context, parent, metaData);
@ -196,43 +186,4 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
}
}
// The aggs framework can collect buckets for the same parent doc id more than once and because the children docs
// can only be consumed once we need to buffer the child docs. We only need to buffer child docs in the scope
// of the current root doc.
// Examples:
// 1) nested agg wrapped is by terms agg and multiple buckets per document are emitted
// 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg.
// For each child in the first nested agg the second nested agg gets invoked with the same buckets / docids
private IntArrayList getChildren(final int parentDocId) throws IOException {
int rootDocId = rootDocs.nextSetBit(parentDocId);
if (currentRootDoc == rootDocId) {
final IntArrayList childDocIdBuffer = childDocIdBuffers.get(parentDocId);
if (childDocIdBuffer != null) {
return childDocIdBuffer;
} else {
// here we translate the parent doc to a list of its nested docs,
// and then collect buckets for every one of them so they'll be collected
final IntArrayList newChildDocIdBuffer = new IntArrayList();
childDocIdBuffers.put(parentDocId, newChildDocIdBuffer);
int prevParentDoc = parentDocs.prevSetBit(parentDocId - 1);
int childDocId;
if (childDocs.docID() > prevParentDoc) {
childDocId = childDocs.docID();
} else {
childDocId = childDocs.advance(prevParentDoc + 1);
}
for (; childDocId < parentDocId; childDocId = childDocs.nextDoc()) {
newChildDocIdBuffer.add(childDocId);
}
return newChildDocIdBuffer;
}
} else {
this.currentRootDoc = rootDocId;
childDocIdBuffers.clear();
return getChildren(parentDocId);
}
}
}

View File

@ -483,7 +483,6 @@ public class ReverseNestedTests extends ElasticsearchIntegrationTest {
}
@Test
@AwaitsFix(bugUrl="http://github.com/elasticsearch/elasticsearch/issues/9547")
public void testSameParentDocHavingMultipleBuckets() throws Exception {
XContentBuilder mapping = jsonBuilder().startObject().startObject("product").field("dynamic", "strict").startObject("properties")
.startObject("id").field("type", "long").endObject()