diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 0fd8d1b5b74..cbcaf54b46b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -401,16 +401,12 @@ public class AutodetectProcessManager implements ClusterStateListener { logger.debug("Aborted opening job [{}] as it has been closed", jobId); return; } - if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { - logger.debug("Cannot open job [{}] when its state is [{}]", - jobId, processContext.getState().getClass().getName()); - return; - } try { - createProcessAndSetRunning(processContext, job, params, closeHandler); - processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot()); - setJobState(jobTask, JobState.OPENED); + if (createProcessAndSetRunning(processContext, job, params, closeHandler)) { + processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot()); + setJobState(jobTask, JobState.OPENED); + } } catch (Exception e1) { // No need to log here as the persistent task framework will log it try { @@ -447,19 +443,25 @@ public class AutodetectProcessManager implements ClusterStateListener { ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler); } - private void createProcessAndSetRunning(ProcessContext processContext, - Job job, - AutodetectParams params, - BiConsumer handler) throws IOException { + private boolean createProcessAndSetRunning(ProcessContext processContext, + Job job, + AutodetectParams params, + BiConsumer handler) throws IOException { // At this point we lock the process context until the process has been started. // The reason behind this is to ensure closing the job does not happen before // the process is started as that can result to the job getting seemingly closed // but the actual process is hanging alive. processContext.tryLock(); try { + if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { + logger.debug("Cannot open job [{}] when its state is [{}]", + job.getId(), processContext.getState().getClass().getName()); + return false; + } AutodetectCommunicator communicator = create(processContext.getJobTask(), job, params, handler); communicator.writeHeader(); processContext.setRunning(communicator); + return true; } finally { // Now that the process is running and we have updated its state we can unlock. // It is important to unlock before we initialize the communicator (ie. load the model state) @@ -592,6 +594,8 @@ public class AutodetectProcessManager implements ClusterStateListener { try { if (processContext.setDying() == false) { logger.debug("Cannot close job [{}] as it has been marked as dying", jobId); + // The only way we can get here is if 2 close requests are made very close together. + // The other close has done the work so it's safe to return here without doing anything. return; } @@ -605,10 +609,10 @@ public class AutodetectProcessManager implements ClusterStateListener { if (communicator == null) { logger.debug("Job [{}] is being closed before its process is started", jobId); jobTask.markAsCompleted(); - return; + } else { + communicator.close(restart, reason); } - communicator.close(restart, reason); processByAllocation.remove(allocationId); } catch (Exception e) { // If the close failed because the process has explicitly been killed by us then just pass on that exception @@ -628,7 +632,7 @@ public class AutodetectProcessManager implements ClusterStateListener { try { nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription()); } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e); + logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobId), e); } }