diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java new file mode 100644 index 00000000000..83af85c5a13 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java @@ -0,0 +1,173 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket.significant; + +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.index.fielddata.ordinals.Ordinals; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.terms.GlobalOrdinalsStringTermsAggregator; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.internal.ContextIndexSearcher; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +/** + * An global ordinal based implementation of significant terms, based on {@link SignificantStringTermsAggregator}. + */ +public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStringTermsAggregator { + + protected long numCollectedDocs; + protected final SignificantTermsAggregatorFactory termsAggFactory; + + public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, + long estimatedBucketCount, long maxOrd, int requiredSize, int shardSize, long minDocCount, + AggregationContext aggregationContext, Aggregator parent, + SignificantTermsAggregatorFactory termsAggFactory) { + + super(name, factories, valuesSource, estimatedBucketCount, maxOrd, null, requiredSize, shardSize, + minDocCount, aggregationContext, parent); + this.termsAggFactory = termsAggFactory; + } + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + super.collect(doc, owningBucketOrdinal); + numCollectedDocs++; + } + + @Override + public SignificantStringTerms buildAggregation(long owningBucketOrdinal) { + assert owningBucketOrdinal == 0; + if (globalOrdinals == null) { // no context in this reader + return buildEmptyAggregation(); + } + + final int size; + if (minDocCount == 0) { + // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns + size = (int) Math.min(globalOrdinals.getMaxOrd(), shardSize); + } else { + size = (int) Math.min(maxBucketOrd(), shardSize); + } + long supersetSize = termsAggFactory.prepareBackground(context); + long subsetSize = numCollectedDocs; + + BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); + SignificantStringTerms.Bucket spare = null; + for (long termOrd = Ordinals.MIN_ORDINAL; termOrd < globalOrdinals.getMaxOrd(); ++termOrd) { + final long bucketOrd = getBucketOrd(termOrd); + final long bucketDocCount = bucketOrd < 0 ? 0 : bucketDocCount(bucketOrd); + if (minDocCount > 0 && bucketDocCount == 0) { + continue; + } + if (spare == null) { + spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null); + } + spare.bucketOrd = bucketOrd; + copy(globalValues.getValueByOrd(termOrd), spare.termBytes); + spare.subsetDf = bucketDocCount; + spare.subsetSize = subsetSize; + spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes); + spare.supersetSize = supersetSize; + assert spare.subsetDf <= spare.supersetDf; + // During shard-local down-selection we use subset/superset stats + // that are for this shard only + // Back at the central reducer these properties will be updated with + // global stats + spare.updateScore(); + + spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare); + } + + final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()]; + for (int i = ordered.size() - 1; i >= 0; i--) { + final SignificantStringTerms.Bucket bucket = (SignificantStringTerms.Bucket) ordered.pop(); + // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point + bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes); + bucket.aggregations = bucketAggregations(bucket.bucketOrd); + list[i] = bucket; + } + + return new SignificantStringTerms(subsetSize, supersetSize, name, requiredSize, minDocCount, Arrays.asList(list)); + } + + @Override + public SignificantStringTerms buildEmptyAggregation() { + // We need to account for the significance of a miss in our global stats - provide corpus size as context + ContextIndexSearcher searcher = context.searchContext().searcher(); + IndexReader topReader = searcher.getIndexReader(); + int supersetSize = topReader.numDocs(); + return new SignificantStringTerms(0, supersetSize, name, requiredSize, minDocCount, Collections.emptyList()); + } + + @Override + protected void doClose() { + Releasables.close(termsAggFactory); + } + + public static class WithHash extends GlobalOrdinalsSignificantTermsAggregator { + + private final LongHash bucketOrds; + + public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) { + super(name, factories, valuesSource, estimatedBucketCount, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggFactory); + bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.bigArrays()); + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + globalValues = valuesSource.globalBytesValues(); + globalOrdinals = globalValues.ordinals(); + } + + @Override + public void collect(int doc, long owningBucketOrdinal) throws IOException { + numCollectedDocs++; + final int numOrds = globalOrdinals.setDocument(doc); + for (int i = 0; i < numOrds; i++) { + final long globalOrd = globalOrdinals.nextOrd(); + long bucketOrd = bucketOrds.add(globalOrd); + if (bucketOrd < 0) { + bucketOrd = -1 - bucketOrd; + } + collectBucket(doc, bucketOrd); + } + } + + @Override + protected long getBucketOrd(long termOrd) { + return bucketOrds.find(termOrd); + } + + @Override + protected void doClose() { + Releasables.close(termsAggFactory, bucketOrds); + } + } + +} + diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java index 6d926851e9f..27cd3ff29bf 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java @@ -21,18 +21,17 @@ package org.elasticsearch.search.aggregations.bucket.significant; import org.apache.lucene.index.DocsEnum; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.Filter; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.index.FilterableTermsEnum; import org.elasticsearch.common.lucene.index.FreqTermsEnum; import org.elasticsearch.index.mapper.FieldMapper; -import org.elasticsearch.search.aggregations.AggregationExecutionException; -import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.*; import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode; -import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.NonCollectingAggregator; import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -47,8 +46,106 @@ import java.io.IOException; */ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory implements Releasable { - public static final String EXECUTION_HINT_VALUE_MAP = "map"; - public static final String EXECUTION_HINT_VALUE_ORDINALS = "ordinals"; + public enum ExecutionMode { + + MAP(new ParseField("map")) { + + @Override + Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount, + int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, + AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) { + return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, termsAggregatorFactory); + } + + @Override + boolean needsGlobalOrdinals() { + return false; + } + + }, + ORDINALS(new ParseField("ordinals")) { + + @Override + Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount, + int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, + AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) { + if (includeExclude != null) { + throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms."); + } + return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory); + } + + @Override + boolean needsGlobalOrdinals() { + return false; + } + + }, + GLOBAL_ORDINALS(new ParseField("global_ordinals")) { + + @Override + Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount, + int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, + AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) { + if (includeExclude != null) { + throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms."); + } + ValuesSource.Bytes.WithOrdinals valueSourceWithOrdinals = (ValuesSource.Bytes.WithOrdinals) valuesSource; + IndexSearcher indexSearcher = aggregationContext.searchContext().searcher(); + long maxOrd = valueSourceWithOrdinals.globalMaxOrd(indexSearcher); + return new GlobalOrdinalsSignificantTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, maxOrd, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory); + } + + @Override + boolean needsGlobalOrdinals() { + return true; + } + + }, + GLOBAL_ORDINALS_HASH(new ParseField("global_ordinals_hash")) { + + @Override + Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount, + int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, + AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) { + if (includeExclude != null) { + throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms."); + } + return new GlobalOrdinalsSignificantTermsAggregator.WithHash(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory); + } + + @Override + boolean needsGlobalOrdinals() { + return true; + } + }; + + public static ExecutionMode fromString(String value) { + for (ExecutionMode mode : values()) { + if (mode.parseField.match(value)) { + return mode; + } + } + throw new ElasticsearchIllegalArgumentException("Unknown `execution_hint`: [" + value + "], expected any of " + values()); + } + + private final ParseField parseField; + + ExecutionMode(ParseField parseField) { + this.parseField = parseField; + } + + abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount, + int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, + AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory); + + abstract boolean needsGlobalOrdinals(); + + @Override + public String toString() { + return parseField.getPreferredName(); + } + } private final int requiredSize; private final int shardSize; @@ -117,30 +214,25 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac estimatedBucketCount = Math.min(estimatedBucketCount, 512); if (valuesSource instanceof ValuesSource.Bytes) { - if (executionHint != null && !executionHint.equals(EXECUTION_HINT_VALUE_MAP) && !executionHint.equals(EXECUTION_HINT_VALUE_ORDINALS)) { - throw new ElasticsearchIllegalArgumentException("execution_hint can only be '" + EXECUTION_HINT_VALUE_MAP + "' or '" + EXECUTION_HINT_VALUE_ORDINALS + "', not " + executionHint); + ExecutionMode execution = null; + if (executionHint != null) { + execution = ExecutionMode.fromString(executionHint); } - String execution = executionHint; if (!(valuesSource instanceof ValuesSource.Bytes.WithOrdinals)) { - execution = EXECUTION_HINT_VALUE_MAP; + execution = ExecutionMode.MAP; } else if (includeExclude != null) { - execution = EXECUTION_HINT_VALUE_MAP; + execution = ExecutionMode.MAP; } if (execution == null) { - if ((valuesSource instanceof ValuesSource.Bytes.WithOrdinals) - && !hasParentBucketAggregator(parent)) { - execution = EXECUTION_HINT_VALUE_ORDINALS; + if (hasParentBucketAggregator(parent)) { + execution = ExecutionMode.GLOBAL_ORDINALS_HASH; } else { - execution = EXECUTION_HINT_VALUE_MAP; + execution = ExecutionMode.GLOBAL_ORDINALS; } } assert execution != null; - - if (execution.equals(EXECUTION_HINT_VALUE_ORDINALS)) { - assert includeExclude == null; - return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, this); - } - return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, this); + valuesSource.setNeedsGlobalOrdinals(execution.needsGlobalOrdinals()); + return execution.create(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, this); } if (includeExclude != null) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsBuilder.java index 1604cf80ed5..be040e8d91c 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsBuilder.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsBuilder.java @@ -36,6 +36,7 @@ public class SignificantTermsBuilder extends AggregationBuilder