From 16812cc0321e56466a3d23bc6c1521d0cdd9ce77 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 13 Jul 2016 12:21:51 -0400 Subject: [PATCH] Migrate moving_avg pipeline aggregation to NamedWriteable This is the first pipeline aggregation that doesn't have its own bucket type that needs serializing. It uses InternalHistogram instead. So that required reworking the new-style `registerAggregation` method to not require bucket readers. So I built `PipelineAggregationSpec` to mirror `AggregationSpec`. It allows registering any number of bucket readers or result readers. --- .../elasticsearch/search/SearchModule.java | 142 +++++++++++++----- .../MovAvgPipelineAggregationBuilder.java | 6 +- .../movavg/MovAvgPipelineAggregator.java | 80 ++++------ 3 files changed, 136 insertions(+), 92 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index baf72763137..719b1ab110a 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -98,7 +98,6 @@ import org.elasticsearch.plugins.SearchPlugin.SearchPluginSpec; import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.Aggregator.Parser; import org.elasticsearch.search.aggregations.AggregatorParsers; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; @@ -381,8 +380,10 @@ public class SearchModule extends AbstractModule { * Register an aggregation. */ public void registerAggregation(AggregationSpec spec) { - namedWriteableRegistry.register(AggregationBuilder.class, spec.aggregationName.getPreferredName(), spec.builderReader); - aggregationParserRegistry.register(spec.aggregationParser, spec.aggregationName); + if (false == transportClient) { + aggregationParserRegistry.register(spec.parser, spec.name); + } + namedWriteableRegistry.register(AggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader); for (Map.Entry> t : spec.resultReaders.entrySet()) { String writeableName = t.getKey(); Writeable.Reader internalReader = t.getValue(); @@ -393,29 +394,30 @@ public class SearchModule extends AbstractModule { public static class AggregationSpec { private final Map> resultReaders = new TreeMap<>(); private final Writeable.Reader builderReader; - private final Aggregator.Parser aggregationParser; - private final ParseField aggregationName; + private final Aggregator.Parser parser; + private final ParseField name; /** * Register an aggregation. * * @param builderReader reads the {@link AggregationBuilder} from a stream - * @param aggregationParser reads the aggregation builder from XContent - * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the - * reader is registered under. + * @param parser reads the aggregation builder from XContent + * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is + * registered under. */ - public AggregationSpec(Reader builderReader, Parser aggregationParser, ParseField aggregationName) { + public AggregationSpec(Reader builderReader, Aggregator.Parser parser, + ParseField name) { this.builderReader = builderReader; - this.aggregationParser = aggregationParser; - this.aggregationName = aggregationName; + this.parser = parser; + this.name = name; } /** - * Add a reader for the shard level results of the aggregation with {@linkplain #aggregationName}'s - * {@link ParseField#getPreferredName()} as the {@link NamedWriteable#getWriteableName()}. + * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as + * the {@link NamedWriteable#getWriteableName()}. */ public AggregationSpec addResultReader(Writeable.Reader resultReader) { - return addResultReader(aggregationName.getPreferredName(), resultReader); + return addResultReader(name.getPreferredName(), resultReader); } /** @@ -429,23 +431,73 @@ public class SearchModule extends AbstractModule { /** * Register a pipeline aggregation. - * - * @param reader reads the aggregation builder from a stream - * @param internalReader reads the {@link PipelineAggregator} from a stream - * @param bucketReader reads the {@link InternalAggregation} that represents a bucket in this aggregation from a stream - * @param aggregationParser reads the aggregation builder from XContent - * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader - * is registered under. */ - public void registerPipelineAggregation(Writeable.Reader reader, - Writeable.Reader internalReader, Writeable.Reader bucketReader, - PipelineAggregator.Parser aggregationParser, ParseField aggregationName) { + public void registerPipelineAggregation(PipelineAggregationSpec spec) { if (false == transportClient) { - pipelineAggregationParserRegistry.register(aggregationParser, aggregationName); + pipelineAggregationParserRegistry.register(spec.parser, spec.name); + } + namedWriteableRegistry.register(PipelineAggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader); + for (Map.Entry> resultReader : spec.resultReaders.entrySet()) { + namedWriteableRegistry.register(PipelineAggregator.class, resultReader.getKey(), resultReader.getValue()); + } + for (Map.Entry> bucketReaders : spec.bucketReaders.entrySet()) { + namedWriteableRegistry.register(InternalAggregation.class, bucketReaders.getKey(), bucketReaders.getValue()); + } + } + + public static class PipelineAggregationSpec { + private final Map> resultReaders = new TreeMap<>(); + private final Map> bucketReaders = new TreeMap<>(); + private final Writeable.Reader builderReader; + private final PipelineAggregator.Parser parser; + private final ParseField name; + + /** + * Register a pipeline aggregation. + * + * @param builderReader reads the {@link PipelineAggregationBuilder} from a stream + * @param parser reads the aggregation builder from XContent + * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is + * registered under. + */ + public PipelineAggregationSpec(Reader builderReader, + PipelineAggregator.Parser parser, ParseField name) { + this.builderReader = builderReader; + this.parser = parser; + this.name = name; + } + + /** + * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as + * the {@link NamedWriteable#getWriteableName()}. + */ + public PipelineAggregationSpec addResultReader(Writeable.Reader resultReader) { + return addResultReader(name.getPreferredName(), resultReader); + } + + /** + * Add a reader for the shard level results of the aggregation. + */ + public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader resultReader) { + resultReaders.put(writeableName, resultReader); + return this; + } + + /** + * Add a reader for the shard level bucket results of the aggregation with {@linkplain name}'s {@link ParseField#getPreferredName()} + * as the {@link NamedWriteable#getWriteableName()}. + */ + public PipelineAggregationSpec addBucketReader(Writeable.Reader resultReader) { + return addBucketReader(name.getPreferredName(), resultReader); + } + + /** + * Add a reader for the shard level results of the aggregation. + */ + public PipelineAggregationSpec addBucketReader(String writeableName, Writeable.Reader resultReader) { + bucketReaders.put(writeableName, resultReader); + return this; } - namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader); - namedWriteableRegistry.register(PipelineAggregator.class, aggregationName.getPreferredName(), internalReader); - namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), bucketReader); } public void registerPipelineAggregation(Writeable.Reader reader, @@ -552,8 +604,12 @@ public class SearchModule extends AbstractModule { registerAggregation(new AggregationSpec(ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse, ChildrenAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalChildren::new)); - registerPipelineAggregation(DerivativePipelineAggregationBuilder::new, DerivativePipelineAggregator::new, InternalDerivative::new, - DerivativePipelineAggregationBuilder::parse, DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD); + registerPipelineAggregation(new PipelineAggregationSpec( + DerivativePipelineAggregationBuilder::new, + DerivativePipelineAggregationBuilder::parse, + DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + .addResultReader(DerivativePipelineAggregator::new) + .addBucketReader(InternalDerivative::new)); registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER, MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER, @@ -562,17 +618,26 @@ public class SearchModule extends AbstractModule { AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER, SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(StatsBucketPipelineAggregationBuilder::new, StatsBucketPipelineAggregator::new, - InternalStatsBucket::new, StatsBucketPipelineAggregationBuilder.PARSER, - StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(ExtendedStatsBucketPipelineAggregationBuilder::new, ExtendedStatsBucketPipelineAggregator::new, - InternalExtendedStatsBucket::new, new ExtendedStatsBucketParser(), - ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); + registerPipelineAggregation(new PipelineAggregationSpec( + StatsBucketPipelineAggregationBuilder::new, + StatsBucketPipelineAggregationBuilder.PARSER, + StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + .addResultReader(StatsBucketPipelineAggregator::new) + .addBucketReader(InternalStatsBucket::new)); + registerPipelineAggregation(new PipelineAggregationSpec( + ExtendedStatsBucketPipelineAggregationBuilder::new, + new ExtendedStatsBucketParser(), + ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD) + .addResultReader(ExtendedStatsBucketPipelineAggregator::new) + .addBucketReader(InternalExtendedStatsBucket::new)); registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER, PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); - registerPipelineAggregation(MovAvgPipelineAggregationBuilder::new, + registerPipelineAggregation(new PipelineAggregationSpec( + MovAvgPipelineAggregationBuilder::new, (n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c), - MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME); + MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME) + .addResultReader(MovAvgPipelineAggregator::new) + /* Uses InternalHistogram for buckets */); registerPipelineAggregation(CumulativeSumPipelineAggregationBuilder::new, CumulativeSumPipelineAggregationBuilder::parse, CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); registerPipelineAggregation(BucketScriptPipelineAggregationBuilder::new, BucketScriptPipelineAggregationBuilder::parse, @@ -821,7 +886,6 @@ public class SearchModule extends AbstractModule { AvgBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams(); PercentilesBucketPipelineAggregator.registerStreams(); - MovAvgPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams(); BucketSelectorPipelineAggregator.registerStreams(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java index 7d71af17b42..f9d21087d6b 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java @@ -50,7 +50,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator. import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY; public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { - public static final String NAME = MovAvgPipelineAggregator.TYPE.name(); + public static final String NAME = "moving_avg"; public static final ParseField AGGREGATION_FIELD_NAME = new ParseField(NAME); public static final ParseField MODEL = new ParseField("model"); @@ -67,14 +67,14 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregatio private Boolean minimize; public MovAvgPipelineAggregationBuilder(String name, String bucketsPath) { - super(name, MovAvgPipelineAggregator.TYPE.name(), new String[] { bucketsPath }); + super(name, NAME, new String[] { bucketsPath }); } /** * Read from a stream. */ public MovAvgPipelineAggregationBuilder(StreamInput in) throws IOException { - super(in, MovAvgPipelineAggregator.TYPE.name()); + super(in, NAME); format = in.readOptionalString(); gapPolicy = GapPolicy.readFrom(in); window = in.readVInt(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java index 880b4e4e6a0..216890741bb 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java @@ -26,13 +26,11 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; -import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; import org.joda.time.DateTime; @@ -47,31 +45,12 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; public class MovAvgPipelineAggregator extends PipelineAggregator { - - public static final Type TYPE = new Type("moving_avg"); - - public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { - @Override - public MovAvgPipelineAggregator readResult(StreamInput in) throws IOException { - MovAvgPipelineAggregator result = new MovAvgPipelineAggregator(); - result.readFrom(in); - return result; - } - }; - - public static void registerStreams() { - PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); - } - - private DocValueFormat formatter; - private GapPolicy gapPolicy; - private int window; + private final DocValueFormat formatter; + private final GapPolicy gapPolicy; + private final int window; private MovAvgModel model; - private int predict; - private boolean minimize; - - public MovAvgPipelineAggregator() { - } + private final int predict; + private final boolean minimize; public MovAvgPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy, int window, int predict, MovAvgModel model, boolean minimize, Map metadata) { @@ -84,9 +63,32 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { this.minimize = minimize; } + /** + * Read from a stream. + */ + public MovAvgPipelineAggregator(StreamInput in) throws IOException { + super(in); + formatter = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = GapPolicy.readFrom(in); + window = in.readVInt(); + predict = in.readVInt(); + model = in.readNamedWriteable(MovAvgModel.class); + minimize = in.readBoolean(); + } + @Override - public Type type() { - return TYPE; + public void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(formatter); + gapPolicy.writeTo(out); + out.writeVInt(window); + out.writeVInt(predict); + out.writeNamedWriteable(model); + out.writeBoolean(minimize); + } + + @Override + public String getWriteableName() { + return MovAvgPipelineAggregationBuilder.NAME; } @Override @@ -246,26 +248,4 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { return SimulatedAnealingMinimizer.minimize(model, values, test); } - - @Override - public void doReadFrom(StreamInput in) throws IOException { - formatter = in.readNamedWriteable(DocValueFormat.class); - gapPolicy = GapPolicy.readFrom(in); - window = in.readVInt(); - predict = in.readVInt(); - model = in.readNamedWriteable(MovAvgModel.class); - minimize = in.readBoolean(); - - } - - @Override - public void doWriteTo(StreamOutput out) throws IOException { - out.writeNamedWriteable(formatter); - gapPolicy.writeTo(out); - out.writeVInt(window); - out.writeVInt(predict); - out.writeNamedWriteable(model); - out.writeBoolean(minimize); - - } }