From 9786b3874726249407bb73d563353659f7fe89af Mon Sep 17 00:00:00 2001 From: David Kyle Date: Sun, 25 Feb 2018 13:47:15 +0000 Subject: [PATCH] =?UTF-8?q?[ml]=20Don=E2=80=99t=20open=20failed=20jobs=20a?= =?UTF-8?q?fter=20restart=20(elastic/x-pack-elasticsearch#3997)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Original commit: elastic/x-pack-elasticsearch@f4b2ff084b7e9b2da8f8d440fec0785ee37abfca --- .../xpack/ml/action/TransportCloseJobAction.java | 1 - .../xpack/ml/action/TransportOpenJobAction.java | 8 ++++++++ .../process/autodetect/AutodetectProcessManager.java | 7 +++++++ .../autodetect/AutodetectProcessManagerTests.java | 11 +++++++++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index f4db7f81ab4..7d113a838dd 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -170,7 +170,6 @@ public class TransportCloseJobAction extends TransportTasksActionIf the job is opening * * - * If the job is already closed an empty Optional is returned. * @param jobId Job Id * @param mlMetadata ML MetaData * @param tasks Persistent tasks diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 3f011604a96..4d22e98498b 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -583,6 +583,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction { if (e2 == null) { task.markAsCompleted(); diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 76887a2a8bf..375119c37d2 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -153,6 +153,13 @@ public class AutodetectProcessManager extends AbstractComponent { .setFinish(true) .setReason(reason) .kill(); + } else { + // If the process is missing but the task exists this is most likely + // because the job went into the failed state then the node restarted + // causing the task to be recreated but the failed process wasn't. + // We still need to remove the task from the TaskManager (which + // is what the kill would do) + jobTask.markAsCompleted(); } } diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index c182d756aa0..c3e830553a2 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -529,6 +529,17 @@ public class AutodetectProcessManagerTests extends ESTestCase { verify(communicator).killProcess(false, false); } + public void testKillingAMissingJobFinishesTheTask() throws IOException { + AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); + AutodetectProcessManager manager = createManager(communicator); + JobTask jobTask = mock(JobTask.class); + when(jobTask.getJobId()).thenReturn("foo"); + + manager.killProcess(jobTask, false, null); + + verify(jobTask).markAsCompleted(); + } + public void testProcessData_GivenStateNotOpened() { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); doAnswer(invocationOnMock -> {