Significant_terms agg only creates term frequency cache when necessary and uses new TermsEnum wrapper to cache frequencies. Long and String-based aggs no longer need to pass an IndexReader as parameter when looking up frequencies of terms.

Closes #5459
This commit is contained in:
markharwood 2014-03-17 11:44:48 +00:00
parent d24600830b
commit 12d1bf8485
3 changed files with 162 additions and 30 deletions

View File

@ -79,7 +79,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
spare.term = bucketOrds.key(i);
spare.subsetDf = bucketDocCount(ord);
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(topReader, spare.term);
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.term);
spare.supersetSize = supersetSize;
assert spare.subsetDf <= spare.supersetDf;
// During shard-local down-selection we use subset/superset stats that are for this shard only

View File

@ -84,7 +84,7 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
bucketOrds.get(i, spare.termBytes);
spare.subsetDf = bucketDocCount(i);
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(topReader, spare.termBytes);
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
spare.supersetSize = supersetSize;
assert spare.subsetDf <= spare.supersetDf;
// During shard-local down-selection we use subset/superset stats

View File

@ -18,8 +18,9 @@
*/
package org.elasticsearch.search.aggregations.bucket.significant;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.*;
import org.apache.lucene.index.FilterAtomicReader.FilterTermsEnum;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
@ -28,6 +29,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
@ -39,6 +41,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.bytes.BytesValuesSource;
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
@ -62,6 +65,8 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
private IntArray termDocFreqs;
private BytesRefHash cachedTermOrds;
private BigArrays bigArrays;
private TermsEnum termsEnum;
private int numberOfAggregatorsCreated = 0;
public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig valueSourceConfig, int requiredSize,
int shardSize, long minDocCount, IncludeExclude includeExclude, String executionHint) {
@ -77,8 +82,6 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
mapper = SearchContext.current().smartNameFieldMapper(indexedFieldName);
}
bigArrays = SearchContext.current().bigArrays();
termDocFreqs = bigArrays.newIntArray(INITIAL_NUM_TERM_FREQS_CACHED, true);
cachedTermOrds = new BytesRefHash(INITIAL_NUM_TERM_FREQS_CACHED, bigArrays);
}
@Override
@ -97,6 +100,33 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
@Override
protected Aggregator create(ValuesSource valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) {
numberOfAggregatorsCreated++;
switch (numberOfAggregatorsCreated) {
case 1:
// Setup a termsEnum for use by first aggregator
try {
SearchContext searchContext = aggregationContext.searchContext();
ContextIndexSearcher searcher = searchContext.searcher();
Terms terms = MultiFields.getTerms(searcher.getIndexReader(), indexedFieldName);
//terms can be null if the choice of field is not found in this index
if (terms != null) {
termsEnum = terms.iterator(null);
}
} catch (IOException e) {
throw new ElasticsearchException("IOException loading background document frequency info", e);
}
break;
case 2:
// When we have > 1 agg we have possibility of duplicate term frequency lookups and so introduce a cache
// in the form of a wrapper around the plain termsEnum created for use with the first agg
if (termsEnum != null) {
SearchContext searchContext = aggregationContext.searchContext();
termsEnum = new FrequencyCachingTermsEnumWrapper(termsEnum, searchContext.bigArrays(), true, false);
}
break;
}
long estimatedBucketCount = valuesSource.metaData().maxAtomicUniqueValuesCount();
if (estimatedBucketCount < 0) {
// there isn't an estimation available.. 50 should be a good start
@ -155,44 +185,146 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
"]. It can only be applied to numeric or string fields.");
}
// Many child aggs may ask for the same docFreq information so here we cache docFreq values for these terms.
// TODO this should be re-factored into a more generic system for efficiently checking frequencies of things
// In future we may need to a) check the frequency in a set other than the index e.g. a subset and b) check
// the frequency of an entity other than an a single indexed term e.g. a numeric range.
// This is likely to require some careful design.
public long getBackgroundFrequency(IndexReader topReader, BytesRef termBytes) {
int result = 0;
long termOrd = cachedTermOrds.add(termBytes);
if (termOrd < 0) { // already seen, return the cached docFreq
termOrd = -1 - termOrd;
result = termDocFreqs.get(termOrd);
} else { // cache miss - read the terms' frequency in this shard and cache it
try {
result = topReader.docFreq(new Term(indexedFieldName, termBytes));
} catch (IOException e) {
throw new ElasticsearchException("IOException reading document frequency", e);
public long getBackgroundFrequency(BytesRef termBytes) {
assert termsEnum !=null; // having failed to find a field in the index we don't expect any calls for frequencies
long result = 0;
try {
if (termsEnum.seekExact(termBytes)) {
result = termsEnum.docFreq();
}
termDocFreqs = bigArrays.grow(termDocFreqs, termOrd + 1);
termDocFreqs.set(termOrd, result);
} catch (IOException e) {
throw new ElasticsearchException("IOException loading background document frequency info", e);
}
return result;
}
// Many child aggs may ask for the same docFreq information so cache docFreq values for these terms
public long getBackgroundFrequency(IndexReader topReader, long term) {
public long getBackgroundFrequency(long term) {
BytesRef indexedVal = mapper.indexedValueForSearch(term);
return getBackgroundFrequency(topReader, indexedVal);
return getBackgroundFrequency(indexedVal);
}
@Override
public boolean release() throws ElasticsearchException {
try {
Releasables.release(cachedTermOrds, termDocFreqs);
if (termsEnum instanceof Releasable) {
((Releasable) termsEnum).release();
}
} finally {
cachedTermOrds = null;
termDocFreqs = null;
termsEnum = null;
}
return true;
}
// A specialist TermsEnum wrapper for use in the repeated look-ups of frequency stats.
// TODO factor out as a utility class to replace similar org.elasticsearch.search.suggest.phrase.WordScorer.FrequencyCachingTermsEnumWrapper
// This implementation is likely to produce less garbage than WordScorer's impl but will need benchmarking/testing for that use case.
static class FrequencyCachingTermsEnumWrapper extends FilterTermsEnum implements Releasable {
int currentTermDocFreq = 0;
long currentTermTotalFreq = 0;
private IntArray termDocFreqs;
private LongArray termTotalFreqs;
private BytesRefHash cachedTermOrds;
protected BigArrays bigArrays;
private boolean cacheDocFreqs;
private boolean cacheTotalFreqs;
private long currentTermOrd;
public FrequencyCachingTermsEnumWrapper(TermsEnum delegate, BigArrays bigArrays, boolean cacheDocFreqs, boolean cacheTotalFreqs) {
super(delegate);
this.bigArrays = bigArrays;
this.cacheDocFreqs = cacheDocFreqs;
this.cacheTotalFreqs = cacheTotalFreqs;
if (cacheDocFreqs) {
termDocFreqs = bigArrays.newIntArray(INITIAL_NUM_TERM_FREQS_CACHED, false);
}
if (cacheTotalFreqs) {
termTotalFreqs = bigArrays.newLongArray(INITIAL_NUM_TERM_FREQS_CACHED, false);
}
cachedTermOrds = new BytesRefHash(INITIAL_NUM_TERM_FREQS_CACHED, bigArrays);
}
@Override
public boolean seekExact(BytesRef text) throws IOException {
currentTermDocFreq = 0;
currentTermTotalFreq = 0;
currentTermOrd = cachedTermOrds.add(text);
if (currentTermOrd < 0) { // already seen, initialize instance data with the cached frequencies
currentTermOrd = -1 - currentTermOrd;
if (cacheDocFreqs) {
currentTermDocFreq = termDocFreqs.get(currentTermOrd);
}
if (cacheTotalFreqs) {
currentTermTotalFreq = termTotalFreqs.get(currentTermOrd);
}
return true;
} else { // cache miss - pre-emptively read and cache the required frequency values
if (in.seekExact(text)) {
if (cacheDocFreqs) {
currentTermDocFreq = in.docFreq();
termDocFreqs = bigArrays.grow(termDocFreqs, currentTermOrd + 1);
termDocFreqs.set(currentTermOrd, currentTermDocFreq);
}
if (cacheTotalFreqs) {
currentTermTotalFreq = in.totalTermFreq();
termTotalFreqs = bigArrays.grow(termTotalFreqs, currentTermOrd + 1);
termTotalFreqs.set(currentTermOrd, currentTermTotalFreq);
}
return true;
}
}
return false;
}
@Override
public long totalTermFreq() throws IOException {
assert cacheTotalFreqs;
return currentTermTotalFreq;
}
@Override
public int docFreq() throws IOException {
assert cacheDocFreqs;
return currentTermDocFreq;
}
@Override
public void seekExact(long ord) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public DocsAndPositionsEnum docsAndPositions(Bits liveDocs, DocsAndPositionsEnum reuse, int flags) throws IOException {
throw new UnsupportedOperationException();
}
public SeekStatus seekCeil(BytesRef text) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public BytesRef next() {
throw new UnsupportedOperationException();
}
@Override
public boolean release() throws ElasticsearchException {
try {
Releasables.release(cachedTermOrds, termDocFreqs, termTotalFreqs);
} finally {
cachedTermOrds = null;
termDocFreqs = null;
termTotalFreqs = null;
}
return true;
}
}
}