diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index f34e144fcec..1ec6259ae27 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -81,6 +81,7 @@ import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StartDatafeedRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDatafeedRequest; +import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.UpdateDatafeedRequest; import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateJobRequest; @@ -616,6 +617,17 @@ final class MLRequestConverters { return request; } + static Request updateDataFrameAnalytics(UpdateDataFrameAnalyticsRequest updateRequest) throws IOException { + String endpoint = new EndpointBuilder() + .addPathPartAsIs("_ml", "data_frame", "analytics") + .addPathPart(updateRequest.getUpdate().getId()) + .addPathPartAsIs("_update") + .build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + request.setEntity(createEntity(updateRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request getDataFrameAnalytics(GetDataFrameAnalyticsRequest getRequest) { String endpoint = new EndpointBuilder() .addPathPartAsIs("_ml", "data_frame", "analytics") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java index d8033e7c36e..6c74dd1e800 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java @@ -115,6 +115,7 @@ import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.StopDatafeedResponse; +import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.UpdateDatafeedRequest; import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateJobRequest; @@ -2042,6 +2043,52 @@ public final class MachineLearningClient { Collections.emptySet()); } + /** + * Updates a Data Frame Analytics config + *

+ * For additional info + * see + * PUT Data Frame Analytics documentation + * + * @param request The {@link UpdateDataFrameAnalyticsRequest} containing the + * {@link org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate} + * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return The {@link PutDataFrameAnalyticsResponse} containing the updated + * {@link org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig} + * @throws IOException when there is a serialization issue sending the request or receiving the response + */ + public PutDataFrameAnalyticsResponse updateDataFrameAnalytics(UpdateDataFrameAnalyticsRequest request, + RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, + MLRequestConverters::updateDataFrameAnalytics, + options, + PutDataFrameAnalyticsResponse::fromXContent, + Collections.emptySet()); + } + + /** + * Updates a Data Frame Analytics config asynchronously and notifies listener upon completion + *

+ * For additional info + * see + * Update Data Frame Analytics documentation + * + * @param request The {@link UpdateDataFrameAnalyticsRequest} containing the + * {@link org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate} + * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener Listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable updateDataFrameAnalyticsAsync(UpdateDataFrameAnalyticsRequest request, RequestOptions options, + ActionListener listener) { + return restHighLevelClient.performRequestAsyncAndParseEntity(request, + MLRequestConverters::updateDataFrameAnalytics, + options, + PutDataFrameAnalyticsResponse::fromXContent, + listener, + Collections.emptySet()); + } + /** * Gets a single or multiple Data Frame Analytics configs *

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/UpdateDataFrameAnalyticsRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/UpdateDataFrameAnalyticsRequest.java new file mode 100644 index 00000000000..7c754290a7e --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/UpdateDataFrameAnalyticsRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ml; + +import org.elasticsearch.client.Validatable; +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +public class UpdateDataFrameAnalyticsRequest implements ToXContentObject, Validatable { + + private final DataFrameAnalyticsConfigUpdate update; + + public UpdateDataFrameAnalyticsRequest(DataFrameAnalyticsConfigUpdate update) { + this.update = update; + } + + public DataFrameAnalyticsConfigUpdate getUpdate() { + return update; + } + + @Override + public Optional validate() { + if (update == null) { + return Optional.of(ValidationException.withError("update requires a non-null data frame analytics config update")); + } + return Optional.empty(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return update.toXContent(builder, params); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + UpdateDataFrameAnalyticsRequest other = (UpdateDataFrameAnalyticsRequest) o; + return Objects.equals(update, other.update); + } + + @Override + public int hashCode() { + return Objects.hash(update); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java index 5e803f714cd..871426bab74 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfig.java @@ -47,16 +47,16 @@ public class DataFrameAnalyticsConfig implements ToXContentObject { return new Builder(); } - private static final ParseField ID = new ParseField("id"); - private static final ParseField DESCRIPTION = new ParseField("description"); - private static final ParseField SOURCE = new ParseField("source"); - private static final ParseField DEST = new ParseField("dest"); - private static final ParseField ANALYSIS = new ParseField("analysis"); - private static final ParseField ANALYZED_FIELDS = new ParseField("analyzed_fields"); - private static final ParseField MODEL_MEMORY_LIMIT = new ParseField("model_memory_limit"); - private static final ParseField CREATE_TIME = new ParseField("create_time"); - private static final ParseField VERSION = new ParseField("version"); - private static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); + static final ParseField ID = new ParseField("id"); + static final ParseField DESCRIPTION = new ParseField("description"); + static final ParseField SOURCE = new ParseField("source"); + static final ParseField DEST = new ParseField("dest"); + static final ParseField ANALYSIS = new ParseField("analysis"); + static final ParseField ANALYZED_FIELDS = new ParseField("analyzed_fields"); + static final ParseField MODEL_MEMORY_LIMIT = new ParseField("model_memory_limit"); + static final ParseField CREATE_TIME = new ParseField("create_time"); + static final ParseField VERSION = new ParseField("version"); + static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); private static final ObjectParser PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java new file mode 100644 index 00000000000..1d5ecb66577 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdate.java @@ -0,0 +1,165 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml.dataframe; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ObjectParser.ValueType.VALUE; + +public class DataFrameAnalyticsConfigUpdate implements ToXContentObject { + + public static DataFrameAnalyticsConfigUpdate fromXContent(XContentParser parser) { + return PARSER.apply(parser, null).build(); + } + + public static Builder builder() { + return new Builder(); + } + + public static final ObjectParser PARSER = new ObjectParser<>("data_frame_analytics_config_update", true, Builder::new); + + static { + PARSER.declareString(Builder::setId, DataFrameAnalyticsConfig.ID); + PARSER.declareStringOrNull(Builder::setDescription, DataFrameAnalyticsConfig.DESCRIPTION); + PARSER.declareField( + Builder::setModelMemoryLimit, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName()), + DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT, + VALUE); + PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START); + + } + + private final String id; + private final String description; + private final ByteSizeValue modelMemoryLimit; + private final Boolean allowLazyStart; + + private DataFrameAnalyticsConfigUpdate(String id, + @Nullable String description, + @Nullable ByteSizeValue modelMemoryLimit, + @Nullable Boolean allowLazyStart) { + this.id = id; + this.description = description; + this.modelMemoryLimit = modelMemoryLimit; + this.allowLazyStart = allowLazyStart; + } + + public String getId() { + return id; + } + + public String getDescription() { + return description; + } + + public ByteSizeValue getModelMemoryLimit() { + return modelMemoryLimit; + } + + public Boolean isAllowLazyStart() { + return allowLazyStart; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id); + if (description != null) { + builder.field(DataFrameAnalyticsConfig.DESCRIPTION.getPreferredName(), description); + } + if (modelMemoryLimit != null) { + builder.field(DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName(), modelMemoryLimit.getStringRep()); + } + if (allowLazyStart != null) { + builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other instanceof DataFrameAnalyticsConfigUpdate == false) { + return false; + } + + DataFrameAnalyticsConfigUpdate that = (DataFrameAnalyticsConfigUpdate) other; + + return Objects.equals(this.id, that.id) + && Objects.equals(this.description, that.description) + && Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit) + && Objects.equals(this.allowLazyStart, that.allowLazyStart); + } + + @Override + public int hashCode() { + return Objects.hash(id, description, modelMemoryLimit, allowLazyStart); + } + + public static class Builder { + + private String id; + private String description; + private ByteSizeValue modelMemoryLimit; + private Boolean allowLazyStart; + + private Builder() {} + + public String getId() { + return id; + } + + public Builder setId(String id) { + this.id = id; + return this; + } + + public Builder setDescription(String description) { + this.description = description; + return this; + } + + public Builder setModelMemoryLimit(ByteSizeValue modelMemoryLimit) { + this.modelMemoryLimit = modelMemoryLimit; + return this; + } + + public Builder setAllowLazyStart(Boolean allowLazyStart) { + this.allowLazyStart = allowLazyStart; + return this; + } + + public DataFrameAnalyticsConfigUpdate build() { + return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart); + } + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index fd99f06aeb9..61be15fdcfc 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -80,6 +80,7 @@ import org.elasticsearch.client.ml.StartDatafeedRequest; import org.elasticsearch.client.ml.StartDatafeedRequestTests; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDatafeedRequest; +import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.UpdateModelSnapshotRequest; @@ -90,6 +91,7 @@ import org.elasticsearch.client.ml.calendars.ScheduledEventTests; import org.elasticsearch.client.ml.datafeed.DatafeedConfig; import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate; import org.elasticsearch.client.ml.dataframe.MlDataFrameAnalysisNamedXContentProvider; import org.elasticsearch.client.ml.dataframe.evaluation.MlEvaluationNamedXContentProvider; import org.elasticsearch.client.ml.dataframe.stats.AnalysisStatsNamedXContentProvider; @@ -127,6 +129,7 @@ import java.util.List; import java.util.Map; import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigTests.randomDataFrameAnalyticsConfig; +import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdateTests.randomDataFrameAnalyticsConfigUpdate; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; @@ -740,6 +743,17 @@ public class MLRequestConvertersTests extends ESTestCase { } } + public void testUpdateDataFrameAnalytics() throws IOException { + UpdateDataFrameAnalyticsRequest updateRequest = new UpdateDataFrameAnalyticsRequest(randomDataFrameAnalyticsConfigUpdate()); + Request request = MLRequestConverters.updateDataFrameAnalytics(updateRequest); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals("/_ml/data_frame/analytics/" + updateRequest.getUpdate().getId() + "/_update", request.getEndpoint()); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) { + DataFrameAnalyticsConfigUpdate parsedUpdate = DataFrameAnalyticsConfigUpdate.fromXContent(parser); + assertThat(parsedUpdate, equalTo(updateRequest.getUpdate())); + } + } + public void testGetDataFrameAnalytics() { String configId1 = randomAlphaOfLength(10); String configId2 = randomAlphaOfLength(10); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index d9790f55b46..41160d29f2f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -115,6 +115,7 @@ import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.StopDatafeedResponse; +import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.UpdateDatafeedRequest; import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateJobRequest; @@ -129,6 +130,7 @@ import org.elasticsearch.client.ml.datafeed.DatafeedState; import org.elasticsearch.client.ml.datafeed.DatafeedStats; import org.elasticsearch.client.ml.datafeed.DatafeedUpdate; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsState; @@ -1425,6 +1427,33 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase { assertThat(createdConfig.getDescription(), equalTo("this is a classification")); } + public void testUpdateDataFrameAnalytics() throws Exception { + MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); + String configId = "test-update-df-analytics-classification"; + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfig.builder() + .setId(configId) + .setSource(DataFrameAnalyticsSource.builder().setIndex("update-test-source-index").build()) + .setDest(DataFrameAnalyticsDest.builder().setIndex("update-test-dest-index").build()) + .setAnalysis(org.elasticsearch.client.ml.dataframe.Classification.builder("my_dependent_variable").build()) + .setDescription("this is a classification") + .build(); + + createIndex("update-test-source-index", defaultMappingForTest()); + + machineLearningClient.putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(config), RequestOptions.DEFAULT); + + UpdateDataFrameAnalyticsRequest request = + new UpdateDataFrameAnalyticsRequest( + DataFrameAnalyticsConfigUpdate.builder().setId(config.getId()).setDescription("Updated description").build()); + PutDataFrameAnalyticsResponse response = + execute(request, machineLearningClient::updateDataFrameAnalytics, machineLearningClient::updateDataFrameAnalyticsAsync); + assertThat(response.getConfig().getDescription(), equalTo("Updated description")); + + GetDataFrameAnalyticsRequest getRequest = new GetDataFrameAnalyticsRequest(config.getId()); + GetDataFrameAnalyticsResponse getResponse = machineLearningClient.getDataFrameAnalytics(getRequest, RequestOptions.DEFAULT); + assertThat(getResponse.getAnalytics().get(0).getDescription(), equalTo("Updated description")); + } + public void testGetDataFrameAnalyticsConfig_SingleConfig() throws Exception { MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); String configId = "get-test-config"; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 1c39864ec07..5d5dd86b19b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -129,6 +129,7 @@ import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.StopDatafeedResponse; +import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.UpdateDatafeedRequest; import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateJobRequest; @@ -145,6 +146,7 @@ import org.elasticsearch.client.ml.datafeed.DelayedDataCheckConfig; import org.elasticsearch.client.ml.dataframe.Classification; import org.elasticsearch.client.ml.dataframe.DataFrameAnalysis; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsState; @@ -3081,6 +3083,67 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { } } + public void testUpdateDataFrameAnalytics() throws Exception { + createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); + + RestHighLevelClient client = highLevelClient(); + client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT); + { + // tag::update-data-frame-analytics-config-update + DataFrameAnalyticsConfigUpdate update = DataFrameAnalyticsConfigUpdate.builder() + .setId("my-analytics-config") // <1> + .setDescription("new description") // <2> + .setModelMemoryLimit(new ByteSizeValue(128, ByteSizeUnit.MB)) // <3> + .build(); + // end::update-data-frame-analytics-config-update + + // tag::update-data-frame-analytics-request + UpdateDataFrameAnalyticsRequest request = new UpdateDataFrameAnalyticsRequest(update); // <1> + // end::update-data-frame-analytics-request + + // tag::update-data-frame-analytics-execute + PutDataFrameAnalyticsResponse response = client.machineLearning().updateDataFrameAnalytics(request, RequestOptions.DEFAULT); + // end::update-data-frame-analytics-execute + + // tag::update-data-frame-analytics-response + DataFrameAnalyticsConfig updatedConfig = response.getConfig(); + // end::update-data-frame-analytics-response + + assertThat(updatedConfig.getDescription(), is(equalTo("new description"))); + assertThat(updatedConfig.getModelMemoryLimit(), is(equalTo(new ByteSizeValue(128, ByteSizeUnit.MB)))); + } + { + DataFrameAnalyticsConfigUpdate update = DataFrameAnalyticsConfigUpdate.builder() + .setId("my-analytics-config") + .build(); + UpdateDataFrameAnalyticsRequest request = new UpdateDataFrameAnalyticsRequest(update); + + // tag::update-data-frame-analytics-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(PutDataFrameAnalyticsResponse response) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::update-data-frame-analytics-execute-listener + + // Replace the empty listener by a blocking listener in test + CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::update-data-frame-analytics-execute-async + client.machineLearning().updateDataFrameAnalyticsAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::update-data-frame-analytics-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testDeleteDataFrameAnalytics() throws Exception { createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/UpdateDataFrameAnalyticsRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/UpdateDataFrameAnalyticsRequestTests.java new file mode 100644 index 00000000000..3f90b782797 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/UpdateDataFrameAnalyticsRequestTests.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ml; + +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate; +import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdateTests; +import org.elasticsearch.client.ml.dataframe.MlDataFrameAnalysisNamedXContentProvider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.elasticsearch.test.hamcrest.OptionalMatchers.isEmpty; +import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent; +import static org.hamcrest.Matchers.containsString; + +public class UpdateDataFrameAnalyticsRequestTests extends AbstractXContentTestCase { + + public void testValidate_Ok() { + assertThat(createTestInstance().validate(), isEmpty()); + } + + public void testValidate_Failure() { + Optional exception = new UpdateDataFrameAnalyticsRequest(null).validate(); + assertThat(exception, isPresent()); + assertThat(exception.get().getMessage(), containsString("update requires a non-null data frame analytics config")); + } + + @Override + protected UpdateDataFrameAnalyticsRequest createTestInstance() { + return new UpdateDataFrameAnalyticsRequest(DataFrameAnalyticsConfigUpdateTests.randomDataFrameAnalyticsConfigUpdate()); + } + + @Override + protected UpdateDataFrameAnalyticsRequest doParseInstance(XContentParser parser) throws IOException { + return new UpdateDataFrameAnalyticsRequest(DataFrameAnalyticsConfigUpdate.fromXContent(parser)); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + List namedXContent = new ArrayList<>(); + namedXContent.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents()); + namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers()); + return new NamedXContentRegistry(namedXContent); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java index 4075fff6363..6b0d1ee7760 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -36,8 +36,8 @@ import java.util.Collections; import java.util.List; import java.util.function.Predicate; -import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSourceTests.randomSourceConfig; import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDestTests.randomDestConfig; +import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSourceTests.randomSourceConfig; import static org.elasticsearch.client.ml.dataframe.OutlierDetectionTests.randomOutlierDetection; public class DataFrameAnalyticsConfigTests extends AbstractXContentTestCase { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java new file mode 100644 index 00000000000..7a113e86711 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ml.dataframe; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class DataFrameAnalyticsConfigUpdateTests extends AbstractXContentTestCase { + + public static DataFrameAnalyticsConfigUpdate randomDataFrameAnalyticsConfigUpdate() { + DataFrameAnalyticsConfigUpdate.Builder builder = + DataFrameAnalyticsConfigUpdate.builder() + .setId(randomAlphaOfLengthBetween(1, 10)); + if (randomBoolean()) { + builder.setDescription(randomAlphaOfLength(20)); + } + if (randomBoolean()) { + builder.setModelMemoryLimit(new ByteSizeValue(randomNonNegativeLong())); + } + if (randomBoolean()) { + builder.setAllowLazyStart(randomBoolean()); + } + return builder.build(); + } + + @Override + protected DataFrameAnalyticsConfigUpdate createTestInstance() { + return randomDataFrameAnalyticsConfigUpdate(); + } + + @Override + protected DataFrameAnalyticsConfigUpdate doParseInstance(XContentParser parser) throws IOException { + return DataFrameAnalyticsConfigUpdate.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + List namedXContent = new ArrayList<>(); + namedXContent.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents()); + namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers()); + return new NamedXContentRegistry(namedXContent); + } +} diff --git a/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc new file mode 100644 index 00000000000..b6df7d25d04 --- /dev/null +++ b/docs/java-rest/high-level/ml/update-data-frame-analytics.asciidoc @@ -0,0 +1,51 @@ +-- +:api: update-data-frame-analytics +:request: UpdateDataFrameAnalyticsRequest +:response: UpdateDataFrameAnalyticsResponse +-- +[role="xpack"] +[id="{upid}-{api}"] +=== Update {dfanalytics-jobs} API + +Updates an existing {dfanalytics-job}. +The API accepts an +{request}+ object as a request and returns an +{response}+. + +[id="{upid}-{api}-request"] +==== Update {dfanalytics-jobs} request + +An +{request}+ requires the following argument: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-request] +-------------------------------------------------- +<1> The configuration of the {dfanalytics-job} update to perform + +[id="{upid}-{api}-config"] +==== {dfanalytics-cap} configuration update + +The `DataFrameAnalyticsConfigUpdate` object contains all the details about the {dfanalytics-job} +configuration update and contains the following arguments: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-config-update] +-------------------------------------------------- +<1> The {dfanalytics-job} ID +<2> The human-readable description +<3> The memory limit for the model created as part of the analysis process + +[id="{upid}-{api}-query-config"] + + +include::../execution.asciidoc[] + +[id="{upid}-{api}-response"] +==== Response + +The returned +{response}+ contains the updated {dfanalytics-job}. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-response] +-------------------------------------------------- diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 4c0c793e954..c0ffa77bd37 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -324,6 +324,7 @@ The Java High Level REST Client supports the following Machine Learning APIs: * <<{upid}-get-data-frame-analytics>> * <<{upid}-get-data-frame-analytics-stats>> * <<{upid}-put-data-frame-analytics>> +* <<{upid}-update-data-frame-analytics>> * <<{upid}-delete-data-frame-analytics>> * <<{upid}-start-data-frame-analytics>> * <<{upid}-stop-data-frame-analytics>> @@ -381,6 +382,7 @@ include::ml/estimate-model-memory.asciidoc[] include::ml/get-data-frame-analytics.asciidoc[] include::ml/get-data-frame-analytics-stats.asciidoc[] include::ml/put-data-frame-analytics.asciidoc[] +include::ml/update-data-frame-analytics.asciidoc[] include::ml/delete-data-frame-analytics.asciidoc[] include::ml/start-data-frame-analytics.asciidoc[] include::ml/stop-data-frame-analytics.asciidoc[] diff --git a/docs/reference/ml/df-analytics/apis/index.asciidoc b/docs/reference/ml/df-analytics/apis/index.asciidoc index 80102bed330..fd3a7f099c5 100644 --- a/docs/reference/ml/df-analytics/apis/index.asciidoc +++ b/docs/reference/ml/df-analytics/apis/index.asciidoc @@ -6,6 +6,7 @@ You can use the following APIs to perform {ml} {dfanalytics} activities. * <> +* <> * <> * <> * <> @@ -28,6 +29,8 @@ See also <>. //CREATE include::put-dfanalytics.asciidoc[] include::put-inference.asciidoc[] +//UPDATE +include::update-dfanalytics.asciidoc[] //DELETE include::delete-dfanalytics.asciidoc[] include::delete-inference-trained-model.asciidoc[] diff --git a/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc new file mode 100644 index 00000000000..0c3a36e95e4 --- /dev/null +++ b/docs/reference/ml/df-analytics/apis/update-dfanalytics.asciidoc @@ -0,0 +1,98 @@ +[role="xpack"] +[testenv="platinum"] +[[update-dfanalytics]] +=== Update {dfanalytics-jobs} API +[subs="attributes"] +++++ +Update {dfanalytics-jobs} +++++ + +Updates an existing {dfanalytics-job}. + +experimental[] + +[[ml-update-dfanalytics-request]] +==== {api-request-title} + +`POST _ml/data_frame/analytics//_update` + + +[[ml-update-dfanalytics-prereq]] +==== {api-prereq-title} + +If the {es} {security-features} are enabled, you must have the following +built-in roles and privileges: + +* `machine_learning_admin` +* `kibana_admin` (UI only) + + +* source indices: `read`, `view_index_metadata` +* destination index: `read`, `create_index`, `manage` and `index` +* cluster: `monitor` (UI only) + +For more information, see <> and <>. + +NOTE: The {dfanalytics-job} remembers which roles the user who created it had at +the time of creation. When you start the job, it performs the analysis using +those same roles. If you provide +<>, +those credentials are used instead. + +[[ml-update-dfanalytics-desc]] +==== {api-description-title} + +This API updates an existing {dfanalytics-job} that performs an analysis on the source +indices and stores the outcome in a destination index. + + +[[ml-update-dfanalytics-path-params]] +==== {api-path-parms-title} + +``:: +(Required, string) +include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics-define] + +[role="child_attributes"] +[[ml-update-dfanalytics-request-body]] +==== {api-request-body-title} + +`allow_lazy_start`:: +(Optional, boolean) +Specifies whether this job can start when there is insufficient {ml} node +capacity for it to be immediately assigned to a node. The default is `false`; if +a {ml} node with capacity to run the job cannot immediately be found, the API +returns an error. However, this is also subject to the cluster-wide +`xpack.ml.max_lazy_ml_nodes` setting. See <>. If this +option is set to `true`, the API does not return an error and the job waits in +the `starting` state until sufficient {ml} node capacity is available. + +`description`:: +(Optional, string) +include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa] + +`model_memory_limit`:: +(Optional, string) +The approximate maximum amount of memory resources that are permitted for +analytical processing. The default value for {dfanalytics-jobs} is `1gb`. If +your `elasticsearch.yml` file contains an `xpack.ml.max_model_memory_limit` +setting, an error occurs when you try to create {dfanalytics-jobs} that have +`model_memory_limit` values greater than that setting. For more information, see +<>. + +[[ml-update-dfanalytics-example]] +==== {api-examples-title} + +[[ml-update-dfanalytics-example-preprocess]] +===== Updating model memory limit example + +The following example shows how to update the model memory limit for the existing {dfanalytics} configuration. + +[source,console] +-------------------------------------------------- +POST _ml/data_frame/analytics/model-flight-delays/_update +{ + "model_memory_limit": "200mb" +} +-------------------------------------------------- +// TEST[skip:setup kibana sample data] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 2ae783222e0..3b180b285d9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -139,6 +139,7 @@ import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction; @@ -385,6 +386,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl PutDataFrameAnalyticsAction.INSTANCE, GetDataFrameAnalyticsAction.INSTANCE, GetDataFrameAnalyticsStatsAction.INSTANCE, + UpdateDataFrameAnalyticsAction.INSTANCE, DeleteDataFrameAnalyticsAction.INSTANCE, StartDataFrameAnalyticsAction.INSTANCE, StopDataFrameAnalyticsAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDataFrameAnalyticsAction.java new file mode 100644 index 00000000000..8f69dff9077 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateDataFrameAnalyticsAction.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; + +import java.io.IOException; +import java.util.Objects; + +public class UpdateDataFrameAnalyticsAction extends ActionType { + + public static final UpdateDataFrameAnalyticsAction INSTANCE = new UpdateDataFrameAnalyticsAction(); + public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/update"; + + private UpdateDataFrameAnalyticsAction() { + super(NAME, PutDataFrameAnalyticsAction.Response::new); + } + + public static class Request extends AcknowledgedRequest implements ToXContentObject { + + /** + * Parses request. + */ + public static Request parseRequest(String id, XContentParser parser) { + DataFrameAnalyticsConfigUpdate.Builder updateBuilder = DataFrameAnalyticsConfigUpdate.PARSER.apply(parser, null); + if (updateBuilder.getId() == null) { + updateBuilder.setId(id); + } else if (!Strings.isNullOrEmpty(id) && !id.equals(updateBuilder.getId())) { + // If we have both URI and body ID, they must be identical + throw new IllegalArgumentException( + Messages.getMessage(Messages.INCONSISTENT_ID, DataFrameAnalyticsConfig.ID, updateBuilder.getId(), id)); + } + + return new UpdateDataFrameAnalyticsAction.Request(updateBuilder.build()); + } + + private DataFrameAnalyticsConfigUpdate update; + + public Request() {} + + public Request(StreamInput in) throws IOException { + super(in); + update = new DataFrameAnalyticsConfigUpdate(in); + } + + public Request(DataFrameAnalyticsConfigUpdate update) { + this.update = update; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + update.writeTo(out); + } + + public DataFrameAnalyticsConfigUpdate getUpdate() { + return update; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + update.toXContent(builder, params); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UpdateDataFrameAnalyticsAction.Request request = (UpdateDataFrameAnalyticsAction.Request) o; + return Objects.equals(update, request.update); + } + + @Override + public int hashCode() { + return Objects.hash(update); + } + } + + public static class RequestBuilder + extends MasterNodeOperationRequestBuilder { + + protected RequestBuilder(ElasticsearchClient client, UpdateDataFrameAnalyticsAction action) { + super(client, action, new Request()); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 5b2f14f4e08..d17b96e2c52 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -386,12 +386,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { if (indicesOptions != null) { builder.setIndicesOptions(indicesOptions); } - if (headers.isEmpty() == false) { - // Adjust the request, adding security headers from the current thread context builder.setHeaders(filterSecurityHeaders(headers)); } - return builder.build(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java new file mode 100644 index 00000000000..c2fea3932e3 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdate.java @@ -0,0 +1,195 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.dataframe; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ObjectParser.ValueType.VALUE; + +public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObject { + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("data_frame_analytics_config_update", args -> new Builder((String) args[0])); + + static { + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), DataFrameAnalyticsConfig.ID); + PARSER.declareStringOrNull(Builder::setDescription, DataFrameAnalyticsConfig.DESCRIPTION); + PARSER.declareField( + Builder::setModelMemoryLimit, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName()), + DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT, + VALUE); + PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START); + + } + + private final String id; + private final String description; + private final ByteSizeValue modelMemoryLimit; + private final Boolean allowLazyStart; + + private DataFrameAnalyticsConfigUpdate(String id, + @Nullable String description, + @Nullable ByteSizeValue modelMemoryLimit, + @Nullable Boolean allowLazyStart) { + this.id = id; + this.description = description; + this.modelMemoryLimit = modelMemoryLimit; + this.allowLazyStart = allowLazyStart; + } + + public DataFrameAnalyticsConfigUpdate(StreamInput in) throws IOException { + this.id = in.readString(); + this.description = in.readOptionalString(); + this.modelMemoryLimit = in.readOptionalWriteable(ByteSizeValue::new); + this.allowLazyStart = in.readOptionalBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeOptionalString(description); + out.writeOptionalWriteable(modelMemoryLimit); + out.writeOptionalBoolean(allowLazyStart); + } + + public String getId() { + return id; + } + + public String getDescription() { + return description; + } + + public ByteSizeValue getModelMemoryLimit() { + return modelMemoryLimit; + } + + public Boolean isAllowLazyStart() { + return allowLazyStart; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id); + if (description != null) { + builder.field(DataFrameAnalyticsConfig.DESCRIPTION.getPreferredName(), description); + } + if (modelMemoryLimit != null) { + builder.field(DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName(), modelMemoryLimit.getStringRep()); + } + if (allowLazyStart != null) { + builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart); + } + builder.endObject(); + return builder; + } + + /** + * Updates {@code source} with the new values in this object returning a new {@link DataFrameAnalyticsConfig}. + * + * @param source Source config to be updated + * @return A new config equivalent to {@code source} updated. + */ + public DataFrameAnalyticsConfig.Builder mergeWithConfig(DataFrameAnalyticsConfig source) { + if (id.equals(source.getId()) == false) { + throw new IllegalArgumentException("Cannot apply update to a config with different id"); + } + + DataFrameAnalyticsConfig.Builder builder = new DataFrameAnalyticsConfig.Builder(source); + if (description != null) { + builder.setDescription(description); + } + if (modelMemoryLimit != null) { + builder.setModelMemoryLimit(modelMemoryLimit); + } + if (allowLazyStart != null) { + builder.setAllowLazyStart(allowLazyStart); + } + return builder; + } + + /** + * Whether this update applied to the given source config requires analytics task restart. + */ + public boolean requiresRestart(DataFrameAnalyticsConfig source) { + return getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other instanceof DataFrameAnalyticsConfigUpdate == false) { + return false; + } + + DataFrameAnalyticsConfigUpdate that = (DataFrameAnalyticsConfigUpdate) other; + + return Objects.equals(this.id, that.id) + && Objects.equals(this.description, that.description) + && Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit) + && Objects.equals(this.allowLazyStart, that.allowLazyStart); + } + + @Override + public int hashCode() { + return Objects.hash(id, description, modelMemoryLimit, allowLazyStart); + } + + public static class Builder { + + private String id; + private String description; + private ByteSizeValue modelMemoryLimit; + private Boolean allowLazyStart; + + public Builder(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public Builder setId(String id) { + this.id = id; + return this; + } + + public Builder setDescription(String description) { + this.description = description; + return this; + } + + public Builder setModelMemoryLimit(ByteSizeValue modelMemoryLimit) { + this.modelMemoryLimit = modelMemoryLimit; + return this; + } + + public Builder setAllowLazyStart(Boolean allowLazyStart) { + this.allowLazyStart = allowLazyStart; + return this; + } + + public DataFrameAnalyticsConfigUpdate build() { + return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 5287ce03531..868dce9042c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -74,6 +74,7 @@ public final class Messages { public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA = "Started loading data"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING = "Started analyzing"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS = "Started writing results"; + public static final String DATA_FRAME_ANALYTICS_CANNOT_UPDATE_IN_CURRENT_STATE = "Cannot update analytics [{0}] unless it''s stopped"; public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}"; public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed"; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDataFrameAnalyticsActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDataFrameAnalyticsActionRequestTests.java new file mode 100644 index 00000000000..3c51c41e966 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDataFrameAnalyticsActionRequestTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction.Request; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdateTests; + +import java.io.IOException; + +import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests.randomValidId; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; + +public class UpdateDataFrameAnalyticsActionRequestTests extends AbstractSerializingTestCase { + + @Override + protected Request createTestInstance() { + return new Request(DataFrameAnalyticsConfigUpdateTests.randomUpdate(randomValidId())); + } + + @Override + protected Request doParseInstance(XContentParser parser) { + return Request.parseRequest(null, parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + public void testParseRequest() throws IOException { + try (XContentParser parser = createParser(JsonXContent.jsonXContent, "{}")) { + Request request = Request.parseRequest("id-from-param", parser); + assertThat(request.getUpdate(), is(equalTo(new DataFrameAnalyticsConfigUpdate.Builder("id-from-param").build()))); + } + try (XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"id\": \"id-from-body\"}")) { + Request request = Request.parseRequest(null, parser); + assertThat(request.getUpdate(), is(equalTo(new DataFrameAnalyticsConfigUpdate.Builder("id-from-body").build()))); + } + try (XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"id\": \"same-id\"}")) { + Request request = Request.parseRequest("same-id", parser); + assertThat(request.getUpdate(), is(equalTo(new DataFrameAnalyticsConfigUpdate.Builder("same-id").build()))); + } + try (XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"id\": \"id-from-body\"}")) { + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> Request.parseRequest("id-from-param", parser)); + assertThat(e.getMessage(), startsWith("Inconsistent id")); + } + } + + public void testDefaultTimeout() { + assertThat(createTestInstance().timeout(), is(notNullValue())); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java new file mode 100644 index 00000000000..ae6bc9952bb --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigUpdateTests.java @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.dataframe; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests.randomValidId; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTestCase { + + @Override + protected DataFrameAnalyticsConfigUpdate doParseInstance(XContentParser parser) throws IOException { + return DataFrameAnalyticsConfigUpdate.PARSER.apply(parser, null).build(); + } + + @Override + protected DataFrameAnalyticsConfigUpdate createTestInstance() { + return randomUpdate(randomValidId()); + } + + @Override + protected Writeable.Reader instanceReader() { + return DataFrameAnalyticsConfigUpdate::new; + } + + public static DataFrameAnalyticsConfigUpdate randomUpdate(String id) { + DataFrameAnalyticsConfigUpdate.Builder builder = new DataFrameAnalyticsConfigUpdate.Builder(id); + if (randomBoolean()) { + builder.setDescription(randomAlphaOfLength(20)); + } + if (randomBoolean()) { + builder.setModelMemoryLimit(new ByteSizeValue(randomNonNegativeLong())); + } + if (randomBoolean()) { + builder.setAllowLazyStart(randomBoolean()); + } + return builder.build(); + } + + public void testMergeWithConfig_UpdatedDescription() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = + DataFrameAnalyticsConfigTests.createRandomBuilder(id).setDescription("old description").build(); + DataFrameAnalyticsConfigUpdate update = + new DataFrameAnalyticsConfigUpdate.Builder(id).setDescription("new description").build(); + assertThat( + update.mergeWithConfig(config).build(), + is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setDescription("new description").build()))); + } + + public void testMergeWithConfig_UpdatedModelMemoryLimit() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = + DataFrameAnalyticsConfigTests.createRandomBuilder(id).setModelMemoryLimit(new ByteSizeValue(1024)).build(); + DataFrameAnalyticsConfigUpdate update = + new DataFrameAnalyticsConfigUpdate.Builder(id).setModelMemoryLimit(new ByteSizeValue(2048)).build(); + assertThat( + update.mergeWithConfig(config).build(), + is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setModelMemoryLimit(new ByteSizeValue(2048)).build()))); + } + + public void testMergeWithConfig_UpdatedAllowLazyStart() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandomBuilder(id).setAllowLazyStart(false).build(); + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setAllowLazyStart(true).build(); + assertThat( + update.mergeWithConfig(config).build(), + is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setAllowLazyStart(true).build()))); + } + + public void testMergeWithConfig_UpdatedAllUpdatableProperties() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = + DataFrameAnalyticsConfigTests.createRandomBuilder(id) + .setDescription("old description") + .setModelMemoryLimit(new ByteSizeValue(1024)) + .setAllowLazyStart(false) + .build(); + DataFrameAnalyticsConfigUpdate update = + new DataFrameAnalyticsConfigUpdate.Builder(id) + .setDescription("new description") + .setModelMemoryLimit(new ByteSizeValue(2048)) + .setAllowLazyStart(true) + .build(); + assertThat( + update.mergeWithConfig(config).build(), + is(equalTo( + new DataFrameAnalyticsConfig.Builder(config) + .setDescription("new description") + .setModelMemoryLimit(new ByteSizeValue(2048)) + .setAllowLazyStart(true) + .build()))); + } + + public void testMergeWithConfig_NoopUpdate() { + String id = randomValidId(); + + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(id); + DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).build(); + assertThat(update.mergeWithConfig(config).build(), is(equalTo(config))); + } + + public void testMergeWithConfig_GivenRandomUpdates_AssertImmutability() { + String id = randomValidId(); + + for (int i = 0; i < 100; ++i) { + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(id); + DataFrameAnalyticsConfigUpdate update; + do { + update = randomUpdate(id); + } while (isNoop(config, update)); + + assertThat(update.mergeWithConfig(config).build(), is(not(equalTo(config)))); + } + } + + public void testMergeWithConfig_failBecauseTargetConfigHasDifferentId() { + String id = randomValidId(); + + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(id); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> randomUpdate(id + "_2").mergeWithConfig(config)); + assertThat(e.getMessage(), containsString("different id")); + } + + public void testRequiresRestart_DescriptionUpdateDoesNotRequireRestart() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = + DataFrameAnalyticsConfigTests.createRandomBuilder(id).setDescription("old description").build(); + DataFrameAnalyticsConfigUpdate update = + new DataFrameAnalyticsConfigUpdate.Builder(id).setDescription("new description").build(); + + assertThat(update.requiresRestart(config), is(false)); + } + + public void testRequiresRestart_ModelMemoryLimitUpdateRequiresRestart() { + String id = randomValidId(); + DataFrameAnalyticsConfig config = + DataFrameAnalyticsConfigTests.createRandomBuilder(id).setModelMemoryLimit(new ByteSizeValue(1024)).build(); + DataFrameAnalyticsConfigUpdate update = + new DataFrameAnalyticsConfigUpdate.Builder(id).setModelMemoryLimit(new ByteSizeValue(2048)).build(); + + assertThat(update.requiresRestart(config), is(true)); + } + + private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) { + return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription())) + && (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit())) + && (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart())); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index a18620037de..f26a59a9e86 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -300,13 +300,12 @@ public class JobUpdateTests extends AbstractSerializingTestCase { public void testMergeWithJob_GivenRandomUpdates_AssertImmutability() { for (int i = 0; i < 100; ++i) { Job job = JobTests.createRandomizedJob(); - JobUpdate update = createRandom(job.getId(), job); - while (update.isNoop(job)) { + JobUpdate update; + do { update = createRandom(job.getId(), job); - } + } while (update.isNoop(job)); Job updatedJob = update.mergeWithJob(job, new ByteSizeValue(0L)); - assertThat(job, not(equalTo(updatedJob))); } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index f943d1f02b5..61b607825e8 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; @@ -656,6 +657,29 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L)); } + public void testUpdateAnalytics() throws Exception { + initialize("update_analytics_description"); + + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD)); + putAnalytics(config); + assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(nullValue())); + + updateAnalytics(new DataFrameAnalyticsConfigUpdate.Builder(jobId).setDescription("updated-description-1").build()); + assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-1"))); + + // Noop update + updateAnalytics(new DataFrameAnalyticsConfigUpdate.Builder(jobId).build()); + assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-1"))); + + updateAnalytics(new DataFrameAnalyticsConfigUpdate.Builder(jobId).setDescription("updated-description-2").build()); + assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2"))); + } + + private static T getOnlyElement(List list) { + assertThat(list, hasSize(1)); + return list.get(0); + } + private void initialize(String jobId) { initialize(jobId, false); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index f56c11aa060..4f40abf54c4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -36,7 +36,9 @@ import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; @@ -121,6 +123,11 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest return client().execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet(); } + protected PutDataFrameAnalyticsAction.Response updateAnalytics(DataFrameAnalyticsConfigUpdate update) { + UpdateDataFrameAnalyticsAction.Request request = new UpdateDataFrameAnalyticsAction.Request(update); + return client().execute(UpdateDataFrameAnalyticsAction.INSTANCE, request).actionGet(); + } + protected AcknowledgedResponse deleteAnalytics(String id) { DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id); return client().execute(DeleteDataFrameAnalyticsAction.INSTANCE, request).actionGet(); diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java new file mode 100644 index 00000000000..4c75173dcc8 --- /dev/null +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java @@ -0,0 +1,373 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; +import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; +import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase { + + private DataFrameAnalyticsConfigProvider configProvider; + + @Before + public void createComponents() throws Exception { + configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry()); + waitForMlTemplates(); + } + + public void testGet_ConfigDoesNotExist() throws InterruptedException { + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(actionListener -> configProvider.get("missing", actionListener), configHolder, exceptionHolder); + + assertThat(configHolder.get(), is(nullValue())); + assertThat(exceptionHolder.get(), is(notNullValue())); + assertThat(exceptionHolder.get(), is(instanceOf(ResourceNotFoundException.class))); + } + + public void testPutAndGet() throws InterruptedException { + String configId = "config-id"; + // Create valid config + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(configId); + { // Put the config and verify the response + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall( + actionListener -> configProvider.put(config, emptyMap(), actionListener), configHolder, exceptionHolder); + + assertThat(configHolder.get(), is(notNullValue())); + assertThat(configHolder.get(), is(equalTo(config))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Get the config back and verify the response + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(actionListener -> configProvider.get(configId, actionListener), configHolder, exceptionHolder); + + assertThat(configHolder.get(), is(notNullValue())); + assertThat(configHolder.get(), is(equalTo(config))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + } + + public void testPutAndGet_WithSecurityHeaders() throws InterruptedException { + String configId = "config-id"; + DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(configId); + Map securityHeaders = Collections.singletonMap("_xpack_security_authentication", "dummy"); + { // Put the config and verify the response + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(actionListener -> configProvider.put(config, securityHeaders, actionListener), configHolder, exceptionHolder); + + assertThat(configHolder.get(), is(notNullValue())); + assertThat( + configHolder.get(), + is(equalTo( + new DataFrameAnalyticsConfig.Builder(config) + .setHeaders(securityHeaders) + .build()))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Get the config back and verify the response + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(actionListener -> configProvider.get(configId, actionListener), configHolder, exceptionHolder); + + assertThat(configHolder.get(), is(notNullValue())); + assertThat( + configHolder.get(), + is(equalTo( + new DataFrameAnalyticsConfig.Builder(config) + .setHeaders(securityHeaders) + .build()))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + } + + public void testPut_ConfigAlreadyExists() throws InterruptedException { + String configId = "config-id"; + { // Put the config and verify the response + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfig initialConfig = DataFrameAnalyticsConfigTests.createRandom(configId); + blockingCall( + actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder); + + assertThat(configHolder.get(), is(notNullValue())); + assertThat(configHolder.get(), is(equalTo(initialConfig))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Try putting the config with the same id and verify the response + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfig configWithSameId = DataFrameAnalyticsConfigTests.createRandom(configId); + blockingCall( + actionListener -> configProvider.put(configWithSameId, emptyMap(), actionListener), + configHolder, + exceptionHolder); + + assertThat(configHolder.get(), is(nullValue())); + assertThat(exceptionHolder.get(), is(notNullValue())); + assertThat(exceptionHolder.get(), is(instanceOf(ResourceAlreadyExistsException.class))); + } + } + + public void testUpdate() throws Exception { + String configId = "config-id"; + DataFrameAnalyticsConfig initialConfig = DataFrameAnalyticsConfigTests.createRandom(configId); + { + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall( + actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder); + + assertThat(configHolder.get(), is(notNullValue())); + assertThat(configHolder.get(), is(equalTo(initialConfig))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Update that changes description + AtomicReference updatedConfigHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfigUpdate configUpdate = + new DataFrameAnalyticsConfigUpdate.Builder(configId) + .setDescription("description-1") + .build(); + + blockingCall( + actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener), + updatedConfigHolder, + exceptionHolder); + + assertThat(updatedConfigHolder.get(), is(notNullValue())); + assertThat( + updatedConfigHolder.get(), + is(equalTo( + new DataFrameAnalyticsConfig.Builder(initialConfig) + .setDescription("description-1") + .build()))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Update that changes model memory limit + AtomicReference updatedConfigHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfigUpdate configUpdate = + new DataFrameAnalyticsConfigUpdate.Builder(configId) + .setModelMemoryLimit(new ByteSizeValue(1024)) + .build(); + + blockingCall( + actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener), + updatedConfigHolder, + exceptionHolder); + + assertThat(updatedConfigHolder.get(), is(notNullValue())); + assertThat( + updatedConfigHolder.get(), + is(equalTo( + new DataFrameAnalyticsConfig.Builder(initialConfig) + .setDescription("description-1") + .setModelMemoryLimit(new ByteSizeValue(1024)) + .build()))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Noop update + AtomicReference updatedConfigHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfigUpdate configUpdate = new DataFrameAnalyticsConfigUpdate.Builder(configId).build(); + + blockingCall( + actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener), + updatedConfigHolder, + exceptionHolder); + + assertThat(updatedConfigHolder.get(), is(notNullValue())); + assertThat( + updatedConfigHolder.get(), + is(equalTo( + new DataFrameAnalyticsConfig.Builder(initialConfig) + .setDescription("description-1") + .setModelMemoryLimit(new ByteSizeValue(1024)) + .build()))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Update that changes both description and model memory limit + AtomicReference updatedConfigHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfigUpdate configUpdate = + new DataFrameAnalyticsConfigUpdate.Builder(configId) + .setDescription("description-2") + .setModelMemoryLimit(new ByteSizeValue(2048)) + .build(); + + blockingCall( + actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener), + updatedConfigHolder, + exceptionHolder); + + assertThat(updatedConfigHolder.get(), is(notNullValue())); + assertThat( + updatedConfigHolder.get(), + is(equalTo( + new DataFrameAnalyticsConfig.Builder(initialConfig) + .setDescription("description-2") + .setModelMemoryLimit(new ByteSizeValue(2048)) + .build()))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Update that applies security headers + Map securityHeaders = Collections.singletonMap("_xpack_security_authentication", "dummy"); + + AtomicReference updatedConfigHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfigUpdate configUpdate = new DataFrameAnalyticsConfigUpdate.Builder(configId).build(); + + blockingCall( + actionListener -> configProvider.update(configUpdate, securityHeaders, ClusterState.EMPTY_STATE, actionListener), + updatedConfigHolder, + exceptionHolder); + + assertThat(updatedConfigHolder.get(), is(notNullValue())); + assertThat( + updatedConfigHolder.get(), + is(equalTo( + new DataFrameAnalyticsConfig.Builder(initialConfig) + .setDescription("description-2") + .setModelMemoryLimit(new ByteSizeValue(2048)) + .setHeaders(securityHeaders) + .build()))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + } + + public void testUpdate_ConfigDoesNotExist() throws InterruptedException { + AtomicReference updatedConfigHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfigUpdate configUpdate = new DataFrameAnalyticsConfigUpdate.Builder("missing").build(); + + blockingCall( + actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener), + updatedConfigHolder, + exceptionHolder); + + assertThat(updatedConfigHolder.get(), is(nullValue())); + assertThat(exceptionHolder.get(), is(notNullValue())); + assertThat(exceptionHolder.get(), is(instanceOf(ResourceNotFoundException.class))); + } + + public void testUpdate_UpdateCannotBeAppliedWhenTaskIsRunning() throws InterruptedException { + String configId = "config-id"; + DataFrameAnalyticsConfig initialConfig = DataFrameAnalyticsConfigTests.createRandom(configId); + { + AtomicReference configHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall( + actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder); + + assertThat(configHolder.get(), is(notNullValue())); + assertThat(configHolder.get(), is(equalTo(initialConfig))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + { // Update that tries to change model memory limit while the analytics is running + AtomicReference updatedConfigHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + DataFrameAnalyticsConfigUpdate configUpdate = + new DataFrameAnalyticsConfigUpdate.Builder(configId) + .setModelMemoryLimit(new ByteSizeValue(2048, ByteSizeUnit.MB)) + .build(); + + ClusterState clusterState = clusterStateWithRunningAnalyticsTask(configId, DataFrameAnalyticsState.ANALYZING); + blockingCall( + actionListener -> configProvider.update(configUpdate, emptyMap(), clusterState, actionListener), + updatedConfigHolder, + exceptionHolder); + + assertThat(updatedConfigHolder.get(), is(nullValue())); + assertThat(exceptionHolder.get(), is(notNullValue())); + assertThat(exceptionHolder.get(), is(instanceOf(ElasticsearchStatusException.class))); + ElasticsearchStatusException e = (ElasticsearchStatusException) exceptionHolder.get(); + assertThat(e.status(), is(equalTo(RestStatus.CONFLICT))); + assertThat(e.getMessage(), is(equalTo("Cannot update analytics [config-id] unless it's stopped"))); + } + } + + private static ClusterState clusterStateWithRunningAnalyticsTask(String analyticsId, DataFrameAnalyticsState analyticsState) { + PersistentTasksCustomMetadata.Builder builder = PersistentTasksCustomMetadata.builder(); + builder.addTask( + MlTasks.dataFrameAnalyticsTaskId(analyticsId), + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, + new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT, emptyList(), false), + new PersistentTasksCustomMetadata.Assignment("node", "test assignment")); + builder.updateTaskState( + MlTasks.dataFrameAnalyticsTaskId(analyticsId), + new DataFrameAnalyticsTaskState(analyticsState, builder.getLastAllocationId(), null)); + PersistentTasksCustomMetadata tasks = builder.build(); + + return ClusterState.builder(new ClusterName("cluster")) + .metadata(Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasks).build()) + .build(); + } + + @Override + public NamedXContentRegistry xContentRegistry() { + List namedXContent = new ArrayList<>(); + namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers()); + namedXContent.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents()); + return new NamedXContentRegistry(namedXContent); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 15a91b72047..13dac083aa2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -125,6 +125,7 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; @@ -197,6 +198,7 @@ import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportStopDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportUpdateCalendarJobAction; +import org.elasticsearch.xpack.ml.action.TransportUpdateDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.action.TransportUpdateDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportUpdateFilterAction; import org.elasticsearch.xpack.ml.action.TransportUpdateJobAction; @@ -276,6 +278,7 @@ import org.elasticsearch.xpack.ml.rest.dataframe.RestEvaluateDataFrameAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsStatsAction; +import org.elasticsearch.xpack.ml.rest.dataframe.RestPostDataFrameAnalyticsUpdateAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestPutDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestStartDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestStopDataFrameAnalyticsAction; @@ -828,6 +831,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, new RestGetDataFrameAnalyticsAction(), new RestGetDataFrameAnalyticsStatsAction(), new RestPutDataFrameAnalyticsAction(), + new RestPostDataFrameAnalyticsUpdateAction(), new RestDeleteDataFrameAnalyticsAction(), new RestStartDataFrameAnalyticsAction(), new RestStopDataFrameAnalyticsAction(), @@ -905,6 +909,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, new ActionHandler<>(GetDataFrameAnalyticsAction.INSTANCE, TransportGetDataFrameAnalyticsAction.class), new ActionHandler<>(GetDataFrameAnalyticsStatsAction.INSTANCE, TransportGetDataFrameAnalyticsStatsAction.class), new ActionHandler<>(PutDataFrameAnalyticsAction.INSTANCE, TransportPutDataFrameAnalyticsAction.class), + new ActionHandler<>(UpdateDataFrameAnalyticsAction.INSTANCE, TransportUpdateDataFrameAnalyticsAction.class), new ActionHandler<>(DeleteDataFrameAnalyticsAction.INSTANCE, TransportDeleteDataFrameAnalyticsAction.class), new ActionHandler<>(StartDataFrameAnalyticsAction.INSTANCE, TransportStartDataFrameAnalyticsAction.class), new ActionHandler<>(StopDataFrameAnalyticsAction.INSTANCE, TransportStopDataFrameAnalyticsAction.class), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java index 60ceef93e7f..4d42356191f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; @@ -169,7 +168,7 @@ public class TransportPutDataFrameAnalyticsAction preparedForPutConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap( - indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(preparedForPutConfig)), + unused -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(preparedForPutConfig)), listener::onFailure )); } @@ -183,7 +182,7 @@ public class TransportPutDataFrameAnalyticsAction memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap( - indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), + unused -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), listener::onFailure )); } else { @@ -203,7 +202,7 @@ public class TransportPutDataFrameAnalyticsAction private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config, Map headers, - ActionListener listener) { + ActionListener listener) { ClusterState clusterState = clusterService.state(); if (clusterState == null) { logger.warn("Cannot update doc mapping because clusterState == null"); @@ -221,7 +220,7 @@ public class TransportPutDataFrameAnalyticsAction auditor.info( config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATED, config.getAnalysis().getWriteableName())); - listener.onResponse(indexResponse); + listener.onResponse(config); }, listener::onFailure)), listener::onFailure)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDataFrameAnalyticsAction.java new file mode 100644 index 00000000000..af58243fcd3 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDataFrameAnalyticsAction.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; + +public class TransportUpdateDataFrameAnalyticsAction + extends TransportMasterNodeAction { + + private final XPackLicenseState licenseState; + private final DataFrameAnalyticsConfigProvider configProvider; + private final SecurityContext securityContext; + + @Inject + public TransportUpdateDataFrameAnalyticsAction(Settings settings, TransportService transportService, ActionFilters actionFilters, + XPackLicenseState licenseState, ThreadPool threadPool, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + DataFrameAnalyticsConfigProvider configProvider) { + super(UpdateDataFrameAnalyticsAction.NAME, transportService, clusterService, threadPool, actionFilters, + UpdateDataFrameAnalyticsAction.Request::new, indexNameExpressionResolver); + this.licenseState = licenseState; + this.configProvider = configProvider; + this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) + ? new SecurityContext(settings, threadPool.getThreadContext()) + : null; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected PutDataFrameAnalyticsAction.Response read(StreamInput in) throws IOException { + return new PutDataFrameAnalyticsAction.Response(in); + } + + @Override + protected ClusterBlockException checkBlock(UpdateDataFrameAnalyticsAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void masterOperation(UpdateDataFrameAnalyticsAction.Request request, ClusterState state, + ActionListener listener) { + + useSecondaryAuthIfAvailable(securityContext, () -> { + Map headers = threadPool.getThreadContext().getHeaders(); + configProvider.update( + request.getUpdate(), + headers, + state, + ActionListener.wrap( + updatedConfig -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(updatedConfig)), + listener::onFailure)); + }); + + } + + @Override + protected void doExecute(Task task, UpdateDataFrameAnalyticsAction.Request request, + ActionListener listener) { + if (licenseState.isAllowed(XPackLicenseState.Feature.MACHINE_LEARNING)) { + super.doExecute(task, request, listener); + } else { + listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java index 6d3d90d5440..7e4f3e19f40 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java @@ -10,14 +10,18 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.get.GetAction; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -29,11 +33,16 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.ml.MlConfigIndex; +import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; @@ -68,28 +77,115 @@ public class DataFrameAnalyticsConfigProvider { this.xContentRegistry = xContentRegistry; } - public void put(DataFrameAnalyticsConfig config, Map headers, ActionListener listener) { - String id = config.getId(); - + /** + * Puts the given {@link DataFrameAnalyticsConfig} document into the config index. + */ + public void put(DataFrameAnalyticsConfig config, Map headers, ActionListener listener) { if (headers.isEmpty() == false) { // Filter any values in headers that aren't security fields config = new DataFrameAnalyticsConfig.Builder(config) .setHeaders(filterSecurityHeaders(headers)) .build(); } + index(config, null, listener); + } + + /** + * Updates the {@link DataFrameAnalyticsConfig} document in the config index using given {@link DataFrameAnalyticsConfigUpdate}. + */ + public void update(DataFrameAnalyticsConfigUpdate update, + Map headers, + ClusterState clusterState, + ActionListener listener) { + String id = update.getId(); + GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), DataFrameAnalyticsConfig.documentId(id)); + executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap( + getResponse -> { + + // Fail the update request if the config to be updated doesn't exist + if (getResponse.isExists() == false) { + listener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(id)); + return; + } + + // Parse the original config + DataFrameAnalyticsConfig originalConfig; + try { + try (InputStream stream = getResponse.getSourceAsBytesRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { + originalConfig = DataFrameAnalyticsConfig.LENIENT_PARSER.apply(parser, null).build(); + } + } catch (IOException e) { + listener.onFailure( + new ElasticsearchParseException("Failed to parse data frame analytics configuration [" + id + "]", e)); + return; + } + + // Check that the update can be applied given current analytics state + checkUpdateCanBeApplied(originalConfig, update, clusterState); + + // Merge the original config with the given update object + DataFrameAnalyticsConfig.Builder updatedConfigBuilder = update.mergeWithConfig(originalConfig); + if (headers.isEmpty() == false) { + updatedConfigBuilder.setHeaders(filterSecurityHeaders(headers)); + } + DataFrameAnalyticsConfig updatedConfig = updatedConfigBuilder.build(); + + // Index the update config + index(updatedConfig, getResponse, listener); + }, + listener::onFailure + )); + } + + private static void checkUpdateCanBeApplied(DataFrameAnalyticsConfig originalConfig, + DataFrameAnalyticsConfigUpdate update, + ClusterState clusterState) { + String analyticsId = update.getId(); + PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + DataFrameAnalyticsState analyticsState = MlTasks.getDataFrameAnalyticsState(analyticsId, tasks); + if (DataFrameAnalyticsState.STOPPED.equals(analyticsState)) { + // Analytics is stopped, therefore it is safe to proceed with the udpate + return; + } + if (update.requiresRestart(originalConfig)) { + throw ExceptionsHelper.conflictStatusException( + Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_CANNOT_UPDATE_IN_CURRENT_STATE, analyticsId, analyticsState)); + } + } + + /** + * Indexes the new version of {@link DataFrameAnalyticsConfig} document into the config index. + * + * @param config config object to be indexed + * @param getResponse {@link GetResponse} coming from requesting the previous version of the config. + * If null, this config is indexed for the first time + * @param listener listener to be called after indexing + */ + private void index(DataFrameAnalyticsConfig config, + @Nullable GetResponse getResponse, + ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { config.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); IndexRequest indexRequest = new IndexRequest(MlConfigIndex.indexName()) - .id(DataFrameAnalyticsConfig.documentId(config.getId())) - .opType(DocWriteRequest.OpType.CREATE) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(builder); + .id(DataFrameAnalyticsConfig.documentId(config.getId())) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(builder); + if (getResponse == null) { + indexRequest.opType(DocWriteRequest.OpType.CREATE); + } else { + indexRequest + .opType(DocWriteRequest.OpType.INDEX) + .setIfSeqNo(getResponse.getSeqNo()) + .setIfPrimaryTerm(getResponse.getPrimaryTerm()); + } executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap( - listener::onResponse, + indexResponse -> listener.onResponse(config), e -> { if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) { - listener.onFailure(ExceptionsHelper.dataFrameAnalyticsAlreadyExists(id)); + listener.onFailure(ExceptionsHelper.dataFrameAnalyticsAlreadyExists(config.getId())); } else { listener.onFailure(e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 65c3cd25c67..edec83ef472 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -244,7 +244,6 @@ public class JobConfigProvider { return; } - final long version = getResponse.getVersion(); final long seqNo = getResponse.getSeqNo(); final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPostDataFrameAnalyticsUpdateAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPostDataFrameAnalyticsUpdateAction.java new file mode 100644 index 00000000000..b6d4115be81 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestPostDataFrameAnalyticsUpdateAction.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.rest.dataframe; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.ml.MachineLearning; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestPostDataFrameAnalyticsUpdateAction extends BaseRestHandler { + + @Override + public List routes() { + return singletonList( + new Route( + POST, MachineLearning.BASE_PATH + "data_frame/analytics/{" + DataFrameAnalyticsConfig.ID.getPreferredName() + "}/_update")); + } + + @Override + public String getName() { + return "xpack_ml_post_data_frame_analytics_update_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String id = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName()); + XContentParser parser = restRequest.contentParser(); + UpdateDataFrameAnalyticsAction.Request updateRequest = UpdateDataFrameAnalyticsAction.Request.parseRequest(id, parser); + updateRequest.timeout(restRequest.paramAsTime("timeout", updateRequest.timeout())); + + return channel -> client.execute(UpdateDataFrameAnalyticsAction.INSTANCE, updateRequest, new RestToXContentListener<>(channel)); + } +}