Save memory when significant_text is not on top (#58145) (#58364)

This merges the aggregator for `significant_text` into
`significant_terms`, applying the optimization built in #55873 to save
memory when the aggregation is not on top. The `significant_text`
aggregation is pretty memory intensive all on its own and this doesn't
particularly help with that, but it'll help with the memory usage of any
sub-aggregations.
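For context, "not on top" means `significant_text` running as a sub-aggregation rather than at the root of the aggregation tree. A minimal sketch of that shape, mirroring the new `testInsideTermsAgg` test added in this commit (the aggregation names and fields are the test's, not anything required):

    // significant_text nested under a terms aggregation, i.e. "not on top"
    SignificantTextAggregationBuilder sigText = new SignificantTextAggregationBuilder("sig_text", "text");
    TermsAggregationBuilder byKeyword = new TermsAggregationBuilder("terms").field("kwd").subAggregation(sigText);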
Nik Everett 2020-06-23 09:19:05 -04:00 committed by GitHub
parent 9d03204308
commit 519f41950a
9 changed files with 354 additions and 423 deletions

File: DuplicateByteSequenceSpotter.java

@ -25,20 +25,18 @@ import org.apache.lucene.util.RamUsageEstimator;
* A Trie structure for analysing byte streams for duplicate sequences. Bytes
* from a stream are added one at a time using the addByte method and the number
* of times it has been seen as part of a sequence is returned.
*
* <p>
* The minimum required length for a duplicate sequence detected is 6 bytes.
*
* <p>
* The design goals are to maximize speed of lookup while minimizing the space
* required to do so. This has led to a hybrid solution for representing the
* bytes that make up a sequence in the trie.
*
* <p>
* If we have 6 bytes in sequence e.g. abcdef then they are represented as
* object nodes in the tree as follows:
* <p>
* (a)-(b)-(c)-(def as an int)
* <p>
*
*
* {@link RootTreeNode} objects are used for the first two levels of the tree
* (representing bytes a and b in the example sequence). The combinations of
* objects at these 2 levels are few so internally these objects allocate an
@ -61,11 +59,9 @@ import org.apache.lucene.util.RamUsageEstimator;
* reached
* <li>halting any growth of the tree
* </ol>
*
* Tests on real-world text show that the size of the tree is a multiple of the
* input text, with that multiplier falling from roughly 10 to 5 as the content
* size grows from 10 to 100 megabytes.
*
*/
public class DuplicateByteSequenceSpotter {
public static final int TREE_DEPTH = 6;
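A rough usage sketch of the API this javadoc describes, assuming only what the commit itself shows: `addByte` as documented above, plus `startNewSequence()` and `getEstimatedSizeInBytes()` as the aggregator code later in this diff uses them. The input bytes and variable names are made up.

    DuplicateByteSequenceSpotter spotter = new DuplicateByteSequenceSpotter();
    byte[] bytes = "abcdefabcdef".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    spotter.startNewSequence();               // the collector starts a new sequence per document
    for (byte b : bytes) {
        spotter.addByte(b);                   // returns how often the trailing 6-byte sequence has been seen
    }
    long approxTrieBytes = spotter.getEstimatedSizeInBytes(); // growth in this value is reported to the circuit breaker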

File: MapStringTermsAggregator.java

@ -47,22 +47,23 @@ import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.LongConsumer;
/**
* An aggregator of string values that hashes the strings on the fly rather
* than up front like the {@link GlobalOrdinalsStringTermsAggregator}.
*/
public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
private final CollectorSource collectorSource;
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource valuesSource;
private final BytesKeyedBucketOrds bucketOrds;
private final IncludeExclude.StringFilter includeExclude;
public MapStringTermsAggregator(
String name,
AggregatorFactories factories,
CollectorSource collectorSource,
Function<MapStringTermsAggregator, ResultStrategy<?, ?>> resultStrategy,
ValuesSource valuesSource,
BucketOrder order,
DocValueFormat format,
BucketCountThresholds bucketCountThresholds,
@ -75,56 +76,39 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
this.collectorSource = collectorSource;
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.valuesSource = valuesSource;
this.includeExclude = includeExclude;
bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
}
@Override
public ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
if (collectorSource.needsScores()) {
return ScoreMode.COMPLETE;
}
return super.scoreMode();
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
final BytesRefBuilder previous = new BytesRefBuilder();
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(doc)) {
return;
}
int valuesCount = values.docValueCount();
// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
previous.clear();
for (int i = 0; i < valuesCount; ++i) {
final BytesRef bytes = values.nextValue();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return resultStrategy.wrapCollector(
collectorSource.getLeafCollector(
includeExclude,
ctx,
sub,
this::addRequestCircuitBreakerBytes,
(s, doc, owningBucketOrd, bytes) -> {
long bucketOrdinal = bucketOrds.add(owningBucketOrd, bytes);
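// add() returns a fresh ordinal the first time this (owningBucketOrd, bytes) pair is
// seen and -1 - existingOrdinal on repeats, so e.g. a return of -3 decodes to ordinal 2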
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
collectExistingBucket(s, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
collectBucket(s, doc, bucketOrdinal);
}
previous.copyBytes(bytes);
}
}
});
)
);
}
@Override
@ -146,7 +130,82 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
@Override
public void doClose() {
Releasables.close(bucketOrds, resultStrategy);
Releasables.close(collectorSource, resultStrategy, bucketOrds);
}
/**
* Abstraction on top of building collectors to fetch values.
*/
public interface CollectorSource extends Releasable {
boolean needsScores();
LeafBucketCollector getLeafCollector(
IncludeExclude.StringFilter includeExclude,
LeafReaderContext ctx,
LeafBucketCollector sub,
LongConsumer addRequestCircuitBreakerBytes,
CollectConsumer consumer
) throws IOException;
}
@FunctionalInterface
public interface CollectConsumer {
void accept(LeafBucketCollector sub, int doc, long owningBucketOrd, BytesRef bytes) throws IOException;
}
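// (Hypothetical usage sketch, not part of this change: a CollectorSource produces term
// bytes however it likes and hands each one to the aggregator-supplied consumer, which
// owns all of the bucket bookkeeping. A source that emitted one fixed term per document
// would boil down to returning, from getLeafCollector:
//
//     return new LeafBucketCollectorBase(sub, null) {
//         final BytesRef term = new BytesRef("example"); // illustrative constant term
//         @Override
//         public void collect(int doc, long owningBucketOrd) throws IOException {
//             if (includeExclude == null || includeExclude.accept(term)) {
//                 consumer.accept(sub, doc, owningBucketOrd, term);
//             }
//         }
//     };
//
// The two real implementations are ValuesSourceCollectorSource below and
// SignificantTextCollectorSource in SignificantTextAggregatorFactory.)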
/**
* Fetch values from a {@link ValuesSource}.
*/
public static class ValuesSourceCollectorSource implements CollectorSource {
private final ValuesSource valuesSource;
public ValuesSourceCollectorSource(ValuesSource valuesSource) {
this.valuesSource = valuesSource;
}
@Override
public boolean needsScores() {
return valuesSource.needsScores();
}
@Override
public LeafBucketCollector getLeafCollector(
IncludeExclude.StringFilter includeExclude,
LeafReaderContext ctx,
LeafBucketCollector sub,
LongConsumer addRequestCircuitBreakerBytes,
CollectConsumer consumer
) throws IOException {
SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
final BytesRefBuilder previous = new BytesRefBuilder();
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (false == values.advanceExact(doc)) {
return;
}
int valuesCount = values.docValueCount();
// SortedBinaryDocValues don't guarantee uniqueness so we
// need to take care of dups
previous.clear();
for (int i = 0; i < valuesCount; ++i) {
BytesRef bytes = values.nextValue();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (i > 0 && previous.get().equals(bytes)) {
continue;
}
previous.copyBytes(bytes);
consumer.accept(sub, doc, owningBucketOrd, bytes);
}
}
};
}
@Override
public void close() {}
}
/**
@ -270,6 +329,12 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
* Builds results for the standard {@code terms} aggregation.
*/
class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket> {
private final ValuesSource valuesSource;
StandardTermsResults(ValuesSource valuesSource) {
this.valuesSource = valuesSource;
}
@Override
String describe() {
return "terms";

File: SignificanceLookup.java

@ -36,10 +36,11 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
@ -62,14 +63,17 @@ class SignificanceLookup {
}
private final QueryShardContext context;
private final ValuesSourceConfig config;
private final MappedFieldType fieldType;
private final DocValueFormat format;
private final Query backgroundFilter;
private final int supersetNumDocs;
private TermsEnum termsEnum;
SignificanceLookup(QueryShardContext context, ValuesSourceConfig config, QueryBuilder backgroundFilter) throws IOException {
SignificanceLookup(QueryShardContext context, MappedFieldType fieldType, DocValueFormat format, QueryBuilder backgroundFilter)
throws IOException {
this.context = context;
this.config = config;
this.fieldType = fieldType;
this.format = format;
this.backgroundFilter = backgroundFilter == null ? null : backgroundFilter.toQuery(context);
/*
* We need to use a superset size that includes deleted docs or we
@ -129,7 +133,7 @@ class SignificanceLookup {
* Get the background frequency of a {@link BytesRef} term.
*/
private long getBackgroundFrequency(BytesRef term) throws IOException {
return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context));
}
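// (Hedged illustration of the line above: for term bytes "java" on a keyword-style field,
// format.format(term).toString() is simply "java" and fieldType.termQuery(...) builds the
// matching term query; getBackgroundFrequency(Query) then counts the matching documents,
// presumably intersected with backgroundFilter when one was supplied, and that count is
// used as the term's superset document frequency.)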
/**
@ -174,7 +178,7 @@ class SignificanceLookup {
* Get the background frequency of a {@code long} term.
*/
private long getBackgroundFrequency(long term) throws IOException {
return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
return getBackgroundFrequency(fieldType.termQuery(format.format(term).toString(), context));
}
private long getBackgroundFrequency(Query query) throws IOException {
@ -201,7 +205,7 @@ class SignificanceLookup {
return termsEnum;
}
IndexReader reader = context.getIndexReader();
termsEnum = new FilterableTermsEnum(reader, config.fieldContext().field(), PostingsEnum.NONE, backgroundFilter);
termsEnum = new FilterableTermsEnum(reader, fieldType.name(), PostingsEnum.NONE, backgroundFilter);
return termsEnum;
}

File: SignificantTermsAggregatorFactory.java

@ -227,7 +227,13 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}
SignificanceLookup lookup = new SignificanceLookup(queryShardContext, config, backgroundFilter);
SignificanceLookup lookup = new SignificanceLookup(
queryShardContext,
config.fieldContext().fieldType(),
config.format(),
backgroundFilter
);
return sigTermsAggregatorSupplier.build(name, factories, config.getValuesSource(), config.format(),
bucketCountThresholds, includeExclude, executionHint, searchContext, parent,
significanceHeuristic, lookup, collectsFromSingleBucket, metadata);
@ -255,8 +261,8 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
return new MapStringTermsAggregator(
name,
factories,
new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSource),
a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
valuesSource,
null,
format,
bucketCountThresholds,

File: SignificantTextAggregator.java (deleted)

@ -1,251 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.StringFilter;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
public class SignificantTextAggregator extends BucketsAggregator {
private final StringFilter includeExclude;
protected final BucketCountThresholds bucketCountThresholds;
protected long numCollectedDocs;
private final BytesRefHash bucketOrds;
private final SignificanceHeuristic significanceHeuristic;
private SignificantTextAggregatorFactory termsAggFactory;
private final DocValueFormat format = DocValueFormat.RAW;
private final String fieldName;
private final String[] sourceFieldNames;
private DuplicateByteSequenceSpotter dupSequenceSpotter = null ;
private long lastTrieSize;
private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000;
public SignificantTextAggregator(String name, AggregatorFactories factories,
SearchContext context, Aggregator parent,
BucketCountThresholds bucketCountThresholds, IncludeExclude.StringFilter includeExclude,
SignificanceHeuristic significanceHeuristic, SignificantTextAggregatorFactory termsAggFactory,
String fieldName, String [] sourceFieldNames, boolean filterDuplicateText,
Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, metadata);
this.bucketCountThresholds = bucketCountThresholds;
this.includeExclude = includeExclude;
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
this.fieldName = fieldName;
this.sourceFieldNames = sourceFieldNames;
bucketOrds = new BytesRefHash(1, context.bigArrays());
if(filterDuplicateText){
dupSequenceSpotter = new DuplicateByteSequenceSpotter();
lastTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
}
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
final BytesRefBuilder previous = new BytesRefBuilder();
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
collectFromSource(doc, bucket, fieldName, sourceFieldNames);
numCollectedDocs++;
if (dupSequenceSpotter != null) {
dupSequenceSpotter.startNewSequence();
}
}
private void processTokenStream(int doc, long bucket, TokenStream ts, BytesRefHash inDocTerms, String fieldText)
throws IOException{
if (dupSequenceSpotter != null) {
ts = new DeDuplicatingTokenFilter(ts, dupSequenceSpotter);
}
CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
ts.reset();
try {
while (ts.incrementToken()) {
if (dupSequenceSpotter != null) {
long newTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
long growth = newTrieSize - lastTrieSize;
// Only update the circuit breaker once growth exceeds the reporting interval
if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
addRequestCircuitBreakerBytes(growth);
lastTrieSize = newTrieSize;
}
}
previous.clear();
previous.copyChars(termAtt);
BytesRef bytes = previous.get();
if (inDocTerms.add(bytes) >= 0) {
if (includeExclude == null || includeExclude.accept(bytes)) {
long bucketOrdinal = bucketOrds.add(bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
}
}
}
}
} finally{
ts.close();
}
}
private void collectFromSource(int doc, long bucket, String indexedFieldName, String[] sourceFieldNames) throws IOException {
MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
if(fieldType == null){
throw new IllegalArgumentException("Aggregation [" + name + "] cannot process field ["+indexedFieldName
+"] since it is not present");
}
SourceLookup sourceLookup = context.lookup().source();
sourceLookup.setSegmentAndDocument(ctx, doc);
BytesRefHash inDocTerms = new BytesRefHash(256, context.bigArrays());
try {
for (String sourceField : sourceFieldNames) {
List<Object> textsToHighlight = sourceLookup.extractRawValues(sourceField);
textsToHighlight = textsToHighlight.stream().map(obj -> {
if (obj instanceof BytesRef) {
return fieldType.valueForDisplay(obj).toString();
} else {
return obj;
}
}).collect(Collectors.toList());
Analyzer analyzer = fieldType.indexAnalyzer();
for (Object fieldValue : textsToHighlight) {
String fieldText = fieldValue.toString();
TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText);
processTokenStream(doc, bucket, ts, inDocTerms, fieldText);
}
}
} finally{
Releasables.close(inDocTerms);
}
}
};
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long supersetSize = termsAggFactory.getSupersetNumDocs();
long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
SignificantStringTerms.Bucket spare = null;
for (int i = 0; i < bucketOrds.size(); i++) {
final int docCount = bucketDocCount(i);
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
}
bucketOrds.get(i, spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
spare.supersetSize = supersetSize;
// During shard-local down-selection we use subset/superset stats
// that are for this shard only
// Back at the central reducer these properties will be updated with
// global stats
spare.updateScore(significanceHeuristic);
spare.bucketOrd = i;
spare = ordered.insertWithOverflow(spare);
}
final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ordered.pop();
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
}
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
return new InternalAggregation[] { new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
};
}
@Override
public SignificantStringTerms buildEmptyAggregation() {
// We need to account for the significance of a miss in our global stats - provide corpus size as context
ContextIndexSearcher searcher = context.searcher();
IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, 0, supersetSize, significanceHeuristic, emptyList());
}
@Override
public void doClose() {
Releasables.close(bucketOrds, termsAggFactory);
}
}

File: SignificantTextAggregatorFactory.java

@ -19,52 +19,56 @@
package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
import org.elasticsearch.common.lucene.index.FreqTermsEnum;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.StringFilter;
import org.elasticsearch.search.aggregations.bucket.terms.MapStringTermsAggregator.CollectConsumer;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongConsumer;
public class SignificantTextAggregatorFactory extends AggregatorFactory
implements Releasable {
public class SignificantTextAggregatorFactory extends AggregatorFactory {
private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000;
private final IncludeExclude includeExclude;
private String indexedFieldName;
private MappedFieldType fieldType;
private final String indexedFieldName;
private final MappedFieldType fieldType;
private final String[] sourceFieldNames;
private FilterableTermsEnum termsEnum;
private int numberOfAggregatorsCreated;
private final Query filter;
private final int supersetNumDocs;
private final QueryBuilder backgroundFilter;
private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
private final SignificanceHeuristic significanceHeuristic;
private final DocValueFormat format = DocValueFormat.RAW;
private final boolean filterDuplicateText;
public SignificantTextAggregatorFactory(String name,
IncludeExclude includeExclude,
QueryBuilder filterBuilder,
QueryBuilder backgroundFilter,
TermsAggregator.BucketCountThresholds bucketCountThresholds,
SignificanceHeuristic significanceHeuristic,
QueryShardContext queryShardContext,
@ -84,97 +88,18 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory
"requires an analyzed field");
}
this.indexedFieldName = fieldType != null ? fieldType.name() : fieldName;
this.sourceFieldNames = sourceFieldNames == null
? new String[] { indexedFieldName }
: sourceFieldNames;
this.sourceFieldNames = sourceFieldNames == null ? new String[] { indexedFieldName } : sourceFieldNames;
this.includeExclude = includeExclude;
this.filter = filterBuilder == null
? null
: filterBuilder.toQuery(queryShardContext);
this.backgroundFilter = backgroundFilter;
this.filterDuplicateText = filterDuplicateText;
IndexSearcher searcher = queryShardContext.searcher();
// Important - need to use the doc count that includes deleted docs
// or we have this issue: https://github.com/elastic/elasticsearch/issues/7951
this.supersetNumDocs = filter == null
? searcher.getIndexReader().maxDoc()
: searcher.count(filter);
this.bucketCountThresholds = bucketCountThresholds;
this.significanceHeuristic = significanceHeuristic;
}
/**
* Get the number of docs in the superset.
*/
public long getSupersetNumDocs() {
return supersetNumDocs;
}
private FilterableTermsEnum getTermsEnum(String field) throws IOException {
if (termsEnum != null) {
return termsEnum;
}
IndexReader reader = queryShardContext.getIndexReader();
if (numberOfAggregatorsCreated > 1) {
termsEnum = new FreqTermsEnum(reader, field, true, false, filter, queryShardContext.bigArrays());
} else {
termsEnum = new FilterableTermsEnum(reader, indexedFieldName, PostingsEnum.NONE, filter);
}
return termsEnum;
}
private long getBackgroundFrequency(String value) throws IOException {
// fieldType can be null if the field is unmapped, but theoretically this method should only be called
// when constructing buckets. Assert to ensure this is the case
// TODO this is a bad setup and it should be refactored
assert fieldType != null;
Query query = fieldType.termQuery(value, queryShardContext);
if (query instanceof TermQuery) {
// for types that use the inverted index, we prefer using a caching terms
// enum that will do a better job at reusing index inputs
Term term = ((TermQuery) query).getTerm();
FilterableTermsEnum termsEnum = getTermsEnum(term.field());
if (termsEnum.seekExact(term.bytes())) {
return termsEnum.docFreq();
} else {
return 0;
}
}
// otherwise do it the naive way
if (filter != null) {
query = new BooleanQuery.Builder()
.add(query, Occur.FILTER)
.add(filter, Occur.FILTER)
.build();
}
return queryShardContext.searcher().count(query);
}
public long getBackgroundFrequency(BytesRef termBytes) throws IOException {
String value = format.format(termBytes).toString();
return getBackgroundFrequency(value);
}
@Override
public void close() {
try {
if (termsEnum instanceof Releasable) {
((Releasable) termsEnum).close();
}
} finally {
termsEnum = null;
}
}
@Override
protected Aggregator createInternal(SearchContext searchContext, Aggregator parent, boolean collectsFromSingleBucket,
Map<String, Object> metadata) throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, searchContext, parent);
}
numberOfAggregatorsCreated++;
BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
if (bucketCountThresholds.getShardSize() == SignificantTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
// The user has not made a shardSize selection.
@ -194,8 +119,166 @@ public class SignificantTextAggregatorFactory extends AggregatorFactory
IncludeExclude.StringFilter incExcFilter = includeExclude == null ? null:
includeExclude.convertToStringFilter(DocValueFormat.RAW);
return new SignificantTextAggregator(name, factories, searchContext, parent, bucketCountThresholds,
incExcFilter, significanceHeuristic, this, indexedFieldName, sourceFieldNames, filterDuplicateText, metadata);
MapStringTermsAggregator.CollectorSource collectorSource = new SignificantTextCollectorSource(
queryShardContext.lookup().source(),
queryShardContext.bigArrays(),
fieldType,
sourceFieldNames,
filterDuplicateText
);
SignificanceLookup lookup = new SignificanceLookup(queryShardContext, fieldType, DocValueFormat.RAW, backgroundFilter);
return new MapStringTermsAggregator(
name,
factories,
collectorSource,
a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
null,
DocValueFormat.RAW,
bucketCountThresholds,
incExcFilter,
searchContext,
parent,
SubAggCollectionMode.BREADTH_FIRST,
false,
collectsFromSingleBucket,
metadata
);
}
private static class SignificantTextCollectorSource implements MapStringTermsAggregator.CollectorSource {
private final SourceLookup sourceLookup;
private final BigArrays bigArrays;
private final MappedFieldType fieldType;
private final String[] sourceFieldNames;
private ObjectArray<DuplicateByteSequenceSpotter> dupSequenceSpotters;
SignificantTextCollectorSource(
SourceLookup sourceLookup,
BigArrays bigArrays,
MappedFieldType fieldType,
String[] sourceFieldNames,
boolean filterDuplicateText
) {
this.sourceLookup = sourceLookup;
this.bigArrays = bigArrays;
this.fieldType = fieldType;
this.sourceFieldNames = sourceFieldNames;
dupSequenceSpotters = filterDuplicateText ? bigArrays.newObjectArray(1) : null;
}
@Override
public boolean needsScores() {
return false;
}
@Override
public LeafBucketCollector getLeafCollector(
StringFilter includeExclude,
LeafReaderContext ctx,
LeafBucketCollector sub,
LongConsumer addRequestCircuitBreakerBytes,
CollectConsumer consumer
) throws IOException {
return new LeafBucketCollectorBase(sub, null) {
private final BytesRefBuilder scratch = new BytesRefBuilder();
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
if (dupSequenceSpotters == null) {
collectFromSource(doc, owningBucketOrd, null);
return;
}
dupSequenceSpotters = bigArrays.grow(dupSequenceSpotters, owningBucketOrd + 1);
DuplicateByteSequenceSpotter spotter = dupSequenceSpotters.get(owningBucketOrd);
if (spotter == null) {
spotter = new DuplicateByteSequenceSpotter();
dupSequenceSpotters.set(owningBucketOrd, spotter);
}
collectFromSource(doc, owningBucketOrd, spotter);
spotter.startNewSequence();
}
private void collectFromSource(int doc, long owningBucketOrd, DuplicateByteSequenceSpotter spotter) throws IOException {
sourceLookup.setSegmentAndDocument(ctx, doc);
BytesRefHash inDocTerms = new BytesRefHash(256, bigArrays);
try {
for (String sourceField : sourceFieldNames) {
Iterator<String> itr = sourceLookup.extractRawValues(sourceField).stream()
.map(obj -> {
if (obj == null) {
return null;
}
if (obj instanceof BytesRef) {
return fieldType.valueForDisplay(obj).toString();
}
return obj.toString();
})
.iterator();
Analyzer analyzer = fieldType.indexAnalyzer();
while (itr.hasNext()) {
TokenStream ts = analyzer.tokenStream(fieldType.name(), itr.next());
processTokenStream(doc, owningBucketOrd, ts, inDocTerms, spotter);
}
}
} finally {
Releasables.close(inDocTerms);
}
}
private void processTokenStream(
int doc,
long owningBucketOrd,
TokenStream ts,
BytesRefHash inDocTerms,
DuplicateByteSequenceSpotter spotter
) throws IOException {
long lastTrieSize = 0;
if (spotter != null) {
lastTrieSize = spotter.getEstimatedSizeInBytes();
ts = new DeDuplicatingTokenFilter(ts, spotter);
}
CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
ts.reset();
try {
while (ts.incrementToken()) {
if (spotter != null) {
long newTrieSize = spotter.getEstimatedSizeInBytes();
long growth = newTrieSize - lastTrieSize;
// Only update the circuit breaker once growth exceeds the reporting interval
if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
addRequestCircuitBreakerBytes.accept(growth);
lastTrieSize = newTrieSize;
}
}
scratch.clear();
scratch.copyChars(termAtt);
BytesRef bytes = scratch.get();
if (includeExclude != null && false == includeExclude.accept(bytes)) {
continue;
}
if (inDocTerms.add(bytes) < 0) {
continue;
}
consumer.accept(sub, doc, owningBucketOrd, bytes);
}
} finally {
ts.close();
}
if (spotter != null) {
long growth = spotter.getEstimatedSizeInBytes() - lastTrieSize;
if (growth > 0) {
addRequestCircuitBreakerBytes.accept(growth);
}
}
}
};
}
@Override
public void close() {
Releasables.close(dupSequenceSpotters);
}
}
}

File: TermsAggregatorFactory.java

@ -311,8 +311,8 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
return new MapStringTermsAggregator(
name,
factories,
a -> a.new StandardTermsResults(),
valuesSource,
new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSource),
a -> a.new StandardTermsResults(valuesSource),
order,
format,
bucketCountThresholds,

File: SignificantTermsAggregatorTests.java

@ -40,7 +40,6 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.BinaryFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberFieldType;
@ -56,7 +55,6 @@ import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTermsAggregatorFactory.ExecutionMode;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
@ -70,15 +68,6 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.signific
import static org.hamcrest.Matchers.equalTo;
public class SignificantTermsAggregatorTests extends AggregatorTestCase {
private MappedFieldType fieldType;
@Before
public void setUpTest() throws Exception {
super.setUp();
fieldType = new KeywordFieldMapper.KeywordFieldType("field");
}
@Override
protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) {
return new SignificantTermsAggregationBuilder("foo").field(fieldName);

File: SignificantTextAggregatorTests.java

@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
@ -29,6 +30,7 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
@ -102,7 +104,7 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
indexWriterConfig.setMaxBufferedDocs(100);
indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
indexDocuments(w, textFieldType);
indexDocuments(w);
SignificantTextAggregationBuilder sigAgg = new SignificantTextAggregationBuilder("sig_text", "text").filterDuplicateText(true);
if(randomBoolean()){
@ -150,7 +152,7 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
indexWriterConfig.setMaxBufferedDocs(100);
indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
indexDocuments(w, textFieldType);
indexDocuments(w);
SignificantTextAggregationBuilder agg = significantText("sig_text", "text")
.filterDuplicateText(true);
@ -193,14 +195,50 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
}
}
private void indexDocuments(IndexWriter writer, TextFieldType textFieldType) throws IOException {
public void testInsideTermsAgg() throws IOException {
TextFieldType textFieldType = new TextFieldType("text");
textFieldType.setIndexAnalyzer(new NamedAnalyzer("my_analyzer", AnalyzerScope.GLOBAL, new StandardAnalyzer()));
IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
indexWriterConfig.setMaxBufferedDocs(100);
indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
indexDocuments(w);
SignificantTextAggregationBuilder sigAgg = new SignificantTextAggregationBuilder("sig_text", "text").filterDuplicateText(true);
TermsAggregationBuilder aggBuilder = new TermsAggregationBuilder("terms").field("kwd").subAggregation(sigAgg);
try (IndexReader reader = DirectoryReader.open(w)) {
assertEquals("test expects a single segment", 1, reader.leaves().size());
IndexSearcher searcher = new IndexSearcher(reader);
StringTerms terms = searchAndReduce(searcher, new MatchAllDocsQuery(), aggBuilder, textFieldType, keywordField("kwd"));
SignificantTerms sigOdd = terms.getBucketByKey("odd").getAggregations().get("sig_text");
assertNull(sigOdd.getBucketByKey("even"));
assertNull(sigOdd.getBucketByKey("duplicate"));
assertNull(sigOdd.getBucketByKey("common"));
assertNotNull(sigOdd.getBucketByKey("odd"));
SignificantStringTerms sigEven = terms.getBucketByKey("even").getAggregations().get("sig_text");
assertNull(sigEven.getBucketByKey("odd"));
assertNull(sigEven.getBucketByKey("duplicate"));
assertNull(sigEven.getBucketByKey("common"));
assertNull(sigEven.getBucketByKey("separator2"));
assertNull(sigEven.getBucketByKey("separator4"));
assertNull(sigEven.getBucketByKey("separator6"));
assertNotNull(sigEven.getBucketByKey("even"));
}
}
}
private void indexDocuments(IndexWriter writer) throws IOException {
for (int i = 0; i < 10; i++) {
Document doc = new Document();
StringBuilder text = new StringBuilder("common ");
if (i % 2 == 0) {
text.append("odd ");
} else {
text.append("even separator" + i + " duplicate duplicate duplicate duplicate duplicate duplicate ");
} else {
text.append("odd ");
}
doc.add(new Field("text", text.toString(), TextFieldMapper.Defaults.FIELD_TYPE));
@ -208,6 +246,7 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
" \"json_only_field\" : \"" + text.toString() + "\"" +
" }";
doc.add(new StoredField("_source", new BytesRef(json)));
doc.add(new SortedSetDocValuesField("kwd", i % 2 == 0 ? new BytesRef("even") : new BytesRef("odd")));
writer.addDocument(doc);
}
}