diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java index 8d35219192a..f1e715bef1a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java @@ -40,7 +40,7 @@ public class InvalidLicenseEnforcer extends AbstractComponent { @Override protected void doRun() throws Exception { - datafeedJobRunner.closeAllDatafeeds("invalid license"); + datafeedJobRunner.stopAllDatafeeds("invalid license"); autodetectProcessManager.closeAllJobs("invalid license"); } }); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 4f41bd26183..268f23792c4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -66,7 +66,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.BiConsumer; import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; @@ -221,7 +220,7 @@ public class OpenJobAction extends Action cancelHandler; + private volatile AutodetectProcessManager autodetectProcessManager; JobTask(String jobId, long id, String type, String action, TaskId parentTask) { super(id, type, action, "job-" + jobId, parentTask); @@ -236,7 +235,7 @@ public class OpenJobAction extends Action listener) { + JobTask jobTask = (JobTask) task; + jobTask.autodetectProcessManager = autodetectProcessManager; autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> { if (e1 != null) { listener.onFailure(e1); return; } - JobTask jobTask = (JobTask) task; - jobTask.cancelHandler = (restart, reason) -> autodetectProcessManager.closeJob(request.getJobId(), restart, reason); autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> { if (e2 == null) { listener.onResponse(new TransportResponse.Empty()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 4c7db40d888..d522206784d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -230,7 +230,7 @@ public class StartDatafeedAction private final String datafeedId; private final long startTime; private final Long endTime; - private volatile DatafeedJobRunner.Holder holder; + private volatile DatafeedJobRunner datafeedJobRunner; public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request) { super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId); @@ -239,6 +239,15 @@ public class StartDatafeedAction this.endTime = request.endTime; } + /* only for testing */ + public DatafeedTask(long id, String type, String action, TaskId parentTaskId, Request request, DatafeedJobRunner datafeedJobRunner) { + super(id, type, action, "datafeed-" + request.getDatafeedId(), parentTaskId); + this.datafeedId = request.getDatafeedId(); + this.startTime = request.startTime; + this.endTime = request.endTime; + this.datafeedJobRunner = datafeedJobRunner; + } + public String getDatafeedId() { return datafeedId; } @@ -256,10 +265,6 @@ public class StartDatafeedAction return endTime != null; } - public void setHolder(DatafeedJobRunner.Holder holder) { - this.holder = holder; - } - @Override protected void onCancelled() { stop(getReasonCancelled()); @@ -270,10 +275,7 @@ public class StartDatafeedAction } public void stop(String reason, TimeValue timeout) { - if (holder == null) { - throw new IllegalStateException("task cancel ran before datafeed runner assigned the holder"); - } - holder.stop(reason, timeout, null); + datafeedJobRunner.stopDatafeed(datafeedId, reason, timeout); } } @@ -339,6 +341,7 @@ public class StartDatafeedAction protected void nodeOperation(NodePersistentTask nodePersistentTask, Request request, ActionListener listener) { DatafeedTask datafeedTask = (DatafeedTask) nodePersistentTask; + datafeedTask.datafeedJobRunner = datafeedJobRunner; datafeedJobRunner.run(datafeedTask, (error) -> { if (error != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 44f4363a268..de346bb515d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -111,7 +111,14 @@ public class DatafeedJobRunner extends AbstractComponent { }, handler); } - public synchronized void closeAllDatafeeds(String reason) { + public synchronized void stopDatafeed(String datafeedId, String reason, TimeValue timeout) { + Holder holder = runningDatafeeds.remove(datafeedId); + if (holder != null) { + holder.stop(reason, timeout, null); + } + } + + public synchronized void stopAllDatafeeds(String reason) { int numDatafeeds = runningDatafeeds.size(); if (numDatafeeds != 0) { logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason); @@ -225,9 +232,7 @@ public class DatafeedJobRunner extends AbstractComponent { DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(datafeed, job); DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); - Holder holder = new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), handler); - task.setHolder(holder); - return holder; + return new Holder(datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(auditor, job.getId()), handler); } DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeed, Job job) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index 3ba51d8aea0..46d9a150ac5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -271,7 +271,8 @@ public class DatafeedJobRunnerTests extends ESTestCase { Consumer handler = mockConsumer(); boolean cancelled = randomBoolean(); StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request("datafeed_id", 0L); - StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, startDatafeedRequest); + StartDatafeedAction.DatafeedTask task = new StartDatafeedAction.DatafeedTask(1, "type", "action", null, + startDatafeedRequest, datafeedJobRunner); datafeedJobRunner.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_RUNNER_THREAD_POOL_NAME);