This moves the code that looks up significance heuristic information, such as background frequency and superset size, out of `SignificantTermsAggregatorFactory` and into its own home so that it is easier to pass around. This will:

1. Make us feel better about ourselves for not passing around the factory, which is really *supposed* to be a throw-away thing.
2. Abstract the significance lookup logic so we can reuse it for the `significant_text` aggregation.
3. Make it very simple to cache the background frequencies, which should speed things up when the agg is a sub-agg. We had done this for numerics but not for string-shaped significant terms.
parent 922423c52c
commit d6c8d9415d
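The caching in point 3 amounts to memoizing term → background frequency, because every bucket of a sub-agg can ask for the same term's frequency over and over. A minimal sketch of the idea, using a plain `HashMap` in place of the `BigArrays`-backed `LongHash`/`LongArray` pair the diff below actually uses (the class and method names in this sketch are illustrative, not from this commit):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.LongUnaryOperator;

    // Sketch of the memoization behind SignificanceLookup#longLookup: the
    // expensive index lookup runs once per distinct term, after which the
    // cached frequency is returned. A HashMap stands in for LongHash+LongArray.
    class CachedBackgroundFrequency {
        private final Map<Long, Long> cache = new HashMap<>();
        private final LongUnaryOperator lookup; // the expensive index lookup

        CachedBackgroundFrequency(LongUnaryOperator lookup) {
            this.lookup = lookup;
        }

        long freq(long term) {
            // computeIfAbsent plays the role of LongHash#add returning
            // "-1 - position" when the term has already been seen
            return cache.computeIfAbsent(term, t -> lookup.applyAsLong(t));
        }
    }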
GlobalOrdinalsStringTermsAggregator.java:

@@ -43,6 +43,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
 import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
@@ -753,14 +754,19 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         SignificantStringTerms.Bucket,
         SignificantStringTerms.Bucket> {

-        // TODO a reference to the factory is weird - probably should be reference to what we need from it.
-        private final SignificantTermsAggregatorFactory termsAggFactory;
+        private final BackgroundFrequencyForBytes backgroundFrequencies;
+        private final long supersetSize;
         private final SignificanceHeuristic significanceHeuristic;

         private LongArray subsetSizes = context.bigArrays().newLongArray(1, true);

-        SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) {
-            this.termsAggFactory = termsAggFactory;
+        SignificantTermsResults(
+            SignificanceLookup significanceLookup,
+            SignificanceHeuristic significanceHeuristic,
+            boolean collectsFromSingleBucket
+        ) {
+            backgroundFrequencies = significanceLookup.bytesLookup(context.bigArrays(), collectsFromSingleBucket);
+            supersetSize = significanceLookup.supersetSize();
             this.significanceHeuristic = significanceHeuristic;
         }

@@ -804,8 +810,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
             spare.subsetDf = docCount;
             spare.subsetSize = subsetSize;
-            spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
-            spare.supersetSize = termsAggFactory.getSupersetNumDocs();
+            spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
+            spare.supersetSize = supersetSize;
             /*
              * During shard-local down-selection we use subset/superset stats
              * that are for this shard only. Back at the central reducer these
@@ -839,7 +845,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
                 metadata(),
                 format,
                 subsetSizes.get(owningBucketOrd),
-                termsAggFactory.getSupersetNumDocs(),
+                supersetSize,
                 significanceHeuristic,
                 Arrays.asList(topBuckets)
             );
@@ -857,7 +863,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr

         @Override
         public void close() {
-            Releasables.close(termsAggFactory, subsetSizes);
+            Releasables.close(backgroundFrequencies, subsetSizes);
         }

         /**
MapStringTermsAggregator.java:

@@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.InternalOrder;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
 import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
@@ -367,14 +368,19 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
      * Builds results for the {@code significant_terms} aggregation.
      */
     class SignificantTermsResults extends ResultStrategy<SignificantStringTerms, SignificantStringTerms.Bucket> {
-        // TODO a reference to the factory is weird - probably should be reference to what we need from it.
-        private final SignificantTermsAggregatorFactory termsAggFactory;
+        private final BackgroundFrequencyForBytes backgroundFrequencies;
+        private final long supersetSize;
         private final SignificanceHeuristic significanceHeuristic;

         private LongArray subsetSizes = context.bigArrays().newLongArray(1, true);

-        SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) {
-            this.termsAggFactory = termsAggFactory;
+        SignificantTermsResults(
+            SignificanceLookup significanceLookup,
+            SignificanceHeuristic significanceHeuristic,
+            boolean collectsFromSingleBucket
+        ) {
+            backgroundFrequencies = significanceLookup.bytesLookup(context.bigArrays(), collectsFromSingleBucket);
+            supersetSize = significanceLookup.supersetSize();
             this.significanceHeuristic = significanceHeuristic;
         }

@@ -416,8 +422,8 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
                 ordsEnum.readValue(spare.termBytes);
                 spare.bucketOrd = ordsEnum.ord();
                 spare.subsetDf = docCount;
-                spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
-                spare.supersetSize = termsAggFactory.getSupersetNumDocs();
+                spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
+                spare.supersetSize = supersetSize;
                 /*
                  * During shard-local down-selection we use subset/superset stats
                  * that are for this shard only. Back at the central reducer these
@@ -460,7 +466,7 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {
                 metadata(),
                 format,
                 subsetSizes.get(owningBucketOrd),
-                termsAggFactory.getSupersetNumDocs(),
+                supersetSize,
                 significanceHeuristic,
                 Arrays.asList(topBuckets)
             );
@@ -473,7 +479,7 @@ public class MapStringTermsAggregator extends AbstractStringTermsAggregator {

         @Override
         public void close() {
-            Releasables.close(termsAggFactory, subsetSizes);
+            Releasables.close(backgroundFrequencies, subsetSizes);
         }
     }
 }
NumericTermsAggregator.java:

@@ -27,9 +27,7 @@ import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.collect.List;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongArray;
-import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.index.fielddata.FieldData;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
@@ -42,6 +40,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
 import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForLong;
 import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.ContextIndexSearcher;
@@ -469,19 +468,18 @@ public class NumericTermsAggregator extends TermsAggregator {
     }

     class SignificantLongTermsResults extends ResultStrategy<SignificantLongTerms, SignificantLongTerms.Bucket> {
-        private final BackgroundFrequencies backgroundFrequencies;
+        private final BackgroundFrequencyForLong backgroundFrequencies;
         private final long supersetSize;
         private final SignificanceHeuristic significanceHeuristic;
         private LongArray subsetSizes;

         SignificantLongTermsResults(
-            SignificantTermsAggregatorFactory termsAggFactory,
+            SignificanceLookup significanceLookup,
             SignificanceHeuristic significanceHeuristic,
             boolean collectsFromSingleBucket
         ) {
-            LookupBackgroundFrequencies lookup = new LookupBackgroundFrequencies(termsAggFactory);
-            backgroundFrequencies = collectsFromSingleBucket ? lookup : new CacheBackgroundFrequencies(lookup, context.bigArrays());
-            supersetSize = termsAggFactory.getSupersetNumDocs();
+            backgroundFrequencies = significanceLookup.longLookup(context.bigArrays(), collectsFromSingleBucket);
+            supersetSize = significanceLookup.supersetSize();
             this.significanceHeuristic = significanceHeuristic;
             subsetSizes = context.bigArrays().newLongArray(1, true);
         }
@@ -588,66 +586,5 @@ public class NumericTermsAggregator extends TermsAggregator {
         }
     }
-
-    /**
-     * Lookup frequencies for terms.
-     */
-    private interface BackgroundFrequencies extends Releasable {
-        long freq(long term) throws IOException;
-    }
-
-    /**
-     * Lookup frequencies for terms.
-     */
-    private static class LookupBackgroundFrequencies implements BackgroundFrequencies {
-        // TODO a reference to the factory is weird - probably should be reference to what we need from it.
-        private final SignificantTermsAggregatorFactory termsAggFactory;
-
-        LookupBackgroundFrequencies(SignificantTermsAggregatorFactory termsAggFactory) {
-            this.termsAggFactory = termsAggFactory;
-        }
-
-        @Override
-        public long freq(long term) throws IOException {
-            return termsAggFactory.getBackgroundFrequency(term);
-        }
-
-        @Override
-        public void close() {
-            termsAggFactory.close();
-        }
-    }
-
-    /**
-     * Lookup and cache background frequencies for terms.
-     */
-    private static class CacheBackgroundFrequencies implements BackgroundFrequencies {
-        private final LookupBackgroundFrequencies lookup;
-        private final BigArrays bigArrays;
-        private final LongHash termToPosition;
-        private LongArray positionToFreq;
-
-        CacheBackgroundFrequencies(LookupBackgroundFrequencies lookup, BigArrays bigArrays) {
-            this.lookup = lookup;
-            this.bigArrays = bigArrays;
-            termToPosition = new LongHash(1, bigArrays);
-            positionToFreq = bigArrays.newLongArray(1, false);
-        }
-
-        @Override
-        public long freq(long term) throws IOException {
-            long position = termToPosition.add(term);
-            if (position < 0) {
-                return positionToFreq.get(-1 - position);
-            }
-            long freq = lookup.freq(term);
-            positionToFreq = bigArrays.grow(positionToFreq, position + 1);
-            positionToFreq.set(position, freq);
-            return freq;
-        }
-
-        @Override
-        public void close() {
-            Releasables.close(lookup, termToPosition, positionToFreq);
-        }
-    }
 }
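The `LookupBackgroundFrequencies`/`CacheBackgroundFrequencies` pair removed above reappears, generalized to both `long` and `BytesRef` terms, inside the new `SignificanceLookup` class below. The `position < 0` dance relies on the contract of Elasticsearch's `LongHash#add` (and `BytesRefHash#add`): a newly inserted key returns its slot id, while an already-present key returns `-1 - slot`. A toy stand-in illustrating that contract (hypothetical and simplified; the real class is `BigArrays`-backed, not `HashMap`-backed):

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in for the LongHash#add contract the frequency cache relies on:
    // a negative return value means "already seen", and encodes the slot id.
    class ToyLongHash {
        private final Map<Long, Long> slots = new HashMap<>();
        private long next = 0;

        long add(long key) {
            Long existing = slots.get(key);
            if (existing != null) {
                return -1 - existing; // already present: encode existing slot
            }
            slots.put(key, next);
            return next++;            // newly inserted: plain slot id
        }
    }

The returned slot then indexes a parallel `LongArray` of frequencies, which is why a cache hit reads `positionToFreq.get(-1 - position)` and a miss stores `positionToFreq.set(position, freq)`.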
SignificanceLookup.java (new file):

@@ -0,0 +1,208 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.terms;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryShardContext;
+import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+
+/**
+ * Looks up values used for {@link SignificanceHeuristic}s.
+ */
+class SignificanceLookup {
+    /**
+     * Lookup frequencies for {@link BytesRef} terms.
+     */
+    interface BackgroundFrequencyForBytes extends Releasable {
+        long freq(BytesRef term) throws IOException;
+    }
+
+    /**
+     * Lookup frequencies for {@code long} terms.
+     */
+    interface BackgroundFrequencyForLong extends Releasable {
+        long freq(long term) throws IOException;
+    }
+
+    private final QueryShardContext context;
+    private final ValuesSourceConfig config;
+    private final Query backgroundFilter;
+    private final int supersetNumDocs;
+    private TermsEnum termsEnum;
+
+    SignificanceLookup(QueryShardContext context, ValuesSourceConfig config, QueryBuilder backgroundFilter) throws IOException {
+        this.context = context;
+        this.config = config;
+        this.backgroundFilter = backgroundFilter == null ? null : backgroundFilter.toQuery(context);
+        /*
+         * We need to use a superset size that includes deleted docs or we
+         * could end up blowing up with bad statistics that cause us to blow
+         * up later on.
+         */
+        IndexSearcher searcher = context.searcher();
+        supersetNumDocs = backgroundFilter == null ? searcher.getIndexReader().maxDoc() : searcher.count(this.backgroundFilter);
+    }
+
+    /**
+     * Get the number of docs in the superset.
+     */
+    long supersetSize() {
+        return supersetNumDocs;
+    }
+
+    /**
+     * Get the background frequency of a {@link BytesRef} term.
+     */
+    BackgroundFrequencyForBytes bytesLookup(BigArrays bigArrays, boolean collectsFromSingleBucket) {
+        if (collectsFromSingleBucket) {
+            return new BackgroundFrequencyForBytes() {
+                @Override
+                public long freq(BytesRef term) throws IOException {
+                    return getBackgroundFrequency(term);
+                }
+
+                @Override
+                public void close() {}
+            };
+        }
+        return new BackgroundFrequencyForBytes() {
+            private final BytesRefHash termToPosition = new BytesRefHash(1, bigArrays);
+            private LongArray positionToFreq = bigArrays.newLongArray(1, false);
+
+            @Override
+            public long freq(BytesRef term) throws IOException {
+                long position = termToPosition.add(term);
+                if (position < 0) {
+                    return positionToFreq.get(-1 - position);
+                }
+                long freq = getBackgroundFrequency(term);
+                positionToFreq = bigArrays.grow(positionToFreq, position + 1);
+                positionToFreq.set(position, freq);
+                return freq;
+            }
+
+            @Override
+            public void close() {
+                Releasables.close(termToPosition, positionToFreq);
+            }
+        };
+    }
+
+    /**
+     * Get the background frequency of a {@link BytesRef} term.
+     */
+    private long getBackgroundFrequency(BytesRef term) throws IOException {
+        return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
+    }
+
+    /**
+     * Get the background frequency of a {@code long} term.
+     */
+    BackgroundFrequencyForLong longLookup(BigArrays bigArrays, boolean collectsFromSingleBucket) {
+        if (collectsFromSingleBucket) {
+            return new BackgroundFrequencyForLong() {
+                @Override
+                public long freq(long term) throws IOException {
+                    return getBackgroundFrequency(term);
+                }
+
+                @Override
+                public void close() {}
+            };
+        }
+        return new BackgroundFrequencyForLong() {
+            private final LongHash termToPosition = new LongHash(1, bigArrays);
+            private LongArray positionToFreq = bigArrays.newLongArray(1, false);
+
+            @Override
+            public long freq(long term) throws IOException {
+                long position = termToPosition.add(term);
+                if (position < 0) {
+                    return positionToFreq.get(-1 - position);
+                }
+                long freq = getBackgroundFrequency(term);
+                positionToFreq = bigArrays.grow(positionToFreq, position + 1);
+                positionToFreq.set(position, freq);
+                return freq;
+            }
+
+            @Override
+            public void close() {
+                Releasables.close(termToPosition, positionToFreq);
+            }
+        };
+    }
+
+    /**
+     * Get the background frequency of a {@code long} term.
+     */
+    private long getBackgroundFrequency(long term) throws IOException {
+        return getBackgroundFrequency(config.fieldContext().fieldType().termQuery(config.format().format(term).toString(), context));
+    }
+
+    private long getBackgroundFrequency(Query query) throws IOException {
+        if (query instanceof TermQuery) {
+            // for types that use the inverted index, we prefer using a terms
+            // enum that will do a better job at reusing index inputs
+            Term term = ((TermQuery) query).getTerm();
+            TermsEnum termsEnum = getTermsEnum(term.field());
+            if (termsEnum.seekExact(term.bytes())) {
+                return termsEnum.docFreq();
+            }
+            return 0;
+        }
+        // otherwise do it the naive way
+        if (backgroundFilter != null) {
+            query = new BooleanQuery.Builder().add(query, Occur.FILTER).add(backgroundFilter, Occur.FILTER).build();
+        }
+        return context.searcher().count(query);
+    }
+
+    private TermsEnum getTermsEnum(String field) throws IOException {
+        // TODO this method helps because of asMultiBucketAggregator. Once we remove it we can move this logic into the aggregators.
+        if (termsEnum != null) {
+            return termsEnum;
+        }
+        IndexReader reader = context.getIndexReader();
+        termsEnum = new FilterableTermsEnum(reader, config.fieldContext().field(), PostingsEnum.NONE, backgroundFilter);
+        return termsEnum;
+    }
+}
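The private `getBackgroundFrequency(Query)` above takes a fast path when the query is a plain `TermQuery`: the document frequency can be read straight from the terms dictionary instead of executing a count over the index. Roughly, with plain Lucene (a simplified sketch: the real code uses Elasticsearch's `FilterableTermsEnum`, which honors the background filter while still reading from postings; here the fast path is only taken when there is no filter):

    import java.io.IOException;

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.BooleanClause.Occur;
    import org.apache.lucene.search.BooleanQuery;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.TermQuery;

    class BackgroundFreqSketch {
        // Fast path: docFreq comes straight from the term dictionary, so no
        // query has to be executed at all.
        static long freq(IndexSearcher searcher, Query query, Query backgroundFilter) throws IOException {
            if (backgroundFilter == null && query instanceof TermQuery) {
                Term term = ((TermQuery) query).getTerm();
                IndexReader reader = searcher.getIndexReader();
                return reader.docFreq(term); // 0 if the term is absent
            }
            // Naive path: intersect with the background filter and count matches.
            if (backgroundFilter != null) {
                query = new BooleanQuery.Builder()
                    .add(query, Occur.FILTER)
                    .add(backgroundFilter, Occur.FILTER)
                    .build();
            }
            return searcher.count(query);
        }
    }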
SignificantTermsAggregatorFactory.java:

@@ -20,21 +20,8 @@
 package org.elasticsearch.search.aggregations.bucket.terms;

 import org.apache.logging.log4j.LogManager;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.PostingsEnum;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.ParseField;
-import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.logging.DeprecationLogger;
-import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
-import org.elasticsearch.common.lucene.index.FreqTermsEnum;
-import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.search.DocValueFormat;
@@ -60,21 +47,10 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;

-public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory implements Releasable {
+public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFactory {
     private static final DeprecationLogger deprecationLogger = new DeprecationLogger(
         LogManager.getLogger(SignificantTermsAggregatorFactory.class));

-    private final IncludeExclude includeExclude;
-    private final String executionHint;
-    private String indexedFieldName;
-    private MappedFieldType fieldType;
-    private FilterableTermsEnum termsEnum;
-    private int numberOfAggregatorsCreated;
-    final Query filter;
-    private final int supersetNumDocs;
-    private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
-    private final SignificanceHeuristic significanceHeuristic;
-
     static void registerAggregators(ValuesSourceRegistry.Builder builder) {
         builder.register(SignificantTermsAggregationBuilder.NAME,
             Arrays.asList(CoreValuesSourceType.BYTES, CoreValuesSourceType.IP),
@@ -102,7 +78,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                                SearchContext context,
                                Aggregator parent,
                                SignificanceHeuristic significanceHeuristic,
-                               SignificantTermsAggregatorFactory sigTermsFactory,
+                               SignificanceLookup lookup,
                                boolean collectsFromSingleBucket,
                                Map<String, Object> metadata) throws IOException {

@@ -124,7 +100,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 }

                 return execution.create(name, factories, valuesSource, format, bucketCountThresholds, includeExclude, context, parent,
-                    significanceHeuristic, sigTermsFactory, collectsFromSingleBucket, metadata);
+                    significanceHeuristic, lookup, collectsFromSingleBucket, metadata);
             }
         };
     }
@@ -146,7 +122,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                                SearchContext context,
                                Aggregator parent,
                                SignificanceHeuristic significanceHeuristic,
-                               SignificantTermsAggregatorFactory sigTermsFactory,
+                               SignificanceLookup lookup,
                                boolean collectsFromSingleBucket,
                                Map<String, Object> metadata) throws IOException {

@@ -167,18 +143,24 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 }

                 return new NumericTermsAggregator(name, factories,
-                    agg -> agg.new SignificantLongTermsResults(sigTermsFactory, significanceHeuristic, collectsFromSingleBucket),
+                    agg -> agg.new SignificantLongTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
                     numericValuesSource, format, null, bucketCountThresholds, context, parent, SubAggCollectionMode.BREADTH_FIRST,
                     longFilter, collectsFromSingleBucket, metadata);
             }
         };
     }

+    private final IncludeExclude includeExclude;
+    private final String executionHint;
+    private final QueryBuilder backgroundFilter;
+    private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
+    private final SignificanceHeuristic significanceHeuristic;
+
     SignificantTermsAggregatorFactory(String name,
                                       ValuesSourceConfig config,
                                       IncludeExclude includeExclude,
                                       String executionHint,
-                                      QueryBuilder filterBuilder,
+                                      QueryBuilder backgroundFilter,
                                       TermsAggregator.BucketCountThresholds bucketCountThresholds,
                                       SignificanceHeuristic significanceHeuristic,
                                       QueryShardContext queryShardContext,
@@ -194,90 +176,13 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
             }
         }

-        if (config.hasValues()) {
-            if (config.fieldContext().fieldType().isSearchable() == false) {
-                throw new IllegalArgumentException("SignificantText aggregation requires fields to be searchable, but ["
-                    + config.fieldContext().fieldType().name() + "] is not");
-            }
-
-            this.fieldType = config.fieldContext().fieldType();
-            this.indexedFieldName = fieldType.name();
-        }
-
         this.includeExclude = includeExclude;
         this.executionHint = executionHint;
-        this.filter = filterBuilder == null
-                ? null
-                : filterBuilder.toQuery(queryShardContext);
-        IndexSearcher searcher = queryShardContext.searcher();
-        /*
-         * We need to use a superset size that includes deleted docs or we
-         * could end up blowing up with bad statistics that cause us to blow
-         * up later on.
-         */
-        this.supersetNumDocs = filter == null ? searcher.getIndexReader().maxDoc() : searcher.count(filter);
+        this.backgroundFilter = backgroundFilter;
         this.bucketCountThresholds = bucketCountThresholds;
         this.significanceHeuristic = significanceHeuristic;
     }

-    /**
-     * Get the number of docs in the superset.
-     */
-    long getSupersetNumDocs() {
-        return supersetNumDocs;
-    }
-
-    private FilterableTermsEnum getTermsEnum(String field) throws IOException {
-        // TODO this method helps because of asMultiBucketAggregator. Once we remove it we can move this logic into the aggregators.
-        if (termsEnum != null) {
-            return termsEnum;
-        }
-        IndexReader reader = queryShardContext.getIndexReader();
-        if (numberOfAggregatorsCreated > 1) {
-            termsEnum = new FreqTermsEnum(reader, field, true, false, filter, queryShardContext.bigArrays());
-        } else {
-            termsEnum = new FilterableTermsEnum(reader, indexedFieldName, PostingsEnum.NONE, filter);
-        }
-        return termsEnum;
-    }
-
-    private long getBackgroundFrequency(String value) throws IOException {
-        // fieldType can be null if the field is unmapped, but theoretically this method should only be called
-        // when constructing buckets. Assert to ensure this is the case
-        // TODO this is a bad setup and it should be refactored
-        assert fieldType != null;
-        Query query = fieldType.termQuery(value, queryShardContext);
-        if (query instanceof TermQuery) {
-            // for types that use the inverted index, we prefer using a caching terms
-            // enum that will do a better job at reusing index inputs
-            Term term = ((TermQuery) query).getTerm();
-            FilterableTermsEnum termsEnum = getTermsEnum(term.field());
-            if (termsEnum.seekExact(term.bytes())) {
-                return termsEnum.docFreq();
-            } else {
-                return 0;
-            }
-        }
-        // otherwise do it the naive way
-        if (filter != null) {
-            query = new BooleanQuery.Builder()
-                .add(query, Occur.FILTER)
-                .add(filter, Occur.FILTER)
-                .build();
-        }
-        return queryShardContext.searcher().count(query);
-    }
-
-    long getBackgroundFrequency(BytesRef termBytes) throws IOException {
-        String value = config.format().format(termBytes).toString();
-        return getBackgroundFrequency(value);
-    }
-
-    long getBackgroundFrequency(long termNum) throws IOException {
-        String value = config.format().format(termNum).toString();
-        return getBackgroundFrequency(value);
-    }
-
     @Override
     protected Aggregator createUnmapped(SearchContext searchContext,
                                         Aggregator parent,
@@ -305,7 +210,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
         }
         SignificantTermsAggregatorSupplier sigTermsAggregatorSupplier = (SignificantTermsAggregatorSupplier) aggregatorSupplier;

-        numberOfAggregatorsCreated++;
         BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
         if (bucketCountThresholds.getShardSize() == SignificantTermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
             // The user has not made a shardSize selection .
@@ -323,10 +227,10 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
             bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }

-        // TODO we should refactor so that we don't need to use this Factory as a singleton (e.g. stop passing `this` to the aggregators)
+        SignificanceLookup lookup = new SignificanceLookup(queryShardContext, config, backgroundFilter);
         return sigTermsAggregatorSupplier.build(name, factories, config.getValuesSource(), config.format(),
             bucketCountThresholds, includeExclude, executionHint, searchContext, parent,
-            significanceHeuristic, this, collectsFromSingleBucket, metadata);
+            significanceHeuristic, lookup, collectsFromSingleBucket, metadata);
     }

     public enum ExecutionMode {
@@ -343,7 +247,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                              SearchContext aggregationContext,
                              Aggregator parent,
                              SignificanceHeuristic significanceHeuristic,
-                             SignificantTermsAggregatorFactory termsAggregatorFactory,
+                             SignificanceLookup lookup,
                              boolean collectsFromSingleBucket,
                              Map<String, Object> metadata) throws IOException {

@@ -351,7 +255,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 return new MapStringTermsAggregator(
                     name,
                     factories,
-                    a -> a.new SignificantTermsResults(termsAggregatorFactory, significanceHeuristic),
+                    a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
                     valuesSource,
                     null,
                     format,
@@ -380,7 +284,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                              SearchContext aggregationContext,
                              Aggregator parent,
                              SignificanceHeuristic significanceHeuristic,
-                             SignificantTermsAggregatorFactory termsAggregatorFactory,
+                             SignificanceLookup lookup,
                              boolean collectsFromSingleBucket,
                              Map<String, Object> metadata) throws IOException {

@@ -401,7 +305,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                 return new GlobalOrdinalsStringTermsAggregator(
                     name,
                     factories,
-                    a -> a.new SignificantTermsResults(termsAggregatorFactory, significanceHeuristic),
+                    a -> a.new SignificantTermsResults(lookup, significanceHeuristic, collectsFromSingleBucket),
                     (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource,
                     null,
                     format,
@@ -446,7 +350,7 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
                              SearchContext aggregationContext,
                              Aggregator parent,
                              SignificanceHeuristic significanceHeuristic,
-                             SignificantTermsAggregatorFactory termsAggregatorFactory,
+                             SignificanceLookup lookup,
                              boolean collectsFromSingleBucket,
                              Map<String, Object> metadata) throws IOException;

@@ -455,15 +359,4 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
             return parseField.getPreferredName();
         }
     }
-
-    @Override
-    public void close() {
-        try {
-            if (termsEnum instanceof Releasable) {
-                ((Releasable) termsEnum).close();
-            }
-        } finally {
-            termsEnum = null;
-        }
-    }
 }
|
@ -40,7 +40,7 @@ interface SignificantTermsAggregatorSupplier extends AggregatorSupplier {
|
|||
SearchContext context,
|
||||
Aggregator parent,
|
||||
SignificanceHeuristic significanceHeuristic,
|
||||
SignificantTermsAggregatorFactory sigTermsFactory,
|
||||
SignificanceLookup lookup,
|
||||
boolean collectsFromSingleBucket,
|
||||
Map<String, Object> metadata) throws IOException;
|
||||
}
|
||||
|
|