add explicit null check for moving tasks from pending to running

This commit is contained in:
fjy 2013-08-21 13:02:35 -07:00
parent 88661b26a0
commit 54f00479cc
1 changed files with 16 additions and 8 deletions

View File

@ -521,7 +521,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
);
}
runningTasks.put(task.getId(), pendingTasks.remove(task.getId()).withWorker(theWorker));
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
if (workItem == null) {
log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
.addData("taskId", task.getId())
.emit();
return;
}
runningTasks.put(task.getId(), workItem.withWorker(theWorker));
log.info("Task %s switched from pending to running", task.getId());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
@ -613,10 +621,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("Task %s just disappeared but I didn't know about it?!", taskId);
log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId);
}
break;
}
@ -661,11 +669,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
List<String> tasksToFail = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
);
log.info("%s: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
log.info("%s: Found %s running", worker.getHost(), entry.getKey());
log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
tasksToFail.add(entry.getKey());
}
}
@ -678,10 +686,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
cf.delete().guaranteed().forPath(taskPath);
}
log.info("Failing task %s", assignedTask);
log.info("Failing task[%s]", assignedTask);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask);
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
}
@ -693,7 +701,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
zkWorker.close();
}
catch (Exception e) {
log.error(e, "Exception closing worker %s!", worker.getHost());
log.error(e, "Exception closing worker[%s]!", worker.getHost());
}
zkWorkers.remove(worker.getHost());
}