From 383e64bd5c7979d30e0d7bb74741b96abeb27e2b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 4 May 2014 18:22:01 +0700 Subject: [PATCH] Aggregations: Add `children` bucket aggregator that is able to map buckets between parent types and child types using the already builtin parent/child support. Closes #6936 --- .../search/aggregations/bucket.asciidoc | 2 + .../bucket/children-aggregation.asciidoc | 343 ++++++++++++++++++ .../index/search/child/ConstantScorer.java | 4 +- .../aggregations/AggregationBuilders.java | 5 + .../aggregations/AggregationModule.java | 2 + .../TransportAggregationModule.java | 2 + .../bucket/children/Children.java | 27 ++ .../bucket/children/ChildrenBuilder.java | 51 +++ .../bucket/children/ChildrenParser.java | 93 +++++ .../bucket/children/InternalChildren.java | 64 ++++ .../children/ParentToChildrenAggregator.java | 214 +++++++++++ .../support/AggregationContext.java | 5 +- .../aggregations/support/ValuesSource.java | 58 +++ .../search/child/ChildSearchBenchmark.java | 42 +++ .../aggregations/bucket/ChildrenTests.java | 208 +++++++++++ 15 files changed, 1117 insertions(+), 3 deletions(-) create mode 100644 docs/reference/search/aggregations/bucket/children-aggregation.asciidoc create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/children/Children.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/children/ChildrenBuilder.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/children/ChildrenParser.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/children/InternalChildren.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/children/ParentToChildrenAggregator.java create mode 100644 src/test/java/org/elasticsearch/search/aggregations/bucket/ChildrenTests.java diff --git a/docs/reference/search/aggregations/bucket.asciidoc b/docs/reference/search/aggregations/bucket.asciidoc 
index e9d163a67e7..7d7848fa1a2 100644 --- a/docs/reference/search/aggregations/bucket.asciidoc +++ b/docs/reference/search/aggregations/bucket.asciidoc @@ -12,6 +12,8 @@ include::bucket/nested-aggregation.asciidoc[] include::bucket/reverse-nested-aggregation.asciidoc[] +include::bucket/children-aggregation.asciidoc[] + include::bucket/terms-aggregation.asciidoc[] include::bucket/significantterms-aggregation.asciidoc[] diff --git a/docs/reference/search/aggregations/bucket/children-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/children-aggregation.asciidoc new file mode 100644 index 00000000000..a7ddf4fddda --- /dev/null +++ b/docs/reference/search/aggregations/bucket/children-aggregation.asciidoc @@ -0,0 +1,343 @@ +[[search-aggregations-bucket-children-aggregation]] +=== Children Aggregation + +A special single bucket aggregation that enables aggregating from buckets on parent document types to buckets on child documents. + +This aggregation relies on the <> in the mapping. This aggregation has a single option: +* `type` - The what child type the buckets in the parent space should be mapped to. + +For example, let's say we have an index of questions and answers. The answer type has the following `_parent` field in the mapping: +[source,js] +-------------------------------------------------- +{ + "answer" : { + "_parent" : { + "type" : "question" + } + } +} +-------------------------------------------------- + +The question typed document contain a tag field and the answer typed documents contain an owner field. With the `children` +aggregation the tag buckets can be mapped to the owner buckets in a single request even though the two fields exist in +two different kinds of documents. + +An example of a question typed document: +[source,js] +-------------------------------------------------- +{ + "body": "

I have Windows 2003 server and i bought a new Windows 2008 server...", + "title": "Whats the best way to file transfer my site from server to a newer one?", + "tags": [ + "windows-server-2003", + "windows-server-2008", + "file-transfer" + ], +} +-------------------------------------------------- + +An example of an answer typed document: +[source,js] +-------------------------------------------------- +{ + "owner": { + "location": "Norfolk, United Kingdom", + "display_name": "Sam", + "id": 48 + }, + "body": "

Unfortunately your pretty much limited to FTP...", + "creation_date": "2009-05-04T13:45:37.030" +} +-------------------------------------------------- + +The following request can be built that connects the two together: + +[source,js] +-------------------------------------------------- +{ + "aggs": { + "top-tags": { + "terms": { + "field": "tags", + "size": 10 + }, + "aggs": { + "to-answers": { + "children": { + "type" : "answer" <1> + }, + "aggs": { + "top-names": { + "terms": { + "field": "owner.display_name", + "size": 10 + } + } + } + } + } + } + } +} +-------------------------------------------------- + +<1> The `type` points to type / mapping with the name `answer`. + +The above example returns the top question tags and per tag the top answer owners. + +Possible response: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "top-tags": { + "buckets": [ + { + "key": "windows-server-2003", + "doc_count": 25365, <1> + "to-answers": { + "doc_count": 36004, <2> + "top-names": { + "buckets": [ + { + "key": "Sam", + "doc_count": 274 + }, + { + "key": "chris", + "doc_count": 19 + }, + { + "key": "david", + "doc_count": 14 + }, + ... + ] + } + } + }, + { + "key": "linux", + "doc_count": 18342, + "to-answers": { + "doc_count": 6655, + "top-names": { + "buckets": [ + { + "key": "abrams", + "doc_count": 25 + }, + { + "key": "ignacio", + "doc_count": 25 + }, + { + "key": "vazquez", + "doc_count": 25 + }, + ... + ] + } + } + }, + { + "key": "windows", + "doc_count": 18119, + "to-answers": { + "doc_count": 24051, + "top-names": { + "buckets": [ + { + "key": "molly7244", + "doc_count": 265 + }, + { + "key": "david", + "doc_count": 27 + }, + { + "key": "chris", + "doc_count": 26 + }, + ... 
+ ] + } + } + }, + { + "key": "osx", + "doc_count": 10971, + "to-answers": { + "doc_count": 5902, + "top-names": { + "buckets": [ + { + "key": "diago", + "doc_count": 4 + }, + { + "key": "albert", + "doc_count": 3 + }, + { + "key": "asmus", + "doc_count": 3 + }, + ... + ] + } + } + }, + { + "key": "ubuntu", + "doc_count": 8743, + "to-answers": { + "doc_count": 8784, + "top-names": { + "buckets": [ + { + "key": "ignacio", + "doc_count": 9 + }, + { + "key": "abrams", + "doc_count": 8 + }, + { + "key": "molly7244", + "doc_count": 8 + }, + ... + ] + } + } + }, + { + "key": "windows-xp", + "doc_count": 7517, + "to-answers": { + "doc_count": 13610, + "top-names": { + "buckets": [ + { + "key": "molly7244", + "doc_count": 232 + }, + { + "key": "chris", + "doc_count": 9 + }, + { + "key": "john", + "doc_count": 9 + }, + ... + ] + } + } + }, + { + "key": "networking", + "doc_count": 6739, + "to-answers": { + "doc_count": 2076, + "top-names": { + "buckets": [ + { + "key": "molly7244", + "doc_count": 6 + }, + { + "key": "alnitak", + "doc_count": 5 + }, + { + "key": "chris", + "doc_count": 3 + }, + ... + ] + } + } + }, + { + "key": "mac", + "doc_count": 5590, + "to-answers": { + "doc_count": 999, + "top-names": { + "buckets": [ + { + "key": "abrams", + "doc_count": 2 + }, + { + "key": "ignacio", + "doc_count": 2 + }, + { + "key": "vazquez", + "doc_count": 2 + }, + ... + ] + } + } + }, + { + "key": "wireless-networking", + "doc_count": 4409, + "to-answers": { + "doc_count": 6497, + "top-names": { + "buckets": [ + { + "key": "molly7244", + "doc_count": 61 + }, + { + "key": "chris", + "doc_count": 5 + }, + { + "key": "mike", + "doc_count": 5 + }, + ... + ] + } + } + }, + { + "key": "windows-8", + "doc_count": 3601, + "to-answers": { + "doc_count": 4263, + "top-names": { + "buckets": [ + { + "key": "molly7244", + "doc_count": 3 + }, + { + "key": "msft", + "doc_count": 2 + }, + { + "key": "user172132", + "doc_count": 2 + }, + ... 
+ ] + } + } + } + ] + } + } +} +-------------------------------------------------- + +<1> The number of question documents with the tag `windows-server-2003`. +<2> The number of answer documents that are related to question documents with the tag `windows-server-2003`. \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/search/child/ConstantScorer.java b/src/main/java/org/elasticsearch/index/search/child/ConstantScorer.java index defae7de671..9fe80f0193a 100644 --- a/src/main/java/org/elasticsearch/index/search/child/ConstantScorer.java +++ b/src/main/java/org/elasticsearch/index/search/child/ConstantScorer.java @@ -28,9 +28,9 @@ import java.io.IOException; * A scorer that wraps a {@link DocIdSetIterator} and emits a constant score. */ // Borrowed from ConstantScoreQuery -class ConstantScorer extends Scorer { +public class ConstantScorer extends Scorer { - static ConstantScorer create(DocIdSetIterator iterator, Weight weight, float constantScore) throws IOException { + public static ConstantScorer create(DocIdSetIterator iterator, Weight weight, float constantScore) throws IOException { return new ConstantScorer(iterator, weight, constantScore); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java index d71212001f2..9f1f0a02469 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.search.aggregations.bucket.children.ChildrenBuilder; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.filters.FiltersAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridBuilder; @@ -106,6 +107,10 @@ public class 
AggregationBuilders { return new ReverseNestedBuilder(name); } + public static ChildrenBuilder children(String name) { + return new ChildrenBuilder(name); + } + public static GeoDistanceBuilder geoDistance(String name) { return new GeoDistanceBuilder(name); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index b8c8c7d4ea2..db005162d83 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations; import com.google.common.collect.Lists; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; +import org.elasticsearch.search.aggregations.bucket.children.ChildrenParser; import org.elasticsearch.search.aggregations.bucket.filter.FilterParser; import org.elasticsearch.search.aggregations.bucket.filters.FiltersParser; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridParser; @@ -87,6 +88,7 @@ public class AggregationModule extends AbstractModule { parsers.add(ReverseNestedParser.class); parsers.add(TopHitsParser.class); parsers.add(GeoBoundsParser.class); + parsers.add(ChildrenParser.class); } /** diff --git a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index e37e68df25d..fbf33ece6da 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.search.aggregations.bucket.children.InternalChildren; import 
org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import org.elasticsearch.search.aggregations.bucket.filters.InternalFilters; import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoHashGrid; @@ -95,5 +96,6 @@ public class TransportAggregationModule extends AbstractModule { InternalReverseNested.registerStream(); InternalTopHits.registerStreams(); InternalGeoBounds.registerStream(); + InternalChildren.registerStream(); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/children/Children.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/Children.java new file mode 100644 index 00000000000..e1ec2d54a61 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/Children.java @@ -0,0 +1,27 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.children; + +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; + +/** + */ +public interface Children extends SingleBucketAggregation { +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ChildrenBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ChildrenBuilder.java new file mode 100644 index 00000000000..c456777a06e --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ChildrenBuilder.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.children; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilderException; + +import java.io.IOException; + +/** + */ +public class ChildrenBuilder extends AggregationBuilder { + + private String childType; + + public ChildrenBuilder(String name) { + super(name, InternalChildren.TYPE.name()); + } + + public ChildrenBuilder childType(String childType) { + this.childType = childType; + return this; + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (childType == null) { + throw new SearchSourceBuilderException("child_type must be set on children aggregation [" + name + "]"); + } + builder.field("type", childType); + return builder.endObject(); + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ChildrenParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ChildrenParser.java new file mode 100644 index 00000000000..93ab8f7fff8 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ChildrenParser.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.children; + +import org.apache.lucene.search.Filter; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.internal.ParentFieldMapper; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.support.FieldContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; + +/** + * + */ +public class ChildrenParser implements Aggregator.Parser { + + @Override + public String type() { + return InternalChildren.TYPE.name(); + } + + @Override + public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException { + String childType = null; + + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("type".equals(currentFieldName)) { + childType = parser.text(); + } else { + throw new 
SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } + } else { + throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "]."); + } + } + + if (childType == null) { + throw new SearchParseException(context, "Missing [child_type] field for children aggregation [" + aggregationName + "]"); + } + + DocumentMapper childDocMapper = context.mapperService().documentMapper(childType); + if (childDocMapper == null) { + throw new SearchParseException(context, "[children] No mapping for for type [" + childType + "]"); + } + ParentFieldMapper parentFieldMapper = childDocMapper.parentFieldMapper(); + if (!parentFieldMapper.active()) { + throw new SearchParseException(context, "[children] _parent field not configured"); + } + + String parentType = parentFieldMapper.type(); + DocumentMapper parentDocMapper = context.mapperService().documentMapper(parentType); + if (parentDocMapper == null) { + throw new SearchParseException(context, "[children] Type [" + childType + "] points to a non existent parent type [" + parentType + "]"); + } + + Filter parentFilter = context.filterCache().cache(parentDocMapper.typeFilter()); + Filter childFilter = context.filterCache().cache(childDocMapper.typeFilter()); + + ParentChildIndexFieldData parentChildIndexFieldData = context.fieldData().getForField(parentFieldMapper); + ValuesSourceConfig config = new ValuesSourceConfig<>(ValuesSource.Bytes.WithOrdinals.ParentChild.class); + config.fieldContext(new FieldContext(parentFieldMapper.names().indexName(), parentChildIndexFieldData, parentFieldMapper)); + return new ParentToChildrenAggregator.Factory(aggregationName, config, parentType, parentFilter, childFilter); + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/children/InternalChildren.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/InternalChildren.java new file mode 100644 
index 00000000000..a7a6b9620c4 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/InternalChildren.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.children; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation; + +import java.io.IOException; + +/** + */ +public class InternalChildren extends InternalSingleBucketAggregation implements Children { + + public static final Type TYPE = new Type("children"); + + public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalChildren readResult(StreamInput in) throws IOException { + InternalChildren result = new InternalChildren(); + result.readFrom(in); + return result; + } + }; + + public static void registerStream() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + public InternalChildren() { + } + + public InternalChildren(String name, long docCount, InternalAggregations 
aggregations) { + super(name, docCount, aggregations); + } + + @Override + public Type type() { + return TYPE; + } + + @Override + protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { + return new InternalChildren(name, docCount, subAggregations); + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ParentToChildrenAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ParentToChildrenAggregator.java new file mode 100644 index 00000000000..701a26a9ec4 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/children/ParentToChildrenAggregator.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.children; + +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Filter; +import org.apache.lucene.search.join.FixedBitSetCachingWrapperFilter; +import org.apache.lucene.util.Bits; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.ReaderContextAware; +import org.elasticsearch.common.lucene.docset.DocIdSets; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.LongObjectPagedHashMap; +import org.elasticsearch.index.search.child.ConstantScorer; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +// The RecordingPerReaderBucketCollector assumes per segment recording which isn't the case for this +// aggregation, for this reason that collector can't be used +public class ParentToChildrenAggregator extends SingleBucketAggregator implements ReaderContextAware { + + private final String parentType; + private final Filter childFilter; + private final Filter parentFilter; + private final ValuesSource.Bytes.WithOrdinals.ParentChild valuesSource; + 
+ // Maybe use PagedGrowableWriter? This will be less wasteful than LongArray, but then we don't have the reuse feature of BigArrays. + // Also if we know the highest possible value that a parent agg will create then we store multiple values into one slot + private final LongArray parentOrdToBuckets; + + // Only pay the extra storage price if the a parentOrd has multiple buckets + // Most of the times a parent doesn't have multiple buckets, since there is only one document per parent ord, + // only in the case of terms agg if a parent doc has multiple terms per field this is needed: + private final LongObjectPagedHashMap parentOrdToOtherBuckets; + private boolean multipleBucketsPerParentOrd = false; + + private List replay = new ArrayList<>(); + private SortedDocValues globalOrdinals; + private Bits parentDocs; + + public ParentToChildrenAggregator(String name, AggregatorFactories factories, AggregationContext aggregationContext, + Aggregator parent, String parentType, Filter childFilter, Filter parentFilter, + ValuesSource.Bytes.WithOrdinals.ParentChild valuesSource, long maxOrd) { + super(name, factories, aggregationContext, parent); + this.parentType = parentType; + this.childFilter = childFilter; + // TODO: remove FixedBitSetCachingWrapperFilter once #7031 gets in + this.parentFilter = new FixedBitSetCachingWrapperFilter(parentFilter); + this.parentOrdToBuckets = aggregationContext.bigArrays().newLongArray(maxOrd, false); + this.parentOrdToBuckets.fill(0, maxOrd, -1); + this.parentOrdToOtherBuckets = new LongObjectPagedHashMap<>(aggregationContext.bigArrays()); + this.valuesSource = valuesSource; + } + + @Override + public InternalAggregation buildAggregation(long owningBucketOrdinal) { + return new InternalChildren(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal)); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalChildren(name, 0, buildEmptySubAggregations()); + } + + @Override + 
public void collect(int docId, long bucketOrdinal) throws IOException { + if (parentDocs != null && parentDocs.get(docId)) { + long globalOrdinal = globalOrdinals.getOrd(docId); + if (globalOrdinal != -1) { + if (parentOrdToBuckets.get(globalOrdinal) == -1) { + parentOrdToBuckets.set(globalOrdinal, bucketOrdinal); + } else { + long[] bucketOrds = parentOrdToOtherBuckets.get(globalOrdinal); + if (bucketOrds != null) { + bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1); + bucketOrds[bucketOrds.length - 1] = bucketOrdinal; + parentOrdToOtherBuckets.put(globalOrdinal, bucketOrds); + } else { + parentOrdToOtherBuckets.put(globalOrdinal, new long[]{bucketOrdinal}); + } + multipleBucketsPerParentOrd = true; + } + } + } + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + if (replay == null) { + return; + } + + globalOrdinals = valuesSource.globalOrdinalsValues(parentType); + assert globalOrdinals != null; + try { + DocIdSet parentDocIdSet = parentFilter.getDocIdSet(reader, null); + if (parentDocIdSet != null) { + parentDocs = parentDocIdSet.bits(); + } else { + parentDocs = null; + } + DocIdSet childDocIdSet = childFilter.getDocIdSet(reader, null); + if (globalOrdinals != null && !DocIdSets.isEmpty(childDocIdSet)) { + replay.add(reader); + } + } catch (IOException e) { + throw ExceptionsHelper.convertToElastic(e); + } + } + + @Override + protected void doPostCollection() throws IOException { + List replay = this.replay; + this.replay = null; + + for (AtomicReaderContext atomicReaderContext : replay) { + context.setNextReader(atomicReaderContext); + + SortedDocValues globalOrdinals = valuesSource.globalOrdinalsValues(parentType); + DocIdSet childDocIdSet = childFilter.getDocIdSet(atomicReaderContext, atomicReaderContext.reader().getLiveDocs()); + if (childDocIdSet == null) { + continue; + } + DocIdSetIterator childDocsIter = childDocIdSet.iterator(); + if (childDocsIter == null) { + continue; + } + + // Set the scorer, since we now 
replay only the child docIds + context.setScorer(ConstantScorer.create(childDocsIter, null, 1f)); + + for (int docId = childDocsIter.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter.nextDoc()) { + long globalOrdinal = globalOrdinals.getOrd(docId); + if (globalOrdinal != -1) { + long bucketOrd = parentOrdToBuckets.get(globalOrdinal); + if (bucketOrd != -1) { + collectBucket(docId, bucketOrd); + if (multipleBucketsPerParentOrd) { + long[] otherBucketOrds = parentOrdToOtherBuckets.get(globalOrdinal); + if (otherBucketOrds != null) { + for (long otherBucketOrd : otherBucketOrds) { + collectBucket(docId, otherBucketOrd); + } + } + } + } + } + } + } + } + + @Override + protected void doClose() { + Releasables.close(parentOrdToBuckets, parentOrdToOtherBuckets); + } + + public static class Factory extends ValuesSourceAggregatorFactory { + + private final String parentType; + private final Filter parentFilter; + private final Filter childFilter; + + public Factory(String name, ValuesSourceConfig config, String parentType, Filter parentFilter, Filter childFilter) { + super(name, InternalChildren.TYPE.name(), config); + this.parentType = parentType; + this.parentFilter = parentFilter; + this.childFilter = childFilter; + } + + @Override + protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) { + throw new ElasticsearchIllegalStateException("[children] aggregation doesn't support unmapped"); + } + + @Override + protected Aggregator create(ValuesSource.Bytes.WithOrdinals.ParentChild valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) { + long maxOrd = valuesSource.globalMaxOrd(aggregationContext.searchContext().searcher(), parentType); + return new ParentToChildrenAggregator(name, factories, aggregationContext, parent, parentType, childFilter, parentFilter, valuesSource, maxOrd); + } + + } +} diff --git 
a/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java b/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java index 39557e8f6ba..154e120bbe4 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java +++ b/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexGeoPointFieldData; import org.elasticsearch.index.fielddata.IndexNumericFieldData; import org.elasticsearch.index.fielddata.IndexOrdinalsFieldData; +import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.internal.SearchContext; @@ -177,7 +178,9 @@ public class AggregationContext implements ReaderContextAware, ScorerAware { if (dataSource == null) { final IndexFieldData indexFieldData = config.fieldContext.indexFieldData(); ValuesSource.MetaData metaData = ValuesSource.MetaData.load(config.fieldContext.indexFieldData(), searchContext); - if (indexFieldData instanceof IndexOrdinalsFieldData) { + if (indexFieldData instanceof ParentChildIndexFieldData) { + dataSource = new ValuesSource.Bytes.WithOrdinals.ParentChild((ParentChildIndexFieldData) indexFieldData, metaData); + } else if (indexFieldData instanceof IndexOrdinalsFieldData) { dataSource = new ValuesSource.Bytes.WithOrdinals.FieldData((IndexOrdinalsFieldData) indexFieldData, metaData); } else { dataSource = new ValuesSource.Bytes.FieldData(indexFieldData, metaData); diff --git a/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java b/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java index 3fc38a8f29f..c77d209621a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java +++ 
b/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSource.java @@ -25,6 +25,8 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.lucene.ReaderContextAware; import org.elasticsearch.common.lucene.TopReaderContextAware; import org.elasticsearch.index.fielddata.*; +import org.elasticsearch.index.fielddata.plain.ParentChildAtomicFieldData; +import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData; import org.elasticsearch.script.SearchScript; import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric.WithScript.DoubleValues; import org.elasticsearch.search.aggregations.support.ValuesSource.WithScript.BytesValues; @@ -287,7 +289,63 @@ public abstract class ValuesSource { } } } + } + public static class ParentChild extends Bytes implements ReaderContextAware, TopReaderContextAware { + + protected final ParentChildIndexFieldData indexFieldData; + protected final MetaData metaData; + + protected AtomicParentChildFieldData atomicFieldData; + protected IndexParentChildFieldData globalFieldData; + + private long maxOrd = -1; + + public ParentChild(ParentChildIndexFieldData indexFieldData, MetaData metaData) { + this.indexFieldData = indexFieldData; + this.metaData = metaData; + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + atomicFieldData = globalFieldData.load(reader); + } + + @Override + public void setNextReader(IndexReaderContext reader) { + globalFieldData = indexFieldData.loadGlobal(reader.reader()); + } + + public SortedDocValues globalOrdinalsValues(String type) { + return atomicFieldData.getOrdinalsValues(type); + } + + public long globalMaxOrd(IndexSearcher indexSearcher, String type) { + if (maxOrd != -1) { + return maxOrd; + } + + IndexReader indexReader = indexSearcher.getIndexReader(); + if (indexReader.leaves().isEmpty()) { + return maxOrd = 0; + } else { + AtomicReaderContext atomicReaderContext = indexReader.leaves().get(0); + IndexParentChildFieldData 
globalFieldData = indexFieldData.loadGlobal(indexReader); + AtomicParentChildFieldData afd = globalFieldData.load(atomicReaderContext); + SortedDocValues values = afd.getOrdinalsValues(type); + return maxOrd = values.getValueCount(); + } + } + + @Override + public SortedBinaryDocValues bytesValues() { + throw new UnsupportedOperationException(); + } + + @Override + public MetaData metaData() { + return metaData; + } } public static class FieldData extends Bytes implements ReaderContextAware { diff --git a/src/test/java/org/elasticsearch/benchmark/search/child/ChildSearchBenchmark.java b/src/test/java/org/elasticsearch/benchmark/search/child/ChildSearchBenchmark.java index b8155147c88..82bbfca4909 100644 --- a/src/test/java/org/elasticsearch/benchmark/search/child/ChildSearchBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/search/child/ChildSearchBenchmark.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.node.Node; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.children.Children; import java.util.Arrays; @@ -158,6 +160,46 @@ public class ChildSearchBenchmark { } System.out.println("--> has_child filter with match_all child query, Query Avg: " + (totalQueryTime / QUERY_COUNT) + "ms"); + + System.out.println("--> Running children agg"); + totalQueryTime = 0; + for (int j = 1; j <= QUERY_COUNT; j++) { + SearchResponse searchResponse = client.prepareSearch(indexName) + .setQuery(matchQuery("field1", parentChildIndexGenerator.getQueryValue())) + .addAggregation( + AggregationBuilders.children("to-child").childType("child") + ) + .execute().actionGet(); + totalQueryTime += searchResponse.getTookInMillis(); + if (searchResponse.getFailedShards() > 0) { + System.err.println("Search Failures " + 
Arrays.toString(searchResponse.getShardFailures())); + } + Children children = searchResponse.getAggregations().get("to-child"); + if (j % 10 == 0) { + System.out.println("--> children doc count [" + j + "], got [" + children.getDocCount() + "]"); + } + } + System.out.println("--> children agg, Query Avg: " + (totalQueryTime / QUERY_COUNT) + "ms"); + + System.out.println("--> Running children agg with match_all"); + totalQueryTime = 0; + for (int j = 1; j <= QUERY_COUNT; j++) { + SearchResponse searchResponse = client.prepareSearch(indexName) + .addAggregation( + AggregationBuilders.children("to-child").childType("child") + ) + .execute().actionGet(); + totalQueryTime += searchResponse.getTookInMillis(); + if (searchResponse.getFailedShards() > 0) { + System.err.println("Search Failures " + Arrays.toString(searchResponse.getShardFailures())); + } + Children children = searchResponse.getAggregations().get("to-child"); + if (j % 10 == 0) { + System.out.println("--> children doc count [" + j + "], got [" + children.getDocCount() + "]"); + } + } + System.out.println("--> children agg, Query Avg: " + (totalQueryTime / QUERY_COUNT) + "ms"); + // run parent child constant query for (int j = 0; j < QUERY_WARMUP; j++) { SearchResponse searchResponse = client.prepareSearch(indexName) diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/ChildrenTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/ChildrenTests.java new file mode 100644 index 00000000000..ccf8ba1d5ec --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/ChildrenTests.java @@ -0,0 +1,208 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.bucket.children.Children;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import java.util.*;

import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

/**
 * Integration tests for the {@code children} aggregation: "comment" docs are children of
 * "article" docs via the {@code _parent} mapping. The suite indexes a randomized
 * article/comment corpus (plus a small fixed corpus for the multi-bucket case) and records
 * the expected per-category counts in {@link Control} objects to assert against.
 */
@ElasticsearchIntegrationTest.SuiteScopeTest
public class ChildrenTests extends ElasticsearchIntegrationTest {

    // Expected state per category, filled during setup and asserted against in the tests.
    private final static Map<String, Control> categoryToControl = new HashMap<>();

    @Override
    public void setupSuiteScopeCluster() throws Exception {
        // "comment" is a child type of "article".
        assertAcked(
                prepareCreate("test")
                        .addMapping("article")
                        .addMapping("comment", "_parent", "type=article")
        );

        List<IndexRequestBuilder> requests = new ArrayList<>();
        String[] uniqueCategories = new String[randomIntBetween(1, 25)];
        for (int i = 0; i < uniqueCategories.length; i++) {
            uniqueCategories[i] = Integer.toString(i);
        }
        int catIndex = 0;

        // Index the randomized articles ("randomized": true) and record each article id
        // under every category it belongs to.
        int numParentDocs = randomIntBetween(uniqueCategories.length, uniqueCategories.length * 5);
        for (int i = 0; i < numParentDocs; i++) {
            String id = Integer.toString(i);

            // NOTE(review): randomIntBetween(1,1) always yields 1, so each article gets exactly
            // one category — looks like a leftover; confirm whether a wider range was intended.
            String[] categories = new String[randomIntBetween(1,1)];
            for (int j = 0; j < categories.length; j++) {
                String category = categories[j] = uniqueCategories[catIndex++ % uniqueCategories.length];
                Control control = categoryToControl.get(category);
                if (control == null) {
                    categoryToControl.put(category, control = new Control(category));
                }
                control.articleIds.add(id);
            }

            requests.add(client().prepareIndex("test", "article", id).setCreate(true).setSource("category", categories, "randomized", true));
        }

        String[] commenters = new String[randomIntBetween(5, 50)];
        for (int i = 0; i < commenters.length; i++) {
            commenters[i] = Integer.toString(i);
        }

        // Index 0-5 comments per article; track comment ids overall and per commenter.
        int id = 0;
        for (Control control : categoryToControl.values()) {
            for (String articleId : control.articleIds) {
                int numChildDocsPerParent = randomIntBetween(0, 5);
                for (int i = 0; i < numChildDocsPerParent; i++) {
                    String commenter = commenters[id % commenters.length];
                    String idValue = Integer.toString(id++);
                    control.commentIds.add(idValue);
                    Set<String> ids = control.commenterToCommentId.get(commenter);
                    if (ids == null) {
                        control.commenterToCommentId.put(commenter, ids = new HashSet<>());
                    }
                    ids.add(idValue);
                    requests.add(client().prepareIndex("test", "comment", idValue).setCreate(true).setParent(articleId).setSource("commenter", commenter));
                }
            }
        }

        // Fixed, non-randomized docs ("randomized": false) for testParentWithMultipleBuckets:
        // articles a/b/c/d with overlapping categories, and comments only on articles a and c.
        requests.add(client().prepareIndex("test", "article", "a").setSource("category", new String[]{"a"}, "randomized", false));
        requests.add(client().prepareIndex("test", "article", "b").setSource("category", new String[]{"a", "b"}, "randomized", false));
        requests.add(client().prepareIndex("test", "article", "c").setSource("category", new String[]{"a", "b", "c"}, "randomized", false));
        requests.add(client().prepareIndex("test", "article", "d").setSource("category", new String[]{"c"}, "randomized", false));
        requests.add(client().prepareIndex("test", "comment", "a").setParent("a").setSource("{}"));
        requests.add(client().prepareIndex("test", "comment", "c").setParent("c").setSource("{}"));

        indexRandom(true, requests);
        ensureSearchable("test");
    }

    /**
     * category terms -> children(comment) -> commenter terms -> top hits, over the randomized
     * docs only; every level is checked against the recorded {@link Control} data.
     */
    @Test
    public void testChildrenAggs() throws Exception {
        SearchResponse searchResponse = client().prepareSearch("test")
                .setQuery(matchQuery("randomized", true))
                .addAggregation(
                        terms("category").field("category").size(0).subAggregation(
                                children("to_comment").childType("comment").subAggregation(
                                        terms("commenters").field("commenter").size(0).subAggregation(
                                                topHits("top_comments")
                                        ))
                        )
                ).get();
        assertSearchResponse(searchResponse);

        StringTerms categoryTerms = searchResponse.getAggregations().get("category");
        assertThat(categoryTerms.getBuckets().size(), equalTo(categoryToControl.size()));
        for (Map.Entry<String, Control> entry1 : categoryToControl.entrySet()) {
            Terms.Bucket categoryBucket = categoryTerms.getBucketByKey(entry1.getKey());
            assertThat(categoryBucket.getKey(), equalTo(entry1.getKey()));
            assertThat(categoryBucket.getDocCount(), equalTo((long) entry1.getValue().articleIds.size()));

            Children childrenBucket = categoryBucket.getAggregations().get("to_comment");
            assertThat(childrenBucket.getName(), equalTo("to_comment"));
            assertThat(childrenBucket.getDocCount(), equalTo((long) entry1.getValue().commentIds.size()));

            StringTerms commentersTerms = childrenBucket.getAggregations().get("commenters");
            assertThat(commentersTerms.getBuckets().size(), equalTo(entry1.getValue().commenterToCommentId.size()));
            for (Map.Entry<String, Set<String>> entry2 : entry1.getValue().commenterToCommentId.entrySet()) {
                Terms.Bucket commentBucket = commentersTerms.getBucketByKey(entry2.getKey());
                assertThat(commentBucket.getKey(), equalTo(entry2.getKey()));
                assertThat(commentBucket.getDocCount(), equalTo((long) entry2.getValue().size()));

                // Every top hit must be one of the comment ids recorded for this commenter.
                TopHits topHits = commentBucket.getAggregations().get("top_comments");
                for (SearchHit searchHit : topHits.getHits().getHits()) {
                    assertThat(entry2.getValue().contains(searchHit.getId()), is(true));
                }
            }
        }
    }

    /**
     * Exercises the case where one parent doc falls into multiple terms buckets (articles with
     * several categories): the same comments must be reachable from every category bucket of
     * their parent article, and each child must only be counted once per bucket.
     */
    @Test
    public void testParentWithMultipleBuckets() throws Exception {
        SearchResponse searchResponse = client().prepareSearch("test")
                .setQuery(matchQuery("randomized", false))
                .addAggregation(
                        terms("category").field("category").size(0).subAggregation(
                                children("to_comment").childType("comment").subAggregation(topHits("top_comments").addSort("_uid", SortOrder.ASC))
                        )
                ).get();
        assertSearchResponse(searchResponse);

        StringTerms categoryTerms = searchResponse.getAggregations().get("category");
        assertThat(categoryTerms.getBuckets().size(), equalTo(3));

        // Category "a" covers articles a, b and c; only a and c have a comment.
        Terms.Bucket categoryBucket = categoryTerms.getBucketByKey("a");
        assertThat(categoryBucket.getKey(), equalTo("a"));
        assertThat(categoryBucket.getDocCount(), equalTo(3l));

        Children childrenBucket = categoryBucket.getAggregations().get("to_comment");
        assertThat(childrenBucket.getName(), equalTo("to_comment"));
        assertThat(childrenBucket.getDocCount(), equalTo(2l));
        TopHits topHits = childrenBucket.getAggregations().get("top_comments");
        assertThat(topHits.getHits().getAt(0).getId(), equalTo("a"));
        assertThat(topHits.getHits().getAt(1).getId(), equalTo("c"));

        // Category "b" covers articles b and c; only c has a comment.
        categoryBucket = categoryTerms.getBucketByKey("b");
        assertThat(categoryBucket.getKey(), equalTo("b"));
        assertThat(categoryBucket.getDocCount(), equalTo(2l));

        childrenBucket = categoryBucket.getAggregations().get("to_comment");
        assertThat(childrenBucket.getName(), equalTo("to_comment"));
        assertThat(childrenBucket.getDocCount(), equalTo(1l));

        // Category "c" covers articles c and d; only c has a comment.
        categoryBucket = categoryTerms.getBucketByKey("c");
        assertThat(categoryBucket.getKey(), equalTo("c"));
        assertThat(categoryBucket.getDocCount(), equalTo(2l));

        childrenBucket = categoryBucket.getAggregations().get("to_comment");
        assertThat(childrenBucket.getName(), equalTo("to_comment"));
        assertThat(childrenBucket.getDocCount(), equalTo(1l));
    }

    /** Expected per-category bookkeeping built during setup. */
    private static final class Control {

        final String category;
        // Ids of articles in this category.
        final Set<String> articleIds = new HashSet<>();
        // Ids of all comments whose parent article is in this category.
        final Set<String> commentIds = new HashSet<>();
        // Comment ids grouped by commenter, restricted to this category.
        final Map<String, Set<String>> commenterToCommentId = new HashMap<>();

        private Control(String category) {
            this.category = category;
        }
    }

}