From 1da59db3fbe7099eacce8a022c694ad8b9005aa8 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 11 Jan 2019 13:22:35 +0000 Subject: [PATCH] [ML] Wait for autodetect to be ready in the datafeed (#37349) This is a reinforcement of #37227. It turns out that persistent tasks are not made stale if the node they were running on is restarted and the master node does not notice this. The main scenario where this happens is when minimum master nodes is the same as the number of nodes in the cluster, so the cluster cannot elect a master node when any node is restarted. When an ML node restarts we need the datafeeds for any jobs that were running on that node to not just wait until the jobs are allocated, but to wait for the autodetect process of the job to start up. In the case of reassignment of the job persistent task this was dealt with by the stale status test. But in the case where a node restarts but its persistent tasks are not reassigned we need a deeper test. Fixes #36810 --- .../xpack/ml/MachineLearning.java | 4 +- .../xpack/ml/datafeed/DatafeedManager.java | 29 ++++++++-- .../autodetect/AutodetectProcessManager.java | 27 +++++++-- .../ml/datafeed/DatafeedManagerTests.java | 55 +++++++++++++++++-- 4 files changed, 99 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 418add2757f..cc259f51c1e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -432,7 +432,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry, auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, - System::currentTimeMillis, auditor); + System::currentTimeMillis, auditor, autodetectProcessManager); this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, autodetectProcessManager); @@ -473,7 +473,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu return Arrays.asList( new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(), memoryTracker.get(), client), - new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor( datafeedManager.get()) + new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get()) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 6367a13100e..e004a718b13 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -27,9 +27,11 @@ import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.ArrayList; @@ -62,16 +64,18 @@ public class DatafeedManager { private final ConcurrentMap runningDatafeedsOnThisNode = new ConcurrentHashMap<>(); private final DatafeedJobBuilder datafeedJobBuilder; private final TaskRunner taskRunner = new TaskRunner(); + private final AutodetectProcessManager autodetectProcessManager; private volatile boolean isolated; public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, - Supplier currentTimeSupplier, Auditor auditor) { + Supplier currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.threadPool = threadPool; this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.auditor = Objects.requireNonNull(auditor); this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder); + this.autodetectProcessManager = autodetectProcessManager; clusterService.addListener(taskRunner); } @@ -256,6 +260,21 @@ public class DatafeedManager { return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks); } + private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks, + TransportStartDatafeedAction.DatafeedTask datafeedTask) { + PersistentTasksCustomMetaData.PersistentTask jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks); + if (jobTask == null) { + return false; + } + + JobTaskState state = (JobTaskState) jobTask.getState(); + if (state == null || state.isStatusStale(jobTask)) { + return false; + } + + return autodetectProcessManager.hasOpenAutodetectCommunicator(jobTask.getAllocationId()); + } + private TimeValue computeNextDelay(long next) { return new TimeValue(Math.max(1, next - currentTimeSupplier.get())); } @@ -446,7 +465,7 @@ public class DatafeedManager { private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) { ClusterState clusterState = clusterService.state(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (getJobState(tasks, datafeedTask) == JobState.OPENED) { + if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) { runTask(datafeedTask); } else { logger.info("Datafeed [{}] is waiting for job [{}] to be opened", @@ -485,10 +504,10 @@ public class DatafeedManager { continue; } JobState jobState = getJobState(currentTasks, datafeedTask); - if (jobState == JobState.OPENED) { - runTask(datafeedTask); - } else if (jobState == JobState.OPENING) { + if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) { remainingTasks.add(datafeedTask); + } else if (jobState == JobState.OPENED) { + runTask(datafeedTask); } else { logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]", datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 251a2a5224a..32507df53ce 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -212,6 +212,13 @@ public class AutodetectProcessManager { */ public void persistJob(JobTask jobTask, Consumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); + if (communicator == null) { + String message = String.format(Locale.ROOT, "Cannot persist because job [%s] does not have a corresponding autodetect process", + jobTask.getJobId()); + logger.debug(message); + handler.accept(ExceptionsHelper.conflictStatusException(message)); + return; + } communicator.persistJob((aVoid, e) -> handler.accept(e)); } @@ -239,7 +246,8 @@ public class AutodetectProcessManager { XContentType xContentType, DataLoadParams params, BiConsumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open"); + throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + + "] does not have a corresponding autodetect process"); } communicator.writeToJob(input, analysisRegistry, xContentType, params, handler); } @@ -257,7 +265,8 @@ public class AutodetectProcessManager { logger.debug("Flushing job {}", jobTask.getJobId()); AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId()); + String message = String.format(Locale.ROOT, "Cannot flush because job [%s] does not have a corresponding autodetect process", + jobTask.getJobId()); logger.debug(message); handler.onFailure(ExceptionsHelper.conflictStatusException(message)); return; @@ -307,7 +316,8 @@ public class AutodetectProcessManager { logger.debug("Forecasting job {}", jobId); AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId); + String message = String.format(Locale.ROOT, + "Cannot forecast because job [%s] does not have a corresponding autodetect process", jobId); logger.debug(message); handler.accept(ExceptionsHelper.conflictStatusException(message)); return; @@ -327,7 +337,8 @@ public class AutodetectProcessManager { public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open"; + String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + + "] does not have a corresponding autodetect process"; logger.debug(message); handler.accept(ExceptionsHelper.conflictStatusException(message)); return; @@ -663,6 +674,14 @@ public class AutodetectProcessManager { return null; } + public boolean hasOpenAutodetectCommunicator(long jobAllocationId) { + ProcessContext processContext = processByAllocation.get(jobAllocationId); + if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { + return processContext.getAutodetectCommunicator() != null; + } + return false; + } + public Optional jobOpenTime(JobTask jobTask) { AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask); if (communicator == null) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 9bf883232c6..858f7e0f7d1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -48,6 +49,7 @@ import java.util.Collections; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; @@ -74,13 +76,14 @@ public class DatafeedManagerTests extends ESTestCase { private long currentTime = 120000; private Auditor auditor; private ArgumentCaptor capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class); + private AtomicBoolean hasOpenAutodetectCommunicator; @Before @SuppressWarnings("unchecked") public void setUpTests() { Job.Builder job = createDatafeedJob().setCreateTime(new Date()); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DiscoveryNodes nodes = DiscoveryNodes.builder() @@ -128,7 +131,12 @@ public class DatafeedManagerTests extends ESTestCase { return null; }).when(datafeedJobBuilder).build(any(), any()); - datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor); + hasOpenAutodetectCommunicator = new AtomicBoolean(true); + AutodetectProcessManager autodetectProcessManager = mock(AutodetectProcessManager.class); + doAnswer(invocation -> hasOpenAutodetectCommunicator.get()).when(autodetectProcessManager).hasOpenAutodetectCommunicator(anyLong()); + + datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor, + autodetectProcessManager); verify(clusterService).addListener(capturedClusterStateListener.capture()); } @@ -259,7 +267,7 @@ public class DatafeedManagerTests extends ESTestCase { // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) @@ -270,7 +278,7 @@ public class DatafeedManagerTests extends ESTestCase { // Still no run verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -278,7 +286,44 @@ public class DatafeedManagerTests extends ESTestCase { capturedClusterStateListener.getValue().clusterChanged( new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); - // Now it should run as the job state chanded to OPENED + // Now it should run as the job state changed to OPENED + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + } + + public void testDatafeedTaskWaitsUntilAutodetectCommunicatorIsOpen() { + + hasOpenAutodetectCommunicator.set(false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder cs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + when(clusterService.state()).thenReturn(cs.build()); + + Consumer handler = mockConsumer(); + DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); + datafeedManager.run(task, handler); + + // Verify datafeed has not started running yet as job doesn't have an open autodetect communicator + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); + + // Still no run + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + hasOpenAutodetectCommunicator.set(true); + + capturedClusterStateListener.getValue().clusterChanged( + new ClusterChangedEvent("_source", cs.build(), anotherJobCs.build())); + + // Now it should run as the autodetect communicator is open verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); }