[ML] Fix possible race condition when starting datafeed (#51302)

The ID of the datafeed's associated job was being obtained
frequently by looking up the datafeed task in a map that
was being modified in other threads. This could lead to
NullPointerExceptions (NPEs) if the datafeed stopped running
at an unexpected time, because the map lookup returned null.

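This change reduces the number of places where a datafeed's
associated job ID is looked up: the ID is obtained once and
passed to the methods that need it, and the remaining lookup
returns null when the datafeed is no longer running on this
node. This avoids the possibility of failures when the
datafeed's task is removed from the map of running tasks
during multi-step operations in other threads.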

Fixes #51285
This commit is contained in:
David Roberts 2020-01-22 11:30:47 +00:00
parent bfcfcdee33
commit 932c63297f
1 changed file with 25 additions and 16 deletions

View File

@ -85,13 +85,14 @@ public class DatafeedManager {
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap( ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> { datafeedJob -> {
String jobId = datafeedJob.getJobId();
Holder holder = new Holder(task, datafeedId, datafeedJob, Holder holder = new Holder(task, datafeedId, datafeedJob,
new ProblemTracker(auditor, datafeedJob.getJobId()), finishHandler); new ProblemTracker(auditor, jobId), finishHandler);
runningDatafeedsOnThisNode.put(task.getAllocationId(), holder); runningDatafeedsOnThisNode.put(task.getAllocationId(), holder);
task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() { task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTask<?> persistentTask) { public void onResponse(PersistentTask<?> persistentTask) {
taskRunner.runWhenJobIsOpened(task); taskRunner.runWhenJobIsOpened(task, jobId);
} }
@Override @Override
@ -267,17 +268,23 @@ public class DatafeedManager {
} }
} }
private String getJobId(TransportStartDatafeedAction.DatafeedTask task) { /**
return runningDatafeedsOnThisNode.get(task.getAllocationId()).getJobId(); * Returns <code>null</code> if the datafeed is not running on this node.
*/
private String getJobIdIfDatafeedRunningOnThisNode(TransportStartDatafeedAction.DatafeedTask task) {
Holder holder = runningDatafeedsOnThisNode.get(task.getAllocationId());
if (holder == null) {
return null;
}
return holder.getJobId();
} }
private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStartDatafeedAction.DatafeedTask datafeedTask) { private JobState getJobState(PersistentTasksCustomMetaData tasks, String jobId) {
return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks); return MlTasks.getJobStateModifiedForReassignments(jobId, tasks);
} }
private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks, private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks, String jobId) {
TransportStartDatafeedAction.DatafeedTask datafeedTask) { PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks);
if (jobTask == null) { if (jobTask == null) {
return false; return false;
} }
@ -492,14 +499,14 @@ public class DatafeedManager {
private final List<TransportStartDatafeedAction.DatafeedTask> tasksToRun = new CopyOnWriteArrayList<>(); private final List<TransportStartDatafeedAction.DatafeedTask> tasksToRun = new CopyOnWriteArrayList<>();
private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) { private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask, String jobId) {
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) { if (getJobState(tasks, jobId) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, jobId)) {
runTask(datafeedTask); runTask(datafeedTask);
} else { } else {
logger.info("Datafeed [{}] is waiting for job [{}] to be opened", logger.info("Datafeed [{}] is waiting for job [{}] to be opened",
datafeedTask.getDatafeedId(), getJobId(datafeedTask)); datafeedTask.getDatafeedId(), jobId);
tasksToRun.add(datafeedTask); tasksToRun.add(datafeedTask);
} }
} }
@ -530,17 +537,19 @@ public class DatafeedManager {
List<TransportStartDatafeedAction.DatafeedTask> remainingTasks = new ArrayList<>(); List<TransportStartDatafeedAction.DatafeedTask> remainingTasks = new ArrayList<>();
for (TransportStartDatafeedAction.DatafeedTask datafeedTask : tasksToRun) { for (TransportStartDatafeedAction.DatafeedTask datafeedTask : tasksToRun) {
if (runningDatafeedsOnThisNode.containsKey(datafeedTask.getAllocationId()) == false) { String jobId = getJobIdIfDatafeedRunningOnThisNode(datafeedTask);
if (jobId == null) {
// Datafeed is not running on this node any more
continue; continue;
} }
JobState jobState = getJobState(currentTasks, datafeedTask); JobState jobState = getJobState(currentTasks, jobId);
if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) { if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, jobId) == false) {
remainingTasks.add(datafeedTask); remainingTasks.add(datafeedTask);
} else if (jobState == JobState.OPENED) { } else if (jobState == JobState.OPENED) {
runTask(datafeedTask); runTask(datafeedTask);
} else { } else {
logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]", logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]",
datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState); datafeedTask.getDatafeedId(), jobId, jobState);
datafeedTask.stop("job_never_opened", TimeValue.timeValueSeconds(20)); datafeedTask.stop("job_never_opened", TimeValue.timeValueSeconds(20));
} }
} }