Aggregations: Add a `children` bucket aggregator that can map buckets from parent document types to child document types using the built-in parent/child support.

Closes #6936
Martijn van Groningen 2014-05-04 18:22:01 +07:00
parent 122c2b7a12
commit 383e64bd5c
15 changed files with 1117 additions and 3 deletions


@ -12,6 +12,8 @@ include::bucket/nested-aggregation.asciidoc[]
include::bucket/reverse-nested-aggregation.asciidoc[]
include::bucket/children-aggregation.asciidoc[]
include::bucket/terms-aggregation.asciidoc[]
include::bucket/significantterms-aggregation.asciidoc[]


@ -0,0 +1,343 @@
[[search-aggregations-bucket-children-aggregation]]
=== Children Aggregation
A special single bucket aggregation that enables aggregating from buckets on parent document types to buckets on child documents.
This aggregation relies on the <<mapping-parent-field,_parent field>> in the mapping. It has a single option:
* `type` - The child type that the buckets in the parent space should be mapped to.
For example, let's say we have an index of questions and answers. The answer type has the following `_parent` field in the mapping:
[source,js]
--------------------------------------------------
{
"answer" : {
"_parent" : {
"type" : "question"
}
}
}
--------------------------------------------------
The question typed documents contain a tags field and the answer typed documents contain an owner field. With the `children`
aggregation the tag buckets can be mapped to the owner buckets in a single request even though the two fields exist in
two different kinds of documents.
An example of a question typed document:
[source,js]
--------------------------------------------------
{
"body": "<p>I have Windows 2003 server and i bought a new Windows 2008 server...",
"title": "Whats the best way to file transfer my site from server to a newer one?",
"tags": [
"windows-server-2003",
"windows-server-2008",
"file-transfer"
]
}
--------------------------------------------------
An example of an answer typed document:
[source,js]
--------------------------------------------------
{
"owner": {
"location": "Norfolk, United Kingdom",
"display_name": "Sam",
"id": 48
},
"body": "<p>Unfortunately your pretty much limited to FTP...",
"creation_date": "2009-05-04T13:45:37.030"
}
--------------------------------------------------
The following request connects the two together:
[source,js]
--------------------------------------------------
{
"aggs": {
"top-tags": {
"terms": {
"field": "tags",
"size": 10
},
"aggs": {
"to-answers": {
"children": {
"type" : "answer" <1>
},
"aggs": {
"top-names": {
"terms": {
"field": "owner.display_name",
"size": 10
}
}
}
}
}
}
}
}
--------------------------------------------------
<1> The `type` points to the type / mapping with the name `answer`.
The above example returns the top question tags and, per tag, the top answer owners.
Possible response:
[source,js]
--------------------------------------------------
{
"aggregations": {
"top-tags": {
"buckets": [
{
"key": "windows-server-2003",
"doc_count": 25365, <1>
"to-answers": {
"doc_count": 36004, <2>
"top-names": {
"buckets": [
{
"key": "Sam",
"doc_count": 274
},
{
"key": "chris",
"doc_count": 19
},
{
"key": "david",
"doc_count": 14
},
...
]
}
}
},
{
"key": "linux",
"doc_count": 18342,
"to-answers": {
"doc_count": 6655,
"top-names": {
"buckets": [
{
"key": "abrams",
"doc_count": 25
},
{
"key": "ignacio",
"doc_count": 25
},
{
"key": "vazquez",
"doc_count": 25
},
...
]
}
}
},
{
"key": "windows",
"doc_count": 18119,
"to-answers": {
"doc_count": 24051,
"top-names": {
"buckets": [
{
"key": "molly7244",
"doc_count": 265
},
{
"key": "david",
"doc_count": 27
},
{
"key": "chris",
"doc_count": 26
},
...
]
}
}
},
{
"key": "osx",
"doc_count": 10971,
"to-answers": {
"doc_count": 5902,
"top-names": {
"buckets": [
{
"key": "diago",
"doc_count": 4
},
{
"key": "albert",
"doc_count": 3
},
{
"key": "asmus",
"doc_count": 3
},
...
]
}
}
},
{
"key": "ubuntu",
"doc_count": 8743,
"to-answers": {
"doc_count": 8784,
"top-names": {
"buckets": [
{
"key": "ignacio",
"doc_count": 9
},
{
"key": "abrams",
"doc_count": 8
},
{
"key": "molly7244",
"doc_count": 8
},
...
]
}
}
},
{
"key": "windows-xp",
"doc_count": 7517,
"to-answers": {
"doc_count": 13610,
"top-names": {
"buckets": [
{
"key": "molly7244",
"doc_count": 232
},
{
"key": "chris",
"doc_count": 9
},
{
"key": "john",
"doc_count": 9
},
...
]
}
}
},
{
"key": "networking",
"doc_count": 6739,
"to-answers": {
"doc_count": 2076,
"top-names": {
"buckets": [
{
"key": "molly7244",
"doc_count": 6
},
{
"key": "alnitak",
"doc_count": 5
},
{
"key": "chris",
"doc_count": 3
},
...
]
}
}
},
{
"key": "mac",
"doc_count": 5590,
"to-answers": {
"doc_count": 999,
"top-names": {
"buckets": [
{
"key": "abrams",
"doc_count": 2
},
{
"key": "ignacio",
"doc_count": 2
},
{
"key": "vazquez",
"doc_count": 2
},
...
]
}
}
},
{
"key": "wireless-networking",
"doc_count": 4409,
"to-answers": {
"doc_count": 6497,
"top-names": {
"buckets": [
{
"key": "molly7244",
"doc_count": 61
},
{
"key": "chris",
"doc_count": 5
},
{
"key": "mike",
"doc_count": 5
},
...
]
}
}
},
{
"key": "windows-8",
"doc_count": 3601,
"to-answers": {
"doc_count": 4263,
"top-names": {
"buckets": [
{
"key": "molly7244",
"doc_count": 3
},
{
"key": "msft",
"doc_count": 2
},
{
"key": "user172132",
"doc_count": 2
},
...
]
}
}
}
]
}
}
}
--------------------------------------------------
<1> The number of question documents with the tag `windows-server-2003`.
<2> The number of answer documents that are related to question documents with the tag `windows-server-2003`.
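The same request can also be expressed through the Java API via the new `children` builder added in this commit. A minimal sketch (the client setup and the `stackoverflow` index name are assumptions for illustration):

[source,java]
--------------------------------------------------
// Hypothetical index name; any index with the question/answer mapping above works.
SearchResponse response = client.prepareSearch("stackoverflow")
        .addAggregation(
                AggregationBuilders.terms("top-tags").field("tags").size(10)
                        .subAggregation(AggregationBuilders.children("to-answers").childType("answer")
                                .subAggregation(AggregationBuilders.terms("top-names").field("owner.display_name").size(10))))
        .execute().actionGet();
--------------------------------------------------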


@ -28,9 +28,9 @@ import java.io.IOException;
* A scorer that wraps a {@link DocIdSetIterator} and emits a constant score.
*/
// Borrowed from ConstantScoreQuery
public class ConstantScorer extends Scorer {
public static ConstantScorer create(DocIdSetIterator iterator, Weight weight, float constantScore) throws IOException {
return new ConstantScorer(iterator, weight, constantScore);
}


@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.search.aggregations.bucket.children.ChildrenBuilder;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.filters.FiltersAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridBuilder;
@ -106,6 +107,10 @@ public class AggregationBuilders {
return new ReverseNestedBuilder(name);
}
public static ChildrenBuilder children(String name) {
return new ChildrenBuilder(name);
}
public static GeoDistanceBuilder geoDistance(String name) {
return new GeoDistanceBuilder(name);
}


@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations;
import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.search.aggregations.bucket.children.ChildrenParser;
import org.elasticsearch.search.aggregations.bucket.filter.FilterParser;
import org.elasticsearch.search.aggregations.bucket.filters.FiltersParser;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridParser;
@ -87,6 +88,7 @@ public class AggregationModule extends AbstractModule {
parsers.add(ReverseNestedParser.class);
parsers.add(TopHitsParser.class);
parsers.add(GeoBoundsParser.class);
parsers.add(ChildrenParser.class);
}
/**


@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.search.aggregations.bucket.children.InternalChildren;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters;
import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid;
@ -95,5 +96,6 @@ public class TransportAggregationModule extends AbstractModule {
InternalReverseNested.registerStream();
InternalTopHits.registerStreams();
InternalGeoBounds.registerStream();
InternalChildren.registerStream();
}
}


@ -0,0 +1,27 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.children;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
/**
 * A single bucket aggregation over the child documents that belong to the parent documents of the enclosing bucket.
 */
public interface Children extends SingleBucketAggregation {
}


@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.children;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilderException;
import java.io.IOException;
/**
 * Builder for the {@code children} aggregation.
 */
public class ChildrenBuilder extends AggregationBuilder<ChildrenBuilder> {
private String childType;
public ChildrenBuilder(String name) {
super(name, InternalChildren.TYPE.name());
}
public ChildrenBuilder childType(String childType) {
this.childType = childType;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (childType == null) {
throw new SearchSourceBuilderException("child_type must be set on children aggregation [" + name + "]");
}
builder.field("type", childType);
return builder.endObject();
}
}


@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.children;
import org.apache.lucene.search.Filter;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
/**
*
*/
public class ChildrenParser implements Aggregator.Parser {
@Override
public String type() {
return InternalChildren.TYPE.name();
}
@Override
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
String childType = null;
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if ("type".equals(currentFieldName)) {
childType = parser.text();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].");
}
}
if (childType == null) {
throw new SearchParseException(context, "Missing [type] field for children aggregation [" + aggregationName + "]");
}
DocumentMapper childDocMapper = context.mapperService().documentMapper(childType);
if (childDocMapper == null) {
throw new SearchParseException(context, "[children] No mapping for for type [" + childType + "]");
}
ParentFieldMapper parentFieldMapper = childDocMapper.parentFieldMapper();
if (!parentFieldMapper.active()) {
throw new SearchParseException(context, "[children] _parent field not configured");
}
String parentType = parentFieldMapper.type();
DocumentMapper parentDocMapper = context.mapperService().documentMapper(parentType);
if (parentDocMapper == null) {
throw new SearchParseException(context, "[children] Type [" + childType + "] points to a non existent parent type [" + parentType + "]");
}
Filter parentFilter = context.filterCache().cache(parentDocMapper.typeFilter());
Filter childFilter = context.filterCache().cache(childDocMapper.typeFilter());
ParentChildIndexFieldData parentChildIndexFieldData = context.fieldData().getForField(parentFieldMapper);
ValuesSourceConfig<ValuesSource.Bytes.WithOrdinals.ParentChild> config = new ValuesSourceConfig<>(ValuesSource.Bytes.WithOrdinals.ParentChild.class);
config.fieldContext(new FieldContext(parentFieldMapper.names().indexName(), parentChildIndexFieldData, parentFieldMapper));
return new ParentToChildrenAggregator.Factory(aggregationName, config, parentType, parentFilter, childFilter);
}
}


@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.children;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import java.io.IOException;
/**
*/
public class InternalChildren extends InternalSingleBucketAggregation implements Children {
public static final Type TYPE = new Type("children");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalChildren readResult(StreamInput in) throws IOException {
InternalChildren result = new InternalChildren();
result.readFrom(in);
return result;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public InternalChildren() {
}
public InternalChildren(String name, long docCount, InternalAggregations aggregations) {
super(name, docCount, aggregations);
}
@Override
public Type type() {
return TYPE;
}
@Override
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
return new InternalChildren(name, docCount, subAggregations);
}
}


@ -0,0 +1,214 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.children;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.join.FixedBitSetCachingWrapperFilter;
import org.apache.lucene.util.Bits;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.lucene.docset.DocIdSets;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.index.search.child.ConstantScorer;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
// The RecordingPerReaderBucketCollector assumes per-segment recording, which isn't the case for this
// aggregation, so that collector can't be used here
public class ParentToChildrenAggregator extends SingleBucketAggregator implements ReaderContextAware {
private final String parentType;
private final Filter childFilter;
private final Filter parentFilter;
private final ValuesSource.Bytes.WithOrdinals.ParentChild valuesSource;
// Maybe use PagedGrowableWriter? It would be less wasteful than LongArray, but then we lose the reuse feature of BigArrays.
// Also, if we knew the highest bucket ordinal a parent agg will create, we could store multiple values in one slot
private final LongArray parentOrdToBuckets;
// Only pay the extra storage price if a parent ord has multiple buckets.
// Most of the time a parent falls into a single bucket, since there is only one document per parent ord;
// multiple buckets are only needed when, for example, a terms agg puts a parent doc with multiple terms into several buckets:
private final LongObjectPagedHashMap<long[]> parentOrdToOtherBuckets;
private boolean multipleBucketsPerParentOrd = false;
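// Worked example (hypothetical ordinals): if parent ord 7 is first collected into bucket 3,
// parentOrdToBuckets[7] = 3; if the same parent is later also collected into bucket 5,
// parentOrdToOtherBuckets[7] = {5} and multipleBucketsPerParentOrd flips to true.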
private List<AtomicReaderContext> replay = new ArrayList<>();
private SortedDocValues globalOrdinals;
private Bits parentDocs;
public ParentToChildrenAggregator(String name, AggregatorFactories factories, AggregationContext aggregationContext,
Aggregator parent, String parentType, Filter childFilter, Filter parentFilter,
ValuesSource.Bytes.WithOrdinals.ParentChild valuesSource, long maxOrd) {
super(name, factories, aggregationContext, parent);
this.parentType = parentType;
this.childFilter = childFilter;
// TODO: remove FixedBitSetCachingWrapperFilter once #7031 gets in
this.parentFilter = new FixedBitSetCachingWrapperFilter(parentFilter);
this.parentOrdToBuckets = aggregationContext.bigArrays().newLongArray(maxOrd, false);
this.parentOrdToBuckets.fill(0, maxOrd, -1);
this.parentOrdToOtherBuckets = new LongObjectPagedHashMap<>(aggregationContext.bigArrays());
this.valuesSource = valuesSource;
}
@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
return new InternalChildren(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal));
}
@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalChildren(name, 0, buildEmptySubAggregations());
}
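// Collection phase: for each parent document collected into a bucket, record the mapping from
// the parent's global ordinal to the bucket ordinal(s), so matching child documents can be
// routed to the same bucket(s) in doPostCollection().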
@Override
public void collect(int docId, long bucketOrdinal) throws IOException {
if (parentDocs != null && parentDocs.get(docId)) {
long globalOrdinal = globalOrdinals.getOrd(docId);
if (globalOrdinal != -1) {
if (parentOrdToBuckets.get(globalOrdinal) == -1) {
parentOrdToBuckets.set(globalOrdinal, bucketOrdinal);
} else {
long[] bucketOrds = parentOrdToOtherBuckets.get(globalOrdinal);
if (bucketOrds != null) {
bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1);
bucketOrds[bucketOrds.length - 1] = bucketOrdinal;
parentOrdToOtherBuckets.put(globalOrdinal, bucketOrds);
} else {
parentOrdToOtherBuckets.put(globalOrdinal, new long[]{bucketOrdinal});
}
multipleBucketsPerParentOrd = true;
}
}
}
}
@Override
public void setNextReader(AtomicReaderContext reader) {
if (replay == null) {
return;
}
globalOrdinals = valuesSource.globalOrdinalsValues(parentType);
assert globalOrdinals != null;
try {
DocIdSet parentDocIdSet = parentFilter.getDocIdSet(reader, null);
if (parentDocIdSet != null) {
parentDocs = parentDocIdSet.bits();
} else {
parentDocs = null;
}
DocIdSet childDocIdSet = childFilter.getDocIdSet(reader, null);
if (globalOrdinals != null && !DocIdSets.isEmpty(childDocIdSet)) {
replay.add(reader);
}
} catch (IOException e) {
throw ExceptionsHelper.convertToElastic(e);
}
}
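// Replay phase: iterate the child documents of every recorded segment, resolve each child's
// parent through the global ordinals and collect the child into the bucket(s) its parent was
// collected into.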
@Override
protected void doPostCollection() throws IOException {
List<AtomicReaderContext> replay = this.replay;
this.replay = null;
for (AtomicReaderContext atomicReaderContext : replay) {
context.setNextReader(atomicReaderContext);
SortedDocValues globalOrdinals = valuesSource.globalOrdinalsValues(parentType);
DocIdSet childDocIdSet = childFilter.getDocIdSet(atomicReaderContext, atomicReaderContext.reader().getLiveDocs());
if (childDocIdSet == null) {
continue;
}
DocIdSetIterator childDocsIter = childDocIdSet.iterator();
if (childDocsIter == null) {
continue;
}
// Set the scorer, since we now replay only the child docIds
context.setScorer(ConstantScorer.create(childDocsIter, null, 1f));
for (int docId = childDocsIter.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter.nextDoc()) {
long globalOrdinal = globalOrdinals.getOrd(docId);
if (globalOrdinal != -1) {
long bucketOrd = parentOrdToBuckets.get(globalOrdinal);
if (bucketOrd != -1) {
collectBucket(docId, bucketOrd);
if (multipleBucketsPerParentOrd) {
long[] otherBucketOrds = parentOrdToOtherBuckets.get(globalOrdinal);
if (otherBucketOrds != null) {
for (long otherBucketOrd : otherBucketOrds) {
collectBucket(docId, otherBucketOrd);
}
}
}
}
}
}
}
}
@Override
protected void doClose() {
Releasables.close(parentOrdToBuckets, parentOrdToOtherBuckets);
}
public static class Factory extends ValuesSourceAggregatorFactory<ValuesSource.Bytes.WithOrdinals.ParentChild> {
private final String parentType;
private final Filter parentFilter;
private final Filter childFilter;
public Factory(String name, ValuesSourceConfig<ValuesSource.Bytes.WithOrdinals.ParentChild> config, String parentType, Filter parentFilter, Filter childFilter) {
super(name, InternalChildren.TYPE.name(), config);
this.parentType = parentType;
this.parentFilter = parentFilter;
this.childFilter = childFilter;
}
@Override
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) {
throw new ElasticsearchIllegalStateException("[children] aggregation doesn't support unmapped");
}
@Override
protected Aggregator create(ValuesSource.Bytes.WithOrdinals.ParentChild valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) {
long maxOrd = valuesSource.globalMaxOrd(aggregationContext.searchContext().searcher(), parentType);
return new ParentToChildrenAggregator(name, factories, aggregationContext, parent, parentType, childFilter, parentFilter, valuesSource, maxOrd);
}
}
}


@ -33,6 +33,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData;
import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.internal.SearchContext;
@ -177,7 +178,9 @@ public class AggregationContext implements ReaderContextAware, ScorerAware {
if (dataSource == null) {
final IndexFieldData<?> indexFieldData = config.fieldContext.indexFieldData();
ValuesSource.MetaData metaData = ValuesSource.MetaData.load(config.fieldContext.indexFieldData(), searchContext);
if (indexFieldData instanceof ParentChildIndexFieldData) {
dataSource = new ValuesSource.Bytes.WithOrdinals.ParentChild((ParentChildIndexFieldData) indexFieldData, metaData);
} else if (indexFieldData instanceof IndexOrdinalsFieldData) {
dataSource = new ValuesSource.Bytes.WithOrdinals.FieldData((IndexOrdinalsFieldData) indexFieldData, metaData);
} else {
dataSource = new ValuesSource.Bytes.FieldData(indexFieldData, metaData);


@ -25,6 +25,8 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.lucene.TopReaderContextAware;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.plain.ParentChildAtomicFieldData;
import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric.WithScript.DoubleValues;
import org.elasticsearch.search.aggregations.support.ValuesSource.WithScript.BytesValues;
@ -287,7 +289,63 @@ public abstract class ValuesSource {
}
}
}
}
public static class ParentChild extends Bytes implements ReaderContextAware, TopReaderContextAware {
protected final ParentChildIndexFieldData indexFieldData;
protected final MetaData metaData;
protected AtomicParentChildFieldData atomicFieldData;
protected IndexParentChildFieldData globalFieldData;
private long maxOrd = -1;
public ParentChild(ParentChildIndexFieldData indexFieldData, MetaData metaData) {
this.indexFieldData = indexFieldData;
this.metaData = metaData;
}
@Override
public void setNextReader(AtomicReaderContext reader) {
atomicFieldData = globalFieldData.load(reader);
}
@Override
public void setNextReader(IndexReaderContext reader) {
globalFieldData = indexFieldData.loadGlobal(reader.reader());
}
public SortedDocValues globalOrdinalsValues(String type) {
return atomicFieldData.getOrdinalsValues(type);
}
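// Global ordinals expose the same value count from every segment, so loading the first leaf of the
// top-level reader is enough to determine the maximum parent ordinal (assumes global ordinals have
// been built for the _parent field).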
public long globalMaxOrd(IndexSearcher indexSearcher, String type) {
if (maxOrd != -1) {
return maxOrd;
}
IndexReader indexReader = indexSearcher.getIndexReader();
if (indexReader.leaves().isEmpty()) {
return maxOrd = 0;
} else {
AtomicReaderContext atomicReaderContext = indexReader.leaves().get(0);
IndexParentChildFieldData globalFieldData = indexFieldData.loadGlobal(indexReader);
AtomicParentChildFieldData afd = globalFieldData.load(atomicReaderContext);
SortedDocValues values = afd.getOrdinalsValues(type);
return maxOrd = values.getValueCount();
}
}
@Override
public SortedBinaryDocValues bytesValues() {
throw new UnsupportedOperationException();
}
@Override
public MetaData metaData() {
return metaData;
}
}
public static class FieldData extends Bytes implements ReaderContextAware {


@ -27,6 +27,8 @@ import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.children.Children;
import java.util.Arrays;
@ -158,6 +160,46 @@ public class ChildSearchBenchmark {
}
System.out.println("--> has_child filter with match_all child query, Query Avg: " + (totalQueryTime / QUERY_COUNT) + "ms");
System.out.println("--> Running children agg");
totalQueryTime = 0;
for (int j = 1; j <= QUERY_COUNT; j++) {
SearchResponse searchResponse = client.prepareSearch(indexName)
.setQuery(matchQuery("field1", parentChildIndexGenerator.getQueryValue()))
.addAggregation(
AggregationBuilders.children("to-child").childType("child")
)
.execute().actionGet();
totalQueryTime += searchResponse.getTookInMillis();
if (searchResponse.getFailedShards() > 0) {
System.err.println("Search Failures " + Arrays.toString(searchResponse.getShardFailures()));
}
Children children = searchResponse.getAggregations().get("to-child");
if (j % 10 == 0) {
System.out.println("--> children doc count [" + j + "], got [" + children.getDocCount() + "]");
}
}
System.out.println("--> children agg, Query Avg: " + (totalQueryTime / QUERY_COUNT) + "ms");
System.out.println("--> Running children agg with match_all");
totalQueryTime = 0;
for (int j = 1; j <= QUERY_COUNT; j++) {
SearchResponse searchResponse = client.prepareSearch(indexName)
.addAggregation(
AggregationBuilders.children("to-child").childType("child")
)
.execute().actionGet();
totalQueryTime += searchResponse.getTookInMillis();
if (searchResponse.getFailedShards() > 0) {
System.err.println("Search Failures " + Arrays.toString(searchResponse.getShardFailures()));
}
Children children = searchResponse.getAggregations().get("to-child");
if (j % 10 == 0) {
System.out.println("--> children doc count [" + j + "], got [" + children.getDocCount() + "]");
}
}
System.out.println("--> children agg, Query Avg: " + (totalQueryTime / QUERY_COUNT) + "ms");
// run parent child constant query
for (int j = 0; j < QUERY_WARMUP; j++) {
SearchResponse searchResponse = client.prepareSearch(indexName)


@ -0,0 +1,208 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.children.Children;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.*;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
*/
@ElasticsearchIntegrationTest.SuiteScopeTest
public class ChildrenTests extends ElasticsearchIntegrationTest {
private final static Map<String, Control> categoryToControl = new HashMap<>();
@Override
public void setupSuiteScopeCluster() throws Exception {
assertAcked(
prepareCreate("test")
.addMapping("article")
.addMapping("comment", "_parent", "type=article")
);
List<IndexRequestBuilder> requests = new ArrayList<>();
String[] uniqueCategories = new String[randomIntBetween(1, 25)];
for (int i = 0; i < uniqueCategories.length; i++) {
uniqueCategories[i] = Integer.toString(i);
}
int catIndex = 0;
int numParentDocs = randomIntBetween(uniqueCategories.length, uniqueCategories.length * 5);
for (int i = 0; i < numParentDocs; i++) {
String id = Integer.toString(i);
String[] categories = new String[randomIntBetween(1,1)];
for (int j = 0; j < categories.length; j++) {
String category = categories[j] = uniqueCategories[catIndex++ % uniqueCategories.length];
Control control = categoryToControl.get(category);
if (control == null) {
categoryToControl.put(category, control = new Control(category));
}
control.articleIds.add(id);
}
requests.add(client().prepareIndex("test", "article", id).setCreate(true).setSource("category", categories, "randomized", true));
}
String[] commenters = new String[randomIntBetween(5, 50)];
for (int i = 0; i < commenters.length; i++) {
commenters[i] = Integer.toString(i);
}
int id = 0;
for (Control control : categoryToControl.values()) {
for (String articleId : control.articleIds) {
int numChildDocsPerParent = randomIntBetween(0, 5);
for (int i = 0; i < numChildDocsPerParent; i++) {
String commenter = commenters[id % commenters.length];
String idValue = Integer.toString(id++);
control.commentIds.add(idValue);
Set<String> ids = control.commenterToCommentId.get(commenter);
if (ids == null) {
control.commenterToCommentId.put(commenter, ids = new HashSet<>());
}
ids.add(idValue);
requests.add(client().prepareIndex("test", "comment", idValue).setCreate(true).setParent(articleId).setSource("commenter", commenter));
}
}
}
requests.add(client().prepareIndex("test", "article", "a").setSource("category", new String[]{"a"}, "randomized", false));
requests.add(client().prepareIndex("test", "article", "b").setSource("category", new String[]{"a", "b"}, "randomized", false));
requests.add(client().prepareIndex("test", "article", "c").setSource("category", new String[]{"a", "b", "c"}, "randomized", false));
requests.add(client().prepareIndex("test", "article", "d").setSource("category", new String[]{"c"}, "randomized", false));
requests.add(client().prepareIndex("test", "comment", "a").setParent("a").setSource("{}"));
requests.add(client().prepareIndex("test", "comment", "c").setParent("c").setSource("{}"));
indexRandom(true, requests);
ensureSearchable("test");
}
@Test
public void testChildrenAggs() throws Exception {
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(matchQuery("randomized", true))
.addAggregation(
terms("category").field("category").size(0).subAggregation(
children("to_comment").childType("comment").subAggregation(
terms("commenters").field("commenter").size(0).subAggregation(
topHits("top_comments")
))
)
).get();
assertSearchResponse(searchResponse);
StringTerms categoryTerms = searchResponse.getAggregations().get("category");
assertThat(categoryTerms.getBuckets().size(), equalTo(categoryToControl.size()));
for (Map.Entry<String, Control> entry1 : categoryToControl.entrySet()) {
Terms.Bucket categoryBucket = categoryTerms.getBucketByKey(entry1.getKey());
assertThat(categoryBucket.getKey(), equalTo(entry1.getKey()));
assertThat(categoryBucket.getDocCount(), equalTo((long) entry1.getValue().articleIds.size()));
Children childrenBucket = categoryBucket.getAggregations().get("to_comment");
assertThat(childrenBucket.getName(), equalTo("to_comment"));
assertThat(childrenBucket.getDocCount(), equalTo((long) entry1.getValue().commentIds.size()));
StringTerms commentersTerms = childrenBucket.getAggregations().get("commenters");
assertThat(commentersTerms.getBuckets().size(), equalTo(entry1.getValue().commenterToCommentId.size()));
for (Map.Entry<String, Set<String>> entry2 : entry1.getValue().commenterToCommentId.entrySet()) {
Terms.Bucket commentBucket = commentersTerms.getBucketByKey(entry2.getKey());
assertThat(commentBucket.getKey(), equalTo(entry2.getKey()));
assertThat(commentBucket.getDocCount(), equalTo((long) entry2.getValue().size()));
TopHits topHits = commentBucket.getAggregations().get("top_comments");
for (SearchHit searchHit : topHits.getHits().getHits()) {
assertThat(entry2.getValue().contains(searchHit.getId()), is(true));
}
}
}
}
@Test
public void testParentWithMultipleBuckets() throws Exception {
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(matchQuery("randomized", false))
.addAggregation(
terms("category").field("category").size(0).subAggregation(
children("to_comment").childType("comment").subAggregation(topHits("top_comments").addSort("_uid", SortOrder.ASC))
)
).get();
assertSearchResponse(searchResponse);
StringTerms categoryTerms = searchResponse.getAggregations().get("category");
assertThat(categoryTerms.getBuckets().size(), equalTo(3));
Terms.Bucket categoryBucket = categoryTerms.getBucketByKey("a");
assertThat(categoryBucket.getKey(), equalTo("a"));
assertThat(categoryBucket.getDocCount(), equalTo(3L));
Children childrenBucket = categoryBucket.getAggregations().get("to_comment");
assertThat(childrenBucket.getName(), equalTo("to_comment"));
assertThat(childrenBucket.getDocCount(), equalTo(2L));
TopHits topHits = childrenBucket.getAggregations().get("top_comments");
assertThat(topHits.getHits().getAt(0).getId(), equalTo("a"));
assertThat(topHits.getHits().getAt(1).getId(), equalTo("c"));
categoryBucket = categoryTerms.getBucketByKey("b");
assertThat(categoryBucket.getKey(), equalTo("b"));
assertThat(categoryBucket.getDocCount(), equalTo(2L));
childrenBucket = categoryBucket.getAggregations().get("to_comment");
assertThat(childrenBucket.getName(), equalTo("to_comment"));
assertThat(childrenBucket.getDocCount(), equalTo(1L));
categoryBucket = categoryTerms.getBucketByKey("c");
assertThat(categoryBucket.getKey(), equalTo("c"));
assertThat(categoryBucket.getDocCount(), equalTo(2L));
childrenBucket = categoryBucket.getAggregations().get("to_comment");
assertThat(childrenBucket.getName(), equalTo("to_comment"));
assertThat(childrenBucket.getDocCount(), equalTo(1L));
}
private static final class Control {
final String category;
final Set<String> articleIds = new HashSet<>();
final Set<String> commentIds = new HashSet<>();
final Map<String, Set<String>> commenterToCommentId = new HashMap<>();
private Control(String category) {
this.category = category;
}
}
}