Freq Terms Enum

A frequency-caching terms enum that can also be configured with an optional filter, for use by both the significant terms aggregation and the phrase suggester.
This change extracts the frequency caching into shared code and makes it possible, in the future, to add a filter to control/customize the background frequencies.
Closes #5597
Shay Banon 2014-03-21 22:54:16 +01:00 committed by markharwood
parent b9cb70198e
commit 63290a910e
9 changed files with 642 additions and 252 deletions
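A minimal usage sketch of the new enum as introduced below (the index reader, field name, and term here are placeholders, not part of the change):

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.index.FreqTermsEnum;
import org.elasticsearch.common.util.BigArrays;

public class FreqTermsEnumSketch {
    // Background doc frequency of a term - the lookup pattern used by both consumers.
    public static int backgroundDocFreq(IndexReader reader, String term) throws java.io.IOException {
        FreqTermsEnum termsEnum = new FreqTermsEnum(reader, "description",
                true /* cache docFreq */, false /* no totalTermFreq */,
                null /* no filter: frequencies over the whole index */,
                BigArrays.NON_RECYCLING_INSTANCE);
        try {
            // A repeat seekExact for the same term is answered from the cache.
            return termsEnum.seekExact(new BytesRef(term)) ? termsEnum.docFreq() : 0;
        } finally {
            termsEnum.release();
        }
    }
}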

src/main/java/org/elasticsearch/common/lucene/index/FilterableTermsEnum.java

@@ -0,0 +1,215 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.index;
import com.google.common.collect.Lists;
import org.apache.lucene.index.*;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Filter;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lucene.docset.DocIdSets;
import org.elasticsearch.common.lucene.search.ApplyAcceptedDocsFilter;
import org.elasticsearch.common.lucene.search.Queries;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
/**
* A frequency TermsEnum that returns frequencies derived from a collection of
* cached leaf termEnums. It also allows a filter to be provided, in which case
* frequencies are computed only for docs that match the filter (heavier!).
*/
public class FilterableTermsEnum extends TermsEnum {
static class Holder {
final TermsEnum termsEnum;
@Nullable
DocsEnum docsEnum;
@Nullable
final Bits bits;
Holder(TermsEnum termsEnum, Bits bits) {
this.termsEnum = termsEnum;
this.bits = bits;
}
}
static final String UNSUPPORTED_MESSAGE = "This TermsEnum only supports #seekExact(BytesRef) as well as #docFreq() and #totalTermFreq()";
protected final static int NOT_FOUND = -1;
private final Holder[] enums;
protected int currentDocFreq = 0;
protected long currentTotalTermFreq = 0;
protected BytesRef current;
protected final int docsEnumFlag;
protected int numDocs;
public FilterableTermsEnum(IndexReader reader, String field, int docsEnumFlag, @Nullable Filter filter) throws IOException {
if ((docsEnumFlag != DocsEnum.FLAG_FREQS) && (docsEnumFlag != DocsEnum.FLAG_NONE)) {
throw new ElasticsearchIllegalArgumentException("invalid docsEnumFlag of " + docsEnumFlag);
}
this.docsEnumFlag = docsEnumFlag;
if (filter == null) {
numDocs = reader.numDocs();
}
List<AtomicReaderContext> leaves = reader.leaves();
List<Holder> enums = Lists.newArrayListWithExpectedSize(leaves.size());
for (AtomicReaderContext context : leaves) {
Terms terms = context.reader().terms(field);
if (terms == null) {
continue;
}
TermsEnum termsEnum = terms.iterator(null);
if (termsEnum == null) {
continue;
}
Bits bits = null;
if (filter != null) {
if (filter == Queries.MATCH_ALL_FILTER) {
bits = context.reader().getLiveDocs();
} else {
// we want to force apply deleted docs
filter = new ApplyAcceptedDocsFilter(filter);
DocIdSet docIdSet = filter.getDocIdSet(context, context.reader().getLiveDocs());
if (DocIdSets.isEmpty(docIdSet)) {
// fully filtered, none matching, no need to iterate on this
continue;
}
bits = DocIdSets.toSafeBits(context.reader(), docIdSet);
// Count how many docs are in our filtered set
// TODO make this lazy-loaded only for those that need it?
DocIdSetIterator iterator = docIdSet.iterator();
if (iterator != null) {
while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
numDocs++;
}
}
}
}
enums.add(new Holder(termsEnum, bits));
}
this.enums = enums.toArray(new Holder[enums.size()]);
}
public int getNumDocs() {
return numDocs;
}
@Override
public BytesRef term() throws IOException {
return current;
}
@Override
public boolean seekExact(BytesRef text) throws IOException {
boolean found = false;
currentDocFreq = NOT_FOUND;
currentTotalTermFreq = NOT_FOUND;
int docFreq = 0;
long totalTermFreq = 0;
for (Holder anEnum : enums) {
if (!anEnum.termsEnum.seekExact(text)) {
continue;
}
found = true;
if (anEnum.bits == null) {
docFreq += anEnum.termsEnum.docFreq();
if (docsEnumFlag == DocsEnum.FLAG_FREQS) {
long leafTotalTermFreq = anEnum.termsEnum.totalTermFreq();
if (totalTermFreq == -1 || leafTotalTermFreq == -1) {
totalTermFreq = -1;
continue;
}
totalTermFreq += leafTotalTermFreq;
}
} else {
DocsEnum docsEnum = anEnum.docsEnum = anEnum.termsEnum.docs(anEnum.bits, anEnum.docsEnum, docsEnumFlag);
// Two choices for performing the same heavy loop - one attempts to calculate totalTermFreq and the other does not
if (docsEnumFlag == DocsEnum.FLAG_FREQS) {
for (int docId = docsEnum.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = docsEnum.nextDoc()) {
docFreq++;
// docsEnum.freq() returns 1 if the doc was indexed with IndexOptions.DOCS_ONLY, so there is
// no way of knowing whether the value is really 1 or simply unrecorded when filtering like this
totalTermFreq += docsEnum.freq();
}
} else {
for (int docId = docsEnum.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = docsEnum.nextDoc()) {
// docsEnum.freq() behaviour is undefined if docsEnumFlag==DocsEnum.FLAG_NONE so don't bother with call
docFreq++;
}
}
}
currentDocFreq = docFreq;
currentTotalTermFreq = totalTermFreq;
current = text;
}
return found;
}
@Override
public int docFreq() throws IOException {
return currentDocFreq;
}
@Override
public long totalTermFreq() throws IOException {
return currentTotalTermFreq;
}
@Override
public void seekExact(long ord) throws IOException {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
}
@Override
public SeekStatus seekCeil(BytesRef text) throws IOException {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
}
@Override
public long ord() throws IOException {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
}
@Override
public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags) throws IOException {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
}
@Override
public DocsAndPositionsEnum docsAndPositions(Bits liveDocs, DocsAndPositionsEnum reuse, int flags) throws IOException {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
}
@Override
public BytesRef next() throws IOException {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
}
@Override
public Comparator<BytesRef> getComparator() {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
}
}
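With a filter the enum walks a DocsEnum per term instead of trusting the precomputed statistics, which is heavier but scopes both getNumDocs() and the frequencies to matching docs. A sketch of driving it directly (the status/body fields and their values are invented for illustration):

import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.TermsFilter;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.index.FilterableTermsEnum;

public class FilteredFreqSketch {
    public static int filteredDocFreq(IndexReader reader) throws java.io.IOException {
        // Count frequencies only within docs whose "status" is "published" (hypothetical field/value).
        FilterableTermsEnum termsEnum = new FilterableTermsEnum(reader, "body",
                DocsEnum.FLAG_NONE, new TermsFilter(new Term("status", "published")));
        long supersetSize = termsEnum.getNumDocs(); // docs matching the filter, not the whole index
        return termsEnum.seekExact(new BytesRef("lucene")) ? termsEnum.docFreq() : 0;
    }
}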

src/main/java/org/elasticsearch/common/lucene/index/FreqTermsEnum.java

@@ -0,0 +1,121 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.index;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Filter;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import java.io.IOException;
/**
* A frequency terms enum that maintains a cache of docFreq, totalTermFreq, or both for repeated term lookup.
*/
public class FreqTermsEnum extends FilterableTermsEnum implements Releasable {
private static final int INITIAL_NUM_TERM_FREQS_CACHED = 512;
private final BigArrays bigArrays;
private IntArray termDocFreqs;
private LongArray termsTotalFreqs;
private BytesRefHash cachedTermOrds;
private final boolean needDocFreqs;
private final boolean needTotalTermFreqs;
public FreqTermsEnum(IndexReader reader, String field, boolean needDocFreq, boolean needTotalTermFreq, @Nullable Filter filter, BigArrays bigArrays) throws IOException {
super(reader, field, needTotalTermFreq ? DocsEnum.FLAG_FREQS : DocsEnum.FLAG_NONE, filter);
this.bigArrays = bigArrays;
this.needDocFreqs = needDocFreq;
this.needTotalTermFreqs = needTotalTermFreq;
if (needDocFreq) {
termDocFreqs = bigArrays.newIntArray(INITIAL_NUM_TERM_FREQS_CACHED, false);
} else {
termDocFreqs = null;
}
if (needTotalTermFreq) {
termsTotalFreqs = bigArrays.newLongArray(INITIAL_NUM_TERM_FREQS_CACHED, false);
} else {
termsTotalFreqs = null;
}
cachedTermOrds = new BytesRefHash(INITIAL_NUM_TERM_FREQS_CACHED, bigArrays);
}
@Override
public boolean seekExact(BytesRef text) throws IOException {
//Check cache
long currentTermOrd = cachedTermOrds.add(text);
if (currentTermOrd < 0) { // already seen, initialize instance data with the cached frequencies
currentTermOrd = -1 - currentTermOrd;
boolean found = true;
if (needDocFreqs) {
currentDocFreq = termDocFreqs.get(currentTermOrd);
if (currentDocFreq == NOT_FOUND) {
found = false;
}
}
if (needTotalTermFreqs) {
currentTotalTermFreq = termsTotalFreqs.get(currentTermOrd);
if (currentTotalTermFreq == NOT_FOUND) {
found = false;
}
}
current = found ? text : null;
return found;
}
//Cache miss - gather stats
boolean found = super.seekExact(text);
//Cache the result - found or not.
if (needDocFreqs) {
termDocFreqs = bigArrays.grow(termDocFreqs, currentTermOrd + 1);
termDocFreqs.set(currentTermOrd, currentDocFreq);
}
if (needTotalTermFreqs) {
termsTotalFreqs = bigArrays.grow(termsTotalFreqs, currentTermOrd + 1);
termsTotalFreqs.set(currentTermOrd, currentTotalTermFreq);
}
return found;
}
@Override
public boolean release() throws ElasticsearchException {
try {
Releasables.release(cachedTermOrds, termDocFreqs, termsTotalFreqs);
} finally {
cachedTermOrds = null;
termDocFreqs = null;
termsTotalFreqs = null;
}
return true;
}
}
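The cache pivots on the BytesRefHash#add contract used in seekExact above: a first-time key returns a fresh non-negative ord, a repeat returns the existing ord encoded as -1 - ord. In isolation (sketch):

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;

public class OrdCacheSketch {
    public static void main(String[] args) {
        BytesRefHash ords = new BytesRefHash(512, BigArrays.NON_RECYCLING_INSTANCE);
        long first = ords.add(new BytesRef("foo"));  // >= 0: new entry, caller gathers stats
        long again = ords.add(new BytesRef("foo"));  // < 0: already cached
        long ord = -1 - again;                       // decodes back to the original ord
        assert ord == first;
        ords.release();
    }
}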

src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java

@@ -60,9 +60,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
final int size = (int) Math.min(bucketOrds.size(), shardSize);
-ContextIndexSearcher searcher = context.searchContext().searcher();
-IndexReader topReader = searcher.getIndexReader();
-long supersetSize = topReader.numDocs();
+long supersetSize = termsAggFactory.prepareBackground(context);
long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);

src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTermsAggregator.java

@@ -67,10 +67,7 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
assert owningBucketOrdinal == 0;
final int size = (int) Math.min(bucketOrds.size(), shardSize);
-ContextIndexSearcher searcher = context.searchContext().searcher();
-IndexReader topReader = searcher.getIndexReader();
-long supersetSize = topReader.numDocs();
+long supersetSize = termsAggFactory.prepareBackground(context);
long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);

src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java

@@ -18,18 +18,15 @@
*/
package org.elasticsearch.search.aggregations.bucket.significant;
-import org.apache.lucene.index.*;
-import org.apache.lucene.index.FilterAtomicReader.FilterTermsEnum;
-import org.apache.lucene.util.Bits;
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Filter;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.BytesRefHash;
-import org.elasticsearch.common.util.IntArray;
-import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
+import org.elasticsearch.common.lucene.index.FreqTermsEnum;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode;
@@ -40,7 +37,6 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueParser;
-import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@@ -52,7 +48,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
public static final String EXECUTION_HINT_VALUE_MAP = "map";
public static final String EXECUTION_HINT_VALUE_ORDINALS = "ordinals";
-static final int INITIAL_NUM_TERM_FREQS_CACHED = 512;
private final int requiredSize;
private final int shardSize;
@@ -61,15 +56,12 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
private final String executionHint;
private String indexedFieldName;
private FieldMapper mapper;
-private IntArray termDocFreqs;
-private BytesRefHash cachedTermOrds;
-private BigArrays bigArrays;
-private TermsEnum termsEnum;
+private FilterableTermsEnum termsEnum;
private int numberOfAggregatorsCreated = 0;
+private Filter filter;
public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig valueSourceConfig, ValueFormatter formatter, ValueParser parser,
-int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, String executionHint) {
+int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, String executionHint, Filter filter) {
super(name, SignificantStringTerms.TYPE.name(), valueSourceConfig, formatter, parser);
this.requiredSize = requiredSize;
this.shardSize = shardSize;
@@ -80,7 +72,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
this.indexedFieldName = config.fieldContext().field();
mapper = SearchContext.current().smartNameFieldMapper(indexedFieldName);
}
-bigArrays = SearchContext.current().bigArrays();
+this.filter = filter;
}
@Override
@@ -105,30 +97,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
@Override
protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) {
numberOfAggregatorsCreated++;
-if (numberOfAggregatorsCreated == 1) {
-// Setup a termsEnum for use by first aggregator
-try {
-SearchContext searchContext = aggregationContext.searchContext();
-ContextIndexSearcher searcher = searchContext.searcher();
-Terms terms = MultiFields.getTerms(searcher.getIndexReader(), indexedFieldName);
-// terms can be null if the choice of field is not found in this index
-if (terms != null) {
-termsEnum = terms.iterator(null);
-}
-} catch (IOException e) {
-throw new ElasticsearchException("IOException loading background document frequency info", e);
-}
-} else if (numberOfAggregatorsCreated == 2) {
-// When we have > 1 agg we have possibility of duplicate term frequency lookups and
-// so introduce a cache in the form of a wrapper around the plain termsEnum created
-// for use with the first agg
-if (termsEnum != null) {
-SearchContext searchContext = aggregationContext.searchContext();
-termsEnum = new FrequencyCachingTermsEnumWrapper(termsEnum, searchContext.bigArrays(), true, false);
-}
-}
long estimatedBucketCount = valuesSource.metaData().maxAtomicUniqueValuesCount();
if (estimatedBucketCount < 0) {
@@ -188,6 +157,33 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
"]. It can only be applied to numeric or string fields.");
}
+/**
+* Creates the TermsEnum (if not already created) and must be called before any calls to getBackgroundFrequency
+* @param context The aggregation context
+* @return The number of documents in the index (after the optional filter, if any, has been applied)
+*/
+public long prepareBackground(AggregationContext context) {
+if (termsEnum != null) {
+// already prepared - return
+return termsEnum.getNumDocs();
+}
+SearchContext searchContext = context.searchContext();
+IndexReader reader = searchContext.searcher().getIndexReader();
+try {
+if (numberOfAggregatorsCreated == 1) {
+// Setup a termsEnum for sole use by one aggregator
+termsEnum = new FilterableTermsEnum(reader, indexedFieldName, DocsEnum.FLAG_NONE, filter);
+} else {
+// When we have > 1 agg we have possibility of duplicate term frequency lookups
+// and so use a TermsEnum that caches results of all term lookups
+termsEnum = new FreqTermsEnum(reader, indexedFieldName, true, false, filter, searchContext.bigArrays());
+}
+} catch (IOException e) {
+throw new ElasticsearchException("failed to build terms enumeration", e);
+}
+return termsEnum.getNumDocs();
+}
public long getBackgroundFrequency(BytesRef termBytes) {
assert termsEnum != null; // having failed to find a field in the index we don't expect any calls for frequencies
long result = 0;
@@ -218,116 +214,4 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
}
return true;
}
// A specialist TermsEnum wrapper for use in the repeated look-ups of frequency stats.
// TODO factor out as a utility class to replace similar org.elasticsearch.search.suggest.phrase.WordScorer.FrequencyCachingTermsEnumWrapper
// This implementation is likely to produce less garbage than WordScorer's impl but will need benchmarking/testing for that use case.
static class FrequencyCachingTermsEnumWrapper extends FilterTermsEnum implements Releasable {
int currentTermDocFreq = 0;
long currentTermTotalFreq = 0;
private IntArray termDocFreqs;
private LongArray termTotalFreqs;
private BytesRefHash cachedTermOrds;
protected BigArrays bigArrays;
private boolean cacheDocFreqs;
private boolean cacheTotalFreqs;
private long currentTermOrd;
public FrequencyCachingTermsEnumWrapper(TermsEnum delegate, BigArrays bigArrays, boolean cacheDocFreqs, boolean cacheTotalFreqs) {
super(delegate);
this.bigArrays = bigArrays;
this.cacheDocFreqs = cacheDocFreqs;
this.cacheTotalFreqs = cacheTotalFreqs;
if (cacheDocFreqs) {
termDocFreqs = bigArrays.newIntArray(INITIAL_NUM_TERM_FREQS_CACHED, false);
}
if (cacheTotalFreqs) {
termTotalFreqs = bigArrays.newLongArray(INITIAL_NUM_TERM_FREQS_CACHED, false);
}
cachedTermOrds = new BytesRefHash(INITIAL_NUM_TERM_FREQS_CACHED, bigArrays);
}
@Override
public boolean seekExact(BytesRef text) throws IOException {
currentTermDocFreq = 0;
currentTermTotalFreq = 0;
currentTermOrd = cachedTermOrds.add(text);
if (currentTermOrd < 0) { // already seen, initialize instance data with the cached frequencies
currentTermOrd = -1 - currentTermOrd;
if (cacheDocFreqs) {
currentTermDocFreq = termDocFreqs.get(currentTermOrd);
}
if (cacheTotalFreqs) {
currentTermTotalFreq = termTotalFreqs.get(currentTermOrd);
}
return true;
} else { // cache miss - pre-emptively read and cache the required frequency values
if (in.seekExact(text)) {
if (cacheDocFreqs) {
currentTermDocFreq = in.docFreq();
termDocFreqs = bigArrays.grow(termDocFreqs, currentTermOrd + 1);
termDocFreqs.set(currentTermOrd, currentTermDocFreq);
}
if (cacheTotalFreqs) {
currentTermTotalFreq = in.totalTermFreq();
termTotalFreqs = bigArrays.grow(termTotalFreqs, currentTermOrd + 1);
termTotalFreqs.set(currentTermOrd, currentTermTotalFreq);
}
return true;
}
}
return false;
}
@Override
public long totalTermFreq() throws IOException {
assert cacheTotalFreqs;
return currentTermTotalFreq;
}
@Override
public int docFreq() throws IOException {
assert cacheDocFreqs;
return currentTermDocFreq;
}
@Override
public void seekExact(long ord) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public DocsAndPositionsEnum docsAndPositions(Bits liveDocs, DocsAndPositionsEnum reuse, int flags) throws IOException {
throw new UnsupportedOperationException();
}
public SeekStatus seekCeil(BytesRef text) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public BytesRef next() {
throw new UnsupportedOperationException();
}
@Override
public boolean release() throws ElasticsearchException {
try {
Releasables.release(cachedTermOrds, termDocFreqs, termTotalFreqs);
} finally {
cachedTermOrds = null;
termDocFreqs = null;
termTotalFreqs = null;
}
return true;
}
}
}
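The resulting contract between the aggregators and this factory: call prepareBackground once per buildAggregation pass, then getBackgroundFrequency per shortlisted term. As a fragment (termsAggFactory, context and termBytes come from the surrounding aggregator code):

// Lazily builds the (optionally filtered) terms enum and yields the superset size.
long supersetSize = termsAggFactory.prepareBackground(context);
// Looks up one term's background frequency; cached when several aggs share the factory.
long supersetDf = termsAggFactory.getBackgroundFrequency(termBytes);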

src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsParser.java

@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.significant;
+import org.apache.lucene.search.Filter;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -30,8 +31,8 @@ import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.FieldContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueParser;
@@ -60,6 +61,7 @@ public class SignificantTermsParser implements Aggregator.Parser {
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
String field = null;
+Filter filter = null;
int requiredSize = DEFAULT_REQUIRED_SIZE;
int shardSize = DEFAULT_SHARD_SIZE;
String format = null;
@@ -100,6 +102,18 @@ public class SignificantTermsParser implements Aggregator.Parser {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_OBJECT) {
+// TODO not sure if code below is the best means to declare a filter for
+// defining an alternative background stats context.
+// In trial runs it becomes obvious that the choice of background does have to
+// be a strict superset of the foreground subset otherwise the significant terms algo
+// immediately singles out the odd terms that are in the foreground but not represented
+// in the background. So a better approach may be to use a designated parent agg as the
+// background because parent aggs are always guaranteed to be a superset whereas arbitrary
+// filters defined by end users and parsed below are not.
+// if ("background_context".equals(currentFieldName)) {
+// filter = context.queryParserService().parseInnerFilter(parser).filter();
+// } else
if ("include".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@@ -168,7 +182,7 @@ public class SignificantTermsParser implements Aggregator.Parser {
if (mapper == null) {
ValuesSourceConfig<?> config = new ValuesSourceConfig<>(ValuesSource.Bytes.class);
config.unmapped(true);
-return new SignificantTermsAggregatorFactory(aggregationName, config, null, null, requiredSize, shardSize, minDocCount, includeExclude, executionHint);
+return new SignificantTermsAggregatorFactory(aggregationName, config, null, null, requiredSize, shardSize, minDocCount, includeExclude, executionHint, filter);
}
IndexFieldData<?> indexFieldData = context.fieldData().getForField(mapper);
@@ -205,7 +219,7 @@ public class SignificantTermsParser implements Aggregator.Parser {
// We need values to be unique to be able to run terms aggs efficiently
config.ensureUnique(true);
-return new SignificantTermsAggregatorFactory(aggregationName, config, valueFormatter, valueParser, requiredSize, shardSize, minDocCount, includeExclude, executionHint);
+return new SignificantTermsAggregatorFactory(aggregationName, config, valueFormatter, valueParser, requiredSize, shardSize, minDocCount, includeExclude, executionHint, filter);
}
}

src/main/java/org/elasticsearch/search/suggest/phrase/WordScorer.java

@@ -18,13 +18,14 @@
*/
package org.elasticsearch.search.suggest.phrase;
-import com.carrotsearch.hppc.ObjectObjectMap;
-import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
-import org.apache.lucene.index.*;
-import org.apache.lucene.index.FilterAtomicReader.FilterTermsEnum;
-import org.apache.lucene.util.Bits;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.common.lucene.index.FreqTermsEnum;
+import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.suggest.phrase.DirectCandidateGenerator.Candidate;
import org.elasticsearch.search.suggest.phrase.DirectCandidateGenerator.CandidateSet;
@@ -39,7 +40,7 @@ public abstract class WordScorer {
protected final double realWordLikelyhood;
protected final BytesRef spare = new BytesRef();
protected final BytesRef separator;
-protected final TermsEnum termsEnum;
+private final TermsEnum termsEnum;
private final long numTerms;
private final boolean useTotalTermFreq;
@@ -57,7 +58,7 @@ public abstract class WordScorer {
this.vocabluarySize = vocSize == -1 ? reader.maxDoc() : vocSize;
this.useTotalTermFreq = vocSize != -1;
this.numTerms = terms.size();
-this.termsEnum = new FrequencyCachingTermsEnumWrapper(terms.iterator(null));
+this.termsEnum = new FreqTermsEnum(reader, field, !useTotalTermFreq, useTotalTermFreq, null, BigArrays.NON_RECYCLING_INSTANCE); // non recycling for now
this.reader = reader;
this.realWordLikelyhood = realWordLikelyHood;
this.separator = separator;
@@ -103,85 +104,4 @@ public abstract class WordScorer {
public WordScorer newScorer(IndexReader reader, Terms terms,
String field, double realWordLikelyhood, BytesRef separator) throws IOException;
}
/**
* Terms enum wrapper that caches term frequencies in an effort to outright skip seeks. Only works with seekExact(BytesRef),
* not next() or seekCeil(). Because of this it really only makes sense in this context.
*/
private static class FrequencyCachingTermsEnumWrapper extends FilterTermsEnum {
private ObjectObjectMap<BytesRef, CacheEntry> cache = new ObjectObjectOpenHashMap<>();
/**
* The last term that the caller attempted to seek to.
*/
private CacheEntry last;
public FrequencyCachingTermsEnumWrapper(TermsEnum in) {
super(in);
}
@Override
public boolean seekExact(BytesRef text) throws IOException {
last = cache.get(text);
if (last != null) {
// This'll fail to work properly if the user seeks but doesn't check the frequency, causing us to cache it.
// That is OK because WordScorer only seeks to check the frequency.
return last.ttf != 0 || last.df != 0;
}
last = new CacheEntry();
cache.put(BytesRef.deepCopyOf(text), last);
if (in.seekExact(text)) {
// Found so mark the term uncached.
last.df = -1;
last.ttf = -1;
return true;
}
// Not found. The cache will default to 0 for the freqs, meaning not found.
return false;
}
@Override
public long totalTermFreq() throws IOException {
if (last.ttf == -1) {
last.ttf = in.totalTermFreq();
}
return last.ttf;
}
@Override
public int docFreq() throws IOException {
if (last.df == -1) {
last.df = in.docFreq();
}
return last.df;
}
@Override
public void seekExact(long ord) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public DocsAndPositionsEnum docsAndPositions(Bits liveDocs, DocsAndPositionsEnum reuse, int flags) throws IOException {
throw new UnsupportedOperationException();
}
public SeekStatus seekCeil(BytesRef text) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public BytesRef next() {
throw new UnsupportedOperationException();
}
private static class CacheEntry {
private long ttf;
private int df;
}
}
}
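On the suggester side the net effect of the swap is that repeated candidate lookups hit FreqTermsEnum's cache instead of re-seeking per call, with the docFreq/totalTermFreq flags mirroring useTotalTermFreq in the constructor above. A sketch of that access pattern (field and candidate terms invented):

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.index.FreqTermsEnum;
import org.elasticsearch.common.util.BigArrays;

public class SuggestFreqSketch {
    public static void lookupCandidates(IndexReader reader) throws java.io.IOException {
        FreqTermsEnum termsEnum = new FreqTermsEnum(reader, "title", true, false, null,
                BigArrays.NON_RECYCLING_INSTANCE); // non recycling, as in WordScorer above
        for (String candidate : new String[]{"lucene", "lucine", "lucene"}) {
            if (termsEnum.seekExact(new BytesRef(candidate))) {
                int df = termsEnum.docFreq(); // the duplicate candidate is a cache hit
            }
        }
        termsEnum.release();
    }
}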

src/test/java/org/elasticsearch/common/lucene/index/FreqTermsEnumTests.java

@@ -0,0 +1,209 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.index;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.*;
import org.apache.lucene.queries.TermsFilter;
import org.apache.lucene.search.Filter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class FreqTermsEnumTests extends ElasticsearchLuceneTestCase {
private String[] terms;
private IndexWriter iw;
private IndexReader reader;
private Map<String, FreqHolder> referenceAll;
private Map<String, FreqHolder> referenceNotDeleted;
private Map<String, FreqHolder> referenceFilter;
private Filter filter;
static class FreqHolder {
int docFreq;
long totalTermFreq;
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
referenceAll = Maps.newHashMap();
referenceNotDeleted = Maps.newHashMap();
referenceFilter = Maps.newHashMap();
Directory dir = newDirectory();
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
conf.setMergeScheduler(NoMergeScheduler.INSTANCE); // we don't want to do any merges, so we won't expunge deletes
iw = new IndexWriter(dir, conf);
terms = new String[scaledRandomIntBetween(10, 300)];
for (int i = 0; i < terms.length; i++) {
terms[i] = randomAsciiOfLength(5);
}
int numberOfDocs = scaledRandomIntBetween(30, 300);
Document[] docs = new Document[numberOfDocs];
for (int i = 0; i < numberOfDocs; i++) {
Document doc = new Document();
doc.add(new StringField("id", Integer.toString(i), Field.Store.YES));
docs[i] = doc;
for (String term : terms) {
if (randomBoolean()) {
continue;
}
int freq = randomIntBetween(1, 3);
for (int j = 0; j < freq; j++) {
doc.add(new TextField("field", term, Field.Store.YES));
}
}
}
// add all docs
for (int i = 0; i < docs.length; i++) {
Document doc = docs[i];
iw.addDocument(doc);
if (rarely()) {
iw.commit();
}
}
Set<String> deletedIds = Sets.newHashSet();
for (int i = 0; i < docs.length; i++) {
Document doc = docs[i];
if (randomInt(5) == 2) {
Term idTerm = new Term("id", Integer.toString(i));
deletedIds.add(idTerm.text());
iw.deleteDocuments(idTerm);
}
}
// now go over each doc, build the relevant references and filter
reader = DirectoryReader.open(iw, true);
List<Term> filterTerms = Lists.newArrayList();
for (int docId = 0; docId < reader.maxDoc(); docId++) {
Document doc = reader.document(docId);
addFreqs(doc, referenceAll);
if (!deletedIds.contains(doc.getField("id").stringValue())) {
addFreqs(doc, referenceNotDeleted);
if (randomBoolean()) {
filterTerms.add(new Term("id", doc.getField("id").stringValue()));
addFreqs(doc, referenceFilter);
}
}
}
filter = new TermsFilter(filterTerms);
}
private void addFreqs(Document doc, Map<String, FreqHolder> reference) {
Set<String> addedDocFreq = Sets.newHashSet();
for (IndexableField field : doc.getFields("field")) {
String term = field.stringValue();
FreqHolder freqHolder = reference.get(term);
if (freqHolder == null) {
freqHolder = new FreqHolder();
reference.put(term, freqHolder);
}
if (!addedDocFreq.contains(term)) {
freqHolder.docFreq++;
addedDocFreq.add(term);
}
freqHolder.totalTermFreq++;
}
}
@After
@Override
public void tearDown() throws Exception {
IOUtils.close(reader, iw, iw.getDirectory());
super.tearDown();
}
@Test
public void testAllFreqs() throws Exception {
assertAgainstReference(true, true, null, referenceAll);
assertAgainstReference(true, false, null, referenceAll);
assertAgainstReference(false, true, null, referenceAll);
}
@Test
public void testNonDeletedFreqs() throws Exception {
assertAgainstReference(true, true, Queries.MATCH_ALL_FILTER, referenceNotDeleted);
assertAgainstReference(true, false, Queries.MATCH_ALL_FILTER, referenceNotDeleted);
assertAgainstReference(false, true, Queries.MATCH_ALL_FILTER, referenceNotDeleted);
}
@Test
public void testFilterFreqs() throws Exception {
assertAgainstReference(true, true, filter, referenceFilter);
assertAgainstReference(true, false, filter, referenceFilter);
assertAgainstReference(false, true, filter, referenceFilter);
}
private void assertAgainstReference(boolean docFreq, boolean totalTermFreq, Filter filter, Map<String, FreqHolder> reference) throws Exception {
FreqTermsEnum freqTermsEnum = new FreqTermsEnum(reader, "field", docFreq, totalTermFreq, filter, BigArrays.NON_RECYCLING_INSTANCE);
assertAgainstReference(freqTermsEnum, reference, docFreq, totalTermFreq);
}
private void assertAgainstReference(FreqTermsEnum termsEnum, Map<String, FreqHolder> reference, boolean docFreq, boolean totalTermFreq) throws Exception {
int cycles = randomIntBetween(1, 5);
for (int i = 0; i < cycles; i++) {
List<String> terms = Lists.newArrayList(Arrays.asList(this.terms));
//Collections.shuffle(terms, getRandom());
for (String term : terms) {
if (!termsEnum.seekExact(new BytesRef(term))) {
continue;
}
if (docFreq) {
assertThat("cycle " + i + ", term " + term + ", docFreq", termsEnum.docFreq(), equalTo(reference.get(term).docFreq));
}
if (totalTermFreq) {
assertThat("cycle " + i + ", term " + term + ", totalTermFreq", termsEnum.totalTermFreq(), equalTo(reference.get(term).totalTermFreq));
}
}
}
}
}

src/test/java/org/elasticsearch/search/aggregations/bucket/SignificantTermsTests.java

@@ -27,10 +27,13 @@ import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms.Bucket;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.HashMap;
+import java.util.HashSet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@@ -136,6 +139,35 @@ public class SignificantTermsTests extends ElasticsearchIntegrationTest {
checkExpectedStringTermsFound(topTerms);
}
+@Test
+public void nestedAggs() throws Exception {
+String[][] expectedKeywordsByCategory = {
+{ "paul", "weller", "jam", "style", "council" },
+{ "paul", "smith" },
+{ "craig", "kelly", "terje", "haakonsen", "burton" }};
+SearchResponse response = client().prepareSearch("test")
+.setSearchType(SearchType.QUERY_AND_FETCH)
+.addAggregation(new TermsBuilder("myCategories").field("fact_category").minDocCount(2)
+.subAggregation(
+new SignificantTermsBuilder("mySignificantTerms").field("description")
+.minDocCount(2)))
+.execute()
+.actionGet();
+Terms topCategoryTerms = response.getAggregations().get("myCategories");
+for (org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket topCategory : topCategoryTerms.getBuckets()) {
+SignificantTerms topTerms = topCategory.getAggregations().get("mySignificantTerms");
+HashSet<String> foundTopWords = new HashSet<String>();
+for (Bucket topTerm : topTerms) {
+foundTopWords.add(topTerm.getKey());
+}
+String[] expectedKeywords = expectedKeywordsByCategory[Integer.parseInt(topCategory.getKey()) - 1];
+for (String expectedKeyword : expectedKeywords) {
+assertTrue(expectedKeyword + " missing from category keywords", foundTopWords.contains(expectedKeyword));
+}
+}
+}
@Test
public void partiallyUnmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped","test")