Moves deferring code into its own subclass (#26421)

* Moves deferring code into its own subclass

This change moves the code that deals with deferring collection into a subclass of BucketsAggregator called DeferableBucketAggregator. This simplifies the code in AggregatorBase and keeps the deferred-collection logic in one place, making it easier to maintain. A sketch of how a subclass opts into deferral follows the commit message below.

* Makes SingleBucketAggregator an interface

This allows aggregators that extend BucketsAggregator directly, as well as those that extend DeferableBucketAggregator, to be single-bucket aggregators (see the sketch after the SingleBucketAggregator diff below).

* review comments

* More review comments
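The sketch below is a hypothetical aggregator (class name and collect logic invented for illustration, not part of this commit) showing how a bucket aggregator opts into deferred collection after this refactoring: extend DeferableBucketAggregator, override shouldDefer(), and replay the surviving buckets via runDeferredCollections() before building results.

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

// Hypothetical example aggregator, not part of this change.
public class ExampleDeferringAggregator extends DeferableBucketAggregator {

    protected ExampleDeferringAggregator(String name, AggregatorFactories factories, SearchContext context,
            Aggregator parent, List<PipelineAggregator> pipelineAggregators,
            Map<String, Object> metaData) throws IOException {
        super(name, factories, context, parent, pipelineAggregators, metaData);
    }

    @Override
    protected boolean shouldDefer(Aggregator aggregator) {
        // Defer collection of all sub-aggregations until this aggregator
        // has completed its first pass and pruned its buckets.
        return true;
    }

    @Override
    protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
        return new LeafBucketCollectorBase(sub, null) {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                // First pass: count docs per bucket; deferred sub-aggregations
                // are recorded by the wrapper installed in doPreCollection().
                collectBucket(sub, doc, bucket);
            }
        };
    }

    @Override
    public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
        // Second pass: replay the recorded docs for the buckets that survived
        // pruning, then build the final result (construction omitted here).
        runDeferredCollections(owningBucketOrdinal);
        return buildEmptyAggregation();
    }

    @Override
    public InternalAggregation buildEmptyAggregation() {
        return null; // result construction omitted in this sketch
    }
}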
Colin Goodheart-Smithe 2017-08-30 11:15:40 +01:00 committed by GitHub
parent 643eb286dc
commit ce1d85d7d0
11 changed files with 116 additions and 73 deletions

View File: AggregatorBase.java

@@ -18,12 +18,10 @@
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.aggregations.bucket.BestBucketsDeferringCollector;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContext.Lifetime;
@@ -31,6 +29,7 @@ import org.elasticsearch.search.query.QueryPhaseExecutionException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -52,7 +51,6 @@ public abstract class AggregatorBase extends Aggregator {
protected BucketCollector collectableSubAggregators;
private Map<String, Aggregator> subAggregatorbyName;
private DeferringBucketCollector recordingWrapper;
private final List<PipelineAggregator> pipelineAggregators;
private final CircuitBreakerService breakerService;
private long requestBytesUsed;
@@ -176,56 +174,12 @@ public abstract class AggregatorBase extends Aggregator {
@Override
public final void preCollection() throws IOException {
List<BucketCollector> collectors = new ArrayList<>();
List<BucketCollector> deferredCollectors = new ArrayList<>();
for (int i = 0; i < subAggregators.length; ++i) {
if (shouldDefer(subAggregators[i])) {
if (recordingWrapper == null) {
recordingWrapper = getDeferringCollector();
}
deferredCollectors.add(subAggregators[i]);
subAggregators[i] = recordingWrapper.wrap(subAggregators[i]);
} else {
collectors.add(subAggregators[i]);
}
}
if (recordingWrapper != null) {
recordingWrapper.setDeferredCollector(deferredCollectors);
collectors.add(recordingWrapper);
}
List<BucketCollector> collectors = Arrays.asList(subAggregators);
collectableSubAggregators = BucketCollector.wrap(collectors);
doPreCollection();
collectableSubAggregators.preCollection();
}
public DeferringBucketCollector getDeferringCollector() {
// Default impl is a collector that selects the best buckets
// but an alternative defer policy may be based on best docs.
return new BestBucketsDeferringCollector(context());
}
/**
* This method should be overridden by subclasses that want to defer calculation
* of a child aggregation until a first pass is complete and a set of buckets has
* been pruned.
* Deferring collection will require the recording of all doc/bucketIds from the first
* pass and then the sub class should call {@link #runDeferredCollections(long...)}
* for the selected set of buckets that survive the pruning.
* @param aggregator the child aggregator
* @return true if the aggregator should be deferred
* until a first pass at collection has completed
*/
protected boolean shouldDefer(Aggregator aggregator) {
return false;
}
protected final void runDeferredCollections(long... bucketOrds) throws IOException{
// Being lenient here - ignore calls where there are no deferred collections to playback
if (recordingWrapper != null) {
recordingWrapper.replay(bucketOrds);
}
}
/**
* @return The name of the aggregation.
*/

View File: DeferableBucketAggregator.java

@@ -0,0 +1,95 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public abstract class DeferableBucketAggregator extends BucketsAggregator {
private DeferringBucketCollector recordingWrapper;
protected DeferableBucketAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
}
@Override
protected void doPreCollection() throws IOException {
List<BucketCollector> collectors = new ArrayList<>();
List<BucketCollector> deferredCollectors = new ArrayList<>();
for (int i = 0; i < subAggregators.length; ++i) {
if (shouldDefer(subAggregators[i])) {
if (recordingWrapper == null) {
recordingWrapper = getDeferringCollector();
}
deferredCollectors.add(subAggregators[i]);
subAggregators[i] = recordingWrapper.wrap(subAggregators[i]);
} else {
collectors.add(subAggregators[i]);
}
}
if (recordingWrapper != null) {
recordingWrapper.setDeferredCollector(deferredCollectors);
collectors.add(recordingWrapper);
}
collectableSubAggregators = BucketCollector.wrap(collectors);
}
public DeferringBucketCollector getDeferringCollector() {
// Default impl is a collector that selects the best buckets
// but an alternative defer policy may be based on best docs.
return new BestBucketsDeferringCollector(context());
}
/**
* This method should be overridden by subclasses that want to defer
* calculation of a child aggregation until a first pass is complete and a
* set of buckets has been pruned. Deferring collection will require the
* recording of all doc/bucketIds from the first pass and then the sub class
* should call {@link #runDeferredCollections(long...)} for the selected set
* of buckets that survive the pruning.
*
* @param aggregator
* the child aggregator
* @return true if the aggregator should be deferred until a first pass at
* collection has completed
*/
protected boolean shouldDefer(Aggregator aggregator) {
return false;
}
protected final void runDeferredCollections(long... bucketOrds) throws IOException {
// Being lenient here - ignore calls where there are no deferred
// collections to playback
if (recordingWrapper != null) {
recordingWrapper.replay(bucketOrds);
}
}
}
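The default deferring collector above selects the best buckets; as the comment in getDeferringCollector() notes, a subclass can instead choose a policy based on the best matching documents. A minimal sketch of such an override, along the lines of what SamplerAggregator does with BestDocsDeferringCollector; the constructor arguments shown are an assumption, not taken from this diff.

import org.elasticsearch.search.aggregations.bucket.sampler.BestDocsDeferringCollector;

// Inside a subclass of DeferableBucketAggregator; shardSize is assumed to be
// a field of the concrete aggregator.
@Override
public DeferringBucketCollector getDeferringCollector() {
    // Keep the top-scoring documents per bucket instead of the best buckets.
    return new BestDocsDeferringCollector(shardSize, context().bigArrays());
}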

View File: SingleBucketAggregator.java

@@ -18,24 +18,10 @@
*/
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* A bucket aggregator that doesn't create new buckets.
*/
public abstract class SingleBucketAggregator extends BucketsAggregator {
protected SingleBucketAggregator(String name, AggregatorFactories factories,
SearchContext aggregationContext, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
}
public interface SingleBucketAggregator {
int bucketDocCount(long bucketOrd);
}
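Since SingleBucketAggregator is now just an interface, an aggregator that needs no deferral can extend BucketsAggregator directly and still be a single-bucket aggregator, because BucketsAggregator already supplies bucketDocCount(long). A minimal sketch of the pattern applied in the diffs that follow; the class name is hypothetical.

import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

// Hypothetical example: BucketsAggregator provides the bucketDocCount(long)
// implementation required by the SingleBucketAggregator interface.
public abstract class ExampleSingleBucketAggregator extends BucketsAggregator implements SingleBucketAggregator {

    protected ExampleSingleBucketAggregator(String name, AggregatorFactories factories, SearchContext context,
            Aggregator parent, List<PipelineAggregator> pipelineAggregators,
            Map<String, Object> metaData) throws IOException {
        super(name, factories, context, parent, pipelineAggregators, metaData);
    }
}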

View File: FilterAggregator.java

@@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
@@ -38,7 +39,7 @@ import java.util.Map;
/**
* Aggregate all docs that match a filter.
*/
public class FilterAggregator extends SingleBucketAggregator {
public class FilterAggregator extends BucketsAggregator implements SingleBucketAggregator {
private final Weight filter;

View File: GlobalAggregator.java

@@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
@@ -31,7 +32,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
public class GlobalAggregator extends SingleBucketAggregator {
public class GlobalAggregator extends BucketsAggregator implements SingleBucketAggregator {
public GlobalAggregator(String name, AggregatorFactories subFactories, SearchContext aggregationContext,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

View File: MissingAggregator.java

@@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
@@ -34,7 +35,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
public class MissingAggregator extends SingleBucketAggregator {
public class MissingAggregator extends BucketsAggregator implements SingleBucketAggregator {
private final ValuesSource valuesSource;

View File: NestedAggregator.java

@@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
@@ -44,7 +45,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
class NestedAggregator extends SingleBucketAggregator {
class NestedAggregator extends BucketsAggregator implements SingleBucketAggregator {
static final ParseField PATH_FIELD = new ParseField("path");

View File: ReverseNestedAggregator.java

@@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
@@ -41,7 +42,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
public class ReverseNestedAggregator extends SingleBucketAggregator {
public class ReverseNestedAggregator extends BucketsAggregator implements SingleBucketAggregator {
static final ParseField PATH_FIELD = new ParseField("path");

View File: SamplerAggregator.java

@@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -47,7 +48,7 @@ import java.util.Map;
* values would be preferable to users having to recreate this logic in a
* 'script' e.g. to turn a datetime in milliseconds into a month key value.
*/
public class SamplerAggregator extends SingleBucketAggregator {
public class SamplerAggregator extends DeferableBucketAggregator implements SingleBucketAggregator {
public static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size");
public static final ParseField MAX_DOCS_PER_VALUE_FIELD = new ParseField("max_docs_per_value");

View File: TermsAggregator.java

@@ -35,6 +35,7 @@ import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.InternalOrder.Aggregation;
import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
@@ -50,7 +51,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
public abstract class TermsAggregator extends BucketsAggregator {
public abstract class TermsAggregator extends DeferableBucketAggregator {
public static class BucketCountThresholds implements Writeable, ToXContentFragment {
private long minDocCount;

View File: ParentToChildrenAggregator.java

@@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
@@ -48,7 +49,7 @@ import java.util.Map;
// The RecordingPerReaderBucketCollector assumes per segment recording which isn't the case for this
// aggregation, for this reason that collector can't be used
public class ParentToChildrenAggregator extends SingleBucketAggregator {
public class ParentToChildrenAggregator extends BucketsAggregator implements SingleBucketAggregator {
static final ParseField TYPE_FIELD = new ParseField("type");