diff --git a/src/main/java/org/elasticsearch/common/lucene/MultiCollector.java b/src/main/java/org/elasticsearch/common/lucene/MultiCollector.java index 2dcb8da3ad6..7ec8cef7f69 100644 --- a/src/main/java/org/elasticsearch/common/lucene/MultiCollector.java +++ b/src/main/java/org/elasticsearch/common/lucene/MultiCollector.java @@ -83,7 +83,7 @@ public class MultiCollector extends XCollector { } @Override - public void postCollection() { + public void postCollection() throws IOException { if (collector instanceof XCollector) { ((XCollector) collector).postCollection(); } diff --git a/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java b/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java index 1ac10509938..cbb10a43873 100644 --- a/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java +++ b/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java @@ -44,7 +44,7 @@ public class FilteredCollector extends XCollector { } @Override - public void postCollection() { + public void postCollection() throws IOException { if (collector instanceof XCollector) { ((XCollector) collector).postCollection(); } diff --git a/src/main/java/org/elasticsearch/common/lucene/search/XCollector.java b/src/main/java/org/elasticsearch/common/lucene/search/XCollector.java index 942a8a55b72..b022626d842 100644 --- a/src/main/java/org/elasticsearch/common/lucene/search/XCollector.java +++ b/src/main/java/org/elasticsearch/common/lucene/search/XCollector.java @@ -20,13 +20,15 @@ package org.elasticsearch.common.lucene.search; import org.apache.lucene.search.Collector; +import java.io.IOException; + /** * An extension to {@link Collector} that allows for a callback when * collection is done. */ public abstract class XCollector extends Collector { - public void postCollection() { + public void postCollection() throws IOException { } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 372b5d6a6d7..060847a75aa 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -25,8 +25,6 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorer; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.lucene.search.XCollector; import org.elasticsearch.common.lucene.search.XConstantScoreQuery; @@ -124,10 +122,10 @@ public class AggregationPhase implements SearchPhase { } try { context.searcher().search(query, collector); + collector.postCollection(); } catch (Exception e) { throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e); } - collector.postCollection(); } List aggregations = new ArrayList<>(aggregators.length); @@ -171,7 +169,7 @@ public class AggregationPhase implements SearchPhase { } @Override - public void postCollection() { + public void postCollection() throws IOException { for (Aggregator collector : collectors) { collector.postCollection(); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java b/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java index c6cc465b801..3303c42de01 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java @@ -18,8 +18,9 @@ */ package org.elasticsearch.search.aggregations; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lucene.ReaderContextAware; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.support.AggregationContext; @@ -27,12 +28,16 @@ import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext.Lifetime; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; -public abstract class Aggregator implements Releasable, ReaderContextAware { +public abstract class Aggregator extends BucketCollector implements Releasable { + + private static final Predicate COLLECTABLE_AGGREGATOR = new Predicate() { + @Override + public boolean apply(Aggregator aggregator) { + return aggregator.shouldCollect(); + } + }; /** * Defines the nature of the aggregator's aggregation execution when nested in other aggregators and the buckets they create. @@ -60,6 +65,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware { protected final BucketAggregationMode bucketAggregationMode; protected final AggregatorFactories factories; protected final Aggregator[] subAggregators; + protected final BucketCollector collectableSugAggregators; private Map subAggregatorbyName; @@ -84,6 +90,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware { assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead"; this.factories = factories; this.subAggregators = factories.createSubAggregators(this, estimatedBucketsCount); + collectableSugAggregators = BucketCollector.wrap(Iterables.filter(Arrays.asList(subAggregators), COLLECTABLE_AGGREGATOR)); context.searchContext().addReleasable(this, Lifetime.PHASE); } @@ -150,26 +157,11 @@ public abstract class Aggregator implements Releasable, ReaderContextAware { */ public abstract boolean shouldCollect(); - /** - * Called during the query phase, to collect & aggregate the given document. - * - * @param doc The document to be collected/aggregated - * @param owningBucketOrdinal The ordinal of the bucket this aggregator belongs to, assuming this aggregator is not a top level aggregator. - * Typically, aggregators with {@code #bucketAggregationMode} set to {@link BucketAggregationMode#MULTI_BUCKETS} - * will heavily depend on this ordinal. Other aggregators may or may not use it and can see this ordinal as just - * an extra information for the aggregation context. For top level aggregators, the ordinal will always be - * equal to 0. - * @throws IOException - */ - public abstract void collect(int doc, long owningBucketOrdinal) throws IOException; - /** * Called after collection of all document is done. */ - public final void postCollection() { - for (int i = 0; i < subAggregators.length; i++) { - subAggregators[i].postCollection(); - } + public final void postCollection() throws IOException { + collectableSugAggregators.postCollection(); doPostCollection(); } @@ -185,7 +177,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware { /** * Can be overriden by aggregator implementation to be called back when the collection phase ends. */ - protected void doPostCollection() { + protected void doPostCollection() throws IOException { } /** diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 13f6eb7891d..87d833a398a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -86,7 +86,7 @@ public class AggregatorFactories { } @Override - protected void doPostCollection() { + protected void doPostCollection() throws IOException { for (long i = 0; i < aggregators.size(); ++i) { final Aggregator aggregator = aggregators.get(i); if (aggregator != null) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java b/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java new file mode 100644 index 00000000000..3620c88417d --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations; + +import com.google.common.collect.Iterables; +import org.apache.lucene.index.AtomicReaderContext; +import org.elasticsearch.common.lucene.ReaderContextAware; +import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode; + +import java.io.IOException; + +/** + * A Collector that can collect data in separate buckets. + */ +public abstract class BucketCollector implements ReaderContextAware { + + public static BucketCollector NO_OP_COLLECTOR = new BucketCollector() { + + @Override + public void collect(int docId, long bucketOrdinal) throws IOException { + // no-op + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + // no-op + } + + @Override + public void postCollection() throws IOException { + // no-op + } + }; + + /** + * Wrap the given collectors into a single instance. + */ + public static BucketCollector wrap(Iterable collectorList) { + final BucketCollector[] collectors = Iterables.toArray(collectorList, BucketCollector.class); + switch (collectors.length) { + case 0: + return NO_OP_COLLECTOR; + case 1: + return collectors[0]; + default: + return new BucketCollector() { + + @Override + public void collect(int docId, long bucketOrdinal) throws IOException { + for (BucketCollector collector : collectors) { + collector.collect(docId, bucketOrdinal); + } + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + for (BucketCollector collector : collectors) { + collector.setNextReader(reader); + } + } + + @Override + public void postCollection() throws IOException { + for (BucketCollector collector : collectors) { + collector.postCollection(); + } + } + + }; + } + } + + /** + * Called during the query phase, to collect & aggregate the given document. + * + * @param doc The document to be collected/aggregated + * @param bucketOrdinal The ordinal of the bucket this aggregator belongs to, assuming this aggregator is not a top level aggregator. + * Typically, aggregators with {@code #bucketAggregationMode} set to {@link BucketAggregationMode#MULTI_BUCKETS} + * will heavily depend on this ordinal. Other aggregators may or may not use it and can see this ordinal as just + * an extra information for the aggregation context. For top level aggregators, the ordinal will always be + * equal to 0. + * @throws IOException + */ + public abstract void collect(int docId, long bucketOrdinal) throws IOException; + + /** + * Post collection callback. + */ + public abstract void postCollection() throws IOException; + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 8e6e22896d0..ea87a9cc093 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -27,9 +27,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.support.AggregationContext; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; /** * @@ -38,19 +36,10 @@ public abstract class BucketsAggregator extends Aggregator { private LongArray docCounts; - private final Aggregator[] collectableSugAggregators; - public BucketsAggregator(String name, BucketAggregationMode bucketAggregationMode, AggregatorFactories factories, long estimatedBucketsCount, AggregationContext context, Aggregator parent) { super(name, bucketAggregationMode, factories, estimatedBucketsCount, context, parent); docCounts = bigArrays.newLongArray(estimatedBucketsCount, true); - List collectables = new ArrayList<>(subAggregators.length); - for (int i = 0; i < subAggregators.length; i++) { - if (subAggregators[i].shouldCollect()) { - collectables.add((subAggregators[i])); - } - } - collectableSugAggregators = collectables.toArray(new Aggregator[collectables.size()]); } /** @@ -73,9 +62,7 @@ public abstract class BucketsAggregator extends Aggregator { */ protected final void collectExistingBucket(int doc, long bucketOrd) throws IOException { docCounts.increment(bucketOrd, 1); - for (int i = 0; i < collectableSugAggregators.length; i++) { - collectableSugAggregators[i].collect(doc, bucketOrd); - } + collectBucketNoCounts(doc, bucketOrd); } public LongArray getDocCounts() { @@ -86,9 +73,7 @@ public abstract class BucketsAggregator extends Aggregator { * Utility method to collect the given doc in the given bucket but not to update the doc counts of the bucket */ protected final void collectBucketNoCounts(int doc, long bucketOrd) throws IOException { - for (int i = 0; i < collectableSugAggregators.length; i++) { - collectableSugAggregators[i].collect(doc, bucketOrd); - } + collectableSugAggregators.collect(doc, bucketOrd); } /** diff --git a/src/main/java/org/elasticsearch/search/facet/FacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/FacetExecutor.java index c09528c219b..816b545692a 100644 --- a/src/main/java/org/elasticsearch/search/facet/FacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/FacetExecutor.java @@ -121,7 +121,7 @@ public abstract class FacetExecutor { } @Override - public abstract void postCollection(); + public abstract void postCollection() throws IOException; } /** diff --git a/src/main/java/org/elasticsearch/search/facet/FacetPhase.java b/src/main/java/org/elasticsearch/search/facet/FacetPhase.java index 75741b5890d..68ce9713cb9 100644 --- a/src/main/java/org/elasticsearch/search/facet/FacetPhase.java +++ b/src/main/java/org/elasticsearch/search/facet/FacetPhase.java @@ -184,14 +184,14 @@ public class FacetPhase implements SearchPhase { } try { context.searcher().search(query, MultiCollector.wrap(entry.getValue().toArray(new Collector[entry.getValue().size()]))); + for (Collector collector : entry.getValue()) { + if (collector instanceof XCollector) { + ((XCollector) collector).postCollection(); + } + } } catch (Exception e) { throw new QueryPhaseExecutionException(context, "Failed to execute global facets", e); } - for (Collector collector : entry.getValue()) { - if (collector instanceof XCollector) { - ((XCollector) collector).postCollection(); - } - } } } diff --git a/src/main/java/org/elasticsearch/search/facet/nested/NestedFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/nested/NestedFacetExecutor.java index bf61d489586..5c389912444 100644 --- a/src/main/java/org/elasticsearch/search/facet/nested/NestedFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/nested/NestedFacetExecutor.java @@ -171,7 +171,7 @@ public class NestedFacetExecutor extends FacetExecutor { } @Override - public void postCollection() { + public void postCollection() throws IOException { if (collector instanceof XCollector) { ((XCollector) collector).postCollection(); } diff --git a/src/main/java/org/elasticsearch/search/internal/DocIdSetCollector.java b/src/main/java/org/elasticsearch/search/internal/DocIdSetCollector.java index e9842c1319e..1ed96cacf98 100644 --- a/src/main/java/org/elasticsearch/search/internal/DocIdSetCollector.java +++ b/src/main/java/org/elasticsearch/search/internal/DocIdSetCollector.java @@ -88,7 +88,7 @@ public class DocIdSetCollector extends XCollector implements Releasable { } @Override - public void postCollection() { + public void postCollection() throws IOException { if (collector instanceof XCollector) { ((XCollector) collector).postCollection(); }