diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java index 6d12108664c..af3186eef9b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/MlPlugin.java @@ -62,6 +62,7 @@ import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.ml.action.UpdateProcessAction; @@ -92,6 +93,7 @@ import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedsAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestPutDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestStartDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestStopDatafeedAction; +import org.elasticsearch.xpack.ml.rest.datafeeds.RestUpdateDatafeedAction; import org.elasticsearch.xpack.ml.rest.filter.RestDeleteFilterAction; import org.elasticsearch.xpack.ml.rest.filter.RestGetFiltersAction; import org.elasticsearch.xpack.ml.rest.filter.RestPutFilterAction; @@ -293,6 +295,7 @@ public class MlPlugin extends Plugin implements ActionPlugin { new RestGetDatafeedsAction(settings, restController), new RestGetDatafeedStatsAction(settings, restController), new RestPutDatafeedAction(settings, restController), + new RestUpdateDatafeedAction(settings, restController), new RestDeleteDatafeedAction(settings, restController), new RestStartDatafeedAction(settings, restController), new RestStopDatafeedAction(settings, restController), @@ -330,6 +333,7 @@ public class MlPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(GetDatafeedsAction.INSTANCE, GetDatafeedsAction.TransportAction.class), new ActionHandler<>(GetDatafeedsStatsAction.INSTANCE, GetDatafeedsStatsAction.TransportAction.class), new ActionHandler<>(PutDatafeedAction.INSTANCE, PutDatafeedAction.TransportAction.class), + new ActionHandler<>(UpdateDatafeedAction.INSTANCE, UpdateDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteDatafeedAction.INSTANCE, DeleteDatafeedAction.TransportAction.class), new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class), new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java new file mode 100644 index 00000000000..b6a898bdaa7 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java @@ -0,0 +1,179 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; + +import java.io.IOException; +import java.util.Objects; + +public class UpdateDatafeedAction extends Action { + + public static final UpdateDatafeedAction INSTANCE = new UpdateDatafeedAction(); + public static final String NAME = "cluster:admin/ml/datafeeds/update"; + + private UpdateDatafeedAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public PutDatafeedAction.Response newResponse() { + return new PutDatafeedAction.Response(); + } + + public static class Request extends AcknowledgedRequest implements ToXContent { + + public static Request parseRequest(String datafeedId, XContentParser parser) { + DatafeedUpdate.Builder update = DatafeedUpdate.PARSER.apply(parser, null); + update.setId(datafeedId); + return new Request(update.build()); + } + + private DatafeedUpdate update; + + public Request(DatafeedUpdate update) { + this.update = update; + } + + Request() { + } + + public DatafeedUpdate getUpdate() { + return update; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + update = new DatafeedUpdate(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + update.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + update.toXContent(builder, params); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(update, request.update); + } + + @Override + public int hashCode() { + return Objects.hash(update); + } + } + + public static class RequestBuilder extends MasterNodeOperationRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, UpdateDatafeedAction action) { + super(client, action, new Request()); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + @Inject + public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, UpdateDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, Request::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected PutDatafeedAction.Response newResponse() { + return new PutDatafeedAction.Response(); + } + + @Override + protected void masterOperation(Request request, ClusterState state, ActionListener listener) + throws Exception { + clusterService.submitStateUpdateTask("update-datafeed-" + request.getUpdate().getId(), + new AckedClusterStateUpdateTask(request, listener) { + private DatafeedConfig updatedDatafeed; + + @Override + protected PutDatafeedAction.Response newResponse(boolean acknowledged) { + if (acknowledged) { + logger.info("Updated datafeed [{}]", request.getUpdate().getId()); + } + return new PutDatafeedAction.Response(acknowledged, updatedDatafeed); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + DatafeedUpdate update = request.getUpdate(); + MlMetadata currentMetadata = state.getMetaData().custom(MlMetadata.TYPE); + PersistentTasksInProgress persistentTasksInProgress = state.custom(PersistentTasksInProgress.TYPE); + MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) + .updateDatafeed(update, persistentTasksInProgress).build(); + updatedDatafeed = newMetadata.getDatafeed(update.getId()); + return ClusterState.builder(currentState).metaData( + MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()).build(); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java index addb1a89989..055c42fdeab 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java @@ -177,22 +177,10 @@ public class DatafeedConfig extends AbstractDiffable implements return frequency; } - /** - * For the ELASTICSEARCH data source only, one or more indexes to search for - * input data. - * - * @return The indexes to search, or null if not set. - */ public List getIndexes() { return indexes; } - /** - * For the ELASTICSEARCH data source only, one or more types to search for - * input data. - * - * @return The types to search, or null if not set. - */ public List getTypes() { return types; } @@ -213,6 +201,13 @@ public class DatafeedConfig extends AbstractDiffable implements return aggregations; } + /** + * @return {@code true} when there are non-empty aggregations, {@code false} otherwise + */ + public boolean hasAggregations() { + return aggregations != null && aggregations.count() > 0; + } + public List getScriptFields() { return scriptFields == null ? Collections.emptyList() : scriptFields; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 06e903a39b6..33161f49817 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -164,7 +164,7 @@ public class DatafeedJobRunner extends AbstractComponent { } DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) { - boolean isScrollSearch = datafeedConfig.getAggregations() == null; + boolean isScrollSearch = datafeedConfig.hasAggregations() == false; DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job) : new AggregationDataExtractorFactory(client, datafeedConfig, job); ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig(); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java index 93fc633ffa9..70be293e650 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidator.java @@ -23,7 +23,7 @@ public final class DatafeedJobValidator { if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) { throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY)); } - if (datafeedConfig.getAggregations() != null && !DatafeedConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) { + if (datafeedConfig.hasAggregations() && !DatafeedConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) { throw new IllegalArgumentException( Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, DatafeedConfig.DOC_COUNT)); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdate.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdate.java new file mode 100644 index 00000000000..a88134eab99 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdate.java @@ -0,0 +1,371 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * A datafeed update contains partial properties to update a {@link DatafeedConfig}. + * The main difference between this class and {@link DatafeedConfig} is that here all + * fields are nullable. + */ +public class DatafeedUpdate implements Writeable, ToXContent { + + public static final ObjectParser PARSER = new ObjectParser<>("datafeed_update", Builder::new); + + static { + PARSER.declareString(Builder::setId, DatafeedConfig.ID); + PARSER.declareString(Builder::setJobId, Job.ID); + PARSER.declareStringArray(Builder::setIndexes, DatafeedConfig.INDEXES); + PARSER.declareStringArray(Builder::setTypes, DatafeedConfig.TYPES); + PARSER.declareLong(Builder::setQueryDelay, DatafeedConfig.QUERY_DELAY); + PARSER.declareLong(Builder::setFrequency, DatafeedConfig.FREQUENCY); + PARSER.declareObject(Builder::setQuery, + (p, c) -> new QueryParseContext(p).parseInnerQueryBuilder(), DatafeedConfig.QUERY); + PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(new QueryParseContext(p)), + DatafeedConfig.AGGREGATIONS); + PARSER.declareObject(Builder::setAggregations,(p, c) -> AggregatorFactories.parseAggregators(new QueryParseContext(p)), + DatafeedConfig.AGGS); + PARSER.declareObject(Builder::setScriptFields, (p, c) -> { + List parsedScriptFields = new ArrayList<>(); + while (p.nextToken() != XContentParser.Token.END_OBJECT) { + parsedScriptFields.add(new SearchSourceBuilder.ScriptField(new QueryParseContext(p))); + } + parsedScriptFields.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName)); + return parsedScriptFields; + }, DatafeedConfig.SCRIPT_FIELDS); + PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE); + PARSER.declareBoolean(Builder::setSource, DatafeedConfig.SOURCE); + PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG); + } + + private final String id; + private final String jobId; + private final Long queryDelay; + private final Long frequency; + private final List indexes; + private final List types; + private final QueryBuilder query; + private final AggregatorFactories.Builder aggregations; + private final List scriptFields; + private final Integer scrollSize; + private final Boolean source; + private final ChunkingConfig chunkingConfig; + + private DatafeedUpdate(String id, String jobId, Long queryDelay, Long frequency, List indexes, List types, + QueryBuilder query, AggregatorFactories.Builder aggregations, List scriptFields, + Integer scrollSize, Boolean source, ChunkingConfig chunkingConfig) { + this.id = id; + this.jobId = jobId; + this.queryDelay = queryDelay; + this.frequency = frequency; + this.indexes = indexes; + this.types = types; + this.query = query; + this.aggregations = aggregations; + this.scriptFields = scriptFields; + this.scrollSize = scrollSize; + this.source = source; + this.chunkingConfig = chunkingConfig; + } + + public DatafeedUpdate(StreamInput in) throws IOException { + this.id = in.readString(); + this.jobId = in.readOptionalString(); + this.queryDelay = in.readOptionalLong(); + this.frequency = in.readOptionalLong(); + if (in.readBoolean()) { + this.indexes = in.readList(StreamInput::readString); + } else { + this.indexes = null; + } + if (in.readBoolean()) { + this.types = in.readList(StreamInput::readString); + } else { + this.types = null; + } + this.query = in.readOptionalNamedWriteable(QueryBuilder.class); + this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new); + if (in.readBoolean()) { + this.scriptFields = in.readList(SearchSourceBuilder.ScriptField::new); + } else { + this.scriptFields = null; + } + this.scrollSize = in.readOptionalVInt(); + this.source = in.readOptionalBoolean(); + this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new); + } + + /** + * Get the id of the datafeed to update + */ + public String getId() { + return id; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeOptionalString(jobId); + out.writeOptionalLong(queryDelay); + out.writeOptionalLong(frequency); + if (indexes != null) { + out.writeBoolean(true); + out.writeStringList(indexes); + } else { + out.writeBoolean(false); + } + if (types != null) { + out.writeBoolean(true); + out.writeStringList(types); + } else { + out.writeBoolean(false); + } + out.writeOptionalNamedWriteable(query); + out.writeOptionalWriteable(aggregations); + if (scriptFields != null) { + out.writeBoolean(true); + out.writeList(scriptFields); + } else { + out.writeBoolean(false); + } + out.writeOptionalVInt(scrollSize); + out.writeOptionalBoolean(source); + out.writeOptionalWriteable(chunkingConfig); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DatafeedConfig.ID.getPreferredName(), id); + addOptionalField(builder, Job.ID, jobId); + addOptionalField(builder, DatafeedConfig.QUERY_DELAY, queryDelay); + addOptionalField(builder, DatafeedConfig.FREQUENCY, frequency); + addOptionalField(builder, DatafeedConfig.INDEXES, indexes); + addOptionalField(builder, DatafeedConfig.TYPES, types); + addOptionalField(builder, DatafeedConfig.QUERY, query); + addOptionalField(builder, DatafeedConfig.AGGREGATIONS, aggregations); + if (scriptFields != null) { + builder.startObject(DatafeedConfig.SCRIPT_FIELDS.getPreferredName()); + for (SearchSourceBuilder.ScriptField scriptField : scriptFields) { + scriptField.toXContent(builder, params); + } + builder.endObject(); + } + addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); + addOptionalField(builder, DatafeedConfig.SOURCE, source); + addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); + builder.endObject(); + return builder; + } + + private void addOptionalField(XContentBuilder builder, ParseField field, Object value) throws IOException { + if (value != null) { + builder.field(field.getPreferredName(), value); + } + } + + /** + * Applies the update to the given {@link DatafeedConfig} + * @return a new {@link DatafeedConfig} that contains the update + */ + public DatafeedConfig apply(DatafeedConfig datafeedConfig) { + if (id.equals(datafeedConfig.getId()) == false) { + throw new IllegalArgumentException("Cannot apply update to datafeedConfig with different id"); + } + + DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig); + if (jobId != null) { + builder.setJobId(jobId); + } + if (queryDelay != null) { + builder.setQueryDelay(queryDelay); + } + if (frequency != null) { + builder.setFrequency(frequency); + } + if (indexes != null) { + builder.setIndexes(indexes); + } + if (types != null) { + builder.setTypes(types); + } + if (query != null) { + builder.setQuery(query); + } + if (aggregations != null) { + builder.setAggregations(aggregations); + } + if (scriptFields != null) { + builder.setScriptFields(scriptFields); + } + if (scrollSize != null) { + builder.setScrollSize(scrollSize); + } + if (source != null) { + builder.setSource(source); + } + if (chunkingConfig != null) { + builder.setChunkingConfig(chunkingConfig); + } + return builder.build(); + } + + /** + * The lists of indexes and types are compared for equality but they are not + * sorted first so this test could fail simply because the indexes and types + * lists are in different orders. + */ + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other instanceof DatafeedUpdate == false) { + return false; + } + + DatafeedUpdate that = (DatafeedUpdate) other; + + return Objects.equals(this.id, that.id) + && Objects.equals(this.jobId, that.jobId) + && Objects.equals(this.frequency, that.frequency) + && Objects.equals(this.queryDelay, that.queryDelay) + && Objects.equals(this.indexes, that.indexes) + && Objects.equals(this.types, that.types) + && Objects.equals(this.query, that.query) + && Objects.equals(this.scrollSize, that.scrollSize) + && Objects.equals(this.aggregations, that.aggregations) + && Objects.equals(this.scriptFields, that.scriptFields) + && Objects.equals(this.source, that.source) + && Objects.equals(this.chunkingConfig, that.chunkingConfig); + } + + @Override + public int hashCode() { + return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, aggregations, scriptFields, source, + chunkingConfig); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static class Builder { + + private String id; + private String jobId; + private Long queryDelay; + private Long frequency; + private List indexes; + private List types; + private QueryBuilder query; + private AggregatorFactories.Builder aggregations; + private List scriptFields; + private Integer scrollSize; + private Boolean source; + private ChunkingConfig chunkingConfig; + + public Builder() { + } + + public Builder(String id) { + this.id = ExceptionsHelper.requireNonNull(id, DatafeedConfig.ID.getPreferredName()); + } + + public Builder(DatafeedUpdate config) { + this.id = config.id; + this.jobId = config.jobId; + this.queryDelay = config.queryDelay; + this.frequency = config.frequency; + this.indexes = config.indexes; + this.types = config.types; + this.query = config.query; + this.aggregations = config.aggregations; + this.scriptFields = config.scriptFields; + this.scrollSize = config.scrollSize; + this.source = config.source; + this.chunkingConfig = config.chunkingConfig; + } + + public void setId(String datafeedId) { + id = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName()); + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public void setIndexes(List indexes) { + this.indexes = indexes; + } + + public void setTypes(List types) { + this.types = types; + } + + public void setQueryDelay(long queryDelay) { + this.queryDelay = queryDelay; + } + + public void setFrequency(long frequency) { + this.frequency = frequency; + } + + public void setQuery(QueryBuilder query) { + this.query = query; + } + + public void setAggregations(AggregatorFactories.Builder aggregations) { + this.aggregations = aggregations; + } + + public void setScriptFields(List scriptFields) { + List sorted = new ArrayList<>(scriptFields); + sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName)); + this.scriptFields = sorted; + } + + public void setScrollSize(int scrollSize) { + this.scrollSize = scrollSize; + } + + public void setSource(boolean enabled) { + this.source = enabled; + } + + public void setChunkingConfig(ChunkingConfig chunkingConfig) { + this.chunkingConfig = chunkingConfig; + } + + public DatafeedUpdate build() { + return new DatafeedUpdate(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize, + source, chunkingConfig); + } + } +} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java index 3592285f7b8..22517d8289c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadata.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.messages.Messages; @@ -43,6 +44,7 @@ import java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; import java.util.function.Predicate; +import java.util.function.Supplier; public class MlMetadata implements MetaData.Custom { @@ -254,6 +256,14 @@ public class MlMetadata implements MetaData.Custom { throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists"); } String jobId = datafeedConfig.getJobId(); + checkJobIsAvailableForDatafeed(jobId); + Job job = jobs.get(jobId); + DatafeedJobValidator.validate(datafeedConfig, job); + datafeeds.put(datafeedConfig.getId(), datafeedConfig); + return this; + } + + private void checkJobIsAvailableForDatafeed(String jobId) { Job job = jobs.get(jobId); if (job == null) { throw ExceptionsHelper.missingJobException(jobId); @@ -263,9 +273,23 @@ public class MlMetadata implements MetaData.Custom { throw ExceptionsHelper.conflictStatusException("A datafeed [" + existingDatafeed.get().getId() + "] already exists for job [" + jobId + "]"); } - DatafeedJobValidator.validate(datafeedConfig, job); + } - datafeeds.put(datafeedConfig.getId(), datafeedConfig); + public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksInProgress persistentTasksInProgress) { + String datafeedId = update.getId(); + DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId); + if (oldDatafeedConfig == null) { + throw ExceptionsHelper.missingDatafeedException(datafeedId); + } + checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId, + DatafeedState.STARTED), datafeedId, persistentTasksInProgress); + DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig); + if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) { + checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId()); + } + Job job = jobs.get(newDatafeedConfig.getJobId()); + DatafeedJobValidator.validate(newDatafeedConfig, job); + datafeeds.put(datafeedId, newDatafeedConfig); return this; } @@ -274,17 +298,8 @@ public class MlMetadata implements MetaData.Custom { if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(datafeedId); } - if (persistentTasksInProgress != null) { - Predicate> predicate = t -> { - StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest(); - return storedRequest.getDatafeedId().equals(datafeedId); - }; - if (persistentTasksInProgress.tasksExist(StartDatafeedAction.NAME, predicate)) { - String msg = Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId, - DatafeedState.STARTED); - throw ExceptionsHelper.conflictStatusException(msg); - } - } + checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId, + DatafeedState.STARTED), datafeedId, persistentTasksInProgress); datafeeds.remove(datafeedId); return this; } @@ -293,6 +308,18 @@ public class MlMetadata implements MetaData.Custom { return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst(); } + private void checkDatafeedIsStopped(Supplier msg, String datafeedId, PersistentTasksInProgress persistentTasksInProgress) { + if (persistentTasksInProgress != null) { + Predicate> predicate = t -> { + StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest(); + return storedRequest.getDatafeedId().equals(datafeedId); + }; + if (persistentTasksInProgress.tasksExist(StartDatafeedAction.NAME, predicate)) { + throw ExceptionsHelper.conflictStatusException(msg.get()); + } + } + } + private Builder putJobs(Collection jobs) { for (Job job : jobs) { putJob(job, true); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java new file mode 100644 index 00000000000..9bc1222104c --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestUpdateDatafeedAction.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.rest.datafeeds; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ml.MlPlugin; +import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; + +import java.io.IOException; + +public class RestUpdateDatafeedAction extends BaseRestHandler { + + public RestUpdateDatafeedAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, MlPlugin.BASE_PATH + "datafeeds/{" + + DatafeedConfig.ID.getPreferredName() + "}/_update", this); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + XContentParser parser = restRequest.contentParser(); + UpdateDatafeedAction.Request updateDatafeedRequest = UpdateDatafeedAction.Request.parseRequest(datafeedId, parser); + return channel -> client.execute(UpdateDatafeedAction.INSTANCE, updateDatafeedRequest, new RestToXContentListener<>(channel)); + } + +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedActionRequestTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedActionRequestTests.java new file mode 100644 index 00000000000..e0037d2f906 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedActionRequestTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction.Request; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests; +import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdateTests; +import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; +import org.junit.Before; + +public class UpdateDatafeedActionRequestTests extends AbstractStreamableXContentTestCase { + + private String datafeedId; + + @Before + public void setUpDatafeedId() { + datafeedId = DatafeedConfigTests.randomValidDatafeedId(); + } + + @Override + protected Request createTestInstance() { + return new Request(DatafeedUpdateTests.createRandomized(datafeedId)); + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } + + @Override + protected Request parseInstance(XContentParser parser) { + return Request.parseRequest(datafeedId, parser); + } + +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java index cc138887655..b8d8c8900e7 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java @@ -25,6 +25,7 @@ import java.util.List; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; public class DatafeedConfigTests extends AbstractSerializingTestCase { @@ -40,13 +41,17 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase scriptFields = new ArrayList<>(scriptsSize); - for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) { - scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)), - randomBoolean())); + boolean addScriptFields = randomBoolean(); + if (addScriptFields) { + int scriptsSize = randomInt(3); + List scriptFields = new ArrayList<>(scriptsSize); + for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) { + scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)), + randomBoolean())); + } + builder.setScriptFields(scriptFields); } - if (randomBoolean() && scriptsSize == 0) { + if (randomBoolean() && addScriptFields == false) { // can only test with a single agg as the xcontent order gets randomized by test base class and then // the actual xcontent isn't the same and test fail. // Testing with a single agg is ok as we don't have special list writeable / xconent logic @@ -54,7 +59,6 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase randomStringList(int min, int max) { + public static List randomStringList(int min, int max) { int size = scaledRandomIntBetween(min, max); List list = new ArrayList<>(); for (int i = 0; i < size; i++) { @@ -257,6 +261,35 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase currentTime); DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdateTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdateTests.java new file mode 100644 index 00000000000..3fcc941d7d7 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedUpdateTests.java @@ -0,0 +1,165 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class DatafeedUpdateTests extends AbstractSerializingTestCase { + + @Override + protected DatafeedUpdate createTestInstance() { + return createRandomized(DatafeedConfigTests.randomValidDatafeedId()); + } + + public static DatafeedUpdate createRandomized(String datafeedId) { + DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(datafeedId); + if (randomBoolean()) { + builder.setJobId(randomAsciiOfLength(10)); + } + if (randomBoolean()) { + builder.setQueryDelay(randomNonNegativeLong()); + } + if (randomBoolean()) { + builder.setFrequency(randomNonNegativeLong()); + } + if (randomBoolean()) { + builder.setIndexes(DatafeedConfigTests.randomStringList(1, 10)); + } + if (randomBoolean()) { + builder.setTypes(DatafeedConfigTests.randomStringList(1, 10)); + } + if (randomBoolean()) { + builder.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10))); + } + if (randomBoolean()) { + int scriptsSize = randomInt(3); + List scriptFields = new ArrayList<>(scriptsSize); + for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) { + scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)), + randomBoolean())); + } + builder.setScriptFields(scriptFields); + } + if (randomBoolean()) { + // can only test with a single agg as the xcontent order gets randomized by test base class and then + // the actual xcontent isn't the same and test fail. + // Testing with a single agg is ok as we don't have special list writeable / xconent logic + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)).field(randomAsciiOfLength(10))); + builder.setAggregations(aggs); + } + if (randomBoolean()) { + builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + builder.setSource(randomBoolean()); + } + if (randomBoolean()) { + builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk()); + } + return builder.build(); + } + + @Override + protected Writeable.Reader instanceReader() { + return DatafeedUpdate::new; + } + + @Override + protected DatafeedUpdate parseInstance(XContentParser parser) { + return DatafeedUpdate.PARSER.apply(parser, null).build(); + } + + public void testApply_failBecauseTargetDatafeedHasDifferentId() { + DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo"); + expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed)); + } + + public void testApply_givenEmptyUpdate() { + DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo"); + DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed); + assertThat(datafeed, equalTo(updatedDatafeed)); + } + + public void testApply_givenPartialUpdate() { + DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo"); + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId()); + update.setScrollSize(datafeed.getScrollSize() + 1); + + DatafeedUpdate.Builder updated = new DatafeedUpdate.Builder(datafeed.getId()); + updated.setScrollSize(datafeed.getScrollSize() + 1); + DatafeedConfig updatedDatafeed = update.build().apply(datafeed); + + DatafeedConfig.Builder expectedDatafeed = new DatafeedConfig.Builder(datafeed); + expectedDatafeed.setScrollSize(datafeed.getScrollSize() + 1); + assertThat(updatedDatafeed, equalTo(expectedDatafeed.build())); + } + + public void testApply_givenFullUpdateNoAggregations() { + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("foo", "foo-feed"); + datafeedBuilder.setIndexes(Arrays.asList("i_1")); + datafeedBuilder.setTypes(Arrays.asList("t_1")); + DatafeedConfig datafeed = datafeedBuilder.build(); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId()); + update.setJobId("bar"); + update.setIndexes(Arrays.asList("i_2")); + update.setTypes(Arrays.asList("t_2")); + update.setQueryDelay(42L); + update.setFrequency(142L); + update.setQuery(QueryBuilders.termQuery("a", "b")); + update.setScriptFields(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false))); + update.setScrollSize(8000); + update.setSource(true); + update.setChunkingConfig(ChunkingConfig.newManual(3600L)); + + DatafeedConfig updatedDatafeed = update.build().apply(datafeed); + + assertThat(updatedDatafeed.getJobId(), equalTo("bar")); + assertThat(updatedDatafeed.getIndexes(), equalTo(Arrays.asList("i_2"))); + assertThat(updatedDatafeed.getTypes(), equalTo(Arrays.asList("t_2"))); + assertThat(updatedDatafeed.getQueryDelay(), equalTo(42L)); + assertThat(updatedDatafeed.getFrequency(), equalTo(142L)); + assertThat(updatedDatafeed.getQuery(), equalTo(QueryBuilders.termQuery("a", "b"))); + assertThat(updatedDatafeed.hasAggregations(), is(false)); + assertThat(updatedDatafeed.getScriptFields(), + equalTo(Arrays.asList(new SearchSourceBuilder.ScriptField("a", new Script("b"), false)))); + assertThat(updatedDatafeed.getScrollSize(), equalTo(8000)); + assertThat(updatedDatafeed.isSource(), is(true)); + assertThat(updatedDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newManual(3600L))); + } + + public void testApply_givenAggregations() { + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("foo", "foo-feed"); + datafeedBuilder.setIndexes(Arrays.asList("i_1")); + datafeedBuilder.setTypes(Arrays.asList("t_1")); + DatafeedConfig datafeed = datafeedBuilder.build(); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId()); + update.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("a"))); + + DatafeedConfig updatedDatafeed = update.build().apply(datafeed); + + assertThat(updatedDatafeed.getIndexes(), equalTo(Arrays.asList("i_1"))); + assertThat(updatedDatafeed.getTypes(), equalTo(Arrays.asList("t_1"))); + assertThat(updatedDatafeed.getAggregations(), + equalTo(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("a")))); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java index 8e56fbed42d..51dcbfe3018 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigTests; +import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; @@ -47,7 +48,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { Job job = JobTests.createRandomizedJob(); if (randomBoolean()) { DatafeedConfig datafeedConfig = DatafeedConfigTests.createRandomizedDatafeedConfig(job.getId()); - if (datafeedConfig.getAggregations() != null) { + if (datafeedConfig.hasAggregations()) { AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig().getDetectors()); analysisConfig.setSummaryCountFieldName("doc_count"); Job.Builder jobBuilder = new Job.Builder(job); @@ -236,6 +237,90 @@ public class MlMetadataTests extends AbstractSerializingTestCase { expectThrows(IllegalArgumentException.class, () -> builder.putDatafeed(datafeedConfig1)); } + public void testUpdateDatafeed() { + Job job1 = createDatafeedJob().build(); + DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.putJob(job1, false); + builder.putDatafeed(datafeedConfig1); + MlMetadata beforeMetadata = builder.build(); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); + update.setScrollSize(5000); + MlMetadata updatedMetadata = new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null).build(); + + DatafeedConfig updatedDatafeed = updatedMetadata.getDatafeed(datafeedConfig1.getId()); + assertThat(updatedDatafeed.getJobId(), equalTo(datafeedConfig1.getJobId())); + assertThat(updatedDatafeed.getIndexes(), equalTo(datafeedConfig1.getIndexes())); + assertThat(updatedDatafeed.getTypes(), equalTo(datafeedConfig1.getTypes())); + assertThat(updatedDatafeed.getScrollSize(), equalTo(5000)); + } + + public void testUpdateDatafeed_failBecauseDatafeedDoesNotExist() { + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder("foo"); + update.setScrollSize(5000); + expectThrows(ResourceNotFoundException.class, () -> new MlMetadata.Builder().updateDatafeed(update.build(), null).build()); + } + + public void testUpdateDatafeed_failBecauseDatafeedIsNotStopped() { + Job job1 = createDatafeedJob().build(); + DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.putJob(job1, false); + builder.putDatafeed(datafeedConfig1); + MlMetadata beforeMetadata = builder.build(); + + StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); + PersistentTaskInProgress taskInProgress = + new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, null); + PersistentTasksInProgress tasksInProgress = + new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); + update.setScrollSize(5000); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), tasksInProgress)); + assertThat(e.status(), equalTo(RestStatus.CONFLICT)); + } + + public void testUpdateDatafeed_failBecauseNewJobIdDoesNotExist() { + Job job1 = createDatafeedJob().build(); + DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.putJob(job1, false); + builder.putDatafeed(datafeedConfig1); + MlMetadata beforeMetadata = builder.build(); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); + update.setJobId(job1.getId() + "_2"); + + expectThrows(ResourceNotFoundException.class, + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null)); + } + + public void testUpdateDatafeed_failBecauseNewJobHasAnotherDatafeedAttached() { + Job job1 = createDatafeedJob().build(); + Job.Builder job2 = new Job.Builder(job1); + job2.setId(job1.getId() + "_2"); + DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); + DatafeedConfig datafeedConfig2 = createDatafeedConfig("datafeed2", job2.getId()).build(); + MlMetadata.Builder builder = new MlMetadata.Builder(); + builder.putJob(job1, false); + builder.putJob(job2.build(), false); + builder.putDatafeed(datafeedConfig1); + builder.putDatafeed(datafeedConfig2); + MlMetadata beforeMetadata = builder.build(); + + DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); + update.setJobId(job2.getId()); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> new MlMetadata.Builder(beforeMetadata).updateDatafeed(update.build(), null)); + assertThat(e.status(), equalTo(RestStatus.CONFLICT)); + assertThat(e.getMessage(), equalTo("A datafeed [datafeed2] already exists for job [foo_2]")); + } + public void testRemoveDatafeed_failBecauseDatafeedStarted() { Job job1 = createDatafeedJob().build(); DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); diff --git a/elasticsearch/src/test/resources/org/elasticsearch/transport/actions b/elasticsearch/src/test/resources/org/elasticsearch/transport/actions index ba852f2b4db..0f76d78213f 100644 --- a/elasticsearch/src/test/resources/org/elasticsearch/transport/actions +++ b/elasticsearch/src/test/resources/org/elasticsearch/transport/actions @@ -113,6 +113,7 @@ cluster:admin/ml/anomaly_detectors/model_snapshots/get cluster:admin/ml/anomaly_detectors/results/records/get cluster:admin/ml/anomaly_detectors/results/influencers/get cluster:admin/ml/datafeeds/put +cluster:admin/ml/datafeeds/update cluster:admin/ml/anomaly_detectors/model_snapshots/delete cluster:admin/ml/anomaly_detectors/validate/detector cluster:admin/ml/anomaly_detectors/validate diff --git a/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.update_datafeed.json b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.update_datafeed.json new file mode 100644 index 00000000000..3bf0b4692f3 --- /dev/null +++ b/elasticsearch/src/test/resources/rest-api-spec/api/xpack.ml.update_datafeed.json @@ -0,0 +1,20 @@ +{ + "xpack.ml.update_datafeed": { + "methods": [ "POST" ], + "url": { + "path": "/_xpack/ml/datafeeds/{datafeed_id}/_update", + "paths": [ "/_xpack/ml/datafeeds/{datafeed_id}/_update" ], + "parts": { + "datafeed_id": { + "type": "string", + "required": true, + "description": "The ID of the datafeed to update" + } + } + }, + "body": { + "description" : "The datafeed update settings", + "required" : true + } + } +} diff --git a/elasticsearch/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yaml b/elasticsearch/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yaml index ad99e9c14e3..9b11933eb2a 100644 --- a/elasticsearch/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yaml +++ b/elasticsearch/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yaml @@ -133,6 +133,121 @@ setup: "query":{"match_all_mispelled":{}} } +--- +"Test update datafeed": + - do: + xpack.ml.put_datafeed: + datafeed_id: test-datafeed-1 + body: > + { + "job_id":"job-1", + "indexes":["index-foo"], + "types":["type-bar"], + "scroll_size": 2000 + } + + - do: + xpack.ml.update_datafeed: + datafeed_id: test-datafeed-1 + body: > + { + "indexes":["index-*"], + "scroll_size": 10000 + } + - match: { datafeed_id: "test-datafeed-1" } + - match: { job_id: "job-1" } + - match: { indexes: ["index-*"] } + - match: { types: ["type-bar"] } + - match: { scroll_size: 10000 } + +--- +"Test update datafeed to point to different job": + - do: + xpack.ml.put_datafeed: + datafeed_id: test-datafeed-1 + body: > + { + "job_id":"job-1", + "indexes":["index-foo"], + "types":["type-bar"], + "scroll_size": 2000 + } + + - do: + xpack.ml.update_datafeed: + datafeed_id: test-datafeed-1 + body: > + { + "job_id": "job-2" + } + - match: { datafeed_id: "test-datafeed-1" } + - match: { job_id: "job-2" } + - match: { indexes: ["index-foo"] } + - match: { types: ["type-bar"] } + +--- +"Test update datafeed with missing id": + + - do: + catch: /resource_not_found_exception/ + xpack.ml.update_datafeed: + datafeed_id: a-missing-datafeed + body: > + {} + +--- +"Test update datafeed to point to missing job": + - do: + xpack.ml.put_datafeed: + datafeed_id: test-datafeed-1 + body: > + { + "job_id":"job-1", + "indexes":["index-foo"], + "types":["type-bar"], + "scroll_size": 2000 + } + + - do: + catch: /resource_not_found_exception/ + xpack.ml.update_datafeed: + datafeed_id: test-datafeed-1 + body: > + { + "job_id": "job-3" + } + +--- +"Test update datafeed to point to job already attached to another datafeed": + - do: + xpack.ml.put_datafeed: + datafeed_id: test-datafeed-1 + body: > + { + "job_id":"job-1", + "indexes":["index-foo"], + "types":["type-bar"] + } + + - do: + xpack.ml.put_datafeed: + datafeed_id: test-datafeed-2 + body: > + { + "job_id":"job-2", + "indexes":["index-foo"], + "types":["type-bar"] + } + + - do: + catch: /A datafeed \[test-datafeed-2\] already exists for job \[job-2\]/ + xpack.ml.update_datafeed: + datafeed_id: test-datafeed-1 + body: > + { + "job_id": "job-2" + } + --- "Test delete datafeed with missing id": - do: