From b2a0f38a0c9f0fd9127e63e22a94f253b7b6d9e5 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 6 Aug 2018 15:33:18 -0500 Subject: [PATCH] Adding xpack.core.ml.datafeed to protocol.xpack.ml.datafeed (#32625) * Adding org.elasticsearch.xpack.core.ml.datafeed to org.elasticsearch.protocol.xpack.ml.datafeed * removing unused ParseField and import * Addressing PR feed back and fixing tests * Simplifying Datafeed(Config|Update) ctor parser --- .../xpack/ml/datafeed/ChunkingConfig.java | 134 +++++++ .../xpack/ml/datafeed/DatafeedConfig.java | 329 ++++++++++++++++++ .../xpack/ml/datafeed/DatafeedUpdate.java | 310 +++++++++++++++++ .../ml/datafeed/ChunkingConfigTests.java | 59 ++++ .../ml/datafeed/DatafeedConfigTests.java | 177 ++++++++++ .../ml/datafeed/DatafeedUpdateTests.java | 101 ++++++ 6 files changed, 1110 insertions(+) create mode 100644 x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/ChunkingConfig.java create mode 100644 x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedConfig.java create mode 100644 x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedUpdate.java create mode 100644 x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/ChunkingConfigTests.java create mode 100644 x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedConfigTests.java create mode 100644 x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedUpdateTests.java diff --git a/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/ChunkingConfig.java b/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/ChunkingConfig.java new file mode 100644 index 00000000000..0b9d9f12046 --- /dev/null +++ b/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/ChunkingConfig.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.protocol.xpack.ml.datafeed; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.ValueType; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Locale; +import java.util.Objects; + +/** + * The description of how searches should be chunked. + */ +public class ChunkingConfig implements ToXContentObject { + + public static final ParseField MODE_FIELD = new ParseField("mode"); + public static final ParseField TIME_SPAN_FIELD = new ParseField("time_span"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "chunking_config", true, a -> new ChunkingConfig((Mode) a[0], (TimeValue) a[1])); + + static { + PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return Mode.fromString(p.text()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, MODE_FIELD, ValueType.STRING); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return TimeValue.parseTimeValue(p.text(), TIME_SPAN_FIELD.getPreferredName()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, TIME_SPAN_FIELD, ValueType.STRING); + + } + + private final Mode mode; + private final TimeValue timeSpan; + + + ChunkingConfig(Mode mode, @Nullable TimeValue timeSpan) { + this.mode = Objects.requireNonNull(mode, MODE_FIELD.getPreferredName()); + this.timeSpan = timeSpan; + } + + @Nullable + public TimeValue getTimeSpan() { + return timeSpan; + } + + Mode getMode() { + return mode; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(MODE_FIELD.getPreferredName(), mode); + if (timeSpan != null) { + builder.field(TIME_SPAN_FIELD.getPreferredName(), timeSpan.getStringRep()); + } + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(mode, timeSpan); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + ChunkingConfig other = (ChunkingConfig) obj; + return Objects.equals(this.mode, other.mode) && + Objects.equals(this.timeSpan, other.timeSpan); + } + + public static ChunkingConfig newAuto() { + return new ChunkingConfig(Mode.AUTO, null); + } + + public static ChunkingConfig newOff() { + return new ChunkingConfig(Mode.OFF, null); + } + + public static ChunkingConfig newManual(TimeValue timeSpan) { + return new ChunkingConfig(Mode.MANUAL, timeSpan); + } + + public enum Mode { + AUTO, MANUAL, OFF; + + public static Mode fromString(String value) { + return Mode.valueOf(value.toUpperCase(Locale.ROOT)); + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } +} diff --git a/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedConfig.java b/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedConfig.java new file mode 100644 index 00000000000..85b7a0acea6 --- /dev/null +++ b/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedConfig.java @@ -0,0 +1,329 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.protocol.xpack.ml.datafeed; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.AbstractQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * Datafeed configuration options pojo. Describes where to proactively pull input + * data from. + *

+ * If a value has not been set it will be null. Object wrappers are + * used around integral types and booleans so they can take null + * values. + */ +public class DatafeedConfig implements ToXContentObject { + + public static final int DEFAULT_SCROLL_SIZE = 1000; + + public static final ParseField ID = new ParseField("datafeed_id"); + public static final ParseField QUERY_DELAY = new ParseField("query_delay"); + public static final ParseField FREQUENCY = new ParseField("frequency"); + public static final ParseField INDEXES = new ParseField("indexes"); + public static final ParseField INDICES = new ParseField("indices"); + public static final ParseField JOB_ID = new ParseField("job_id"); + public static final ParseField TYPES = new ParseField("types"); + public static final ParseField QUERY = new ParseField("query"); + public static final ParseField SCROLL_SIZE = new ParseField("scroll_size"); + public static final ParseField AGGREGATIONS = new ParseField("aggregations"); + public static final ParseField AGGS = new ParseField("aggs"); + public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields"); + public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "datafeed_config", true, a -> new Builder((String)a[0], (String)a[1])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), JOB_ID); + + PARSER.declareStringArray(Builder::setIndices, INDEXES); + PARSER.declareStringArray(Builder::setIndices, INDICES); + PARSER.declareStringArray(Builder::setTypes, TYPES); + PARSER.declareString((builder, val) -> + builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY); + PARSER.declareString((builder, val) -> + builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY); + PARSER.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY); + PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS); + PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS); + PARSER.declareObject(Builder::setScriptFields, (p, c) -> { + List parsedScriptFields = new ArrayList<>(); + while (p.nextToken() != XContentParser.Token.END_OBJECT) { + parsedScriptFields.add(new SearchSourceBuilder.ScriptField(p)); + } + return parsedScriptFields; + }, SCRIPT_FIELDS); + PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE); + PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG); + } + + private final String id; + private final String jobId; + + /** + * The delay before starting to query a period of time + */ + private final TimeValue queryDelay; + + /** + * The frequency with which queries are executed + */ + private final TimeValue frequency; + + private final List indices; + private final List types; + private final QueryBuilder query; + private final AggregatorFactories.Builder aggregations; + private final List scriptFields; + private final Integer scrollSize; + private final ChunkingConfig chunkingConfig; + + private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, List types, + QueryBuilder query, AggregatorFactories.Builder aggregations, List scriptFields, + Integer scrollSize, ChunkingConfig chunkingConfig) { + this.id = id; + this.jobId = jobId; + this.queryDelay = queryDelay; + this.frequency = frequency; + this.indices = indices == null ? null : Collections.unmodifiableList(indices); + this.types = types == null ? null : Collections.unmodifiableList(types); + this.query = query; + this.aggregations = aggregations; + this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields); + this.scrollSize = scrollSize; + this.chunkingConfig = chunkingConfig; + } + + public String getId() { + return id; + } + + public String getJobId() { + return jobId; + } + + public TimeValue getQueryDelay() { + return queryDelay; + } + + public TimeValue getFrequency() { + return frequency; + } + + public List getIndices() { + return indices; + } + + public List getTypes() { + return types; + } + + public Integer getScrollSize() { + return scrollSize; + } + + public QueryBuilder getQuery() { + return query; + } + + public AggregatorFactories.Builder getAggregations() { + return aggregations; + } + + public List getScriptFields() { + return scriptFields == null ? Collections.emptyList() : scriptFields; + } + + public ChunkingConfig getChunkingConfig() { + return chunkingConfig; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ID.getPreferredName(), id); + builder.field(JOB_ID.getPreferredName(), jobId); + if (queryDelay != null) { + builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep()); + } + if (frequency != null) { + builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); + } + builder.field(INDICES.getPreferredName(), indices); + builder.field(TYPES.getPreferredName(), types); + builder.field(QUERY.getPreferredName(), query); + if (aggregations != null) { + builder.field(AGGREGATIONS.getPreferredName(), aggregations); + } + if (scriptFields != null) { + builder.startObject(SCRIPT_FIELDS.getPreferredName()); + for (SearchSourceBuilder.ScriptField scriptField : scriptFields) { + scriptField.toXContent(builder, params); + } + builder.endObject(); + } + builder.field(SCROLL_SIZE.getPreferredName(), scrollSize); + if (chunkingConfig != null) { + builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig); + } + + builder.endObject(); + return builder; + } + + /** + * The lists of indices and types are compared for equality but they are not + * sorted first so this test could fail simply because the indices and types + * lists are in different orders. + */ + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DatafeedConfig that = (DatafeedConfig) other; + + return Objects.equals(this.id, that.id) + && Objects.equals(this.jobId, that.jobId) + && Objects.equals(this.frequency, that.frequency) + && Objects.equals(this.queryDelay, that.queryDelay) + && Objects.equals(this.indices, that.indices) + && Objects.equals(this.types, that.types) + && Objects.equals(this.query, that.query) + && Objects.equals(this.scrollSize, that.scrollSize) + && Objects.equals(this.aggregations, that.aggregations) + && Objects.equals(this.scriptFields, that.scriptFields) + && Objects.equals(this.chunkingConfig, that.chunkingConfig); + } + + @Override + public int hashCode() { + return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields, + chunkingConfig); + } + + public static class Builder { + + private String id; + private String jobId; + private TimeValue queryDelay; + private TimeValue frequency; + private List indices = Collections.emptyList(); + private List types = Collections.emptyList(); + private QueryBuilder query = QueryBuilders.matchAllQuery(); + private AggregatorFactories.Builder aggregations; + private List scriptFields; + private Integer scrollSize = DEFAULT_SCROLL_SIZE; + private ChunkingConfig chunkingConfig; + + public Builder(String id, String jobId) { + this.id = Objects.requireNonNull(id, ID.getPreferredName()); + this.jobId = Objects.requireNonNull(jobId, JOB_ID.getPreferredName()); + } + + public Builder(DatafeedConfig config) { + this.id = config.id; + this.jobId = config.jobId; + this.queryDelay = config.queryDelay; + this.frequency = config.frequency; + this.indices = config.indices; + this.types = config.types; + this.query = config.query; + this.aggregations = config.aggregations; + this.scriptFields = config.scriptFields; + this.scrollSize = config.scrollSize; + this.chunkingConfig = config.chunkingConfig; + } + + public Builder setIndices(List indices) { + this.indices = Objects.requireNonNull(indices, INDICES.getPreferredName()); + return this; + } + + public Builder setTypes(List types) { + this.types = Objects.requireNonNull(types, TYPES.getPreferredName()); + return this; + } + + public Builder setQueryDelay(TimeValue queryDelay) { + this.queryDelay = queryDelay; + return this; + } + + public Builder setFrequency(TimeValue frequency) { + this.frequency = frequency; + return this; + } + + public Builder setQuery(QueryBuilder query) { + this.query = Objects.requireNonNull(query, QUERY.getPreferredName()); + return this; + } + + public Builder setAggregations(AggregatorFactories.Builder aggregations) { + this.aggregations = aggregations; + return this; + } + + public Builder setScriptFields(List scriptFields) { + List sorted = new ArrayList<>(scriptFields); + sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName)); + this.scriptFields = sorted; + return this; + } + + public Builder setScrollSize(int scrollSize) { + this.scrollSize = scrollSize; + return this; + } + + public Builder setChunkingConfig(ChunkingConfig chunkingConfig) { + this.chunkingConfig = chunkingConfig; + return this; + } + + public DatafeedConfig build() { + return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, + chunkingConfig); + } + } +} diff --git a/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedUpdate.java b/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedUpdate.java new file mode 100644 index 00000000000..6afcdf1d2d8 --- /dev/null +++ b/x-pack/protocol/src/main/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedUpdate.java @@ -0,0 +1,310 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.protocol.xpack.ml.datafeed; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.AbstractQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * A datafeed update contains partial properties to update a {@link DatafeedConfig}. + * The main difference between this class and {@link DatafeedConfig} is that here all + * fields are nullable. + */ +public class DatafeedUpdate implements ToXContentObject { + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "datafeed_update", true, a -> new Builder((String)a[0])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID); + + PARSER.declareString(Builder::setJobId, DatafeedConfig.JOB_ID); + PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDEXES); + PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDICES); + PARSER.declareStringArray(Builder::setTypes, DatafeedConfig.TYPES); + PARSER.declareString((builder, val) -> builder.setQueryDelay( + TimeValue.parseTimeValue(val, DatafeedConfig.QUERY_DELAY.getPreferredName())), DatafeedConfig.QUERY_DELAY); + PARSER.declareString((builder, val) -> builder.setFrequency( + TimeValue.parseTimeValue(val, DatafeedConfig.FREQUENCY.getPreferredName())), DatafeedConfig.FREQUENCY); + PARSER.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), DatafeedConfig.QUERY); + PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), + DatafeedConfig.AGGREGATIONS); + PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), + DatafeedConfig.AGGS); + PARSER.declareObject(Builder::setScriptFields, (p, c) -> { + List parsedScriptFields = new ArrayList<>(); + while (p.nextToken() != XContentParser.Token.END_OBJECT) { + parsedScriptFields.add(new SearchSourceBuilder.ScriptField(p)); + } + return parsedScriptFields; + }, DatafeedConfig.SCRIPT_FIELDS); + PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE); + PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG); + } + + private final String id; + private final String jobId; + private final TimeValue queryDelay; + private final TimeValue frequency; + private final List indices; + private final List types; + private final QueryBuilder query; + private final AggregatorFactories.Builder aggregations; + private final List scriptFields; + private final Integer scrollSize; + private final ChunkingConfig chunkingConfig; + + private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, List types, + QueryBuilder query, AggregatorFactories.Builder aggregations, List scriptFields, + Integer scrollSize, ChunkingConfig chunkingConfig) { + this.id = id; + this.jobId = jobId; + this.queryDelay = queryDelay; + this.frequency = frequency; + this.indices = indices; + this.types = types; + this.query = query; + this.aggregations = aggregations; + this.scriptFields = scriptFields; + this.scrollSize = scrollSize; + this.chunkingConfig = chunkingConfig; + } + + /** + * Get the id of the datafeed to update + */ + public String getId() { + return id; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DatafeedConfig.ID.getPreferredName(), id); + addOptionalField(builder, DatafeedConfig.JOB_ID, jobId); + if (queryDelay != null) { + builder.field(DatafeedConfig.QUERY_DELAY.getPreferredName(), queryDelay.getStringRep()); + } + if (frequency != null) { + builder.field(DatafeedConfig.FREQUENCY.getPreferredName(), frequency.getStringRep()); + } + addOptionalField(builder, DatafeedConfig.INDICES, indices); + addOptionalField(builder, DatafeedConfig.TYPES, types); + addOptionalField(builder, DatafeedConfig.QUERY, query); + addOptionalField(builder, DatafeedConfig.AGGREGATIONS, aggregations); + if (scriptFields != null) { + builder.startObject(DatafeedConfig.SCRIPT_FIELDS.getPreferredName()); + for (SearchSourceBuilder.ScriptField scriptField : scriptFields) { + scriptField.toXContent(builder, params); + } + builder.endObject(); + } + addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); + addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); + builder.endObject(); + return builder; + } + + private void addOptionalField(XContentBuilder builder, ParseField field, Object value) throws IOException { + if (value != null) { + builder.field(field.getPreferredName(), value); + } + } + + public String getJobId() { + return jobId; + } + + public TimeValue getQueryDelay() { + return queryDelay; + } + + public TimeValue getFrequency() { + return frequency; + } + + public List getIndices() { + return indices; + } + + public List getTypes() { + return types; + } + + public Integer getScrollSize() { + return scrollSize; + } + + public QueryBuilder getQuery() { + return query; + } + + public AggregatorFactories.Builder getAggregations() { + return aggregations; + } + + public List getScriptFields() { + return scriptFields == null ? Collections.emptyList() : scriptFields; + } + + public ChunkingConfig getChunkingConfig() { + return chunkingConfig; + } + + /** + * The lists of indices and types are compared for equality but they are not + * sorted first so this test could fail simply because the indices and types + * lists are in different orders. + */ + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DatafeedUpdate that = (DatafeedUpdate) other; + + return Objects.equals(this.id, that.id) + && Objects.equals(this.jobId, that.jobId) + && Objects.equals(this.frequency, that.frequency) + && Objects.equals(this.queryDelay, that.queryDelay) + && Objects.equals(this.indices, that.indices) + && Objects.equals(this.types, that.types) + && Objects.equals(this.query, that.query) + && Objects.equals(this.scrollSize, that.scrollSize) + && Objects.equals(this.aggregations, that.aggregations) + && Objects.equals(this.scriptFields, that.scriptFields) + && Objects.equals(this.chunkingConfig, that.chunkingConfig); + } + + @Override + public int hashCode() { + return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields, + chunkingConfig); + } + + public static class Builder { + + private String id; + private String jobId; + private TimeValue queryDelay; + private TimeValue frequency; + private List indices; + private List types; + private QueryBuilder query; + private AggregatorFactories.Builder aggregations; + private List scriptFields; + private Integer scrollSize; + private ChunkingConfig chunkingConfig; + + public Builder(String id) { + this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName()); + } + + public Builder(DatafeedUpdate config) { + this.id = config.id; + this.jobId = config.jobId; + this.queryDelay = config.queryDelay; + this.frequency = config.frequency; + this.indices = config.indices; + this.types = config.types; + this.query = config.query; + this.aggregations = config.aggregations; + this.scriptFields = config.scriptFields; + this.scrollSize = config.scrollSize; + this.chunkingConfig = config.chunkingConfig; + } + + public Builder setJobId(String jobId) { + this.jobId = jobId; + return this; + } + + public Builder setIndices(List indices) { + this.indices = indices; + return this; + } + + public Builder setTypes(List types) { + this.types = types; + return this; + } + + public Builder setQueryDelay(TimeValue queryDelay) { + this.queryDelay = queryDelay; + return this; + } + + public Builder setFrequency(TimeValue frequency) { + this.frequency = frequency; + return this; + } + + public Builder setQuery(QueryBuilder query) { + this.query = query; + return this; + } + + public Builder setAggregations(AggregatorFactories.Builder aggregations) { + this.aggregations = aggregations; + return this; + } + + public Builder setScriptFields(List scriptFields) { + List sorted = new ArrayList<>(scriptFields); + sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName)); + this.scriptFields = sorted; + return this; + } + + public Builder setScrollSize(int scrollSize) { + this.scrollSize = scrollSize; + return this; + } + + public Builder setChunkingConfig(ChunkingConfig chunkingConfig) { + this.chunkingConfig = chunkingConfig; + return this; + } + + public DatafeedUpdate build() { + return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, + chunkingConfig); + } + } +} diff --git a/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/ChunkingConfigTests.java b/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/ChunkingConfigTests.java new file mode 100644 index 00000000000..c835788bb1c --- /dev/null +++ b/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/ChunkingConfigTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.protocol.xpack.ml.datafeed; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +public class ChunkingConfigTests extends AbstractXContentTestCase { + + @Override + protected ChunkingConfig createTestInstance() { + return createRandomizedChunk(); + } + + @Override + protected ChunkingConfig doParseInstance(XContentParser parser) { + return ChunkingConfig.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + public static ChunkingConfig createRandomizedChunk() { + ChunkingConfig.Mode mode = randomFrom(ChunkingConfig.Mode.values()); + TimeValue timeSpan = null; + if (mode == ChunkingConfig.Mode.MANUAL) { + // time span is required to be at least 1 millis, so we use a custom method to generate a time value here + timeSpan = randomPositiveSecondsMinutesHours(); + } + return new ChunkingConfig(mode, timeSpan); + } + + private static TimeValue randomPositiveSecondsMinutesHours() { + return new TimeValue(randomIntBetween(1, 1000), randomFrom(Arrays.asList(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS))); + } + +} diff --git a/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedConfigTests.java b/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedConfigTests.java new file mode 100644 index 00000000000..f45d88d318e --- /dev/null +++ b/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedConfigTests.java @@ -0,0 +1,177 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.protocol.xpack.ml.datafeed; + +import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class DatafeedConfigTests extends AbstractXContentTestCase { + + @Override + protected DatafeedConfig createTestInstance() { + long bucketSpanMillis = 3600000; + DatafeedConfig.Builder builder = constructBuilder(); + builder.setIndices(randomStringList(1, 10)); + builder.setTypes(randomStringList(0, 10)); + if (randomBoolean()) { + builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10))); + } + boolean addScriptFields = randomBoolean(); + if (addScriptFields) { + int scriptsSize = randomInt(3); + List scriptFields = new ArrayList<>(scriptsSize); + for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) { + scriptFields.add(new ScriptField(randomAlphaOfLength(10), mockScript(randomAlphaOfLength(10)), + randomBoolean())); + } + builder.setScriptFields(scriptFields); + } + Long aggHistogramInterval = null; + if (randomBoolean()) { + // can only test with a single agg as the xcontent order gets randomized by test base class and then + // the actual xcontent isn't the same and test fail. + // Testing with a single agg is ok as we don't have special list xcontent logic + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggHistogramInterval = randomNonNegativeLong(); + aggHistogramInterval = aggHistogramInterval > bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval; + aggHistogramInterval = aggHistogramInterval <= 0 ? 1 : aggHistogramInterval; + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + aggs.addAggregator(AggregationBuilders.dateHistogram("buckets") + .interval(aggHistogramInterval).subAggregation(maxTime).field("time")); + builder.setAggregations(aggs); + } + if (randomBoolean()) { + builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + if (aggHistogramInterval == null) { + builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000))); + } else { + builder.setFrequency(TimeValue.timeValueMillis(randomIntBetween(1, 5) * aggHistogramInterval)); + } + } + if (randomBoolean()) { + builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, 1_000_000))); + } + if (randomBoolean()) { + builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk()); + } + return builder.build(); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); + return new NamedXContentRegistry(searchModule.getNamedXContents()); + } + + public static List randomStringList(int min, int max) { + int size = scaledRandomIntBetween(min, max); + List list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + list.add(randomAlphaOfLength(10)); + } + return list; + } + + @Override + protected DatafeedConfig doParseInstance(XContentParser parser) { + return DatafeedConfig.PARSER.apply(parser, null).build(); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + private static final String FUTURE_DATAFEED = "{\n" + + " \"datafeed_id\": \"farequote-datafeed\",\n" + + " \"job_id\": \"farequote\",\n" + + " \"frequency\": \"1h\",\n" + + " \"indices\": [\"farequote1\", \"farequote2\"],\n" + + " \"tomorrows_technology_today\": \"amazing\",\n" + + " \"scroll_size\": 1234\n" + + "}"; + + public void testFutureMetadataParse() throws IOException { + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED); + // Unlike the config version of this test, the metadata parser should tolerate the unknown future field + assertNotNull(DatafeedConfig.PARSER.apply(parser, null).build()); + } + + public void testCopyConstructor() { + for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { + DatafeedConfig datafeedConfig = createTestInstance(); + DatafeedConfig copy = new DatafeedConfig.Builder(datafeedConfig).build(); + assertEquals(datafeedConfig, copy); + } + } + + public void testCheckValid_GivenNullIdInConstruction() { + expectThrows(NullPointerException.class, () -> new DatafeedConfig.Builder(null, null)); + } + + public void testCheckValid_GivenNullJobId() { + expectThrows(NullPointerException.class, () -> new DatafeedConfig.Builder(randomValidDatafeedId(), null)); + } + + public void testCheckValid_GivenNullIndices() { + DatafeedConfig.Builder conf = constructBuilder(); + expectThrows(NullPointerException.class, () -> conf.setIndices(null)); + } + + public void testCheckValid_GivenNullType() { + DatafeedConfig.Builder conf = constructBuilder(); + expectThrows(NullPointerException.class, () -> conf.setTypes(null)); + } + + public void testCheckValid_GivenNullQuery() { + DatafeedConfig.Builder conf = constructBuilder(); + expectThrows(NullPointerException.class, () -> conf.setQuery(null)); + } + + public static String randomValidDatafeedId() { + CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); + return generator.ofCodePointsLength(random(), 10, 10); + } + + private static DatafeedConfig.Builder constructBuilder() { + return new DatafeedConfig.Builder(randomValidDatafeedId(), randomAlphaOfLength(10)); + } + +} diff --git a/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedUpdateTests.java b/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedUpdateTests.java new file mode 100644 index 00000000000..edbef8461e0 --- /dev/null +++ b/x-pack/protocol/src/test/java/org/elasticsearch/protocol/xpack/ml/datafeed/DatafeedUpdateTests.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.protocol.xpack.ml.datafeed; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class DatafeedUpdateTests extends AbstractXContentTestCase { + + @Override + protected DatafeedUpdate createTestInstance() { + DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(DatafeedConfigTests.randomValidDatafeedId()); + if (randomBoolean()) { + builder.setJobId(randomAlphaOfLength(10)); + } + if (randomBoolean()) { + builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE))); + } + if (randomBoolean()) { + builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, Integer.MAX_VALUE))); + } + if (randomBoolean()) { + builder.setIndices(DatafeedConfigTests.randomStringList(1, 10)); + } + if (randomBoolean()) { + builder.setTypes(DatafeedConfigTests.randomStringList(1, 10)); + } + if (randomBoolean()) { + builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10))); + } + if (randomBoolean()) { + int scriptsSize = randomInt(3); + List scriptFields = new ArrayList<>(scriptsSize); + for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) { + scriptFields.add(new SearchSourceBuilder.ScriptField(randomAlphaOfLength(10), mockScript(randomAlphaOfLength(10)), + randomBoolean())); + } + builder.setScriptFields(scriptFields); + } + if (randomBoolean()) { + // can only test with a single agg as the xcontent order gets randomized by test base class and then + // the actual xcontent isn't the same and test fail. + // Testing with a single agg is ok as we don't have special list xcontent logic + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator(AggregationBuilders.avg(randomAlphaOfLength(10)).field(randomAlphaOfLength(10))); + builder.setAggregations(aggs); + } + if (randomBoolean()) { + builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk()); + } + return builder.build(); + } + + @Override + protected DatafeedUpdate doParseInstance(XContentParser parser) { + return DatafeedUpdate.PARSER.apply(parser, null).build(); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); + return new NamedXContentRegistry(searchModule.getNamedXContents()); + } + +}