Decouples the AggregatorBuilder from the AggregatorFactory

Colin Goodheart-Smithe 2016-02-08 16:18:20 +00:00
parent 02ecfd6279
commit 323088d022
11 changed files with 48 additions and 70 deletions
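At a glance, the change splits the request-side object that is parsed from the search body (AggregatorBuilder) from the search-time object that actually creates Aggregator instances (AggregatorFactory): doBuild and createInternal become abstract, validation moves from parse time to after the factories are built, and the pipeline aggregators now check their parent against AbstractHistogramAggregatorFactory instead of the builder. Below is a minimal, self-contained sketch of that shape; every name prefixed with Sketch is a hypothetical placeholder, not an Elasticsearch type, and the real signatures carry more generics and state.

// Minimal sketch of the builder/factory split (all Sketch* types are placeholders).
import java.io.IOException;

interface SketchAggregationContext {}   // stands in for AggregationContext
interface SketchAggregator {}           // stands in for Aggregator

// Request side: holds parsed state only; turned into a factory at search time.
abstract class SketchAggregatorBuilder {
    protected final String name;

    protected SketchAggregatorBuilder(String name) {
        this.name = name;
    }

    // Now abstract: a builder must produce a distinct factory instead of returning itself.
    protected abstract SketchAggregatorFactory doBuild(SketchAggregationContext context) throws IOException;
}

// Search side: validated after building, then asked to create aggregators.
abstract class SketchAggregatorFactory {
    public void doValidate() {
        // per-factory checks run after build(), not during parsing
    }

    protected abstract SketchAggregator createInternal(SketchAggregationContext context,
            boolean collectsFromSingleBucket) throws IOException;
}

Under that split, the parse element in the first hunk below follows the new flow: parse into an AggregatorFactories.Builder, build it against the AggregationContext, then validate the resulting factories before registering them with the search context.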

View File

@@ -65,8 +65,10 @@ public class AggregationParseElement implements SearchParseElement {
         QueryParseContext parseContext = new QueryParseContext(queriesRegistry);
         parseContext.reset(parser);
         parseContext.parseFieldMatcher(context.parseFieldMatcher());
-        AggregatorFactories.Builder factories = aggregatorParsers.parseAggregators(parser, parseContext);
+        AggregatorFactories.Builder builders = aggregatorParsers.parseAggregators(parser, parseContext);
         AggregationContext aggContext = new AggregationContext(context);
-        context.aggregations(new SearchContextAggregations(factories.build(aggContext)));
+        AggregatorFactories factories = builders.build(aggContext);
+        factories.validate();
+        context.aggregations(new SearchContextAggregations(factories));
     }
 }

View File

@@ -19,6 +19,7 @@
 package org.elasticsearch.search.aggregations;
 
+import org.elasticsearch.action.support.ToXContentToBytes;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -29,15 +30,19 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;
 
 /**
  * A factory that knows how to create an {@link Aggregator} of a specific type.
  */
-public abstract class AggregatorBuilder<AB extends AggregatorBuilder<AB>> extends AggregatorFactory<AB>
+public abstract class AggregatorBuilder<AB extends AggregatorBuilder<AB>> extends ToXContentToBytes
         implements NamedWriteable<AB>, ToXContent {
 
+    protected String name;
+    protected Type type;
     protected AggregatorFactories.Builder factoriesBuilder = AggregatorFactories.builder();
+    protected Map<String, Object> metaData;
 
     /**
@@ -46,7 +51,8 @@ public abstract class AggregatorBuilder<AB extends AggregatorBuilder<AB>> extend
      * @param type The aggregation type
      */
     public AggregatorBuilder(String name, Type type) {
-        super(name, type);
+        this.name = name;
+        this.type = type;
     }
 
     /**
@@ -82,6 +88,15 @@ public abstract class AggregatorBuilder<AB extends AggregatorBuilder<AB>> extend
         return (AB) this;
     }
 
+    public AB setMetaData(Map<String, Object> metaData) {
+        this.metaData = metaData;
+        return (AB) this;
+    }
+
+    public String getType() {
+        return type.name();
+    }
+
     public final AggregatorFactory<?> build(AggregationContext context) throws IOException {
         AggregatorFactory<?> factory = doBuild(context);
         if (factoriesBuilder != null && factoriesBuilder.count() > 0) {
@@ -91,10 +106,7 @@ public abstract class AggregatorBuilder<AB extends AggregatorBuilder<AB>> extend
         return factory;
     }
 
-    // NORELEASE make this method abstract when agg refactoring is complete
-    protected AggregatorFactory<?> doBuild(AggregationContext context) throws IOException {
-        return this;
-    }
+    protected abstract AggregatorFactory<?> doBuild(AggregationContext context) throws IOException;
 
     @Override
     public final AB readFrom(StreamInput in) throws IOException {

View File

@@ -44,7 +44,7 @@ import java.util.Set;
  */
 public class AggregatorFactories {
 
-    public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorBuilder<?>[0],
+    public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0],
             new ArrayList<PipelineAggregatorFactory>());
 
     private AggregatorFactory<?> parent;

View File

@@ -112,6 +112,9 @@ public abstract class AggregatorFactory<AF extends AggregatorFactory<AF>> extend
         factories.validate();
     }
 
+    public void doValidate() {
+    }
+
     /**
      * @return The parent factory if one exists (will always return {@code null}
      * for top level aggregator factories).
@@ -120,11 +123,8 @@ public abstract class AggregatorFactory<AF extends AggregatorFactory<AF>> extend
         return parent;
     }
 
-    // NORELEASE make this abstract when agg refactoring is complete
-    protected Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
-            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
-        throw new UnsupportedOperationException("THIS SHOULD NEVER BE CALLED");
-    }
+    protected abstract Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
+            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException;
 
     /**
      * Creates the aggregator
@@ -144,9 +144,6 @@ public abstract class AggregatorFactory<AF extends AggregatorFactory<AF>> extend
         return createInternal(context, parent, collectsFromSingleBucket, this.factories.createPipelineAggregators(), this.metaData);
     }
 
-    public void doValidate() {
-    }
-
     public AF setMetaData(Map<String, Object> metaData) {
         this.metaData = metaData;
         return (AF) this;

View File

@@ -218,17 +218,13 @@ public class AggregatorParsers {
                             parser.getTokenLocation());
                 } else if (aggFactory != null) {
                     assert pipelineAggregatorFactory == null;
                     if (metaData != null) {
                         aggFactory.setMetaData(metaData);
                     }
                     if (subFactories != null) {
                         aggFactory.subAggregations(subFactories);
                     }
-                    if (level == 0) {
-                        aggFactory.validate();
-                    }
                     factories.addAggregator(aggFactory);
                 } else {
@@ -238,10 +234,6 @@ public class AggregatorParsers {
                                 "Aggregation [" + aggregationName + "] cannot define sub-aggregations",
                                 parser.getTokenLocation());
                     }
-                    if (level == 0) {
-                        pipelineAggregatorFactory
-                                .validate(null, factories.getAggregatorFactories(), factories.getPipelineAggregatorFactories());
-                    }
                     factories.addPipelineAggregator(pipelineAggregatorFactory);
                 }
             }

View File

@@ -35,7 +35,7 @@ import java.util.Map;
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 
-abstract class AbstractHistogramAggregatorFactory<AF extends AbstractHistogramAggregatorFactory<AF>>
+public abstract class AbstractHistogramAggregatorFactory<AF extends AbstractHistogramAggregatorFactory<AF>>
         extends ValuesSourceAggregatorFactory<ValuesSource.Numeric, AF> {
 
     protected final long interval;
@@ -59,6 +59,10 @@ abstract class AbstractHistogramAggregatorFactory<AF extends AbstractHistogramAg
         this.histogramFactory = histogramFactory;
     }
 
+    public long minDocCount() {
+        return minDocCount;
+    }
+
     @Override
     protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent,
             List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

View File

@@ -43,9 +43,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.script.Script;
-import org.elasticsearch.search.aggregations.AggregationInitializationException;
 import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.AggregatorBuilder;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -557,11 +555,6 @@ public class TopHitsAggregator extends MetricsAggregator {
             return trackScores;
         }
 
-        @Override
-        public TopHitsAggregatorBuilder subFactories(AggregatorFactories subFactories) {
-            throw new AggregationInitializationException("Aggregator [" + name + "] of type [" + type + "] cannot accept sub-aggregations");
-        }
-
         @Override
         protected AggregatorFactory<?> doBuild(AggregationContext context) {
             return new TopHitsAggregatorFactory(name, type, from, size, explain, version, trackScores, sorts, highlightBuilder, fieldNames,

View File

@@ -22,13 +22,12 @@ package org.elasticsearch.search.aggregations.pipeline.cumulativesum;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.search.aggregations.AggregatorBuilder;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.InternalAggregations;
-import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
+import org.elasticsearch.search.aggregations.bucket.histogram.AbstractHistogramAggregatorFactory;
 import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
@@ -158,11 +157,11 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator {
             throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
                     + " must contain a single entry for aggregation [" + name + "]");
         }
-        if (!(parent instanceof HistogramAggregator.AbstractBuilder)) {
+        if (!(parent instanceof AbstractHistogramAggregatorFactory<?>)) {
             throw new IllegalStateException("cumulative sum aggregation [" + name
                     + "] must have a histogram or date_histogram as parent");
         } else {
-            HistogramAggregator.AbstractBuilder histoParent = (HistogramAggregator.AbstractBuilder) parent;
+            AbstractHistogramAggregatorFactory<?> histoParent = (AbstractHistogramAggregatorFactory<?>) parent;
             if (histoParent.minDocCount() != 0) {
                 throw new IllegalStateException("parent histogram of cumulative sum aggregation [" + name
                         + "] must have min_doc_count of 0");

View File

@@ -30,9 +30,9 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.bucket.histogram.AbstractHistogramAggregatorFactory;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
-import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
 import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -234,11 +234,11 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
             throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
                     + " must contain a single entry for aggregation [" + name + "]");
         }
-        if (!(parent instanceof HistogramAggregator.AbstractBuilder)) {
+        if (!(parent instanceof AbstractHistogramAggregatorFactory<?>)) {
             throw new IllegalStateException("derivative aggregation [" + name
                     + "] must have a histogram or date_histogram as parent");
         } else {
-            HistogramAggregator.AbstractBuilder histoParent = (HistogramAggregator.AbstractBuilder) parent;
+            AbstractHistogramAggregatorFactory<?> histoParent = (AbstractHistogramAggregatorFactory<?>) parent;
             if (histoParent.minDocCount() != 0) {
                 throw new IllegalStateException("parent histogram of derivative aggregation [" + name
                         + "] must have min_doc_count of 0");

View File

@@ -29,7 +29,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.InternalAggregations;
-import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
+import org.elasticsearch.search.aggregations.bucket.histogram.AbstractHistogramAggregatorFactory;
 import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
@@ -455,11 +455,11 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
             throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
                     + " must contain a single entry for aggregation [" + name + "]");
         }
-        if (!(parent instanceof HistogramAggregator.AbstractBuilder)) {
+        if (!(parent instanceof AbstractHistogramAggregatorFactory<?>)) {
             throw new IllegalStateException("moving average aggregation [" + name
                     + "] must have a histogram or date_histogram as parent");
         } else {
-            HistogramAggregator.AbstractBuilder histoParent = (HistogramAggregator.AbstractBuilder) parent;
+            AbstractHistogramAggregatorFactory<?> histoParent = (AbstractHistogramAggregatorFactory<?>) parent;
             if (histoParent.minDocCount() != 0) {
                 throw new IllegalStateException("parent histogram of moving average aggregation [" + name
                         + "] must have min_doc_count of 0");

View File

@@ -33,11 +33,9 @@ import org.elasticsearch.index.mapper.ip.IpFieldMapper;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptContext;
 import org.elasticsearch.script.SearchScript;
-import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.AggregationInitializationException;
 import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
 import org.elasticsearch.search.aggregations.AggregatorBuilder;
 import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -65,7 +63,7 @@ public abstract class ValuesSourceAggregatorBuilder<VS extends ValuesSource, AB
         }
 
         @Override
-        public AB subFactories(AggregatorFactories subFactories) {
+        public AB subAggregations(Builder subFactories) {
             throw new AggregationInitializationException("Aggregator [" + name + "] of type [" + type + "] cannot accept sub-aggregations");
         }
     }
@@ -192,25 +190,6 @@ public abstract class ValuesSourceAggregatorBuilder<VS extends ValuesSource, AB
     protected abstract ValuesSourceAggregatorFactory<VS, ?> innerBuild(AggregationContext context, ValuesSourceConfig<VS> config);
 
-    @Override
-    public void doInit(AggregationContext context) {
-        this.config = config(context);
-    }
-
-    @Override
-    public Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
-            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
-        VS vs = context.valuesSource(config, context.searchContext());
-        if (vs == null) {
-            return createUnmapped(context, parent, pipelineAggregators, metaData);
-        }
-        return doCreateInternal(vs, context, parent, collectsFromSingleBucket, pipelineAggregators, metaData);
-    }
-
-    @Override
-    public void doValidate() {
-    }
-
     public ValuesSourceConfig<VS> config(AggregationContext context) {
         ValueType valueType = this.valueType != null ? this.valueType : targetValueType;