Create weights lazily in filter and filters aggregation (#26983)
Previous to this change the weights for the filter and filters aggregation were created in the `Filter(s)AggregatorFactory` which meant that they were created regardless of whether the aggregator actually collects any documents. This meant that for filters that are expensive to initialise, requests would not be quick when the query of the request was (or effectively was) a `match_none` query. This change maintains a single Weight instance for each filter across parent buckets but passes a weight supplier to the aggregator instances which will create the weight on first call and then return that instance for subsequent calls.
This commit is contained in:
parent
ab94150a23
commit
e1679bfe5e
|
@ -35,16 +35,17 @@ import org.elasticsearch.search.internal.SearchContext;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Aggregate all docs that match a filter.
|
||||
*/
|
||||
public class FilterAggregator extends BucketsAggregator implements SingleBucketAggregator {
|
||||
|
||||
private final Weight filter;
|
||||
private final Supplier<Weight> filter;
|
||||
|
||||
public FilterAggregator(String name,
|
||||
Weight filter,
|
||||
Supplier<Weight> filter,
|
||||
AggregatorFactories factories,
|
||||
SearchContext context,
|
||||
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
|
||||
|
@ -57,7 +58,7 @@ public class FilterAggregator extends BucketsAggregator implements SingleBucketA
|
|||
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
|
||||
final LeafBucketCollector sub) throws IOException {
|
||||
// no need to provide deleted docs to the filter
|
||||
final Bits bits = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filter.scorerSupplier(ctx));
|
||||
final Bits bits = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filter.get().scorerSupplier(ctx));
|
||||
return new LeafBucketCollectorBase(sub, null) {
|
||||
@Override
|
||||
public void collect(int doc, long bucket) throws IOException {
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.lucene.search.IndexSearcher;
|
|||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.Weight;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationInitializationException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
|
@ -35,20 +36,40 @@ import java.util.Map;
|
|||
|
||||
public class FilterAggregatorFactory extends AggregatorFactory<FilterAggregatorFactory> {
|
||||
|
||||
final Weight weight;
|
||||
private Weight weight;
|
||||
private Query filter;
|
||||
|
||||
public FilterAggregatorFactory(String name, QueryBuilder filterBuilder, SearchContext context,
|
||||
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
|
||||
super(name, context, parent, subFactoriesBuilder, metaData);
|
||||
filter = filterBuilder.toFilter(context.getQueryShardContext());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link Weight} for this filter aggregation, creating it if
|
||||
* necessary. This is done lazily so that the {@link Weight} is only created
|
||||
* if the aggregation collects documents reducing the overhead of the
|
||||
* aggregation in teh case where no documents are collected.
|
||||
*
|
||||
* Note that as aggregations are initialsed and executed in a serial manner,
|
||||
* no concurrency considerations are necessary here.
|
||||
*/
|
||||
public Weight getWeight() {
|
||||
if (weight == null) {
|
||||
IndexSearcher contextSearcher = context.searcher();
|
||||
Query filter = filterBuilder.toFilter(context.getQueryShardContext());
|
||||
try {
|
||||
weight = contextSearcher.createNormalizedWeight(filter, false);
|
||||
} catch (IOException e) {
|
||||
throw new AggregationInitializationException("Failed to initialse filter", e);
|
||||
}
|
||||
}
|
||||
return weight;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
|
||||
Map<String, Object> metaData) throws IOException {
|
||||
return new FilterAggregator(name, weight, factories, context, parent, pipelineAggregators, metaData);
|
||||
return new FilterAggregator(name, () -> this.getWeight(), factories, context, parent, pipelineAggregators, metaData);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class FiltersAggregator extends BucketsAggregator {
|
||||
|
||||
|
@ -115,13 +116,13 @@ public class FiltersAggregator extends BucketsAggregator {
|
|||
}
|
||||
|
||||
private final String[] keys;
|
||||
private Weight[] filters;
|
||||
private Supplier<Weight[]> filters;
|
||||
private final boolean keyed;
|
||||
private final boolean showOtherBucket;
|
||||
private final String otherBucketKey;
|
||||
private final int totalNumKeys;
|
||||
|
||||
public FiltersAggregator(String name, AggregatorFactories factories, String[] keys, Weight[] filters, boolean keyed,
|
||||
public FiltersAggregator(String name, AggregatorFactories factories, String[] keys, Supplier<Weight[]> filters, boolean keyed,
|
||||
String otherBucketKey, SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
|
||||
Map<String, Object> metaData) throws IOException {
|
||||
super(name, factories, context, parent, pipelineAggregators, metaData);
|
||||
|
@ -141,6 +142,7 @@ public class FiltersAggregator extends BucketsAggregator {
|
|||
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
|
||||
final LeafBucketCollector sub) throws IOException {
|
||||
// no need to provide deleted docs to the filter
|
||||
Weight[] filters = this.filters.get();
|
||||
final Bits[] bits = new Bits[filters.length];
|
||||
for (int i = 0; i < filters.length; ++i) {
|
||||
bits[i] = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filters[i].scorerSupplier(ctx));
|
||||
|
@ -164,7 +166,7 @@ public class FiltersAggregator extends BucketsAggregator {
|
|||
|
||||
@Override
|
||||
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
|
||||
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(filters.length);
|
||||
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(keys.length);
|
||||
for (int i = 0; i < keys.length; i++) {
|
||||
long bucketOrd = bucketOrd(owningBucketOrdinal, i);
|
||||
InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], bucketDocCount(bucketOrd),
|
||||
|
@ -184,7 +186,7 @@ public class FiltersAggregator extends BucketsAggregator {
|
|||
@Override
|
||||
public InternalAggregation buildEmptyAggregation() {
|
||||
InternalAggregations subAggs = buildEmptySubAggregations();
|
||||
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(filters.length);
|
||||
List<InternalFilters.InternalBucket> buckets = new ArrayList<>(keys.length);
|
||||
for (int i = 0; i < keys.length; i++) {
|
||||
InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], 0, subAggs, keyed);
|
||||
buckets.add(bucket);
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.bucket.filter;
|
|||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.Weight;
|
||||
import org.elasticsearch.search.aggregations.AggregationInitializationException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
|
@ -36,7 +37,8 @@ import java.util.Map;
|
|||
public class FiltersAggregatorFactory extends AggregatorFactory<FiltersAggregatorFactory> {
|
||||
|
||||
private final String[] keys;
|
||||
final Weight[] weights;
|
||||
private final Query[] filters;
|
||||
private Weight[] weights;
|
||||
private final boolean keyed;
|
||||
private final boolean otherBucket;
|
||||
private final String otherBucketKey;
|
||||
|
@ -48,21 +50,43 @@ public class FiltersAggregatorFactory extends AggregatorFactory<FiltersAggregato
|
|||
this.keyed = keyed;
|
||||
this.otherBucket = otherBucket;
|
||||
this.otherBucketKey = otherBucketKey;
|
||||
IndexSearcher contextSearcher = context.searcher();
|
||||
weights = new Weight[filters.size()];
|
||||
keys = new String[filters.size()];
|
||||
this.filters = new Query[filters.size()];
|
||||
for (int i = 0; i < filters.size(); ++i) {
|
||||
KeyedFilter keyedFilter = filters.get(i);
|
||||
this.keys[i] = keyedFilter.key();
|
||||
Query filter = keyedFilter.filter().toFilter(context.getQueryShardContext());
|
||||
this.weights[i] = contextSearcher.createNormalizedWeight(filter, false);
|
||||
this.filters[i] = keyedFilter.filter().toFilter(context.getQueryShardContext());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link Weight}s for this filter aggregation, creating it if
|
||||
* necessary. This is done lazily so that the {@link Weight}s are only
|
||||
* created if the aggregation collects documents reducing the overhead of
|
||||
* the aggregation in the case where no documents are collected.
|
||||
*
|
||||
* Note that as aggregations are initialsed and executed in a serial manner,
|
||||
* no concurrency considerations are necessary here.
|
||||
*/
|
||||
public Weight[] getWeights() {
|
||||
if (weights == null) {
|
||||
try {
|
||||
IndexSearcher contextSearcher = context.searcher();
|
||||
weights = new Weight[filters.length];
|
||||
for (int i = 0; i < filters.length; ++i) {
|
||||
this.weights[i] = contextSearcher.createNormalizedWeight(filters[i], false);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new AggregationInitializationException("Failed to initialse filters for aggregation [" + name() + "]", e);
|
||||
}
|
||||
}
|
||||
return weights;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
|
||||
Map<String, Object> metaData) throws IOException {
|
||||
return new FiltersAggregator(name, factories, keys, weights, keyed, otherBucket ? otherBucketKey : null, context, parent,
|
||||
return new FiltersAggregator(name, factories, keys, () -> getWeights(), keyed, otherBucket ? otherBucketKey : null, context, parent,
|
||||
pipelineAggregators, metaData);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.search.aggregations.bucket.nested;
|
||||
|
||||
import com.carrotsearch.hppc.LongArrayList;
|
||||
|
||||
import org.apache.lucene.index.IndexReaderContext;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.ReaderUtil;
|
||||
|
|
|
@ -36,9 +36,6 @@ import org.elasticsearch.index.query.QueryBuilder;
|
|||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.AggregatorTestCase;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -121,7 +118,7 @@ public class FilterAggregatorTests extends AggregatorTestCase {
|
|||
AggregatorFactory<?> factory = createAggregatorFactory(builder, indexSearcher, fieldType);
|
||||
assertThat(factory, Matchers.instanceOf(FilterAggregatorFactory.class));
|
||||
FilterAggregatorFactory filterFactory = (FilterAggregatorFactory) factory;
|
||||
Query parsedQuery = filterFactory.weight.getQuery();
|
||||
Query parsedQuery = filterFactory.getWeight().getQuery();
|
||||
assertThat(parsedQuery, Matchers.instanceOf(BooleanQuery.class));
|
||||
assertEquals(2, ((BooleanQuery) parsedQuery).clauses().size());
|
||||
// means the bool query has been parsed as a filter, if it was a query minShouldMatch would
|
||||
|
|
|
@ -214,7 +214,7 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
|
|||
AggregatorFactory<?> factory = createAggregatorFactory(builder, indexSearcher, fieldType);
|
||||
assertThat(factory, Matchers.instanceOf(FiltersAggregatorFactory.class));
|
||||
FiltersAggregatorFactory filtersFactory = (FiltersAggregatorFactory) factory;
|
||||
Query parsedQuery = filtersFactory.weights[0].getQuery();
|
||||
Query parsedQuery = filtersFactory.getWeights()[0].getQuery();
|
||||
assertThat(parsedQuery, Matchers.instanceOf(BooleanQuery.class));
|
||||
assertEquals(2, ((BooleanQuery) parsedQuery).clauses().size());
|
||||
// means the bool query has been parsed as a filter, if it was a query minShouldMatch would
|
||||
|
|
Loading…
Reference in New Issue