From 63db34f649d1de038c30d61f3c5e17e059b19b69 Mon Sep 17 00:00:00 2001 From: markharwood Date: Mon, 23 Mar 2015 13:00:44 +0000 Subject: [PATCH] =?UTF-8?q?New=20feature=20-=20Sampler=20aggregation=20use?= =?UTF-8?q?d=20to=20limit=20any=20nested=20aggregations'=20processing=20to?= =?UTF-8?q?=20a=20sample=20of=20the=20top-scoring=20documents.=20Optionall?= =?UTF-8?q?y,=20a=20=E2=80=9Cdiversify=E2=80=9D=20setting=20can=20limit=20?= =?UTF-8?q?the=20number=20of=20collected=20matches=20that=20share=20a=20co?= =?UTF-8?q?mmon=20value=20such=20as=20an=20"author".?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #8108 --- .../bucket/sampler-aggregation.asciidoc | 154 ++++++++++ .../aggregations/AggregationModule.java | 3 + .../search/aggregations/AggregatorBase.java | 9 +- .../TransportAggregationModule.java | 4 + .../bucket/BestBucketsDeferringCollector.java | 191 +++++++++++++ .../bucket/BestDocsDeferringCollector.java | 239 ++++++++++++++++ .../bucket/DeferringBucketCollector.java | 238 +++++----------- ...DiversifiedBytesHashSamplerAggregator.java | 121 ++++++++ .../DiversifiedMapSamplerAggregator.java | 133 +++++++++ .../DiversifiedNumericSamplerAggregator.java | 111 ++++++++ .../DiversifiedOrdinalsSamplerAggregator.java | 119 ++++++++ .../bucket/sampler/InternalSampler.java | 65 +++++ .../aggregations/bucket/sampler/Sampler.java | 29 ++ .../sampler/SamplerAggregationBuilder.java | 80 ++++++ .../bucket/sampler/SamplerAggregator.java | 264 ++++++++++++++++++ .../bucket/sampler/SamplerParser.java | 104 +++++++ .../bucket/sampler/UnmappedSampler.java | 80 ++++++ .../aggregations/bucket/SamplerTests.java | 262 +++++++++++++++++ 18 files changed, 2033 insertions(+), 173 deletions(-) create mode 100644 docs/reference/search/aggregations/bucket/sampler-aggregation.asciidoc create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedBytesHashSamplerAggregator.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedMapSamplerAggregator.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedNumericSamplerAggregator.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedOrdinalsSamplerAggregator.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/InternalSampler.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/Sampler.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregationBuilder.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerParser.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java create mode 100644 src/test/java/org/elasticsearch/search/aggregations/bucket/SamplerTests.java diff --git a/docs/reference/search/aggregations/bucket/sampler-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/sampler-aggregation.asciidoc new file mode 100644 index 00000000000..5ad9dbc0194 --- /dev/null +++ 
b/docs/reference/search/aggregations/bucket/sampler-aggregation.asciidoc
@@ -0,0 +1,154 @@
+[[search-aggregations-bucket-sampler-aggregation]]
+=== Sampler Aggregation
+
+experimental[]
+
+A filtering aggregation used to limit any sub aggregations' processing to a sample of the top-scoring documents.
+Optionally, diversity settings can be used to limit the number of matches that share a common value such as an "author".
+
+.Example use cases:
+* Tightening the focus of analytics to high-relevance matches rather than the potentially very long tail of low-quality matches
+* Removing bias from analytics by ensuring fair representation of content from different sources
+* Reducing the running cost of aggregations that can produce useful results using only samples, e.g. `significant_terms`
+
+Example:
+
+[source,js]
+--------------------------------------------------
+{
+    "query": {
+        "match": {
+            "text": "iphone"
+        }
+    },
+    "aggs": {
+        "sample": {
+            "sampler": {
+                "shard_size": 200,
+                "field" : "user.id"
+            },
+            "aggs": {
+                "keywords": {
+                    "significant_terms": {
+                        "field": "text"
+                    }
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+Response:
+
+[source,js]
+--------------------------------------------------
+{
+    ...
+    "aggregations": {
+        "sample": {
+            "doc_count": 1000,<1>
+            "keywords": {<2>
+                "doc_count": 1000,
+                "buckets": [
+                    ...
+                    {
+                        "key": "bend",
+                        "doc_count": 58,
+                        "score": 37.982536582524276,
+                        "bg_count": 103
+                    },
+                    ...
+}
+--------------------------------------------------
+
+<1> 1000 documents were sampled in total because we asked for a maximum of 200 from an index with 5 shards. The cost of performing the nested significant_terms aggregation was therefore limited rather than unbounded.
+<2> The results of the significant_terms aggregation are not skewed by any single over-active Twitter user because we asked for a maximum of one tweet from any one user in our sample.
+
+==== shard_size
+
+The `shard_size` parameter limits how many top-scoring documents are collected in the sample processed on each shard.
+The default value is 100.
+
+==== Controlling diversity
+
+Optionally, you can use the `field` or `script` and `max_docs_per_value` settings to control the maximum number of documents collected on any one shard that share a common value.
+The choice of value (e.g. `author`) is loaded from a regular `field` or derived dynamically by a `script`.
+
+The aggregation will throw an error if the choice of field or script produces multiple values for a document.
+It is currently not possible to offer this form of de-duplication using many values, primarily due to concerns over efficiency.
+
+NOTE: Any good market researcher will tell you that when working with samples of data it is important
+that the sample represents a healthy variety of opinions rather than being skewed by any single voice.
+The same is true with aggregations, and sampling with these diversify settings can offer a way to remove the bias in your content (an over-populated geography, a large spike in a timeline or an over-active forum spammer).
+
+===== Field
+
+Controlling diversity using a field:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sample" : {
+            "sampler" : {
+                "field" : "author",
+                "max_docs_per_value" : 3
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+Note that the `max_docs_per_value` setting applies on a per-shard basis only for the purposes of shard-local sampling.
+It is not intended as a way of providing a global de-duplication feature on search results.
+
+===== Script
+
+Controlling diversity using a script:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sample" : {
+            "sampler" : {
+                "script" : "doc['author'].value + '/' + doc['genre'].value"
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+Note that in the above example we chose to use the default `max_docs_per_value` setting of 1 and combined the author and genre fields to ensure
+each shard sample has, at most, one match for an author/genre pair.
+
+===== execution_hint
+
+When using the settings to control diversity, the optional `execution_hint` setting can influence the management of the values used for de-duplication.
+Each option will hold up to `shard_size` values in memory while performing de-duplication, but the type of value held can be controlled as follows:
+
+ - hold field values directly (`map`)
+ - hold ordinals of the field as determined by the Lucene index (`global_ordinals`)
+ - hold hashes of the field values - with potential for hash collisions (`bytes_hash`)
+
+The default is to use `global_ordinals` if this information is available from the Lucene index, and to revert to `map` if not.
+The `bytes_hash` setting may prove faster in some cases but introduces the possibility of false positives in the de-duplication logic due to the possibility of hash collisions.
+Please note that Elasticsearch will ignore the choice of execution hint if it is not applicable and that there is no backward compatibility guarantee on these hints.
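+
+For example, the following sketch (reusing the `author` field and `max_docs_per_value` setting from the examples above) forces the `map` approach:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sample" : {
+            "sampler" : {
+                "field" : "author",
+                "max_docs_per_value" : 3,
+                "execution_hint" : "map"
+            }
+        }
+    }
+}
+--------------------------------------------------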
+
+==== Limitations
+
+===== Cannot be nested under `breadth_first` aggregations
+Being a quality-based filter, the sampler aggregation needs access to the relevance score produced for each document.
+It therefore cannot be nested under a `terms` aggregation which has the `collect_mode` switched from the default `depth_first` mode to `breadth_first`, as this discards scores.
+In this situation an error will be thrown.
+
+===== Limited de-dup logic
+The de-duplication logic in the diversify settings applies only at a shard level, so it will not apply across shards.
+
+===== No specialized syntax for geo/date fields
+Currently the syntax for defining the diversifying values is a choice of `field` or `script`; there is no added syntactic sugar for expressing geo or date units such as "1w" (1 week).
+This support may be added in a later release, and users will currently have to create these sorts of values using a script.
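+
+As an interim illustration (a sketch only; it assumes a date field named `timestamp` whose value is exposed as epoch milliseconds), weekly buckets could be derived with:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sample" : {
+            "sampler" : {
+                "script" : "doc['timestamp'].value / 604800000"
+            }
+        }
+    }
+}
+--------------------------------------------------
+Here 604800000 is the number of milliseconds in one week, so documents from the same week produce the same diversifying value.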
\ No newline at end of file diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index 2feaf112104..607757fb682 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; @@ -38,6 +39,7 @@ import org.elasticsearch.search.aggregations.bucket.range.RangeParser; import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeParser; import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceParser; import org.elasticsearch.search.aggregations.bucket.range.ipv4.IpRangeParser; +import org.elasticsearch.search.aggregations.bucket.sampler.SamplerParser; import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsParser; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificantTermsHeuristicModule; import org.elasticsearch.search.aggregations.bucket.terms.TermsParser; @@ -80,6 +82,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{ parsers.add(MissingParser.class); parsers.add(FilterParser.class); parsers.add(FiltersParser.class); + parsers.add(SamplerParser.class); parsers.add(TermsParser.class); parsers.add(SignificantTermsParser.class); parsers.add(RangeParser.class); diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index 4d83603a088..9a2fa3a8a57 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations; import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.search.aggregations.bucket.BestBucketsDeferringCollector; import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.internal.SearchContext.Lifetime; @@ -136,7 +137,7 @@ public abstract class AggregatorBase extends Aggregator { for (int i = 0; i < subAggregators.length; ++i) { if (shouldDefer(subAggregators[i])) { if (recordingWrapper == null) { - recordingWrapper = new DeferringBucketCollector(); + recordingWrapper = getDeferringCollector(); } deferredCollectors.add(subAggregators[i]); subAggregators[i] = recordingWrapper.wrap(subAggregators[i]); @@ -153,6 +154,12 @@ public abstract class AggregatorBase extends Aggregator { collectableSubAggregators.preCollection(); } + public DeferringBucketCollector getDeferringCollector() { + // Default impl is a collector that selects the best buckets + // but an alternative defer policy may be based on best docs. 
+ return new BestBucketsDeferringCollector(); + } + /** * This method should be overidden by subclasses that want to defer calculation * of a child aggregation until a first pass is complete and a set of buckets has diff --git a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index ce09d1e5c69..a45b9b9857a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -36,6 +36,8 @@ import org.elasticsearch.search.aggregations.bucket.range.InternalRange; import org.elasticsearch.search.aggregations.bucket.range.date.InternalDateRange; import org.elasticsearch.search.aggregations.bucket.range.geodistance.InternalGeoDistance; import org.elasticsearch.search.aggregations.bucket.range.ipv4.InternalIPv4Range; +import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler; +import org.elasticsearch.search.aggregations.bucket.sampler.UnmappedSampler; import org.elasticsearch.search.aggregations.bucket.significant.SignificantLongTerms; import org.elasticsearch.search.aggregations.bucket.significant.SignificantStringTerms; import org.elasticsearch.search.aggregations.bucket.significant.UnmappedSignificantTerms; @@ -83,6 +85,8 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM InternalGlobal.registerStreams(); InternalFilter.registerStreams(); InternalFilters.registerStream(); + InternalSampler.registerStreams(); + UnmappedSampler.registerStreams(); InternalMissing.registerStreams(); StringTerms.registerStreams(); LongTerms.registerStreams(); diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java new file mode 100644 index 00000000000..f0c0294b4d7 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java @@ -0,0 +1,191 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.util.packed.PackedInts;
+import org.apache.lucene.util.packed.PackedLongValues;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
+import org.elasticsearch.search.aggregations.BucketCollector;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A specialization of {@link DeferringBucketCollector} that collects all
+ * matches and then is able to replay a given subset of buckets which represent
+ * the survivors from a pruning process performed by the aggregator that owns
+ * this collector.
+ */
+public class BestBucketsDeferringCollector extends DeferringBucketCollector {
+    private static class Entry {
+        final LeafReaderContext context;
+        final PackedLongValues docDeltas;
+        final PackedLongValues buckets;
+
+        public Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
+            this.context = context;
+            this.docDeltas = docDeltas;
+            this.buckets = buckets;
+        }
+    }
+
+    final List<Entry> entries = new ArrayList<>();
+    BucketCollector collector;
+    LeafReaderContext context;
+    PackedLongValues.Builder docDeltas;
+    PackedLongValues.Builder buckets;
+    long maxBucket = -1;
+    boolean finished = false;
+    LongHash selectedBuckets;
+
+    /** Sole constructor. */
+    public BestBucketsDeferringCollector() {
+    }
+
+    @Override
+    public boolean needsScores() {
+        if (collector == null) {
+            throw new ElasticsearchIllegalStateException();
+        }
+        return collector.needsScores();
+    }
+
+    /** Set the deferred collectors. */
+    public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
+        this.collector = BucketCollector.wrap(deferredCollectors);
+    }
+
+    private void finishLeaf() {
+        if (context != null) {
+            entries.add(new Entry(context, docDeltas.build(), buckets.build()));
+        }
+        context = null;
+        docDeltas = null;
+        buckets = null;
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+        finishLeaf();
+
+        context = ctx;
+        docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
+        buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
+
+        return new LeafBucketCollector() {
+            int lastDoc = 0;
+
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                docDeltas.add(doc - lastDoc);
+                buckets.add(bucket);
+                lastDoc = doc;
+                maxBucket = Math.max(maxBucket, bucket);
+            }
+        };
+    }
+
+    @Override
+    public void preCollection() throws IOException {
+    }
+
+    @Override
+    public void postCollection() throws IOException {
+        finishLeaf();
+        finished = true;
+    }
+
+    /**
+     * Replay the wrapped collector, but only on a selection of buckets.
+     */
+    @Override
+    public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
+        if (!finished) {
+            throw new ElasticsearchIllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
+        }
+        if (this.selectedBuckets != null) {
+            throw new ElasticsearchIllegalStateException("Already been replayed");
+        }
+
+        final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
+        for (long bucket : selectedBuckets) {
+            hash.add(bucket);
+        }
+        this.selectedBuckets = hash;
+
+        collector.preCollection();
+        if (collector.needsScores()) {
+            throw new ElasticsearchIllegalStateException("Cannot defer if scores are needed");
+        }
+
+        for (Entry entry : entries) {
+            final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
+            leafCollector.setScorer(Lucene.illegalScorer("A limitation of the " + SubAggCollectionMode.BREADTH_FIRST
+                    + " collection mode is that scores cannot be buffered along with document IDs"));
+            final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
+            final PackedLongValues.Iterator buckets = entry.buckets.iterator();
+            int doc = 0;
+            for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
+                doc += docDeltaIterator.next();
+                final long bucket = buckets.next();
+                final long rebasedBucket = hash.find(bucket);
+                if (rebasedBucket != -1) {
+                    leafCollector.collect(doc, rebasedBucket);
+                }
+            }
+        }
+
+        collector.postCollection();
+    }
+
+    /**
+     * Wrap the provided aggregator so that it behaves (almost) as if it had
+     * been collected directly.
+     */
+    @Override
+    public Aggregator wrap(final Aggregator in) {
+
+        return new WrappedAggregator(in) {
+
+            @Override
+            public InternalAggregation buildAggregation(long bucket) throws IOException {
+                if (selectedBuckets == null) {
+                    throw new ElasticsearchIllegalStateException("Collection has not been replayed yet.");
+                }
+                final long rebasedBucket = selectedBuckets.find(bucket);
+                if (rebasedBucket == -1) {
+                    throw new ElasticsearchIllegalStateException("Cannot build for a bucket which has not been collected");
+                }
+                return in.buildAggregation(rebasedBucket);
+            }
+
+        };
+    }
+
+}
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java
new file mode 100644
index 00000000000..437e642d7e6
--- /dev/null
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TopDocsCollector;
+import org.apache.lucene.search.TopScoreDocCollector;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.search.aggregations.BucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A specialization of {@link DeferringBucketCollector} that collects all
+ * matches and then replays only the top scoring documents to child
+ * aggregations. The method
+ * {@link BestDocsDeferringCollector#createTopDocsCollector(int)} is designed to
+ * be overridden and allows subclasses to choose a custom collector
+ * implementation for determining the top N matches.
+ */
+public class BestDocsDeferringCollector extends DeferringBucketCollector {
+    final List<PerSegmentCollects> entries = new ArrayList<>();
+    BucketCollector deferred;
+    TopDocsCollector<? extends ScoreDoc> tdc;
+    boolean finished = false;
+    private int shardSize;
+    private PerSegmentCollects perSegCollector;
+    private int matchedDocs;
+
+    /**
+     * Sole constructor.
+     *
+     * @param shardSize the maximum number of top-scoring documents to collect on each shard
+     */
+    public BestDocsDeferringCollector(int shardSize) {
+        this.shardSize = shardSize;
+    }
+
+    @Override
+    public boolean needsScores() {
+        return true;
+    }
+
+    /** Set the deferred collectors. */
+    public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
+        this.deferred = BucketCollector.wrap(deferredCollectors);
+        try {
+            tdc = createTopDocsCollector(shardSize);
+        } catch (IOException e) {
+            throw new ElasticsearchException("IO error creating collector", e);
+        }
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+        perSegCollector = new PerSegmentCollects(ctx);
+        entries.add(perSegCollector);
+
+        // Deferring collector
+        return new LeafBucketCollector() {
+            @Override
+            public void setScorer(Scorer scorer) throws IOException {
+                perSegCollector.setScorer(scorer);
+            }
+
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                perSegCollector.collect(doc);
+            }
+        };
+    }
+
+    // Designed to be overridden by subclasses that may score docs by criteria
+    // other than Lucene score
+    protected TopDocsCollector<? extends ScoreDoc> createTopDocsCollector(int size) throws IOException {
+        return TopScoreDocCollector.create(size);
+    }
+
+    @Override
+    public void preCollection() throws IOException {
+    }
+
+    @Override
+    public void postCollection() throws IOException {
+        finished = true;
+    }
+
+    /**
+     * Replay the wrapped collector, but only on a selection of buckets.
+     */
+    @Override
+    public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
+        if (!finished) {
+            throw new ElasticsearchIllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
+        }
+        if (selectedBuckets.length > 1) {
+            throw new ElasticsearchIllegalStateException("Collection only supported on a single bucket");
+        }
+
+        deferred.preCollection();
+
+        TopDocs topDocs = tdc.topDocs();
+        ScoreDoc[] sd = topDocs.scoreDocs;
+        matchedDocs = sd.length;
+        // Sort the top matches by docID for the benefit of deferred collector
+        Arrays.sort(sd, new Comparator<ScoreDoc>() {
+            @Override
+            public int compare(ScoreDoc o1, ScoreDoc o2) {
+                return o1.doc - o2.doc;
+            }
+        });
+        try {
+            for (PerSegmentCollects perSegDocs : entries) {
+                perSegDocs.replayRelatedMatches(sd);
+            }
+        } catch (IOException e) {
+            throw new ElasticsearchException("IOException collecting best scoring results", e);
+        }
+        deferred.postCollection();
+    }
+
+    class PerSegmentCollects extends Scorer {
+        private LeafReaderContext readerContext;
+        int maxDocId = Integer.MIN_VALUE;
+        private float currentScore;
+        private int currentDocId = -1;
+        private LeafCollector currentLeafCollector;
+
+        PerSegmentCollects(LeafReaderContext readerContext) throws IOException {
+            // The publisher behaviour for Reader/Scorer listeners triggers a
+            // call to this constructor with a null scorer so we can't call
+            // scorer.getWeight() and pass the Weight to our base class.
+            // However, passing null seems to have no adverse effects here...
+            super(null);
+            this.readerContext = readerContext;
+            currentLeafCollector = tdc.getLeafCollector(readerContext);
+        }
+
+        public void setScorer(Scorer scorer) throws IOException {
+            currentLeafCollector.setScorer(scorer);
+        }
+
+        public void replayRelatedMatches(ScoreDoc[] sd) throws IOException {
+            final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext);
+            leafCollector.setScorer(this);
+
+            currentScore = 0;
+            currentDocId = -1;
+            if (maxDocId < 0) {
+                return;
+            }
+            for (ScoreDoc scoreDoc : sd) {
+                // Doc ids from TopDocCollector are root-level Reader so
+                // need rebasing
+                int rebased = scoreDoc.doc - readerContext.docBase;
+                if ((rebased >= 0) && (rebased <= maxDocId)) {
+                    currentScore = scoreDoc.score;
+                    currentDocId = rebased;
+                    leafCollector.collect(rebased, 0);
+                }
+            }
+        }
+
+        @Override
+        public float score() throws IOException {
+            return currentScore;
+        }
+
+        @Override
+        public int freq() throws IOException {
+            throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
+        }
+
+        @Override
+        public int docID() {
+            return currentDocId;
+        }
+
+        @Override
+        public int nextDoc() throws IOException {
+            throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
+        }
+
+        @Override
+        public int advance(int target) throws IOException {
+            throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
+        }
+
+        @Override
+        public long cost() {
+            throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
+        }
+
+        public void collect(int docId) throws IOException {
+            currentLeafCollector.collect(docId);
+            maxDocId = Math.max(maxDocId, docId);
+        }
+    }
+
+    public int getDocCount() {
+        return matchedDocs;
+    }
+
+}
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java
b/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java index 09686e662d5..b0f2693e9eb 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java @@ -20,218 +20,112 @@ package org.elasticsearch.search.aggregations.bucket; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.util.packed.PackedInts; -import org.apache.lucene.util.packed.PackedLongValues; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.LongHash; import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; -import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.support.AggregationContext; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; /** * A {@link BucketCollector} that records collected doc IDs and buckets and * allows to replay a subset of the collected buckets. */ -public final class DeferringBucketCollector extends BucketCollector { - - private static class Entry { - final LeafReaderContext context; - final PackedLongValues docDeltas; - final PackedLongValues buckets; - - public Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) { - this.context = context; - this.docDeltas = docDeltas; - this.buckets = buckets; - } - } - - final List entries = new ArrayList<>(); - BucketCollector collector; - LeafReaderContext context; - PackedLongValues.Builder docDeltas; - PackedLongValues.Builder buckets; - long maxBucket = -1; - boolean finished = false; - LongHash selectedBuckets; +public abstract class DeferringBucketCollector extends BucketCollector { + private BucketCollector collector; /** Sole constructor. */ public DeferringBucketCollector() {} - @Override - public boolean needsScores() { - if (collector == null) { - throw new ElasticsearchIllegalStateException(); - } - return false; - } - /** Set the deferred collectors. */ public void setDeferredCollector(Iterable deferredCollectors) { this.collector = BucketCollector.wrap(deferredCollectors); } + - private void finishLeaf() { - if (context != null) { - entries.add(new Entry(context, docDeltas.build(), buckets.build())); - } - context = null; - docDeltas = null; - buckets = null; + public final void replay(long... 
selectedBuckets) throws IOException + { + prepareSelectedBuckets(selectedBuckets); } - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { - finishLeaf(); - - context = ctx; - docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT); - buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); - - return new LeafBucketCollector() { - int lastDoc = 0; - @Override - public void collect(int doc, long bucket) throws IOException { - docDeltas.add(doc - lastDoc); - buckets.add(bucket); - lastDoc = doc; - maxBucket = Math.max(maxBucket, bucket); - } - }; - } - - @Override - public void preCollection() throws IOException { - } - - @Override - public void postCollection() throws IOException { - finishLeaf(); - finished = true; - } + public abstract void prepareSelectedBuckets(long... selectedBuckets) throws IOException; /** - * Replay the wrapped collector, but only on a selection of buckets. - */ - public void replay(long... selectedBuckets) throws IOException { - if (!finished) { - throw new ElasticsearchIllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called"); - } - if (this.selectedBuckets != null) { - throw new ElasticsearchIllegalStateException("Alerady been replayed"); - } - - final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE); - for (long bucket : selectedBuckets) { - hash.add(bucket); - } - this.selectedBuckets = hash; - - collector.preCollection(); - if (collector.needsScores()) { - throw new ElasticsearchIllegalStateException("Cannot defer if scores are needed"); - } - - for (Entry entry : entries) { - final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context); - leafCollector.setScorer(Lucene.illegalScorer("A limitation of the " + SubAggCollectionMode.BREADTH_FIRST - + " collection mode is that scores cannot be buffered along with document IDs")); - final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator(); - final PackedLongValues.Iterator buckets = entry.buckets.iterator(); - int doc = 0; - for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) { - doc += docDeltaIterator.next(); - final long bucket = buckets.next(); - final long rebasedBucket = hash.find(bucket); - if (rebasedBucket != -1) { - leafCollector.collect(doc, rebasedBucket); - } - } - } - - collector.postCollection(); - } - - /** - * Wrap the provided aggregator so that it behaves (almost) as if it had been - * collected directly. + * Wrap the provided aggregator so that it behaves (almost) as if it had + * been collected directly. 
*/ public Aggregator wrap(final Aggregator in) { - return new Aggregator() { + return new WrappedAggregator(in); + } - @Override - public boolean needsScores() { - return in.needsScores(); - } + protected class WrappedAggregator extends Aggregator { + private Aggregator in; - @Override - public void close() throws ElasticsearchException { - in.close(); - } + WrappedAggregator(Aggregator in) { + this.in = in; + } - @Override - public String name() { - return in.name(); - } + @Override + public boolean needsScores() { + return in.needsScores(); + } - @Override - public Aggregator parent() { - return in.parent(); - } + @Override + public void close() throws ElasticsearchException { + in.close(); + } - @Override - public AggregationContext context() { - return in.context(); - } + @Override + public String name() { + return in.name(); + } - @Override - public Aggregator subAggregator(String name) { - return in.subAggregator(name); - } + @Override + public Aggregator parent() { + return in.parent(); + } - @Override - public InternalAggregation buildAggregation(long bucket) throws IOException { - if (selectedBuckets == null) { - throw new ElasticsearchIllegalStateException("Collection has not been replayed yet."); - } - final long rebasedBucket = selectedBuckets.find(bucket); - if (rebasedBucket == -1) { - throw new ElasticsearchIllegalStateException("Cannot build for a bucket which has not been collected"); - } - return in.buildAggregation(rebasedBucket); - } + @Override + public AggregationContext context() { + return in.context(); + } - @Override - public InternalAggregation buildEmptyAggregation() { - return in.buildEmptyAggregation(); - } + @Override + public Aggregator subAggregator(String name) { + return in.subAggregator(name); + } - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { - throw new ElasticsearchIllegalStateException("Deferred collectors cannot be collected directly. They must be collected through the recording wrapper."); - } + @Override + public InternalAggregation buildAggregation(long bucket) throws IOException { + return in.buildAggregation(bucket); + } - @Override - public void preCollection() throws IOException { - throw new ElasticsearchIllegalStateException("Deferred collectors cannot be collected directly. They must be collected through the recording wrapper."); - } + @Override + public InternalAggregation buildEmptyAggregation() { + return in.buildEmptyAggregation(); + } - @Override - public void postCollection() throws IOException { - throw new ElasticsearchIllegalStateException("Deferred collectors cannot be collected directly. They must be collected through the recording wrapper."); - } + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + throw new ElasticsearchIllegalStateException( + "Deferred collectors cannot be collected directly. They must be collected through the recording wrapper."); + } + + @Override + public void preCollection() throws IOException { + throw new ElasticsearchIllegalStateException( + "Deferred collectors cannot be collected directly. They must be collected through the recording wrapper."); + } + + @Override + public void postCollection() throws IOException { + throw new ElasticsearchIllegalStateException( + "Deferred collectors cannot be collected directly. 
They must be collected through the recording wrapper."); + } - }; } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedBytesHashSamplerAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedBytesHashSamplerAggregator.java new file mode 100644 index 00000000000..c74df049d12 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedBytesHashSamplerAggregator.java @@ -0,0 +1,121 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.sampler; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.DiversifiedTopDocsCollector; +import org.apache.lucene.search.DiversifiedTopDocsCollector.ScoreDocKey; +import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector; +import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; + +import java.io.IOException; +import java.util.Map; + +/** + * Alternative, faster implementation for converting String keys to longs but + * with the potential for hash collisions. + */ +public class DiversifiedBytesHashSamplerAggregator extends SamplerAggregator { + + private ValuesSource valuesSource; + private int maxDocsPerValue; + + public DiversifiedBytesHashSamplerAggregator(String name, int shardSize, AggregatorFactories factories, + AggregationContext aggregationContext, Aggregator parent, Map metaData, ValuesSource valuesSource, + int maxDocsPerValue) throws IOException { + super(name, shardSize, factories, aggregationContext, parent, metaData); + this.valuesSource = valuesSource; + this.maxDocsPerValue = maxDocsPerValue; + } + + @Override + public DeferringBucketCollector getDeferringCollector() { + bdd = new DiverseDocsDeferringCollector(); + return bdd; + } + + /** + * A {@link DeferringBucketCollector} that identifies top scoring documents + * but de-duped by a key then passes only these on to nested collectors. + * This implementation is only for use with a single bucket aggregation. 
+ */ + class DiverseDocsDeferringCollector extends BestDocsDeferringCollector { + + public DiverseDocsDeferringCollector() { + super(shardSize); + } + + + @Override + protected TopDocsCollector createTopDocsCollector(int size) { + return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue); + } + + // This class extends the DiversifiedTopDocsCollector and provides + // a lookup from elasticsearch's ValuesSource + class ValuesDiversifiedTopDocsCollector extends DiversifiedTopDocsCollector { + + private SortedBinaryDocValues values; + + public ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerValue) { + super(numHits, maxHitsPerValue); + + } + + @Override + protected NumericDocValues getKeys(LeafReaderContext context) { + try { + values = valuesSource.bytesValues(context); + } catch (IOException e) { + throw new ElasticsearchException("Error reading values", e); + } + return new NumericDocValues() { + @Override + public long get(int doc) { + + values.setDocument(doc); + final int valuesCount = values.count(); + if (valuesCount > 1) { + throw new ElasticsearchIllegalArgumentException("Sample diversifying key must be a single valued-field"); + } + if (valuesCount == 1) { + final BytesRef bytes = values.valueAt(0); + return bytes.hashCode(); + } + return 0; + } + }; + } + + } + + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedMapSamplerAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedMapSamplerAggregator.java new file mode 100644 index 00000000000..bf196245ce1 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedMapSamplerAggregator.java @@ -0,0 +1,133 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.sampler; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.DiversifiedTopDocsCollector; +import org.apache.lucene.search.DiversifiedTopDocsCollector.ScoreDocKey; +import org.apache.lucene.search.TopDocsCollector; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BytesRefHash; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector; +import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; + +import java.io.IOException; +import java.util.Map; + +public class DiversifiedMapSamplerAggregator extends SamplerAggregator { + + private ValuesSource valuesSource; + private int maxDocsPerValue; + private BytesRefHash bucketOrds; + + public DiversifiedMapSamplerAggregator(String name, int shardSize, AggregatorFactories factories, + AggregationContext aggregationContext, Aggregator parent, Map metaData, ValuesSource valuesSource, + int maxDocsPerValue) throws IOException { + super(name, shardSize, factories, aggregationContext, parent, metaData); + this.valuesSource = valuesSource; + this.maxDocsPerValue = maxDocsPerValue; + bucketOrds = new BytesRefHash(shardSize, aggregationContext.bigArrays()); + + } + + @Override + protected void doClose() { + Releasables.close(bucketOrds); + super.doClose(); + } + + @Override + public DeferringBucketCollector getDeferringCollector() { + bdd = new DiverseDocsDeferringCollector(); + return bdd; + } + + /** + * A {@link DeferringBucketCollector} that identifies top scoring documents + * but de-duped by a key then passes only these on to nested collectors. + * This implementation is only for use with a single bucket aggregation. 
+ */ + class DiverseDocsDeferringCollector extends BestDocsDeferringCollector { + + public DiverseDocsDeferringCollector() { + super(shardSize); + } + + + @Override + protected TopDocsCollector createTopDocsCollector(int size) { + return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue); + } + + // This class extends the DiversifiedTopDocsCollector and provides + // a lookup from elasticsearch's ValuesSource + class ValuesDiversifiedTopDocsCollector extends DiversifiedTopDocsCollector { + + private SortedBinaryDocValues values; + + public ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerKey) { + super(numHits, maxHitsPerKey); + + } + + @Override + protected NumericDocValues getKeys(LeafReaderContext context) { + try { + values = valuesSource.bytesValues(context); + } catch (IOException e) { + throw new ElasticsearchException("Error reading values", e); + } + return new NumericDocValues() { + @Override + public long get(int doc) { + + values.setDocument(doc); + final int valuesCount = values.count(); + if (valuesCount > 1) { + throw new ElasticsearchIllegalArgumentException("Sample diversifying key must be a single valued-field"); + } + if (valuesCount == 1) { + final BytesRef bytes = values.valueAt(0); + + long bucketOrdinal = bucketOrds.add(bytes); + if (bucketOrdinal < 0) { // already seen + bucketOrdinal = -1 - bucketOrdinal; + } + return bucketOrdinal; + } + return 0; + } + }; + } + + } + + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedNumericSamplerAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedNumericSamplerAggregator.java new file mode 100644 index 00000000000..e5f963ed3ef --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedNumericSamplerAggregator.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.sampler; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DiversifiedTopDocsCollector; +import org.apache.lucene.search.DiversifiedTopDocsCollector.ScoreDocKey; +import org.apache.lucene.search.TopDocsCollector; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector; +import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; + +import java.io.IOException; +import java.util.Map; + +public class DiversifiedNumericSamplerAggregator extends SamplerAggregator { + + private ValuesSource.Numeric valuesSource; + private int maxDocsPerValue; + + public DiversifiedNumericSamplerAggregator(String name, int shardSize, AggregatorFactories factories, + AggregationContext aggregationContext, Aggregator parent, Map metaData, ValuesSource.Numeric valuesSource, + int maxDocsPerValue) throws IOException { + super(name, shardSize, factories, aggregationContext, parent, metaData); + this.valuesSource = valuesSource; + this.maxDocsPerValue = maxDocsPerValue; + } + + @Override + public DeferringBucketCollector getDeferringCollector() { + bdd = new DiverseDocsDeferringCollector(); + return bdd; + } + + /** + * A {@link DeferringBucketCollector} that identifies top scoring documents + * but de-duped by a key then passes only these on to nested collectors. + * This implementation is only for use with a single bucket aggregation. 
+ */ + class DiverseDocsDeferringCollector extends BestDocsDeferringCollector { + public DiverseDocsDeferringCollector() { + super(shardSize); + } + + @Override + protected TopDocsCollector createTopDocsCollector(int size) { + return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue); + } + + // This class extends the DiversifiedTopDocsCollector and provides + // a lookup from elasticsearch's ValuesSource + class ValuesDiversifiedTopDocsCollector extends DiversifiedTopDocsCollector { + + private SortedNumericDocValues values; + + public ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerKey) { + super(numHits, maxHitsPerKey); + } + + @Override + protected NumericDocValues getKeys(LeafReaderContext context) { + try { + values = valuesSource.longValues(context); + } catch (IOException e) { + throw new ElasticsearchException("Error reading values", e); + } + return new NumericDocValues() { + @Override + public long get(int doc) { + values.setDocument(doc); + final int valuesCount = values.count(); + if (valuesCount > 1) { + throw new ElasticsearchIllegalArgumentException("Sample diversifying key must be a single valued-field"); + } + if (valuesCount == 1) { + return values.valueAt(0); + } + return Long.MIN_VALUE; + } + }; + } + + } + + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedOrdinalsSamplerAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedOrdinalsSamplerAggregator.java new file mode 100644 index 00000000000..808acc49883 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedOrdinalsSamplerAggregator.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.bucket.sampler; + +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.RandomAccessOrds; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.search.DiversifiedTopDocsCollector; +import org.apache.lucene.search.DiversifiedTopDocsCollector.ScoreDocKey; +import org.apache.lucene.search.TopDocsCollector; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector; +import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; + +import java.io.IOException; +import java.util.Map; + +public class DiversifiedOrdinalsSamplerAggregator extends SamplerAggregator { + + private ValuesSource.Bytes.WithOrdinals.FieldData valuesSource; + private int maxDocsPerValue; + + public DiversifiedOrdinalsSamplerAggregator(String name, int shardSize, AggregatorFactories factories, + AggregationContext aggregationContext, Aggregator parent, Map metaData, + ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, int maxDocsPerValue) throws IOException { + super(name, shardSize, factories, aggregationContext, parent, metaData); + this.valuesSource = valuesSource; + this.maxDocsPerValue = maxDocsPerValue; + } + + @Override + public DeferringBucketCollector getDeferringCollector() { + bdd = new DiverseDocsDeferringCollector(); + return bdd; + } + + /** + * A {@link DeferringBucketCollector} that identifies top scoring documents + * but de-duped by a key then passes only these on to nested collectors. + * This implementation is only for use with a single bucket aggregation. 
+ */ + class DiverseDocsDeferringCollector extends BestDocsDeferringCollector { + + public DiverseDocsDeferringCollector() { + super(shardSize); + } + + @Override + protected TopDocsCollector createTopDocsCollector(int size) { + return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue); + } + + // This class extends the DiversifiedTopDocsCollector and provides + // a lookup from elasticsearch's ValuesSource + class ValuesDiversifiedTopDocsCollector extends DiversifiedTopDocsCollector { + + + public ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerKey) { + super(numHits, maxHitsPerKey); + } + + @Override + protected NumericDocValues getKeys(LeafReaderContext context) { + final RandomAccessOrds globalOrds = valuesSource.globalOrdinalsValues(context); + final SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds); + if (singleValues != null) { + return new NumericDocValues() { + @Override + public long get(int doc) { + return singleValues.getOrd(doc); + } + }; + } + return new NumericDocValues() { + @Override + public long get(int doc) { + globalOrds.setDocument(doc); + final long valuesCount = globalOrds.cardinality(); + if (valuesCount > 1) { + throw new ElasticsearchIllegalArgumentException("Sample diversifying key must be a single valued-field"); + } + if (valuesCount == 1) { + long result = globalOrds.ordAt(0); + return result; + } + return Long.MIN_VALUE; + } + }; + + } + + } + + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/InternalSampler.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/InternalSampler.java new file mode 100644 index 00000000000..509c89e3ccc --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/InternalSampler.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Result of the {@code sampler} aggregation: a single bucket holding the
+ * sample of top-scoring documents.
+ */
+public class InternalSampler extends InternalSingleBucketAggregation implements Sampler {
+
+    public final static Type TYPE = new Type("sampler");
+
+    public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
+        @Override
+        public InternalSampler readResult(StreamInput in) throws IOException {
+            InternalSampler result = new InternalSampler();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        AggregationStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    InternalSampler() {
+    } // for serialization
+
+    InternalSampler(String name, long docCount, InternalAggregations subAggregations, Map metaData) {
+        super(name, docCount, subAggregations, metaData);
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
+        return new InternalSampler(name, docCount, subAggregations, metaData);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/Sampler.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/Sampler.java
new file mode 100644
index 00000000000..19d3569aeed
--- /dev/null
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/Sampler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
+
+/**
+ * A {@code filter} aggregation that defines a single bucket to hold a sample of
+ * top-matching documents. Computation of child aggregations is deferred until
+ * the top-matching documents on a shard have been determined.
+ */
+public interface Sampler extends SingleBucketAggregation {
+}
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregationBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregationBuilder.java
new file mode 100644
index 00000000000..a623735db31
--- /dev/null
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregationBuilder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.ValuesSourceAggregationBuilder;
+
+import java.io.IOException;
+
+/**
+ * Builder for the {@link Sampler} aggregation.
+ */
+public class SamplerAggregationBuilder extends ValuesSourceAggregationBuilder<SamplerAggregationBuilder> {
+
+    private int shardSize = SamplerParser.DEFAULT_SHARD_SAMPLE_SIZE;
+
+    int maxDocsPerValue = SamplerParser.MAX_DOCS_PER_VALUE_DEFAULT;
+    String executionHint = null;
+
+    /**
+     * Sole constructor.
+     */
+    public SamplerAggregationBuilder(String name) {
+        super(name, InternalSampler.TYPE.name());
+    }
+
+    /**
+     * Set the max num docs to be returned from each shard.
+     */
+    public SamplerAggregationBuilder shardSize(int shardSize) {
+        this.shardSize = shardSize;
+        return this;
+    }
+
+    /**
+     * Set the max num docs allowed on each shard to share a common value
+     * (used to diversify the sample).
+     */
+    public SamplerAggregationBuilder maxDocsPerValue(int maxDocsPerValue) {
+        this.maxDocsPerValue = maxDocsPerValue;
+        return this;
+    }
+
+    /**
+     * Set a hint for the mechanism used to gather diversified results,
+     * e.g. "map".
+     */
+    public SamplerAggregationBuilder executionHint(String executionHint) {
+        this.executionHint = executionHint;
+        return this;
+    }
+
+    @Override
+    protected XContentBuilder doInternalXContent(XContentBuilder builder, Params params) throws IOException {
+        if (shardSize != SamplerParser.DEFAULT_SHARD_SAMPLE_SIZE) {
+            builder.field(SamplerParser.SHARD_SIZE_FIELD.getPreferredName(), shardSize);
+        }
+        if (maxDocsPerValue != SamplerParser.MAX_DOCS_PER_VALUE_DEFAULT) {
+            builder.field(SamplerParser.MAX_DOCS_PER_VALUE_FIELD.getPreferredName(), maxDocsPerValue);
+        }
+        if (executionHint != null) {
+            builder.field(SamplerParser.EXECUTION_HINT_FIELD.getPreferredName(), executionHint);
+        }
+        return builder;
+    }
+}
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java
new file mode 100644
index 00000000000..27bfc8666c5
--- /dev/null
+++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.sampler; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.NonCollectingAggregator; +import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector; +import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; + +import java.io.IOException; +import java.util.Map; + +/** + * Aggregate on only the top-scoring docs on a shard. + * + * TODO currently the diversity feature of this agg offers only 'script' and + * 'field' as a means of generating a de-dup value. In future it would be nice + * if users could use any of the "bucket" aggs syntax (geo, date histogram...) + * as the basis for generating de-dup values. Their syntax for creating bucket + * values would be preferable to users having to recreate this logic in a + * 'script' e.g. to turn a datetime in milliseconds into a month key value. 
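+ * <p>
+ * A client-side sketch using the builders added in this change (the index,
+ * field names and the surrounding {@code searchRequestBuilder} are
+ * illustrative, not part of this patch):
+ * <pre>{@code
+ * SamplerAggregationBuilder sample = new SamplerAggregationBuilder("sample")
+ *         .shardSize(200);
+ * sample.field("author").maxDocsPerValue(1);           // optional diversity settings
+ * sample.subAggregation(new TermsBuilder("authors").field("author"));
+ * searchRequestBuilder.addAggregation(sample);         // hypothetical request builder
+ * }</pre>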
+ */ +public class SamplerAggregator extends SingleBucketAggregator { + + + public enum ExecutionMode { + + MAP(new ParseField("map")) { + + @Override + Aggregator create(String name, AggregatorFactories factories, int shardSize, int maxDocsPerValue, ValuesSource valuesSource, + AggregationContext context, Aggregator parent, Map metaData) throws IOException { + + return new DiversifiedMapSamplerAggregator(name, shardSize, factories, context, parent, metaData, valuesSource, + maxDocsPerValue); + } + + @Override + boolean needsGlobalOrdinals() { + return false; + } + + }, + BYTES_HASH(new ParseField("bytes_hash")) { + + @Override + Aggregator create(String name, AggregatorFactories factories, int shardSize, int maxDocsPerValue, ValuesSource valuesSource, + AggregationContext context, Aggregator parent, Map metaData) throws IOException { + + return new DiversifiedBytesHashSamplerAggregator(name, shardSize, factories, context, parent, metaData, valuesSource, + maxDocsPerValue); + } + + @Override + boolean needsGlobalOrdinals() { + return false; + } + + }, + GLOBAL_ORDINALS(new ParseField("global_ordinals")) { + + @Override + Aggregator create(String name, AggregatorFactories factories, int shardSize, int maxDocsPerValue, ValuesSource valuesSource, + AggregationContext context, Aggregator parent, Map metaData) throws IOException { + return new DiversifiedOrdinalsSamplerAggregator(name, shardSize, factories, context, parent, metaData, + (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, maxDocsPerValue); + } + + @Override + boolean needsGlobalOrdinals() { + return true; + } + + }; + + public static ExecutionMode fromString(String value) { + for (ExecutionMode mode : values()) { + if (mode.parseField.match(value)) { + return mode; + } + } + throw new ElasticsearchIllegalArgumentException("Unknown `execution_hint`: [" + value + "], expected any of " + values()); + } + + private final ParseField parseField; + + ExecutionMode(ParseField parseField) { + this.parseField = parseField; + } + + abstract Aggregator create(String name, AggregatorFactories factories, int shardSize, int maxDocsPerValue, ValuesSource valuesSource, + AggregationContext context, Aggregator parent, Map metaData) throws IOException; + + abstract boolean needsGlobalOrdinals(); + + @Override + public String toString() { + return parseField.getPreferredName(); + } + } + + + protected final int shardSize; + protected BestDocsDeferringCollector bdd; + + public SamplerAggregator(String name, int shardSize, AggregatorFactories factories, + AggregationContext aggregationContext, Aggregator parent, Map metaData) throws IOException { + super(name, factories, aggregationContext, parent, metaData); + this.shardSize = shardSize; + } + + @Override + public boolean needsScores() { + return true; + } + + @Override + public DeferringBucketCollector getDeferringCollector() { + bdd = new BestDocsDeferringCollector(shardSize); + return bdd; + + } + + + @Override + protected boolean shouldDefer(Aggregator aggregator) { + return true; + } + + @Override + public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException { + runDeferredCollections(owningBucketOrdinal); + return new InternalSampler(name, bdd == null ? 
+                0 : bdd.getDocCount(), bucketAggregations(owningBucketOrdinal), metaData());
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalSampler(name, 0, buildEmptySubAggregations(), metaData());
+    }
+
+    public static class Factory extends AggregatorFactory {
+
+        private int shardSize;
+
+        public Factory(String name, int shardSize) {
+            super(name, InternalSampler.TYPE.name());
+            this.shardSize = shardSize;
+        }
+
+        @Override
+        public Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
+                Map metaData) throws IOException {
+
+            if (collectsFromSingleBucket == false) {
+                return asMultiBucketAggregator(this, context, parent);
+            }
+            return new SamplerAggregator(name, shardSize, factories, context, parent, metaData);
+        }
+
+    }
+
+    public static class DiversifiedFactory extends ValuesSourceAggregatorFactory {
+
+        private int shardSize;
+        private int maxDocsPerValue;
+        private String executionHint;
+
+        public DiversifiedFactory(String name, int shardSize, String executionHint, ValuesSourceConfig vsConfig, int maxDocsPerValue) {
+            super(name, InternalSampler.TYPE.name(), vsConfig);
+            this.shardSize = shardSize;
+            this.maxDocsPerValue = maxDocsPerValue;
+            this.executionHint = executionHint;
+        }
+
+        @Override
+        protected Aggregator doCreateInternal(ValuesSource valuesSource, AggregationContext context, Aggregator parent,
+                boolean collectsFromSingleBucket, Map metaData) throws IOException {
+
+            if (collectsFromSingleBucket == false) {
+                return asMultiBucketAggregator(this, context, parent);
+            }
+
+            if (valuesSource instanceof ValuesSource.Numeric) {
+                return new DiversifiedNumericSamplerAggregator(name, shardSize, factories, context, parent, metaData,
+                        (Numeric) valuesSource, maxDocsPerValue);
+            }
+
+            if (valuesSource instanceof ValuesSource.Bytes) {
+                ExecutionMode execution = null;
+                if (executionHint != null) {
+                    execution = ExecutionMode.fromString(executionHint);
+                }
+                if (execution == null) {
+                    execution = ExecutionMode.GLOBAL_ORDINALS;
+                }
+                // In some cases using ordinals is just not supported: override the choice
+                if ((execution.needsGlobalOrdinals()) && (!(valuesSource instanceof ValuesSource.Bytes.WithOrdinals))) {
+                    execution = ExecutionMode.MAP;
+                }
+                return execution.create(name, factories, shardSize, maxDocsPerValue, valuesSource, context, parent, metaData);
+            }
+
+            throw new AggregationExecutionException("Sampler aggregation cannot be applied to field [" + config.fieldContext().field() +
It can only be applied to numeric or string fields."); + } + + @Override + protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent, Map metaData) + throws IOException { + final UnmappedSampler aggregation = new UnmappedSampler(name, metaData); + + return new NonCollectingAggregator(name, aggregationContext, parent, factories, metaData) { + @Override + public InternalAggregation buildEmptyAggregation() { + return aggregation; + } + }; + } + + } + + @Override + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + if (bdd == null) { + throw new AggregationExecutionException("Sampler aggregation must be used with child aggregations."); + } + return bdd.getLeafCollector(ctx); + } + +} + diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerParser.java new file mode 100644 index 00000000000..35a2963187e --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerParser.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+
+/**
+ * Parses the {@code sampler} aggregation request, returning a plain
+ * {@link SamplerAggregator.Factory} or, when diversity settings are present,
+ * a {@link SamplerAggregator.DiversifiedFactory}.
+ */
+public class SamplerParser implements Aggregator.Parser {
+
+    public static final int DEFAULT_SHARD_SAMPLE_SIZE = 100;
+    public static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size");
+    public static final ParseField MAX_DOCS_PER_VALUE_FIELD = new ParseField("max_docs_per_value");
+    public static final ParseField EXECUTION_HINT_FIELD = new ParseField("execution_hint");
+    public static final boolean DEFAULT_USE_GLOBAL_ORDINALS = false;
+    public static final int MAX_DOCS_PER_VALUE_DEFAULT = 1;
+
+    @Override
+    public String type() {
+        return InternalSampler.TYPE.name();
+    }
+
+    @Override
+    public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
+
+        XContentParser.Token token;
+        String currentFieldName = null;
+        String executionHint = null;
+        int shardSize = DEFAULT_SHARD_SAMPLE_SIZE;
+        int maxDocsPerValue = MAX_DOCS_PER_VALUE_DEFAULT;
+        boolean diversityChoiceMade = false;
+
+        ValuesSourceParser vsParser = ValuesSourceParser.any(aggregationName, InternalSampler.TYPE, context).scriptable(true)
+                .formattable(false).build();
+
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (vsParser.token(currentFieldName, token, parser)) {
+                continue;
+            } else if (token == XContentParser.Token.VALUE_NUMBER) {
+                if (SHARD_SIZE_FIELD.match(currentFieldName)) {
+                    shardSize = parser.intValue();
+                } else if (MAX_DOCS_PER_VALUE_FIELD.match(currentFieldName)) {
+                    diversityChoiceMade = true;
+                    maxDocsPerValue = parser.intValue();
+                } else {
+                    throw new SearchParseException(context, "Unsupported property \"" + currentFieldName + "\" for aggregation \""
+                            + aggregationName + "\"");
+                }
+            } else if (token == XContentParser.Token.VALUE_STRING) {
+                if (EXECUTION_HINT_FIELD.match(currentFieldName)) {
+                    executionHint = parser.text();
+                } else {
+                    throw new SearchParseException(context, "Unsupported property \"" + currentFieldName + "\" for aggregation \""
+                            + aggregationName + "\"");
+                }
+            } else {
+                throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].");
+            }
+        }
+
+        ValuesSourceConfig vsConfig = vsParser.config();
+        if (vsConfig.valid()) {
+            return new SamplerAggregator.DiversifiedFactory(aggregationName, shardSize, executionHint, vsConfig, maxDocsPerValue);
+        } else {
+            if (diversityChoiceMade) {
+                throw new SearchParseException(context, "Sampler aggregation has " + MAX_DOCS_PER_VALUE_FIELD.getPreferredName()
+                        + " setting but no \"field\" or \"script\" setting to provide values for aggregation \"" + aggregationName
+                        + "\"");
+            }
+            return new SamplerAggregator.Factory(aggregationName, shardSize);
+        }
+    }
+}
diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java
b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java new file mode 100644 index 00000000000..95f8c7bfe78 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.sampler; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * + */ +public class UnmappedSampler extends InternalSampler { + + public static final Type TYPE = new Type("sampler", "umsampler"); + + + public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public UnmappedSampler readResult(StreamInput in) throws IOException { + UnmappedSampler sampler = new UnmappedSampler(); + sampler.readFrom(in); + return sampler; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + UnmappedSampler() { + } + + public UnmappedSampler(String name, Map metaData) { + super(name, 0, InternalAggregations.EMPTY, metaData); + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + for (InternalAggregation agg : aggregations) { + if (!(agg instanceof UnmappedSampler)) { + return agg.reduce(aggregations, reduceContext); + } + } + return this; + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(InternalAggregation.CommonFields.DOC_COUNT, 0); + return builder; + } + +} diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/SamplerTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/SamplerTests.java new file mode 100644 index 00000000000..859f5b274ea --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/SamplerTests.java @@ -0,0 +1,262 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.aggregations.bucket.sampler.Sampler; +import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregator; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.Collection; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +/** + * Tests the Sampler aggregation + */ +@ElasticsearchIntegrationTest.SuiteScopeTest +public class SamplerTests extends ElasticsearchIntegrationTest { + + public static final int NUM_SHARDS = 2; + + public String randomExecutionHint() { + return randomBoolean() ? null : randomFrom(SamplerAggregator.ExecutionMode.values()).toString(); + } + + + @Override + public void setupSuiteScopeCluster() throws Exception { + assertAcked(prepareCreate("test").setSettings(SETTING_NUMBER_OF_SHARDS, NUM_SHARDS, SETTING_NUMBER_OF_REPLICAS, 0).addMapping( + "book", "author", "type=string,index=not_analyzed", "name", "type=string,index=analyzed", "genre", + "type=string,index=not_analyzed")); + createIndex("idx_unmapped"); + // idx_unmapped_author is same as main index but missing author field + assertAcked(prepareCreate("idx_unmapped_author").setSettings(SETTING_NUMBER_OF_SHARDS, NUM_SHARDS, SETTING_NUMBER_OF_REPLICAS, 0) + .addMapping("book", "name", "type=string,index=analyzed", "genre", "type=string,index=not_analyzed")); + + ensureGreen(); + String data[] = { + // "id,cat,name,price,inStock,author_t,series_t,sequence_i,genre_s", + "0553573403,book,A Game of Thrones,7.99,true,George R.R. Martin,A Song of Ice and Fire,1,fantasy", + "0553579908,book,A Clash of Kings,7.99,true,George R.R. Martin,A Song of Ice and Fire,2,fantasy", + "055357342X,book,A Storm of Swords,7.99,true,George R.R. 
Martin,A Song of Ice and Fire,3,fantasy", + "0553293354,book,Foundation,7.99,true,Isaac Asimov,Foundation Novels,1,scifi", + "0812521390,book,The Black Company,6.99,false,Glen Cook,The Chronicles of The Black Company,1,fantasy", + "0812550706,book,Ender's Game,6.99,true,Orson Scott Card,Ender,1,scifi", + "0441385532,book,Jhereg,7.95,false,Steven Brust,Vlad Taltos,1,fantasy", + "0380014300,book,Nine Princes In Amber,6.99,true,Roger Zelazny,the Chronicles of Amber,1,fantasy", + "0805080481,book,The Book of Three,5.99,true,Lloyd Alexander,The Chronicles of Prydain,1,fantasy", + "080508049X,book,The Black Cauldron,5.99,true,Lloyd Alexander,The Chronicles of Prydain,2,fantasy" + + }; + + for (int i = 0; i < data.length; i++) { + String[] parts = data[i].split(","); + client().prepareIndex("test", "book", "" + i).setSource("author", parts[5], "name", parts[2], "genre", parts[8]).get(); + client().prepareIndex("idx_unmapped_author", "book", "" + i).setSource("name", parts[2], "genre", parts[8]).get(); + } + client().admin().indices().refresh(new RefreshRequest("test")).get(); + } + + @Test + public void noDiversity() throws Exception { + SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100); + sampleAgg.subAggregation(new TermsBuilder("authors").field("author")); + SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg).execute().actionGet(); + assertSearchResponse(response); + Sampler sample = response.getAggregations().get("sample"); + Terms authors = sample.getAggregations().get("authors"); + Collection testBuckets = authors.getBuckets(); + + long maxBooksPerAuthor = 0; + for (Terms.Bucket testBucket : testBuckets) { + maxBooksPerAuthor = Math.max(testBucket.getDocCount(), maxBooksPerAuthor); + } + assertThat(maxBooksPerAuthor, equalTo(3l)); + } + + @Test + public void simpleDiversity() throws Exception { + int MAX_DOCS_PER_AUTHOR = 1; + SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100); + sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint()); + sampleAgg.subAggregation(new TermsBuilder("authors").field("author")); + SearchResponse response = client().prepareSearch("test") + .setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("genre", "fantasy")) + .setFrom(0).setSize(60) + .addAggregation(sampleAgg) + .execute() + .actionGet(); + assertSearchResponse(response); + Sampler sample = response.getAggregations().get("sample"); + Terms authors = sample.getAggregations().get("authors"); + Collection testBuckets = authors.getBuckets(); + + for (Terms.Bucket testBucket : testBuckets) { + assertThat(testBucket.getDocCount(), lessThanOrEqualTo((long) NUM_SHARDS * MAX_DOCS_PER_AUTHOR)); + } + } + + @Test + public void nestedDiversity() throws Exception { + // Test multiple samples gathered under buckets made by a parent agg + int MAX_DOCS_PER_AUTHOR = 1; + TermsBuilder rootTerms = new TermsBuilder("genres").field("genre"); + + SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100); + sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint()); + sampleAgg.subAggregation(new TermsBuilder("authors").field("author")); + + rootTerms.subAggregation(sampleAgg); + SearchResponse response = 
client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH) + .addAggregation(rootTerms).execute().actionGet(); + assertSearchResponse(response); + Terms genres = response.getAggregations().get("genres"); + Collection genreBuckets = genres.getBuckets(); + for (Terms.Bucket genreBucket : genreBuckets) { + Sampler sample = genreBucket.getAggregations().get("sample"); + Terms authors = sample.getAggregations().get("authors"); + Collection testBuckets = authors.getBuckets(); + + for (Terms.Bucket testBucket : testBuckets) { + assertThat(testBucket.getDocCount(), lessThanOrEqualTo((long) NUM_SHARDS * MAX_DOCS_PER_AUTHOR)); + } + } + } + + @Test + public void nestedSamples() throws Exception { + // Test samples nested under samples + int MAX_DOCS_PER_AUTHOR = 1; + int MAX_DOCS_PER_GENRE = 2; + SamplerAggregationBuilder rootSample = new SamplerAggregationBuilder("genreSample").shardSize(100).field("genre") + .maxDocsPerValue(MAX_DOCS_PER_GENRE); + + SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100); + sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint()); + sampleAgg.subAggregation(new TermsBuilder("authors").field("author")); + sampleAgg.subAggregation(new TermsBuilder("genres").field("genre")); + + rootSample.subAggregation(sampleAgg); + SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH).addAggregation(rootSample) + .execute().actionGet(); + assertSearchResponse(response); + Sampler genreSample = response.getAggregations().get("genreSample"); + Sampler sample = genreSample.getAggregations().get("sample"); + + Terms genres = sample.getAggregations().get("genres"); + Collection testBuckets = genres.getBuckets(); + for (Terms.Bucket testBucket : testBuckets) { + assertThat(testBucket.getDocCount(), lessThanOrEqualTo((long) NUM_SHARDS * MAX_DOCS_PER_GENRE)); + } + + Terms authors = sample.getAggregations().get("authors"); + testBuckets = authors.getBuckets(); + for (Terms.Bucket testBucket : testBuckets) { + assertThat(testBucket.getDocCount(), lessThanOrEqualTo((long) NUM_SHARDS * MAX_DOCS_PER_AUTHOR)); + } + } + + @Test + public void unmappedChildAggNoDiversity() throws Exception { + SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100); + sampleAgg.subAggregation(new TermsBuilder("authors").field("author")); + SearchResponse response = client().prepareSearch("idx_unmapped") + .setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("genre", "fantasy")) + .setFrom(0).setSize(60) + .addAggregation(sampleAgg) + .execute() + .actionGet(); + assertSearchResponse(response); + Sampler sample = response.getAggregations().get("sample"); + assertThat(sample.getDocCount(), equalTo(0l)); + Terms authors = sample.getAggregations().get("authors"); + assertThat(authors.getBuckets().size(), equalTo(0)); + } + + + + @Test + public void partiallyUnmappedChildAggNoDiversity() throws Exception { + SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100); + sampleAgg.subAggregation(new TermsBuilder("authors").field("author")); + SearchResponse response = client().prepareSearch("idx_unmapped", "test") + .setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("genre", "fantasy")) + .setFrom(0).setSize(60).setExplain(true) + .addAggregation(sampleAgg) + .execute() + .actionGet(); + assertSearchResponse(response); + Sampler sample = 
response.getAggregations().get("sample"); + assertThat(sample.getDocCount(), greaterThan(0l)); + Terms authors = sample.getAggregations().get("authors"); + assertThat(authors.getBuckets().size(), greaterThan(0)); + } + + @Test + public void partiallyUnmappedDiversifyField() throws Exception { + // One of the indexes is missing the "author" field used for + // diversifying results + SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100).field("author").maxDocsPerValue(1); + sampleAgg.subAggregation(new TermsBuilder("authors").field("author")); + SearchResponse response = client().prepareSearch("idx_unmapped_author", "test").setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg) + .execute().actionGet(); + assertSearchResponse(response); + Sampler sample = response.getAggregations().get("sample"); + assertThat(sample.getDocCount(), greaterThan(0l)); + Terms authors = sample.getAggregations().get("authors"); + assertThat(authors.getBuckets().size(), greaterThan(0)); + } + + @Test + public void whollyUnmappedDiversifyField() throws Exception { + //All of the indices are missing the "author" field used for diversifying results + int MAX_DOCS_PER_AUTHOR = 1; + SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100); + sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint()); + sampleAgg.subAggregation(new TermsBuilder("authors").field("author")); + SearchResponse response = client().prepareSearch("idx_unmapped", "idx_unmapped_author").setSearchType(SearchType.QUERY_AND_FETCH) + .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg).execute().actionGet(); + assertSearchResponse(response); + Sampler sample = response.getAggregations().get("sample"); + assertThat(sample.getDocCount(), equalTo(0l)); + Terms authors = sample.getAggregations().get("authors"); + assertNull(authors); + } + +}
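
For reference, a condensed sketch mirroring the tests above (the `client()`, index and field names come from that test fixture) showing how calling code drives a diversified sample and reads back the sampled bucket:

[source,java]
--------------------------------------------------
// Sample at most 100 top-scoring docs per shard, keeping no more than one
// doc per author, then run a terms agg over the sampled docs only.
SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100);
sampleAgg.field("author").maxDocsPerValue(1);
sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));

SearchResponse response = client().prepareSearch("test")
        .setQuery(new TermQueryBuilder("genre", "fantasy"))
        .addAggregation(sampleAgg)
        .execute().actionGet();

// doc_count reports how many documents were actually collected; the
// "authors" terms agg was computed over that sample only.
Sampler sample = response.getAggregations().get("sample");
long sampledDocs = sample.getDocCount();
Terms authors = sample.getAggregations().get("authors");
--------------------------------------------------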