[7.x] Introduce DataFrameAnalyticsConfig update API (#58302) (#58648)

This commit is contained in:
Przemysław Witek 2020-06-29 10:56:11 +02:00 committed by GitHub
parent 8f82ec0b19
commit 3f7c45472e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1919 additions and 34 deletions

View File

@ -81,6 +81,7 @@ import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StartDatafeedRequest; import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.UpdateDatafeedRequest; import org.elasticsearch.client.ml.UpdateDatafeedRequest;
import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateFilterRequest;
import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.UpdateJobRequest;
@ -616,6 +617,17 @@ final class MLRequestConverters {
return request; return request;
} }
static Request updateDataFrameAnalytics(UpdateDataFrameAnalyticsRequest updateRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ml", "data_frame", "analytics")
.addPathPart(updateRequest.getUpdate().getId())
.addPathPartAsIs("_update")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
request.setEntity(createEntity(updateRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request getDataFrameAnalytics(GetDataFrameAnalyticsRequest getRequest) { static Request getDataFrameAnalytics(GetDataFrameAnalyticsRequest getRequest) {
String endpoint = new EndpointBuilder() String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ml", "data_frame", "analytics") .addPathPartAsIs("_ml", "data_frame", "analytics")

View File

@ -115,6 +115,7 @@ import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.StopDatafeedResponse; import org.elasticsearch.client.ml.StopDatafeedResponse;
import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.UpdateDatafeedRequest; import org.elasticsearch.client.ml.UpdateDatafeedRequest;
import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateFilterRequest;
import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.UpdateJobRequest;
@ -2042,6 +2043,52 @@ public final class MachineLearningClient {
Collections.emptySet()); Collections.emptySet());
} }
/**
* Updates a Data Frame Analytics config
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/update-dfanalytics.html">
* PUT Data Frame Analytics documentation</a>
*
* @param request The {@link UpdateDataFrameAnalyticsRequest} containing the
* {@link org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate}
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return The {@link PutDataFrameAnalyticsResponse} containing the updated
* {@link org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig}
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public PutDataFrameAnalyticsResponse updateDataFrameAnalytics(UpdateDataFrameAnalyticsRequest request,
RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::updateDataFrameAnalytics,
options,
PutDataFrameAnalyticsResponse::fromXContent,
Collections.emptySet());
}
/**
* Updates a Data Frame Analytics config asynchronously and notifies listener upon completion
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/update-dfanalytics.html">
* Update Data Frame Analytics documentation</a>
*
* @param request The {@link UpdateDataFrameAnalyticsRequest} containing the
* {@link org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate}
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable updateDataFrameAnalyticsAsync(UpdateDataFrameAnalyticsRequest request, RequestOptions options,
ActionListener<PutDataFrameAnalyticsResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::updateDataFrameAnalytics,
options,
PutDataFrameAnalyticsResponse::fromXContent,
listener,
Collections.emptySet());
}
/** /**
* Gets a single or multiple Data Frame Analytics configs * Gets a single or multiple Data Frame Analytics configs
* <p> * <p>

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
public class UpdateDataFrameAnalyticsRequest implements ToXContentObject, Validatable {
private final DataFrameAnalyticsConfigUpdate update;
public UpdateDataFrameAnalyticsRequest(DataFrameAnalyticsConfigUpdate update) {
this.update = update;
}
public DataFrameAnalyticsConfigUpdate getUpdate() {
return update;
}
@Override
public Optional<ValidationException> validate() {
if (update == null) {
return Optional.of(ValidationException.withError("update requires a non-null data frame analytics config update"));
}
return Optional.empty();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return update.toXContent(builder, params);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateDataFrameAnalyticsRequest other = (UpdateDataFrameAnalyticsRequest) o;
return Objects.equals(update, other.update);
}
@Override
public int hashCode() {
return Objects.hash(update);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -47,16 +47,16 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
return new Builder(); return new Builder();
} }
private static final ParseField ID = new ParseField("id"); static final ParseField ID = new ParseField("id");
private static final ParseField DESCRIPTION = new ParseField("description"); static final ParseField DESCRIPTION = new ParseField("description");
private static final ParseField SOURCE = new ParseField("source"); static final ParseField SOURCE = new ParseField("source");
private static final ParseField DEST = new ParseField("dest"); static final ParseField DEST = new ParseField("dest");
private static final ParseField ANALYSIS = new ParseField("analysis"); static final ParseField ANALYSIS = new ParseField("analysis");
private static final ParseField ANALYZED_FIELDS = new ParseField("analyzed_fields"); static final ParseField ANALYZED_FIELDS = new ParseField("analyzed_fields");
private static final ParseField MODEL_MEMORY_LIMIT = new ParseField("model_memory_limit"); static final ParseField MODEL_MEMORY_LIMIT = new ParseField("model_memory_limit");
private static final ParseField CREATE_TIME = new ParseField("create_time"); static final ParseField CREATE_TIME = new ParseField("create_time");
private static final ParseField VERSION = new ParseField("version"); static final ParseField VERSION = new ParseField("version");
private static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start"); static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start");
private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new); private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new);

View File

@ -0,0 +1,165 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.dataframe;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ObjectParser.ValueType.VALUE;
public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
public static DataFrameAnalyticsConfigUpdate fromXContent(XContentParser parser) {
return PARSER.apply(parser, null).build();
}
public static Builder builder() {
return new Builder();
}
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config_update", true, Builder::new);
static {
PARSER.declareString(Builder::setId, DataFrameAnalyticsConfig.ID);
PARSER.declareStringOrNull(Builder::setDescription, DataFrameAnalyticsConfig.DESCRIPTION);
PARSER.declareField(
Builder::setModelMemoryLimit,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName()),
DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT,
VALUE);
PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START);
}
private final String id;
private final String description;
private final ByteSizeValue modelMemoryLimit;
private final Boolean allowLazyStart;
private DataFrameAnalyticsConfigUpdate(String id,
@Nullable String description,
@Nullable ByteSizeValue modelMemoryLimit,
@Nullable Boolean allowLazyStart) {
this.id = id;
this.description = description;
this.modelMemoryLimit = modelMemoryLimit;
this.allowLazyStart = allowLazyStart;
}
public String getId() {
return id;
}
public String getDescription() {
return description;
}
public ByteSizeValue getModelMemoryLimit() {
return modelMemoryLimit;
}
public Boolean isAllowLazyStart() {
return allowLazyStart;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
if (description != null) {
builder.field(DataFrameAnalyticsConfig.DESCRIPTION.getPreferredName(), description);
}
if (modelMemoryLimit != null) {
builder.field(DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName(), modelMemoryLimit.getStringRep());
}
if (allowLazyStart != null) {
builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof DataFrameAnalyticsConfigUpdate == false) {
return false;
}
DataFrameAnalyticsConfigUpdate that = (DataFrameAnalyticsConfigUpdate) other;
return Objects.equals(this.id, that.id)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit)
&& Objects.equals(this.allowLazyStart, that.allowLazyStart);
}
@Override
public int hashCode() {
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart);
}
public static class Builder {
private String id;
private String description;
private ByteSizeValue modelMemoryLimit;
private Boolean allowLazyStart;
private Builder() {}
public String getId() {
return id;
}
public Builder setId(String id) {
this.id = id;
return this;
}
public Builder setDescription(String description) {
this.description = description;
return this;
}
public Builder setModelMemoryLimit(ByteSizeValue modelMemoryLimit) {
this.modelMemoryLimit = modelMemoryLimit;
return this;
}
public Builder setAllowLazyStart(Boolean allowLazyStart) {
this.allowLazyStart = allowLazyStart;
return this;
}
public DataFrameAnalyticsConfigUpdate build() {
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart);
}
}
}

View File

@ -80,6 +80,7 @@ import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StartDatafeedRequestTests; import org.elasticsearch.client.ml.StartDatafeedRequestTests;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateFilterRequest;
import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.UpdateModelSnapshotRequest; import org.elasticsearch.client.ml.UpdateModelSnapshotRequest;
@ -90,6 +91,7 @@ import org.elasticsearch.client.ml.calendars.ScheduledEventTests;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig; import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests; import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.client.ml.dataframe.MlDataFrameAnalysisNamedXContentProvider; import org.elasticsearch.client.ml.dataframe.MlDataFrameAnalysisNamedXContentProvider;
import org.elasticsearch.client.ml.dataframe.evaluation.MlEvaluationNamedXContentProvider; import org.elasticsearch.client.ml.dataframe.evaluation.MlEvaluationNamedXContentProvider;
import org.elasticsearch.client.ml.dataframe.stats.AnalysisStatsNamedXContentProvider; import org.elasticsearch.client.ml.dataframe.stats.AnalysisStatsNamedXContentProvider;
@ -127,6 +129,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigTests.randomDataFrameAnalyticsConfig; import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigTests.randomDataFrameAnalyticsConfig;
import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdateTests.randomDataFrameAnalyticsConfigUpdate;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasEntry;
@ -740,6 +743,17 @@ public class MLRequestConvertersTests extends ESTestCase {
} }
} }
public void testUpdateDataFrameAnalytics() throws IOException {
UpdateDataFrameAnalyticsRequest updateRequest = new UpdateDataFrameAnalyticsRequest(randomDataFrameAnalyticsConfigUpdate());
Request request = MLRequestConverters.updateDataFrameAnalytics(updateRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals("/_ml/data_frame/analytics/" + updateRequest.getUpdate().getId() + "/_update", request.getEndpoint());
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
DataFrameAnalyticsConfigUpdate parsedUpdate = DataFrameAnalyticsConfigUpdate.fromXContent(parser);
assertThat(parsedUpdate, equalTo(updateRequest.getUpdate()));
}
}
public void testGetDataFrameAnalytics() { public void testGetDataFrameAnalytics() {
String configId1 = randomAlphaOfLength(10); String configId1 = randomAlphaOfLength(10);
String configId2 = randomAlphaOfLength(10); String configId2 = randomAlphaOfLength(10);

View File

@ -115,6 +115,7 @@ import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.StopDatafeedResponse; import org.elasticsearch.client.ml.StopDatafeedResponse;
import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.UpdateDatafeedRequest; import org.elasticsearch.client.ml.UpdateDatafeedRequest;
import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateFilterRequest;
import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.UpdateJobRequest;
@ -129,6 +130,7 @@ import org.elasticsearch.client.ml.datafeed.DatafeedState;
import org.elasticsearch.client.ml.datafeed.DatafeedStats; import org.elasticsearch.client.ml.datafeed.DatafeedStats;
import org.elasticsearch.client.ml.datafeed.DatafeedUpdate; import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsState;
@ -1425,6 +1427,33 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
assertThat(createdConfig.getDescription(), equalTo("this is a classification")); assertThat(createdConfig.getDescription(), equalTo("this is a classification"));
} }
public void testUpdateDataFrameAnalytics() throws Exception {
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
String configId = "test-update-df-analytics-classification";
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfig.builder()
.setId(configId)
.setSource(DataFrameAnalyticsSource.builder().setIndex("update-test-source-index").build())
.setDest(DataFrameAnalyticsDest.builder().setIndex("update-test-dest-index").build())
.setAnalysis(org.elasticsearch.client.ml.dataframe.Classification.builder("my_dependent_variable").build())
.setDescription("this is a classification")
.build();
createIndex("update-test-source-index", defaultMappingForTest());
machineLearningClient.putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(config), RequestOptions.DEFAULT);
UpdateDataFrameAnalyticsRequest request =
new UpdateDataFrameAnalyticsRequest(
DataFrameAnalyticsConfigUpdate.builder().setId(config.getId()).setDescription("Updated description").build());
PutDataFrameAnalyticsResponse response =
execute(request, machineLearningClient::updateDataFrameAnalytics, machineLearningClient::updateDataFrameAnalyticsAsync);
assertThat(response.getConfig().getDescription(), equalTo("Updated description"));
GetDataFrameAnalyticsRequest getRequest = new GetDataFrameAnalyticsRequest(config.getId());
GetDataFrameAnalyticsResponse getResponse = machineLearningClient.getDataFrameAnalytics(getRequest, RequestOptions.DEFAULT);
assertThat(getResponse.getAnalytics().get(0).getDescription(), equalTo("Updated description"));
}
public void testGetDataFrameAnalyticsConfig_SingleConfig() throws Exception { public void testGetDataFrameAnalyticsConfig_SingleConfig() throws Exception {
MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
String configId = "get-test-config"; String configId = "get-test-config";

View File

@ -129,6 +129,7 @@ import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse; import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.StopDatafeedRequest; import org.elasticsearch.client.ml.StopDatafeedRequest;
import org.elasticsearch.client.ml.StopDatafeedResponse; import org.elasticsearch.client.ml.StopDatafeedResponse;
import org.elasticsearch.client.ml.UpdateDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.UpdateDatafeedRequest; import org.elasticsearch.client.ml.UpdateDatafeedRequest;
import org.elasticsearch.client.ml.UpdateFilterRequest; import org.elasticsearch.client.ml.UpdateFilterRequest;
import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.UpdateJobRequest;
@ -145,6 +146,7 @@ import org.elasticsearch.client.ml.datafeed.DelayedDataCheckConfig;
import org.elasticsearch.client.ml.dataframe.Classification; import org.elasticsearch.client.ml.dataframe.Classification;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalysis; import org.elasticsearch.client.ml.dataframe.DataFrameAnalysis;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsState;
@ -3081,6 +3083,67 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
} }
} }
public void testUpdateDataFrameAnalytics() throws Exception {
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]);
RestHighLevelClient client = highLevelClient();
client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT);
{
// tag::update-data-frame-analytics-config-update
DataFrameAnalyticsConfigUpdate update = DataFrameAnalyticsConfigUpdate.builder()
.setId("my-analytics-config") // <1>
.setDescription("new description") // <2>
.setModelMemoryLimit(new ByteSizeValue(128, ByteSizeUnit.MB)) // <3>
.build();
// end::update-data-frame-analytics-config-update
// tag::update-data-frame-analytics-request
UpdateDataFrameAnalyticsRequest request = new UpdateDataFrameAnalyticsRequest(update); // <1>
// end::update-data-frame-analytics-request
// tag::update-data-frame-analytics-execute
PutDataFrameAnalyticsResponse response = client.machineLearning().updateDataFrameAnalytics(request, RequestOptions.DEFAULT);
// end::update-data-frame-analytics-execute
// tag::update-data-frame-analytics-response
DataFrameAnalyticsConfig updatedConfig = response.getConfig();
// end::update-data-frame-analytics-response
assertThat(updatedConfig.getDescription(), is(equalTo("new description")));
assertThat(updatedConfig.getModelMemoryLimit(), is(equalTo(new ByteSizeValue(128, ByteSizeUnit.MB))));
}
{
DataFrameAnalyticsConfigUpdate update = DataFrameAnalyticsConfigUpdate.builder()
.setId("my-analytics-config")
.build();
UpdateDataFrameAnalyticsRequest request = new UpdateDataFrameAnalyticsRequest(update);
// tag::update-data-frame-analytics-execute-listener
ActionListener<PutDataFrameAnalyticsResponse> listener = new ActionListener<PutDataFrameAnalyticsResponse>() {
@Override
public void onResponse(PutDataFrameAnalyticsResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::update-data-frame-analytics-execute-listener
// Replace the empty listener by a blocking listener in test
CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::update-data-frame-analytics-execute-async
client.machineLearning().updateDataFrameAnalyticsAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::update-data-frame-analytics-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testDeleteDataFrameAnalytics() throws Exception { public void testDeleteDataFrameAnalytics() throws Exception {
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]);

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsConfigUpdateTests;
import org.elasticsearch.client.ml.dataframe.MlDataFrameAnalysisNamedXContentProvider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isEmpty;
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
import static org.hamcrest.Matchers.containsString;
public class UpdateDataFrameAnalyticsRequestTests extends AbstractXContentTestCase<UpdateDataFrameAnalyticsRequest> {
public void testValidate_Ok() {
assertThat(createTestInstance().validate(), isEmpty());
}
public void testValidate_Failure() {
Optional<ValidationException> exception = new UpdateDataFrameAnalyticsRequest(null).validate();
assertThat(exception, isPresent());
assertThat(exception.get().getMessage(), containsString("update requires a non-null data frame analytics config"));
}
@Override
protected UpdateDataFrameAnalyticsRequest createTestInstance() {
return new UpdateDataFrameAnalyticsRequest(DataFrameAnalyticsConfigUpdateTests.randomDataFrameAnalyticsConfigUpdate());
}
@Override
protected UpdateDataFrameAnalyticsRequest doParseInstance(XContentParser parser) throws IOException {
return new UpdateDataFrameAnalyticsRequest(DataFrameAnalyticsConfigUpdate.fromXContent(parser));
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected NamedXContentRegistry xContentRegistry() {
List<NamedXContentRegistry.Entry> namedXContent = new ArrayList<>();
namedXContent.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents());
namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers());
return new NamedXContentRegistry(namedXContent);
}
}

View File

@ -36,8 +36,8 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.function.Predicate; import java.util.function.Predicate;
import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSourceTests.randomSourceConfig;
import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDestTests.randomDestConfig; import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsDestTests.randomDestConfig;
import static org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSourceTests.randomSourceConfig;
import static org.elasticsearch.client.ml.dataframe.OutlierDetectionTests.randomOutlierDetection; import static org.elasticsearch.client.ml.dataframe.OutlierDetectionTests.randomOutlierDetection;
public class DataFrameAnalyticsConfigTests extends AbstractXContentTestCase<DataFrameAnalyticsConfig> { public class DataFrameAnalyticsConfigTests extends AbstractXContentTestCase<DataFrameAnalyticsConfig> {

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.dataframe;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class DataFrameAnalyticsConfigUpdateTests extends AbstractXContentTestCase<DataFrameAnalyticsConfigUpdate> {
public static DataFrameAnalyticsConfigUpdate randomDataFrameAnalyticsConfigUpdate() {
DataFrameAnalyticsConfigUpdate.Builder builder =
DataFrameAnalyticsConfigUpdate.builder()
.setId(randomAlphaOfLengthBetween(1, 10));
if (randomBoolean()) {
builder.setDescription(randomAlphaOfLength(20));
}
if (randomBoolean()) {
builder.setModelMemoryLimit(new ByteSizeValue(randomNonNegativeLong()));
}
if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean());
}
return builder.build();
}
@Override
protected DataFrameAnalyticsConfigUpdate createTestInstance() {
return randomDataFrameAnalyticsConfigUpdate();
}
@Override
protected DataFrameAnalyticsConfigUpdate doParseInstance(XContentParser parser) throws IOException {
return DataFrameAnalyticsConfigUpdate.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected NamedXContentRegistry xContentRegistry() {
List<NamedXContentRegistry.Entry> namedXContent = new ArrayList<>();
namedXContent.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents());
namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers());
return new NamedXContentRegistry(namedXContent);
}
}

View File

@ -0,0 +1,51 @@
--
:api: update-data-frame-analytics
:request: UpdateDataFrameAnalyticsRequest
:response: UpdateDataFrameAnalyticsResponse
--
[role="xpack"]
[id="{upid}-{api}"]
=== Update {dfanalytics-jobs} API
Updates an existing {dfanalytics-job}.
The API accepts an +{request}+ object as a request and returns an +{response}+.
[id="{upid}-{api}-request"]
==== Update {dfanalytics-jobs} request
An +{request}+ requires the following argument:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request]
--------------------------------------------------
<1> The configuration of the {dfanalytics-job} update to perform
[id="{upid}-{api}-config"]
==== {dfanalytics-cap} configuration update
The `DataFrameAnalyticsConfigUpdate` object contains all the details about the {dfanalytics-job}
configuration update and contains the following arguments:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-config-update]
--------------------------------------------------
<1> The {dfanalytics-job} ID
<2> The human-readable description
<3> The memory limit for the model created as part of the analysis process
[id="{upid}-{api}-query-config"]
include::../execution.asciidoc[]
[id="{upid}-{api}-response"]
==== Response
The returned +{response}+ contains the updated {dfanalytics-job}.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-response]
--------------------------------------------------

View File

@ -324,6 +324,7 @@ The Java High Level REST Client supports the following Machine Learning APIs:
* <<{upid}-get-data-frame-analytics>> * <<{upid}-get-data-frame-analytics>>
* <<{upid}-get-data-frame-analytics-stats>> * <<{upid}-get-data-frame-analytics-stats>>
* <<{upid}-put-data-frame-analytics>> * <<{upid}-put-data-frame-analytics>>
* <<{upid}-update-data-frame-analytics>>
* <<{upid}-delete-data-frame-analytics>> * <<{upid}-delete-data-frame-analytics>>
* <<{upid}-start-data-frame-analytics>> * <<{upid}-start-data-frame-analytics>>
* <<{upid}-stop-data-frame-analytics>> * <<{upid}-stop-data-frame-analytics>>
@ -381,6 +382,7 @@ include::ml/estimate-model-memory.asciidoc[]
include::ml/get-data-frame-analytics.asciidoc[] include::ml/get-data-frame-analytics.asciidoc[]
include::ml/get-data-frame-analytics-stats.asciidoc[] include::ml/get-data-frame-analytics-stats.asciidoc[]
include::ml/put-data-frame-analytics.asciidoc[] include::ml/put-data-frame-analytics.asciidoc[]
include::ml/update-data-frame-analytics.asciidoc[]
include::ml/delete-data-frame-analytics.asciidoc[] include::ml/delete-data-frame-analytics.asciidoc[]
include::ml/start-data-frame-analytics.asciidoc[] include::ml/start-data-frame-analytics.asciidoc[]
include::ml/stop-data-frame-analytics.asciidoc[] include::ml/stop-data-frame-analytics.asciidoc[]

View File

@ -6,6 +6,7 @@
You can use the following APIs to perform {ml} {dfanalytics} activities. You can use the following APIs to perform {ml} {dfanalytics} activities.
* <<put-dfanalytics,Create {dfanalytics-jobs}>> * <<put-dfanalytics,Create {dfanalytics-jobs}>>
* <<update-dfanalytics,Update {dfanalytics-jobs}>>
* <<delete-dfanalytics,Delete {dfanalytics-jobs}>> * <<delete-dfanalytics,Delete {dfanalytics-jobs}>>
* <<get-dfanalytics,Get {dfanalytics-jobs} info>> * <<get-dfanalytics,Get {dfanalytics-jobs} info>>
* <<get-dfanalytics-stats,Get {dfanalytics-jobs} statistics>> * <<get-dfanalytics-stats,Get {dfanalytics-jobs} statistics>>
@ -28,6 +29,8 @@ See also <<ml-apis>>.
//CREATE //CREATE
include::put-dfanalytics.asciidoc[] include::put-dfanalytics.asciidoc[]
include::put-inference.asciidoc[] include::put-inference.asciidoc[]
//UPDATE
include::update-dfanalytics.asciidoc[]
//DELETE //DELETE
include::delete-dfanalytics.asciidoc[] include::delete-dfanalytics.asciidoc[]
include::delete-inference-trained-model.asciidoc[] include::delete-inference-trained-model.asciidoc[]

View File

@ -0,0 +1,98 @@
[role="xpack"]
[testenv="platinum"]
[[update-dfanalytics]]
=== Update {dfanalytics-jobs} API
[subs="attributes"]
++++
<titleabbrev>Update {dfanalytics-jobs}</titleabbrev>
++++
Updates an existing {dfanalytics-job}.
experimental[]
[[ml-update-dfanalytics-request]]
==== {api-request-title}
`POST _ml/data_frame/analytics/<data_frame_analytics_id>/_update`
[[ml-update-dfanalytics-prereq]]
==== {api-prereq-title}
If the {es} {security-features} are enabled, you must have the following
built-in roles and privileges:
* `machine_learning_admin`
* `kibana_admin` (UI only)
* source indices: `read`, `view_index_metadata`
* destination index: `read`, `create_index`, `manage` and `index`
* cluster: `monitor` (UI only)
For more information, see <<security-privileges>> and <<built-in-roles>>.
NOTE: The {dfanalytics-job} remembers which roles the user who created it had at
the time of creation. When you start the job, it performs the analysis using
those same roles. If you provide
<<http-clients-secondary-authorization,secondary authorization headers>>,
those credentials are used instead.
[[ml-update-dfanalytics-desc]]
==== {api-description-title}
This API updates an existing {dfanalytics-job} that performs an analysis on the source
indices and stores the outcome in a destination index.
[[ml-update-dfanalytics-path-params]]
==== {api-path-parms-title}
`<data_frame_analytics_id>`::
(Required, string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics-define]
[role="child_attributes"]
[[ml-update-dfanalytics-request-body]]
==== {api-request-body-title}
`allow_lazy_start`::
(Optional, boolean)
Specifies whether this job can start when there is insufficient {ml} node
capacity for it to be immediately assigned to a node. The default is `false`; if
a {ml} node with capacity to run the job cannot immediately be found, the API
returns an error. However, this is also subject to the cluster-wide
`xpack.ml.max_lazy_ml_nodes` setting. See <<advanced-ml-settings>>. If this
option is set to `true`, the API does not return an error and the job waits in
the `starting` state until sufficient {ml} node capacity is available.
`description`::
(Optional, string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa]
`model_memory_limit`::
(Optional, string)
The approximate maximum amount of memory resources that are permitted for
analytical processing. The default value for {dfanalytics-jobs} is `1gb`. If
your `elasticsearch.yml` file contains an `xpack.ml.max_model_memory_limit`
setting, an error occurs when you try to create {dfanalytics-jobs} that have
`model_memory_limit` values greater than that setting. For more information, see
<<ml-settings>>.
[[ml-update-dfanalytics-example]]
==== {api-examples-title}
[[ml-update-dfanalytics-example-preprocess]]
===== Updating model memory limit example
The following example shows how to update the model memory limit for the existing {dfanalytics} configuration.
[source,console]
--------------------------------------------------
POST _ml/data_frame/analytics/model-flight-delays/_update
{
"model_memory_limit": "200mb"
}
--------------------------------------------------
// TEST[skip:setup kibana sample data]

View File

@ -139,6 +139,7 @@ import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
@ -385,6 +386,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
PutDataFrameAnalyticsAction.INSTANCE, PutDataFrameAnalyticsAction.INSTANCE,
GetDataFrameAnalyticsAction.INSTANCE, GetDataFrameAnalyticsAction.INSTANCE,
GetDataFrameAnalyticsStatsAction.INSTANCE, GetDataFrameAnalyticsStatsAction.INSTANCE,
UpdateDataFrameAnalyticsAction.INSTANCE,
DeleteDataFrameAnalyticsAction.INSTANCE, DeleteDataFrameAnalyticsAction.INSTANCE,
StartDataFrameAnalyticsAction.INSTANCE, StartDataFrameAnalyticsAction.INSTANCE,
StopDataFrameAnalyticsAction.INSTANCE, StopDataFrameAnalyticsAction.INSTANCE,

View File

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import java.io.IOException;
import java.util.Objects;
public class UpdateDataFrameAnalyticsAction extends ActionType<PutDataFrameAnalyticsAction.Response> {
public static final UpdateDataFrameAnalyticsAction INSTANCE = new UpdateDataFrameAnalyticsAction();
public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/update";
private UpdateDataFrameAnalyticsAction() {
super(NAME, PutDataFrameAnalyticsAction.Response::new);
}
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
/**
* Parses request.
*/
public static Request parseRequest(String id, XContentParser parser) {
DataFrameAnalyticsConfigUpdate.Builder updateBuilder = DataFrameAnalyticsConfigUpdate.PARSER.apply(parser, null);
if (updateBuilder.getId() == null) {
updateBuilder.setId(id);
} else if (!Strings.isNullOrEmpty(id) && !id.equals(updateBuilder.getId())) {
// If we have both URI and body ID, they must be identical
throw new IllegalArgumentException(
Messages.getMessage(Messages.INCONSISTENT_ID, DataFrameAnalyticsConfig.ID, updateBuilder.getId(), id));
}
return new UpdateDataFrameAnalyticsAction.Request(updateBuilder.build());
}
private DataFrameAnalyticsConfigUpdate update;
public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
update = new DataFrameAnalyticsConfigUpdate(in);
}
public Request(DataFrameAnalyticsConfigUpdate update) {
this.update = update;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
update.writeTo(out);
}
public DataFrameAnalyticsConfigUpdate getUpdate() {
return update;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
update.toXContent(builder, params);
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateDataFrameAnalyticsAction.Request request = (UpdateDataFrameAnalyticsAction.Request) o;
return Objects.equals(update, request.update);
}
@Override
public int hashCode() {
return Objects.hash(update);
}
}
public static class RequestBuilder
extends MasterNodeOperationRequestBuilder<Request, PutDataFrameAnalyticsAction.Response, RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, UpdateDataFrameAnalyticsAction action) {
super(client, action, new Request());
}
}
}

View File

@ -386,12 +386,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
if (indicesOptions != null) { if (indicesOptions != null) {
builder.setIndicesOptions(indicesOptions); builder.setIndicesOptions(indicesOptions);
} }
if (headers.isEmpty() == false) { if (headers.isEmpty() == false) {
// Adjust the request, adding security headers from the current thread context
builder.setHeaders(filterSecurityHeaders(headers)); builder.setHeaders(filterSecurityHeaders(headers));
} }
return builder.build(); return builder.build();
} }

View File

@ -0,0 +1,195 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ObjectParser.ValueType.VALUE;
public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObject {
public static final ConstructingObjectParser<Builder, Void> PARSER =
new ConstructingObjectParser<>("data_frame_analytics_config_update", args -> new Builder((String) args[0]));
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), DataFrameAnalyticsConfig.ID);
PARSER.declareStringOrNull(Builder::setDescription, DataFrameAnalyticsConfig.DESCRIPTION);
PARSER.declareField(
Builder::setModelMemoryLimit,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName()),
DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT,
VALUE);
PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START);
}
private final String id;
private final String description;
private final ByteSizeValue modelMemoryLimit;
private final Boolean allowLazyStart;
private DataFrameAnalyticsConfigUpdate(String id,
@Nullable String description,
@Nullable ByteSizeValue modelMemoryLimit,
@Nullable Boolean allowLazyStart) {
this.id = id;
this.description = description;
this.modelMemoryLimit = modelMemoryLimit;
this.allowLazyStart = allowLazyStart;
}
public DataFrameAnalyticsConfigUpdate(StreamInput in) throws IOException {
this.id = in.readString();
this.description = in.readOptionalString();
this.modelMemoryLimit = in.readOptionalWriteable(ByteSizeValue::new);
this.allowLazyStart = in.readOptionalBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeOptionalString(description);
out.writeOptionalWriteable(modelMemoryLimit);
out.writeOptionalBoolean(allowLazyStart);
}
public String getId() {
return id;
}
public String getDescription() {
return description;
}
public ByteSizeValue getModelMemoryLimit() {
return modelMemoryLimit;
}
public Boolean isAllowLazyStart() {
return allowLazyStart;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
if (description != null) {
builder.field(DataFrameAnalyticsConfig.DESCRIPTION.getPreferredName(), description);
}
if (modelMemoryLimit != null) {
builder.field(DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT.getPreferredName(), modelMemoryLimit.getStringRep());
}
if (allowLazyStart != null) {
builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
}
builder.endObject();
return builder;
}
/**
* Updates {@code source} with the new values in this object returning a new {@link DataFrameAnalyticsConfig}.
*
* @param source Source config to be updated
* @return A new config equivalent to {@code source} updated.
*/
public DataFrameAnalyticsConfig.Builder mergeWithConfig(DataFrameAnalyticsConfig source) {
if (id.equals(source.getId()) == false) {
throw new IllegalArgumentException("Cannot apply update to a config with different id");
}
DataFrameAnalyticsConfig.Builder builder = new DataFrameAnalyticsConfig.Builder(source);
if (description != null) {
builder.setDescription(description);
}
if (modelMemoryLimit != null) {
builder.setModelMemoryLimit(modelMemoryLimit);
}
if (allowLazyStart != null) {
builder.setAllowLazyStart(allowLazyStart);
}
return builder;
}
/**
* Whether this update applied to the given source config requires analytics task restart.
*/
public boolean requiresRestart(DataFrameAnalyticsConfig source) {
return getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof DataFrameAnalyticsConfigUpdate == false) {
return false;
}
DataFrameAnalyticsConfigUpdate that = (DataFrameAnalyticsConfigUpdate) other;
return Objects.equals(this.id, that.id)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit)
&& Objects.equals(this.allowLazyStart, that.allowLazyStart);
}
@Override
public int hashCode() {
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart);
}
public static class Builder {
private String id;
private String description;
private ByteSizeValue modelMemoryLimit;
private Boolean allowLazyStart;
public Builder(String id) {
this.id = id;
}
public String getId() {
return id;
}
public Builder setId(String id) {
this.id = id;
return this;
}
public Builder setDescription(String description) {
this.description = description;
return this;
}
public Builder setModelMemoryLimit(ByteSizeValue modelMemoryLimit) {
this.modelMemoryLimit = modelMemoryLimit;
return this;
}
public Builder setAllowLazyStart(Boolean allowLazyStart) {
this.allowLazyStart = allowLazyStart;
return this;
}
public DataFrameAnalyticsConfigUpdate build() {
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart);
}
}
}

View File

@ -74,6 +74,7 @@ public final class Messages {
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA = "Started loading data"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA = "Started loading data";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING = "Started analyzing"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING = "Started analyzing";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS = "Started writing results"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS = "Started writing results";
public static final String DATA_FRAME_ANALYTICS_CANNOT_UPDATE_IN_CURRENT_STATE = "Cannot update analytics [{0}] unless it''s stopped";
public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}"; public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed"; public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction.Request;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdateTests;
import java.io.IOException;
import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests.randomValidId;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
public class UpdateDataFrameAnalyticsActionRequestTests extends AbstractSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(DataFrameAnalyticsConfigUpdateTests.randomUpdate(randomValidId()));
}
@Override
protected Request doParseInstance(XContentParser parser) {
return Request.parseRequest(null, parser);
}
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
public void testParseRequest() throws IOException {
try (XContentParser parser = createParser(JsonXContent.jsonXContent, "{}")) {
Request request = Request.parseRequest("id-from-param", parser);
assertThat(request.getUpdate(), is(equalTo(new DataFrameAnalyticsConfigUpdate.Builder("id-from-param").build())));
}
try (XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"id\": \"id-from-body\"}")) {
Request request = Request.parseRequest(null, parser);
assertThat(request.getUpdate(), is(equalTo(new DataFrameAnalyticsConfigUpdate.Builder("id-from-body").build())));
}
try (XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"id\": \"same-id\"}")) {
Request request = Request.parseRequest("same-id", parser);
assertThat(request.getUpdate(), is(equalTo(new DataFrameAnalyticsConfigUpdate.Builder("same-id").build())));
}
try (XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"id\": \"id-from-body\"}")) {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> Request.parseRequest("id-from-param", parser));
assertThat(e.getMessage(), startsWith("Inconsistent id"));
}
}
public void testDefaultTimeout() {
assertThat(createTestInstance().timeout(), is(notNullValue()));
}
}

View File

@ -0,0 +1,163 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests.randomValidId;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTestCase<DataFrameAnalyticsConfigUpdate> {
@Override
protected DataFrameAnalyticsConfigUpdate doParseInstance(XContentParser parser) throws IOException {
return DataFrameAnalyticsConfigUpdate.PARSER.apply(parser, null).build();
}
@Override
protected DataFrameAnalyticsConfigUpdate createTestInstance() {
return randomUpdate(randomValidId());
}
@Override
protected Writeable.Reader<DataFrameAnalyticsConfigUpdate> instanceReader() {
return DataFrameAnalyticsConfigUpdate::new;
}
public static DataFrameAnalyticsConfigUpdate randomUpdate(String id) {
DataFrameAnalyticsConfigUpdate.Builder builder = new DataFrameAnalyticsConfigUpdate.Builder(id);
if (randomBoolean()) {
builder.setDescription(randomAlphaOfLength(20));
}
if (randomBoolean()) {
builder.setModelMemoryLimit(new ByteSizeValue(randomNonNegativeLong()));
}
if (randomBoolean()) {
builder.setAllowLazyStart(randomBoolean());
}
return builder.build();
}
public void testMergeWithConfig_UpdatedDescription() {
String id = randomValidId();
DataFrameAnalyticsConfig config =
DataFrameAnalyticsConfigTests.createRandomBuilder(id).setDescription("old description").build();
DataFrameAnalyticsConfigUpdate update =
new DataFrameAnalyticsConfigUpdate.Builder(id).setDescription("new description").build();
assertThat(
update.mergeWithConfig(config).build(),
is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setDescription("new description").build())));
}
public void testMergeWithConfig_UpdatedModelMemoryLimit() {
String id = randomValidId();
DataFrameAnalyticsConfig config =
DataFrameAnalyticsConfigTests.createRandomBuilder(id).setModelMemoryLimit(new ByteSizeValue(1024)).build();
DataFrameAnalyticsConfigUpdate update =
new DataFrameAnalyticsConfigUpdate.Builder(id).setModelMemoryLimit(new ByteSizeValue(2048)).build();
assertThat(
update.mergeWithConfig(config).build(),
is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setModelMemoryLimit(new ByteSizeValue(2048)).build())));
}
public void testMergeWithConfig_UpdatedAllowLazyStart() {
String id = randomValidId();
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandomBuilder(id).setAllowLazyStart(false).build();
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setAllowLazyStart(true).build();
assertThat(
update.mergeWithConfig(config).build(),
is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setAllowLazyStart(true).build())));
}
public void testMergeWithConfig_UpdatedAllUpdatableProperties() {
String id = randomValidId();
DataFrameAnalyticsConfig config =
DataFrameAnalyticsConfigTests.createRandomBuilder(id)
.setDescription("old description")
.setModelMemoryLimit(new ByteSizeValue(1024))
.setAllowLazyStart(false)
.build();
DataFrameAnalyticsConfigUpdate update =
new DataFrameAnalyticsConfigUpdate.Builder(id)
.setDescription("new description")
.setModelMemoryLimit(new ByteSizeValue(2048))
.setAllowLazyStart(true)
.build();
assertThat(
update.mergeWithConfig(config).build(),
is(equalTo(
new DataFrameAnalyticsConfig.Builder(config)
.setDescription("new description")
.setModelMemoryLimit(new ByteSizeValue(2048))
.setAllowLazyStart(true)
.build())));
}
public void testMergeWithConfig_NoopUpdate() {
String id = randomValidId();
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(id);
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).build();
assertThat(update.mergeWithConfig(config).build(), is(equalTo(config)));
}
public void testMergeWithConfig_GivenRandomUpdates_AssertImmutability() {
String id = randomValidId();
for (int i = 0; i < 100; ++i) {
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(id);
DataFrameAnalyticsConfigUpdate update;
do {
update = randomUpdate(id);
} while (isNoop(config, update));
assertThat(update.mergeWithConfig(config).build(), is(not(equalTo(config))));
}
}
public void testMergeWithConfig_failBecauseTargetConfigHasDifferentId() {
String id = randomValidId();
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(id);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> randomUpdate(id + "_2").mergeWithConfig(config));
assertThat(e.getMessage(), containsString("different id"));
}
public void testRequiresRestart_DescriptionUpdateDoesNotRequireRestart() {
String id = randomValidId();
DataFrameAnalyticsConfig config =
DataFrameAnalyticsConfigTests.createRandomBuilder(id).setDescription("old description").build();
DataFrameAnalyticsConfigUpdate update =
new DataFrameAnalyticsConfigUpdate.Builder(id).setDescription("new description").build();
assertThat(update.requiresRestart(config), is(false));
}
public void testRequiresRestart_ModelMemoryLimitUpdateRequiresRestart() {
String id = randomValidId();
DataFrameAnalyticsConfig config =
DataFrameAnalyticsConfigTests.createRandomBuilder(id).setModelMemoryLimit(new ByteSizeValue(1024)).build();
DataFrameAnalyticsConfigUpdate update =
new DataFrameAnalyticsConfigUpdate.Builder(id).setModelMemoryLimit(new ByteSizeValue(2048)).build();
assertThat(update.requiresRestart(config), is(true));
}
private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) {
return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription()))
&& (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit()))
&& (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart()));
}
}

View File

@ -300,13 +300,12 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
public void testMergeWithJob_GivenRandomUpdates_AssertImmutability() { public void testMergeWithJob_GivenRandomUpdates_AssertImmutability() {
for (int i = 0; i < 100; ++i) { for (int i = 0; i < 100; ++i) {
Job job = JobTests.createRandomizedJob(); Job job = JobTests.createRandomizedJob();
JobUpdate update = createRandom(job.getId(), job); JobUpdate update;
while (update.isNoop(job)) { do {
update = createRandom(job.getId(), job); update = createRandom(job.getId(), job);
} } while (update.isNoop(job));
Job updatedJob = update.mergeWithJob(job, new ByteSizeValue(0L)); Job updatedJob = update.mergeWithJob(job, new ByteSizeValue(0L));
assertThat(job, not(equalTo(updatedJob))); assertThat(job, not(equalTo(updatedJob)));
} }
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
@ -656,6 +657,29 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L)); assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
} }
public void testUpdateAnalytics() throws Exception {
initialize("update_analytics_description");
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
putAnalytics(config);
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(nullValue()));
updateAnalytics(new DataFrameAnalyticsConfigUpdate.Builder(jobId).setDescription("updated-description-1").build());
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-1")));
// Noop update
updateAnalytics(new DataFrameAnalyticsConfigUpdate.Builder(jobId).build());
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-1")));
updateAnalytics(new DataFrameAnalyticsConfigUpdate.Builder(jobId).setDescription("updated-description-2").build());
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2")));
}
private static <T> T getOnlyElement(List<T> list) {
assertThat(list, hasSize(1));
return list.get(0);
}
private void initialize(String jobId) { private void initialize(String jobId) {
initialize(jobId, false); initialize(jobId, false);
} }

View File

@ -36,7 +36,9 @@ import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
@ -121,6 +123,11 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
return client().execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet(); return client().execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet();
} }
protected PutDataFrameAnalyticsAction.Response updateAnalytics(DataFrameAnalyticsConfigUpdate update) {
UpdateDataFrameAnalyticsAction.Request request = new UpdateDataFrameAnalyticsAction.Request(update);
return client().execute(UpdateDataFrameAnalyticsAction.INSTANCE, request).actionGet();
}
protected AcknowledgedResponse deleteAnalytics(String id) { protected AcknowledgedResponse deleteAnalytics(String id) {
DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id); DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id);
return client().execute(DeleteDataFrameAnalyticsAction.INSTANCE, request).actionGet(); return client().execute(DeleteDataFrameAnalyticsAction.INSTANCE, request).actionGet();

View File

@ -0,0 +1,373 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
private DataFrameAnalyticsConfigProvider configProvider;
@Before
public void createComponents() throws Exception {
configProvider = new DataFrameAnalyticsConfigProvider(client(), xContentRegistry());
waitForMlTemplates();
}
public void testGet_ConfigDoesNotExist() throws InterruptedException {
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(actionListener -> configProvider.get("missing", actionListener), configHolder, exceptionHolder);
assertThat(configHolder.get(), is(nullValue()));
assertThat(exceptionHolder.get(), is(notNullValue()));
assertThat(exceptionHolder.get(), is(instanceOf(ResourceNotFoundException.class)));
}
public void testPutAndGet() throws InterruptedException {
String configId = "config-id";
// Create valid config
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(configId);
{ // Put the config and verify the response
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
actionListener -> configProvider.put(config, emptyMap(), actionListener), configHolder, exceptionHolder);
assertThat(configHolder.get(), is(notNullValue()));
assertThat(configHolder.get(), is(equalTo(config)));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Get the config back and verify the response
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(actionListener -> configProvider.get(configId, actionListener), configHolder, exceptionHolder);
assertThat(configHolder.get(), is(notNullValue()));
assertThat(configHolder.get(), is(equalTo(config)));
assertThat(exceptionHolder.get(), is(nullValue()));
}
}
public void testPutAndGet_WithSecurityHeaders() throws InterruptedException {
String configId = "config-id";
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandom(configId);
Map<String, String> securityHeaders = Collections.singletonMap("_xpack_security_authentication", "dummy");
{ // Put the config and verify the response
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(actionListener -> configProvider.put(config, securityHeaders, actionListener), configHolder, exceptionHolder);
assertThat(configHolder.get(), is(notNullValue()));
assertThat(
configHolder.get(),
is(equalTo(
new DataFrameAnalyticsConfig.Builder(config)
.setHeaders(securityHeaders)
.build())));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Get the config back and verify the response
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(actionListener -> configProvider.get(configId, actionListener), configHolder, exceptionHolder);
assertThat(configHolder.get(), is(notNullValue()));
assertThat(
configHolder.get(),
is(equalTo(
new DataFrameAnalyticsConfig.Builder(config)
.setHeaders(securityHeaders)
.build())));
assertThat(exceptionHolder.get(), is(nullValue()));
}
}
public void testPut_ConfigAlreadyExists() throws InterruptedException {
String configId = "config-id";
{ // Put the config and verify the response
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfig initialConfig = DataFrameAnalyticsConfigTests.createRandom(configId);
blockingCall(
actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder);
assertThat(configHolder.get(), is(notNullValue()));
assertThat(configHolder.get(), is(equalTo(initialConfig)));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Try putting the config with the same id and verify the response
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfig configWithSameId = DataFrameAnalyticsConfigTests.createRandom(configId);
blockingCall(
actionListener -> configProvider.put(configWithSameId, emptyMap(), actionListener),
configHolder,
exceptionHolder);
assertThat(configHolder.get(), is(nullValue()));
assertThat(exceptionHolder.get(), is(notNullValue()));
assertThat(exceptionHolder.get(), is(instanceOf(ResourceAlreadyExistsException.class)));
}
}
public void testUpdate() throws Exception {
String configId = "config-id";
DataFrameAnalyticsConfig initialConfig = DataFrameAnalyticsConfigTests.createRandom(configId);
{
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder);
assertThat(configHolder.get(), is(notNullValue()));
assertThat(configHolder.get(), is(equalTo(initialConfig)));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Update that changes description
AtomicReference<DataFrameAnalyticsConfig> updatedConfigHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfigUpdate configUpdate =
new DataFrameAnalyticsConfigUpdate.Builder(configId)
.setDescription("description-1")
.build();
blockingCall(
actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener),
updatedConfigHolder,
exceptionHolder);
assertThat(updatedConfigHolder.get(), is(notNullValue()));
assertThat(
updatedConfigHolder.get(),
is(equalTo(
new DataFrameAnalyticsConfig.Builder(initialConfig)
.setDescription("description-1")
.build())));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Update that changes model memory limit
AtomicReference<DataFrameAnalyticsConfig> updatedConfigHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfigUpdate configUpdate =
new DataFrameAnalyticsConfigUpdate.Builder(configId)
.setModelMemoryLimit(new ByteSizeValue(1024))
.build();
blockingCall(
actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener),
updatedConfigHolder,
exceptionHolder);
assertThat(updatedConfigHolder.get(), is(notNullValue()));
assertThat(
updatedConfigHolder.get(),
is(equalTo(
new DataFrameAnalyticsConfig.Builder(initialConfig)
.setDescription("description-1")
.setModelMemoryLimit(new ByteSizeValue(1024))
.build())));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Noop update
AtomicReference<DataFrameAnalyticsConfig> updatedConfigHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfigUpdate configUpdate = new DataFrameAnalyticsConfigUpdate.Builder(configId).build();
blockingCall(
actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener),
updatedConfigHolder,
exceptionHolder);
assertThat(updatedConfigHolder.get(), is(notNullValue()));
assertThat(
updatedConfigHolder.get(),
is(equalTo(
new DataFrameAnalyticsConfig.Builder(initialConfig)
.setDescription("description-1")
.setModelMemoryLimit(new ByteSizeValue(1024))
.build())));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Update that changes both description and model memory limit
AtomicReference<DataFrameAnalyticsConfig> updatedConfigHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfigUpdate configUpdate =
new DataFrameAnalyticsConfigUpdate.Builder(configId)
.setDescription("description-2")
.setModelMemoryLimit(new ByteSizeValue(2048))
.build();
blockingCall(
actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener),
updatedConfigHolder,
exceptionHolder);
assertThat(updatedConfigHolder.get(), is(notNullValue()));
assertThat(
updatedConfigHolder.get(),
is(equalTo(
new DataFrameAnalyticsConfig.Builder(initialConfig)
.setDescription("description-2")
.setModelMemoryLimit(new ByteSizeValue(2048))
.build())));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Update that applies security headers
Map<String, String> securityHeaders = Collections.singletonMap("_xpack_security_authentication", "dummy");
AtomicReference<DataFrameAnalyticsConfig> updatedConfigHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfigUpdate configUpdate = new DataFrameAnalyticsConfigUpdate.Builder(configId).build();
blockingCall(
actionListener -> configProvider.update(configUpdate, securityHeaders, ClusterState.EMPTY_STATE, actionListener),
updatedConfigHolder,
exceptionHolder);
assertThat(updatedConfigHolder.get(), is(notNullValue()));
assertThat(
updatedConfigHolder.get(),
is(equalTo(
new DataFrameAnalyticsConfig.Builder(initialConfig)
.setDescription("description-2")
.setModelMemoryLimit(new ByteSizeValue(2048))
.setHeaders(securityHeaders)
.build())));
assertThat(exceptionHolder.get(), is(nullValue()));
}
}
public void testUpdate_ConfigDoesNotExist() throws InterruptedException {
AtomicReference<DataFrameAnalyticsConfig> updatedConfigHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfigUpdate configUpdate = new DataFrameAnalyticsConfigUpdate.Builder("missing").build();
blockingCall(
actionListener -> configProvider.update(configUpdate, emptyMap(), ClusterState.EMPTY_STATE, actionListener),
updatedConfigHolder,
exceptionHolder);
assertThat(updatedConfigHolder.get(), is(nullValue()));
assertThat(exceptionHolder.get(), is(notNullValue()));
assertThat(exceptionHolder.get(), is(instanceOf(ResourceNotFoundException.class)));
}
public void testUpdate_UpdateCannotBeAppliedWhenTaskIsRunning() throws InterruptedException {
String configId = "config-id";
DataFrameAnalyticsConfig initialConfig = DataFrameAnalyticsConfigTests.createRandom(configId);
{
AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(
actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder);
assertThat(configHolder.get(), is(notNullValue()));
assertThat(configHolder.get(), is(equalTo(initialConfig)));
assertThat(exceptionHolder.get(), is(nullValue()));
}
{ // Update that tries to change model memory limit while the analytics is running
AtomicReference<DataFrameAnalyticsConfig> updatedConfigHolder = new AtomicReference<>();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
DataFrameAnalyticsConfigUpdate configUpdate =
new DataFrameAnalyticsConfigUpdate.Builder(configId)
.setModelMemoryLimit(new ByteSizeValue(2048, ByteSizeUnit.MB))
.build();
ClusterState clusterState = clusterStateWithRunningAnalyticsTask(configId, DataFrameAnalyticsState.ANALYZING);
blockingCall(
actionListener -> configProvider.update(configUpdate, emptyMap(), clusterState, actionListener),
updatedConfigHolder,
exceptionHolder);
assertThat(updatedConfigHolder.get(), is(nullValue()));
assertThat(exceptionHolder.get(), is(notNullValue()));
assertThat(exceptionHolder.get(), is(instanceOf(ElasticsearchStatusException.class)));
ElasticsearchStatusException e = (ElasticsearchStatusException) exceptionHolder.get();
assertThat(e.status(), is(equalTo(RestStatus.CONFLICT)));
assertThat(e.getMessage(), is(equalTo("Cannot update analytics [config-id] unless it's stopped")));
}
}
private static ClusterState clusterStateWithRunningAnalyticsTask(String analyticsId, DataFrameAnalyticsState analyticsState) {
PersistentTasksCustomMetadata.Builder builder = PersistentTasksCustomMetadata.builder();
builder.addTask(
MlTasks.dataFrameAnalyticsTaskId(analyticsId),
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT, emptyList(), false),
new PersistentTasksCustomMetadata.Assignment("node", "test assignment"));
builder.updateTaskState(
MlTasks.dataFrameAnalyticsTaskId(analyticsId),
new DataFrameAnalyticsTaskState(analyticsState, builder.getLastAllocationId(), null));
PersistentTasksCustomMetadata tasks = builder.build();
return ClusterState.builder(new ClusterName("cluster"))
.metadata(Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasks).build())
.build();
}
@Override
public NamedXContentRegistry xContentRegistry() {
List<NamedXContentRegistry.Entry> namedXContent = new ArrayList<>();
namedXContent.addAll(new MlDataFrameAnalysisNamedXContentProvider().getNamedXContentParsers());
namedXContent.addAll(new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents());
return new NamedXContentRegistry(namedXContent);
}
}

View File

@ -125,6 +125,7 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction; import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
@ -197,6 +198,7 @@ import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportStopDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.action.TransportStopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportUpdateCalendarJobAction; import org.elasticsearch.xpack.ml.action.TransportUpdateCalendarJobAction;
import org.elasticsearch.xpack.ml.action.TransportUpdateDataFrameAnalyticsAction;
import org.elasticsearch.xpack.ml.action.TransportUpdateDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportUpdateDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportUpdateFilterAction; import org.elasticsearch.xpack.ml.action.TransportUpdateFilterAction;
import org.elasticsearch.xpack.ml.action.TransportUpdateJobAction; import org.elasticsearch.xpack.ml.action.TransportUpdateJobAction;
@ -276,6 +278,7 @@ import org.elasticsearch.xpack.ml.rest.dataframe.RestEvaluateDataFrameAction;
import org.elasticsearch.xpack.ml.rest.dataframe.RestExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestExplainDataFrameAnalyticsAction;
import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestGetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.ml.rest.dataframe.RestPostDataFrameAnalyticsUpdateAction;
import org.elasticsearch.xpack.ml.rest.dataframe.RestPutDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestPutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.ml.rest.dataframe.RestStartDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestStartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.ml.rest.dataframe.RestStopDataFrameAnalyticsAction; import org.elasticsearch.xpack.ml.rest.dataframe.RestStopDataFrameAnalyticsAction;
@ -828,6 +831,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
new RestGetDataFrameAnalyticsAction(), new RestGetDataFrameAnalyticsAction(),
new RestGetDataFrameAnalyticsStatsAction(), new RestGetDataFrameAnalyticsStatsAction(),
new RestPutDataFrameAnalyticsAction(), new RestPutDataFrameAnalyticsAction(),
new RestPostDataFrameAnalyticsUpdateAction(),
new RestDeleteDataFrameAnalyticsAction(), new RestDeleteDataFrameAnalyticsAction(),
new RestStartDataFrameAnalyticsAction(), new RestStartDataFrameAnalyticsAction(),
new RestStopDataFrameAnalyticsAction(), new RestStopDataFrameAnalyticsAction(),
@ -905,6 +909,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
new ActionHandler<>(GetDataFrameAnalyticsAction.INSTANCE, TransportGetDataFrameAnalyticsAction.class), new ActionHandler<>(GetDataFrameAnalyticsAction.INSTANCE, TransportGetDataFrameAnalyticsAction.class),
new ActionHandler<>(GetDataFrameAnalyticsStatsAction.INSTANCE, TransportGetDataFrameAnalyticsStatsAction.class), new ActionHandler<>(GetDataFrameAnalyticsStatsAction.INSTANCE, TransportGetDataFrameAnalyticsStatsAction.class),
new ActionHandler<>(PutDataFrameAnalyticsAction.INSTANCE, TransportPutDataFrameAnalyticsAction.class), new ActionHandler<>(PutDataFrameAnalyticsAction.INSTANCE, TransportPutDataFrameAnalyticsAction.class),
new ActionHandler<>(UpdateDataFrameAnalyticsAction.INSTANCE, TransportUpdateDataFrameAnalyticsAction.class),
new ActionHandler<>(DeleteDataFrameAnalyticsAction.INSTANCE, TransportDeleteDataFrameAnalyticsAction.class), new ActionHandler<>(DeleteDataFrameAnalyticsAction.INSTANCE, TransportDeleteDataFrameAnalyticsAction.class),
new ActionHandler<>(StartDataFrameAnalyticsAction.INSTANCE, TransportStartDataFrameAnalyticsAction.class), new ActionHandler<>(StartDataFrameAnalyticsAction.INSTANCE, TransportStartDataFrameAnalyticsAction.class),
new ActionHandler<>(StopDataFrameAnalyticsAction.INSTANCE, TransportStopDataFrameAnalyticsAction.class), new ActionHandler<>(StopDataFrameAnalyticsAction.INSTANCE, TransportStopDataFrameAnalyticsAction.class),

View File

@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -169,7 +168,7 @@ public class TransportPutDataFrameAnalyticsAction
preparedForPutConfig, preparedForPutConfig,
threadPool.getThreadContext().getHeaders(), threadPool.getThreadContext().getHeaders(),
ActionListener.wrap( ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(preparedForPutConfig)), unused -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(preparedForPutConfig)),
listener::onFailure listener::onFailure
)); ));
} }
@ -183,7 +182,7 @@ public class TransportPutDataFrameAnalyticsAction
memoryCappedConfig, memoryCappedConfig,
threadPool.getThreadContext().getHeaders(), threadPool.getThreadContext().getHeaders(),
ActionListener.wrap( ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), unused -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure listener::onFailure
)); ));
} else { } else {
@ -203,7 +202,7 @@ public class TransportPutDataFrameAnalyticsAction
private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config, private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config,
Map<String, String> headers, Map<String, String> headers,
ActionListener<IndexResponse> listener) { ActionListener<DataFrameAnalyticsConfig> listener) {
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
if (clusterState == null) { if (clusterState == null) {
logger.warn("Cannot update doc mapping because clusterState == null"); logger.warn("Cannot update doc mapping because clusterState == null");
@ -221,7 +220,7 @@ public class TransportPutDataFrameAnalyticsAction
auditor.info( auditor.info(
config.getId(), config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATED, config.getAnalysis().getWriteableName())); Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATED, config.getAnalysis().getWriteableName()));
listener.onResponse(indexResponse); listener.onResponse(config);
}, },
listener::onFailure)), listener::onFailure)),
listener::onFailure)); listener::onFailure));

View File

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
public class TransportUpdateDataFrameAnalyticsAction
extends TransportMasterNodeAction<UpdateDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {
private final XPackLicenseState licenseState;
private final DataFrameAnalyticsConfigProvider configProvider;
private final SecurityContext securityContext;
@Inject
public TransportUpdateDataFrameAnalyticsAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
XPackLicenseState licenseState, ThreadPool threadPool,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameAnalyticsConfigProvider configProvider) {
super(UpdateDataFrameAnalyticsAction.NAME, transportService, clusterService, threadPool, actionFilters,
UpdateDataFrameAnalyticsAction.Request::new, indexNameExpressionResolver);
this.licenseState = licenseState;
this.configProvider = configProvider;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
? new SecurityContext(settings, threadPool.getThreadContext())
: null;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected PutDataFrameAnalyticsAction.Response read(StreamInput in) throws IOException {
return new PutDataFrameAnalyticsAction.Response(in);
}
@Override
protected ClusterBlockException checkBlock(UpdateDataFrameAnalyticsAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected void masterOperation(UpdateDataFrameAnalyticsAction.Request request, ClusterState state,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
useSecondaryAuthIfAvailable(securityContext, () -> {
Map<String, String> headers = threadPool.getThreadContext().getHeaders();
configProvider.update(
request.getUpdate(),
headers,
state,
ActionListener.wrap(
updatedConfig -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(updatedConfig)),
listener::onFailure));
});
}
@Override
protected void doExecute(Task task, UpdateDataFrameAnalyticsAction.Request request,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
if (licenseState.isAllowed(XPackLicenseState.Feature.MACHINE_LEARNING)) {
super.doExecute(task, request, listener);
} else {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
}
}
}

View File

@ -10,14 +10,18 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -29,11 +33,16 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.MlConfigIndex; import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
@ -68,28 +77,115 @@ public class DataFrameAnalyticsConfigProvider {
this.xContentRegistry = xContentRegistry; this.xContentRegistry = xContentRegistry;
} }
public void put(DataFrameAnalyticsConfig config, Map<String, String> headers, ActionListener<IndexResponse> listener) { /**
String id = config.getId(); * Puts the given {@link DataFrameAnalyticsConfig} document into the config index.
*/
public void put(DataFrameAnalyticsConfig config, Map<String, String> headers, ActionListener<DataFrameAnalyticsConfig> listener) {
if (headers.isEmpty() == false) { if (headers.isEmpty() == false) {
// Filter any values in headers that aren't security fields // Filter any values in headers that aren't security fields
config = new DataFrameAnalyticsConfig.Builder(config) config = new DataFrameAnalyticsConfig.Builder(config)
.setHeaders(filterSecurityHeaders(headers)) .setHeaders(filterSecurityHeaders(headers))
.build(); .build();
} }
index(config, null, listener);
}
/**
* Updates the {@link DataFrameAnalyticsConfig} document in the config index using given {@link DataFrameAnalyticsConfigUpdate}.
*/
public void update(DataFrameAnalyticsConfigUpdate update,
Map<String, String> headers,
ClusterState clusterState,
ActionListener<DataFrameAnalyticsConfig> listener) {
String id = update.getId();
GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), DataFrameAnalyticsConfig.documentId(id));
executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(
getResponse -> {
// Fail the update request if the config to be updated doesn't exist
if (getResponse.isExists() == false) {
listener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(id));
return;
}
// Parse the original config
DataFrameAnalyticsConfig originalConfig;
try {
try (InputStream stream = getResponse.getSourceAsBytesRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
originalConfig = DataFrameAnalyticsConfig.LENIENT_PARSER.apply(parser, null).build();
}
} catch (IOException e) {
listener.onFailure(
new ElasticsearchParseException("Failed to parse data frame analytics configuration [" + id + "]", e));
return;
}
// Check that the update can be applied given current analytics state
checkUpdateCanBeApplied(originalConfig, update, clusterState);
// Merge the original config with the given update object
DataFrameAnalyticsConfig.Builder updatedConfigBuilder = update.mergeWithConfig(originalConfig);
if (headers.isEmpty() == false) {
updatedConfigBuilder.setHeaders(filterSecurityHeaders(headers));
}
DataFrameAnalyticsConfig updatedConfig = updatedConfigBuilder.build();
// Index the update config
index(updatedConfig, getResponse, listener);
},
listener::onFailure
));
}
private static void checkUpdateCanBeApplied(DataFrameAnalyticsConfig originalConfig,
DataFrameAnalyticsConfigUpdate update,
ClusterState clusterState) {
String analyticsId = update.getId();
PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
DataFrameAnalyticsState analyticsState = MlTasks.getDataFrameAnalyticsState(analyticsId, tasks);
if (DataFrameAnalyticsState.STOPPED.equals(analyticsState)) {
// Analytics is stopped, therefore it is safe to proceed with the udpate
return;
}
if (update.requiresRestart(originalConfig)) {
throw ExceptionsHelper.conflictStatusException(
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_CANNOT_UPDATE_IN_CURRENT_STATE, analyticsId, analyticsState));
}
}
/**
* Indexes the new version of {@link DataFrameAnalyticsConfig} document into the config index.
*
* @param config config object to be indexed
* @param getResponse {@link GetResponse} coming from requesting the previous version of the config.
* If null, this config is indexed for the first time
* @param listener listener to be called after indexing
*/
private void index(DataFrameAnalyticsConfig config,
@Nullable GetResponse getResponse,
ActionListener<DataFrameAnalyticsConfig> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
config.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); config.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = new IndexRequest(MlConfigIndex.indexName()) IndexRequest indexRequest = new IndexRequest(MlConfigIndex.indexName())
.id(DataFrameAnalyticsConfig.documentId(config.getId())) .id(DataFrameAnalyticsConfig.documentId(config.getId()))
.opType(DocWriteRequest.OpType.CREATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(builder);
.source(builder); if (getResponse == null) {
indexRequest.opType(DocWriteRequest.OpType.CREATE);
} else {
indexRequest
.opType(DocWriteRequest.OpType.INDEX)
.setIfSeqNo(getResponse.getSeqNo())
.setIfPrimaryTerm(getResponse.getPrimaryTerm());
}
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap( executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
listener::onResponse, indexResponse -> listener.onResponse(config),
e -> { e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) { if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
listener.onFailure(ExceptionsHelper.dataFrameAnalyticsAlreadyExists(id)); listener.onFailure(ExceptionsHelper.dataFrameAnalyticsAlreadyExists(config.getId()));
} else { } else {
listener.onFailure(e); listener.onFailure(e);
} }

View File

@ -244,7 +244,6 @@ public class JobConfigProvider {
return; return;
} }
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo(); final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm(); final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef(); BytesReference source = getResponse.getSourceAsBytesRef();

View File

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.dataframe;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.singletonList;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestPostDataFrameAnalyticsUpdateAction extends BaseRestHandler {
@Override
public List<Route> routes() {
return singletonList(
new Route(
POST, MachineLearning.BASE_PATH + "data_frame/analytics/{" + DataFrameAnalyticsConfig.ID.getPreferredName() + "}/_update"));
}
@Override
public String getName() {
return "xpack_ml_post_data_frame_analytics_update_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String id = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
XContentParser parser = restRequest.contentParser();
UpdateDataFrameAnalyticsAction.Request updateRequest = UpdateDataFrameAnalyticsAction.Request.parseRequest(id, parser);
updateRequest.timeout(restRequest.paramAsTime("timeout", updateRequest.timeout()));
return channel -> client.execute(UpdateDataFrameAnalyticsAction.INSTANCE, updateRequest, new RestToXContentListener<>(channel));
}
}