diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index 6c199fc7878..957bc3d99a5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -250,11 +250,8 @@ public class MlMetadata implements MetaData.Custom { } public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) { - Optional datafeed = getDatafeedByJobId(jobId); - if (datafeed.isPresent()) { - throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed [" - + datafeed.get().getId() + "] refers to it"); - } + checkJobHasNoDatafeed(jobId); + JobState jobState = MlMetadata.getJobState(jobId, tasks); if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" + diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java index be9625166c0..9d44f26ee1c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteJobAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -29,10 +30,13 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.JobManager; @@ -195,6 +199,19 @@ public class DeleteJobAction extends Action listener) throws Exception { + // For a normal delete check if the job is already being deleted. + if (request.isForce() == false) { + MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE); + if (currentMlMetadata != null) { + Job job = currentMlMetadata.getJobs().get(request.getJobId()); + if (job != null && job.isDeleted()) { + // This is a generous timeout value but it's unlikely to ever take this long + waitForDeletingJob(request.getJobId(), MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT, listener); + return; + } + } + } + ActionListener markAsDeletingListener = ActionListener.wrap( response -> { if (request.isForce()) { @@ -317,6 +334,40 @@ public class DeleteJobAction extends Action listener) { + ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); + + ClusterState clusterState = stateObserver.setAndGetObservedState(); + if (jobIsDeletedFromState(jobId, clusterState)) { + listener.onResponse(new Response(true)); + } else { + stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + listener.onResponse(new Response(true)); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new IllegalStateException("timed out after " + timeout)); + } + }, newClusterState -> jobIsDeletedFromState(jobId, newClusterState), timeout); + } + } + + static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) { + MlMetadata metadata = clusterState.metaData().custom(MlMetadata.TYPE); + if (metadata == null) { + return true; + } + return !metadata.getJobs().containsKey(jobId); + } + private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) { ClusterState.Builder newState = ClusterState.builder(currentState); newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index c453ddd8830..a419b9eeef2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -307,12 +307,20 @@ public class JobManager extends AbstractComponent { return acknowledged && response; } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - MlMetadata.Builder builder = createMlMetadataBuilder(currentState); - builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); - return buildNewClusterState(currentState, builder); - } + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE); + if (currentMlMetadata.getJobs().containsKey(jobId) == false) { + // We wouldn't have got here if the job never existed so + // the Job must have been deleted by another action. + // Don't error in this case + return currentState; + } + + MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); + builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); + return buildNewClusterState(currentState, builder); + } }); // Step 1. Delete the physical storage diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index cd504f8a277..461b3951423 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -168,7 +168,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> builder.deleteJob(job1.getId(), new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); - String expectedMsg = "Cannot delete job [" + job1.getId() + "] while datafeed [" + datafeedConfig1.getId() + "] refers to it"; + String expectedMsg = "Cannot delete job [" + job1.getId() + "] because datafeed [" + datafeedConfig1.getId() + "] refers to it"; assertThat(e.getMessage(), equalTo(expectedMsg)); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DeleteJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DeleteJobActionTests.java new file mode 100644 index 00000000000..40652768b3b --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DeleteJobActionTests.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; + +import java.util.Date; + +public class DeleteJobActionTests extends ESTestCase { + + public void testJobIsDeletedFromState() { + MlMetadata mlMetadata = MlMetadata.EMPTY_METADATA; + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata)) + .build(); + + assertTrue(DeleteJobAction.TransportAction.jobIsDeletedFromState("job_id_1", clusterState)); + + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()), false); + mlMetadata = mlBuilder.build(); + clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata)) + .build(); + + assertFalse(DeleteJobAction.TransportAction.jobIsDeletedFromState("job_id_1", clusterState)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java new file mode 100644 index 00000000000..847b87fba85 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.action.DeleteJobAction; +import org.elasticsearch.xpack.ml.action.PutJobAction; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class DeleteJobIT extends BaseMlIntegTestCase { + + public void testWaitForDelete() throws ExecutionException, InterruptedException { + final String jobId = "wait-for-delete-job"; + Job.Builder job = createJob(jobId); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get(); + assertTrue(putJobResponse.isAcknowledged()); + + AtomicReference exceptionHolder = new AtomicReference<>(); + CountDownLatch markAsDeletedLatch = new CountDownLatch(1); + clusterService().submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return markJobAsDeleted(jobId, currentState); + } + + @Override + public void onFailure(String source, Exception e) { + markAsDeletedLatch.countDown(); + exceptionHolder.set(e); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + markAsDeletedLatch.countDown(); + } + }); + + assertTrue("Timed out waiting for state update", markAsDeletedLatch.await(5, TimeUnit.SECONDS)); + assertNull("mark-job-as-deleted task failed: " + exceptionHolder.get(), exceptionHolder.get()); + + // Job is marked as deleting so now a delete request should wait for it. + AtomicBoolean isDeleted = new AtomicBoolean(false); + AtomicReference deleteFailure = new AtomicReference<>(); + ActionListener deleteListener = new ActionListener() { + @Override + public void onResponse(DeleteJobAction.Response response) { + isDeleted.compareAndSet(false, response.isAcknowledged()); + } + + @Override + public void onFailure(Exception e) { + deleteFailure.set(e); + } + }; + + client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId), deleteListener); + awaitBusy(isDeleted::get, 1, TimeUnit.SECONDS); + // still waiting + assertFalse(isDeleted.get()); + + CountDownLatch removeJobLatch = new CountDownLatch(1); + clusterService().submitStateUpdateTask("remove-job-from-state", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + assertFalse(isDeleted.get()); + return removeJobFromClusterState(jobId, currentState); + } + + @Override + public void onFailure(String source, Exception e) { + removeJobLatch.countDown(); + exceptionHolder.set(e); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + removeJobLatch.countDown(); + } + }); + + assertTrue("Timed out waiting for remove job from state response", removeJobLatch.await(5, TimeUnit.SECONDS)); + assertNull("remove-job-from-state task failed: " + exceptionHolder.get(), exceptionHolder.get()); + + assertNull("Job deletion failed: " + deleteFailure.get(), deleteFailure.get()); + assertTrue("Job was not deleted", isDeleted.get()); + } + + private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) { + MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE); + assertNotNull(mlMetadata); + + MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata); + PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + builder.markJobAsDeleted(jobId, tasks, true); + + ClusterState.Builder newState = ClusterState.builder(currentState); + return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()).build(); + } + + private ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) { + MlMetadata.Builder builder = new MlMetadata.Builder(currentState.metaData().custom(MlMetadata.TYPE)); + builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); + + ClusterState.Builder newState = ClusterState.builder(currentState); + return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()).build(); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index ad7aba0c2d9..27cd31941fe 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -11,6 +11,8 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.ml.MachineLearning; @@ -18,10 +20,12 @@ import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.junit.After; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Locale; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; @@ -533,6 +537,64 @@ public class MlJobIT extends ESRestTestCase { client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); } + public void testDelete_multipleRequest() throws Exception { + String jobId = "delete-job-mulitple-times"; + createFarequoteJob(jobId); + + ConcurrentMapLong responses = ConcurrentCollections.newConcurrentMapLong(); + ConcurrentMapLong responseExceptions = ConcurrentCollections.newConcurrentMapLong(); + AtomicReference ioe = new AtomicReference<>(); + + Runnable deleteJob = () -> { + try { + boolean forceDelete = randomBoolean(); + String url = MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId; + if (forceDelete) { + url += "?force=true"; + } + Response response = client().performRequest("delete", url); + responses.put(Thread.currentThread().getId(), response); + } catch (ResponseException re) { + responseExceptions.put(Thread.currentThread().getId(), re); + } catch (IOException e) { + ioe.set(e); + } + + }; + + // The idea is to hit the situation where one request waits for + // the other to complete. This is difficult to schedule but + // hopefully it will happen in CI + int numThreads = 5; + Thread [] threads = new Thread[numThreads]; + for (int i=0; i