diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index f4db7f81ab4..7d113a838dd 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -170,7 +170,6 @@ public class TransportCloseJobAction extends TransportTasksActionIf the job is opening * * - * If the job is already closed an empty Optional is returned. * @param jobId Job Id * @param mlMetadata ML MetaData * @param tasks Persistent tasks diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 3f011604a96..4d22e98498b 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -583,6 +583,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction { if (e2 == null) { task.markAsCompleted(); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 76887a2a8bf..375119c37d2 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -153,6 +153,13 @@ public class AutodetectProcessManager extends AbstractComponent { .setFinish(true) .setReason(reason) .kill(); + } else { + // If the process is missing but the task exists this is most likely + // because the job went into the failed state then the node restarted + // causing the task to be recreated but the failed process wasn't. + // We still need to remove the task from the TaskManager (which + // is what the kill would do) + jobTask.markAsCompleted(); } } diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index c182d756aa0..c3e830553a2 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -529,6 +529,17 @@ public class AutodetectProcessManagerTests extends ESTestCase { verify(communicator).killProcess(false, false); } + public void testKillingAMissingJobFinishesTheTask() throws IOException { + AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); + AutodetectProcessManager manager = createManager(communicator); + JobTask jobTask = mock(JobTask.class); + when(jobTask.getJobId()).thenReturn("foo"); + + manager.killProcess(jobTask, false, null); + + verify(jobTask).markAsCompleted(); + } + public void testProcessData_GivenStateNotOpened() { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); doAnswer(invocationOnMock -> {