Cut serial_diff and cumulative_sum to registerPipelineAggregation

and remove their PROTOTYPEs.

Relates to #17085
This commit is contained in:
Nik Everett 2016-04-15 14:47:15 -04:00
parent e9fa53b87f
commit a9e85182f1
5 changed files with 200 additions and 264 deletions

View File

@ -212,8 +212,8 @@ import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptP
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorParser; import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorParser;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumParser;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregatorBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregatorBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregatorBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative; import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative;
@ -221,8 +221,8 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelParserMapper; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelParserMapper;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffParser;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregatorBuilder;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.DfsPhase; import org.elasticsearch.search.dfs.DfsPhase;
import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchPhase;
@ -512,10 +512,12 @@ public class SearchModule extends AbstractModule {
ExtendedStatsBucketPipelineAggregatorBuilder.AGGREGATION_NAME_FIELD); ExtendedStatsBucketPipelineAggregatorBuilder.AGGREGATION_NAME_FIELD);
registerPipelineParser(new PercentilesBucketParser()); registerPipelineParser(new PercentilesBucketParser());
registerPipelineParser(new MovAvgParser(movAvgModelParserMapper)); registerPipelineParser(new MovAvgParser(movAvgModelParserMapper));
registerPipelineParser(new CumulativeSumParser()); registerPipelineAggregation(CumulativeSumPipelineAggregatorBuilder::new, CumulativeSumPipelineAggregatorBuilder::parse,
CumulativeSumPipelineAggregatorBuilder.AGGREGATION_NAME_FIELD);
registerPipelineParser(new BucketScriptParser()); registerPipelineParser(new BucketScriptParser());
registerPipelineParser(new BucketSelectorParser()); registerPipelineParser(new BucketSelectorParser());
registerPipelineParser(new SerialDiffParser()); registerPipelineAggregation(SerialDiffPipelineAggregatorBuilder::new, SerialDiffPipelineAggregatorBuilder::parse,
SerialDiffPipelineAggregatorBuilder.AGGREGATION_NAME_FIELD);
AggregationParseElement aggParseElement = new AggregationParseElement(aggregatorParsers, queryParserRegistry); AggregationParseElement aggParseElement = new AggregationParseElement(aggregatorParsers, queryParserRegistry);
AggregationBinaryParseElement aggBinaryParseElement = new AggregationBinaryParseElement(aggregatorParsers, queryParserRegistry); AggregationBinaryParseElement aggBinaryParseElement = new AggregationBinaryParseElement(aggregatorParsers, queryParserRegistry);

View File

@ -1,99 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.cumulativesum;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class CumulativeSumParser implements PipelineAggregator.Parser {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
public static final ParseField UNIT = new ParseField("unit");
@Override
public String type() {
return CumulativeSumPipelineAggregator.TYPE.name();
}
@Override
public CumulativeSumPipelineAggregatorBuilder parse(String pipelineAggregatorName, QueryParseContext context) throws IOException {
XContentParser parser = context.parser();
XContentParser.Token token;
String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (context.getParseFieldMatcher().match(currentFieldName, FORMAT)) {
format = parser.text();
} else if (context.getParseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
bucketsPaths = new String[] { parser.text() };
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (context.getParseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "].");
}
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unexpected token " + token + " in [" + pipelineAggregatorName + "].");
}
}
if (bucketsPaths == null) {
throw new ParsingException(parser.getTokenLocation(), "Missing required field [" + BUCKETS_PATH.getPreferredName()
+ "] for derivative aggregation [" + pipelineAggregatorName + "]");
}
CumulativeSumPipelineAggregatorBuilder factory =
new CumulativeSumPipelineAggregatorBuilder(pipelineAggregatorName, bucketsPaths[0]);
if (format != null) {
factory.format(format);
}
return factory;
}
@Override
public CumulativeSumPipelineAggregatorBuilder getFactoryPrototype() {
return CumulativeSumPipelineAggregatorBuilder.PROTOTYPE;
}
}

View File

@ -19,9 +19,13 @@
package org.elasticsearch.search.aggregations.pipeline.cumulativesum; package org.elasticsearch.search.aggregations.pipeline.cumulativesum;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.histogram.AbstractHistogramAggregatorFactory; import org.elasticsearch.search.aggregations.bucket.histogram.AbstractHistogramAggregatorFactory;
@ -30,22 +34,40 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class CumulativeSumPipelineAggregatorBuilder extends PipelineAggregatorBuilder<CumulativeSumPipelineAggregatorBuilder> { import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
static final CumulativeSumPipelineAggregatorBuilder PROTOTYPE = new CumulativeSumPipelineAggregatorBuilder("", ""); public class CumulativeSumPipelineAggregatorBuilder extends PipelineAggregatorBuilder<CumulativeSumPipelineAggregatorBuilder> {
public static final String NAME = CumulativeSumPipelineAggregator.TYPE.name();
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private String format; private String format;
public CumulativeSumPipelineAggregatorBuilder(String name, String bucketsPath) { public CumulativeSumPipelineAggregatorBuilder(String name, String bucketsPath) {
this(name, new String[] { bucketsPath }); super(name, CumulativeSumPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
} }
private CumulativeSumPipelineAggregatorBuilder(String name, String[] bucketsPaths) { /**
super(name, CumulativeSumPipelineAggregator.TYPE.name(), bucketsPaths); * Read from a stream.
*/
public CumulativeSumPipelineAggregatorBuilder(StreamInput in) throws IOException {
super(in, CumulativeSumPipelineAggregator.TYPE.name());
format = in.readOptionalString();
}
@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalString(format);
}
@Override
protected boolean usesNewStyleSerialization() {
return true;
} }
/** /**
@ -83,7 +105,7 @@ public class CumulativeSumPipelineAggregatorBuilder extends PipelineAggregatorBu
public void doValidate(AggregatorFactory<?> parent, AggregatorFactory<?>[] aggFactories, public void doValidate(AggregatorFactory<?> parent, AggregatorFactory<?>[] aggFactories,
List<PipelineAggregatorBuilder<?>> pipelineAggregatorFactories) { List<PipelineAggregatorBuilder<?>> pipelineAggregatorFactories) {
if (bucketsPaths.length != 1) { if (bucketsPaths.length != 1) {
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() throw new IllegalStateException(BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for aggregation [" + name + "]"); + " must contain a single entry for aggregation [" + name + "]");
} }
if (!(parent instanceof AbstractHistogramAggregatorFactory<?>)) { if (!(parent instanceof AbstractHistogramAggregatorFactory<?>)) {
@ -106,17 +128,55 @@ public class CumulativeSumPipelineAggregatorBuilder extends PipelineAggregatorBu
return builder; return builder;
} }
@Override public static CumulativeSumPipelineAggregatorBuilder parse(String pipelineAggregatorName, QueryParseContext context)
protected final CumulativeSumPipelineAggregatorBuilder doReadFrom(String name, String[] bucketsPaths, StreamInput in)
throws IOException { throws IOException {
CumulativeSumPipelineAggregatorBuilder factory = new CumulativeSumPipelineAggregatorBuilder(name, bucketsPaths); XContentParser parser = context.parser();
factory.format = in.readOptionalString(); XContentParser.Token token;
return factory; String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (context.getParseFieldMatcher().match(currentFieldName, FORMAT)) {
format = parser.text();
} else if (context.getParseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
bucketsPaths = new String[] { parser.text() };
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (context.getParseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "].");
}
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unexpected token " + token + " in [" + pipelineAggregatorName + "].");
}
} }
@Override if (bucketsPaths == null) {
protected final void doWriteTo(StreamOutput out) throws IOException { throw new ParsingException(parser.getTokenLocation(), "Missing required field [" + BUCKETS_PATH.getPreferredName()
out.writeOptionalString(format); + "] for derivative aggregation [" + pipelineAggregatorName + "]");
}
CumulativeSumPipelineAggregatorBuilder factory =
new CumulativeSumPipelineAggregatorBuilder(pipelineAggregatorName, bucketsPaths[0]);
if (format != null) {
factory.format(format);
}
return factory;
} }
@Override @Override
@ -129,4 +189,9 @@ public class CumulativeSumPipelineAggregatorBuilder extends PipelineAggregatorBu
CumulativeSumPipelineAggregatorBuilder other = (CumulativeSumPipelineAggregatorBuilder) obj; CumulativeSumPipelineAggregatorBuilder other = (CumulativeSumPipelineAggregatorBuilder) obj;
return Objects.equals(format, other.format); return Objects.equals(format, other.format);
} }
@Override
public String getWriteableName() {
return NAME;
}
} }

View File

@ -1,123 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.serialdiff;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SerialDiffParser implements PipelineAggregator.Parser {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
public static final ParseField LAG = new ParseField("lag");
@Override
public String type() {
return SerialDiffPipelineAggregator.TYPE.name();
}
@Override
public SerialDiffPipelineAggregatorBuilder parse(String reducerName, QueryParseContext context) throws IOException {
XContentParser parser = context.parser();
XContentParser.Token token;
String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
GapPolicy gapPolicy = null;
Integer lag = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (context.getParseFieldMatcher().match(currentFieldName, FORMAT)) {
format = parser.text();
} else if (context.getParseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
bucketsPaths = new String[] { parser.text() };
} else if (context.getParseFieldMatcher().match(currentFieldName, GAP_POLICY)) {
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (context.getParseFieldMatcher().match(currentFieldName, LAG)) {
lag = parser.intValue(true);
if (lag <= 0) {
throw new ParsingException(parser.getTokenLocation(),
"Lag must be a positive, non-zero integer. Value supplied was" +
lag + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (context.getParseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
}
} else {
throw new ParsingException(parser.getTokenLocation(), "Unexpected token " + token + " in [" + reducerName + "].",
parser.getTokenLocation());
}
}
if (bucketsPaths == null) {
throw new ParsingException(parser.getTokenLocation(),
"Missing required field [" + BUCKETS_PATH.getPreferredName() + "] for derivative aggregation [" + reducerName + "]");
}
SerialDiffPipelineAggregatorBuilder factory =
new SerialDiffPipelineAggregatorBuilder(reducerName, bucketsPaths[0]);
if (lag != null) {
factory.lag(lag);
}
if (format != null) {
factory.format(format);
}
if (gapPolicy != null) {
factory.gapPolicy(gapPolicy);
}
return factory;
}
@Override
public SerialDiffPipelineAggregatorBuilder getFactoryPrototype() {
return SerialDiffPipelineAggregatorBuilder.PROTOTYPE;
}
}

View File

@ -19,32 +19,62 @@
package org.elasticsearch.search.aggregations.pipeline.serialdiff; package org.elasticsearch.search.aggregations.pipeline.serialdiff;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder; import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class SerialDiffPipelineAggregatorBuilder extends PipelineAggregatorBuilder<SerialDiffPipelineAggregatorBuilder> { import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
static final SerialDiffPipelineAggregatorBuilder PROTOTYPE = new SerialDiffPipelineAggregatorBuilder("", ""); public class SerialDiffPipelineAggregatorBuilder extends PipelineAggregatorBuilder<SerialDiffPipelineAggregatorBuilder> {
public static final String NAME = SerialDiffPipelineAggregator.TYPE.name();
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
private static final ParseField GAP_POLICY = new ParseField("gap_policy");
private static final ParseField LAG = new ParseField("lag");
private String format; private String format;
private GapPolicy gapPolicy = GapPolicy.SKIP; private GapPolicy gapPolicy = GapPolicy.SKIP;
private int lag = 1; private int lag = 1;
public SerialDiffPipelineAggregatorBuilder(String name, String bucketsPath) { public SerialDiffPipelineAggregatorBuilder(String name, String bucketsPath) {
this(name, new String[] { bucketsPath }); super(name, SerialDiffPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
} }
private SerialDiffPipelineAggregatorBuilder(String name, String[] bucketsPaths) { /**
super(name, SerialDiffPipelineAggregator.TYPE.name(), bucketsPaths); * Read from a stream.
*/
public SerialDiffPipelineAggregatorBuilder(StreamInput in) throws IOException {
super(in, SerialDiffPipelineAggregator.TYPE.name());
format = in.readOptionalString();
gapPolicy = GapPolicy.readFrom(in);
lag = in.readVInt();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalString(format);
gapPolicy.writeTo(out);
out.writeVInt(lag);
}
@Override
protected boolean usesNewStyleSerialization() {
return true;
} }
/** /**
@ -117,27 +147,84 @@ public class SerialDiffPipelineAggregatorBuilder extends PipelineAggregatorBuild
@Override @Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) { if (format != null) {
builder.field(SerialDiffParser.FORMAT.getPreferredName(), format); builder.field(FORMAT.getPreferredName(), format);
} }
builder.field(SerialDiffParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); builder.field(GAP_POLICY.getPreferredName(), gapPolicy.getName());
builder.field(SerialDiffParser.LAG.getPreferredName(), lag); builder.field(LAG.getPreferredName(), lag);
return builder; return builder;
} }
@Override public static SerialDiffPipelineAggregatorBuilder parse(String reducerName, QueryParseContext context) throws IOException {
protected SerialDiffPipelineAggregatorBuilder doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException { XContentParser parser = context.parser();
SerialDiffPipelineAggregatorBuilder factory = new SerialDiffPipelineAggregatorBuilder(name, bucketsPaths); XContentParser.Token token;
factory.format = in.readOptionalString(); String currentFieldName = null;
factory.gapPolicy = GapPolicy.readFrom(in); String[] bucketsPaths = null;
factory.lag = in.readVInt(); String format = null;
return factory; GapPolicy gapPolicy = null;
Integer lag = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (context.getParseFieldMatcher().match(currentFieldName, FORMAT)) {
format = parser.text();
} else if (context.getParseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
bucketsPaths = new String[] { parser.text() };
} else if (context.getParseFieldMatcher().match(currentFieldName, GAP_POLICY)) {
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (context.getParseFieldMatcher().match(currentFieldName, LAG)) {
lag = parser.intValue(true);
if (lag <= 0) {
throw new ParsingException(parser.getTokenLocation(),
"Lag must be a positive, non-zero integer. Value supplied was" +
lag + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (context.getParseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
}
} else {
throw new ParsingException(parser.getTokenLocation(), "Unexpected token " + token + " in [" + reducerName + "].",
parser.getTokenLocation());
}
} }
@Override if (bucketsPaths == null) {
protected void doWriteTo(StreamOutput out) throws IOException { throw new ParsingException(parser.getTokenLocation(),
out.writeOptionalString(format); "Missing required field [" + BUCKETS_PATH.getPreferredName() + "] for derivative aggregation [" + reducerName + "]");
gapPolicy.writeTo(out); }
out.writeVInt(lag);
SerialDiffPipelineAggregatorBuilder factory =
new SerialDiffPipelineAggregatorBuilder(reducerName, bucketsPaths[0]);
if (lag != null) {
factory.lag(lag);
}
if (format != null) {
factory.format(format);
}
if (gapPolicy != null) {
factory.gapPolicy(gapPolicy);
}
return factory;
} }
@Override @Override
@ -152,4 +239,8 @@ public class SerialDiffPipelineAggregatorBuilder extends PipelineAggregatorBuild
&& Objects.equals(lag, other.lag); && Objects.equals(lag, other.lag);
} }
@Override
public String getWriteableName() {
return NAME;
}
} }