[ML] Wait for job deletion if it is in the deleting state (elastic/x-pack-elasticsearch#1651)

* Wait for job deletion if it is in the deleting  state

* Tolerate errors if multiple force delete requests

Original commit: elastic/x-pack-elasticsearch@1f0c9fbb86
This commit is contained in:
David Kyle 2017-06-07 15:41:29 +01:00 committed by GitHub
parent f865755259
commit ba3e258470
7 changed files with 291 additions and 12 deletions

View File

@ -250,11 +250,8 @@ public class MlMetadata implements MetaData.Custom {
}
public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {
Optional<DatafeedConfig> datafeed = getDatafeedByJobId(jobId);
if (datafeed.isPresent()) {
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed ["
+ datafeed.get().getId() + "] refers to it");
}
checkJobHasNoDatafeed(jobId);
JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" +

View File

@ -19,6 +19,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -29,10 +30,13 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.JobManager;
@ -195,6 +199,19 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
// For a normal delete check if the job is already being deleted.
if (request.isForce() == false) {
MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE);
if (currentMlMetadata != null) {
Job job = currentMlMetadata.getJobs().get(request.getJobId());
if (job != null && job.isDeleted()) {
// This is a generous timeout value but it's unlikely to ever take this long
waitForDeletingJob(request.getJobId(), MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT, listener);
return;
}
}
}
ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
response -> {
if (request.isForce()) {
@ -317,6 +334,40 @@ public class DeleteJobAction extends Action<DeleteJobAction.Request, DeleteJobAc
});
}
void waitForDeletingJob(String jobId, TimeValue timeout, ActionListener<Response> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
ClusterState clusterState = stateObserver.setAndGetObservedState();
if (jobIsDeletedFromState(jobId, clusterState)) {
listener.onResponse(new Response(true));
} else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(new Response(true));
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new IllegalStateException("timed out after " + timeout));
}
}, newClusterState -> jobIsDeletedFromState(jobId, newClusterState), timeout);
}
}
static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
MlMetadata metadata = clusterState.metaData().custom(MlMetadata.TYPE);
if (metadata == null) {
return true;
}
return !metadata.getJobs().containsKey(jobId);
}
private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build());

View File

@ -309,7 +309,15 @@ public class JobManager extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata.Builder builder = createMlMetadataBuilder(currentState);
MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
if (currentMlMetadata.getJobs().containsKey(jobId) == false) {
// We wouldn't have got here if the job never existed so
// the Job must have been deleted by another action.
// Don't error in this case
return currentState;
}
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return buildNewClusterState(currentState, builder);
}

View File

@ -168,7 +168,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder.deleteJob(job1.getId(), new PersistentTasksCustomMetaData(0L, Collections.emptyMap())));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
String expectedMsg = "Cannot delete job [" + job1.getId() + "] while datafeed [" + datafeedConfig1.getId() + "] refers to it";
String expectedMsg = "Cannot delete job [" + job1.getId() + "] because datafeed [" + datafeedConfig1.getId() + "] refers to it";
assertThat(e.getMessage(), equalTo(expectedMsg));
}

View File

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import java.util.Date;
public class DeleteJobActionTests extends ESTestCase {
public void testJobIsDeletedFromState() {
MlMetadata mlMetadata = MlMetadata.EMPTY_METADATA;
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata))
.build();
assertTrue(DeleteJobAction.TransportAction.jobIsDeletedFromState("job_id_1", clusterState));
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()), false);
mlMetadata = mlBuilder.build();
clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata))
.build();
assertFalse(DeleteJobAction.TransportAction.jobIsDeletedFromState("job_id_1", clusterState));
}
}

View File

@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class DeleteJobIT extends BaseMlIntegTestCase {
public void testWaitForDelete() throws ExecutionException, InterruptedException {
final String jobId = "wait-for-delete-job";
Job.Builder job = createJob(jobId);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
CountDownLatch markAsDeletedLatch = new CountDownLatch(1);
clusterService().submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return markJobAsDeleted(jobId, currentState);
}
@Override
public void onFailure(String source, Exception e) {
markAsDeletedLatch.countDown();
exceptionHolder.set(e);
}
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
markAsDeletedLatch.countDown();
}
});
assertTrue("Timed out waiting for state update", markAsDeletedLatch.await(5, TimeUnit.SECONDS));
assertNull("mark-job-as-deleted task failed: " + exceptionHolder.get(), exceptionHolder.get());
// Job is marked as deleting so now a delete request should wait for it.
AtomicBoolean isDeleted = new AtomicBoolean(false);
AtomicReference<Exception> deleteFailure = new AtomicReference<>();
ActionListener<DeleteJobAction.Response> deleteListener = new ActionListener<DeleteJobAction.Response>() {
@Override
public void onResponse(DeleteJobAction.Response response) {
isDeleted.compareAndSet(false, response.isAcknowledged());
}
@Override
public void onFailure(Exception e) {
deleteFailure.set(e);
}
};
client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId), deleteListener);
awaitBusy(isDeleted::get, 1, TimeUnit.SECONDS);
// still waiting
assertFalse(isDeleted.get());
CountDownLatch removeJobLatch = new CountDownLatch(1);
clusterService().submitStateUpdateTask("remove-job-from-state", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
assertFalse(isDeleted.get());
return removeJobFromClusterState(jobId, currentState);
}
@Override
public void onFailure(String source, Exception e) {
removeJobLatch.countDown();
exceptionHolder.set(e);
}
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
removeJobLatch.countDown();
}
});
assertTrue("Timed out waiting for remove job from state response", removeJobLatch.await(5, TimeUnit.SECONDS));
assertNull("remove-job-from-state task failed: " + exceptionHolder.get(), exceptionHolder.get());
assertNull("Job deletion failed: " + deleteFailure.get(), deleteFailure.get());
assertTrue("Job was not deleted", isDeleted.get());
}
private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) {
MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE);
assertNotNull(mlMetadata);
MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata);
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
builder.markJobAsDeleted(jobId, tasks, true);
ClusterState.Builder newState = ClusterState.builder(currentState);
return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()).build();
}
private ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) {
MlMetadata.Builder builder = new MlMetadata.Builder(currentState.metaData().custom(MlMetadata.TYPE));
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
ClusterState.Builder newState = ClusterState.builder(currentState);
return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()).build();
}
}

View File

@ -11,6 +11,8 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;
@ -18,10 +20,12 @@ import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.junit.After;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
@ -533,6 +537,64 @@ public class MlJobIT extends ESRestTestCase {
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
}
public void testDelete_multipleRequest() throws Exception {
String jobId = "delete-job-mulitple-times";
createFarequoteJob(jobId);
ConcurrentMapLong<Response> responses = ConcurrentCollections.newConcurrentMapLong();
ConcurrentMapLong<ResponseException> responseExceptions = ConcurrentCollections.newConcurrentMapLong();
AtomicReference<IOException> ioe = new AtomicReference<>();
Runnable deleteJob = () -> {
try {
boolean forceDelete = randomBoolean();
String url = MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId;
if (forceDelete) {
url += "?force=true";
}
Response response = client().performRequest("delete", url);
responses.put(Thread.currentThread().getId(), response);
} catch (ResponseException re) {
responseExceptions.put(Thread.currentThread().getId(), re);
} catch (IOException e) {
ioe.set(e);
}
};
// The idea is to hit the situation where one request waits for
// the other to complete. This is difficult to schedule but
// hopefully it will happen in CI
int numThreads = 5;
Thread [] threads = new Thread[numThreads];
for (int i=0; i<numThreads; i++) {
threads[i] = new Thread(deleteJob);
}
for (int i=0; i<numThreads; i++) {
threads[i].start();
}
for (int i=0; i<numThreads; i++) {
threads[i].join();
}
if (ioe.get() != null) {
// This looks redundant but the check is done so we can
// print the exception's error message
assertNull(ioe.get().getMessage(), ioe.get());
}
assertEquals(numThreads, responses.size() + responseExceptions.size());
// 404s are ok as it means the job had already been deleted.
for (ResponseException re : responseExceptions.values()) {
assertEquals(re.getMessage(), 404, re.getResponse().getStatusLine().getStatusCode());
}
for (Response response : responses.values()) {
assertEquals(responseEntityToString(response), 200, response.getStatusLine().getStatusCode());
}
}
private static String responseEntityToString(Response response) throws Exception {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));