From a8e394c3b554cba3c3ea271c3a0f2460a3e0a101 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 21 Jun 2017 09:14:51 +0100 Subject: [PATCH] [ML] Fix race condition between multiple job deletion and immediate recreation (elastic/x-pack-elasticsearch#1793) If multiple job deletion requests were sent in quick succession, there was a race condition that meant more than one of them could get through the check that enforces one active deletion request at a time. Then, if the job was immediately recreated after the first deletion request returned, the second, still running, deletion request could interfere with it and delete the aliases that the put job request created. This problem can be avoided by using the "ask forgiveness, not permission" idiom when checking if the job is already being deleted at the beginning of each deletion request. Additionally, now even force delete requests will wait for a certain amount of time for a prior delete request to complete. This is to avoid the same race conditions. However, if that wait times out, a force delete request will start an (unsafe) parallel delete, which provides a get-out in case the earlier delete request has completely died. 
relates elastic/x-pack-elasticsearch#1765 Original commit: elastic/x-pack-elasticsearch@b5c8f26a0e894c62e76ea95bd127b474d138b90d --- .../elasticsearch/xpack/ml/MlMetadata.java | 10 +++-- .../xpack/ml/action/DeleteJobAction.java | 39 ++++++++++++------- .../xpack/ml/integration/MlJobIT.java | 36 ++++++++++++++++- 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index e4f4b4e87bc..1057b95b534 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -167,9 +167,9 @@ public class MlMetadata implements MetaData.Custom { } public MlMetadataDiff(StreamInput in) throws IOException { - this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new, + this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new, MlMetadataDiff::readJobDiffFrom); - this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, + this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, MlMetadataDiff::readSchedulerDiffFrom); } @@ -356,8 +356,8 @@ public class MlMetadata implements MetaData.Custom { throw ExceptionsHelper.missingJobException(jobId); } if (job.isDeleted()) { - // Job still exists - return; + // Job still exists but is already being deleted + throw new JobAlreadyMarkedAsDeletedException(); } checkJobHasNoDatafeed(jobId); @@ -441,4 +441,6 @@ public class MlMetadata implements MetaData.Custom { } } + public static class JobAlreadyMarkedAsDeletedException extends RuntimeException { + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java index 9d44f26ee1c..8ae7ab4458a 
100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.TimeoutException; public class DeleteJobAction extends Action { @@ -199,19 +200,6 @@ public class DeleteJobAction extends Action listener) throws Exception { - // For a normal delete check if the job is already being deleted. - if (request.isForce() == false) { - MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE); - if (currentMlMetadata != null) { - Job job = currentMlMetadata.getJobs().get(request.getJobId()); - if (job != null && job.isDeleted()) { - // This is a generous timeout value but it's unlikely to ever take this long - waitForDeletingJob(request.getJobId(), MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT, listener); - return; - } - } - } - ActionListener markAsDeletingListener = ActionListener.wrap( response -> { if (request.isForce()) { @@ -220,7 +208,28 @@ public class DeleteJobAction extends Action { + if (e instanceof MlMetadata.JobAlreadyMarkedAsDeletedException) { + // Don't kick off a parallel deletion task, but just wait for + // the in-progress request to finish. This is much safer in the + // case where the job with the same name might be immediately + // recreated after the delete returns. However, if a force + // delete times out then eventually kick off a parallel delete + // in case the original completely failed for some reason. 
+ waitForDeletingJob(request.getJobId(), MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT, ActionListener.wrap( + listener::onResponse, + e2 -> { + if (request.isForce() && e2 instanceof TimeoutException) { + forceDeleteJob(request, (JobStorageDeletionTask) task, listener); + } else { + listener.onFailure(e2); + } + } + )); + } else { + listener.onFailure(e); + } + }); markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce()); } @@ -354,7 +363,7 @@ public class DeleteJobAction extends Action jobIsDeletedFromState(jobId, newClusterState), timeout); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 27cd31941fe..f704cbdca93 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -25,6 +25,7 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Locale; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -122,7 +123,7 @@ public class MlJobIT extends ESRestTestCase { assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-3\"")); } - private Response createFarequoteJob(String jobId) throws Exception { + private Response createFarequoteJob(String jobId) throws IOException { String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n" + " \"analysis_config\" : {\n" + " \"bucket_span\": \"3600s\",\n" + " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n" @@ -544,6 +545,9 @@ public class MlJobIT extends ESRestTestCase { ConcurrentMapLong responses = ConcurrentCollections.newConcurrentMapLong(); ConcurrentMapLong responseExceptions = 
ConcurrentCollections.newConcurrentMapLong(); AtomicReference ioe = new AtomicReference<>(); + AtomicInteger recreationGuard = new AtomicInteger(0); + AtomicReference recreationResponse = new AtomicReference<>(); + AtomicReference recreationException = new AtomicReference<>(); Runnable deleteJob = () -> { try { @@ -560,6 +564,17 @@ public class MlJobIT extends ESRestTestCase { ioe.set(e); } + // Immediately after the first deletion finishes, recreate the job. This should pick up + // race conditions where another delete request deletes part of the newly created job. + if (recreationGuard.getAndIncrement() == 0) { + try { + recreationResponse.set(createFarequoteJob(jobId)); + } catch (ResponseException re) { + recreationException.set(re); + } catch (IOException e) { + ioe.set(e); + } + } }; // The idea is to hit the situation where one request waits for @@ -593,6 +608,25 @@ public class MlJobIT extends ESRestTestCase { for (Response response : responses.values()) { assertEquals(responseEntityToString(response), 200, response.getStatusLine().getStatusCode()); } + + assertNotNull(recreationResponse.get()); + assertEquals(responseEntityToString(recreationResponse.get()), 200, recreationResponse.get().getStatusLine().getStatusCode()); + + if (recreationException.get() != null) { + assertNull(recreationException.get().getMessage(), recreationException.get()); + } + + // Check that the job aliases exist. These are the last thing to be deleted when a job is deleted, so + // if there's been a race between deletion and recreation these are what will be missing. 
+ Response response = client().performRequest("get", "_aliases"); + assertEquals(200, response.getStatusLine().getStatusCode()); + String responseAsString = responseEntityToString(response); + + assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + + "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId + "\",\"boost\":1.0}}}}")); + assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.resultsWriteAlias(jobId) + "\":{}")); + + assertEquals(numThreads, recreationGuard.get()); } private static String responseEntityToString(Response response) throws Exception {