[ML] Wait for autodetect to be ready in the datafeed (#37349)
This is a reinforcement of #37227. It turns out that persistent tasks are not made stale if the node they were running on is restarted and the master node does not notice this. The main scenario where this happens is when minimum master nodes is the same as the number of nodes in the cluster, so the cluster cannot elect a master node when any node is restarted. When an ML node restarts we need the datafeeds for any jobs that were running on that node to not just wait until the jobs are allocated, but to wait for the autodetect process of the job to start up. In the case of reassignment of the job persistent task this was dealt with by the stale status test. But in the case where a node restarts but its persistent tasks are not reassigned we need a deeper test. Fixes #36810
This commit is contained in:
parent
4d3928d444
commit
1da59db3fb
|
@ -432,7 +432,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
|
|||
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
|
||||
auditor, System::currentTimeMillis);
|
||||
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
|
||||
System::currentTimeMillis, auditor);
|
||||
System::currentTimeMillis, auditor, autodetectProcessManager);
|
||||
this.datafeedManager.set(datafeedManager);
|
||||
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
|
||||
autodetectProcessManager);
|
||||
|
|
|
@ -27,9 +27,11 @@ import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
|
|||
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -62,16 +64,18 @@ public class DatafeedManager {
|
|||
private final ConcurrentMap<Long, Holder> runningDatafeedsOnThisNode = new ConcurrentHashMap<>();
|
||||
private final DatafeedJobBuilder datafeedJobBuilder;
|
||||
private final TaskRunner taskRunner = new TaskRunner();
|
||||
private final AutodetectProcessManager autodetectProcessManager;
|
||||
private volatile boolean isolated;
|
||||
|
||||
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
|
||||
Supplier<Long> currentTimeSupplier, Auditor auditor) {
|
||||
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
|
||||
this.client = Objects.requireNonNull(client);
|
||||
this.clusterService = Objects.requireNonNull(clusterService);
|
||||
this.threadPool = threadPool;
|
||||
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
|
||||
this.auditor = Objects.requireNonNull(auditor);
|
||||
this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder);
|
||||
this.autodetectProcessManager = autodetectProcessManager;
|
||||
clusterService.addListener(taskRunner);
|
||||
}
|
||||
|
||||
|
@ -256,6 +260,21 @@ public class DatafeedManager {
|
|||
return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks);
|
||||
}
|
||||
|
||||
private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks,
|
||||
TransportStartDatafeedAction.DatafeedTask datafeedTask) {
|
||||
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks);
|
||||
if (jobTask == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
JobTaskState state = (JobTaskState) jobTask.getState();
|
||||
if (state == null || state.isStatusStale(jobTask)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return autodetectProcessManager.hasOpenAutodetectCommunicator(jobTask.getAllocationId());
|
||||
}
|
||||
|
||||
private TimeValue computeNextDelay(long next) {
|
||||
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
|
||||
}
|
||||
|
@ -446,7 +465,7 @@ public class DatafeedManager {
|
|||
private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
if (getJobState(tasks, datafeedTask) == JobState.OPENED) {
|
||||
if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) {
|
||||
runTask(datafeedTask);
|
||||
} else {
|
||||
logger.info("Datafeed [{}] is waiting for job [{}] to be opened",
|
||||
|
@ -485,10 +504,10 @@ public class DatafeedManager {
|
|||
continue;
|
||||
}
|
||||
JobState jobState = getJobState(currentTasks, datafeedTask);
|
||||
if (jobState == JobState.OPENED) {
|
||||
runTask(datafeedTask);
|
||||
} else if (jobState == JobState.OPENING) {
|
||||
if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) {
|
||||
remainingTasks.add(datafeedTask);
|
||||
} else if (jobState == JobState.OPENED) {
|
||||
runTask(datafeedTask);
|
||||
} else {
|
||||
logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]",
|
||||
datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState);
|
||||
|
|
|
@ -212,6 +212,13 @@ public class AutodetectProcessManager {
|
|||
*/
|
||||
public void persistJob(JobTask jobTask, Consumer<Exception> handler) {
|
||||
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
|
||||
if (communicator == null) {
|
||||
String message = String.format(Locale.ROOT, "Cannot persist because job [%s] does not have a corresponding autodetect process",
|
||||
jobTask.getJobId());
|
||||
logger.debug(message);
|
||||
handler.accept(ExceptionsHelper.conflictStatusException(message));
|
||||
return;
|
||||
}
|
||||
communicator.persistJob((aVoid, e) -> handler.accept(e));
|
||||
}
|
||||
|
||||
|
@ -239,7 +246,8 @@ public class AutodetectProcessManager {
|
|||
XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
|
||||
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
|
||||
if (communicator == null) {
|
||||
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open");
|
||||
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() +
|
||||
"] does not have a corresponding autodetect process");
|
||||
}
|
||||
communicator.writeToJob(input, analysisRegistry, xContentType, params, handler);
|
||||
}
|
||||
|
@ -257,7 +265,8 @@ public class AutodetectProcessManager {
|
|||
logger.debug("Flushing job {}", jobTask.getJobId());
|
||||
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
|
||||
if (communicator == null) {
|
||||
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
|
||||
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] does not have a corresponding autodetect process",
|
||||
jobTask.getJobId());
|
||||
logger.debug(message);
|
||||
handler.onFailure(ExceptionsHelper.conflictStatusException(message));
|
||||
return;
|
||||
|
@ -307,7 +316,8 @@ public class AutodetectProcessManager {
|
|||
logger.debug("Forecasting job {}", jobId);
|
||||
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
|
||||
if (communicator == null) {
|
||||
String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId);
|
||||
String message = String.format(Locale.ROOT,
|
||||
"Cannot forecast because job [%s] does not have a corresponding autodetect process", jobId);
|
||||
logger.debug(message);
|
||||
handler.accept(ExceptionsHelper.conflictStatusException(message));
|
||||
return;
|
||||
|
@ -327,7 +337,8 @@ public class AutodetectProcessManager {
|
|||
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
|
||||
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
|
||||
if (communicator == null) {
|
||||
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
|
||||
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() +
|
||||
"] does not have a corresponding autodetect process";
|
||||
logger.debug(message);
|
||||
handler.accept(ExceptionsHelper.conflictStatusException(message));
|
||||
return;
|
||||
|
@ -663,6 +674,14 @@ public class AutodetectProcessManager {
|
|||
return null;
|
||||
}
|
||||
|
||||
public boolean hasOpenAutodetectCommunicator(long jobAllocationId) {
|
||||
ProcessContext processContext = processByAllocation.get(jobAllocationId);
|
||||
if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) {
|
||||
return processContext.getAutodetectCommunicator() != null;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public Optional<Duration> jobOpenTime(JobTask jobTask) {
|
||||
AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask);
|
||||
if (communicator == null) {
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
|
|||
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask;
|
||||
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
@ -48,6 +49,7 @@ import java.util.Collections;
|
|||
import java.util.Date;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask;
|
||||
|
@ -74,6 +76,7 @@ public class DatafeedManagerTests extends ESTestCase {
|
|||
private long currentTime = 120000;
|
||||
private Auditor auditor;
|
||||
private ArgumentCaptor<ClusterStateListener> capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class);
|
||||
private AtomicBoolean hasOpenAutodetectCommunicator;
|
||||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -128,7 +131,12 @@ public class DatafeedManagerTests extends ESTestCase {
|
|||
return null;
|
||||
}).when(datafeedJobBuilder).build(any(), any());
|
||||
|
||||
datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor);
|
||||
hasOpenAutodetectCommunicator = new AtomicBoolean(true);
|
||||
AutodetectProcessManager autodetectProcessManager = mock(AutodetectProcessManager.class);
|
||||
doAnswer(invocation -> hasOpenAutodetectCommunicator.get()).when(autodetectProcessManager).hasOpenAutodetectCommunicator(anyLong());
|
||||
|
||||
datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor,
|
||||
autodetectProcessManager);
|
||||
|
||||
verify(clusterService).addListener(capturedClusterStateListener.capture());
|
||||
}
|
||||
|
@ -278,7 +286,44 @@ public class DatafeedManagerTests extends ESTestCase {
|
|||
capturedClusterStateListener.getValue().clusterChanged(
|
||||
new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build()));
|
||||
|
||||
// Now it should run as the job state chanded to OPENED
|
||||
// Now it should run as the job state changed to OPENED
|
||||
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
|
||||
}
|
||||
|
||||
public void testDatafeedTaskWaitsUntilAutodetectCommunicatorIsOpen() {
|
||||
|
||||
hasOpenAutodetectCommunicator.set(false);
|
||||
|
||||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
||||
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
|
||||
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
|
||||
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
|
||||
when(clusterService.state()).thenReturn(cs.build());
|
||||
|
||||
Consumer<Exception> handler = mockConsumer();
|
||||
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
|
||||
datafeedManager.run(task, handler);
|
||||
|
||||
// Verify datafeed has not started running yet as job doesn't have an open autodetect communicator
|
||||
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
|
||||
|
||||
tasksBuilder = PersistentTasksCustomMetaData.builder();
|
||||
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
|
||||
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
|
||||
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
|
||||
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
|
||||
|
||||
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build()));
|
||||
|
||||
// Still no run
|
||||
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
|
||||
|
||||
hasOpenAutodetectCommunicator.set(true);
|
||||
|
||||
capturedClusterStateListener.getValue().clusterChanged(
|
||||
new ClusterChangedEvent("_source", cs.build(), anotherJobCs.build()));
|
||||
|
||||
// Now it should run as the autodetect communicator is open
|
||||
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue