aggs: Changed how `nested` and `reverse_nested` aggs know about their nested depth level.

Before the aggregation tree was traversed to figure out what the parent level is, this commit
changes that by using `NestedScope` to figure out the nested depth level. The big upsides
are that this cleans up `NestedAggregator` (it used a hack to lazily figure out the nested parent filter)
 and this is also what `nested` query uses and therefor the `nested` query can be included inside `nested`
 aggregation and work correctly.

Closes #11749
Closes #12410
This commit is contained in:
Martijn van Groningen 2016-07-22 14:54:40 +02:00
parent 94bc489275
commit c7c0faa54d
8 changed files with 244 additions and 116 deletions

View File

@ -229,6 +229,10 @@ public abstract class AggregatorFactory<AF extends AggregatorFactory<AF>> {
return type.name();
}
public AggregatorFactory<?> getParent() {
return parent;
}
/**
* Utility method. Given an {@link AggregatorFactory} that creates
* {@link Aggregator}s that only know how to collect bucket <tt>0</tt>, this
@ -241,4 +245,4 @@ public abstract class AggregatorFactory<AF extends AggregatorFactory<AF>> {
return new MultiBucketAggregatorWrapper(bigArrays, context, parent, factory, first);
}
}
}

View File

@ -25,8 +25,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.object.ObjectMapper;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
@ -80,7 +82,22 @@ public class NestedAggregationBuilder extends AbstractAggregationBuilder<NestedA
@Override
protected AggregatorFactory<?> doBuild(AggregationContext context, AggregatorFactory<?> parent, Builder subFactoriesBuilder)
throws IOException {
return new NestedAggregatorFactory(name, type, path, context, parent, subFactoriesBuilder, metaData);
ObjectMapper childObjectMapper = context.searchContext().getObjectMapper(path);
if (childObjectMapper == null) {
// in case the path has been unmapped:
return new NestedAggregatorFactory(name, type, null, null, context, parent, subFactoriesBuilder, metaData);
}
if (childObjectMapper.nested().isNested() == false) {
throw new AggregationExecutionException("[nested] nested path [" + path + "] is not nested");
}
try {
ObjectMapper parentObjectMapper = context.searchContext().getQueryShardContext().nestedScope().nextLevel(childObjectMapper);
return new NestedAggregatorFactory(name, type, parentObjectMapper, childObjectMapper, context, parent, subFactoriesBuilder,
metaData);
} finally {
context.searchContext().getQueryShardContext().nestedScope().previousLevel();
}
}
@Override

View File

@ -44,69 +44,40 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*
*/
public class NestedAggregator extends SingleBucketAggregator {
static final ParseField PATH_FIELD = new ParseField("path");
private BitSetProducer parentFilter;
private final BitSetProducer parentFilter;
private final Query childFilter;
private DocIdSetIterator childDocs;
private BitSet parentDocs;
public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper,
AggregationContext aggregationContext, Aggregator parentAggregator,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, aggregationContext, parentAggregator, pipelineAggregators, metaData);
childFilter = objectMapper.nestedTypeFilter();
Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
this.parentFilter = context.searchContext().bitsetFilterCache().getBitSetProducer(parentFilter);
this.childFilter = childObjectMapper.nestedTypeFilter();
}
@Override
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
// Reset parentFilter, so we resolve the parentDocs for each new segment being searched
this.parentFilter = null;
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(ctx);
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(ctx);
IndexSearcher searcher = new IndexSearcher(topLevelContext);
searcher.setQueryCache(null);
final Weight weight = searcher.createNormalizedWeight(childFilter, false);
Weight weight = searcher.createNormalizedWeight(childFilter, false);
Scorer childDocsScorer = weight.scorer(ctx);
if (childDocsScorer == null) {
childDocs = null;
} else {
childDocs = childDocsScorer.iterator();
}
final BitSet parentDocs = parentFilter.getBitSet(ctx);
final DocIdSetIterator childDocs = childDocsScorer != null ? childDocsScorer.iterator() : null;
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// here we translate the parent doc to a list of its nested docs, and then call super.collect for evey one of them so they'll be collected
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent doc), so we can skip:
if (parentDoc == 0 || childDocs == null) {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
return;
}
if (parentFilter == null) {
// The aggs are instantiated in reverse, first the most inner nested aggs and lastly the top level aggs
// So at the time a nested 'nested' aggs is parsed its closest parent nested aggs hasn't been constructed.
// So the trick is to set at the last moment just before needed and we can use its child filter as the
// parent filter.
// Additional NOTE: Before this logic was performed in the setNextReader(...) method, but the assumption
// that aggs instances are constructed in reverse doesn't hold when buckets are constructed lazily during
// aggs execution
Query parentFilterNotCached = findClosestNestedPath(parent());
if (parentFilterNotCached == null) {
parentFilterNotCached = Queries.newNonNestedFilter();
}
parentFilter = context.searchContext().bitsetFilterCache().getBitSetProducer(parentFilterNotCached);
parentDocs = parentFilter.getBitSet(ctx);
if (parentDocs == null) {
// There are no parentDocs in the segment, so return and set childDocs to null, so we exit early for future invocations.
childDocs = null;
return;
}
}
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
@ -123,8 +94,8 @@ public class NestedAggregator extends SingleBucketAggregator {
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal), pipelineAggregators(),
metaData());
return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal),
pipelineAggregators(), metaData());
}
@Override
@ -132,15 +103,4 @@ public class NestedAggregator extends SingleBucketAggregator {
return new InternalNested(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
}
private static Query findClosestNestedPath(Aggregator parent) {
for (; parent != null; parent = parent.parent()) {
if (parent instanceof NestedAggregator) {
return ((NestedAggregator) parent).childFilter;
} else if (parent instanceof ReverseNestedAggregator) {
return ((ReverseNestedAggregator) parent).getParentFilter();
}
}
return null;
}
}

View File

@ -36,12 +36,15 @@ import java.util.Map;
public class NestedAggregatorFactory extends AggregatorFactory<NestedAggregatorFactory> {
private final String path;
private final ObjectMapper parentObjectMapper;
private final ObjectMapper childObjectMapper;
public NestedAggregatorFactory(String name, Type type, String path, AggregationContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactories, Map<String, Object> metaData) throws IOException {
public NestedAggregatorFactory(String name, Type type, ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper,
AggregationContext context, AggregatorFactory<?> parent, AggregatorFactories.Builder subFactories,
Map<String, Object> metaData) throws IOException {
super(name, type, context, parent, subFactories, metaData);
this.path = path;
this.parentObjectMapper = parentObjectMapper;
this.childObjectMapper = childObjectMapper;
}
@Override
@ -50,14 +53,10 @@ public class NestedAggregatorFactory extends AggregatorFactory<NestedAggregatorF
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
ObjectMapper objectMapper = context.searchContext().getObjectMapper(path);
if (objectMapper == null) {
if (childObjectMapper == null) {
return new Unmapped(name, context, parent, pipelineAggregators, metaData);
}
if (!objectMapper.nested().isNested()) {
throw new AggregationExecutionException("[nested] nested path [" + path + "] is not nested");
}
return new NestedAggregator(name, factories, objectMapper, context, parent, pipelineAggregators, metaData);
return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, context, parent, pipelineAggregators, metaData);
}
private static final class Unmapped extends NonCollectingAggregator {

View File

@ -25,14 +25,20 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.object.ObjectMapper;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Objects;
public class ReverseNestedAggregationBuilder extends AbstractAggregationBuilder<ReverseNestedAggregationBuilder> {
@ -82,7 +88,40 @@ public class ReverseNestedAggregationBuilder extends AbstractAggregationBuilder<
@Override
protected AggregatorFactory<?> doBuild(AggregationContext context, AggregatorFactory<?> parent, Builder subFactoriesBuilder)
throws IOException {
return new ReverseNestedAggregatorFactory(name, type, path, context, parent, subFactoriesBuilder, metaData);
if (findNestedAggregatorFactory(parent) == null) {
throw new SearchParseException(context.searchContext(),
"Reverse nested aggregation [" + name + "] can only be used inside a [nested] aggregation", null);
}
ObjectMapper parentObjectMapper = null;
if (path != null) {
parentObjectMapper = context.searchContext().getObjectMapper(path);
if (parentObjectMapper == null) {
return new ReverseNestedAggregatorFactory(name, type, true, null, context, parent, subFactoriesBuilder, metaData);
}
if (parentObjectMapper.nested().isNested() == false) {
throw new AggregationExecutionException("[reverse_nested] nested path [" + path + "] is not nested");
}
}
NestedScope nestedScope = context.searchContext().getQueryShardContext().nestedScope();
try {
nestedScope.nextLevel(parentObjectMapper);
return new ReverseNestedAggregatorFactory(name, type, false, parentObjectMapper, context, parent, subFactoriesBuilder,
metaData);
} finally {
nestedScope.previousLevel();
}
}
private static NestedAggregatorFactory findNestedAggregatorFactory(AggregatorFactory<?> parent) {
if (parent == null) {
return null;
} else if (parent instanceof NestedAggregatorFactory) {
return (NestedAggregatorFactory) parent;
} else {
return findNestedAggregatorFactory(parent.getParent());
}
}
@Override

View File

@ -37,46 +37,26 @@ import java.util.Map;
public class ReverseNestedAggregatorFactory extends AggregatorFactory<ReverseNestedAggregatorFactory> {
private final String path;
private final boolean unmapped;
private final ObjectMapper parentObjectMapper;
public ReverseNestedAggregatorFactory(String name, Type type, String path, AggregationContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactories, Map<String, Object> metaData) throws IOException {
public ReverseNestedAggregatorFactory(String name, Type type, boolean unmapped, ObjectMapper parentObjectMapper,
AggregationContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactories,
Map<String, Object> metaData) throws IOException {
super(name, type, context, parent, subFactories, metaData);
this.path = path;
this.unmapped = unmapped;
this.parentObjectMapper = parentObjectMapper;
}
@Override
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
// Early validation
NestedAggregator closestNestedAggregator = findClosestNestedAggregator(parent);
if (closestNestedAggregator == null) {
throw new SearchParseException(context.searchContext(),
"Reverse nested aggregation [" + name + "] can only be used inside a [nested] aggregation", null);
}
final ObjectMapper objectMapper;
if (path != null) {
objectMapper = context.searchContext().getObjectMapper(path);
if (objectMapper == null) {
return new Unmapped(name, context, parent, pipelineAggregators, metaData);
}
if (!objectMapper.nested().isNested()) {
throw new AggregationExecutionException("[reverse_nested] nested path [" + path + "] is not nested");
}
if (unmapped) {
return new Unmapped(name, context, parent, pipelineAggregators, metaData);
} else {
objectMapper = null;
return new ReverseNestedAggregator(name, factories, parentObjectMapper, context, parent, pipelineAggregators, metaData);
}
return new ReverseNestedAggregator(name, factories, objectMapper, context, parent, pipelineAggregators, metaData);
}
private static NestedAggregator findClosestNestedAggregator(Aggregator parent) {
for (; parent != null; parent = parent.parent()) {
if (parent instanceof NestedAggregator) {
return (NestedAggregator) parent;
}
}
return null;
}
private static final class Unmapped extends NonCollectingAggregator {

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
@ -44,6 +45,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
@ -62,15 +64,12 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.IsNull.notNullValue;
/**
*
*/
@ESIntegTestCase.SuiteScopeTestCase
public class NestedIT extends ESIntegTestCase {
static int numParents;
static int[] numChildren;
static SubAggCollectionMode aggCollectionMode;
private static int numParents;
private static int[] numChildren;
private static SubAggCollectionMode aggCollectionMode;
@Override
public void setupSuiteScopeCluster() throws Exception {
@ -245,7 +244,7 @@ public class NestedIT extends ESIntegTestCase {
assertThat(nested, notNullValue());
assertThat(nested.getName(), equalTo("nested"));
assertThat(nested.getDocCount(), equalTo(docCount));
assertThat((long) nested.getProperty("_count"), equalTo(docCount));
assertThat(nested.getProperty("_count"), equalTo(docCount));
assertThat(nested.getAggregations().asList().isEmpty(), is(false));
LongTerms values = nested.getAggregations().get("values");
@ -263,7 +262,7 @@ public class NestedIT extends ESIntegTestCase {
assertEquals(counts[i], bucket.getDocCount());
}
}
assertThat((LongTerms) nested.getProperty("values"), sameInstance(values));
assertThat(nested.getProperty("values"), sameInstance(values));
}
public void testNestedAsSubAggregation() throws Exception {
@ -544,4 +543,126 @@ public class NestedIT extends ESIntegTestCase {
assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1L));
assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1L));
}
public void testFilterAggInsideNestedAgg() throws Exception {
assertAcked(prepareCreate("classes")
.addMapping("class", jsonBuilder().startObject().startObject("class").startObject("properties")
.startObject("name").field("type", "text").endObject()
.startObject("methods")
.field("type", "nested")
.startObject("properties")
.startObject("name").field("type", "text").endObject()
.startObject("return_type").field("type", "keyword").endObject()
.startObject("parameters")
.field("type", "nested")
.startObject("properties")
.startObject("name").field("type", "text").endObject()
.startObject("type").field("type", "keyword").endObject()
.endObject()
.endObject()
.endObject()
.endObject().endObject().endObject().endObject()));
client().prepareIndex("classes", "class", "1").setSource(jsonBuilder().startObject()
.field("name", "QueryBuilder")
.startArray("methods")
.startObject()
.field("name", "toQuery")
.field("return_type", "Query")
.startArray("parameters")
.startObject()
.field("name", "context")
.field("type", "QueryShardContext")
.endObject()
.endArray()
.endObject()
.startObject()
.field("name", "queryName")
.field("return_type", "QueryBuilder")
.startArray("parameters")
.startObject()
.field("name", "queryName")
.field("type", "String")
.endObject()
.endArray()
.endObject()
.startObject()
.field("name", "boost")
.field("return_type", "QueryBuilder")
.startArray("parameters")
.startObject()
.field("name", "boost")
.field("type", "float")
.endObject()
.endArray()
.endObject()
.endArray()
.endObject()).get();
client().prepareIndex("classes", "class", "2").setSource(jsonBuilder().startObject()
.field("name", "Document")
.startArray("methods")
.startObject()
.field("name", "add")
.field("return_type", "void")
.startArray("parameters")
.startObject()
.field("name", "field")
.field("type", "IndexableField")
.endObject()
.endArray()
.endObject()
.startObject()
.field("name", "removeField")
.field("return_type", "void")
.startArray("parameters")
.startObject()
.field("name", "name")
.field("type", "String")
.endObject()
.endArray()
.endObject()
.startObject()
.field("name", "removeFields")
.field("return_type", "void")
.startArray("parameters")
.startObject()
.field("name", "name")
.field("type", "String")
.endObject()
.endArray()
.endObject()
.endArray()
.endObject()).get();
refresh();
SearchResponse response = client().prepareSearch("classes").addAggregation(nested("to_method", "methods")
.subAggregation(filter("num_string_params",
nestedQuery("methods.parameters", termQuery("methods.parameters.type", "String"), ScoreMode.None)))
).get();
Nested toMethods = response.getAggregations().get("to_method");
Filter numStringParams = toMethods.getAggregations().get("num_string_params");
assertThat(numStringParams.getDocCount(), equalTo(3L));
response = client().prepareSearch("classes").addAggregation(nested("to_method", "methods")
.subAggregation(terms("return_type").field("methods.return_type").subAggregation(
filter("num_string_params", nestedQuery("methods.parameters", termQuery("methods.parameters.type", "String"), ScoreMode.None))
)
)).get();
toMethods = response.getAggregations().get("to_method");
Terms terms = toMethods.getAggregations().get("return_type");
Bucket bucket = terms.getBucketByKey("void");
assertThat(bucket.getDocCount(), equalTo(3L));
numStringParams = bucket.getAggregations().get("num_string_params");
assertThat(numStringParams.getDocCount(), equalTo(2L));
bucket = terms.getBucketByKey("QueryBuilder");
assertThat(bucket.getDocCount(), equalTo(2L));
numStringParams = bucket.getAggregations().get("num_string_params");
assertThat(numStringParams.getDocCount(), equalTo(1L));
bucket = terms.getBucketByKey("Query");
assertThat(bucket.getDocCount(), equalTo(1L));
numStringParams = bucket.getAggregations().get("num_string_params");
assertThat(numStringParams.getDocCount(), equalTo(0L));
}
}

View File

@ -55,9 +55,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.IsNull.notNullValue;
/**
*
*/
@ESIntegTestCase.SuiteScopeTestCase
public class ReverseNestedIT extends ESIntegTestCase {
@ -170,9 +167,9 @@ public class ReverseNestedIT extends ESIntegTestCase {
assertThat(bucket.getKeyAsString(), equalTo("1"));
assertThat(bucket.getDocCount(), equalTo(6L));
ReverseNested reverseNested = bucket.getAggregations().get("nested1_to_field1");
assertThat((long) reverseNested.getProperty("_count"), equalTo(5L));
assertThat(reverseNested.getProperty("_count"), equalTo(5L));
Terms tags = reverseNested.getAggregations().get("field1");
assertThat((Terms) reverseNested.getProperty("field1"), sameInstance(tags));
assertThat(reverseNested.getProperty("field1"), sameInstance(tags));
List<Terms.Bucket> tagsBuckets = new ArrayList<>(tags.getBuckets());
assertThat(tagsBuckets.size(), equalTo(6));
assertThat(tagsBuckets.get(0).getKeyAsString(), equalTo("c"));
@ -472,14 +469,25 @@ public class ReverseNestedIT extends ESIntegTestCase {
SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery())
.addAggregation(nested("nested2", "nested1.nested2").subAggregation(reverseNested("incorrect").path("nested3")))
.execute().actionGet();
.get();
Nested nested = searchResponse.getAggregations().get("nested2");
assertThat(nested, Matchers.notNullValue());
assertThat(nested, notNullValue());
assertThat(nested.getName(), equalTo("nested2"));
ReverseNested reverseNested = nested.getAggregations().get("incorrect");
assertThat(reverseNested.getDocCount(), is(0L));
// Test that parsing the reverse_nested agg doesn't fail, because the parent nested agg is unmapped:
searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery())
.addAggregation(nested("incorrect1", "incorrect1").subAggregation(reverseNested("incorrect2").path("incorrect2")))
.get();
nested = searchResponse.getAggregations().get("incorrect1");
assertThat(nested, notNullValue());
assertThat(nested.getName(), equalTo("incorrect1"));
assertThat(nested.getDocCount(), is(0L));
}
public void testSameParentDocHavingMultipleBuckets() throws Exception {