[ML] do not start stopping tasks on reassignment (#55315) (#55388)

When anomaly jobs, datafeeds, and analytics tasks are stopped, they enter an ephemeral state called `STOPPING`.

If the node executing a task fails while this is occurring, the task could be left stuck in the limbo state of `STOPPING`. It is best to mark such tasks as completed if they get reassigned to a node.
This commit is contained in:
Benjamin Trent 2020-04-17 08:57:12 -04:00 committed by GitHub
parent 290361c63b
commit 65e0084120
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 4 deletions

View File

@ -437,10 +437,18 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
JobTask jobTask = (JobTask) task; JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager; jobTask.autodetectProcessManager = autodetectProcessManager;
JobTaskState jobTaskState = (JobTaskState) state; JobTaskState jobTaskState = (JobTaskState) state;
JobState jobState = jobTaskState == null ? null : jobTaskState.getState();
// If the job is closing, simply stop and return
if (JobState.CLOSING.equals(jobState)) {
// Mark as completed instead of using `stop` as stop assumes native processes have started
logger.info("[{}] job got reassigned while stopping. Marking as completed", params.getJobId());
jobTask.markAsCompleted();
return;
}
// If the job is failed then the Persistent Task Service will // If the job is failed then the Persistent Task Service will
// try to restart it on a node restart. Exiting here leaves the // try to restart it on a node restart. Exiting here leaves the
// job in the failed state and it must be force closed. // job in the failed state and it must be force closed.
if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) { if (JobState.FAILED.equals(jobState)) {
return; return;
} }

View File

@ -642,11 +642,16 @@ public class TransportStartDataFrameAnalyticsAction
PersistentTaskState state) { PersistentTaskState state) {
logger.info("[{}] Starting data frame analytics", params.getId()); logger.info("[{}] Starting data frame analytics", params.getId());
DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state; DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState();
// If we are "stopping" there is nothing to do // If we are "stopping" there is nothing to do and we should stop
if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) {
logger.info("[{}] data frame analytics got reassigned while stopping. Marking as completed", params.getId());
task.markAsCompleted();
return;
}
// If we are "failed" then we should leave the task as is; for recovery it must be force stopped. // If we are "failed" then we should leave the task as is; for recovery it must be force stopped.
if (analyticsTaskState != null && analyticsTaskState.getState().isAnyOf( if (DataFrameAnalyticsState.FAILED.equals(analyticsState)) {
DataFrameAnalyticsState.STOPPING, DataFrameAnalyticsState.FAILED)) {
return; return;
} }

View File

@ -397,6 +397,14 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
final StartDatafeedAction.DatafeedParams params, final StartDatafeedAction.DatafeedParams params,
final PersistentTaskState state) { final PersistentTaskState state) {
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
DatafeedState datafeedState = (DatafeedState) state;
// If we are "stopping" there is nothing to do
if (DatafeedState.STOPPING.equals(datafeedState)) {
logger.info("[{}] datafeed got reassigned while stopping. Marking as completed", params.getDatafeedId());
datafeedTask.markAsCompleted();
return;
}
datafeedTask.datafeedManager = datafeedManager; datafeedTask.datafeedManager = datafeedManager;
datafeedManager.run(datafeedTask, datafeedManager.run(datafeedTask,
(error) -> { (error) -> {