diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 0dcc25c8825..04624ab05da 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobUpdate; @@ -56,7 +55,6 @@ public class AutodetectCommunicator implements Closeable { private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1); private final Job job; - private final JobTask jobTask; private final AutodetectProcess autodetectProcess; private final StateStreamer stateStreamer; private final DataCountsReporter dataCountsReporter; @@ -66,12 +64,11 @@ public class AutodetectCommunicator implements Closeable { private final NamedXContentRegistry xContentRegistry; private volatile boolean processKilled; - AutodetectCommunicator(Job job, JobTask jobTask, AutodetectProcess process, StateStreamer stateStreamer, + AutodetectCommunicator(Job job, AutodetectProcess process, StateStreamer stateStreamer, DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, Consumer onFinishHandler, NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) { this.job = job; - this.jobTask = jobTask; this.autodetectProcess = process; this.stateStreamer = stateStreamer; this.dataCountsReporter = dataCountsReporter; @@ -261,10 +258,6 @@ public class AutodetectCommunicator implements Closeable { } } - public JobTask getJobTask() { - return jobTask; - } - public ZonedDateTime getProcessStartTime() { return autodetectProcess.getProcessStartTime(); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 996d0a76586..444196c6a57 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; @@ -50,7 +49,6 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; -import org.elasticsearch.xpack.ml.job.results.Forecast; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -59,7 +57,6 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; -import java.util.Arrays; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -105,8 +102,7 @@ public class AutodetectProcessManager extends AbstractComponent { private final JobResultsPersister jobResultsPersister; private final JobDataCountsPersister jobDataCountsPersister; - private final ConcurrentMap autoDetectCommunicatorByOpenJob = new ConcurrentHashMap<>(); - private final ConcurrentMap autoDetectCommunicatorByClosingJob = new ConcurrentHashMap<>(); + private final ConcurrentMap processByAllocation = new ConcurrentHashMap<>(); private final int maxAllowedRunningJobs; @@ -134,53 +130,37 @@ public class AutodetectProcessManager extends AbstractComponent { } public synchronized void closeAllJobsOnThisNode(String reason) throws IOException { - int numJobs = autoDetectCommunicatorByOpenJob.size(); + int numJobs = processByAllocation.size(); if (numJobs != 0) { logger.info("Closing [{}] jobs, because [{}]", numJobs, reason); - for (AutodetectCommunicator communicator : autoDetectCommunicatorByOpenJob.values()) { - closeJob(communicator.getJobTask(), false, reason); + for (ProcessContext process : processByAllocation.values()) { + closeJob(process.getJobTask(), false, reason); } } } public void killProcess(JobTask jobTask, boolean awaitCompletion, String reason) { - String extraInfo; - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.remove(jobTask.getAllocationId()); - if (communicator == null) { - extraInfo = " while closing"; - // if there isn't an open job, check for a closing job - communicator = autoDetectCommunicatorByClosingJob.remove(jobTask.getAllocationId()); - } else { - extraInfo = ""; - } - if (communicator != null) { - if (reason == null) { - logger.info("Killing job [{}]{}", jobTask.getJobId(), extraInfo); - } else { - logger.info("Killing job [{}]{}, because [{}]", jobTask.getJobId(), extraInfo, reason); - } - killProcess(communicator, jobTask.getJobId(), awaitCompletion, true); + ProcessContext processContext = processByAllocation.remove(jobTask.getAllocationId()); + if (processContext != null) { + processContext.newKillBuilder() + .setAwaitCompletion(awaitCompletion) + .setFinish(true) + .setReason(reason) + .kill(); } } public void killAllProcessesOnThisNode() { - // first kill open jobs, then closing jobs - for (Iterator iter : Arrays.asList(autoDetectCommunicatorByOpenJob.values().iterator(), - autoDetectCommunicatorByClosingJob.values().iterator())) { - while (iter.hasNext()) { - AutodetectCommunicator communicator = iter.next(); - iter.remove(); - killProcess(communicator, communicator.getJobTask().getJobId(), false, false); - } - } - } - - private void killProcess(AutodetectCommunicator communicator, String jobId, boolean awaitCompletion, boolean finish) { - try { - communicator.killProcess(awaitCompletion, finish); - } catch (IOException e) { - logger.error("[{}] Failed to kill autodetect process for job", jobId); + Iterator iterator = processByAllocation.values().iterator(); + while (iterator.hasNext()) { + ProcessContext processContext = iterator.next(); + processContext.newKillBuilder() + .setAwaitCompletion(false) + .setFinish(false) + .setSilent(true) + .kill(); + iterator.remove(); } } @@ -205,7 +185,7 @@ public class AutodetectProcessManager extends AbstractComponent { */ public void processData(JobTask jobTask, InputStream input, XContentType xContentType, DataLoadParams params, BiConsumer handler) { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open"); } @@ -223,7 +203,7 @@ public class AutodetectProcessManager extends AbstractComponent { */ public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener handler) { logger.debug("Flushing job {}", jobTask.getJobId()); - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId()); logger.debug(message); @@ -250,7 +230,7 @@ public class AutodetectProcessManager extends AbstractComponent { */ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer handler) { logger.debug("Forecasting job {}", jobTask.getJobId()); - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobTask.getJobId()); logger.debug(message); @@ -271,7 +251,7 @@ public class AutodetectProcessManager extends AbstractComponent { public void writeUpdateProcessMessage(JobTask jobTask, List updates, ModelPlotConfig config, Consumer handler) { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open"; logger.debug(message); @@ -298,6 +278,7 @@ public class AutodetectProcessManager extends AbstractComponent { } logger.info("Opening job [{}]", jobId); + processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); jobProvider.getAutodetectParams(job, params -> { // We need to fork, otherwise we restore model state from a network thread (several GET api calls): threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @@ -308,19 +289,29 @@ public class AutodetectProcessManager extends AbstractComponent { @Override protected void doRun() throws Exception { + ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); + if (processContext == null) { + logger.debug("Aborted opening job [{}] as it has been closed", jobId); + return; + } + if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { + logger.debug("Cannot open job [{}] when its state is [{}]", jobId, processContext.getState().getClass().getName()); + return; + } + try { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.computeIfAbsent(jobTask.getAllocationId(), - id -> create(jobTask, params, handler)); - communicator.init(params.modelSnapshot()); + createProcessAndSetRunning(processContext, params, handler); + processContext.getAutodetectCommunicator().init(params.modelSnapshot()); setJobState(jobTask, JobState.OPENED); } catch (Exception e1) { // No need to log here as the persistent task framework will log it try { // Don't leave a partially initialised process hanging around - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.remove(jobTask.getAllocationId()); - if (communicator != null) { - communicator.killProcess(false, false); - } + processContext.newKillBuilder() + .setAwaitCompletion(false) + .setFinish(false) + .kill(); + processByAllocation.remove(jobTask.getAllocationId()); } finally { setJobState(jobTask, JobState.FAILED, e2 -> handler.accept(e1)); } @@ -333,13 +324,28 @@ public class AutodetectProcessManager extends AbstractComponent { }); } + private void createProcessAndSetRunning(ProcessContext processContext, AutodetectParams params, Consumer handler) { + try { + // At this point we lock the process context until the process has been started. + // The reason behind this is to ensure closing the job does not happen before + // the process is started as that can result to the job getting seemingly closed + // but the actual process is hanging alive. + processContext.tryLock(); + AutodetectCommunicator communicator = create(processContext.getJobTask(), params, handler); + processContext.setRunning(communicator); + } finally { + // Now that the process is running and we have updated its state we can unlock. + // It is important to unlock before we initialize the communicator (ie. load the model state) + // as that may be a long-running method. + processContext.unlock(); + } + } + AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams, Consumer handler) { // Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME // that an open job uses, so include them too when considering if enough threads are available. - // There's a slight possibility that the same key is in both sets, hence it's not sufficient to simply - // add the two map sizes. - int currentRunningJobs = Sets.union(autoDetectCommunicatorByOpenJob.keySet(), autoDetectCommunicatorByClosingJob.keySet()).size(); - if (currentRunningJobs >= maxAllowedRunningJobs) { + int currentRunningJobs = processByAllocation.size(); + if (currentRunningJobs > maxAllowedRunningJobs) { throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached", RestStatus.TOO_MANY_REQUESTS); } @@ -390,7 +396,7 @@ public class AutodetectProcessManager extends AbstractComponent { } throw e; } - return new AutodetectCommunicator(job, jobTask, process, new StateStreamer(client), dataCountsReporter, processor, handler, + return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler, xContentRegistry, autodetectWorkerExecutor); } @@ -429,31 +435,34 @@ public class AutodetectProcessManager extends AbstractComponent { String jobId = jobTask.getJobId(); long allocationId = jobTask.getAllocationId(); logger.debug("Attempting to close job [{}], because [{}]", jobId, reason); - // don't remove the communicator immediately, because we need to ensure it's in the - // map of closing communicators before it's removed from the map of running ones - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(allocationId); - if (communicator == null) { - logger.debug("Cannot close: no active autodetect process for job [{}]", jobId); - return; - } - // keep a record of the job, so that it can still be killed while closing - autoDetectCommunicatorByClosingJob.putIfAbsent(allocationId, communicator); - communicator = autoDetectCommunicatorByOpenJob.remove(allocationId); - if (communicator == null) { - // if we get here a simultaneous close request beat us to the remove() call - logger.debug("Already closing autodetect process for job [{}]", jobId); + // don't remove the process context immediately, because we need to ensure + // it is reachable to enable killing a job while it is closing + ProcessContext processContext = processByAllocation.get(allocationId); + if (processContext == null) { + logger.debug("Cannot close job [{}] as it has already been closed", jobId); return; } + processContext.tryLock(); + processContext.setDying(); + processContext.unlock(); + if (reason == null) { logger.info("Closing job [{}]", jobId); } else { logger.info("Closing job [{}], because [{}]", jobId, reason); } + AutodetectCommunicator communicator = processContext.getAutodetectCommunicator(); + if (communicator == null) { + logger.debug("Job [{}] is being closed before its process is started", jobId); + jobTask.markAsCompleted(); + return; + } + try { communicator.close(restart, reason); - autoDetectCommunicatorByClosingJob.remove(allocationId); + processByAllocation.remove(allocationId); } catch (Exception e) { logger.warn("[" + jobId + "] Exception closing autodetect process", e); setJobState(jobTask, JobState.FAILED); @@ -462,15 +471,29 @@ public class AutodetectProcessManager extends AbstractComponent { } int numberOfOpenJobs() { - return autoDetectCommunicatorByOpenJob.size(); + return (int) processByAllocation.values().stream() + .filter(p -> p.getState() != ProcessContext.ProcessStateName.DYING) + .count(); } boolean jobHasActiveAutodetectProcess(JobTask jobTask) { - return autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()) != null; + return getAutodetectCommunicator(jobTask) != null; + } + + private AutodetectCommunicator getAutodetectCommunicator(JobTask jobTask) { + return processByAllocation.getOrDefault(jobTask.getAllocationId(), new ProcessContext(jobTask)).getAutodetectCommunicator(); + } + + private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) { + ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); + if (processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { + return processContext.getAutodetectCommunicator(); + } + return null; } public Optional jobOpenTime(JobTask jobTask) { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask); if (communicator == null) { return Optional.empty(); } @@ -516,7 +539,7 @@ public class AutodetectProcessManager extends AbstractComponent { } public Optional> getStatistics(JobTask jobTask) { - AutodetectCommunicator communicator = autoDetectCommunicatorByOpenJob.get(jobTask.getAllocationId()); + AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask); if (communicator == null) { return Optional.empty(); } @@ -597,6 +620,5 @@ public class AutodetectProcessManager extends AbstractComponent { awaitTermination.countDown(); } } - } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 04870785e06..16ec2621c92 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -39,7 +39,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); - private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); + public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Client client; private final Environment env; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java new file mode 100644 index 00000000000..409639705ae --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/ProcessContext.java @@ -0,0 +1,195 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * The process context that encapsulates the job task, the process state and the autodetect communicator. + */ +final class ProcessContext { + + private static final Logger LOGGER = Loggers.getLogger(ProcessContext.class); + + private final ReentrantLock lock = new ReentrantLock(); + private final JobTask jobTask; + private volatile AutodetectCommunicator autodetectCommunicator; + private volatile ProcessState state; + + ProcessContext(JobTask jobTask) { + this.jobTask = jobTask; + this.state = new ProcessNotRunningState(); + } + + JobTask getJobTask() { + return jobTask; + } + + AutodetectCommunicator getAutodetectCommunicator() { + return autodetectCommunicator; + } + + private void setAutodetectCommunicator(AutodetectCommunicator autodetectCommunicator) { + this.autodetectCommunicator = autodetectCommunicator; + } + + ProcessStateName getState() { + return state.getName(); + } + + private void setState(ProcessState state) { + this.state = state; + } + + void tryLock() { + try { + if (lock.tryLock(NativeAutodetectProcessFactory.PROCESS_STARTUP_TIMEOUT.getSeconds(), TimeUnit.SECONDS) == false) { + LOGGER.error("Failed to acquire process lock for job [{}]", jobTask.getJobId()); + throw ExceptionsHelper.serverError("Failed to acquire process lock for job [" + jobTask.getJobId() + "]"); + } + } catch (InterruptedException e) { + throw new ElasticsearchException(e); + } + } + + void unlock() { + lock.unlock(); + } + + void setRunning(AutodetectCommunicator autodetectCommunicator) { + assert lock.isHeldByCurrentThread(); + state.setRunning(this, autodetectCommunicator); + } + + void setDying() { + assert lock.isHeldByCurrentThread(); + state.setDying(this); + } + + KillBuilder newKillBuilder() { + return new ProcessContext.KillBuilder(); + } + + class KillBuilder { + private boolean awaitCompletion; + private boolean finish; + private boolean silent; + private String reason; + + KillBuilder setAwaitCompletion(boolean awaitCompletion) { + this.awaitCompletion = awaitCompletion; + return this; + } + + KillBuilder setFinish(boolean finish) { + this.finish = finish; + return this; + } + + KillBuilder setSilent(boolean silent) { + this.silent = silent; + return this; + } + + KillBuilder setReason(String reason) { + this.reason = reason; + return this; + } + + void kill() { + if (autodetectCommunicator == null) { + return; + } + String jobId = jobTask.getJobId(); + + if (silent == false) { + String extraInfo = (state.getName() == ProcessStateName.DYING) ? " while closing" : ""; + if (reason == null) { + LOGGER.info("Killing job [{}]{}", jobId, extraInfo); + } else { + LOGGER.info("Killing job [{}]{}, because [{}]", jobId, extraInfo, reason); + } + } + try { + autodetectCommunicator.killProcess(awaitCompletion, finish); + } catch (IOException e) { + LOGGER.error("[{}] Failed to kill autodetect process for job", jobId); + } + } + } + + enum ProcessStateName { + NOT_RUNNING, RUNNING, DYING + } + + private interface ProcessState { + void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator); + void setDying(ProcessContext processContext); + ProcessStateName getName(); + } + + private static class ProcessNotRunningState implements ProcessState { + @Override + public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { + processContext.setAutodetectCommunicator(autodetectCommunicator); + processContext.setState(new ProcessRunningState()); + } + + @Override + public void setDying(ProcessContext processContext) { + processContext.setState(new ProcessDyingState()); + } + + @Override + public ProcessStateName getName() { + return ProcessStateName.NOT_RUNNING; + } + } + + private static class ProcessRunningState implements ProcessState { + + @Override + public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { + LOGGER.debug("Process set to [running] while it was already in that state"); + } + + @Override + public void setDying(ProcessContext processContext) { + processContext.setState(new ProcessDyingState()); + } + + @Override + public ProcessStateName getName() { + return ProcessStateName.RUNNING; + } + } + + private static class ProcessDyingState implements ProcessState { + + @Override + public void setRunning(ProcessContext processContext, AutodetectCommunicator autodetectCommunicator) { + LOGGER.debug("Process set to [running] while it was in [dying]"); + } + + @Override + public void setDying(ProcessContext processContext) { + LOGGER.debug("Process set to [dying] while it was already in that state"); + } + + @Override + public ProcessStateName getName() { + return ProcessStateName.DYING; + } + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 27396737af5..de03143be9a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -198,9 +198,7 @@ public class AutodetectCommunicatorTests extends ESTestCase { ((ActionListener) invocation.getArguments()[0]).onResponse(true); return null; }).when(dataCountsReporter).finishReporting(any()); - JobTask jobTask = mock(JobTask.class); - when(jobTask.getJobId()).thenReturn("foo"); - return new AutodetectCommunicator(createJobDetails(), jobTask, autodetectProcess, stateStreamer, + return new AutodetectCommunicator(createJobDetails(), autodetectProcess, stateStreamer, dataCountsReporter, autoDetectResultProcessor, finishHandler, new NamedXContentRegistry(Collections.emptyList()), executorService); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 636f9d2e029..8e281c7034c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -459,7 +459,6 @@ public class AutodetectProcessManagerTests extends ESTestCase { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - when(communicator.getJobTask()).thenReturn(jobTask); manager.openJob(jobTask, e -> {}); manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),