Make all aggregators reader-context-aware.

This removes the overhead of polling a Bytes/Double/Long-Values instance in
every call to collect.

Additionally, the AggregationsCollector has been changed to wrap a simple array
instead of an ArrayList.

Close #4841
This commit is contained in:
Adrien Grand 2014-01-20 11:26:40 +01:00
parent 9282ae4ffd
commit 080ce71d54
22 changed files with 155 additions and 89 deletions

View File

@ -39,6 +39,7 @@ import org.elasticsearch.search.query.QueryPhaseExecutionException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -145,10 +146,10 @@ public class AggregationPhase implements SearchPhase {
public static class AggregationsCollector extends XCollector { public static class AggregationsCollector extends XCollector {
private final AggregationContext aggregationContext; private final AggregationContext aggregationContext;
private final List<Aggregator> collectors; private final Aggregator[] collectors;
public AggregationsCollector(List<Aggregator> collectors, AggregationContext aggregationContext) { public AggregationsCollector(Collection<Aggregator> collectors, AggregationContext aggregationContext) {
this.collectors = collectors; this.collectors = collectors.toArray(new Aggregator[collectors.size()]);
this.aggregationContext = aggregationContext; this.aggregationContext = aggregationContext;
} }
@ -159,8 +160,8 @@ public class AggregationPhase implements SearchPhase {
@Override @Override
public void collect(int doc) throws IOException { public void collect(int doc) throws IOException {
for (int i = 0; i < collectors.size(); i++) { for (Aggregator collector : collectors) {
collectors.get(i).collect(doc, 0); collector.collect(doc, 0);
} }
} }
@ -176,8 +177,8 @@ public class AggregationPhase implements SearchPhase {
@Override @Override
public void postCollection() { public void postCollection() {
for (int i = 0; i < collectors.size(); i++) { for (Aggregator collector : collectors) {
collectors.get(i).postCollection(); collector.postCollection();
} }
} }
} }

View File

@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
@ -28,7 +29,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public abstract class Aggregator implements Releasable { public abstract class Aggregator implements Releasable, ReaderContextAware {
/** /**
* Defines the nature of the aggregator's aggregation execution when nested in other aggregators and the buckets they create. * Defines the nature of the aggregator's aggregation execution when nested in other aggregators and the buckets they create.

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations; package org.elasticsearch.search.aggregations;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArray;
@ -45,6 +46,14 @@ public class AggregatorFactories {
this.factories = factories; this.factories = factories;
} }
private static Aggregator createAndRegisterContextAware(AggregationContext context, AggregatorFactory factory, Aggregator parent, long estimatedBucketsCount) {
final Aggregator aggregator = factory.create(context, parent, estimatedBucketsCount);
if (aggregator.shouldCollect()) {
context.registerReaderContextAware(aggregator);
}
return aggregator;
}
/** /**
* Create all aggregators so that they can be consumed with multiple buckets. * Create all aggregators so that they can be consumed with multiple buckets.
*/ */
@ -52,7 +61,7 @@ public class AggregatorFactories {
Aggregator[] aggregators = new Aggregator[count()]; Aggregator[] aggregators = new Aggregator[count()];
for (int i = 0; i < factories.length; ++i) { for (int i = 0; i < factories.length; ++i) {
final AggregatorFactory factory = factories[i]; final AggregatorFactory factory = factories[i];
final Aggregator first = factory.create(parent.context(), parent, estimatedBucketsCount); final Aggregator first = createAndRegisterContextAware(parent.context(), factory, parent, estimatedBucketsCount);
if (first.bucketAggregationMode() == BucketAggregationMode.MULTI_BUCKETS) { if (first.bucketAggregationMode() == BucketAggregationMode.MULTI_BUCKETS) {
// This aggregator already supports multiple bucket ordinals, can be used directly // This aggregator already supports multiple bucket ordinals, can be used directly
aggregators[i] = first; aggregators[i] = first;
@ -67,7 +76,7 @@ public class AggregatorFactories {
aggregators = BigArrays.newObjectArray(estimatedBucketsCount, context.pageCacheRecycler()); aggregators = BigArrays.newObjectArray(estimatedBucketsCount, context.pageCacheRecycler());
aggregators.set(0, first); aggregators.set(0, first);
for (long i = 1; i < estimatedBucketsCount; ++i) { for (long i = 1; i < estimatedBucketsCount; ++i) {
aggregators.set(i, factory.create(parent.context(), parent, estimatedBucketsCount)); aggregators.set(i, createAndRegisterContextAware(parent.context(), factory, parent, estimatedBucketsCount));
} }
} }
@ -91,12 +100,16 @@ public class AggregatorFactories {
aggregators = BigArrays.grow(aggregators, owningBucketOrdinal + 1); aggregators = BigArrays.grow(aggregators, owningBucketOrdinal + 1);
Aggregator aggregator = aggregators.get(owningBucketOrdinal); Aggregator aggregator = aggregators.get(owningBucketOrdinal);
if (aggregator == null) { if (aggregator == null) {
aggregator = factory.create(parent.context(), parent, estimatedBucketsCount); aggregator = createAndRegisterContextAware(parent.context(), factory, parent, estimatedBucketsCount);
aggregators.set(owningBucketOrdinal, aggregator); aggregators.set(owningBucketOrdinal, aggregator);
} }
aggregator.collect(doc, 0); aggregator.collect(doc, 0);
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
}
@Override @Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) { public InternalAggregation buildAggregation(long owningBucketOrdinal) {
return aggregators.get(owningBucketOrdinal).buildAggregation(0); return aggregators.get(owningBucketOrdinal).buildAggregation(0);
@ -120,7 +133,7 @@ public class AggregatorFactories {
// These aggregators are going to be used with a single bucket ordinal, no need to wrap the PER_BUCKET ones // These aggregators are going to be used with a single bucket ordinal, no need to wrap the PER_BUCKET ones
Aggregator[] aggregators = new Aggregator[factories.length]; Aggregator[] aggregators = new Aggregator[factories.length];
for (int i = 0; i < factories.length; i++) { for (int i = 0; i < factories.length; i++) {
aggregators[i] = factories[i].create(ctx, null, 0); aggregators[i] = createAndRegisterContextAware(ctx, factories[i], null, 0);
} }
return aggregators; return aggregators;
} }

View File

@ -21,22 +21,17 @@ package org.elasticsearch.search.aggregations.bucket.filter;
import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.search.Filter; import org.apache.lucene.search.Filter;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.lucene.docset.DocIdSets; import org.elasticsearch.common.lucene.docset.DocIdSets;
import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import java.io.IOException; import java.io.IOException;
/** /**
* Aggregate all docs that match a filter. * Aggregate all docs that match a filter.
*/ */
public class FilterAggregator extends SingleBucketAggregator implements ReaderContextAware { public class FilterAggregator extends SingleBucketAggregator {
private final Filter filter; private final Filter filter;
@ -88,9 +83,7 @@ public class FilterAggregator extends SingleBucketAggregator implements ReaderCo
@Override @Override
public Aggregator create(AggregationContext context, Aggregator parent, long expectedBucketsCount) { public Aggregator create(AggregationContext context, Aggregator parent, long expectedBucketsCount) {
FilterAggregator aggregator = new FilterAggregator(name, filter, factories, context, parent); return new FilterAggregator(name, filter, factories, context, parent);
context.registerReaderContextAware(aggregator);
return aggregator;
} }
} }

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.geogrid; package org.elasticsearch.search.aggregations.bucket.geogrid;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.index.fielddata.LongValues; import org.elasticsearch.index.fielddata.LongValues;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
@ -45,6 +46,7 @@ public class GeoHashGridAggregator extends BucketsAggregator {
private final int shardSize; private final int shardSize;
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private final LongHash bucketOrds; private final LongHash bucketOrds;
private LongValues values;
public GeoHashGridAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, public GeoHashGridAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource,
int requiredSize, int shardSize, AggregationContext aggregationContext, Aggregator parent) { int requiredSize, int shardSize, AggregationContext aggregationContext, Aggregator parent) {
@ -60,10 +62,14 @@ public class GeoHashGridAggregator extends BucketsAggregator {
return true; return true;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
values = valuesSource.longValues();
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0; assert owningBucketOrdinal == 0;
final LongValues values = valuesSource.longValues();
final int valuesCount = values.setDocument(doc); final int valuesCount = values.setDocument(doc);
for (int i = 0; i < valuesCount; ++i) { for (int i = 0; i < valuesCount; ++i) {
@ -145,6 +151,10 @@ public class GeoHashGridAggregator extends BucketsAggregator {
return false; return false;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
} }

View File

@ -18,13 +18,10 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.global; package org.elasticsearch.search.aggregations.bucket.global;
import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import java.io.IOException; import java.io.IOException;
@ -37,6 +34,10 @@ public class GlobalAggregator extends SingleBucketAggregator {
super(name, subFactories, aggregationContext, null); super(name, subFactories, aggregationContext, null);
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0 : "global aggregator can only be a top level aggregator"; assert owningBucketOrdinal == 0 : "global aggregator can only be a top level aggregator";

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.histogram; package org.elasticsearch.search.aggregations.bucket.histogram;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
@ -49,6 +50,7 @@ public class HistogramAggregator extends BucketsAggregator {
private final AbstractHistogramBase.Factory histogramFactory; private final AbstractHistogramBase.Factory histogramFactory;
private final LongHash bucketOrds; private final LongHash bucketOrds;
private LongValues values;
public HistogramAggregator(String name, public HistogramAggregator(String name,
AggregatorFactories factories, AggregatorFactories factories,
@ -78,10 +80,14 @@ public class HistogramAggregator extends BucketsAggregator {
return valuesSource != null; return valuesSource != null;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
values = valuesSource.longValues();
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0; assert owningBucketOrdinal == 0;
final LongValues values = valuesSource.longValues();
final int valuesCount = values.setDocument(doc); final int valuesCount = values.setDocument(doc);
long previousKey = Long.MIN_VALUE; long previousKey = Long.MIN_VALUE;

View File

@ -18,14 +18,16 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.missing; package org.elasticsearch.search.aggregations.bucket.missing;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.index.fielddata.BytesValues;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import java.io.IOException; import java.io.IOException;
@ -34,7 +36,8 @@ import java.io.IOException;
*/ */
public class MissingAggregator extends SingleBucketAggregator { public class MissingAggregator extends SingleBucketAggregator {
private ValuesSource valuesSource; private final ValuesSource valuesSource;
private BytesValues values;
public MissingAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource, public MissingAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource,
AggregationContext aggregationContext, Aggregator parent) { AggregationContext aggregationContext, Aggregator parent) {
@ -42,9 +45,16 @@ public class MissingAggregator extends SingleBucketAggregator {
this.valuesSource = valuesSource; this.valuesSource = valuesSource;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
if (valuesSource != null) {
values = valuesSource.bytesValues();
}
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
if (valuesSource == null || valuesSource.bytesValues().setDocument(doc) == 0) { if (valuesSource == null || values.setDocument(doc) == 0) {
collectBucket(doc, owningBucketOrdinal); collectBucket(doc, owningBucketOrdinal);
} }
} }

View File

@ -119,9 +119,7 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo
@Override @Override
public Aggregator create(AggregationContext context, Aggregator parent, long expectedBucketsCount) { public Aggregator create(AggregationContext context, Aggregator parent, long expectedBucketsCount) {
NestedAggregator aggregator = new NestedAggregator(name, factories, path, context, parent); return new NestedAggregator(name, factories, path, context, parent);
context.registerReaderContextAware(aggregator);
return aggregator;
} }
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket.range; package org.elasticsearch.search.aggregations.bucket.range;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.util.InPlaceMergeSorter; import org.apache.lucene.util.InPlaceMergeSorter;
import org.elasticsearch.index.fielddata.DoubleValues; import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
@ -81,6 +82,7 @@ public class RangeAggregator extends BucketsAggregator {
private final Range[] ranges; private final Range[] ranges;
private final boolean keyed; private final boolean keyed;
private final AbstractRangeBase.Factory rangeFactory; private final AbstractRangeBase.Factory rangeFactory;
private DoubleValues values;
final double[] maxTo; final double[] maxTo;
@ -117,13 +119,17 @@ public class RangeAggregator extends BucketsAggregator {
return true; return true;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
values = valuesSource.doubleValues();
}
private final long subBucketOrdinal(long owningBucketOrdinal, int rangeOrd) { private final long subBucketOrdinal(long owningBucketOrdinal, int rangeOrd) {
return owningBucketOrdinal * ranges.length + rangeOrd; return owningBucketOrdinal * ranges.length + rangeOrd;
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
final DoubleValues values = valuesSource.doubleValues();
final int valuesCount = values.setDocument(doc); final int valuesCount = values.setDocument(doc);
for (int i = 0, lo = 0; i < valuesCount; ++i) { for (int i = 0, lo = 0; i < valuesCount; ++i) {
final double value = values.nextValue(); final double value = values.nextValue();
@ -263,6 +269,10 @@ public class RangeAggregator extends BucketsAggregator {
return false; return false;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
} }

View File

@ -44,6 +44,7 @@ public class DoubleTermsAggregator extends BucketsAggregator {
private final long minDocCount; private final long minDocCount;
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private final LongHash bucketOrds; private final LongHash bucketOrds;
private DoubleValues values;
public DoubleTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, long estimatedBucketCount, public DoubleTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) { InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) {
@ -61,10 +62,14 @@ public class DoubleTermsAggregator extends BucketsAggregator {
return true; return true;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
values = valuesSource.doubleValues();
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0; assert owningBucketOrdinal == 0;
final DoubleValues values = valuesSource.doubleValues();
final int valuesCount = values.setDocument(doc); final int valuesCount = values.setDocument(doc);
for (int i = 0; i < valuesCount; ++i) { for (int i = 0; i < valuesCount; ++i) {

View File

@ -44,6 +44,7 @@ public class LongTermsAggregator extends BucketsAggregator {
private final long minDocCount; private final long minDocCount;
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private final LongHash bucketOrds; private final LongHash bucketOrds;
private LongValues values;
public LongTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, long estimatedBucketCount, public LongTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) { InternalOrder order, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent) {
@ -61,10 +62,14 @@ public class LongTermsAggregator extends BucketsAggregator {
return true; return true;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
values = valuesSource.longValues();
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0; assert owningBucketOrdinal == 0;
final LongValues values = valuesSource.longValues();
final int valuesCount = values.setDocument(doc); final int valuesCount = values.setDocument(doc);
for (int i = 0; i < valuesCount; ++i) { for (int i = 0; i < valuesCount; ++i) {

View File

@ -26,7 +26,6 @@ import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.collect.Iterators2; import org.elasticsearch.common.collect.Iterators2;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.BytesValues; import org.elasticsearch.index.fielddata.BytesValues;
@ -56,6 +55,7 @@ public class StringTermsAggregator extends BucketsAggregator {
private final long minDocCount; private final long minDocCount;
protected final BytesRefHash bucketOrds; protected final BytesRefHash bucketOrds;
private final IncludeExclude includeExclude; private final IncludeExclude includeExclude;
private BytesValues values;
public StringTermsAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount, public StringTermsAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource, long estimatedBucketCount,
InternalOrder order, int requiredSize, int shardSize, long minDocCount, InternalOrder order, int requiredSize, int shardSize, long minDocCount,
@ -76,10 +76,14 @@ public class StringTermsAggregator extends BucketsAggregator {
return true; return true;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
values = valuesSource.bytesValues();
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0; assert owningBucketOrdinal == 0;
final BytesValues values = valuesSource.bytesValues();
final int valuesCount = values.setDocument(doc); final int valuesCount = values.setDocument(doc);
for (int i = 0; i < valuesCount; ++i) { for (int i = 0; i < valuesCount; ++i) {
@ -250,7 +254,7 @@ public class StringTermsAggregator extends BucketsAggregator {
/** /**
* Extension of StringTermsAggregator that caches bucket ords using terms ordinals. * Extension of StringTermsAggregator that caches bucket ords using terms ordinals.
*/ */
public static class WithOrdinals extends StringTermsAggregator implements ReaderContextAware { public static class WithOrdinals extends StringTermsAggregator {
private final BytesValuesSource.WithOrdinals valuesSource; private final BytesValuesSource.WithOrdinals valuesSource;
private BytesValues.WithOrdinals bytesValues; private BytesValues.WithOrdinals bytesValues;

View File

@ -108,10 +108,7 @@ public class TermsAggregatorFactory extends ValueSourceAggregatorFactory {
if (execution.equals(EXECUTION_HINT_VALUE_ORDINALS)) { if (execution.equals(EXECUTION_HINT_VALUE_ORDINALS)) {
assert includeExclude == null; assert includeExclude == null;
final StringTermsAggregator.WithOrdinals aggregator = new StringTermsAggregator.WithOrdinals(name, return new StringTermsAggregator.WithOrdinals(name, factories, (BytesValuesSource.WithOrdinals) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
factories, (BytesValuesSource.WithOrdinals) valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, aggregationContext, parent);
aggregationContext.registerReaderContextAware(aggregator);
return aggregator;
} else { } else {
return new StringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent); return new StringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, order, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent);
} }

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.terms; package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
@ -46,6 +47,10 @@ public class UnmappedTermsAggregator extends Aggregator {
return false; return false;
} }
@Override
public void setNextReader(AtomicReaderContext reader) {
}
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long owningBucketOrdinal) throws IOException {
} }

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.avg; package org.elasticsearch.search.aggregations.metrics.avg;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.DoubleArray;
@ -39,6 +40,7 @@ import java.io.IOException;
public class AvgAggregator extends MetricsAggregator.SingleValue { public class AvgAggregator extends MetricsAggregator.SingleValue {
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private DoubleValues values;
private LongArray counts; private LongArray counts;
private DoubleArray sums; private DoubleArray sums;
@ -59,14 +61,12 @@ public class AvgAggregator extends MetricsAggregator.SingleValue {
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void setNextReader(AtomicReaderContext reader) {
assert valuesSource != null : "if value source is null, collect should never be called"; values = valuesSource.doubleValues();
DoubleValues values = valuesSource.doubleValues();
if (values == null) {
return;
} }
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
counts = BigArrays.grow(counts, owningBucketOrdinal + 1); counts = BigArrays.grow(counts, owningBucketOrdinal + 1);
sums = BigArrays.grow(sums, owningBucketOrdinal + 1); sums = BigArrays.grow(sums, owningBucketOrdinal + 1);

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.max; package org.elasticsearch.search.aggregations.metrics.max;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.DoubleArray;
@ -38,6 +39,7 @@ import java.io.IOException;
public class MaxAggregator extends MetricsAggregator.SingleValue { public class MaxAggregator extends MetricsAggregator.SingleValue {
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private DoubleValues values;
private DoubleArray maxes; private DoubleArray maxes;
@ -57,14 +59,12 @@ public class MaxAggregator extends MetricsAggregator.SingleValue {
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void setNextReader(AtomicReaderContext reader) {
assert valuesSource != null : "collect should only be called when value source is not null"; values = valuesSource.doubleValues();
DoubleValues values = valuesSource.doubleValues();
if (values == null) {
return;
} }
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
if (owningBucketOrdinal >= maxes.size()) { if (owningBucketOrdinal >= maxes.size()) {
long from = maxes.size(); long from = maxes.size();
maxes = BigArrays.grow(maxes, owningBucketOrdinal + 1); maxes = BigArrays.grow(maxes, owningBucketOrdinal + 1);

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.min; package org.elasticsearch.search.aggregations.metrics.min;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.DoubleArray;
@ -38,6 +39,7 @@ import java.io.IOException;
public class MinAggregator extends MetricsAggregator.SingleValue { public class MinAggregator extends MetricsAggregator.SingleValue {
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private DoubleValues values;
private DoubleArray mins; private DoubleArray mins;
@ -57,11 +59,13 @@ public class MinAggregator extends MetricsAggregator.SingleValue {
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void setNextReader(AtomicReaderContext reader) {
assert valuesSource != null : "collect must only be called if #shouldCollect returns true"; values = valuesSource.doubleValues();
}
DoubleValues values = valuesSource.doubleValues(); @Override
if (values == null || values.setDocument(doc) == 0) { public void collect(int doc, long owningBucketOrdinal) throws IOException {
if (values.setDocument(doc) == 0) {
return; return;
} }

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.stats; package org.elasticsearch.search.aggregations.metrics.stats;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
@ -40,6 +41,7 @@ import java.io.IOException;
public class StatsAggegator extends MetricsAggregator.MultiValue { public class StatsAggegator extends MetricsAggregator.MultiValue {
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private DoubleValues values;
private LongArray counts; private LongArray counts;
private DoubleArray sums; private DoubleArray sums;
@ -66,14 +68,12 @@ public class StatsAggegator extends MetricsAggregator.MultiValue {
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void setNextReader(AtomicReaderContext reader) {
assert valuesSource != null : "collect must only be called if #shouldCollect returns true"; values = valuesSource.doubleValues();
DoubleValues values = valuesSource.doubleValues();
if (values == null) {
return;
} }
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
if (owningBucketOrdinal >= counts.size()) { if (owningBucketOrdinal >= counts.size()) {
final long from = counts.size(); final long from = counts.size();
final long overSize = BigArrays.overSize(owningBucketOrdinal + 1); final long overSize = BigArrays.overSize(owningBucketOrdinal + 1);

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.stats.extended; package org.elasticsearch.search.aggregations.metrics.stats.extended;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
@ -40,6 +41,7 @@ import java.io.IOException;
public class ExtendedStatsAggregator extends MetricsAggregator.MultiValue { public class ExtendedStatsAggregator extends MetricsAggregator.MultiValue {
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private DoubleValues values;
private LongArray counts; private LongArray counts;
private DoubleArray sums; private DoubleArray sums;
@ -68,14 +70,12 @@ public class ExtendedStatsAggregator extends MetricsAggregator.MultiValue {
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void setNextReader(AtomicReaderContext reader) {
assert valuesSource != null : "collect must only be called if #shouldCollect returns true"; values = valuesSource.doubleValues();
DoubleValues values = valuesSource.doubleValues();
if (values == null) {
return;
} }
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
if (owningBucketOrdinal >= counts.size()) { if (owningBucketOrdinal >= counts.size()) {
final long from = counts.size(); final long from = counts.size();
final long overSize = BigArrays.overSize(owningBucketOrdinal + 1); final long overSize = BigArrays.overSize(owningBucketOrdinal + 1);

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.sum; package org.elasticsearch.search.aggregations.metrics.sum;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray; import org.elasticsearch.common.util.DoubleArray;
@ -38,6 +39,7 @@ import java.io.IOException;
public class SumAggregator extends MetricsAggregator.SingleValue { public class SumAggregator extends MetricsAggregator.SingleValue {
private final NumericValuesSource valuesSource; private final NumericValuesSource valuesSource;
private DoubleValues values;
private DoubleArray sums; private DoubleArray sums;
@ -56,14 +58,12 @@ public class SumAggregator extends MetricsAggregator.SingleValue {
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void setNextReader(AtomicReaderContext reader) {
assert valuesSource != null : "collect must only be called after #shouldCollect returns true"; values = valuesSource.doubleValues();
DoubleValues values = valuesSource.doubleValues();
if (values == null) {
return;
} }
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
sums = BigArrays.grow(sums, owningBucketOrdinal + 1); sums = BigArrays.grow(sums, owningBucketOrdinal + 1);
final int valuesCount = values.setDocument(doc); final int valuesCount = values.setDocument(doc);

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.valuecount; package org.elasticsearch.search.aggregations.metrics.valuecount;
import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongArray;
@ -41,6 +42,7 @@ import java.io.IOException;
public class ValueCountAggregator extends MetricsAggregator.SingleValue { public class ValueCountAggregator extends MetricsAggregator.SingleValue {
private final BytesValuesSource valuesSource; private final BytesValuesSource valuesSource;
private BytesValues values;
// a count per bucket // a count per bucket
LongArray counts; LongArray counts;
@ -61,11 +63,12 @@ public class ValueCountAggregator extends MetricsAggregator.SingleValue {
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void setNextReader(AtomicReaderContext reader) {
BytesValues values = valuesSource.bytesValues(); values = valuesSource.bytesValues();
if (values == null) {
return;
} }
@Override
public void collect(int doc, long owningBucketOrdinal) throws IOException {
counts = BigArrays.grow(counts, owningBucketOrdinal + 1); counts = BigArrays.grow(counts, owningBucketOrdinal + 1);
counts.increment(owningBucketOrdinal, values.setDocument(doc)); counts.increment(owningBucketOrdinal, values.setDocument(doc));
} }