From ce1d85d7d0104f243b5fc6a20d739d423a152ffb Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Wed, 30 Aug 2017 11:15:40 +0100 Subject: [PATCH] Moves deferring code into its own subclass (#26421) * Moves deferring code into its own subclass This change moves the code that deals with deferring collection to a subclass of BucketAggregator called DeferringBucketAggregator. This means that the code in AggregatorBase is simplified and also means that the code for deferring colleciton is in one place and easier to maintain. * Makes SIngleBucketAggregator an interface This is so aggregators that extend BucketsAggregator directly and those that extend DeferringBucketAggregator can be a single bucket aggregator * review comments * More review comments --- .../search/aggregations/AggregatorBase.java | 52 +--------- .../bucket/DeferableBucketAggregator.java | 95 +++++++++++++++++++ .../bucket/SingleBucketAggregator.java | 18 +--- .../bucket/filter/FilterAggregator.java | 3 +- .../bucket/global/GlobalAggregator.java | 3 +- .../bucket/missing/MissingAggregator.java | 3 +- .../bucket/nested/NestedAggregator.java | 3 +- .../nested/ReverseNestedAggregator.java | 3 +- .../bucket/sampler/SamplerAggregator.java | 3 +- .../bucket/terms/TermsAggregator.java | 3 +- .../ParentToChildrenAggregator.java | 3 +- 11 files changed, 116 insertions(+), 73 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index 18cc2ffc2f6..65696e1239c 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -18,12 +18,10 @@ */ package org.elasticsearch.search.aggregations; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.search.aggregations.bucket.BestBucketsDeferringCollector; -import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext.Lifetime; @@ -31,6 +29,7 @@ import org.elasticsearch.search.query.QueryPhaseExecutionException; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,7 +51,6 @@ public abstract class AggregatorBase extends Aggregator { protected BucketCollector collectableSubAggregators; private Map subAggregatorbyName; - private DeferringBucketCollector recordingWrapper; private final List pipelineAggregators; private final CircuitBreakerService breakerService; private long requestBytesUsed; @@ -176,56 +174,12 @@ public abstract class AggregatorBase extends Aggregator { @Override public final void preCollection() throws IOException { - List collectors = new ArrayList<>(); - List deferredCollectors = new ArrayList<>(); - for (int i = 0; i < subAggregators.length; ++i) { - if (shouldDefer(subAggregators[i])) { - if (recordingWrapper == null) { - recordingWrapper = getDeferringCollector(); - } - deferredCollectors.add(subAggregators[i]); - subAggregators[i] = recordingWrapper.wrap(subAggregators[i]); - } else { - collectors.add(subAggregators[i]); - } - } - if (recordingWrapper != null) { - recordingWrapper.setDeferredCollector(deferredCollectors); - collectors.add(recordingWrapper); - } + List collectors = Arrays.asList(subAggregators); collectableSubAggregators = BucketCollector.wrap(collectors); doPreCollection(); collectableSubAggregators.preCollection(); } - public DeferringBucketCollector getDeferringCollector() { - // Default impl is a collector that selects the best buckets - // but an alternative defer policy may be based on best docs. - return new BestBucketsDeferringCollector(context()); - } - - /** - * This method should be overridden by subclasses that want to defer calculation - * of a child aggregation until a first pass is complete and a set of buckets has - * been pruned. - * Deferring collection will require the recording of all doc/bucketIds from the first - * pass and then the sub class should call {@link #runDeferredCollections(long...)} - * for the selected set of buckets that survive the pruning. - * @param aggregator the child aggregator - * @return true if the aggregator should be deferred - * until a first pass at collection has completed - */ - protected boolean shouldDefer(Aggregator aggregator) { - return false; - } - - protected final void runDeferredCollections(long... bucketOrds) throws IOException{ - // Being lenient here - ignore calls where there are no deferred collections to playback - if (recordingWrapper != null) { - recordingWrapper.replay(bucketOrds); - } - } - /** * @return The name of the aggregation. */ diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java new file mode 100644 index 00000000000..bbfcef0af40 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public abstract class DeferableBucketAggregator extends BucketsAggregator { + + private DeferringBucketCollector recordingWrapper; + + protected DeferableBucketAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, + List pipelineAggregators, Map metaData) throws IOException { + super(name, factories, context, parent, pipelineAggregators, metaData); + } + + @Override + protected void doPreCollection() throws IOException { + List collectors = new ArrayList<>(); + List deferredCollectors = new ArrayList<>(); + for (int i = 0; i < subAggregators.length; ++i) { + if (shouldDefer(subAggregators[i])) { + if (recordingWrapper == null) { + recordingWrapper = getDeferringCollector(); + } + deferredCollectors.add(subAggregators[i]); + subAggregators[i] = recordingWrapper.wrap(subAggregators[i]); + } else { + collectors.add(subAggregators[i]); + } + } + if (recordingWrapper != null) { + recordingWrapper.setDeferredCollector(deferredCollectors); + collectors.add(recordingWrapper); + } + collectableSubAggregators = BucketCollector.wrap(collectors); + } + + public DeferringBucketCollector getDeferringCollector() { + // Default impl is a collector that selects the best buckets + // but an alternative defer policy may be based on best docs. + return new BestBucketsDeferringCollector(context()); + } + + /** + * This method should be overridden by subclasses that want to defer + * calculation of a child aggregation until a first pass is complete and a + * set of buckets has been pruned. Deferring collection will require the + * recording of all doc/bucketIds from the first pass and then the sub class + * should call {@link #runDeferredCollections(long...)} for the selected set + * of buckets that survive the pruning. + * + * @param aggregator + * the child aggregator + * @return true if the aggregator should be deferred until a first pass at + * collection has completed + */ + protected boolean shouldDefer(Aggregator aggregator) { + return false; + } + + protected final void runDeferredCollections(long... bucketOrds) throws IOException { + // Being lenient here - ignore calls where there are no deferred + // collections to playback + if (recordingWrapper != null) { + recordingWrapper.replay(bucketOrds); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java index f74df5d63b1..25d3e1e7188 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java @@ -18,24 +18,10 @@ */ package org.elasticsearch.search.aggregations.bucket; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.internal.SearchContext; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - /** * A bucket aggregator that doesn't create new buckets. */ -public abstract class SingleBucketAggregator extends BucketsAggregator { - - protected SingleBucketAggregator(String name, AggregatorFactories factories, - SearchContext aggregationContext, Aggregator parent, - List pipelineAggregators, Map metaData) throws IOException { - super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); - } +public interface SingleBucketAggregator { + int bucketDocCount(long bucketOrd); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java index 46a9049711f..c5385c68839 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; @@ -38,7 +39,7 @@ import java.util.Map; /** * Aggregate all docs that match a filter. */ -public class FilterAggregator extends SingleBucketAggregator { +public class FilterAggregator extends BucketsAggregator implements SingleBucketAggregator { private final Weight filter; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java index e46b7623e34..68e07c3657f 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; @@ -31,7 +32,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -public class GlobalAggregator extends SingleBucketAggregator { +public class GlobalAggregator extends BucketsAggregator implements SingleBucketAggregator { public GlobalAggregator(String name, AggregatorFactories subFactories, SearchContext aggregationContext, List pipelineAggregators, Map metaData) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java index 0f2217dfe26..5864f5c5ca7 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -34,7 +35,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -public class MissingAggregator extends SingleBucketAggregator { +public class MissingAggregator extends BucketsAggregator implements SingleBucketAggregator { private final ValuesSource valuesSource; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java index 004c88d43f0..99932bdc2fa 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; @@ -44,7 +45,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -class NestedAggregator extends SingleBucketAggregator { +class NestedAggregator extends BucketsAggregator implements SingleBucketAggregator { static final ParseField PATH_FIELD = new ParseField("path"); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java index d52412ec1ec..cf45a09ef61 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java @@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; @@ -41,7 +42,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -public class ReverseNestedAggregator extends SingleBucketAggregator { +public class ReverseNestedAggregator extends BucketsAggregator implements SingleBucketAggregator { static final ParseField PATH_FIELD = new ParseField("path"); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java index 7be81c08120..ba6cece7295 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -47,7 +48,7 @@ import java.util.Map; * values would be preferable to users having to recreate this logic in a * 'script' e.g. to turn a datetime in milliseconds into a month key value. */ -public class SamplerAggregator extends SingleBucketAggregator { +public class SamplerAggregator extends DeferableBucketAggregator implements SingleBucketAggregator { public static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size"); public static final ParseField MAX_DOCS_PER_VALUE_FIELD = new ParseField("max_docs_per_value"); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index ee8ed9ef6f4..8294986bcce 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -35,6 +35,7 @@ import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.InternalOrder.Aggregation; import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; @@ -50,7 +51,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -public abstract class TermsAggregator extends BucketsAggregator { +public abstract class TermsAggregator extends DeferableBucketAggregator { public static class BucketCountThresholds implements Writeable, ToXContentFragment { private long minDocCount; diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java index fa025e994f6..b555afce67a 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -48,7 +49,7 @@ import java.util.Map; // The RecordingPerReaderBucketCollector assumes per segment recording which isn't the case for this // aggregation, for this reason that collector can't be used -public class ParentToChildrenAggregator extends SingleBucketAggregator { +public class ParentToChildrenAggregator extends BucketsAggregator implements SingleBucketAggregator { static final ParseField TYPE_FIELD = new ParseField("type");