diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index e4f4b4e87bc..1057b95b534 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -167,9 +167,9 @@ public class MlMetadata implements MetaData.Custom { } public MlMetadataDiff(StreamInput in) throws IOException { - this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new, + this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new, MlMetadataDiff::readJobDiffFrom); - this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, + this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, MlMetadataDiff::readSchedulerDiffFrom); } @@ -356,8 +356,8 @@ public class MlMetadata implements MetaData.Custom { throw ExceptionsHelper.missingJobException(jobId); } if (job.isDeleted()) { - // Job still exists - return; + // Job still exists but is already being deleted + throw new JobAlreadyMarkedAsDeletedException(); } checkJobHasNoDatafeed(jobId); @@ -441,4 +441,6 @@ public class MlMetadata implements MetaData.Custom { } } + public static class JobAlreadyMarkedAsDeletedException extends RuntimeException { + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java index 9d44f26ee1c..8ae7ab4458a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.TimeoutException; public class DeleteJobAction extends Action { @@ -199,19 +200,6 @@ public class DeleteJobAction extends Action listener) throws Exception { - // For a normal delete check if the job is already being deleted. - if (request.isForce() == false) { - MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE); - if (currentMlMetadata != null) { - Job job = currentMlMetadata.getJobs().get(request.getJobId()); - if (job != null && job.isDeleted()) { - // This is a generous timeout value but it's unlikely to ever take this long - waitForDeletingJob(request.getJobId(), MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT, listener); - return; - } - } - } - ActionListener markAsDeletingListener = ActionListener.wrap( response -> { if (request.isForce()) { @@ -220,7 +208,28 @@ public class DeleteJobAction extends Action { + if (e instanceof MlMetadata.JobAlreadyMarkedAsDeletedException) { + // Don't kick off a parallel deletion task, but just wait for + // the in-progress request to finish. This is much safer in the + // case where the job with the same name might be immediately + // recreated after the delete returns. However, if a force + // delete times out then eventually kick off a parallel delete + // in case the original completely failed for some reason. + waitForDeletingJob(request.getJobId(), MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT, ActionListener.wrap( + listener::onResponse, + e2 -> { + if (request.isForce() && e2 instanceof TimeoutException) { + forceDeleteJob(request, (JobStorageDeletionTask) task, listener); + } else { + listener.onFailure(e2); + } + } + )); + } else { + listener.onFailure(e); + } + }); markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce()); } @@ -354,7 +363,7 @@ public class DeleteJobAction extends Action jobIsDeletedFromState(jobId, newClusterState), timeout); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 27cd31941fe..f704cbdca93 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -25,6 +25,7 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Locale; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -122,7 +123,7 @@ public class MlJobIT extends ESRestTestCase { assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-3\"")); } - private Response createFarequoteJob(String jobId) throws Exception { + private Response createFarequoteJob(String jobId) throws IOException { String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n" + " \"analysis_config\" : {\n" + " \"bucket_span\": \"3600s\",\n" + " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n" @@ -544,6 +545,9 @@ public class MlJobIT extends ESRestTestCase { ConcurrentMapLong responses = ConcurrentCollections.newConcurrentMapLong(); ConcurrentMapLong responseExceptions = ConcurrentCollections.newConcurrentMapLong(); AtomicReference ioe = new AtomicReference<>(); + AtomicInteger recreationGuard = new AtomicInteger(0); + AtomicReference recreationResponse = new AtomicReference<>(); + AtomicReference recreationException = new AtomicReference<>(); Runnable deleteJob = () -> { try { @@ -560,6 +564,17 @@ public class MlJobIT extends ESRestTestCase { ioe.set(e); } + // Immediately after the first deletion finishes, recreate the job. This should pick up + // race conditions where another delete request deletes part of the newly created job. + if (recreationGuard.getAndIncrement() == 0) { + try { + recreationResponse.set(createFarequoteJob(jobId)); + } catch (ResponseException re) { + recreationException.set(re); + } catch (IOException e) { + ioe.set(e); + } + } }; // The idea is to hit the situation where one request waits for @@ -593,6 +608,25 @@ public class MlJobIT extends ESRestTestCase { for (Response response : responses.values()) { assertEquals(responseEntityToString(response), 200, response.getStatusLine().getStatusCode()); } + + assertNotNull(recreationResponse.get()); + assertEquals(responseEntityToString(recreationResponse.get()), 200, recreationResponse.get().getStatusLine().getStatusCode()); + + if (recreationException.get() != null) { + assertNull(recreationException.get().getMessage(), recreationException.get()); + } + + // Check that the job aliases exist. These are the last thing to be deleted when a job is deleted, so + // if there's been a race between deletion and recreation these are what will be missing. + Response response = client().performRequest("get", "_aliases"); + assertEquals(200, response.getStatusLine().getStatusCode()); + String responseAsString = responseEntityToString(response); + + assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId) + + "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId + "\",\"boost\":1.0}}}}")); + assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.resultsWriteAlias(jobId) + "\":{}")); + + assertEquals(numThreads, recreationGuard.get()); } private static String responseEntityToString(Response response) throws Exception {