diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 6469fdc616e..42083efac5b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; @@ -46,6 +47,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; @@ -278,8 +280,9 @@ public class CloseJobAction extends Action listener) { - task.closeJob("close job (api)"); - listener.onResponse(new Response(true)); + protected void taskOperation(Request request, OpenJobAction.JobTask jobTask, ActionListener listener) { + JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId()); + jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> { + // we need to fork because we are now on a network threadpool and closeJob method may take a while to complete: + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + jobTask.closeJob("close job (api)"); + listener.onResponse(new Response(true)); + } + }); + }, listener::onFailure)); } @Override @@ -540,7 +557,7 @@ public class CloseJobAction extends Action jobTask = MlMetadata.getJobTask(jobId, tasks); - if (jobTask == null) { + if (jobTask == null || jobTask.getStatus() == null) { + throw new ElasticsearchStatusException("cannot close job, because job [" + jobId + "] is not open", RestStatus.CONFLICT); + } + JobTaskStatus jobTaskStatus = (JobTaskStatus) jobTask.getStatus(); + if (jobTaskStatus.getState().isAnyOf(JobState.OPENED, JobState.FAILED) == false) { throw new ElasticsearchStatusException("cannot close job, because job [" + jobId + "] is not open", RestStatus.CONFLICT); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobState.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobState.java index 3f49396fc24..cdb8fec7545 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobState.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobState.java @@ -22,7 +22,7 @@ import java.util.Locale; */ public enum JobState implements ToXContent, Writeable { - CLOSED, OPENED, FAILED; + CLOSING, CLOSED, OPENED, FAILED; public static JobState fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobStateTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobStateTests.java index 55adff4e298..45c84dc143a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobStateTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobStateTests.java @@ -10,33 +10,39 @@ import org.elasticsearch.test.ESTestCase; public class JobStateTests extends ESTestCase { public void testFromString() { + assertEquals(JobState.fromString("closing"), JobState.CLOSING); assertEquals(JobState.fromString("closed"), JobState.CLOSED); assertEquals(JobState.fromString("failed"), JobState.FAILED); assertEquals(JobState.fromString("opened"), JobState.OPENED); + assertEquals(JobState.fromString("CLOSING"), JobState.CLOSING); assertEquals(JobState.fromString("CLOSED"), JobState.CLOSED); assertEquals(JobState.fromString("FAILED"), JobState.FAILED); assertEquals(JobState.fromString("OPENED"), JobState.OPENED); } public void testToString() { + assertEquals("closing", JobState.CLOSING.toString()); assertEquals("closed", JobState.CLOSED.toString()); assertEquals("failed", JobState.FAILED.toString()); assertEquals("opened", JobState.OPENED.toString()); } public void testValidOrdinals() { - assertEquals(0, JobState.CLOSED.ordinal()); - assertEquals(1, JobState.OPENED.ordinal()); - assertEquals(2, JobState.FAILED.ordinal()); + assertEquals(0, JobState.CLOSING.ordinal()); + assertEquals(1, JobState.CLOSED.ordinal()); + assertEquals(2, JobState.OPENED.ordinal()); + assertEquals(3, JobState.FAILED.ordinal()); } public void testIsAnyOf() { assertFalse(JobState.OPENED.isAnyOf()); assertFalse(JobState.OPENED.isAnyOf(JobState.CLOSED, JobState.FAILED)); assertFalse(JobState.CLOSED.isAnyOf(JobState.FAILED, JobState.OPENED)); + assertFalse(JobState.CLOSING.isAnyOf(JobState.FAILED, JobState.OPENED)); assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED)); assertTrue(JobState.OPENED.isAnyOf(JobState.OPENED, JobState.CLOSED)); assertTrue(JobState.CLOSED.isAnyOf(JobState.CLOSED)); + assertTrue(JobState.CLOSING.isAnyOf(JobState.CLOSING)); } }