From b2db7c8222cbc2a33e06c31de6f59862d2f6c6f3 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 29 Apr 2014 11:26:30 +0200 Subject: [PATCH] Improve the way sub-aggregations are collected. Sub-aggregations are currently collected directly, by just forwarding the doc ID and bucket ordinal to them. This change adds the new BucketCollector abstract class that Aggregator extends, so that we have more flexibility to add implicit filters or buffering between an aggregator and its sub aggregators. Close #5975 --- .../common/lucene/MultiCollector.java | 2 +- .../lucene/search/FilteredCollector.java | 2 +- .../common/lucene/search/XCollector.java | 4 +- .../search/aggregations/AggregationPhase.java | 6 +- .../search/aggregations/Aggregator.java | 40 +++---- .../aggregations/AggregatorFactories.java | 2 +- .../search/aggregations/BucketCollector.java | 108 ++++++++++++++++++ .../bucket/BucketsAggregator.java | 19 +-- .../search/facet/FacetExecutor.java | 2 +- .../search/facet/FacetPhase.java | 10 +- .../facet/nested/NestedFacetExecutor.java | 2 +- .../search/internal/DocIdSetCollector.java | 2 +- 12 files changed, 142 insertions(+), 57 deletions(-) create mode 100644 src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java diff --git a/src/main/java/org/elasticsearch/common/lucene/MultiCollector.java b/src/main/java/org/elasticsearch/common/lucene/MultiCollector.java index 2dcb8da3ad6..7ec8cef7f69 100644 --- a/src/main/java/org/elasticsearch/common/lucene/MultiCollector.java +++ b/src/main/java/org/elasticsearch/common/lucene/MultiCollector.java @@ -83,7 +83,7 @@ public class MultiCollector extends XCollector { } @Override - public void postCollection() { + public void postCollection() throws IOException { if (collector instanceof XCollector) { ((XCollector) collector).postCollection(); } diff --git a/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java b/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java 
index 1ac10509938..cbb10a43873 100644 --- a/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java +++ b/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java @@ -44,7 +44,7 @@ public class FilteredCollector extends XCollector { } @Override - public void postCollection() { + public void postCollection() throws IOException { if (collector instanceof XCollector) { ((XCollector) collector).postCollection(); } diff --git a/src/main/java/org/elasticsearch/common/lucene/search/XCollector.java b/src/main/java/org/elasticsearch/common/lucene/search/XCollector.java index 942a8a55b72..b022626d842 100644 --- a/src/main/java/org/elasticsearch/common/lucene/search/XCollector.java +++ b/src/main/java/org/elasticsearch/common/lucene/search/XCollector.java @@ -20,13 +20,15 @@ package org.elasticsearch.common.lucene.search; import org.apache.lucene.search.Collector; +import java.io.IOException; + /** * An extension to {@link Collector} that allows for a callback when * collection is done. 
*/ public abstract class XCollector extends Collector { - public void postCollection() { + public void postCollection() throws IOException { } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 372b5d6a6d7..060847a75aa 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -25,8 +25,6 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorer; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.lucene.search.XCollector; import org.elasticsearch.common.lucene.search.XConstantScoreQuery; @@ -124,10 +122,10 @@ public class AggregationPhase implements SearchPhase { } try { context.searcher().search(query, collector); + collector.postCollection(); } catch (Exception e) { throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e); } - collector.postCollection(); } List aggregations = new ArrayList<>(aggregators.length); @@ -171,7 +169,7 @@ public class AggregationPhase implements SearchPhase { } @Override - public void postCollection() { + public void postCollection() throws IOException { for (Aggregator collector : collectors) { collector.postCollection(); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java b/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java index c6cc465b801..3303c42de01 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java @@ -18,8 +18,9 @@ */ package org.elasticsearch.search.aggregations; 
+import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lucene.ReaderContextAware; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.support.AggregationContext; @@ -27,12 +28,16 @@ import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext.Lifetime; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; -public abstract class Aggregator implements Releasable, ReaderContextAware { +public abstract class Aggregator extends BucketCollector implements Releasable { + + private static final Predicate COLLECTABLE_AGGREGATOR = new Predicate() { + @Override + public boolean apply(Aggregator aggregator) { + return aggregator.shouldCollect(); + } + }; /** * Defines the nature of the aggregator's aggregation execution when nested in other aggregators and the buckets they create. 
@@ -60,6 +65,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware { protected final BucketAggregationMode bucketAggregationMode; protected final AggregatorFactories factories; protected final Aggregator[] subAggregators; + protected final BucketCollector collectableSugAggregators; private Map subAggregatorbyName; @@ -84,6 +90,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware { assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead"; this.factories = factories; this.subAggregators = factories.createSubAggregators(this, estimatedBucketsCount); + collectableSugAggregators = BucketCollector.wrap(Iterables.filter(Arrays.asList(subAggregators), COLLECTABLE_AGGREGATOR)); context.searchContext().addReleasable(this, Lifetime.PHASE); } @@ -150,26 +157,11 @@ public abstract class Aggregator implements Releasable, ReaderContextAware { */ public abstract boolean shouldCollect(); - /** - * Called during the query phase, to collect & aggregate the given document. - * - * @param doc The document to be collected/aggregated - * @param owningBucketOrdinal The ordinal of the bucket this aggregator belongs to, assuming this aggregator is not a top level aggregator. - * Typically, aggregators with {@code #bucketAggregationMode} set to {@link BucketAggregationMode#MULTI_BUCKETS} - * will heavily depend on this ordinal. Other aggregators may or may not use it and can see this ordinal as just - * an extra information for the aggregation context. For top level aggregators, the ordinal will always be - * equal to 0. - * @throws IOException - */ - public abstract void collect(int doc, long owningBucketOrdinal) throws IOException; - /** * Called after collection of all document is done. 
*/ - public final void postCollection() { - for (int i = 0; i < subAggregators.length; i++) { - subAggregators[i].postCollection(); - } + public final void postCollection() throws IOException { + collectableSugAggregators.postCollection(); doPostCollection(); } @@ -185,7 +177,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware { /** * Can be overriden by aggregator implementation to be called back when the collection phase ends. */ - protected void doPostCollection() { + protected void doPostCollection() throws IOException { } /** diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 13f6eb7891d..87d833a398a 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -86,7 +86,7 @@ public class AggregatorFactories { } @Override - protected void doPostCollection() { + protected void doPostCollection() throws IOException { for (long i = 0; i < aggregators.size(); ++i) { final Aggregator aggregator = aggregators.get(i); if (aggregator != null) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java b/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java new file mode 100644 index 00000000000..3620c88417d --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations; + +import com.google.common.collect.Iterables; +import org.apache.lucene.index.AtomicReaderContext; +import org.elasticsearch.common.lucene.ReaderContextAware; +import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode; + +import java.io.IOException; + +/** + * A Collector that can collect data in separate buckets. + */ +public abstract class BucketCollector implements ReaderContextAware { + + public static BucketCollector NO_OP_COLLECTOR = new BucketCollector() { + + @Override + public void collect(int docId, long bucketOrdinal) throws IOException { + // no-op + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + // no-op + } + + @Override + public void postCollection() throws IOException { + // no-op + } + }; + + /** + * Wrap the given collectors into a single instance. 
+ */ + public static BucketCollector wrap(Iterable collectorList) { + final BucketCollector[] collectors = Iterables.toArray(collectorList, BucketCollector.class); + switch (collectors.length) { + case 0: + return NO_OP_COLLECTOR; + case 1: + return collectors[0]; + default: + return new BucketCollector() { + + @Override + public void collect(int docId, long bucketOrdinal) throws IOException { + for (BucketCollector collector : collectors) { + collector.collect(docId, bucketOrdinal); + } + } + + @Override + public void setNextReader(AtomicReaderContext reader) { + for (BucketCollector collector : collectors) { + collector.setNextReader(reader); + } + } + + @Override + public void postCollection() throws IOException { + for (BucketCollector collector : collectors) { + collector.postCollection(); + } + } + + }; + } + } + + /** + * Called during the query phase, to collect & aggregate the given document. + * + * @param docId The document to be collected/aggregated + * @param bucketOrdinal The ordinal of the bucket this aggregator belongs to, assuming this aggregator is not a top level aggregator. + * Typically, aggregators with {@code #bucketAggregationMode} set to {@link BucketAggregationMode#MULTI_BUCKETS} + * will heavily depend on this ordinal. Other aggregators may or may not use it and can see this ordinal as just + * extra information for the aggregation context. For top level aggregators, the ordinal will always be + * equal to 0. + * @throws IOException + */ + public abstract void collect(int docId, long bucketOrdinal) throws IOException; + + /** + * Post collection callback. 
+ */ + public abstract void postCollection() throws IOException; + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 8e6e22896d0..ea87a9cc093 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -27,9 +27,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.support.AggregationContext; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; /** * @@ -38,19 +36,10 @@ public abstract class BucketsAggregator extends Aggregator { private LongArray docCounts; - private final Aggregator[] collectableSugAggregators; - public BucketsAggregator(String name, BucketAggregationMode bucketAggregationMode, AggregatorFactories factories, long estimatedBucketsCount, AggregationContext context, Aggregator parent) { super(name, bucketAggregationMode, factories, estimatedBucketsCount, context, parent); docCounts = bigArrays.newLongArray(estimatedBucketsCount, true); - List collectables = new ArrayList<>(subAggregators.length); - for (int i = 0; i < subAggregators.length; i++) { - if (subAggregators[i].shouldCollect()) { - collectables.add((subAggregators[i])); - } - } - collectableSugAggregators = collectables.toArray(new Aggregator[collectables.size()]); } /** @@ -73,9 +62,7 @@ public abstract class BucketsAggregator extends Aggregator { */ protected final void collectExistingBucket(int doc, long bucketOrd) throws IOException { docCounts.increment(bucketOrd, 1); - for (int i = 0; i < collectableSugAggregators.length; i++) { - collectableSugAggregators[i].collect(doc, bucketOrd); - } + collectBucketNoCounts(doc, bucketOrd); } public LongArray getDocCounts() { @@ -86,9 +73,7 @@ public abstract class 
BucketsAggregator extends Aggregator { * Utility method to collect the given doc in the given bucket but not to update the doc counts of the bucket */ protected final void collectBucketNoCounts(int doc, long bucketOrd) throws IOException { - for (int i = 0; i < collectableSugAggregators.length; i++) { - collectableSugAggregators[i].collect(doc, bucketOrd); - } + collectableSugAggregators.collect(doc, bucketOrd); } /** diff --git a/src/main/java/org/elasticsearch/search/facet/FacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/FacetExecutor.java index c09528c219b..816b545692a 100644 --- a/src/main/java/org/elasticsearch/search/facet/FacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/FacetExecutor.java @@ -121,7 +121,7 @@ public abstract class FacetExecutor { } @Override - public abstract void postCollection(); + public abstract void postCollection() throws IOException; } /** diff --git a/src/main/java/org/elasticsearch/search/facet/FacetPhase.java b/src/main/java/org/elasticsearch/search/facet/FacetPhase.java index 75741b5890d..68ce9713cb9 100644 --- a/src/main/java/org/elasticsearch/search/facet/FacetPhase.java +++ b/src/main/java/org/elasticsearch/search/facet/FacetPhase.java @@ -184,14 +184,14 @@ public class FacetPhase implements SearchPhase { } try { context.searcher().search(query, MultiCollector.wrap(entry.getValue().toArray(new Collector[entry.getValue().size()]))); + for (Collector collector : entry.getValue()) { + if (collector instanceof XCollector) { + ((XCollector) collector).postCollection(); + } + } } catch (Exception e) { throw new QueryPhaseExecutionException(context, "Failed to execute global facets", e); } - for (Collector collector : entry.getValue()) { - if (collector instanceof XCollector) { - ((XCollector) collector).postCollection(); - } - } } } diff --git a/src/main/java/org/elasticsearch/search/facet/nested/NestedFacetExecutor.java 
b/src/main/java/org/elasticsearch/search/facet/nested/NestedFacetExecutor.java index bf61d489586..5c389912444 100644 --- a/src/main/java/org/elasticsearch/search/facet/nested/NestedFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/nested/NestedFacetExecutor.java @@ -171,7 +171,7 @@ public class NestedFacetExecutor extends FacetExecutor { } @Override - public void postCollection() { + public void postCollection() throws IOException { if (collector instanceof XCollector) { ((XCollector) collector).postCollection(); } diff --git a/src/main/java/org/elasticsearch/search/internal/DocIdSetCollector.java b/src/main/java/org/elasticsearch/search/internal/DocIdSetCollector.java index e9842c1319e..1ed96cacf98 100644 --- a/src/main/java/org/elasticsearch/search/internal/DocIdSetCollector.java +++ b/src/main/java/org/elasticsearch/search/internal/DocIdSetCollector.java @@ -88,7 +88,7 @@ public class DocIdSetCollector extends XCollector implements Releasable { } @Override - public void postCollection() { + public void postCollection() throws IOException { if (collector instanceof XCollector) { ((XCollector) collector).postCollection(); }