diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index 18cc2ffc2f6..65696e1239c 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -18,12 +18,10 @@ */ package org.elasticsearch.search.aggregations; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.search.aggregations.bucket.BestBucketsDeferringCollector; -import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext.Lifetime; @@ -31,6 +29,7 @@ import org.elasticsearch.search.query.QueryPhaseExecutionException; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,7 +51,6 @@ public abstract class AggregatorBase extends Aggregator { protected BucketCollector collectableSubAggregators; private Map subAggregatorbyName; - private DeferringBucketCollector recordingWrapper; private final List pipelineAggregators; private final CircuitBreakerService breakerService; private long requestBytesUsed; @@ -176,56 +174,12 @@ public abstract class AggregatorBase extends Aggregator { @Override public final void preCollection() throws IOException { - List collectors = new ArrayList<>(); - List deferredCollectors = new ArrayList<>(); - for (int i = 0; i < subAggregators.length; ++i) { - if (shouldDefer(subAggregators[i])) { - if (recordingWrapper == null) { - recordingWrapper = getDeferringCollector(); - } - deferredCollectors.add(subAggregators[i]); - subAggregators[i] = recordingWrapper.wrap(subAggregators[i]); - } else { - collectors.add(subAggregators[i]); - } - } - if (recordingWrapper != null) { - recordingWrapper.setDeferredCollector(deferredCollectors); - collectors.add(recordingWrapper); - } + List collectors = Arrays.asList(subAggregators); collectableSubAggregators = BucketCollector.wrap(collectors); doPreCollection(); collectableSubAggregators.preCollection(); } - public DeferringBucketCollector getDeferringCollector() { - // Default impl is a collector that selects the best buckets - // but an alternative defer policy may be based on best docs. - return new BestBucketsDeferringCollector(context()); - } - - /** - * This method should be overridden by subclasses that want to defer calculation - * of a child aggregation until a first pass is complete and a set of buckets has - * been pruned. - * Deferring collection will require the recording of all doc/bucketIds from the first - * pass and then the sub class should call {@link #runDeferredCollections(long...)} - * for the selected set of buckets that survive the pruning. - * @param aggregator the child aggregator - * @return true if the aggregator should be deferred - * until a first pass at collection has completed - */ - protected boolean shouldDefer(Aggregator aggregator) { - return false; - } - - protected final void runDeferredCollections(long... bucketOrds) throws IOException{ - // Being lenient here - ignore calls where there are no deferred collections to playback - if (recordingWrapper != null) { - recordingWrapper.replay(bucketOrds); - } - } - /** * @return The name of the aggregation. */ diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java new file mode 100644 index 00000000000..bbfcef0af40 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public abstract class DeferableBucketAggregator extends BucketsAggregator { + + private DeferringBucketCollector recordingWrapper; + + protected DeferableBucketAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, + List pipelineAggregators, Map metaData) throws IOException { + super(name, factories, context, parent, pipelineAggregators, metaData); + } + + @Override + protected void doPreCollection() throws IOException { + List collectors = new ArrayList<>(); + List deferredCollectors = new ArrayList<>(); + for (int i = 0; i < subAggregators.length; ++i) { + if (shouldDefer(subAggregators[i])) { + if (recordingWrapper == null) { + recordingWrapper = getDeferringCollector(); + } + deferredCollectors.add(subAggregators[i]); + subAggregators[i] = recordingWrapper.wrap(subAggregators[i]); + } else { + collectors.add(subAggregators[i]); + } + } + if (recordingWrapper != null) { + recordingWrapper.setDeferredCollector(deferredCollectors); + collectors.add(recordingWrapper); + } + collectableSubAggregators = BucketCollector.wrap(collectors); + } + + public DeferringBucketCollector getDeferringCollector() { + // Default impl is a collector that selects the best buckets + // but an alternative defer policy may be based on best docs. + return new BestBucketsDeferringCollector(context()); + } + + /** + * This method should be overridden by subclasses that want to defer + * calculation of a child aggregation until a first pass is complete and a + * set of buckets has been pruned. Deferring collection will require the + * recording of all doc/bucketIds from the first pass and then the sub class + * should call {@link #runDeferredCollections(long...)} for the selected set + * of buckets that survive the pruning. + * + * @param aggregator + * the child aggregator + * @return true if the aggregator should be deferred until a first pass at + * collection has completed + */ + protected boolean shouldDefer(Aggregator aggregator) { + return false; + } + + protected final void runDeferredCollections(long... bucketOrds) throws IOException { + // Being lenient here - ignore calls where there are no deferred + // collections to playback + if (recordingWrapper != null) { + recordingWrapper.replay(bucketOrds); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java index f74df5d63b1..25d3e1e7188 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java @@ -18,24 +18,10 @@ */ package org.elasticsearch.search.aggregations.bucket; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.internal.SearchContext; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - /** * A bucket aggregator that doesn't create new buckets. */ -public abstract class SingleBucketAggregator extends BucketsAggregator { - - protected SingleBucketAggregator(String name, AggregatorFactories factories, - SearchContext aggregationContext, Aggregator parent, - List pipelineAggregators, Map metaData) throws IOException { - super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); - } +public interface SingleBucketAggregator { + int bucketDocCount(long bucketOrd); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java index 46a9049711f..c5385c68839 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; @@ -38,7 +39,7 @@ import java.util.Map; /** * Aggregate all docs that match a filter. */ -public class FilterAggregator extends SingleBucketAggregator { +public class FilterAggregator extends BucketsAggregator implements SingleBucketAggregator { private final Weight filter; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java index e46b7623e34..68e07c3657f 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; @@ -31,7 +32,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -public class GlobalAggregator extends SingleBucketAggregator { +public class GlobalAggregator extends BucketsAggregator implements SingleBucketAggregator { public GlobalAggregator(String name, AggregatorFactories subFactories, SearchContext aggregationContext, List pipelineAggregators, Map metaData) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java index 0f2217dfe26..5864f5c5ca7 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -34,7 +35,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -public class MissingAggregator extends SingleBucketAggregator { +public class MissingAggregator extends BucketsAggregator implements SingleBucketAggregator { private final ValuesSource valuesSource; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java index 004c88d43f0..99932bdc2fa 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; @@ -44,7 +45,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -class NestedAggregator extends SingleBucketAggregator { +class NestedAggregator extends BucketsAggregator implements SingleBucketAggregator { static final ParseField PATH_FIELD = new ParseField("path"); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java index d52412ec1ec..cf45a09ef61 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java @@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; @@ -41,7 +42,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -public class ReverseNestedAggregator extends SingleBucketAggregator { +public class ReverseNestedAggregator extends BucketsAggregator implements SingleBucketAggregator { static final ParseField PATH_FIELD = new ParseField("path"); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java index 7be81c08120..ba6cece7295 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -47,7 +48,7 @@ import java.util.Map; * values would be preferable to users having to recreate this logic in a * 'script' e.g. to turn a datetime in milliseconds into a month key value. */ -public class SamplerAggregator extends SingleBucketAggregator { +public class SamplerAggregator extends DeferableBucketAggregator implements SingleBucketAggregator { public static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size"); public static final ParseField MAX_DOCS_PER_VALUE_FIELD = new ParseField("max_docs_per_value"); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index ee8ed9ef6f4..8294986bcce 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -35,6 +35,7 @@ import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.InternalOrder.Aggregation; import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; @@ -50,7 +51,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -public abstract class TermsAggregator extends BucketsAggregator { +public abstract class TermsAggregator extends DeferableBucketAggregator { public static class BucketCountThresholds implements Writeable, ToXContentFragment { private long minDocCount; diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java index fa025e994f6..b555afce67a 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java @@ -36,6 +36,7 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -48,7 +49,7 @@ import java.util.Map; // The RecordingPerReaderBucketCollector assumes per segment recording which isn't the case for this // aggregation, for this reason that collector can't be used -public class ParentToChildrenAggregator extends SingleBucketAggregator { +public class ParentToChildrenAggregator extends BucketsAggregator implements SingleBucketAggregator { static final ParseField TYPE_FIELD = new ParseField("type");