Improve the way sub-aggregations are collected.
Sub-aggregations are currently collected directly, by just forwarding the doc ID and bucket ordinal to them. This change adds the new BucketCollector abstract class that Aggregator extends, so that we have more flexibility to add implicit filters or buffering between an aggregator and its sub aggregators. Close #5975
This commit is contained in:
parent
2eeaa56d95
commit
b2db7c8222
|
@ -83,7 +83,7 @@ public class MultiCollector extends XCollector {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postCollection() {
|
public void postCollection() throws IOException {
|
||||||
if (collector instanceof XCollector) {
|
if (collector instanceof XCollector) {
|
||||||
((XCollector) collector).postCollection();
|
((XCollector) collector).postCollection();
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,7 @@ public class FilteredCollector extends XCollector {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postCollection() {
|
public void postCollection() throws IOException {
|
||||||
if (collector instanceof XCollector) {
|
if (collector instanceof XCollector) {
|
||||||
((XCollector) collector).postCollection();
|
((XCollector) collector).postCollection();
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,13 +20,15 @@ package org.elasticsearch.common.lucene.search;
|
||||||
|
|
||||||
import org.apache.lucene.search.Collector;
|
import org.apache.lucene.search.Collector;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An extension to {@link Collector} that allows for a callback when
|
* An extension to {@link Collector} that allows for a callback when
|
||||||
* collection is done.
|
* collection is done.
|
||||||
*/
|
*/
|
||||||
public abstract class XCollector extends Collector {
|
public abstract class XCollector extends Collector {
|
||||||
|
|
||||||
public void postCollection() {
|
public void postCollection() throws IOException {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,8 +25,6 @@ import org.apache.lucene.search.Query;
|
||||||
import org.apache.lucene.search.Scorer;
|
import org.apache.lucene.search.Scorer;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.lease.Releasable;
|
|
||||||
import org.elasticsearch.common.lease.Releasables;
|
|
||||||
import org.elasticsearch.common.lucene.search.Queries;
|
import org.elasticsearch.common.lucene.search.Queries;
|
||||||
import org.elasticsearch.common.lucene.search.XCollector;
|
import org.elasticsearch.common.lucene.search.XCollector;
|
||||||
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
|
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
|
||||||
|
@ -124,10 +122,10 @@ public class AggregationPhase implements SearchPhase {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
context.searcher().search(query, collector);
|
context.searcher().search(query, collector);
|
||||||
|
collector.postCollection();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e);
|
throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e);
|
||||||
}
|
}
|
||||||
collector.postCollection();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
|
List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
|
||||||
|
@ -171,7 +169,7 @@ public class AggregationPhase implements SearchPhase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postCollection() {
|
public void postCollection() throws IOException {
|
||||||
for (Aggregator collector : collectors) {
|
for (Aggregator collector : collectors) {
|
||||||
collector.postCollection();
|
collector.postCollection();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,8 +18,9 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.search.aggregations;
|
package org.elasticsearch.search.aggregations;
|
||||||
|
|
||||||
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
import org.elasticsearch.common.lease.Releasable;
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
import org.elasticsearch.common.lucene.ReaderContextAware;
|
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||||
|
@ -27,12 +28,16 @@ import org.elasticsearch.search.internal.SearchContext;
|
||||||
import org.elasticsearch.search.internal.SearchContext.Lifetime;
|
import org.elasticsearch.search.internal.SearchContext.Lifetime;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public abstract class Aggregator implements Releasable, ReaderContextAware {
|
public abstract class Aggregator extends BucketCollector implements Releasable {
|
||||||
|
|
||||||
|
private static final Predicate<Aggregator> COLLECTABLE_AGGREGATOR = new Predicate<Aggregator>() {
|
||||||
|
@Override
|
||||||
|
public boolean apply(Aggregator aggregator) {
|
||||||
|
return aggregator.shouldCollect();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines the nature of the aggregator's aggregation execution when nested in other aggregators and the buckets they create.
|
* Defines the nature of the aggregator's aggregation execution when nested in other aggregators and the buckets they create.
|
||||||
|
@ -60,6 +65,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware {
|
||||||
protected final BucketAggregationMode bucketAggregationMode;
|
protected final BucketAggregationMode bucketAggregationMode;
|
||||||
protected final AggregatorFactories factories;
|
protected final AggregatorFactories factories;
|
||||||
protected final Aggregator[] subAggregators;
|
protected final Aggregator[] subAggregators;
|
||||||
|
protected final BucketCollector collectableSugAggregators;
|
||||||
|
|
||||||
private Map<String, Aggregator> subAggregatorbyName;
|
private Map<String, Aggregator> subAggregatorbyName;
|
||||||
|
|
||||||
|
@ -84,6 +90,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware {
|
||||||
assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
|
assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
|
||||||
this.factories = factories;
|
this.factories = factories;
|
||||||
this.subAggregators = factories.createSubAggregators(this, estimatedBucketsCount);
|
this.subAggregators = factories.createSubAggregators(this, estimatedBucketsCount);
|
||||||
|
collectableSugAggregators = BucketCollector.wrap(Iterables.filter(Arrays.asList(subAggregators), COLLECTABLE_AGGREGATOR));
|
||||||
context.searchContext().addReleasable(this, Lifetime.PHASE);
|
context.searchContext().addReleasable(this, Lifetime.PHASE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,26 +157,11 @@ public abstract class Aggregator implements Releasable, ReaderContextAware {
|
||||||
*/
|
*/
|
||||||
public abstract boolean shouldCollect();
|
public abstract boolean shouldCollect();
|
||||||
|
|
||||||
/**
|
|
||||||
* Called during the query phase, to collect & aggregate the given document.
|
|
||||||
*
|
|
||||||
* @param doc The document to be collected/aggregated
|
|
||||||
* @param owningBucketOrdinal The ordinal of the bucket this aggregator belongs to, assuming this aggregator is not a top level aggregator.
|
|
||||||
* Typically, aggregators with {@code #bucketAggregationMode} set to {@link BucketAggregationMode#MULTI_BUCKETS}
|
|
||||||
* will heavily depend on this ordinal. Other aggregators may or may not use it and can see this ordinal as just
|
|
||||||
* an extra information for the aggregation context. For top level aggregators, the ordinal will always be
|
|
||||||
* equal to 0.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public abstract void collect(int doc, long owningBucketOrdinal) throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called after collection of all document is done.
|
* Called after collection of all document is done.
|
||||||
*/
|
*/
|
||||||
public final void postCollection() {
|
public final void postCollection() throws IOException {
|
||||||
for (int i = 0; i < subAggregators.length; i++) {
|
collectableSugAggregators.postCollection();
|
||||||
subAggregators[i].postCollection();
|
|
||||||
}
|
|
||||||
doPostCollection();
|
doPostCollection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +177,7 @@ public abstract class Aggregator implements Releasable, ReaderContextAware {
|
||||||
/**
|
/**
|
||||||
* Can be overriden by aggregator implementation to be called back when the collection phase ends.
|
* Can be overriden by aggregator implementation to be called back when the collection phase ends.
|
||||||
*/
|
*/
|
||||||
protected void doPostCollection() {
|
protected void doPostCollection() throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -86,7 +86,7 @@ public class AggregatorFactories {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doPostCollection() {
|
protected void doPostCollection() throws IOException {
|
||||||
for (long i = 0; i < aggregators.size(); ++i) {
|
for (long i = 0; i < aggregators.size(); ++i) {
|
||||||
final Aggregator aggregator = aggregators.get(i);
|
final Aggregator aggregator = aggregators.get(i);
|
||||||
if (aggregator != null) {
|
if (aggregator != null) {
|
||||||
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.search.aggregations;
|
||||||
|
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import org.apache.lucene.index.AtomicReaderContext;
|
||||||
|
import org.elasticsearch.common.lucene.ReaderContextAware;
|
||||||
|
import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Collector that can collect data in separate buckets.
|
||||||
|
*/
|
||||||
|
public abstract class BucketCollector implements ReaderContextAware {
|
||||||
|
|
||||||
|
public static BucketCollector NO_OP_COLLECTOR = new BucketCollector() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void collect(int docId, long bucketOrdinal) throws IOException {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNextReader(AtomicReaderContext reader) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postCollection() throws IOException {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap the given collectors into a single instance.
|
||||||
|
*/
|
||||||
|
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectorList) {
|
||||||
|
final BucketCollector[] collectors = Iterables.toArray(collectorList, BucketCollector.class);
|
||||||
|
switch (collectors.length) {
|
||||||
|
case 0:
|
||||||
|
return NO_OP_COLLECTOR;
|
||||||
|
case 1:
|
||||||
|
return collectors[0];
|
||||||
|
default:
|
||||||
|
return new BucketCollector() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void collect(int docId, long bucketOrdinal) throws IOException {
|
||||||
|
for (BucketCollector collector : collectors) {
|
||||||
|
collector.collect(docId, bucketOrdinal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNextReader(AtomicReaderContext reader) {
|
||||||
|
for (BucketCollector collector : collectors) {
|
||||||
|
collector.setNextReader(reader);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postCollection() throws IOException {
|
||||||
|
for (BucketCollector collector : collectors) {
|
||||||
|
collector.postCollection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called during the query phase, to collect & aggregate the given document.
|
||||||
|
*
|
||||||
|
* @param doc The document to be collected/aggregated
|
||||||
|
* @param bucketOrdinal The ordinal of the bucket this aggregator belongs to, assuming this aggregator is not a top level aggregator.
|
||||||
|
* Typically, aggregators with {@code #bucketAggregationMode} set to {@link BucketAggregationMode#MULTI_BUCKETS}
|
||||||
|
* will heavily depend on this ordinal. Other aggregators may or may not use it and can see this ordinal as just
|
||||||
|
* an extra information for the aggregation context. For top level aggregators, the ordinal will always be
|
||||||
|
* equal to 0.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public abstract void collect(int docId, long bucketOrdinal) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Post collection callback.
|
||||||
|
*/
|
||||||
|
public abstract void postCollection() throws IOException;
|
||||||
|
|
||||||
|
}
|
|
@ -27,9 +27,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -38,19 +36,10 @@ public abstract class BucketsAggregator extends Aggregator {
|
||||||
|
|
||||||
private LongArray docCounts;
|
private LongArray docCounts;
|
||||||
|
|
||||||
private final Aggregator[] collectableSugAggregators;
|
|
||||||
|
|
||||||
public BucketsAggregator(String name, BucketAggregationMode bucketAggregationMode, AggregatorFactories factories,
|
public BucketsAggregator(String name, BucketAggregationMode bucketAggregationMode, AggregatorFactories factories,
|
||||||
long estimatedBucketsCount, AggregationContext context, Aggregator parent) {
|
long estimatedBucketsCount, AggregationContext context, Aggregator parent) {
|
||||||
super(name, bucketAggregationMode, factories, estimatedBucketsCount, context, parent);
|
super(name, bucketAggregationMode, factories, estimatedBucketsCount, context, parent);
|
||||||
docCounts = bigArrays.newLongArray(estimatedBucketsCount, true);
|
docCounts = bigArrays.newLongArray(estimatedBucketsCount, true);
|
||||||
List<Aggregator> collectables = new ArrayList<>(subAggregators.length);
|
|
||||||
for (int i = 0; i < subAggregators.length; i++) {
|
|
||||||
if (subAggregators[i].shouldCollect()) {
|
|
||||||
collectables.add((subAggregators[i]));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
collectableSugAggregators = collectables.toArray(new Aggregator[collectables.size()]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -73,9 +62,7 @@ public abstract class BucketsAggregator extends Aggregator {
|
||||||
*/
|
*/
|
||||||
protected final void collectExistingBucket(int doc, long bucketOrd) throws IOException {
|
protected final void collectExistingBucket(int doc, long bucketOrd) throws IOException {
|
||||||
docCounts.increment(bucketOrd, 1);
|
docCounts.increment(bucketOrd, 1);
|
||||||
for (int i = 0; i < collectableSugAggregators.length; i++) {
|
collectBucketNoCounts(doc, bucketOrd);
|
||||||
collectableSugAggregators[i].collect(doc, bucketOrd);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public LongArray getDocCounts() {
|
public LongArray getDocCounts() {
|
||||||
|
@ -86,9 +73,7 @@ public abstract class BucketsAggregator extends Aggregator {
|
||||||
* Utility method to collect the given doc in the given bucket but not to update the doc counts of the bucket
|
* Utility method to collect the given doc in the given bucket but not to update the doc counts of the bucket
|
||||||
*/
|
*/
|
||||||
protected final void collectBucketNoCounts(int doc, long bucketOrd) throws IOException {
|
protected final void collectBucketNoCounts(int doc, long bucketOrd) throws IOException {
|
||||||
for (int i = 0; i < collectableSugAggregators.length; i++) {
|
collectableSugAggregators.collect(doc, bucketOrd);
|
||||||
collectableSugAggregators[i].collect(doc, bucketOrd);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -121,7 +121,7 @@ public abstract class FacetExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public abstract void postCollection();
|
public abstract void postCollection() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -184,14 +184,14 @@ public class FacetPhase implements SearchPhase {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
context.searcher().search(query, MultiCollector.wrap(entry.getValue().toArray(new Collector[entry.getValue().size()])));
|
context.searcher().search(query, MultiCollector.wrap(entry.getValue().toArray(new Collector[entry.getValue().size()])));
|
||||||
|
for (Collector collector : entry.getValue()) {
|
||||||
|
if (collector instanceof XCollector) {
|
||||||
|
((XCollector) collector).postCollection();
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new QueryPhaseExecutionException(context, "Failed to execute global facets", e);
|
throw new QueryPhaseExecutionException(context, "Failed to execute global facets", e);
|
||||||
}
|
}
|
||||||
for (Collector collector : entry.getValue()) {
|
|
||||||
if (collector instanceof XCollector) {
|
|
||||||
((XCollector) collector).postCollection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -171,7 +171,7 @@ public class NestedFacetExecutor extends FacetExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postCollection() {
|
public void postCollection() throws IOException {
|
||||||
if (collector instanceof XCollector) {
|
if (collector instanceof XCollector) {
|
||||||
((XCollector) collector).postCollection();
|
((XCollector) collector).postCollection();
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class DocIdSetCollector extends XCollector implements Releasable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postCollection() {
|
public void postCollection() throws IOException {
|
||||||
if (collector instanceof XCollector) {
|
if (collector instanceof XCollector) {
|
||||||
((XCollector) collector).postCollection();
|
((XCollector) collector).postCollection();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue