From c46e09902d4b75154e7c3bb3a65ffb1e124fca13 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 26 Sep 2017 11:46:01 +0100 Subject: [PATCH] [ML] Fix close job when the process has not launched yet (elastic/x-pack-elasticsearch#2616) If a job close is requested after a job was opened but before its process was launched, the job close returns successfully without doing anything. The result is that the process hangs around. This has been causing test failures as documented int elastic/x-pack-elasticsearch#2360 and elastic/x-pack-elasticsearch#1270. This commit fixes this problem by refactoring the AutodetectProcessManager. It introduces a state pattern to make clear the states of the process and it uses locking to ensure a close waits for the job process to be created. relates elastic/x-pack-elasticsearch#1270 Original commit: elastic/x-pack-elasticsearch@ff858bd1362abc0b9d6ce887ce44b35d1cdc26ab --- .../autodetect/AutodetectCommunicator.java | 9 +- .../autodetect/AutodetectProcessManager.java | 172 ++++++++------- .../NativeAutodetectProcessFactory.java | 2 +- .../process/autodetect/ProcessContext.java | 195 ++++++++++++++++++ .../AutodetectCommunicatorTests.java | 4 +- .../AutodetectProcessManagerTests.java | 1 - 6 files changed, 295 insertions(+), 88 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 0dcc25c8825..04624ab05da 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobUpdate; @@ -56,7 +55,6 @@ public class AutodetectCommunicator implements Closeable { private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1); private final Job job; - private final JobTask jobTask; private final AutodetectProcess autodetectProcess; private final StateStreamer stateStreamer; private final DataCountsReporter dataCountsReporter; @@ -66,12 +64,11 @@ public class AutodetectCommunicator implements Closeable { private final NamedXContentRegistry xContentRegistry; private volatile boolean processKilled; - AutodetectCommunicator(Job job, JobTask jobTask, AutodetectProcess process, StateStreamer stateStreamer, + AutodetectCommunicator(Job job, AutodetectProcess process, StateStreamer stateStreamer, DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, Consumer onFinishHandler, NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) { this.job = job; - this.jobTask = jobTask; this.autodetectProcess = process; this.stateStreamer = stateStreamer; this.dataCountsReporter = dataCountsReporter; @@ -261,10 +258,6 @@ public class AutodetectCommunicator implements Closeable { } } - public JobTask getJobTask() { - return jobTask; - } - public ZonedDateTime getProcessStartTime() { return autodetectProcess.getProcessStartTime(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 996d0a76586..444196c6a57 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; @@ -50,7 +49,6 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; -import org.elasticsearch.xpack.ml.job.results.Forecast; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -59,7 +57,6 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; -import java.util.Arrays; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -105,8 +102,7 @@ public class AutodetectProcessManager extends AbstractComponent { private final JobResultsPersister jobResultsPersister; private final JobDataCountsPersister jobDataCountsPersister; - private final ConcurrentMap autoDetectCommunicatorByOpenJob = new ConcurrentHashMap<>(); - private final ConcurrentMap autoDetectCommunicatorByClosingJob = new ConcurrentHashMap<>(); + private final ConcurrentMap processByAllocation = new ConcurrentHashMap<>(); private final int maxAllowedRunningJobs; @@ -134,53 +130,37 @@ public class AutodetectProcessManager extends AbstractComponent { } public synchronized void closeAllJobsOnThisNode(String reason) throws IOException { - int numJobs = autoDetectCommunicatorByOpenJob.size(); + int numJobs = processByAllocation.size(); if (numJobs != 0) { logger.info("Closing [{}] jobs, because [{}]", numJobs, reason); - for (AutodetectCommunicator communicator : autoDetectCommunicatorByOpenJob.values()) { - closeJob(communicator.getJobTask(), false, reason); + for (ProcessContext process : processByAllocation.values()) { + closeJob(process.getJobTask(), false, reason); } } } public void killProcess(JobTask jobTask, boolean awaitCompletion, String reason) { - String extraInfo; - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.remove(jobTask.getAllocationId()); - if (communicator == null) { - extraInfo = " while closing"; - // if there isn't an open job, check for a closing job - communicator = autoDetectCommunicatorByClosingJob.remove(jobTask.getAllocationId()); - } else { - extraInfo = ""; - } - if (communicator != null) { - if (reason == null) { - logger.info("Killing job [{}]{}", jobTask.getJobId(), extraInfo); - } else { - logger.info("Killing job [{}]{}, because [{}]", jobTask.getJobId(), extraInfo, reason); - } - killProcess(communicator, jobTask.getJobId(), awaitCompletion, true); + ProcessContext processContext = processByAllocation.remove(jobTask.getAllocationId()); + if (processContext != null) { + processContext.newKillBuilder() + .setAwaitCompletion(awaitCompletion) + .setFinish(true) + .setReason(reason) + .kill(); } } public void killAllProcessesOnThisNode() { - // first kill open jobs, then closing jobs - for (Iterator iter : Arrays.asList(autoDetectCommunicatorByOpenJob.values().iterator(), - autoDetectCommunicatorByClosingJob.values().iterator())) { - while (iter.hasNext()) { - AutodetectCommunicator communicator = iter.next(); - iter.remove(); - killProcess(communicator, communicator.getJobTask().getJobId(), false, false); - } - } - } - - private void killProcess(AutodetectCommunicator communicator, String jobId, boolean awaitCompletion, boolean finish) { - try { - communicator.killProcess(awaitCompletion, finish); - } catch (IOException e) { - logger.error("[{}] Failed to kill autodetect process for job", jobId); + Iterator iterator = processByAllocation.values().iterator(); + while (iterator.hasNext()) { + ProcessContext processContext = iterator.next(); + processContext.newKillBuilder() + .setAwaitCompletion(false) + .setFinish(false) + .setSilent(true) + .kill(); + iterator.remove(); } } @@ -205,7 +185,7 @@ public class AutodetectProcessManager extends AbstractComponent { */ public void processData(JobTask jobTask, InputStream input, XContentType xContentType, DataLoadParams params, BiConsumer handler) { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open"); } @@ -223,7 +203,7 @@ public class AutodetectProcessManager extends AbstractComponent { */ public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener handler) { logger.debug("Flushing job {}", jobTask.getJobId()); - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId()); logger.debug(message); @@ -250,7 +230,7 @@ public class AutodetectProcessManager extends AbstractComponent { */ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer handler) { logger.debug("Forecasting job {}", jobTask.getJobId()); - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobTask.getJobId()); logger.debug(message); @@ -271,7 +251,7 @@ public class AutodetectProcessManager extends AbstractComponent { public void writeUpdateProcessMessage(JobTask jobTask, List updates, ModelPlotConfig config, Consumer handler) { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open"; logger.debug(message); @@ -298,6 +278,7 @@ public class AutodetectProcessManager extends AbstractComponent { } logger.info("Opening job [{}]", jobId); + processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); jobProvider.getAutodetectParams(job, params -> { // We need to fork, otherwise we restore model state from a network thread (several GET api calls): threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @@ -308,19 +289,29 @@ public class AutodetectProcessManager extends AbstractComponent { @Override protected void doRun() throws Exception { + ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); + if (processContext == null) { + logger.debug("Aborted opening job [{}] as it has been closed", jobId); + return; + } + if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { + logger.debug("Cannot open job [{}] when its state is [{}]", jobId, processContext.getState().getClass().getName()); + return; + } + try { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.computeIfAbsent(jobTask.getAllocationId(), - id -> create(jobTask, params, handler)); - communicator.init(params.modelSnapshot()); + createProcessAndSetRunning(processContext, params, handler); + processContext.getAutodetectCommunicator().init(params.modelSnapshot()); setJobState(jobTask, JobState.OPENED); } catch (Exception e1) { // No need to log here as the persistent task framework will log it try { // Don't leave a partially initialised process hanging around - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.remove(jobTask.getAllocationId()); - if (communicator != null) { - communicator.killProcess(false, false); - } + processContext.newKillBuilder() + .setAwaitCompletion(false) + .setFinish(false) + .kill(); + processByAllocation.remove(jobTask.getAllocationId()); } finally { setJobState(jobTask, JobState.FAILED, e2 -> handler.accept(e1)); } @@ -333,13 +324,28 @@ public class AutodetectProcessManager extends AbstractComponent { }); } + private void createProcessAndSetRunning(ProcessContext processContext, AutodetectParams params, Consumer handler) { + try { + // At this point we lock the process context until the process has been started. + // The reason behind this is to ensure closing the job does not happen before + // the process is started as that can result to the job getting seemingly closed + // but the actual process is hanging alive. + processContext.tryLock(); + AutodetectCommunicator communicator = create(processContext.getJobTask(), params, handler); + processContext.setRunning(communicator); + } finally { + // Now that the process is running and we have updated its state we can unlock. + // It is important to unlock before we initialize the communicator (ie. load the model state) + // as that may be a long-running method. + processContext.unlock(); + } + } + AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer handler) { // Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME // that an open job uses, so include them too when considering if enough threads are available. - // There's a slight possibility that the same key is in both sets, hence it's not sufficient to simply - // add the two map sizes. - int currentRunningJobs = Sets.union(autoDetectCommunicatorByOpenJob.keySet(), autoDetectCommunicatorByClosingJob.keySet()).size(); - if (currentRunningJobs >= maxAllowedRunningJobs) { + int currentRunningJobs = processByAllocation.size(); + if (currentRunningJobs > maxAllowedRunningJobs) { throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached", RestStatus.TOO_MANY_REQUESTS); } @@ -390,7 +396,7 @@ public class AutodetectProcessManager extends AbstractComponent { } throw e; } - return new AutodetectCommunicator(job, jobTask, process, new StateStreamer(client), dataCountsReporter, processor, handler, + return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler, xContentRegistry, autodetectWorkerExecutor); } @@ -429,31 +435,34 @@ public class AutodetectProcessManager extends AbstractComponent { String jobId = jobTask.getJobId(); long allocationId = jobTask.getAllocationId(); logger.debug("Attempting to close job [{}], because [{}]", jobId, reason); - // don't remove the communicator immediately, because we need to ensure it's in the - // map of closing communicators before it's removed from the map of running ones - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(allocationId); - if (communicator == null) { - logger.debug("Cannot close: no active autodetect process for job [{}]", jobId); - return; - } - // keep a record of the job, so that it can still be killed while closing - autoDetectCommunicatorByClosingJob.putIfAbsent(allocationId, communicator); - communicator = autoDetectCommunicatorByOpenJob.remove(allocationId); - if (communicator == null) { - // if we get here a simultaneous close request beat us to the remove() call - logger.debug("Already closing autodetect process for job [{}]", jobId); + // don't remove the process context immediately, because we need to ensure + // it is reachable to enable killing a job while it is closing + ProcessContext processContext = processByAllocation.get(allocationId); + if (processContext == null) { + logger.debug("Cannot close job [{}] as it has already been closed", jobId); return; } + processContext.tryLock(); + processContext.setDying(); + processContext.unlock(); + if (reason == null) { logger.info("Closing job [{}]", jobId); } else { logger.info("Closing job [{}], because [{}]", jobId, reason); } + AutodetectCommunicator communicator = processContext.getAutodetectCommunicator(); + if (communicator == null) { + logger.debug("Job [{}] is being closed before its process is started", jobId); + jobTask.markAsCompleted(); + return; + } + try { communicator.close(restart, reason); - autoDetectCommunicatorByClosingJob.remove(allocationId); + processByAllocation.remove(allocationId); } catch (Exception e) { logger.warn("[" + jobId + "] Exception closing autodetect process", e); setJobState(jobTask, JobState.FAILED); @@ -462,15 +471,29 @@ public class AutodetectProcessManager extends AbstractComponent { } int numberOfOpenJobs() { - return autoDetectCommunicatorByOpenJob.size(); + return (int) processByAllocation.values().stream() + .filter(p -> p.getState() != ProcessContext.ProcessStateName.DYING) + .count(); } boolean jobHasActiveAutodetectProcess(JobTask jobTask) { - return autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()) != null; + return getAutodetectCommunicator(jobTask) != null; + } + + private AutodetectCommunicator getAutodetectCommunicator(JobTask jobTask) { + return processByAllocation.getOrDefault(jobTask.getAllocationId(), new ProcessContext(jobTask)).getAutodetectCommunicator(); + } + + private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) { + ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); + if (processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { + return processContext.getAutodetectCommunicator(); + } + return null; } public Optional jobOpenTime(JobTask jobTask) { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask); if (communicator == null) { return Optional.empty(); } @@ -516,7 +539,7 @@ public class AutodetectProcessManager extends AbstractComponent { } public Optional> getStatistics(JobTask jobTask) { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask); if (communicator == null) { return Optional.empty(); } @@ -597,6 +620,5 @@ public class AutodetectProcessManager extends AbstractComponent { awaitTermination.countDown(); } } - } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 04870785e06..16ec2621c92 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -39,7 +39,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); + public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Client client; private final Environment env; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java new file mode 100644 index 00000000000..409639705ae --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java @@ -0,0 +1,195 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * The process context that encapsulates the job task, the process state and the autodetect communicator. + */ +final class ProcessContext { + + private static final Logger LOGGER = Loggers.getLogger(ProcessContext.class); + + private final ReentrantLock lock = new ReentrantLock(); + private final JobTask jobTask; + private volatile AutodetectCommunicator autodetectCommunicator; + private volatile ProcessState state; + + ProcessContext(JobTask jobTask) { + this.jobTask = jobTask; + this.state = new ProcessNotRunningState(); + } + + JobTask getJobTask() { + return jobTask; + } + + AutodetectCommunicator getAutodetectCommunicator() { + return autodetectCommunicator; + } + + private void setAutodetectCommunicator(AutodetectCommunicator autodetectCommunicator) { + this.autodetectCommunicator = autodetectCommunicator; + } + + ProcessStateName getState() { + return state.getName(); + } + + private void setState(ProcessState state) { + this.state = state; + } + + void tryLock() { + try { + if (lock.tryLock(NativeAutodetectProcessFactory.PROCESS_STARTUP_TIMEOUT.getSeconds(), TimeUnit.SECONDS) == false) { + LOGGER.error("Failed to acquire process lock for job [{}]", jobTask.getJobId()); + throw ExceptionsHelper.serverError("Failed to acquire process lock for job [" + jobTask.getJobId() + "]"); + } + } catch (InterruptedException e) { + throw new ElasticsearchException(e); + } + } + + void unlock() { + lock.unlock(); + } + + void setRunning(AutodetectCommunicator autodetectCommunicator) { + assert lock.isHeldByCurrentThread(); + state.setRunning(this, autodetectCommunicator); + } + + void setDying() { + assert lock.isHeldByCurrentThread(); + state.setDying(this); + } + + KillBuilder newKillBuilder() { + return new ProcessContext.KillBuilder(); + } + + class KillBuilder { + private boolean awaitCompletion; + private boolean finish; + private boolean silent; + private String reason; + + KillBuilder setAwaitCompletion(boolean awaitCompletion) { + this.awaitCompletion = awaitCompletion; + return this; + } + + KillBuilder setFinish(boolean finish) { + this.finish = finish; + return this; + } + + KillBuilder setSilent(boolean silent) { + this.silent = silent; + return this; + } + + KillBuilder setReason(String reason) { + this.reason = reason; + return this; + } + + void kill() { + if (autodetectCommunicator == null) { + return; + } + String jobId = jobTask.getJobId(); + + if (silent == false) { + String extraInfo = (state.getName() == ProcessStateName.DYING) ? " while closing" : ""; + if (reason == null) { + LOGGER.info("Killing job [{}]{}", jobId, extraInfo); + } else { + LOGGER.info("Killing job [{}]{}, because [{}]", jobId, extraInfo, reason); + } + } + try { + autodetectCommunicator.killProcess(awaitCompletion, finish); + } catch (IOException e) { + LOGGER.error("[{}] Failed to kill autodetect process for job", jobId); + } + } + } + + enum ProcessStateName { + NOT_RUNNING, RUNNING, DYING + } + + private interface ProcessState { + void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator); + void setDying(ProcessContext processContext); + ProcessStateName getName(); + } + + private static class ProcessNotRunningState implements ProcessState { + @Override + public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { + processContext.setAutodetectCommunicator(autodetectCommunicator); + processContext.setState(new ProcessRunningState()); + } + + @Override + public void setDying(ProcessContext processContext) { + processContext.setState(new ProcessDyingState()); + } + + @Override + public ProcessStateName getName() { + return ProcessStateName.NOT_RUNNING; + } + } + + private static class ProcessRunningState implements ProcessState { + + @Override + public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { + LOGGER.debug("Process set to [running] while it was already in that state"); + } + + @Override + public void setDying(ProcessContext processContext) { + processContext.setState(new ProcessDyingState()); + } + + @Override + public ProcessStateName getName() { + return ProcessStateName.RUNNING; + } + } + + private static class ProcessDyingState implements ProcessState { + + @Override + public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { + LOGGER.debug("Process set to [running] while it was in [dying]"); + } + + @Override + public void setDying(ProcessContext processContext) { + LOGGER.debug("Process set to [dying] while it was already in that state"); + } + + @Override + public ProcessStateName getName() { + return ProcessStateName.DYING; + } + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 27396737af5..de03143be9a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -198,9 +198,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { ((ActionListener) invocation.getArguments()[0]).onResponse(true); return null; }).when(dataCountsReporter).finishReporting(any()); - JobTask jobTask = mock(JobTask.class); - when(jobTask.getJobId()).thenReturn("foo"); - return new AutodetectCommunicator(createJobDetails(), jobTask, autodetectProcess, stateStreamer, + return new AutodetectCommunicator(createJobDetails(), autodetectProcess, stateStreamer, dataCountsReporter, autoDetectResultProcessor, finishHandler, new NamedXContentRegistry(Collections.emptyList()), executorService); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 636f9d2e029..8e281c7034c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -459,7 +459,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - when(communicator.getJobTask()).thenReturn(jobTask); manager.openJob(jobTask, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),