[ML] Allow asynchronous job deletion (#34058)

This changes the delete job API by adding
the choice to delete a job asynchronously.
The commit adds a `wait_for_completion` parameter
to the delete job request. When set to `false`,
the action returns immediately and the response
contains the task id.

This also changes the handling of subsequent
delete requests for a job that is already being
deleted. It now uses the task framework to check
if the job is being deleted instead of the cluster
state. This is beneficial because it will continue
to work once the job configs are moved out of the
cluster state and into an index. Also, force delete
requests that are waiting for the job to be deleted
will not proceed with the deletion if the first task
fails. This will prevent overloading the cluster. Instead,
the failure is communicated better via notifications
so that the user may retry.

Finally, this makes the `deleting` property of the job
visible (it was also renamed from `deleted`). This allows
a client to render a deleting job differently.

Closes #32836
This commit is contained in:
Dimitris Athanasiou 2018-10-05 02:41:28 +03:00 committed by GitHub
parent 026488bcbf
commit 4dacfa95d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 618 additions and 435 deletions

View File

@ -148,7 +148,12 @@ final class MLRequestConverters {
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request);
params.putParam("force", Boolean.toString(deleteJobRequest.isForce()));
if (deleteJobRequest.getForce() != null) {
params.putParam("force", Boolean.toString(deleteJobRequest.getForce()));
}
if (deleteJobRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", Boolean.toString(deleteJobRequest.getWaitForCompletion()));
}
return request;
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.client.ml.DeleteCalendarRequest;
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.ForecastJobRequest;
@ -211,14 +212,15 @@ public final class MachineLearningClient {
*
* @param request The request to delete the job
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return action acknowledgement
* @return The action response which contains the acknowledgement or the task id depending on whether the action was set to wait for
* completion
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
public DeleteJobResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::deleteJob,
options,
AcknowledgedResponse::fromXContent,
DeleteJobResponse::fromXContent,
Collections.emptySet());
}
@ -232,11 +234,11 @@ public final class MachineLearningClient {
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) {
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<DeleteJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::deleteJob,
options,
AcknowledgedResponse::fromXContent,
DeleteJobResponse::fromXContent,
listener,
Collections.emptySet());
}

View File

@ -29,7 +29,8 @@ import java.util.Objects;
public class DeleteJobRequest extends ActionRequest {
private String jobId;
private boolean force;
private Boolean force;
private Boolean waitForCompletion;
public DeleteJobRequest(String jobId) {
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
@ -47,7 +48,7 @@ public class DeleteJobRequest extends ActionRequest {
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
}
public boolean isForce() {
public Boolean getForce() {
return force;
}
@ -57,10 +58,24 @@ public class DeleteJobRequest extends ActionRequest {
*
* @param force When {@code true} forcefully delete an opened job. Defaults to {@code false}
*/
public void setForce(boolean force) {
public void setForce(Boolean force) {
this.force = force;
}
public Boolean getWaitForCompletion() {
return waitForCompletion;
}
/**
* Set whether this request should wait until the operation has completed before returning
* @param waitForCompletion When {@code true} the call will wait for the job deletion to complete.
* Otherwise, the deletion will be executed asynchronously and the response
* will contain the task id.
*/
public void setWaitForCompletion(Boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}
@Override
public ActionRequestValidationException validate() {
return null;

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Objects;
/**
* Response object that contains the acknowledgement or the task id
* depending on whether the delete job action was requested to wait for completion.
*/
/**
 * Response for the delete job action. Depending on whether the request was set
 * to wait for completion, it carries either the acknowledgement flag or the id
 * of the asynchronous deletion task — exactly one of the two is non-null.
 */
public class DeleteJobResponse extends ActionResponse implements ToXContentObject {

    private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
    private static final ParseField TASK = new ParseField("task");

    // Lenient parser: unknown fields are ignored; both fields are optional
    // because the server returns one or the other, never both.
    public static final ConstructingObjectParser<DeleteJobResponse, Void> PARSER = new ConstructingObjectParser<>(
        "delete_job_response", true, args -> new DeleteJobResponse((Boolean) args[0], (TaskId) args[1]));

    static {
        PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACKNOWLEDGED);
        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING);
    }

    public static DeleteJobResponse fromXContent(XContentParser parser) throws IOException {
        return PARSER.parse(parser, null);
    }

    private final Boolean acknowledged;
    private final TaskId task;

    DeleteJobResponse(@Nullable Boolean acknowledged, @Nullable TaskId task) {
        // At least one of the two must be present in a well-formed response.
        assert acknowledged != null || task != null;
        this.acknowledged = acknowledged;
        this.task = task;
    }

    /**
     * Get the action acknowledgement
     * @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code false} or
     * otherwise a {@code boolean} that indicates whether the job was deleted successfully.
     */
    public Boolean getAcknowledged() {
        return acknowledged;
    }

    /**
     * Get the task id
     * @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code true} or
     * otherwise the id of the job deletion task.
     */
    public TaskId getTask() {
        return task;
    }

    @Override
    public int hashCode() {
        return Objects.hash(acknowledged, task);
    }

    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        }
        if (other == null || other.getClass() != getClass()) {
            return false;
        }
        DeleteJobResponse rhs = (DeleteJobResponse) other;
        return Objects.equals(acknowledged, rhs.acknowledged)
            && Objects.equals(task, rhs.task);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        if (acknowledged != null) {
            builder.field(ACKNOWLEDGED.getPreferredName(), acknowledged);
        }
        if (task != null) {
            // TaskId serializes as its "nodeId:id" string form.
            builder.field(TASK.getPreferredName(), task.toString());
        }
        builder.endObject();
        return builder;
    }
}

View File

@ -65,6 +65,7 @@ public class Job implements ToXContentObject {
public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days");
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
public static final ParseField DELETING = new ParseField("deleting");
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", true, Builder::new);
@ -94,6 +95,7 @@ public class Job implements ToXContentObject {
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT);
PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
PARSER.declareBoolean(Builder::setDeleting, DELETING);
}
private final String jobId;
@ -115,13 +117,14 @@ public class Job implements ToXContentObject {
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final String resultsIndexName;
private final Boolean deleting;
private Job(String jobId, String jobType, List<String> groups, String description,
Date createTime, Date finishedTime, Long establishedModelMemory,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, String resultsIndexName) {
String modelSnapshotId, String resultsIndexName, Boolean deleting) {
this.jobId = jobId;
this.jobType = jobType;
@ -141,6 +144,7 @@ public class Job implements ToXContentObject {
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
this.modelSnapshotId = modelSnapshotId;
this.resultsIndexName = resultsIndexName;
this.deleting = deleting;
}
/**
@ -275,6 +279,10 @@ public class Job implements ToXContentObject {
return modelSnapshotId;
}
public Boolean getDeleting() {
return deleting;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -330,6 +338,9 @@ public class Job implements ToXContentObject {
if (resultsIndexName != null) {
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
}
if (deleting != null) {
builder.field(DELETING.getPreferredName(), deleting);
}
builder.endObject();
return builder;
}
@ -362,7 +373,8 @@ public class Job implements ToXContentObject {
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName);
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleting, that.deleting);
}
@Override
@ -370,7 +382,7 @@ public class Job implements ToXContentObject {
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName);
modelSnapshotId, resultsIndexName, deleting);
}
@Override
@ -402,6 +414,7 @@ public class Job implements ToXContentObject {
private Map<String, Object> customSettings;
private String modelSnapshotId;
private String resultsIndexName;
private Boolean deleting;
private Builder() {
}
@ -429,6 +442,7 @@ public class Job implements ToXContentObject {
this.customSettings = job.getCustomSettings();
this.modelSnapshotId = job.getModelSnapshotId();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleting = job.getDeleting();
}
public Builder setId(String id) {
@ -525,6 +539,11 @@ public class Job implements ToXContentObject {
return this;
}
Builder setDeleting(Boolean deleting) {
this.deleting = deleting;
return this;
}
/**
* Builds a job.
*
@ -537,7 +556,7 @@ public class Job implements ToXContentObject {
id, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName);
modelSnapshotId, resultsIndexName, deleting);
}
}
}

View File

@ -164,11 +164,18 @@ public class MLRequestConvertersTests extends ESTestCase {
Request request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId, request.getEndpoint());
assertEquals(Boolean.toString(false), request.getParameters().get("force"));
assertNull(request.getParameters().get("force"));
assertNull(request.getParameters().get("wait_for_completion"));
deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setForce(true);
request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setWaitForCompletion(false);
request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(Boolean.toString(false), request.getParameters().get("wait_for_completion"));
}
public void testFlushJob() throws Exception {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.client.ml.DeleteCalendarRequest;
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.ForecastJobRequest;
@ -151,17 +152,33 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
assertThat(response.jobs().stream().map(Job::getId).collect(Collectors.toList()), hasItems(jobId1, jobId2));
}
public void testDeleteJob() throws Exception {
public void testDeleteJob_GivenWaitForCompletionIsTrue() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
AcknowledgedResponse response = execute(new DeleteJobRequest(jobId),
DeleteJobResponse response = execute(new DeleteJobRequest(jobId),
machineLearningClient::deleteJob,
machineLearningClient::deleteJobAsync);
assertTrue(response.isAcknowledged());
assertTrue(response.getAcknowledged());
assertNull(response.getTask());
}
public void testDeleteJob_GivenWaitForCompletionIsFalse() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setWaitForCompletion(false);
DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync);
assertNull(response.getAcknowledged());
assertNotNull(response.getTask());
}
public void testOpenJob() throws Exception {

View File

@ -39,6 +39,7 @@ import org.elasticsearch.client.ml.DeleteCalendarRequest;
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.ForecastJobRequest;
@ -108,6 +109,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskId;
import org.junit.After;
import java.io.IOException;
@ -281,20 +283,34 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
{
//tag::x-pack-delete-ml-job-request
DeleteJobRequest deleteJobRequest = new DeleteJobRequest("my-first-machine-learning-job");
deleteJobRequest.setForce(false); // <1>
AcknowledgedResponse deleteJobResponse = client.machineLearning().deleteJob(deleteJobRequest, RequestOptions.DEFAULT);
DeleteJobRequest deleteJobRequest = new DeleteJobRequest("my-first-machine-learning-job"); // <1>
//end::x-pack-delete-ml-job-request
//tag::x-pack-delete-ml-job-request-force
deleteJobRequest.setForce(false); // <1>
//end::x-pack-delete-ml-job-request-force
//tag::x-pack-delete-ml-job-request-wait-for-completion
deleteJobRequest.setWaitForCompletion(true); // <1>
//end::x-pack-delete-ml-job-request-wait-for-completion
//tag::x-pack-delete-ml-job-execute
DeleteJobResponse deleteJobResponse = client.machineLearning().deleteJob(deleteJobRequest, RequestOptions.DEFAULT);
//end::x-pack-delete-ml-job-execute
//tag::x-pack-delete-ml-job-response
boolean isAcknowledged = deleteJobResponse.isAcknowledged(); // <1>
Boolean isAcknowledged = deleteJobResponse.getAcknowledged(); // <1>
TaskId task = deleteJobResponse.getTask(); // <2>
//end::x-pack-delete-ml-job-response
assertTrue(isAcknowledged);
assertNull(task);
}
{
//tag::x-pack-delete-ml-job-request-listener
ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() {
ActionListener<DeleteJobResponse> listener = new ActionListener<DeleteJobResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
public void onResponse(DeleteJobResponse deleteJobResponse) {
// <1>
}

View File

@ -34,12 +34,4 @@ public class DeleteJobRequestTests extends ESTestCase {
ex = expectThrows(NullPointerException.class, () -> createTestInstance().setJobId(null));
assertEquals("[job_id] must not be null", ex.getMessage());
}
public void test_WithForce() {
DeleteJobRequest deleteJobRequest = createTestInstance();
assertFalse(deleteJobRequest.isForce());
deleteJobRequest.setForce(true);
assertTrue(deleteJobRequest.isForce());
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
public class DeleteJobResponseTests extends AbstractXContentTestCase<DeleteJobResponse> {

    @Override
    protected DeleteJobResponse createTestInstance() {
        // Randomly produce one of the two valid shapes: acknowledged-only or task-only.
        return randomBoolean()
            ? new DeleteJobResponse(randomBoolean(), null)
            : new DeleteJobResponse(null, new TaskId(randomAlphaOfLength(20) + ":" + randomIntBetween(1, 100)));
    }

    @Override
    protected DeleteJobResponse doParseInstance(XContentParser parser) throws IOException {
        return DeleteJobResponse.PARSER.apply(parser, null);
    }

    @Override
    protected boolean supportsUnknownFields() {
        // The response parser is lenient, so round-tripping with injected unknown fields must succeed.
        return true;
    }
}

View File

@ -34,9 +34,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class JobTests extends AbstractXContentTestCase<Job> {
@ -77,93 +75,6 @@ public class JobTests extends AbstractXContentTestCase<Job> {
assertNotNull(Job.PARSER.apply(parser, null).build());
}
public void testEquals_GivenDifferentClass() {
Job job = buildJobBuilder("foo").build();
assertFalse(job.equals("a string"));
}
public void testEquals_GivenDifferentIds() {
Date createTime = new Date();
Job.Builder builder = buildJobBuilder("foo");
builder.setCreateTime(createTime);
Job job1 = builder.build();
builder.setId("bar");
Job job2 = builder.build();
assertFalse(job1.equals(job2));
}
public void testEquals_GivenDifferentRenormalizationWindowDays() {
Date date = new Date();
Job.Builder jobDetails1 = new Job.Builder("foo");
jobDetails1.setDataDescription(new DataDescription.Builder());
jobDetails1.setAnalysisConfig(createAnalysisConfig());
jobDetails1.setRenormalizationWindowDays(3L);
jobDetails1.setCreateTime(date);
Job.Builder jobDetails2 = new Job.Builder("foo");
jobDetails2.setDataDescription(new DataDescription.Builder());
jobDetails2.setRenormalizationWindowDays(4L);
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
}
public void testEquals_GivenDifferentBackgroundPersistInterval() {
Date date = new Date();
Job.Builder jobDetails1 = new Job.Builder("foo");
jobDetails1.setDataDescription(new DataDescription.Builder());
jobDetails1.setAnalysisConfig(createAnalysisConfig());
jobDetails1.setBackgroundPersistInterval(TimeValue.timeValueSeconds(10000L));
jobDetails1.setCreateTime(date);
Job.Builder jobDetails2 = new Job.Builder("foo");
jobDetails2.setDataDescription(new DataDescription.Builder());
jobDetails2.setBackgroundPersistInterval(TimeValue.timeValueSeconds(8000L));
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
}
public void testEquals_GivenDifferentModelSnapshotRetentionDays() {
Date date = new Date();
Job.Builder jobDetails1 = new Job.Builder("foo");
jobDetails1.setDataDescription(new DataDescription.Builder());
jobDetails1.setAnalysisConfig(createAnalysisConfig());
jobDetails1.setModelSnapshotRetentionDays(10L);
jobDetails1.setCreateTime(date);
Job.Builder jobDetails2 = new Job.Builder("foo");
jobDetails2.setDataDescription(new DataDescription.Builder());
jobDetails2.setModelSnapshotRetentionDays(8L);
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
}
public void testEquals_GivenDifferentResultsRetentionDays() {
Date date = new Date();
Job.Builder jobDetails1 = new Job.Builder("foo");
jobDetails1.setDataDescription(new DataDescription.Builder());
jobDetails1.setAnalysisConfig(createAnalysisConfig());
jobDetails1.setCreateTime(date);
jobDetails1.setResultsRetentionDays(30L);
Job.Builder jobDetails2 = new Job.Builder("foo");
jobDetails2.setDataDescription(new DataDescription.Builder());
jobDetails2.setResultsRetentionDays(4L);
jobDetails2.setAnalysisConfig(createAnalysisConfig());
jobDetails2.setCreateTime(date);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
}
public void testEquals_GivenDifferentCustomSettings() {
Job.Builder jobDetails1 = buildJobBuilder("foo");
Map<String, Object> customSettings1 = new HashMap<>();
customSettings1.put("key1", "value1");
jobDetails1.setCustomSettings(customSettings1);
Job.Builder jobDetails2 = buildJobBuilder("foo");
Map<String, Object> customSettings2 = new HashMap<>();
customSettings2.put("key2", "value2");
jobDetails2.setCustomSettings(customSettings2);
assertFalse(jobDetails1.build().equals(jobDetails2.build()));
}
public void testCopyConstructor() {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
Job job = createTestInstance();
@ -184,20 +95,6 @@ public class JobTests extends AbstractXContentTestCase<Job> {
assertEquals("[job_type] must not be null", ex.getMessage());
}
public static Job.Builder buildJobBuilder(String id, Date date) {
Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(date);
AnalysisConfig.Builder ac = createAnalysisConfig();
DataDescription.Builder dc = new DataDescription.Builder();
builder.setAnalysisConfig(ac);
builder.setDataDescription(dc);
return builder;
}
public static Job.Builder buildJobBuilder(String id) {
return buildJobBuilder(id, new Date());
}
public static String randomValidJobId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);
@ -262,6 +159,9 @@ public class JobTests extends AbstractXContentTestCase<Job> {
if (randomBoolean()) {
builder.setResultsIndexName(randomValidJobId());
}
if (randomBoolean()) {
builder.setDeleting(randomBoolean());
}
return builder;
}

View File

@ -4,26 +4,57 @@
[[java-rest-high-x-pack-machine-learning-delete-job-request]]
==== Delete Job Request
A `DeleteJobRequest` object requires a non-null `jobId` and can optionally set `force`.
Can be executed as follows:
A `DeleteJobRequest` object requires a non-null `jobId`.
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request]
---------------------------------------------------
<1> Constructing a new request referencing an existing `jobId`
==== Optional Arguments
The following arguments are optional:
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request-force]
---------------------------------------------------
<1> Use to forcefully delete an opened job;
this method is quicker than closing and deleting the job.
Defaults to `false`
Defaults to `false`.
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request-wait-for-completion]
---------------------------------------------------
<1> Use to set whether the request should wait until the operation has completed before returning.
Defaults to `true`.
[[java-rest-high-x-pack-machine-learning-delete-job-execution]]
==== Execution
The request can be executed through the `MachineLearningClient` contained
in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-execute]
--------------------------------------------------
[[java-rest-high-x-pack-machine-learning-delete-job-response]]
==== Delete Job Response
The returned `AcknowledgedResponse` object indicates the acknowledgement of the request:
The returned `DeleteJobResponse` object contains the acknowledgement of the
job deletion or the deletion task depending on whether the request was set
to wait for completion:
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-response]
---------------------------------------------------
<1> `isAcknowledged` was the deletion request acknowledged or not
<1> whether the job deletion was acknowledged or not; will be `null` when set not to wait for completion
<2> the id of the job deletion task; will be `null` when set to wait for completion
[[java-rest-high-x-pack-machine-learning-delete-job-async]]
==== Delete Job Asynchronously

View File

@ -41,6 +41,9 @@ separated list.
(boolean) Use to forcefully delete an opened job; this method is quicker than
closing and deleting the job.
`wait_for_completion`::
(boolean) Specifies whether the request should return immediately or wait
until the job deletion completes. Defaults to `true`.
==== Authorization
@ -66,4 +69,23 @@ When the job is deleted, you receive the following results:
"acknowledged": true
}
----
// TESTRESPONSE
// TESTRESPONSE
In the next example we delete the `total-requests` job asynchronously:
[source,js]
--------------------------------------------------
DELETE _xpack/ml/anomaly_detectors/total-requests?wait_for_completion=false
--------------------------------------------------
// CONSOLE
// TEST[skip:setup:server_metrics_job]
When `wait_for_completion` is set to `false`, the response contains the id
of the job deletion task:
[source,js]
----
{
"task": "oTUltX4IQMOUUVeiohTt8A:39"
}
----
// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/]

View File

@ -19,10 +19,13 @@
package org.elasticsearch.tasks;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
@ -96,6 +99,15 @@ public final class TaskId implements Writeable {
out.writeLong(id);
}
public static ContextParser<Void, TaskId> parser() {
return (p, c) -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return new TaskId(p.text());
}
throw new ElasticsearchParseException("Expected a string but found [{}] instead", p.currentToken());
};
}
public String getNodeId() {
return nodeId;
}

View File

@ -76,7 +76,7 @@ public final class TaskResult implements Writeable, ToXContentObject {
* Construct a {@linkplain TaskResult} for a task that completed successfully.
*/
public TaskResult(TaskInfo task, ToXContent response) throws IOException {
this(true, task, null, toXContent(response));
this(true, task, null, XContentHelper.toXContent(response, Requests.INDEX_CONTENT_TYPE, true));
}
private TaskResult(boolean completed, TaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) {
@ -222,16 +222,6 @@ public final class TaskResult implements Writeable, ToXContentObject {
return Objects.hash(completed, task, getErrorAsMap(), getResponseAsMap());
}
private static BytesReference toXContent(ToXContent result) throws IOException {
    // Elasticsearch's Response objects never emit their own enclosing start/end object,
    // unlike most other ToXContent implementations, so wrap the serialized content here.
    try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) {
        builder.startObject();
        result.toXContent(builder, ToXContent.EMPTY_PARAMS);
        builder.endObject();
        return BytesReference.bytes(builder);
    }
}
private static BytesReference toXContent(Exception error) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) {
builder.startObject();

View File

@ -91,9 +91,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
return groupOrJobLookup.expandJobIds(expression, allowNoJobs);
}
public boolean isJobDeleted(String jobId) {
public boolean isJobDeleting(String jobId) {
Job job = jobs.get(jobId);
return job == null || job.isDeleted();
return job == null || job.isDeleting();
}
public SortedMap<String, DatafeedConfig> getDatafeeds() {
@ -287,7 +287,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
if (job == null) {
throw new ResourceNotFoundException("job [" + jobId + "] does not exist");
}
if (job.isDeleted() == false) {
if (job.isDeleting() == false) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because it hasn't marked as deleted");
}
return this;
@ -318,7 +318,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
private void checkJobIsAvailableForDatafeed(String jobId) {
Job job = jobs.get(jobId);
if (job == null || job.isDeleted()) {
if (job == null || job.isDeleting()) {
throw ExceptionsHelper.missingJobException(jobId);
}
Optional<DatafeedConfig> existingDatafeed = getDatafeedByJobId(jobId);
@ -387,14 +387,14 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
return new MlMetadata(jobs, datafeeds);
}
public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) {
public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) {
Job job = jobs.get(jobId);
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
}
if (job.isDeleted()) {
if (job.isDeleting()) {
// Job still exists but is already being deleted
throw new JobAlreadyMarkedAsDeletedException();
return;
}
checkJobHasNoDatafeed(jobId);
@ -408,7 +408,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
}
}
Job.Builder jobBuilder = new Job.Builder(job);
jobBuilder.setDeleted(true);
jobBuilder.setDeleting(true);
putJob(jobBuilder.build(), true);
}
@ -430,7 +430,4 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
}
return mlMetadata;
}
public static class JobAlreadyMarkedAsDeletedException extends RuntimeException {
}
}

View File

@ -42,6 +42,11 @@ public class DeleteJobAction extends Action<AcknowledgedResponse> {
private String jobId;
private boolean force;
/**
* Should this task store its result?
*/
private boolean shouldStoreResult;
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
@ -64,6 +69,18 @@ public class DeleteJobAction extends Action<AcknowledgedResponse> {
this.force = force;
}
/**
* Should this task store its result after it has finished?
*/
public void setShouldStoreResult(boolean shouldStoreResult) {
this.shouldStoreResult = shouldStoreResult;
}
@Override
public boolean getShouldStoreResult() {
return shouldStoreResult;
}
@Override
public ActionRequestValidationException validate() {
return null;

View File

@ -75,7 +75,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
public static final ParseField MODEL_SNAPSHOT_MIN_VERSION = new ParseField("model_snapshot_min_version");
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
public static final ParseField DELETED = new ParseField("deleted");
public static final ParseField DELETING = new ParseField("deleting");
// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("jobs");
@ -119,7 +119,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
parser.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
parser.declareStringOrNull(Builder::setModelSnapshotMinVersion, MODEL_SNAPSHOT_MIN_VERSION);
parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
parser.declareBoolean(Builder::setDeleted, DELETED);
parser.declareBoolean(Builder::setDeleting, DELETING);
return parser;
}
@ -152,14 +152,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private final String modelSnapshotId;
private final Version modelSnapshotMinVersion;
private final String resultsIndexName;
private final boolean deleted;
private final boolean deleting;
private Job(String jobId, String jobType, Version jobVersion, List<String> groups, String description,
Date createTime, Date finishedTime, Long establishedModelMemory,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleted) {
String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleting) {
this.jobId = jobId;
this.jobType = jobType;
@ -181,7 +181,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.modelSnapshotId = modelSnapshotId;
this.modelSnapshotMinVersion = modelSnapshotMinVersion;
this.resultsIndexName = resultsIndexName;
this.deleted = deleted;
this.deleting = deleting;
}
public Job(StreamInput in) throws IOException {
@ -224,7 +224,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
modelSnapshotMinVersion = null;
}
resultsIndexName = in.readString();
deleted = in.readBoolean();
deleting = in.readBoolean();
}
/**
@ -375,8 +375,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return modelSnapshotMinVersion;
}
public boolean isDeleted() {
return deleted;
public boolean isDeleting() {
return deleting;
}
/**
@ -489,7 +489,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
}
}
out.writeString(resultsIndexName);
out.writeBoolean(deleted);
out.writeBoolean(deleting);
}
@Override
@ -554,8 +554,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
builder.field(MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion);
}
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
if (params.paramAsBoolean("all", false)) {
builder.field(DELETED.getPreferredName(), deleted);
if (deleting) {
builder.field(DELETING.getPreferredName(), deleting);
}
return builder;
}
@ -591,7 +591,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleted, that.deleted);
&& Objects.equals(this.deleting, that.deleting);
}
@Override
@ -599,7 +599,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted);
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting);
}
// Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@ -647,7 +647,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
private String modelSnapshotId;
private Version modelSnapshotMinVersion;
private String resultsIndexName;
private boolean deleted;
private boolean deleting;
public Builder() {
}
@ -677,7 +677,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
this.modelSnapshotId = job.getModelSnapshotId();
this.modelSnapshotMinVersion = job.getModelSnapshotMinVersion();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleted = job.isDeleted();
this.deleting = job.isDeleting();
}
public Builder(StreamInput in) throws IOException {
@ -717,7 +717,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
modelSnapshotMinVersion = null;
}
resultsIndexName = in.readOptionalString();
deleted = in.readBoolean();
deleting = in.readBoolean();
}
public Builder setId(String id) {
@ -834,8 +834,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return this;
}
public Builder setDeleted(boolean deleted) {
this.deleted = deleted;
public Builder setDeleting(boolean deleting) {
this.deleting = deleting;
return this;
}
@ -911,7 +911,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
}
}
out.writeOptionalString(resultsIndexName);
out.writeBoolean(deleted);
out.writeBoolean(deleting);
}
@Override
@ -972,8 +972,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
if (resultsIndexName != null) {
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
}
if (params.paramAsBoolean("all", false)) {
builder.field(DELETED.getPreferredName(), deleted);
if (deleting) {
builder.field(DELETING.getPreferredName(), deleting);
}
builder.endObject();
@ -1006,7 +1006,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleted, that.deleted);
&& Objects.equals(this.deleting, that.deleting);
}
@Override
@ -1014,7 +1014,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription,
createTime, finishedTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId,
modelSnapshotMinVersion, resultsIndexName, deleted);
modelSnapshotMinVersion, resultsIndexName, deleting);
}
/**
@ -1127,7 +1127,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
id, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted);
modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting);
}
private void checkValidBackgroundPersistInterval() {

View File

@ -67,6 +67,8 @@ public final class Messages {
public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''";
public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}";
public static final String JOB_AUDIT_DELETED = "Job deleted";
public static final String JOB_AUDIT_KILLING = "Killing job";
public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {1}";

View File

@ -12,7 +12,17 @@ import java.util.Map;
/**
 * Task representing an in-flight ML job deletion.
 */
public class JobDeletionTask extends Task {

    // Set once the deletion work has actually begun; volatile because the flag
    // may be written and read from different threads.
    private volatile boolean started;

    public JobDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
        super(id, type, action, description, parentTask, headers);
    }

    /** Marks the deletion as having started. */
    public void start() {
        this.started = true;
    }

    /** @return whether the deletion work has started */
    public boolean isStarted() {
        return this.started;
    }
}

View File

@ -16,9 +16,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.test.rest.XPackRestTestHelper;
import org.junit.After;
@ -26,6 +26,8 @@ import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
@ -386,6 +388,41 @@ public class MlJobIT extends ESRestTestCase {
String indicesAfterDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(indicesAfterDelete, containsString(indexName));
waitUntilIndexIsEmpty(indexName);
// check that the job itself is gone
expectThrows(ResponseException.class, () ->
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
}
public void testDeleteJobAsync() throws Exception {
    String jobId = "delete-job-async-job";
    String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
    createFarequoteJob(jobId);

    // The shared results index should exist once the job has been created
    String catIndicesBefore = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
    assertThat(catIndicesBefore, containsString(indexName));

    // Issue an asynchronous delete; the response carries the task id instead of an ack
    Response deleteResponse = client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId
            + "?wait_for_completion=false"));

    // Block on the tasks API until the deletion task finishes, then verify it succeeded
    String taskId = extractTaskId(deleteResponse);
    Response taskResponse = client().performRequest(new Request("GET", "_tasks/" + taskId + "?wait_for_completion=true"));
    assertThat(EntityUtils.toString(taskResponse.getEntity()), containsString("\"acknowledged\":true"));

    // The results index is shared by default, so it must survive the job deletion...
    String catIndicesAfter = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
    assertThat(catIndicesAfter, containsString(indexName));

    // ...but it should eventually contain no documents for the deleted job
    waitUntilIndexIsEmpty(indexName);

    // The job itself must be gone: stats requests should now fail
    expectThrows(ResponseException.class, () ->
            client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
}
private void waitUntilIndexIsEmpty(String indexName) throws Exception {
assertBusy(() -> {
try {
String count = EntityUtils.toString(client().performRequest(new Request("GET", indexName + "/_count")).getEntity());
@ -394,10 +431,14 @@ public class MlJobIT extends ESRestTestCase {
fail(e.getMessage());
}
});
}
// check that the job itself is gone
expectThrows(ResponseException.class, () ->
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
/**
 * Pulls the task id out of a {"task":"node:id"} response body without a full JSON parse.
 */
private static String extractTaskId(Response response) throws IOException {
    String body = EntityUtils.toString(response.getEntity());
    Matcher taskIdMatcher = Pattern.compile(".*\"task\":.*\"(.*)\".*").matcher(body);
    assertTrue(taskIdMatcher.matches());
    return taskIdMatcher.group(1);
}
public void testDeleteJobAfterMissingIndex() throws Exception {
@ -521,7 +562,7 @@ public class MlJobIT extends ESRestTestCase {
}
public void testDelete_multipleRequest() throws Exception {
String jobId = "delete-job-mulitple-times";
String jobId = "delete-job-multiple-times";
createFarequoteJob(jobId);
ConcurrentMapLong<Response> responses = ConcurrentCollections.newConcurrentMapLong();
@ -532,8 +573,8 @@ public class MlJobIT extends ESRestTestCase {
AtomicReference<ResponseException> recreationException = new AtomicReference<>();
Runnable deleteJob = () -> {
boolean forceDelete = randomBoolean();
try {
boolean forceDelete = randomBoolean();
String url = MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId;
if (forceDelete) {
url += "?force=true";
@ -554,6 +595,7 @@ public class MlJobIT extends ESRestTestCase {
} catch (ResponseException re) {
recreationException.set(re);
} catch (IOException e) {
logger.error("Error trying to recreate the job", e);
ioe.set(e);
}
}
@ -563,14 +605,14 @@ public class MlJobIT extends ESRestTestCase {
// the other to complete. This is difficult to schedule but
// hopefully it will happen in CI
int numThreads = 5;
Thread [] threads = new Thread[numThreads];
for (int i=0; i<numThreads; i++) {
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(deleteJob);
}
for (int i=0; i<numThreads; i++) {
for (int i = 0; i < numThreads; i++) {
threads[i].start();
}
for (int i=0; i<numThreads; i++) {
for (int i = 0; i < numThreads; i++) {
threads[i].join();
}

View File

@ -99,7 +99,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
Consumer<String> jobIdProcessor = id -> {
validateJobAndTaskState(id, mlMetadata, tasksMetaData);
Job job = mlMetadata.getJobs().get(id);
if (job.isDeleted()) {
if (job.isDeleting()) {
return;
}
addJobAccordingToState(id, tasksMetaData, openJobIds, closingJobIds, failedJobs);

View File

@ -23,9 +23,9 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -34,9 +34,9 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.IdsQueryBuilder;
@ -45,14 +45,13 @@ import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@ -72,10 +71,11 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -90,6 +90,14 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
private final Auditor auditor;
private final JobResultsProvider jobResultsProvider;
/**
* A map of task listeners by job_id.
* Subsequent delete requests store their listeners in the corresponding list in this map
* and wait to be notified when the first deletion task completes.
* This is guarded by synchronizing on its lock.
*/
private final Map<String, List<ActionListener<AcknowledgedResponse>>> listenersByJobId;
@Inject
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
@ -101,6 +109,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
this.persistentTasksService = persistentTasksService;
this.auditor = auditor;
this.jobResultsProvider = jobResultsProvider;
this.listenersByJobId = new HashMap<>();
}
@Override
@ -114,42 +123,8 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
}
@Override
protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
response -> {
if (request.isForce()) {
forceDeleteJob(request, listener);
} else {
normalDeleteJob(request, listener);
}
},
e -> {
if (e instanceof MlMetadata.JobAlreadyMarkedAsDeletedException) {
// Don't kick off a parallel deletion task, but just wait for
// the in-progress request to finish. This is much safer in the
// case where the job with the same name might be immediately
// recreated after the delete returns. However, if a force
// delete times out then eventually kick off a parallel delete
// in case the original completely failed for some reason.
waitForDeletingJob(request.getJobId(), MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT,
ActionListener.wrap(
listener::onResponse,
e2 -> {
if (request.isForce() && e2 instanceof TimeoutException) {
forceDeleteJob(request, listener);
} else {
listener.onFailure(e2);
}
}
));
} else {
listener.onFailure(e);
}
});
markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce());
protected ClusterBlockException checkBlock(DeleteJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
@ -158,13 +133,71 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
}
@Override
protected ClusterBlockException checkBlock(DeleteJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
logger.debug("Deleting job '{}'", request.getJobId());
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
// Check if there is a deletion task for this job already and if yes wait for it to complete
synchronized (listenersByJobId) {
if (listenersByJobId.containsKey(request.getJobId())) {
logger.debug("[{}] Deletion task [{}] will wait for existing deletion task to complete",
request.getJobId(), task.getId());
listenersByJobId.get(request.getJobId()).add(listener);
return;
} else {
List<ActionListener<AcknowledgedResponse>> listeners = new ArrayList<>();
listeners.add(listener);
listenersByJobId.put(request.getJobId(), listeners);
}
}
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId));
// The listener that will be executed at the end of the chain will notify all listeners
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
ack -> notifyListeners(request.getJobId(), ack, null),
e -> notifyListeners(request.getJobId(), null, e)
);
ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
response -> {
if (request.isForce()) {
forceDeleteJob(parentTaskClient, request, finalListener);
} else {
normalDeleteJob(parentTaskClient, request, finalListener);
}
},
e -> {
auditor.error(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING_FAILED, e.getMessage()));
finalListener.onFailure(e);
});
markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce());
}
private void normalDeleteJob(DeleteJobAction.Request request, ActionListener<AcknowledgedResponse> listener) {
/**
 * Notifies every listener registered for the deletion of {@code jobId} and removes
 * them from {@link #listenersByJobId}. Exactly one of {@code ack} or {@code error}
 * is expected to be non-null.
 */
private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) {
    List<ActionListener<AcknowledgedResponse>> listeners;
    synchronized (listenersByJobId) {
        listeners = listenersByJobId.remove(jobId);
    }
    if (listeners == null) {
        logger.error("[{}] No deletion job listeners could be found", jobId);
        return;
    }
    // Invoke the listeners outside the lock: listener callbacks are arbitrary code and
    // must not be able to block other delete requests from registering or completing.
    for (ActionListener<AcknowledgedResponse> listener : listeners) {
        if (error != null) {
            listener.onFailure(error);
        } else {
            listener.onResponse(ack);
        }
    }
}
private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
ActionListener<AcknowledgedResponse> listener) {
String jobId = request.getJobId();
logger.debug("Deleting job '" + jobId + "'");
// Step 4. When the job has been removed from the cluster state, return a response
// -------
@ -212,10 +245,11 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
// Step 1. Delete the physical storage
deleteJobDocuments(jobId, removeFromCalendarsHandler, listener::onFailure);
deleteJobDocuments(parentTaskClient, jobId, removeFromCalendarsHandler, listener::onFailure);
}
private void deleteJobDocuments(String jobId, CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId,
CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterService.state(), jobId);
final String indexPattern = indexName + "-*";
@ -241,7 +275,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
logger.warn("DBQ failure: " + failure);
}
}
deleteAliases(jobId, client, completionHandler);
deleteAliases(parentTaskClient, jobId, completionHandler);
}
},
failureHandler);
@ -260,7 +294,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
request.setAbortOnVersionConflict(false);
request.setRefresh(true);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
} else { // We did not execute DBQ, no need to delete aliases or check the response
dbqHandler.onResponse(null);
}
@ -279,13 +313,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
request.indicesOptions(IndicesOptions.lenientExpandOpen());
// If we have deleted the index, then we don't need to delete the aliases or run the DBQ
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
parentTaskClient.threadPool().getThreadContext(),
ML_ORIGIN,
request,
ActionListener.<AcknowledgedResponse>wrap(
response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias
failureHandler),
client.admin().indices()::delete);
parentTaskClient.admin().indices()::delete);
}
},
failure -> {
@ -312,7 +346,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(source);
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
}
},
failureHandler
@ -320,19 +354,19 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
// Step 3. Delete quantiles done, delete the categorizer state
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler),
response -> deleteCategorizerState(parentTaskClient, jobId, 1, deleteCategorizerStateHandler),
failureHandler);
// Step 2. Delete state done, delete the quantiles
ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler),
bulkResponse -> deleteQuantiles(parentTaskClient, jobId, deleteQuantilesHandler),
failureHandler);
// Step 1. Delete the model state
deleteModelState(jobId, client, deleteStateHandler);
deleteModelState(parentTaskClient, jobId, deleteStateHandler);
}
private void deleteQuantiles(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
// The quantiles type and doc ID changed in v5.5 so delete both the old and new format
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
@ -344,7 +378,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
request.setAbortOnVersionConflict(false);
request.setRefresh(true);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
response -> finishedHandler.onResponse(true),
e -> {
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
@ -356,19 +390,20 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
}));
}
private void deleteModelState(String jobId, Client client, ActionListener<BulkResponse> listener) {
private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<BulkResponse> listener) {
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
response -> {
List<ModelSnapshot> deleteCandidates = response.getPage().results();
JobDataDeleter deleter = new JobDataDeleter(client, jobId);
JobDataDeleter deleter = new JobDataDeleter(parentTaskClient, jobId);
deleter.deleteModelSnapshots(deleteCandidates, listener);
},
listener::onFailure));
}
private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener<Boolean> finishedHandler) {
private void deleteCategorizerState(ParentTaskAssigningClient parentTaskClient, String jobId, int docNum,
ActionListener<Boolean> finishedHandler) {
// The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
@@ -380,13 +415,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
request.setAbortOnVersionConflict(false);
request.setRefresh(true);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
response -> {
// If we successfully deleted a document try the next one; if not we're done
if (response.getDeleted() > 0) {
// There's an assumption here that there won't be very many categorizer
// state documents, so the recursion won't go more than, say, 5 levels deep
deleteCategorizerState(jobId, client, docNum + 1, finishedHandler);
deleteCategorizerState(parentTaskClient, jobId, docNum + 1, finishedHandler);
return;
}
finishedHandler.onResponse(true);
@@ -401,14 +436,15 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
}));
}
private void deleteAliases(String jobId, Client client, ActionListener<AcknowledgedResponse> finishedHandler) {
private void deleteAliases(ParentTaskAssigningClient parentTaskClient, String jobId,
ActionListener<AcknowledgedResponse> finishedHandler) {
final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
// first find the concrete indices associated with the aliases
GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName)
.indicesOptions(IndicesOptions.lenientExpandOpen());
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest,
executeAsyncWithOrigin(parentTaskClient.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest,
ActionListener.<GetAliasesResponse>wrap(
getAliasesResponse -> {
// remove the aliases from the concrete indices found in the first step
@@ -419,13 +455,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
finishedHandler.onResponse(new AcknowledgedResponse(true));
return;
}
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
executeAsyncWithOrigin(parentTaskClient.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
ActionListener.<AcknowledgedResponse>wrap(
finishedHandler::onResponse,
finishedHandler::onFailure),
client.admin().indices()::aliases);
parentTaskClient.admin().indices()::aliases);
},
finishedHandler::onFailure), client.admin().indices()::getAliases);
finishedHandler::onFailure), parentTaskClient.admin().indices()::getAliases);
}
private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) {
@@ -445,7 +481,10 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
.indices(indices.toArray(new String[indices.size()])));
}
private void forceDeleteJob(DeleteJobAction.Request request, ActionListener<AcknowledgedResponse> listener) {
private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
ActionListener<AcknowledgedResponse> listener) {
logger.debug("Force deleting job [{}]", request.getJobId());
final ClusterState state = clusterService.state();
final String jobId = request.getJobId();
@@ -454,13 +493,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean response) {
normalDeleteJob(request, listener);
normalDeleteJob(parentTaskClient, request, listener);
}
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
normalDeleteJob(request, listener);
normalDeleteJob(parentTaskClient, request, listener);
} else {
listener.onFailure(e);
}
@@ -483,12 +522,13 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
);
// 1. Kill the job's process
killProcess(jobId, killJobListener);
killProcess(parentTaskClient, jobId, killJobListener);
}
private void killProcess(String jobId, ActionListener<KillProcessAction.Response> listener) {
private void killProcess(ParentTaskAssigningClient parentTaskClient, String jobId,
ActionListener<KillProcessAction.Response> listener) {
KillProcessAction.Request killRequest = new KillProcessAction.Request(jobId);
executeAsyncWithOrigin(client, ML_ORIGIN, KillProcessAction.INSTANCE, killRequest, listener);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, KillProcessAction.INSTANCE, killRequest, listener);
}
private void removePersistentTask(String jobId, ClusterState currentState,
@@ -520,7 +560,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
builder.markJobAsDeleted(jobId, tasks, force);
builder.markJobAsDeleting(jobId, tasks, force);
return buildNewClusterState(currentState, builder);
}
@@ -537,32 +577,6 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
});
}
private void waitForDeletingJob(String jobId, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
ClusterState clusterState = stateObserver.setAndGetObservedState();
if (jobIsDeletedFromState(jobId, clusterState)) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(new AcknowledgedResponse(true));
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new TimeoutException("timed out after " + timeout));
}
}, newClusterState -> jobIsDeletedFromState(jobId, newClusterState), timeout);
}
}
static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId);
}

View File

@@ -183,6 +183,6 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
List<GetJobsStatsAction.Response.JobStats> stats) {
Set<String> excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet());
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) &&
!mlMetadata.isJobDeleted(jobId)).collect(Collectors.toList());
!mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList());
}
}

View File

@@ -127,8 +127,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
if (job == null) {
throw ExceptionsHelper.missingJobException(jobId);
}
if (job.isDeleted()) {
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted");
if (job.isDeleting()) {
throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it is being deleted");
}
if (job.getJobVersion() == null) {
throw ExceptionsHelper.badRequestException("Cannot open job [" + jobId

View File

@@ -7,10 +7,15 @@ package org.elasticsearch.xpack.ml.rest.job;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -37,6 +42,35 @@ public class RestDeleteJobAction extends BaseRestHandler {
deleteJobRequest.setForce(restRequest.paramAsBoolean(CloseJobAction.Request.FORCE.getPreferredName(), deleteJobRequest.isForce()));
deleteJobRequest.timeout(restRequest.paramAsTime("timeout", deleteJobRequest.timeout()));
deleteJobRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", deleteJobRequest.masterNodeTimeout()));
return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new RestToXContentListener<>(channel));
if (restRequest.paramAsBoolean("wait_for_completion", true)) {
return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new RestToXContentListener<>(channel));
} else {
deleteJobRequest.setShouldStoreResult(true);
Task task = client.executeLocally(DeleteJobAction.INSTANCE, deleteJobRequest, nullTaskListener());
// Send task description id instead of waiting for the message
return channel -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", client.getLocalNodeId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
};
}
}
// We do not want to log anything due to a delete action
// The response or error will be returned to the client when called synchronously
// or it will be stored in the task result when called asynchronously
private static TaskListener nullTaskListener() {
return new TaskListener() {
@Override
public void onResponse(Task task, Object o) {}
@Override
public void onFailure(Task task, Throwable e) {}
};
}
}

View File

@@ -124,7 +124,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
public void testRemoveJob() {
Job.Builder jobBuilder = buildJobBuilder("1");
jobBuilder.setDeleted(true);
jobBuilder.setDeleting(true);
Job job1 = jobBuilder.build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);
@@ -206,7 +206,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
}
public void testPutDatafeed_failBecauseJobIsBeingDeleted() {
Job job1 = createDatafeedJob().setDeleted(true).build(new Date());
Job job1 = createDatafeedJob().setDeleting(true).build(new Date());
DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build();
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1, false);

View File

@@ -28,7 +28,7 @@ public class TransportGetJobsStatsActionTests extends ESTestCase {
public void testDetermineJobIds() {
MlMetadata mlMetadata = mock(MlMetadata.class);
when(mlMetadata.isJobDeleted(eq("id4"))).thenReturn(true);
when(mlMetadata.isJobDeleting(eq("id4"))).thenReturn(true);
List<String> result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Collections.singletonList("id1"), Collections.emptyList());

View File

@@ -79,14 +79,14 @@ public class TransportOpenJobActionTests extends ESTestCase {
expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", mlBuilder.build()));
}
public void testValidate_jobMarkedAsDeleted() {
public void testValidate_jobMarkedAsDeleting() {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
Job.Builder jobBuilder = buildJobBuilder("job_id");
jobBuilder.setDeleted(true);
jobBuilder.setDeleting(true);
mlBuilder.putJob(jobBuilder.build(), false);
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> TransportOpenJobAction.validate("job_id", mlBuilder.build()));
assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage());
assertEquals("Cannot open job [job_id] because it is being deleted", e.getMessage());
}
public void testValidate_jobWithoutVersion() {

View File

@@ -1,125 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class DeleteJobIT extends BaseMlIntegTestCase {
public void testWaitForDelete() throws ExecutionException, InterruptedException {
final String jobId = "wait-for-delete-job";
Job.Builder job = createJob(jobId);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
CountDownLatch markAsDeletedLatch = new CountDownLatch(1);
clusterService().submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return markJobAsDeleted(jobId, currentState);
}
@Override
public void onFailure(String source, Exception e) {
markAsDeletedLatch.countDown();
exceptionHolder.set(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
markAsDeletedLatch.countDown();
}
});
assertTrue("Timed out waiting for state update", markAsDeletedLatch.await(5, TimeUnit.SECONDS));
assertNull("mark-job-as-deleted task failed: " + exceptionHolder.get(), exceptionHolder.get());
// Job is marked as deleting so now a delete request should wait for it.
AtomicBoolean isDeleted = new AtomicBoolean(false);
AtomicReference<Exception> deleteFailure = new AtomicReference<>();
ActionListener<AcknowledgedResponse> deleteListener = new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
isDeleted.compareAndSet(false, response.isAcknowledged());
}
@Override
public void onFailure(Exception e) {
deleteFailure.set(e);
}
};
client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId), deleteListener);
awaitBusy(isDeleted::get, 1, TimeUnit.SECONDS);
// still waiting
assertFalse(isDeleted.get());
CountDownLatch removeJobLatch = new CountDownLatch(1);
clusterService().submitStateUpdateTask("remove-job-from-state", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
assertFalse(isDeleted.get());
return removeJobFromClusterState(jobId, currentState);
}
@Override
public void onFailure(String source, Exception e) {
removeJobLatch.countDown();
exceptionHolder.set(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
removeJobLatch.countDown();
}
});
assertTrue("Timed out waiting for remove job from state response", removeJobLatch.await(5, TimeUnit.SECONDS));
assertNull("remove-job-from-state task failed: " + exceptionHolder.get(), exceptionHolder.get());
assertNull("Job deletion failed: " + deleteFailure.get(), deleteFailure.get());
assertTrue("Job was not deleted", isDeleted.get());
}
private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
assertNotNull(mlMetadata);
MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata);
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
builder.markJobAsDeleted(jobId, tasks, true);
ClusterState.Builder newState = ClusterState.builder(currentState);
return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build())
.build();
}
private ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) {
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
ClusterState.Builder newState = ClusterState.builder(currentState);
return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build())
.build();
}
}

View File

@@ -15,8 +15,13 @@
"params": {
"force": {
"type": "boolean",
"required": false,
"description": "True if the job should be forcefully deleted"
"description": "True if the job should be forcefully deleted",
"default": false
},
"wait_for_completion": {
"type": "boolean",
"description": "Should this request wait until the operation has completed before returning",
"default": true
}
}
},