diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 418add2757f..cc259f51c1e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -432,7 +432,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry, auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, - System::currentTimeMillis, auditor); + System::currentTimeMillis, auditor, autodetectProcessManager); this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, autodetectProcessManager); @@ -473,7 +473,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu return Arrays.asList( new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(), memoryTracker.get(), client), - new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor( datafeedManager.get()) + new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get()) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 6367a13100e..e004a718b13 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -27,9 +27,11 @@ import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.ArrayList; @@ -62,16 +64,18 @@ public class DatafeedManager { private final ConcurrentMap runningDatafeedsOnThisNode = new ConcurrentHashMap<>(); private final DatafeedJobBuilder datafeedJobBuilder; private final TaskRunner taskRunner = new TaskRunner(); + private final AutodetectProcessManager autodetectProcessManager; private volatile boolean isolated; public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, - Supplier currentTimeSupplier, Auditor auditor) { + Supplier currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.threadPool = threadPool; this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.auditor = Objects.requireNonNull(auditor); this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder); + this.autodetectProcessManager = autodetectProcessManager; clusterService.addListener(taskRunner); } @@ -256,6 +260,21 @@ public class DatafeedManager { return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks); } + private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks, + TransportStartDatafeedAction.DatafeedTask datafeedTask) { + PersistentTasksCustomMetaData.PersistentTask jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks); + if (jobTask == null) { + return false; + } + + JobTaskState state = (JobTaskState) jobTask.getState(); + if (state == null || state.isStatusStale(jobTask)) { + return false; + } + + return autodetectProcessManager.hasOpenAutodetectCommunicator(jobTask.getAllocationId()); + } + private TimeValue computeNextDelay(long next) { return new TimeValue(Math.max(1, next - currentTimeSupplier.get())); } @@ -446,7 +465,7 @@ public class DatafeedManager { private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) { ClusterState clusterState = clusterService.state(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (getJobState(tasks, datafeedTask) == JobState.OPENED) { + if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) { runTask(datafeedTask); } else { logger.info("Datafeed [{}] is waiting for job [{}] to be opened", @@ -485,10 +504,10 @@ public class DatafeedManager { continue; } JobState jobState = getJobState(currentTasks, datafeedTask); - if (jobState == JobState.OPENED) { - runTask(datafeedTask); - } else if (jobState == JobState.OPENING) { + if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) { remainingTasks.add(datafeedTask); + } else if (jobState == JobState.OPENED) { + runTask(datafeedTask); } else { logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]", datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 251a2a5224a..32507df53ce 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -212,6 +212,13 @@ public class AutodetectProcessManager { */ public void persistJob(JobTask jobTask, Consumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); + if (communicator == null) { + String message = String.format(Locale.ROOT, "Cannot persist because job [%s] does not have a corresponding autodetect process", + jobTask.getJobId()); + logger.debug(message); + handler.accept(ExceptionsHelper.conflictStatusException(message)); + return; + } communicator.persistJob((aVoid, e) -> handler.accept(e)); } @@ -239,7 +246,8 @@ public class AutodetectProcessManager { XContentType xContentType, DataLoadParams params, BiConsumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open"); + throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + + "] does not have a corresponding autodetect process"); } communicator.writeToJob(input, analysisRegistry, xContentType, params, handler); } @@ -257,7 +265,8 @@ public class AutodetectProcessManager { logger.debug("Flushing job {}", jobTask.getJobId()); AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId()); + String message = String.format(Locale.ROOT, "Cannot flush because job [%s] does not have a corresponding autodetect process", + jobTask.getJobId()); logger.debug(message); handler.onFailure(ExceptionsHelper.conflictStatusException(message)); return; @@ -307,7 +316,8 @@ public class AutodetectProcessManager { logger.debug("Forecasting job {}", jobId); AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId); + String message = String.format(Locale.ROOT, + "Cannot forecast because job [%s] does not have a corresponding autodetect process", jobId); logger.debug(message); handler.accept(ExceptionsHelper.conflictStatusException(message)); return; @@ -327,7 +337,8 @@ public class AutodetectProcessManager { public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open"; + String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + + "] does not have a corresponding autodetect process"; logger.debug(message); handler.accept(ExceptionsHelper.conflictStatusException(message)); return; @@ -663,6 +674,14 @@ public class AutodetectProcessManager { return null; } + public boolean hasOpenAutodetectCommunicator(long jobAllocationId) { + ProcessContext processContext = processByAllocation.get(jobAllocationId); + if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { + return processContext.getAutodetectCommunicator() != null; + } + return false; + } + public Optional jobOpenTime(JobTask jobTask) { AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask); if (communicator == null) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 9bf883232c6..858f7e0f7d1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -48,6 +49,7 @@ import java.util.Collections; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; @@ -74,13 +76,14 @@ public class DatafeedManagerTests extends ESTestCase { private long currentTime = 120000; private Auditor auditor; private ArgumentCaptor capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class); + private AtomicBoolean hasOpenAutodetectCommunicator; @Before @SuppressWarnings("unchecked") public void setUpTests() { Job.Builder job = createDatafeedJob().setCreateTime(new Date()); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DiscoveryNodes nodes = DiscoveryNodes.builder() @@ -128,7 +131,12 @@ public class DatafeedManagerTests extends ESTestCase { return null; }).when(datafeedJobBuilder).build(any(), any()); - datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor); + hasOpenAutodetectCommunicator = new AtomicBoolean(true); + AutodetectProcessManager autodetectProcessManager = mock(AutodetectProcessManager.class); + doAnswer(invocation -> hasOpenAutodetectCommunicator.get()).when(autodetectProcessManager).hasOpenAutodetectCommunicator(anyLong()); + + datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor, + autodetectProcessManager); verify(clusterService).addListener(capturedClusterStateListener.capture()); } @@ -259,7 +267,7 @@ public class DatafeedManagerTests extends ESTestCase { // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) @@ -270,7 +278,7 @@ public class DatafeedManagerTests extends ESTestCase { // Still no run verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -278,7 +286,44 @@ public class DatafeedManagerTests extends ESTestCase { capturedClusterStateListener.getValue().clusterChanged( new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); - // Now it should run as the job state chanded to OPENED + // Now it should run as the job state changed to OPENED + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + } + + public void testDatafeedTaskWaitsUntilAutodetectCommunicatorIsOpen() { + + hasOpenAutodetectCommunicator.set(false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder cs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + when(clusterService.state()).thenReturn(cs.build()); + + Consumer handler = mockConsumer(); + DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); + datafeedManager.run(task, handler); + + // Verify datafeed has not started running yet as job doesn't have an open autodetect communicator + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); + + // Still no run + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + hasOpenAutodetectCommunicator.set(true); + + capturedClusterStateListener.getValue().clusterChanged( + new ClusterChangedEvent("_source", cs.build(), anotherJobCs.build())); + + // Now it should run as the autodetect communicator is open verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); }