Adding xpack.core.ml.datafeed to protocol.xpack.ml.datafeed (#32625)

* Adding org.elasticsearch.xpack.core.ml.datafeed to org.elasticsearch.protocol.xpack.ml.datafeed

* removing unused ParseField and import

* Addressing PR feed back and fixing tests

* Simplifying Datafeed(Config|Update) ctor parser
This commit is contained in:
Benjamin Trent 2018-08-06 15:33:18 -05:00 committed by GitHub
parent e01e4393a8
commit b2a0f38a0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1110 additions and 0 deletions

View File

@ -0,0 +1,134 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.protocol.xpack.ml.datafeed;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
/**
* The description of how searches should be chunked.
*/
public class ChunkingConfig implements ToXContentObject {
public static final ParseField MODE_FIELD = new ParseField("mode");
public static final ParseField TIME_SPAN_FIELD = new ParseField("time_span");
public static final ConstructingObjectParser<ChunkingConfig, Void> PARSER = new ConstructingObjectParser<>(
"chunking_config", true, a -> new ChunkingConfig((Mode) a[0], (TimeValue) a[1]));
static {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Mode.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, MODE_FIELD, ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return TimeValue.parseTimeValue(p.text(), TIME_SPAN_FIELD.getPreferredName());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, TIME_SPAN_FIELD, ValueType.STRING);
}
private final Mode mode;
private final TimeValue timeSpan;
ChunkingConfig(Mode mode, @Nullable TimeValue timeSpan) {
this.mode = Objects.requireNonNull(mode, MODE_FIELD.getPreferredName());
this.timeSpan = timeSpan;
}
@Nullable
public TimeValue getTimeSpan() {
return timeSpan;
}
Mode getMode() {
return mode;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MODE_FIELD.getPreferredName(), mode);
if (timeSpan != null) {
builder.field(TIME_SPAN_FIELD.getPreferredName(), timeSpan.getStringRep());
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(mode, timeSpan);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
ChunkingConfig other = (ChunkingConfig) obj;
return Objects.equals(this.mode, other.mode) &&
Objects.equals(this.timeSpan, other.timeSpan);
}
public static ChunkingConfig newAuto() {
return new ChunkingConfig(Mode.AUTO, null);
}
public static ChunkingConfig newOff() {
return new ChunkingConfig(Mode.OFF, null);
}
public static ChunkingConfig newManual(TimeValue timeSpan) {
return new ChunkingConfig(Mode.MANUAL, timeSpan);
}
public enum Mode {
AUTO, MANUAL, OFF;
public static Mode fromString(String value) {
return Mode.valueOf(value.toUpperCase(Locale.ROOT));
}
@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
}

View File

@ -0,0 +1,329 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.protocol.xpack.ml.datafeed;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
* Datafeed configuration options pojo. Describes where to proactively pull input
* data from.
* <p>
* If a value has not been set it will be <code>null</code>. Object wrappers are
* used around integral types and booleans so they can take <code>null</code>
* values.
*/
public class DatafeedConfig implements ToXContentObject {
public static final int DEFAULT_SCROLL_SIZE = 1000;
public static final ParseField ID = new ParseField("datafeed_id");
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField INDEXES = new ParseField("indexes");
public static final ParseField INDICES = new ParseField("indices");
public static final ParseField JOB_ID = new ParseField("job_id");
public static final ParseField TYPES = new ParseField("types");
public static final ParseField QUERY = new ParseField("query");
public static final ParseField SCROLL_SIZE = new ParseField("scroll_size");
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField AGGS = new ParseField("aggs");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), JOB_ID);
PARSER.declareStringArray(Builder::setIndices, INDEXES);
PARSER.declareStringArray(Builder::setIndices, INDICES);
PARSER.declareStringArray(Builder::setTypes, TYPES);
PARSER.declareString((builder, val) ->
builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY);
PARSER.declareString((builder, val) ->
builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY);
PARSER.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY);
PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS);
PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS);
PARSER.declareObject(Builder::setScriptFields, (p, c) -> {
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
parsedScriptFields.add(new SearchSourceBuilder.ScriptField(p));
}
return parsedScriptFields;
}, SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
}
private final String id;
private final String jobId;
/**
* The delay before starting to query a period of time
*/
private final TimeValue queryDelay;
/**
* The frequency with which queries are executed
*/
private final TimeValue frequency;
private final List<String> indices;
private final List<String> types;
private final QueryBuilder query;
private final AggregatorFactories.Builder aggregations;
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
this.frequency = frequency;
this.indices = indices == null ? null : Collections.unmodifiableList(indices);
this.types = types == null ? null : Collections.unmodifiableList(types);
this.query = query;
this.aggregations = aggregations;
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
}
public String getId() {
return id;
}
public String getJobId() {
return jobId;
}
public TimeValue getQueryDelay() {
return queryDelay;
}
public TimeValue getFrequency() {
return frequency;
}
public List<String> getIndices() {
return indices;
}
public List<String> getTypes() {
return types;
}
public Integer getScrollSize() {
return scrollSize;
}
public QueryBuilder getQuery() {
return query;
}
public AggregatorFactories.Builder getAggregations() {
return aggregations;
}
public List<SearchSourceBuilder.ScriptField> getScriptFields() {
return scriptFields == null ? Collections.emptyList() : scriptFields;
}
public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID.getPreferredName(), id);
builder.field(JOB_ID.getPreferredName(), jobId);
if (queryDelay != null) {
builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
}
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}
builder.field(INDICES.getPreferredName(), indices);
builder.field(TYPES.getPreferredName(), types);
builder.field(QUERY.getPreferredName(), query);
if (aggregations != null) {
builder.field(AGGREGATIONS.getPreferredName(), aggregations);
}
if (scriptFields != null) {
builder.startObject(SCRIPT_FIELDS.getPreferredName());
for (SearchSourceBuilder.ScriptField scriptField : scriptFields) {
scriptField.toXContent(builder, params);
}
builder.endObject();
}
builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
builder.endObject();
return builder;
}
/**
* The lists of indices and types are compared for equality but they are not
* sorted first so this test could fail simply because the indices and types
* lists are in different orders.
*/
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
DatafeedConfig that = (DatafeedConfig) other;
return Objects.equals(this.id, that.id)
&& Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.queryDelay, that.queryDelay)
&& Objects.equals(this.indices, that.indices)
&& Objects.equals(this.types, that.types)
&& Objects.equals(this.query, that.query)
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.aggregations, that.aggregations)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
chunkingConfig);
}
public static class Builder {
private String id;
private String jobId;
private TimeValue queryDelay;
private TimeValue frequency;
private List<String> indices = Collections.emptyList();
private List<String> types = Collections.emptyList();
private QueryBuilder query = QueryBuilders.matchAllQuery();
private AggregatorFactories.Builder aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize = DEFAULT_SCROLL_SIZE;
private ChunkingConfig chunkingConfig;
public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
this.jobId = Objects.requireNonNull(jobId, JOB_ID.getPreferredName());
}
public Builder(DatafeedConfig config) {
this.id = config.id;
this.jobId = config.jobId;
this.queryDelay = config.queryDelay;
this.frequency = config.frequency;
this.indices = config.indices;
this.types = config.types;
this.query = config.query;
this.aggregations = config.aggregations;
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
}
public Builder setIndices(List<String> indices) {
this.indices = Objects.requireNonNull(indices, INDICES.getPreferredName());
return this;
}
public Builder setTypes(List<String> types) {
this.types = Objects.requireNonNull(types, TYPES.getPreferredName());
return this;
}
public Builder setQueryDelay(TimeValue queryDelay) {
this.queryDelay = queryDelay;
return this;
}
public Builder setFrequency(TimeValue frequency) {
this.frequency = frequency;
return this;
}
public Builder setQuery(QueryBuilder query) {
this.query = Objects.requireNonNull(query, QUERY.getPreferredName());
return this;
}
public Builder setAggregations(AggregatorFactories.Builder aggregations) {
this.aggregations = aggregations;
return this;
}
public Builder setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
List<SearchSourceBuilder.ScriptField> sorted = new ArrayList<>(scriptFields);
sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
this.scriptFields = sorted;
return this;
}
public Builder setScrollSize(int scrollSize) {
this.scrollSize = scrollSize;
return this;
}
public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
this.chunkingConfig = chunkingConfig;
return this;
}
public DatafeedConfig build() {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
}
}
}

View File

@ -0,0 +1,310 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.protocol.xpack.ml.datafeed;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
/**
* A datafeed update contains partial properties to update a {@link DatafeedConfig}.
* The main difference between this class and {@link DatafeedConfig} is that here all
* fields are nullable.
*/
public class DatafeedUpdate implements ToXContentObject {
public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_update", true, a -> new Builder((String)a[0]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
PARSER.declareString(Builder::setJobId, DatafeedConfig.JOB_ID);
PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDEXES);
PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDICES);
PARSER.declareStringArray(Builder::setTypes, DatafeedConfig.TYPES);
PARSER.declareString((builder, val) -> builder.setQueryDelay(
TimeValue.parseTimeValue(val, DatafeedConfig.QUERY_DELAY.getPreferredName())), DatafeedConfig.QUERY_DELAY);
PARSER.declareString((builder, val) -> builder.setFrequency(
TimeValue.parseTimeValue(val, DatafeedConfig.FREQUENCY.getPreferredName())), DatafeedConfig.FREQUENCY);
PARSER.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), DatafeedConfig.QUERY);
PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p),
DatafeedConfig.AGGREGATIONS);
PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p),
DatafeedConfig.AGGS);
PARSER.declareObject(Builder::setScriptFields, (p, c) -> {
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
parsedScriptFields.add(new SearchSourceBuilder.ScriptField(p));
}
return parsedScriptFields;
}, DatafeedConfig.SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG);
}
private final String id;
private final String jobId;
private final TimeValue queryDelay;
private final TimeValue frequency;
private final List<String> indices;
private final List<String> types;
private final QueryBuilder query;
private final AggregatorFactories.Builder aggregations;
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
this.frequency = frequency;
this.indices = indices;
this.types = types;
this.query = query;
this.aggregations = aggregations;
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
}
/**
* Get the id of the datafeed to update
*/
public String getId() {
return id;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), id);
addOptionalField(builder, DatafeedConfig.JOB_ID, jobId);
if (queryDelay != null) {
builder.field(DatafeedConfig.QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
}
if (frequency != null) {
builder.field(DatafeedConfig.FREQUENCY.getPreferredName(), frequency.getStringRep());
}
addOptionalField(builder, DatafeedConfig.INDICES, indices);
addOptionalField(builder, DatafeedConfig.TYPES, types);
addOptionalField(builder, DatafeedConfig.QUERY, query);
addOptionalField(builder, DatafeedConfig.AGGREGATIONS, aggregations);
if (scriptFields != null) {
builder.startObject(DatafeedConfig.SCRIPT_FIELDS.getPreferredName());
for (SearchSourceBuilder.ScriptField scriptField : scriptFields) {
scriptField.toXContent(builder, params);
}
builder.endObject();
}
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
builder.endObject();
return builder;
}
private void addOptionalField(XContentBuilder builder, ParseField field, Object value) throws IOException {
if (value != null) {
builder.field(field.getPreferredName(), value);
}
}
public String getJobId() {
return jobId;
}
public TimeValue getQueryDelay() {
return queryDelay;
}
public TimeValue getFrequency() {
return frequency;
}
public List<String> getIndices() {
return indices;
}
public List<String> getTypes() {
return types;
}
public Integer getScrollSize() {
return scrollSize;
}
public QueryBuilder getQuery() {
return query;
}
public AggregatorFactories.Builder getAggregations() {
return aggregations;
}
public List<SearchSourceBuilder.ScriptField> getScriptFields() {
return scriptFields == null ? Collections.emptyList() : scriptFields;
}
public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}
/**
* The lists of indices and types are compared for equality but they are not
* sorted first so this test could fail simply because the indices and types
* lists are in different orders.
*/
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
DatafeedUpdate that = (DatafeedUpdate) other;
return Objects.equals(this.id, that.id)
&& Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.queryDelay, that.queryDelay)
&& Objects.equals(this.indices, that.indices)
&& Objects.equals(this.types, that.types)
&& Objects.equals(this.query, that.query)
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.aggregations, that.aggregations)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields,
chunkingConfig);
}
public static class Builder {
private String id;
private String jobId;
private TimeValue queryDelay;
private TimeValue frequency;
private List<String> indices;
private List<String> types;
private QueryBuilder query;
private AggregatorFactories.Builder aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
public Builder(String id) {
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
}
public Builder(DatafeedUpdate config) {
this.id = config.id;
this.jobId = config.jobId;
this.queryDelay = config.queryDelay;
this.frequency = config.frequency;
this.indices = config.indices;
this.types = config.types;
this.query = config.query;
this.aggregations = config.aggregations;
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
}
public Builder setJobId(String jobId) {
this.jobId = jobId;
return this;
}
public Builder setIndices(List<String> indices) {
this.indices = indices;
return this;
}
public Builder setTypes(List<String> types) {
this.types = types;
return this;
}
public Builder setQueryDelay(TimeValue queryDelay) {
this.queryDelay = queryDelay;
return this;
}
public Builder setFrequency(TimeValue frequency) {
this.frequency = frequency;
return this;
}
public Builder setQuery(QueryBuilder query) {
this.query = query;
return this;
}
public Builder setAggregations(AggregatorFactories.Builder aggregations) {
this.aggregations = aggregations;
return this;
}
public Builder setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
List<SearchSourceBuilder.ScriptField> sorted = new ArrayList<>(scriptFields);
sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
this.scriptFields = sorted;
return this;
}
public Builder setScrollSize(int scrollSize) {
this.scrollSize = scrollSize;
return this;
}
public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
this.chunkingConfig = chunkingConfig;
return this;
}
public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.protocol.xpack.ml.datafeed;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class ChunkingConfigTests extends AbstractXContentTestCase<ChunkingConfig> {
@Override
protected ChunkingConfig createTestInstance() {
return createRandomizedChunk();
}
@Override
protected ChunkingConfig doParseInstance(XContentParser parser) {
return ChunkingConfig.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
public static ChunkingConfig createRandomizedChunk() {
ChunkingConfig.Mode mode = randomFrom(ChunkingConfig.Mode.values());
TimeValue timeSpan = null;
if (mode == ChunkingConfig.Mode.MANUAL) {
// time span is required to be at least 1 millis, so we use a custom method to generate a time value here
timeSpan = randomPositiveSecondsMinutesHours();
}
return new ChunkingConfig(mode, timeSpan);
}
private static TimeValue randomPositiveSecondsMinutesHours() {
return new TimeValue(randomIntBetween(1, 1000), randomFrom(Arrays.asList(TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS)));
}
}

View File

@ -0,0 +1,177 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.protocol.xpack.ml.datafeed;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class DatafeedConfigTests extends AbstractXContentTestCase<DatafeedConfig> {
@Override
protected DatafeedConfig createTestInstance() {
long bucketSpanMillis = 3600000;
DatafeedConfig.Builder builder = constructBuilder();
builder.setIndices(randomStringList(1, 10));
builder.setTypes(randomStringList(0, 10));
if (randomBoolean()) {
builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
boolean addScriptFields = randomBoolean();
if (addScriptFields) {
int scriptsSize = randomInt(3);
List<ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new ScriptField(randomAlphaOfLength(10), mockScript(randomAlphaOfLength(10)),
randomBoolean()));
}
builder.setScriptFields(scriptFields);
}
Long aggHistogramInterval = null;
if (randomBoolean()) {
// can only test with a single agg as the xcontent order gets randomized by test base class and then
// the actual xcontent isn't the same and test fail.
// Testing with a single agg is ok as we don't have special list xcontent logic
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
aggHistogramInterval = randomNonNegativeLong();
aggHistogramInterval = aggHistogramInterval > bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval;
aggHistogramInterval = aggHistogramInterval <= 0 ? 1 : aggHistogramInterval;
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
aggs.addAggregator(AggregationBuilders.dateHistogram("buckets")
.interval(aggHistogramInterval).subAggregation(maxTime).field("time"));
builder.setAggregations(aggs);
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
if (aggHistogramInterval == null) {
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
} else {
builder.setFrequency(TimeValue.timeValueMillis(randomIntBetween(1, 5) * aggHistogramInterval));
}
}
if (randomBoolean()) {
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, 1_000_000)));
}
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
return builder.build();
}
@Override
protected NamedXContentRegistry xContentRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
return new NamedXContentRegistry(searchModule.getNamedXContents());
}
public static List<String> randomStringList(int min, int max) {
int size = scaledRandomIntBetween(min, max);
List<String> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
list.add(randomAlphaOfLength(10));
}
return list;
}
@Override
protected DatafeedConfig doParseInstance(XContentParser parser) {
return DatafeedConfig.PARSER.apply(parser, null).build();
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
private static final String FUTURE_DATAFEED = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
" \"tomorrows_technology_today\": \"amazing\",\n" +
" \"scroll_size\": 1234\n" +
"}";
public void testFutureMetadataParse() throws IOException {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
// Unlike the config version of this test, the metadata parser should tolerate the unknown future field
assertNotNull(DatafeedConfig.PARSER.apply(parser, null).build());
}
public void testCopyConstructor() {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
DatafeedConfig datafeedConfig = createTestInstance();
DatafeedConfig copy = new DatafeedConfig.Builder(datafeedConfig).build();
assertEquals(datafeedConfig, copy);
}
}
public void testCheckValid_GivenNullIdInConstruction() {
expectThrows(NullPointerException.class, () -> new DatafeedConfig.Builder(null, null));
}
public void testCheckValid_GivenNullJobId() {
expectThrows(NullPointerException.class, () -> new DatafeedConfig.Builder(randomValidDatafeedId(), null));
}
public void testCheckValid_GivenNullIndices() {
DatafeedConfig.Builder conf = constructBuilder();
expectThrows(NullPointerException.class, () -> conf.setIndices(null));
}
public void testCheckValid_GivenNullType() {
DatafeedConfig.Builder conf = constructBuilder();
expectThrows(NullPointerException.class, () -> conf.setTypes(null));
}
public void testCheckValid_GivenNullQuery() {
DatafeedConfig.Builder conf = constructBuilder();
expectThrows(NullPointerException.class, () -> conf.setQuery(null));
}
public static String randomValidDatafeedId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);
}
private static DatafeedConfig.Builder constructBuilder() {
return new DatafeedConfig.Builder(randomValidDatafeedId(), randomAlphaOfLength(10));
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.protocol.xpack.ml.datafeed;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate> {
@Override
protected DatafeedUpdate createTestInstance() {
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(DatafeedConfigTests.randomValidDatafeedId());
if (randomBoolean()) {
builder.setJobId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE)));
}
if (randomBoolean()) {
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, Integer.MAX_VALUE)));
}
if (randomBoolean()) {
builder.setIndices(DatafeedConfigTests.randomStringList(1, 10));
}
if (randomBoolean()) {
builder.setTypes(DatafeedConfigTests.randomStringList(1, 10));
}
if (randomBoolean()) {
builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
if (randomBoolean()) {
int scriptsSize = randomInt(3);
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAlphaOfLength(10), mockScript(randomAlphaOfLength(10)),
randomBoolean()));
}
builder.setScriptFields(scriptFields);
}
if (randomBoolean()) {
// can only test with a single agg as the xcontent order gets randomized by test base class and then
// the actual xcontent isn't the same and test fail.
// Testing with a single agg is ok as we don't have special list xcontent logic
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
aggs.addAggregator(AggregationBuilders.avg(randomAlphaOfLength(10)).field(randomAlphaOfLength(10)));
builder.setAggregations(aggs);
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
return builder.build();
}
@Override
protected DatafeedUpdate doParseInstance(XContentParser parser) {
return DatafeedUpdate.PARSER.apply(parser, null).build();
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
@Override
protected NamedXContentRegistry xContentRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
return new NamedXContentRegistry(searchModule.getNamedXContents());
}
}