diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index 1db26087ae8..e3570a2a837 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -148,7 +148,12 @@ final class MLRequestConverters { Request request = new Request(HttpDelete.METHOD_NAME, endpoint); RequestConverters.Params params = new RequestConverters.Params(request); - params.putParam("force", Boolean.toString(deleteJobRequest.isForce())); + if (deleteJobRequest.getForce() != null) { + params.putParam("force", Boolean.toString(deleteJobRequest.getForce())); + } + if (deleteJobRequest.getWaitForCompletion() != null) { + params.putParam("wait_for_completion", Boolean.toString(deleteJobRequest.getWaitForCompletion())); + } return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java index 29250e5d440..8c442d8ffa6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java @@ -26,6 +26,7 @@ import org.elasticsearch.client.ml.DeleteCalendarRequest; import org.elasticsearch.client.ml.DeleteDatafeedRequest; import org.elasticsearch.client.ml.DeleteForecastRequest; import org.elasticsearch.client.ml.DeleteJobRequest; +import org.elasticsearch.client.ml.DeleteJobResponse; import org.elasticsearch.client.ml.FlushJobRequest; import org.elasticsearch.client.ml.FlushJobResponse; import org.elasticsearch.client.ml.ForecastJobRequest; @@ -211,14 +212,15 @@ public final class MachineLearningClient { * * @param request The request to delete the job * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized - * @return action acknowledgement + * @return The action response which contains the acknowledgement or the task id depending on whether the action was set to wait for + * completion * @throws IOException when there is a serialization issue sending the request or receiving the response */ - public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException { + public DeleteJobResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException { return restHighLevelClient.performRequestAndParseEntity(request, MLRequestConverters::deleteJob, options, - AcknowledgedResponse::fromXContent, + DeleteJobResponse::fromXContent, Collections.emptySet()); } @@ -232,11 +234,11 @@ public final class MachineLearningClient { * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @param listener Listener to be notified upon request completion */ - public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener listener) { + public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener listener) { restHighLevelClient.performRequestAsyncAndParseEntity(request, MLRequestConverters::deleteJob, options, - AcknowledgedResponse::fromXContent, + DeleteJobResponse::fromXContent, listener, Collections.emptySet()); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java index a355f7ec659..44e3668059c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java @@ -29,7 +29,8 @@ import java.util.Objects; public class DeleteJobRequest extends ActionRequest { private String jobId; - private boolean force; + private Boolean force; + private Boolean waitForCompletion; public DeleteJobRequest(String jobId) { this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null"); @@ -47,7 +48,7 @@ public class DeleteJobRequest extends ActionRequest { this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null"); } - public boolean isForce() { + public Boolean getForce() { return force; } @@ -57,10 +58,24 @@ public class DeleteJobRequest extends ActionRequest { * * @param force When {@code true} forcefully delete an opened job. Defaults to {@code false} */ - public void setForce(boolean force) { + public void setForce(Boolean force) { this.force = force; } + public Boolean getWaitForCompletion() { + return waitForCompletion; + } + + /** + * Set whether this request should wait until the operation has completed before returning + * @param waitForCompletion When {@code true} the call will wait for the job deletion to complete. + * Otherwise, the deletion will be executed asynchronously and the response + * will contain the task id. + */ + public void setWaitForCompletion(Boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + } + @Override public ActionRequestValidationException validate() { return null; diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobResponse.java new file mode 100644 index 00000000000..f1487c8c276 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobResponse.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Objects; + +/** + * Response object that contains the acknowledgement or the task id + * depending on whether the delete job action was requested to wait for completion. + */ +public class DeleteJobResponse extends ActionResponse implements ToXContentObject { + + private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged"); + private static final ParseField TASK = new ParseField("task"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("delete_job_response", + true, a-> new DeleteJobResponse((Boolean) a[0], (TaskId) a[1])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACKNOWLEDGED); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); + } + + public static DeleteJobResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final Boolean acknowledged; + private final TaskId task; + + DeleteJobResponse(@Nullable Boolean acknowledged, @Nullable TaskId task) { + assert acknowledged != null || task != null; + this.acknowledged = acknowledged; + this.task = task; + } + + /** + * Get the action acknowledgement + * @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code false} or + * otherwise a {@code boolean} that indicates whether the job was deleted successfully. + */ + public Boolean getAcknowledged() { + return acknowledged; + } + + /** + * Get the task id + * @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code true} or + * otherwise the id of the job deletion task. + */ + public TaskId getTask() { + return task; + } + + @Override + public int hashCode() { + return Objects.hash(acknowledged, task); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DeleteJobResponse that = (DeleteJobResponse) other; + return Objects.equals(acknowledged, that.acknowledged) && Objects.equals(task, that.task); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (acknowledged != null) { + builder.field(ACKNOWLEDGED.getPreferredName(), acknowledged); + } + if (task != null) { + builder.field(TASK.getPreferredName(), task.toString()); + } + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java index f30a003e02a..13b4dcb955a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java @@ -65,6 +65,7 @@ public class Job implements ToXContentObject { public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days"); public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id"); public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name"); + public static final ParseField DELETING = new ParseField("deleting"); public static final ObjectParser PARSER = new ObjectParser<>("job_details", true, Builder::new); @@ -94,6 +95,7 @@ public class Job implements ToXContentObject { PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT); PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID); PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME); + PARSER.declareBoolean(Builder::setDeleting, DELETING); } private final String jobId; @@ -115,13 +117,14 @@ public class Job implements ToXContentObject { private final Map customSettings; private final String modelSnapshotId; private final String resultsIndexName; + private final Boolean deleting; private Job(String jobId, String jobType, List groups, String description, Date createTime, Date finishedTime, Long establishedModelMemory, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, - String modelSnapshotId, String resultsIndexName) { + String modelSnapshotId, String resultsIndexName, Boolean deleting) { this.jobId = jobId; this.jobType = jobType; @@ -141,6 +144,7 @@ public class Job implements ToXContentObject { this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings); this.modelSnapshotId = modelSnapshotId; this.resultsIndexName = resultsIndexName; + this.deleting = deleting; } /** @@ -275,6 +279,10 @@ public class Job implements ToXContentObject { return modelSnapshotId; } + public Boolean getDeleting() { + return deleting; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -330,6 +338,9 @@ public class Job implements ToXContentObject { if (resultsIndexName != null) { builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); } + if (deleting != null) { + builder.field(DELETING.getPreferredName(), deleting); + } builder.endObject(); return builder; } @@ -362,7 +373,8 @@ public class Job implements ToXContentObject { && Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays) && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) - && Objects.equals(this.resultsIndexName, that.resultsIndexName); + && Objects.equals(this.resultsIndexName, that.resultsIndexName) + && Objects.equals(this.deleting, that.deleting); } @Override @@ -370,7 +382,7 @@ public class Job implements ToXContentObject { return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, resultsIndexName); + modelSnapshotId, resultsIndexName, deleting); } @Override @@ -402,6 +414,7 @@ public class Job implements ToXContentObject { private Map customSettings; private String modelSnapshotId; private String resultsIndexName; + private Boolean deleting; private Builder() { } @@ -429,6 +442,7 @@ public class Job implements ToXContentObject { this.customSettings = job.getCustomSettings(); this.modelSnapshotId = job.getModelSnapshotId(); this.resultsIndexName = job.getResultsIndexNameNoPrefix(); + this.deleting = job.getDeleting(); } public Builder setId(String id) { @@ -525,6 +539,11 @@ public class Job implements ToXContentObject { return this; } + Builder setDeleting(Boolean deleting) { + this.deleting = deleting; + return this; + } + /** * Builds a job. * @@ -537,7 +556,7 @@ public class Job implements ToXContentObject { id, jobType, groups, description, createTime, finishedTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, resultsIndexName); + modelSnapshotId, resultsIndexName, deleting); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index b07f78cab1b..8c5f49c943f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -164,11 +164,18 @@ public class MLRequestConvertersTests extends ESTestCase { Request request = MLRequestConverters.deleteJob(deleteJobRequest); assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); assertEquals("/_xpack/ml/anomaly_detectors/" + jobId, request.getEndpoint()); - assertEquals(Boolean.toString(false), request.getParameters().get("force")); + assertNull(request.getParameters().get("force")); + assertNull(request.getParameters().get("wait_for_completion")); + deleteJobRequest = new DeleteJobRequest(jobId); deleteJobRequest.setForce(true); request = MLRequestConverters.deleteJob(deleteJobRequest); assertEquals(Boolean.toString(true), request.getParameters().get("force")); + + deleteJobRequest = new DeleteJobRequest(jobId); + deleteJobRequest.setWaitForCompletion(false); + request = MLRequestConverters.deleteJob(deleteJobRequest); + assertEquals(Boolean.toString(false), request.getParameters().get("wait_for_completion")); } public void testFlushJob() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 5d3fc82a4bb..cac9f533501 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.client.ml.DeleteCalendarRequest; import org.elasticsearch.client.ml.DeleteDatafeedRequest; import org.elasticsearch.client.ml.DeleteForecastRequest; import org.elasticsearch.client.ml.DeleteJobRequest; +import org.elasticsearch.client.ml.DeleteJobResponse; import org.elasticsearch.client.ml.FlushJobRequest; import org.elasticsearch.client.ml.FlushJobResponse; import org.elasticsearch.client.ml.ForecastJobRequest; @@ -151,17 +152,33 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase { assertThat(response.jobs().stream().map(Job::getId).collect(Collectors.toList()), hasItems(jobId1, jobId2)); } - public void testDeleteJob() throws Exception { + public void testDeleteJob_GivenWaitForCompletionIsTrue() throws Exception { String jobId = randomValidJobId(); Job job = buildJob(jobId); MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT); - AcknowledgedResponse response = execute(new DeleteJobRequest(jobId), + DeleteJobResponse response = execute(new DeleteJobRequest(jobId), machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync); - assertTrue(response.isAcknowledged()); + assertTrue(response.getAcknowledged()); + assertNull(response.getTask()); + } + + public void testDeleteJob_GivenWaitForCompletionIsFalse() throws Exception { + String jobId = randomValidJobId(); + Job job = buildJob(jobId); + MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); + machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT); + + DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId); + deleteJobRequest.setWaitForCompletion(false); + + DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync); + + assertNull(response.getAcknowledged()); + assertNotNull(response.getTask()); } public void testOpenJob() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index eb1d65a3805..0c0efb241f9 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.client.ml.DeleteCalendarRequest; import org.elasticsearch.client.ml.DeleteDatafeedRequest; import org.elasticsearch.client.ml.DeleteForecastRequest; import org.elasticsearch.client.ml.DeleteJobRequest; +import org.elasticsearch.client.ml.DeleteJobResponse; import org.elasticsearch.client.ml.FlushJobRequest; import org.elasticsearch.client.ml.FlushJobResponse; import org.elasticsearch.client.ml.ForecastJobRequest; @@ -108,6 +109,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.TaskId; import org.junit.After; import java.io.IOException; @@ -281,20 +283,34 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { { //tag::x-pack-delete-ml-job-request - DeleteJobRequest deleteJobRequest = new DeleteJobRequest("my-first-machine-learning-job"); - deleteJobRequest.setForce(false); // <1> - AcknowledgedResponse deleteJobResponse = client.machineLearning().deleteJob(deleteJobRequest, RequestOptions.DEFAULT); + DeleteJobRequest deleteJobRequest = new DeleteJobRequest("my-first-machine-learning-job"); // <1> //end::x-pack-delete-ml-job-request + //tag::x-pack-delete-ml-job-request-force + deleteJobRequest.setForce(false); // <1> + //end::x-pack-delete-ml-job-request-force + + //tag::x-pack-delete-ml-job-request-wait-for-completion + deleteJobRequest.setWaitForCompletion(true); // <1> + //end::x-pack-delete-ml-job-request-wait-for-completion + + //tag::x-pack-delete-ml-job-execute + DeleteJobResponse deleteJobResponse = client.machineLearning().deleteJob(deleteJobRequest, RequestOptions.DEFAULT); + //end::x-pack-delete-ml-job-execute + //tag::x-pack-delete-ml-job-response - boolean isAcknowledged = deleteJobResponse.isAcknowledged(); // <1> + Boolean isAcknowledged = deleteJobResponse.getAcknowledged(); // <1> + TaskId task = deleteJobResponse.getTask(); // <2> //end::x-pack-delete-ml-job-response + + assertTrue(isAcknowledged); + assertNull(task); } { //tag::x-pack-delete-ml-job-request-listener - ActionListener listener = new ActionListener() { + ActionListener listener = new ActionListener() { @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { + public void onResponse(DeleteJobResponse deleteJobResponse) { // <1> } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobRequestTests.java index d3ccb98eeb6..d9f96fd0f28 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobRequestTests.java @@ -34,12 +34,4 @@ public class DeleteJobRequestTests extends ESTestCase { ex = expectThrows(NullPointerException.class, () -> createTestInstance().setJobId(null)); assertEquals("[job_id] must not be null", ex.getMessage()); } - - public void test_WithForce() { - DeleteJobRequest deleteJobRequest = createTestInstance(); - assertFalse(deleteJobRequest.isForce()); - - deleteJobRequest.setForce(true); - assertTrue(deleteJobRequest.isForce()); - } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobResponseTests.java new file mode 100644 index 00000000000..97a8c5b892c --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobResponseTests.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class DeleteJobResponseTests extends AbstractXContentTestCase { + + @Override + protected DeleteJobResponse createTestInstance() { + if (randomBoolean()) { + return new DeleteJobResponse(randomBoolean(), null); + } + return new DeleteJobResponse(null, new TaskId(randomAlphaOfLength(20) + ":" + randomIntBetween(1, 100))); + } + + @Override + protected DeleteJobResponse doParseInstance(XContentParser parser) throws IOException { + return DeleteJobResponse.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java index b678dce6cff..667932d5912 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java @@ -34,9 +34,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class JobTests extends AbstractXContentTestCase { @@ -77,93 +75,6 @@ public class JobTests extends AbstractXContentTestCase { assertNotNull(Job.PARSER.apply(parser, null).build()); } - public void testEquals_GivenDifferentClass() { - Job job = buildJobBuilder("foo").build(); - assertFalse(job.equals("a string")); - } - - public void testEquals_GivenDifferentIds() { - Date createTime = new Date(); - Job.Builder builder = buildJobBuilder("foo"); - builder.setCreateTime(createTime); - Job job1 = builder.build(); - builder.setId("bar"); - Job job2 = builder.build(); - assertFalse(job1.equals(job2)); - } - - public void testEquals_GivenDifferentRenormalizationWindowDays() { - Date date = new Date(); - Job.Builder jobDetails1 = new Job.Builder("foo"); - jobDetails1.setDataDescription(new DataDescription.Builder()); - jobDetails1.setAnalysisConfig(createAnalysisConfig()); - jobDetails1.setRenormalizationWindowDays(3L); - jobDetails1.setCreateTime(date); - Job.Builder jobDetails2 = new Job.Builder("foo"); - jobDetails2.setDataDescription(new DataDescription.Builder()); - jobDetails2.setRenormalizationWindowDays(4L); - jobDetails2.setAnalysisConfig(createAnalysisConfig()); - jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - - public void testEquals_GivenDifferentBackgroundPersistInterval() { - Date date = new Date(); - Job.Builder jobDetails1 = new Job.Builder("foo"); - jobDetails1.setDataDescription(new DataDescription.Builder()); - jobDetails1.setAnalysisConfig(createAnalysisConfig()); - jobDetails1.setBackgroundPersistInterval(TimeValue.timeValueSeconds(10000L)); - jobDetails1.setCreateTime(date); - Job.Builder jobDetails2 = new Job.Builder("foo"); - jobDetails2.setDataDescription(new DataDescription.Builder()); - jobDetails2.setBackgroundPersistInterval(TimeValue.timeValueSeconds(8000L)); - jobDetails2.setAnalysisConfig(createAnalysisConfig()); - jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - - public void testEquals_GivenDifferentModelSnapshotRetentionDays() { - Date date = new Date(); - Job.Builder jobDetails1 = new Job.Builder("foo"); - jobDetails1.setDataDescription(new DataDescription.Builder()); - jobDetails1.setAnalysisConfig(createAnalysisConfig()); - jobDetails1.setModelSnapshotRetentionDays(10L); - jobDetails1.setCreateTime(date); - Job.Builder jobDetails2 = new Job.Builder("foo"); - jobDetails2.setDataDescription(new DataDescription.Builder()); - jobDetails2.setModelSnapshotRetentionDays(8L); - jobDetails2.setAnalysisConfig(createAnalysisConfig()); - jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - - public void testEquals_GivenDifferentResultsRetentionDays() { - Date date = new Date(); - Job.Builder jobDetails1 = new Job.Builder("foo"); - jobDetails1.setDataDescription(new DataDescription.Builder()); - jobDetails1.setAnalysisConfig(createAnalysisConfig()); - jobDetails1.setCreateTime(date); - jobDetails1.setResultsRetentionDays(30L); - Job.Builder jobDetails2 = new Job.Builder("foo"); - jobDetails2.setDataDescription(new DataDescription.Builder()); - jobDetails2.setResultsRetentionDays(4L); - jobDetails2.setAnalysisConfig(createAnalysisConfig()); - jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - - public void testEquals_GivenDifferentCustomSettings() { - Job.Builder jobDetails1 = buildJobBuilder("foo"); - Map customSettings1 = new HashMap<>(); - customSettings1.put("key1", "value1"); - jobDetails1.setCustomSettings(customSettings1); - Job.Builder jobDetails2 = buildJobBuilder("foo"); - Map customSettings2 = new HashMap<>(); - customSettings2.put("key2", "value2"); - jobDetails2.setCustomSettings(customSettings2); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - public void testCopyConstructor() { for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { Job job = createTestInstance(); @@ -184,20 +95,6 @@ public class JobTests extends AbstractXContentTestCase { assertEquals("[job_type] must not be null", ex.getMessage()); } - public static Job.Builder buildJobBuilder(String id, Date date) { - Job.Builder builder = new Job.Builder(id); - builder.setCreateTime(date); - AnalysisConfig.Builder ac = createAnalysisConfig(); - DataDescription.Builder dc = new DataDescription.Builder(); - builder.setAnalysisConfig(ac); - builder.setDataDescription(dc); - return builder; - } - - public static Job.Builder buildJobBuilder(String id) { - return buildJobBuilder(id, new Date()); - } - public static String randomValidJobId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); @@ -262,6 +159,9 @@ public class JobTests extends AbstractXContentTestCase { if (randomBoolean()) { builder.setResultsIndexName(randomValidJobId()); } + if (randomBoolean()) { + builder.setDeleting(randomBoolean()); + } return builder; } diff --git a/docs/java-rest/high-level/ml/delete-job.asciidoc b/docs/java-rest/high-level/ml/delete-job.asciidoc index 43f1e2fb02b..7cdc4149b23 100644 --- a/docs/java-rest/high-level/ml/delete-job.asciidoc +++ b/docs/java-rest/high-level/ml/delete-job.asciidoc @@ -4,26 +4,57 @@ [[java-rest-high-x-pack-machine-learning-delete-job-request]] ==== Delete Job Request -A `DeleteJobRequest` object requires a non-null `jobId` and can optionally set `force`. -Can be executed as follows: +A `DeleteJobRequest` object requires a non-null `jobId`. ["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request] --------------------------------------------------- +<1> Constructing a new request referencing an existing `jobId` + +==== Optional Arguments + +The following arguments are optional: + +["source","java",subs="attributes,callouts,macros"] +--------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request-force] +--------------------------------------------------- <1> Use to forcefully delete an opened job; this method is quicker than closing and deleting the job. -Defaults to `false` +Defaults to `false`. + +["source","java",subs="attributes,callouts,macros"] +--------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request-wait-for-completion] +--------------------------------------------------- +<1> Use to set whether the request should wait until the operation has completed before returning. +Defaults to `true`. + +[[java-rest-high-x-pack-machine-learning-delete-job-execution]] +==== Execution + +The request can be executed through the `MachineLearningClient` contained +in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-execute] +-------------------------------------------------- [[java-rest-high-x-pack-machine-learning-delete-job-response]] ==== Delete Job Response -The returned `AcknowledgedResponse` object indicates the acknowledgement of the request: +The returned `DeleteJobResponse` object contains the acknowledgement of the +job deletion or the deletion task depending on whether the request was set +to wait for completion: + ["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-response] --------------------------------------------------- -<1> `isAcknowledged` was the deletion request acknowledged or not +<1> whether was job deletion was acknowledged or not; will be `null` when set not to wait for completion +<2> the id of the job deletion task; will be `null` when set to wait for completion [[java-rest-high-x-pack-machine-learning-delete-job-async]] ==== Delete Job Asynchronously diff --git a/docs/reference/ml/apis/delete-job.asciidoc b/docs/reference/ml/apis/delete-job.asciidoc index d5ef120ad04..b9dbe9e3cd6 100644 --- a/docs/reference/ml/apis/delete-job.asciidoc +++ b/docs/reference/ml/apis/delete-job.asciidoc @@ -41,6 +41,9 @@ separated list. (boolean) Use to forcefully delete an opened job; this method is quicker than closing and deleting the job. +`wait_for_completion`:: + (boolean) Specifies whether the request should return immediately or wait + until the job deletion completes. Defaults to `true`. ==== Authorization @@ -66,4 +69,23 @@ When the job is deleted, you receive the following results: "acknowledged": true } ---- -// TESTRESPONSE \ No newline at end of file +// TESTRESPONSE + +In the next example we delete the `total-requests` job asynchronously: + +[source,js] +-------------------------------------------------- +DELETE _xpack/ml/anomaly_detectors/total-requests?wait_for_completion=false +-------------------------------------------------- +// CONSOLE +// TEST[skip:setup:server_metrics_job] + +When `wait_for_completion` is set to `false`, the response contains the id +of the job deletion task: +[source,js] +---- +{ + "task": "oTUltX4IQMOUUVeiohTt8A:39" +} +---- +// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/] \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskId.java b/server/src/main/java/org/elasticsearch/tasks/TaskId.java index 1aeceef247f..f92997b047c 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskId.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskId.java @@ -19,10 +19,13 @@ package org.elasticsearch.tasks; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ContextParser; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -96,6 +99,15 @@ public final class TaskId implements Writeable { out.writeLong(id); } + public static ContextParser parser() { + return (p, c) -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return new TaskId(p.text()); + } + throw new ElasticsearchParseException("Expected a string but found [{}] instead", p.currentToken()); + }; + } + public String getNodeId() { return nodeId; } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResult.java b/server/src/main/java/org/elasticsearch/tasks/TaskResult.java index a866ad9bb2d..46b68ce1602 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResult.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResult.java @@ -76,7 +76,7 @@ public final class TaskResult implements Writeable, ToXContentObject { * Construct a {@linkplain TaskResult} for a task that completed successfully. */ public TaskResult(TaskInfo task, ToXContent response) throws IOException { - this(true, task, null, toXContent(response)); + this(true, task, null, XContentHelper.toXContent(response, Requests.INDEX_CONTENT_TYPE, true)); } private TaskResult(boolean completed, TaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) { @@ -222,16 +222,6 @@ public final class TaskResult implements Writeable, ToXContentObject { return Objects.hash(completed, task, getErrorAsMap(), getResponseAsMap()); } - private static BytesReference toXContent(ToXContent result) throws IOException { - try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { - // Elasticsearch's Response object never emit starting or ending objects. Most other implementers of ToXContent do.... - builder.startObject(); - result.toXContent(builder, ToXContent.EMPTY_PARAMS); - builder.endObject(); - return BytesReference.bytes(builder); - } - } - private static BytesReference toXContent(Exception error) throws IOException { try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 193695ac693..8d3c6a3565f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -91,9 +91,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { return groupOrJobLookup.expandJobIds(expression, allowNoJobs); } - public boolean isJobDeleted(String jobId) { + public boolean isJobDeleting(String jobId) { Job job = jobs.get(jobId); - return job == null || job.isDeleted(); + return job == null || job.isDeleting(); } public SortedMap getDatafeeds() { @@ -287,7 +287,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { if (job == null) { throw new ResourceNotFoundException("job [" + jobId + "] does not exist"); } - if (job.isDeleted() == false) { + if (job.isDeleting() == false) { throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because it hasn't marked as deleted"); } return this; @@ -318,7 +318,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { private void checkJobIsAvailableForDatafeed(String jobId) { Job job = jobs.get(jobId); - if (job == null || job.isDeleted()) { + if (job == null || job.isDeleting()) { throw ExceptionsHelper.missingJobException(jobId); } Optional existingDatafeed = getDatafeedByJobId(jobId); @@ -387,14 +387,14 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { return new MlMetadata(jobs, datafeeds); } - public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { + public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { Job job = jobs.get(jobId); if (job == null) { throw ExceptionsHelper.missingJobException(jobId); } - if (job.isDeleted()) { + if (job.isDeleting()) { // Job still exists but is already being deleted - throw new JobAlreadyMarkedAsDeletedException(); + return; } checkJobHasNoDatafeed(jobId); @@ -408,7 +408,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { } } Job.Builder jobBuilder = new Job.Builder(job); - jobBuilder.setDeleted(true); + jobBuilder.setDeleting(true); putJob(jobBuilder.build(), true); } @@ -430,7 +430,4 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { } return mlMetadata; } - - public static class JobAlreadyMarkedAsDeletedException extends RuntimeException { - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java index 9fbde4721cd..6b279e08521 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java @@ -42,6 +42,11 @@ public class DeleteJobAction extends Action { private String jobId; private boolean force; + /** + * Should this task store its result? + */ + private boolean shouldStoreResult; + public Request(String jobId) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); } @@ -64,6 +69,18 @@ public class DeleteJobAction extends Action { this.force = force; } + /** + * Should this task store its result after it has finished? + */ + public void setShouldStoreResult(boolean shouldStoreResult) { + this.shouldStoreResult = shouldStoreResult; + } + + @Override + public boolean getShouldStoreResult() { + return shouldStoreResult; + } + @Override public ActionRequestValidationException validate() { return null; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index a5293cdcbc7..5a352ab2665 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -75,7 +75,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id"); public static final ParseField MODEL_SNAPSHOT_MIN_VERSION = new ParseField("model_snapshot_min_version"); public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name"); - public static final ParseField DELETED = new ParseField("deleted"); + public static final ParseField DELETING = new ParseField("deleting"); // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("jobs"); @@ -119,7 +119,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO parser.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID); parser.declareStringOrNull(Builder::setModelSnapshotMinVersion, MODEL_SNAPSHOT_MIN_VERSION); parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME); - parser.declareBoolean(Builder::setDeleted, DELETED); + parser.declareBoolean(Builder::setDeleting, DELETING); return parser; } @@ -152,14 +152,14 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO private final String modelSnapshotId; private final Version modelSnapshotMinVersion; private final String resultsIndexName; - private final boolean deleted; + private final boolean deleting; private Job(String jobId, String jobType, Version jobVersion, List groups, String description, Date createTime, Date finishedTime, Long establishedModelMemory, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, - String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleted) { + String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleting) { this.jobId = jobId; this.jobType = jobType; @@ -181,7 +181,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO this.modelSnapshotId = modelSnapshotId; this.modelSnapshotMinVersion = modelSnapshotMinVersion; this.resultsIndexName = resultsIndexName; - this.deleted = deleted; + this.deleting = deleting; } public Job(StreamInput in) throws IOException { @@ -224,7 +224,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO modelSnapshotMinVersion = null; } resultsIndexName = in.readString(); - deleted = in.readBoolean(); + deleting = in.readBoolean(); } /** @@ -375,8 +375,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return modelSnapshotMinVersion; } - public boolean isDeleted() { - return deleted; + public boolean isDeleting() { + return deleting; } /** @@ -489,7 +489,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO } } out.writeString(resultsIndexName); - out.writeBoolean(deleted); + out.writeBoolean(deleting); } @Override @@ -554,8 +554,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO builder.field(MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion); } builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); - if (params.paramAsBoolean("all", false)) { - builder.field(DELETED.getPreferredName(), deleted); + if (deleting) { + builder.field(DELETING.getPreferredName(), deleting); } return builder; } @@ -591,7 +591,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) - && Objects.equals(this.deleted, that.deleted); + && Objects.equals(this.deleting, that.deleting); } @Override @@ -599,7 +599,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted); + modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); } // Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString() @@ -647,7 +647,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO private String modelSnapshotId; private Version modelSnapshotMinVersion; private String resultsIndexName; - private boolean deleted; + private boolean deleting; public Builder() { } @@ -677,7 +677,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO this.modelSnapshotId = job.getModelSnapshotId(); this.modelSnapshotMinVersion = job.getModelSnapshotMinVersion(); this.resultsIndexName = job.getResultsIndexNameNoPrefix(); - this.deleted = job.isDeleted(); + this.deleting = job.isDeleting(); } public Builder(StreamInput in) throws IOException { @@ -717,7 +717,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO modelSnapshotMinVersion = null; } resultsIndexName = in.readOptionalString(); - deleted = in.readBoolean(); + deleting = in.readBoolean(); } public Builder setId(String id) { @@ -834,8 +834,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return this; } - public Builder setDeleted(boolean deleted) { - this.deleted = deleted; + public Builder setDeleting(boolean deleting) { + this.deleting = deleting; return this; } @@ -911,7 +911,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO } } out.writeOptionalString(resultsIndexName); - out.writeBoolean(deleted); + out.writeBoolean(deleting); } @Override @@ -972,8 +972,8 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO if (resultsIndexName != null) { builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); } - if (params.paramAsBoolean("all", false)) { - builder.field(DELETED.getPreferredName(), deleted); + if (deleting) { + builder.field(DELETING.getPreferredName(), deleting); } builder.endObject(); @@ -1006,7 +1006,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) - && Objects.equals(this.deleted, that.deleted); + && Objects.equals(this.deleting, that.deleting); } @Override @@ -1014,7 +1014,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, createTime, finishedTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, - modelSnapshotMinVersion, resultsIndexName, deleted); + modelSnapshotMinVersion, resultsIndexName, deleting); } /** @@ -1127,7 +1127,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO id, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted); + modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); } private void checkValidBackgroundPersistInterval() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 3c571c9d605..b669e8f1edc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -67,6 +67,8 @@ public final class Messages { public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]"; public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped"; + public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''"; + public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}"; public static final String JOB_AUDIT_DELETED = "Job deleted"; public static final String JOB_AUDIT_KILLING = "Killing job"; public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {1}"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java index 2f218cfb2dc..f3cd2abf461 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java @@ -12,7 +12,17 @@ import java.util.Map; public class JobDeletionTask extends Task { + private volatile boolean started; + public JobDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { super(id, type, action, description, parentTask, headers); } + + public void start() { + started = true; + } + + public boolean isStarted() { + return started; + } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 4d7c5cd058b..e9f9166a2a3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -16,9 +16,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; import org.junit.After; @@ -26,6 +26,8 @@ import java.io.IOException; import java.util.Locale; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; @@ -386,6 +388,41 @@ public class MlJobIT extends ESRestTestCase { String indicesAfterDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity()); assertThat(indicesAfterDelete, containsString(indexName)); + waitUntilIndexIsEmpty(indexName); + + // check that the job itself is gone + expectThrows(ResponseException.class, () -> + client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"))); + } + + public void testDeleteJobAsync() throws Exception { + String jobId = "delete-job-async-job"; + String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT; + createFarequoteJob(jobId); + + String indicesBeforeDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity()); + assertThat(indicesBeforeDelete, containsString(indexName)); + + Response response = client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + + "?wait_for_completion=false")); + + // Wait for task to complete + String taskId = extractTaskId(response); + Response taskResponse = client().performRequest(new Request("GET", "_tasks/" + taskId + "?wait_for_completion=true")); + assertThat(EntityUtils.toString(taskResponse.getEntity()), containsString("\"acknowledged\":true")); + + // check that the index still exists (it's shared by default) + String indicesAfterDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity()); + assertThat(indicesAfterDelete, containsString(indexName)); + + waitUntilIndexIsEmpty(indexName); + + // check that the job itself is gone + expectThrows(ResponseException.class, () -> + client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"))); + } + + private void waitUntilIndexIsEmpty(String indexName) throws Exception { assertBusy(() -> { try { String count = EntityUtils.toString(client().performRequest(new Request("GET", indexName + "/_count")).getEntity()); @@ -394,10 +431,14 @@ public class MlJobIT extends ESRestTestCase { fail(e.getMessage()); } }); + } - // check that the job itself is gone - expectThrows(ResponseException.class, () -> - client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"))); + private static String extractTaskId(Response response) throws IOException { + String responseAsString = EntityUtils.toString(response.getEntity()); + Pattern matchTaskId = Pattern.compile(".*\"task\":.*\"(.*)\".*"); + Matcher taskIdMatcher = matchTaskId.matcher(responseAsString); + assertTrue(taskIdMatcher.matches()); + return taskIdMatcher.group(1); } public void testDeleteJobAfterMissingIndex() throws Exception { @@ -521,7 +562,7 @@ public class MlJobIT extends ESRestTestCase { } public void testDelete_multipleRequest() throws Exception { - String jobId = "delete-job-mulitple-times"; + String jobId = "delete-job-multiple-times"; createFarequoteJob(jobId); ConcurrentMapLong responses = ConcurrentCollections.newConcurrentMapLong(); @@ -532,8 +573,8 @@ public class MlJobIT extends ESRestTestCase { AtomicReference recreationException = new AtomicReference<>(); Runnable deleteJob = () -> { + boolean forceDelete = randomBoolean(); try { - boolean forceDelete = randomBoolean(); String url = MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId; if (forceDelete) { url += "?force=true"; @@ -554,6 +595,7 @@ public class MlJobIT extends ESRestTestCase { } catch (ResponseException re) { recreationException.set(re); } catch (IOException e) { + logger.error("Error trying to recreate the job", e); ioe.set(e); } } @@ -563,14 +605,14 @@ public class MlJobIT extends ESRestTestCase { // the other to complete. This is difficult to schedule but // hopefully it will happen in CI int numThreads = 5; - Thread [] threads = new Thread[numThreads]; - for (int i=0; i jobIdProcessor = id -> { validateJobAndTaskState(id, mlMetadata, tasksMetaData); Job job = mlMetadata.getJobs().get(id); - if (job.isDeleted()) { + if (job.isDeleting()) { return; } addJobAccordingToState(id, tasksMetaData, openJobIds, closingJobIds, failedJobs); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 1d285b91f2f..89f42d62241 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -23,9 +23,9 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -34,9 +34,9 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.IdsQueryBuilder; @@ -45,14 +45,13 @@ import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; @@ -72,10 +71,11 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -90,6 +90,14 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction>> listenersByJobId; + @Inject public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, @@ -101,6 +109,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction(); } @Override @@ -114,42 +123,8 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction listener) { - - ActionListener markAsDeletingListener = ActionListener.wrap( - response -> { - if (request.isForce()) { - forceDeleteJob(request, listener); - } else { - normalDeleteJob(request, listener); - } - }, - e -> { - if (e instanceof MlMetadata.JobAlreadyMarkedAsDeletedException) { - // Don't kick off a parallel deletion task, but just wait for - // the in-progress request to finish. This is much safer in the - // case where the job with the same name might be immediately - // recreated after the delete returns. However, if a force - // delete times out then eventually kick off a parallel delete - // in case the original completely failed for some reason. - waitForDeletingJob(request.getJobId(), MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT, - ActionListener.wrap( - listener::onResponse, - e2 -> { - if (request.isForce() && e2 instanceof TimeoutException) { - forceDeleteJob(request, listener); - } else { - listener.onFailure(e2); - } - } - )); - } else { - listener.onFailure(e); - } - }); - - markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce()); + protected ClusterBlockException checkBlock(DeleteJobAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } @Override @@ -158,13 +133,71 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction listener) { + logger.debug("Deleting job '{}'", request.getJobId()); + + TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); + ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId); + + // Check if there is a deletion task for this job already and if yes wait for it to complete + synchronized (listenersByJobId) { + if (listenersByJobId.containsKey(request.getJobId())) { + logger.debug("[{}] Deletion task [{}] will wait for existing deletion task to complete", + request.getJobId(), task.getId()); + listenersByJobId.get(request.getJobId()).add(listener); + return; + } else { + List> listeners = new ArrayList<>(); + listeners.add(listener); + listenersByJobId.put(request.getJobId(), listeners); + } + } + + auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId)); + + // The listener that will be executed at the end of the chain will notify all listeners + ActionListener finalListener = ActionListener.wrap( + ack -> notifyListeners(request.getJobId(), ack, null), + e -> notifyListeners(request.getJobId(), null, e) + ); + + ActionListener markAsDeletingListener = ActionListener.wrap( + response -> { + if (request.isForce()) { + forceDeleteJob(parentTaskClient, request, finalListener); + } else { + normalDeleteJob(parentTaskClient, request, finalListener); + } + }, + e -> { + auditor.error(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING_FAILED, e.getMessage())); + finalListener.onFailure(e); + }); + + markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce()); } - private void normalDeleteJob(DeleteJobAction.Request request, ActionListener listener) { + private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) { + synchronized (listenersByJobId) { + List> listeners = listenersByJobId.remove(jobId); + if (listeners == null) { + logger.error("[{}] No deletion job listeners could be found", jobId); + return; + } + for (ActionListener listener : listeners) { + if (error != null) { + listener.onFailure(error); + } else { + listener.onResponse(ack); + } + } + } + } + + private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request, + ActionListener listener) { String jobId = request.getJobId(); - logger.debug("Deleting job '" + jobId + "'"); // Step 4. When the job has been removed from the cluster state, return a response // ------- @@ -212,10 +245,11 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction finishedHandler, Consumer failureHandler) { + private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId, + CheckedConsumer finishedHandler, Consumer failureHandler) { final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterService.state(), jobId); final String indexPattern = indexName + "-*"; @@ -241,7 +275,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeActionwrap( response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias failureHandler), - client.admin().indices()::delete); + parentTaskClient.admin().indices()::delete); } }, failure -> { @@ -312,7 +346,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction deleteQuantilesHandler = ActionListener.wrap( - response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler), + response -> deleteCategorizerState(parentTaskClient, jobId, 1, deleteCategorizerStateHandler), failureHandler); // Step 2. Delete state done, delete the quantiles ActionListener deleteStateHandler = ActionListener.wrap( - bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler), + bulkResponse -> deleteQuantiles(parentTaskClient, jobId, deleteQuantilesHandler), failureHandler); // Step 1. Delete the model state - deleteModelState(jobId, client, deleteStateHandler); + deleteModelState(parentTaskClient, jobId, deleteStateHandler); } - private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { + private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener finishedHandler) { // The quantiles type and doc ID changed in v5.5 so delete both the old and new format DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace @@ -344,7 +378,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction finishedHandler.onResponse(true), e -> { // It's not a problem for us if the index wasn't found - it's equivalent to document not found @@ -356,19 +390,20 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction listener) { + private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener listener) { GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null); request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE)); - executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap( + executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap( response -> { List deleteCandidates = response.getPage().results(); - JobDataDeleter deleter = new JobDataDeleter(client, jobId); + JobDataDeleter deleter = new JobDataDeleter(parentTaskClient, jobId); deleter.deleteModelSnapshots(deleteCandidates, listener); }, listener::onFailure)); } - private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener finishedHandler) { + private void deleteCategorizerState(ParentTaskAssigningClient parentTaskClient, String jobId, int docNum, + ActionListener finishedHandler) { // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace @@ -380,13 +415,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction { // If we successfully deleted a document try the next one; if not we're done if (response.getDeleted() > 0) { // There's an assumption here that there won't be very many categorizer // state documents, so the recursion won't go more than, say, 5 levels deep - deleteCategorizerState(jobId, client, docNum + 1, finishedHandler); + deleteCategorizerState(parentTaskClient, jobId, docNum + 1, finishedHandler); return; } finishedHandler.onResponse(true); @@ -401,14 +436,15 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction finishedHandler) { + private void deleteAliases(ParentTaskAssigningClient parentTaskClient, String jobId, + ActionListener finishedHandler) { final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); // first find the concrete indices associated with the aliases GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName) .indicesOptions(IndicesOptions.lenientExpandOpen()); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest, + executeAsyncWithOrigin(parentTaskClient.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest, ActionListener.wrap( getAliasesResponse -> { // remove the aliases from the concrete indices found in the first step @@ -419,13 +455,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeActionwrap( finishedHandler::onResponse, finishedHandler::onFailure), - client.admin().indices()::aliases); + parentTaskClient.admin().indices()::aliases); }, - finishedHandler::onFailure), client.admin().indices()::getAliases); + finishedHandler::onFailure), parentTaskClient.admin().indices()::getAliases); } private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) { @@ -445,7 +481,10 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction listener) { + private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request, + ActionListener listener) { + + logger.debug("Force deleting job [{}]", request.getJobId()); final ClusterState state = clusterService.state(); final String jobId = request.getJobId(); @@ -454,13 +493,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction removeTaskListener = new ActionListener() { @Override public void onResponse(Boolean response) { - normalDeleteJob(request, listener); + normalDeleteJob(parentTaskClient, request, listener); } @Override public void onFailure(Exception e) { if (e instanceof ResourceNotFoundException) { - normalDeleteJob(request, listener); + normalDeleteJob(parentTaskClient, request, listener); } else { listener.onFailure(e); } @@ -483,12 +522,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction listener) { + private void killProcess(ParentTaskAssigningClient parentTaskClient, String jobId, + ActionListener listener) { KillProcessAction.Request killRequest = new KillProcessAction.Request(jobId); - executeAsyncWithOrigin(client, ML_ORIGIN, KillProcessAction.INSTANCE, killRequest, listener); + executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, KillProcessAction.INSTANCE, killRequest, listener); } private void removePersistentTask(String jobId, ClusterState currentState, @@ -520,7 +560,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction listener) { - ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); - - ClusterState clusterState = stateObserver.setAndGetObservedState(); - if (jobIsDeletedFromState(jobId, clusterState)) { - listener.onResponse(new AcknowledgedResponse(true)); - } else { - stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - listener.onResponse(new AcknowledgedResponse(true)); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new TimeoutException("timed out after " + timeout)); - } - }, newClusterState -> jobIsDeletedFromState(jobId, newClusterState), timeout); - } - } - static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) { return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index ab1ef73780e..7217fcc6ec9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -183,6 +183,6 @@ public class TransportGetJobsStatsAction extends TransportTasksAction stats) { Set excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet()); return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) && - !mlMetadata.isJobDeleted(jobId)).collect(Collectors.toList()); + !mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 512d8188abf..42b67b29173 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -127,8 +127,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new RestToXContentListener<>(channel)); + + if (restRequest.paramAsBoolean("wait_for_completion", true)) { + return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new RestToXContentListener<>(channel)); + } else { + deleteJobRequest.setShouldStoreResult(true); + + Task task = client.executeLocally(DeleteJobAction.INSTANCE, deleteJobRequest, nullTaskListener()); + // Send task description id instead of waiting for the message + return channel -> { + try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + builder.field("task", client.getLocalNodeId() + ":" + task.getId()); + builder.endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + } + }; + } + } + + // We do not want to log anything due to a delete action + // The response or error will be returned to the client when called synchronously + // or it will be stored in the task result when called asynchronously + private static TaskListener nullTaskListener() { + return new TaskListener() { + @Override + public void onResponse(Task task, Object o) {} + + @Override + public void onFailure(Task task, Throwable e) {} + }; } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index e16ac2f9970..82478fbf5d3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -124,7 +124,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { public void testRemoveJob() { Job.Builder jobBuilder = buildJobBuilder("1"); - jobBuilder.setDeleted(true); + jobBuilder.setDeleting(true); Job job1 = jobBuilder.build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); @@ -206,7 +206,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { } public void testPutDatafeed_failBecauseJobIsBeingDeleted() { - Job job1 = createDatafeedJob().setDeleted(true).build(new Date()); + Job job1 = createDatafeedJob().setDeleting(true).build(new Date()); DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java index 2e00ad71251..6d4b008570c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java @@ -28,7 +28,7 @@ public class TransportGetJobsStatsActionTests extends ESTestCase { public void testDetermineJobIds() { MlMetadata mlMetadata = mock(MlMetadata.class); - when(mlMetadata.isJobDeleted(eq("id4"))).thenReturn(true); + when(mlMetadata.isJobDeleting(eq("id4"))).thenReturn(true); List result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Collections.singletonList("id1"), Collections.emptyList()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 58b60273b0e..4dd41363b73 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -79,14 +79,14 @@ public class TransportOpenJobActionTests extends ESTestCase { expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", mlBuilder.build())); } - public void testValidate_jobMarkedAsDeleted() { + public void testValidate_jobMarkedAsDeleting() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); Job.Builder jobBuilder = buildJobBuilder("job_id"); - jobBuilder.setDeleted(true); + jobBuilder.setDeleting(true); mlBuilder.putJob(jobBuilder.build(), false); Exception e = expectThrows(ElasticsearchStatusException.class, () -> TransportOpenJobAction.validate("job_id", mlBuilder.build())); - assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage()); + assertEquals("Cannot open job [job_id] because it is being deleted", e.getMessage()); } public void testValidate_jobWithoutVersion() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java deleted file mode 100644 index ed23a5328ae..00000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.integration; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; -import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -public class DeleteJobIT extends BaseMlIntegTestCase { - - public void testWaitForDelete() throws ExecutionException, InterruptedException { - final String jobId = "wait-for-delete-job"; - Job.Builder job = createJob(jobId); - PutJobAction.Request putJobRequest = new PutJobAction.Request(job); - client().execute(PutJobAction.INSTANCE, putJobRequest).get(); - - AtomicReference exceptionHolder = new AtomicReference<>(); - CountDownLatch markAsDeletedLatch = new CountDownLatch(1); - clusterService().submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return markJobAsDeleted(jobId, currentState); - } - - @Override - public void onFailure(String source, Exception e) { - markAsDeletedLatch.countDown(); - exceptionHolder.set(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - markAsDeletedLatch.countDown(); - } - }); - - assertTrue("Timed out waiting for state update", markAsDeletedLatch.await(5, TimeUnit.SECONDS)); - assertNull("mark-job-as-deleted task failed: " + exceptionHolder.get(), exceptionHolder.get()); - - // Job is marked as deleting so now a delete request should wait for it. - AtomicBoolean isDeleted = new AtomicBoolean(false); - AtomicReference deleteFailure = new AtomicReference<>(); - ActionListener deleteListener = new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse response) { - isDeleted.compareAndSet(false, response.isAcknowledged()); - } - - @Override - public void onFailure(Exception e) { - deleteFailure.set(e); - } - }; - - client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId), deleteListener); - awaitBusy(isDeleted::get, 1, TimeUnit.SECONDS); - // still waiting - assertFalse(isDeleted.get()); - - CountDownLatch removeJobLatch = new CountDownLatch(1); - clusterService().submitStateUpdateTask("remove-job-from-state", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - assertFalse(isDeleted.get()); - return removeJobFromClusterState(jobId, currentState); - } - - @Override - public void onFailure(String source, Exception e) { - removeJobLatch.countDown(); - exceptionHolder.set(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - removeJobLatch.countDown(); - } - }); - - assertTrue("Timed out waiting for remove job from state response", removeJobLatch.await(5, TimeUnit.SECONDS)); - assertNull("remove-job-from-state task failed: " + exceptionHolder.get(), exceptionHolder.get()); - - assertNull("Job deletion failed: " + deleteFailure.get(), deleteFailure.get()); - assertTrue("Job was not deleted", isDeleted.get()); - } - - private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); - assertNotNull(mlMetadata); - - MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata); - PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - builder.markJobAsDeleted(jobId, tasks, true); - - ClusterState.Builder newState = ClusterState.builder(currentState); - return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()) - .build(); - } - - private ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) { - MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); - builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); - - ClusterState.Builder newState = ClusterState.builder(currentState); - return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()) - .build(); - } -} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json index 77eb89c00f9..f93fff6eaab 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json @@ -15,8 +15,13 @@ "params": { "force": { "type": "boolean", - "required": false, - "description": "True if the job should be forcefully deleted" + "description": "True if the job should be forcefully deleted", + "default": false + }, + "wait_for_completion": { + "type": "boolean", + "description": "Should this request wait until the operation has completed before returning", + "default": true } } },