[ml] Don’t open failed jobs after restart (elastic/x-pack-elasticsearch#3997)

Original commit: elastic/x-pack-elasticsearch@f4b2ff084b
This commit is contained in:
David Kyle 2018-02-25 13:47:15 +00:00 committed by GitHub
parent b8c9c5325c
commit 9786b38747
4 changed files with 26 additions and 1 deletions

View File

@ -170,7 +170,6 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
* <li>If the job is opening</li> * <li>If the job is opening</li>
* </ul> * </ul>
* *
* If the job is already closed an empty Optional is returned.
* @param jobId Job Id * @param jobId Job Id
* @param mlMetadata ML MetaData * @param mlMetadata ML MetaData
* @param tasks Persistent tasks * @param tasks Persistent tasks

View File

@ -583,6 +583,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, Task.Status status) { protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, Task.Status status) {
JobTask jobTask = (JobTask) task; JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager; jobTask.autodetectProcessManager = autodetectProcessManager;
JobTaskStatus jobStateStatus = (JobTaskStatus) status;
// If the job is failed then the Persistent Task Service will
// try to restart it on a node restart. Exiting here leaves the
// job in the failed state and it must be force closed.
if (jobStateStatus != null && jobStateStatus.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) {
return;
}
autodetectProcessManager.openJob(jobTask, e2 -> { autodetectProcessManager.openJob(jobTask, e2 -> {
if (e2 == null) { if (e2 == null) {
task.markAsCompleted(); task.markAsCompleted();

View File

@ -153,6 +153,13 @@ public class AutodetectProcessManager extends AbstractComponent {
.setFinish(true) .setFinish(true)
.setReason(reason) .setReason(reason)
.kill(); .kill();
} else {
// If the process is missing but the task exists this is most likely
// because the job went into the failed state then the node restarted
// causing the task to be recreated but the failed process wasn't.
// We still need to remove the task from the TaskManager (which
// is what the kill would do)
jobTask.markAsCompleted();
} }
} }

View File

@ -529,6 +529,17 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verify(communicator).killProcess(false, false); verify(communicator).killProcess(false, false);
} }
public void testKillingAMissingJobFinishesTheTask() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.killProcess(jobTask, false, null);
verify(jobTask).markAsCompleted();
}
public void testProcessData_GivenStateNotOpened() { public void testProcessData_GivenStateNotOpened() {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {