Added global ordinals based implementations for the significant terms aggregator.

Closes #5970
Martijn van Groningen 2014-04-29 15:36:25 +07:00
parent a4ef418e6e
commit dce127bcdf
5 changed files with 306 additions and 25 deletions
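The new execution modes are exposed per request through the `execution_hint` option that this commit also wires into SignificantTermsBuilder. A minimal usage sketch via the Java client (index name, field name, and query value are placeholders; the hint strings correspond to the ExecutionMode parse fields added below):

```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder;

public class SignificantTermsExecutionHintExample {
    public static SearchResponse run(Client client) {
        // "articles" and "description" are placeholder index/field names.
        return client.prepareSearch("articles")
                .setQuery(QueryBuilders.termQuery("_all", "elasticsearch"))
                .addAggregation(new SignificantTermsBuilder("mySignificantTerms")
                        .field("description")
                        // one of "map", "ordinals", "global_ordinals", "global_ordinals_hash"
                        .executionHint("global_ordinals")
                        .minDocCount(2))
                .execute()
                .actionGet();
    }
}
```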

GlobalOrdinalsSignificantTermsAggregator.java

@@ -0,0 +1,173 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.significant;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.index.fielddata.ordinals.Ordinals;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.terms.GlobalOrdinalsStringTermsAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
/**
* A global ordinals based implementation of significant terms, based on {@link SignificantStringTermsAggregator}.
*/
public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStringTermsAggregator {
protected long numCollectedDocs;
protected final SignificantTermsAggregatorFactory termsAggFactory;
public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
long estimatedBucketCount, long maxOrd, int requiredSize, int shardSize, long minDocCount,
AggregationContext aggregationContext, Aggregator parent,
SignificantTermsAggregatorFactory termsAggFactory) {
super(name, factories, valuesSource, estimatedBucketCount, maxOrd, null, requiredSize, shardSize,
minDocCount, aggregationContext, parent);
this.termsAggFactory = termsAggFactory;
}
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
super.collect(doc, owningBucketOrdinal);
numCollectedDocs++;
}
@Override
public SignificantStringTerms buildAggregation(long owningBucketOrdinal) {
assert owningBucketOrdinal == 0;
if (globalOrdinals == null) { // no context in this reader
return buildEmptyAggregation();
}
final int size;
if (minDocCount == 0) {
// if minDocCount == 0 then we can end up with more buckets than maxBucketOrd() returns
size = (int) Math.min(globalOrdinals.getMaxOrd(), shardSize);
} else {
size = (int) Math.min(maxBucketOrd(), shardSize);
}
long supersetSize = termsAggFactory.prepareBackground(context);
long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);
SignificantStringTerms.Bucket spare = null;
for (long termOrd = Ordinals.MIN_ORDINAL; termOrd < globalOrdinals.getMaxOrd(); ++termOrd) {
final long bucketOrd = getBucketOrd(termOrd);
final long bucketDocCount = bucketOrd < 0 ? 0 : bucketDocCount(bucketOrd);
if (minDocCount > 0 && bucketDocCount == 0) {
continue;
}
if (spare == null) {
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null);
}
spare.bucketOrd = bucketOrd;
copy(globalValues.getValueByOrd(termOrd), spare.termBytes);
spare.subsetDf = bucketDocCount;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
spare.supersetSize = supersetSize;
assert spare.subsetDf <= spare.supersetDf;
// During shard-local down-selection we use subset/superset stats
// that are for this shard only
// Back at the central reducer these properties will be updated with
// global stats
spare.updateScore();
spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare);
}
final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = (SignificantStringTerms.Bucket) ordered.pop();
// the terms are owned by the BytesRefHash; we need to pull a copy since the BytesRef hash data may be recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket;
}
return new SignificantStringTerms(subsetSize, supersetSize, name, requiredSize, minDocCount, Arrays.asList(list));
}
@Override
public SignificantStringTerms buildEmptyAggregation() {
// We need to account for the significance of a miss in our global stats - provide corpus size as context
ContextIndexSearcher searcher = context.searchContext().searcher();
IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(0, supersetSize, name, requiredSize, minDocCount, Collections.<InternalSignificantTerms.Bucket>emptyList());
}
@Override
protected void doClose() {
Releasables.close(termsAggFactory);
}
public static class WithHash extends GlobalOrdinalsSignificantTermsAggregator {
private final LongHash bucketOrds;
public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
super(name, factories, valuesSource, estimatedBucketCount, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggFactory);
bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.bigArrays());
}
@Override
public void setNextReader(AtomicReaderContext reader) {
globalValues = valuesSource.globalBytesValues();
globalOrdinals = globalValues.ordinals();
}
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
numCollectedDocs++;
final int numOrds = globalOrdinals.setDocument(doc);
for (int i = 0; i < numOrds; i++) {
final long globalOrd = globalOrdinals.nextOrd();
long bucketOrd = bucketOrds.add(globalOrd);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
}
collectBucket(doc, bucketOrd);
}
}
@Override
protected long getBucketOrd(long termOrd) {
return bucketOrds.find(termOrd);
}
@Override
protected void doClose() {
Releasables.close(termsAggFactory, bucketOrds);
}
}
}
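The two variants above differ in how a global ordinal is mapped to a bucket ordinal: the plain aggregator builds on GlobalOrdinalsStringTermsAggregator, which appears to use the global ordinal itself as the bucket ordinal (hence the maxOrd sizing argument), while WithHash remaps through a LongHash so that only terms actually seen during collection allocate a bucket. A small illustrative sketch of that difference (the class and method names are hypothetical; only the LongHash calls mirror the code above):

```java
import org.elasticsearch.common.util.LongHash;

class BucketOrdMapping {
    // Dense variant: the global ordinal doubles as the bucket ordinal, so per-bucket
    // counters are sized by the number of distinct terms on the shard (maxOrd).
    static long denseBucketOrd(long globalOrd) {
        return globalOrd;
    }

    // Hashed variant: only ordinals actually observed during collection allocate a
    // bucket. LongHash#add returns (-1 - existingOrd) for an ordinal that was already
    // added, exactly as handled in WithHash#collect above.
    static long hashedBucketOrd(LongHash bucketOrds, long globalOrd) {
        long bucketOrd = bucketOrds.add(globalOrd);
        return bucketOrd < 0 ? -1 - bucketOrd : bucketOrd;
    }
}
```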

SignificantTermsAggregatorFactory.java

@@ -21,18 +21,17 @@ package org.elasticsearch.search.aggregations.bucket.significant;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
import org.elasticsearch.common.lucene.index.FreqTermsEnum;
import org.elasticsearch.index.mapper.FieldMapper;
-import org.elasticsearch.search.aggregations.AggregationExecutionException;
-import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.NonCollectingAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
@@ -47,8 +46,106 @@ import java.io.IOException;
*/
public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory implements Releasable {
public static final String EXECUTION_HINT_VALUE_MAP = "map";
public static final String EXECUTION_HINT_VALUE_ORDINALS = "ordinals";
public enum ExecutionMode {
MAP(new ParseField("map")) {
@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, termsAggregatorFactory);
}
@Override
boolean needsGlobalOrdinals() {
return false;
}
},
ORDINALS(new ParseField("ordinals")) {
@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory);
}
@Override
boolean needsGlobalOrdinals() {
return false;
}
},
GLOBAL_ORDINALS(new ParseField("global_ordinals")) {
@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
ValuesSource.Bytes.WithOrdinals valueSourceWithOrdinals = (ValuesSource.Bytes.WithOrdinals) valuesSource;
IndexSearcher indexSearcher = aggregationContext.searchContext().searcher();
long maxOrd = valueSourceWithOrdinals.globalMaxOrd(indexSearcher);
return new GlobalOrdinalsSignificantTermsAggregator(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, maxOrd, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory);
}
@Override
boolean needsGlobalOrdinals() {
return true;
}
},
GLOBAL_ORDINALS_HASH(new ParseField("global_ordinals_hash")) {
@Override
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory) {
if (includeExclude != null) {
throw new ElasticsearchIllegalArgumentException("The `" + this + "` execution mode cannot filter terms.");
}
return new GlobalOrdinalsSignificantTermsAggregator.WithHash(name, factories, (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, termsAggregatorFactory);
}
@Override
boolean needsGlobalOrdinals() {
return true;
}
};
public static ExecutionMode fromString(String value) {
for (ExecutionMode mode : values()) {
if (mode.parseField.match(value)) {
return mode;
}
}
throw new ElasticsearchIllegalArgumentException("Unknown `execution_hint`: [" + value + "], expected any of " + values());
}
private final ParseField parseField;
ExecutionMode(ParseField parseField) {
this.parseField = parseField;
}
abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude,
AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggregatorFactory);
abstract boolean needsGlobalOrdinals();
@Override
public String toString() {
return parseField.getPreferredName();
}
}
private final int requiredSize;
private final int shardSize;
@@ -117,30 +214,25 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
estimatedBucketCount = Math.min(estimatedBucketCount, 512);
if (valuesSource instanceof ValuesSource.Bytes) {
-if (executionHint != null && !executionHint.equals(EXECUTION_HINT_VALUE_MAP) && !executionHint.equals(EXECUTION_HINT_VALUE_ORDINALS)) {
-throw new ElasticsearchIllegalArgumentException("execution_hint can only be '" + EXECUTION_HINT_VALUE_MAP + "' or '" + EXECUTION_HINT_VALUE_ORDINALS + "', not " + executionHint);
+ExecutionMode execution = null;
+if (executionHint != null) {
+execution = ExecutionMode.fromString(executionHint);
}
-String execution = executionHint;
if (!(valuesSource instanceof ValuesSource.Bytes.WithOrdinals)) {
-execution = EXECUTION_HINT_VALUE_MAP;
+execution = ExecutionMode.MAP;
} else if (includeExclude != null) {
-execution = EXECUTION_HINT_VALUE_MAP;
+execution = ExecutionMode.MAP;
}
if (execution == null) {
-if ((valuesSource instanceof ValuesSource.Bytes.WithOrdinals)
-&& !hasParentBucketAggregator(parent)) {
-execution = EXECUTION_HINT_VALUE_ORDINALS;
+if (hasParentBucketAggregator(parent)) {
+execution = ExecutionMode.GLOBAL_ORDINALS_HASH;
} else {
-execution = EXECUTION_HINT_VALUE_MAP;
+execution = ExecutionMode.GLOBAL_ORDINALS;
}
}
assert execution != null;
-if (execution.equals(EXECUTION_HINT_VALUE_ORDINALS)) {
-assert includeExclude == null;
-return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (ValuesSource.Bytes.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, this);
-}
-return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, this);
+valuesSource.setNeedsGlobalOrdinals(execution.needsGlobalOrdinals());
+return execution.create(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, this);
}
if (includeExclude != null) {
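In short, the string-based hint handling above is replaced by the ExecutionMode enum: a field without ordinals or an include/exclude filter always forces MAP, an explicit execution_hint is honored otherwise, and when no hint is given the factory now defaults to GLOBAL_ORDINALS_HASH under a parent bucket aggregator and to GLOBAL_ORDINALS at the top level. A condensed restatement of that decision (the helper class, method name, and boolean parameters are illustrative, not the factory's actual API):

```java
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorFactory.ExecutionMode;

class ExecutionModeChoice {
    // Condensed restatement of the selection rules in the diff above.
    static ExecutionMode choose(String executionHint, boolean fieldHasOrdinals,
                                boolean hasIncludeExclude, boolean underParentBucketAggregator) {
        if (!fieldHasOrdinals || hasIncludeExclude) {
            return ExecutionMode.MAP;                       // these constraints override any hint
        }
        if (executionHint != null) {
            return ExecutionMode.fromString(executionHint); // an explicit hint wins otherwise
        }
        return underParentBucketAggregator
                ? ExecutionMode.GLOBAL_ORDINALS_HASH        // avoids dense per-parent-bucket counters
                : ExecutionMode.GLOBAL_ORDINALS;
    }
}
```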

SignificantTermsBuilder.java

@@ -36,6 +36,7 @@ public class SignificantTermsBuilder extends AggregationBuilder<SignificantTerms
private int requiredSize = SignificantTermsParser.DEFAULT_REQUIRED_SIZE;
private int shardSize = SignificantTermsParser.DEFAULT_SHARD_SIZE;
private int minDocCount = SignificantTermsParser.DEFAULT_MIN_DOC_COUNT;
private String executionHint;
public SignificantTermsBuilder(String name) {
super(name, SignificantStringTerms.TYPE.name());
@@ -61,6 +62,11 @@ public class SignificantTermsBuilder extends AggregationBuilder<SignificantTerms
return this;
}
public SignificantTermsBuilder executionHint(String executionHint) {
this.executionHint = executionHint;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -76,6 +82,9 @@ public class SignificantTermsBuilder extends AggregationBuilder<SignificantTerms
if (shardSize != SignificantTermsParser.DEFAULT_SHARD_SIZE) {
builder.field("shard_size", shardSize);
}
if (executionHint != null) {
builder.field("execution_hint", executionHint);
}
return builder.endObject();
}

GlobalOrdinalsStringTermsAggregator.java

@@ -80,7 +80,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
}
-private static void copy(BytesRef from, BytesRef to) {
+protected static void copy(BytesRef from, BytesRef to) {
if (to.bytes.length < from.length) {
to.bytes = new byte[ArrayUtil.oversize(from.length, RamUsageEstimator.NUM_BYTES_BYTE)];
}

SignificantTermsTests.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms.Bucket;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
@@ -47,6 +48,10 @@ import static org.hamcrest.Matchers.equalTo;
@ElasticsearchIntegrationTest.SuiteScopeTest
public class SignificantTermsTests extends ElasticsearchIntegrationTest {
public String randomExecutionHint() {
return randomBoolean() ? null : randomFrom(SignificantTermsAggregatorFactory.ExecutionMode.values()).toString();
}
@Override
public Settings indexSettings() {
return ImmutableSettings.builder()
@ -103,7 +108,7 @@ public class SignificantTermsTests extends ElasticsearchIntegrationTest {
.setSearchType(SearchType.QUERY_AND_FETCH)
.setQuery(new TermQueryBuilder("_all", "terje"))
.setFrom(0).setSize(60).setExplain(true)
-.addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("fact_category")
+.addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("fact_category").executionHint(randomExecutionHint())
.minDocCount(2))
.execute()
.actionGet();
@ -119,7 +124,7 @@ public class SignificantTermsTests extends ElasticsearchIntegrationTest {
.setSearchType(SearchType.QUERY_AND_FETCH)
.setQuery(new TermQueryBuilder("_all", "terje"))
.setFrom(0).setSize(60).setExplain(true)
-.addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("fact_category")
+.addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("fact_category").executionHint(randomExecutionHint())
.minDocCount(2))
.execute()
.actionGet();
@ -134,7 +139,7 @@ public class SignificantTermsTests extends ElasticsearchIntegrationTest {
.setSearchType(SearchType.QUERY_AND_FETCH)
.setQuery(new TermQueryBuilder("_all", "terje"))
.setFrom(0).setSize(60).setExplain(true)
-.addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("description")
+.addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("description").executionHint(randomExecutionHint())
.minDocCount(2))
.execute()
.actionGet();
@ -154,6 +159,7 @@ public class SignificantTermsTests extends ElasticsearchIntegrationTest {
.addAggregation(new TermsBuilder("myCategories").field("fact_category").minDocCount(2)
.subAggregation(
new SignificantTermsBuilder("mySignificantTerms").field("description")
.executionHint(randomExecutionHint())
.minDocCount(2)))
.execute()
.actionGet();
@ -180,6 +186,7 @@ public class SignificantTermsTests extends ElasticsearchIntegrationTest {
.setQuery(new TermQueryBuilder("_all", "terje"))
.setFrom(0).setSize(60).setExplain(true)
.addAggregation(new SignificantTermsBuilder("mySignificantTerms").field("description")
.executionHint(randomExecutionHint())
.minDocCount(2))
.execute()
.actionGet();