Convert BucketScript to static parser (#44385)

BucketScript was using the old-style parser and could easily be converted to the newer static parser.

Also adds a test for GapPolicy enum serialization.
Zachary Tong 2019-07-17 10:21:56 -04:00
parent 34725e20fb
commit 103ba976fd
3 changed files with 95 additions and 82 deletions
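
Note: the "static parser" style adopted here declares each field once on a shared ConstructingObjectParser instead of hand-walking XContent tokens. A minimal sketch of the pattern, with MyBuilder and its fields as illustrative stand-ins rather than code from this commit:

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;

class MyBuilder {
    final String field;    // required: supplied as a positional constructor arg
    String format;         // optional: applied through a setter after construction

    MyBuilder(String field) { this.field = field; }
    void setFormat(String format) { this.format = format; }

    static final ConstructingObjectParser<MyBuilder, Void> PARSER =
        new ConstructingObjectParser<>("my_agg", false, args -> new MyBuilder((String) args[0]));
    static {
        PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("field"));
        PARSER.declareString(MyBuilder::setFormat, new ParseField("format"));
    }
}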

BucketHelpers.java

@@ -23,6 +23,7 @@ import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.XContentLocation;
 import org.elasticsearch.search.aggregations.AggregationExecutionException;
@@ -52,7 +53,7 @@ public class BucketHelpers {
      * "insert_zeros": empty buckets will be filled with zeros for all metrics
      * "skip": empty buckets will simply be ignored
      */
-    public enum GapPolicy {
+    public enum GapPolicy implements Writeable {
         INSERT_ZEROS((byte) 0, "insert_zeros"), SKIP((byte) 1, "skip");

         /**
@@ -95,6 +96,7 @@ public class BucketHelpers {
         /**
          * Serialize the GapPolicy to the output stream
          */
+        @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeByte(id);
         }
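
With GapPolicy now a Writeable, it round-trips through the stream abstraction like any other wire type. A minimal sketch, assuming the in-memory stream helpers from the same package (GapPolicy.readFrom already exists and is exercised by the new GapPolicyTests below):

import java.io.IOException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;

static GapPolicy roundTrip(GapPolicy policy) throws IOException {
    try (BytesStreamOutput out = new BytesStreamOutput()) {
        policy.writeTo(out);                              // writes the byte id
        try (StreamInput in = out.bytes().streamInput()) {
            return GapPolicy.readFrom(in);                // reads the same id back
        }
    }
}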

BucketScriptPipelineAggregationBuilder.java

@@ -19,9 +19,10 @@

 package org.elasticsearch.search.aggregations.pipeline;

-import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.script.Script;
@@ -29,13 +30,13 @@ import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;

 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.function.Function;

 import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH;
 import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
@@ -49,6 +50,31 @@ public class BucketScriptPipelineAggregationBuilder extends AbstractPipelineAggr
     private String format = null;
     private GapPolicy gapPolicy = GapPolicy.SKIP;

+    private static final Function<String, ConstructingObjectParser<BucketScriptPipelineAggregationBuilder, Void>> PARSER
+        = name -> {
+            @SuppressWarnings("unchecked")
+            ConstructingObjectParser<BucketScriptPipelineAggregationBuilder, Void> parser = new ConstructingObjectParser<>(
+                BucketScriptPipelineAggregationBuilder.NAME,
+                false,
+                o -> new BucketScriptPipelineAggregationBuilder(name, (Map<String, String>) o[0], (Script) o[1]));
+
+            parser.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), BUCKETS_PATH_FIELD);
+            parser.declareField(ConstructingObjectParser.constructorArg(),
+                (p, c) -> Script.parse(p), Script.SCRIPT_PARSE_FIELD, ObjectParser.ValueType.OBJECT_OR_STRING);
+            parser.declareString(BucketScriptPipelineAggregationBuilder::format, FORMAT);
+            parser.declareField(BucketScriptPipelineAggregationBuilder::gapPolicy, p -> {
+                if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
+                    return GapPolicy.parse(p.text().toLowerCase(Locale.ROOT), p.getTokenLocation());
+                }
+                throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
+            }, GAP_POLICY, ObjectParser.ValueType.STRING);
+            return parser;
+        };
+
     public BucketScriptPipelineAggregationBuilder(String name, Map<String, String> bucketsPathsMap, Script script) {
         super(name, NAME, new TreeMap<>(bucketsPathsMap).values().toArray(new String[bucketsPathsMap.size()]));
         this.bucketsPathsMap = bucketsPathsMap;
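
The two constructor args declared above arrive positionally: o[0] is the buckets_path map and o[1] is the script, while format and gap_policy are optional and wired to setters. For comparison, the equivalent programmatic construction (names and values are illustrative):

Map<String, String> bucketsPath = new HashMap<>();
bucketsPath.put("total", "sales");    // referenced as params.total in the script
bucketsPath.put("count", "_count");   // referenced as params.count in the script

BucketScriptPipelineAggregationBuilder builder =
    new BucketScriptPipelineAggregationBuilder("sales_per_doc", bucketsPath,
        new Script("params.total / params.count"));
builder.format("#.00");               // optional, mirrors the "format" field
builder.gapPolicy(GapPolicy.SKIP);    // optional, mirrors the "gap_policy" field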
@@ -154,86 +180,10 @@ public class BucketScriptPipelineAggregationBuilder extends AbstractPipelineAggr
         return builder;
     }

-    public static BucketScriptPipelineAggregationBuilder parse(String reducerName, XContentParser parser) throws IOException {
-        XContentParser.Token token;
-        Script script = null;
-        String currentFieldName = null;
-        Map<String, String> bucketsPathsMap = null;
-        String format = null;
-        GapPolicy gapPolicy = null;
-
-        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
-            if (token == XContentParser.Token.FIELD_NAME) {
-                currentFieldName = parser.currentName();
-            } else if (token == XContentParser.Token.VALUE_STRING) {
-                if (FORMAT.match(currentFieldName, parser.getDeprecationHandler())) {
-                    format = parser.text();
-                } else if (BUCKETS_PATH.match(currentFieldName, parser.getDeprecationHandler())) {
-                    bucketsPathsMap = new HashMap<>();
-                    bucketsPathsMap.put("_value", parser.text());
-                } else if (GAP_POLICY.match(currentFieldName, parser.getDeprecationHandler())) {
-                    gapPolicy = GapPolicy.parse(parser.text(), parser.getTokenLocation());
-                } else if (Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
-                    script = Script.parse(parser);
-                } else {
-                    throw new ParsingException(parser.getTokenLocation(),
-                        "Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
-                }
-            } else if (token == XContentParser.Token.START_ARRAY) {
-                if (BUCKETS_PATH.match(currentFieldName, parser.getDeprecationHandler())) {
-                    List<String> paths = new ArrayList<>();
-                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
-                        String path = parser.text();
-                        paths.add(path);
-                    }
-                    bucketsPathsMap = new HashMap<>();
-                    for (int i = 0; i < paths.size(); i++) {
-                        bucketsPathsMap.put("_value" + i, paths.get(i));
-                    }
-                } else {
-                    throw new ParsingException(parser.getTokenLocation(),
-                        "Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
-                }
-            } else if (token == XContentParser.Token.START_OBJECT) {
-                if (Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
-                    script = Script.parse(parser);
-                } else if (BUCKETS_PATH.match(currentFieldName, parser.getDeprecationHandler())) {
-                    Map<String, Object> map = parser.map();
-                    bucketsPathsMap = new HashMap<>();
-                    for (Map.Entry<String, Object> entry : map.entrySet()) {
-                        bucketsPathsMap.put(entry.getKey(), String.valueOf(entry.getValue()));
-                    }
-                } else {
-                    throw new ParsingException(parser.getTokenLocation(),
-                        "Unknown key for a " + token + " in [" + reducerName + "]: [" + currentFieldName + "].");
-                }
-            } else {
-                throw new ParsingException(parser.getTokenLocation(), "Unexpected token " + token + " in [" + reducerName + "].");
-            }
-        }
-
-        if (bucketsPathsMap == null) {
-            throw new ParsingException(parser.getTokenLocation(), "Missing required field [" + BUCKETS_PATH.getPreferredName()
-                + "] for series_arithmetic aggregation [" + reducerName + "]");
-        }
-
-        if (script == null) {
-            throw new ParsingException(parser.getTokenLocation(), "Missing required field [" + Script.SCRIPT_PARSE_FIELD.getPreferredName()
-                + "] for series_arithmetic aggregation [" + reducerName + "]");
-        }
-
-        BucketScriptPipelineAggregationBuilder factory =
-            new BucketScriptPipelineAggregationBuilder(reducerName, bucketsPathsMap, script);
-        if (format != null) {
-            factory.format(format);
-        }
-        if (gapPolicy != null) {
-            factory.gapPolicy(gapPolicy);
-        }
-        return factory;
+    public static BucketScriptPipelineAggregationBuilder parse(String aggName, XContentParser parser) {
+        return PARSER.apply(aggName).apply(parser, null);
     }

     @Override
     protected boolean overrideBucketsPath() {
         return true;

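With the hand-rolled token loop gone, parse() is a two-line delegation to the declared parser. A sketch of a round trip through it, assuming a test context where ESTestCase's createParser helper is available (the JSON values are illustrative):

String json = "{"
    + "\"buckets_path\": {\"total\": \"sales\", \"count\": \"_count\"},"
    + "\"script\": \"params.total / params.count\","
    + "\"format\": \"#.00\","
    + "\"gap_policy\": \"skip\""
    + "}";
try (XContentParser p = createParser(JsonXContent.jsonXContent, json)) {
    BucketScriptPipelineAggregationBuilder agg =
        BucketScriptPipelineAggregationBuilder.parse("my_bucket_script", p);
}

Note that the script field accepts either the object or the string form (ValueType.OBJECT_OR_STRING above), and gap_policy is lowercased before GapPolicy.parse, so "SKIP" and "skip" are both accepted.
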
GapPolicyTests.java (new file)

@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline;
+
+import org.elasticsearch.common.ParsingException;
+import org.elasticsearch.common.io.stream.AbstractWriteableEnumTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class GapPolicyTests extends AbstractWriteableEnumTestCase {
+
+    public GapPolicyTests() {
+        super(BucketHelpers.GapPolicy::readFrom);
+    }
+
+    @Override
+    public void testValidOrdinals() {
+        assertThat(BucketHelpers.GapPolicy.INSERT_ZEROS.ordinal(), equalTo(0));
+        assertThat(BucketHelpers.GapPolicy.SKIP.ordinal(), equalTo(1));
+    }
+
+    @Override
+    public void testFromString() {
+        assertThat(BucketHelpers.GapPolicy.parse("insert_zeros", null), equalTo(BucketHelpers.GapPolicy.INSERT_ZEROS));
+        assertThat(BucketHelpers.GapPolicy.parse("skip", null), equalTo(BucketHelpers.GapPolicy.SKIP));
+        ParsingException e = expectThrows(ParsingException.class, () -> BucketHelpers.GapPolicy.parse("does_not_exist", null));
+        assertThat(e.getMessage(),
+            equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip]"));
+    }
+
+    @Override
+    public void testReadFrom() throws IOException {
+        assertReadFromStream(0, BucketHelpers.GapPolicy.INSERT_ZEROS);
+        assertReadFromStream(1, BucketHelpers.GapPolicy.SKIP);
+    }
+
+    @Override
+    public void testWriteTo() throws IOException {
+        assertWriteToStream(BucketHelpers.GapPolicy.INSERT_ZEROS, 0);
+        assertWriteToStream(BucketHelpers.GapPolicy.SKIP, 1);
+    }
+}
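
For reference, AbstractWriteableEnumTestCase is the shared harness for Writeable enums: its constructor takes the enum's reader (here GapPolicy::readFrom), and the assertReadFromStream / assertWriteToStream helpers presumably round-trip each constant through an in-memory stream against the expected byte id, so this test pins both the ordinal order and the wire ids.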