diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 18eba2f9cbf..44b504d777f 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -202,8 +202,8 @@ import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelec import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumParser; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParser; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregatorBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator; @@ -478,7 +478,8 @@ public class SearchModule extends AbstractModule { registerAggregation(ChildrenAggregatorBuilder::new, ChildrenAggregatorBuilder::parse, ChildrenAggregatorBuilder.AGGREGATION_NAME_FIELD); - registerPipelineParser(new DerivativeParser()); + registerPipelineAggregation(DerivativePipelineAggregatorBuilder::new, DerivativePipelineAggregatorBuilder::parse, + DerivativePipelineAggregatorBuilder.AGGREGATION_NAME_FIELD); registerPipelineParser(new MaxBucketParser()); registerPipelineParser(new MinBucketParser()); registerPipelineParser(new AvgBucketParser()); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java index 24d2913bfc0..98d364bcd07 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java @@ -29,7 +29,6 @@ import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.InvalidAggregationPathException; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; -import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParser; import org.elasticsearch.search.aggregations.support.AggregationPath; import java.io.IOException; @@ -157,7 +156,7 @@ public class BucketHelpers { try { Object propertyValue = bucket.getProperty(agg.getName(), aggPathAsList); if (propertyValue == null) { - throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName() + throw new AggregationExecutionException(PipelineAggregatorBuilder.BUCKETS_PATH_FIELD.getPreferredName() + " must reference either a number value or a single value numeric metric aggregation"); } else { double value; @@ -166,7 +165,7 @@ public class BucketHelpers { } else if (propertyValue instanceof InternalNumericMetricsAggregation.SingleValue) { value = ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value(); } else { - throw new AggregationExecutionException(DerivativeParser.BUCKETS_PATH.getPreferredName() + throw new AggregationExecutionException(PipelineAggregatorBuilder.BUCKETS_PATH_FIELD.getPreferredName() + " must reference either a number value or a single value numeric metric aggregation, got: " + propertyValue.getClass().getCanonicalName()); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java index e9a102438a3..b010b6d28ea 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregator.java @@ -35,11 +35,9 @@ import java.util.Map; public abstract class PipelineAggregator implements Streamable { /** - * Parses the pipeline aggregation request and creates the appropriate - * pipeline aggregator factory for it. - * - * @see PipelineAggregatorBuilder + * Parse the {@link PipelineAggregatorBuilder} from a {@link QueryParseContext}. */ + @FunctionalInterface public static interface Parser { public static final ParseField BUCKETS_PATH = new ParseField("buckets_path"); @@ -50,7 +48,9 @@ public abstract class PipelineAggregator implements Streamable { /** * @return The aggregation type this parser is associated with. */ - String type(); + default String type() { + throw new UnsupportedOperationException(); // NORELEASE remove before 5.0.0GA + } /** * Returns the pipeline aggregator factory with which this parser is @@ -71,7 +71,9 @@ public abstract class PipelineAggregator implements Streamable { * @return an empty {@link PipelineAggregatorBuilder} instance for this * parser that can be used for deserialization */ - PipelineAggregatorBuilder getFactoryPrototype(); + default PipelineAggregatorBuilder getFactoryPrototype() { + throw new UnsupportedOperationException(); // NORELEASE remove before 5.0.0GA + } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilder.java index 73adf489f97..9e943412232 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilder.java @@ -19,10 +19,10 @@ package org.elasticsearch.search.aggregations.pipeline; import org.elasticsearch.action.support.ToXContentToBytes; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregatorFactory; @@ -37,7 +37,12 @@ import java.util.Objects; * specific type. */ public abstract class PipelineAggregatorBuilder> extends ToXContentToBytes - implements NamedWriteable>, ToXContent { + implements NamedWriteable> { + + /** + * Field shared by many parsers. + */ + public static final ParseField BUCKETS_PATH_FIELD = new ParseField("buckets_path"); protected final String name; protected final String type; @@ -52,7 +57,7 @@ public abstract class PipelineAggregatorBuilder readFrom(StreamInput in) throws IOException { + String name = in.readString(); + String[] bucketsPaths = in.readStringArray(); + PipelineAggregatorBuilder factory = doReadFrom(name, bucketsPaths, in); + factory.metaData = in.readMap(); + return factory; + } + + protected PipelineAggregatorBuilder doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException { + throw new UnsupportedOperationException(); // NORELEASE remove this before 5.0.0GA, when all the aggregations have been migrated + } + public String name() { return name; } @@ -114,32 +161,11 @@ public abstract class PipelineAggregatorBuilder readFrom(StreamInput in) throws IOException { - String name = in.readString(); - String[] bucketsPaths = in.readStringArray(); - PipelineAggregatorBuilder factory = doReadFrom(name, bucketsPaths, in); - factory.metaData = in.readMap(); - return factory; - } - - protected abstract PipelineAggregatorBuilder doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException; - @Override public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(getName()); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativeParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativeParser.java deleted file mode 100644 index e08efc4a23f..00000000000 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativeParser.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.aggregations.pipeline.derivative; - -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.ParsingException; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.QueryParseContext; -import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -public class DerivativeParser implements PipelineAggregator.Parser { - - public static final ParseField FORMAT = new ParseField("format"); - public static final ParseField GAP_POLICY = new ParseField("gap_policy"); - public static final ParseField UNIT = new ParseField("unit"); - - @Override - public String type() { - return DerivativePipelineAggregator.TYPE.name(); - } - - @Override - public DerivativePipelineAggregatorBuilder parse(String pipelineAggregatorName, QueryParseContext context) throws IOException { - XContentParser parser = context.parser(); - XContentParser.Token token; - String currentFieldName = null; - String[] bucketsPaths = null; - String format = null; - String units = null; - GapPolicy gapPolicy = null; - - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - if (context.parseFieldMatcher().match(currentFieldName, FORMAT)) { - format = parser.text(); - } else if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) { - bucketsPaths = new String[] { parser.text() }; - } else if (context.parseFieldMatcher().match(currentFieldName, GAP_POLICY)) { - gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation()); - } else if (context.parseFieldMatcher().match(currentFieldName, UNIT)) { - units = parser.text(); - } else { - throw new ParsingException(parser.getTokenLocation(), - "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); - } - } else if (token == XContentParser.Token.START_ARRAY) { - if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) { - List paths = new ArrayList<>(); - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - String path = parser.text(); - paths.add(path); - } - bucketsPaths = paths.toArray(new String[paths.size()]); - } else { - throw new ParsingException(parser.getTokenLocation(), - "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); - } - } else { - throw new ParsingException(parser.getTokenLocation(), - "Unexpected token " + token + " in [" + pipelineAggregatorName + "]."); - } - } - - if (bucketsPaths == null) { - throw new ParsingException(parser.getTokenLocation(), "Missing required field [" + BUCKETS_PATH.getPreferredName() - + "] for derivative aggregation [" + pipelineAggregatorName + "]"); - } - - DerivativePipelineAggregatorBuilder factory = - new DerivativePipelineAggregatorBuilder(pipelineAggregatorName, bucketsPaths[0]); - if (format != null) { - factory.format(format); - } - if (gapPolicy != null) { - factory.gapPolicy(gapPolicy); - } - if (units != null) { - factory.unit(units); - } - return factory; - } - - @Override - public DerivativePipelineAggregatorBuilder getFactoryPrototype() { - return DerivativePipelineAggregatorBuilder.PROTOTYPE; - } - -} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregatorBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregatorBuilder.java index ffcc91c51dc..847261d5c48 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregatorBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregatorBuilder.java @@ -19,11 +19,15 @@ package org.elasticsearch.search.aggregations.pipeline.derivative; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.rounding.DateTimeUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.bucket.histogram.AbstractHistogramAggregatorFactory; @@ -34,13 +38,18 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; public class DerivativePipelineAggregatorBuilder extends PipelineAggregatorBuilder { - - static final DerivativePipelineAggregatorBuilder PROTOTYPE = new DerivativePipelineAggregatorBuilder("", ""); + public static final String NAME = DerivativePipelineAggregator.TYPE.name(); + public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); + + private static final ParseField FORMAT_FIELD = new ParseField("format"); + private static final ParseField GAP_POLICY_FIELD = new ParseField("gap_policy"); + private static final ParseField UNIT_FIELD = new ParseField("unit"); private String format; private GapPolicy gapPolicy = GapPolicy.SKIP; @@ -54,6 +63,34 @@ public class DerivativePipelineAggregatorBuilder extends PipelineAggregatorBuild super(name, DerivativePipelineAggregator.TYPE.name(), bucketsPaths); } + /** + * Read from a stream. + */ + public DerivativePipelineAggregatorBuilder(StreamInput in) throws IOException { + super(in, DerivativePipelineAggregator.TYPE.name()); + format = in.readOptionalString(); + if (in.readBoolean()) { + gapPolicy = GapPolicy.readFrom(in); + } + units = in.readOptionalString(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeOptionalString(format); + boolean hasGapPolicy = gapPolicy != null; + out.writeBoolean(hasGapPolicy); + if (hasGapPolicy) { + gapPolicy.writeTo(out); + } + out.writeOptionalString(units); + } + + @Override + protected boolean usesNewStyleSerialization() { + return true; + } + public DerivativePipelineAggregatorBuilder format(String format) { if (format == null) { throw new IllegalArgumentException("[format] must not be null: [" + name + "]"); @@ -140,42 +177,82 @@ public class DerivativePipelineAggregatorBuilder extends PipelineAggregatorBuild } } - @Override - protected DerivativePipelineAggregatorBuilder doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException { - DerivativePipelineAggregatorBuilder factory = new DerivativePipelineAggregatorBuilder(name, bucketsPaths); - factory.format = in.readOptionalString(); - if (in.readBoolean()) { - factory.gapPolicy = GapPolicy.readFrom(in); - } - factory.units = in.readOptionalString(); - return factory; - } - - @Override - protected void doWriteTo(StreamOutput out) throws IOException { - out.writeOptionalString(format); - boolean hasGapPolicy = gapPolicy != null; - out.writeBoolean(hasGapPolicy); - if (hasGapPolicy) { - gapPolicy.writeTo(out); - } - out.writeOptionalString(units); - } - @Override protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { if (format != null) { - builder.field(DerivativeParser.FORMAT.getPreferredName(), format); + builder.field(FORMAT_FIELD.getPreferredName(), format); } if (gapPolicy != null) { - builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); + builder.field(GAP_POLICY_FIELD.getPreferredName(), gapPolicy.getName()); } if (units != null) { - builder.field(DerivativeParser.UNIT.getPreferredName(), units); + builder.field(FORMAT_FIELD.getPreferredName(), units); } return builder; } + public static DerivativePipelineAggregatorBuilder parse(String pipelineAggregatorName, QueryParseContext context) throws IOException { + XContentParser parser = context.parser(); + XContentParser.Token token; + String currentFieldName = null; + String[] bucketsPaths = null; + String format = null; + String units = null; + GapPolicy gapPolicy = null; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if (context.parseFieldMatcher().match(currentFieldName, FORMAT_FIELD)) { + format = parser.text(); + } else if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH_FIELD)) { + bucketsPaths = new String[] { parser.text() }; + } else if (context.parseFieldMatcher().match(currentFieldName, GAP_POLICY_FIELD)) { + gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation()); + } else if (context.parseFieldMatcher().match(currentFieldName, UNIT_FIELD)) { + units = parser.text(); + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH_FIELD)) { + List paths = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + String path = parser.text(); + paths.add(path); + } + bucketsPaths = paths.toArray(new String[paths.size()]); + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); + } + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " in [" + pipelineAggregatorName + "]."); + } + } + + if (bucketsPaths == null) { + throw new ParsingException(parser.getTokenLocation(), "Missing required field [" + BUCKETS_PATH_FIELD.getPreferredName() + + "] for derivative aggregation [" + pipelineAggregatorName + "]"); + } + + DerivativePipelineAggregatorBuilder factory = + new DerivativePipelineAggregatorBuilder(pipelineAggregatorName, bucketsPaths[0]); + if (format != null) { + factory.format(format); + } + if (gapPolicy != null) { + factory.gapPolicy(gapPolicy); + } + if (units != null) { + factory.unit(units); + } + return factory; + } + @Override protected boolean doEquals(Object obj) { DerivativePipelineAggregatorBuilder other = (DerivativePipelineAggregatorBuilder) obj;