fix bug in RTR for not correctly exiting after timeout

This commit is contained in:
fjy 2013-05-13 11:06:46 -07:00
parent 219a8d7d05
commit 9ab22f238e
1 changed files with 19 additions and 1 deletions

View File

@ -230,6 +230,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
return null; return null;
} }
public boolean isWorkerRunningTask(String workerHost, String taskId)
{
ZkWorker zkWorker = zkWorkers.get(workerHost);
return (zkWorker != null && zkWorker.getRunningTasks().contains(taskId));
}
/** /**
* A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task. * A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task.
* *
@ -350,6 +357,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
/** /**
* Adds a task to the pending queue * Adds a task to the pending queue
*
* @param taskRunnerWorkItem * @param taskRunnerWorkItem
*/ */
private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem) private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
@ -506,8 +514,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks // on a worker - this avoids overflowing a worker with tasks
synchronized (statusLock) { synchronized (statusLock) {
while (findWorkerRunningTask(task.getId()) == null) { while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) {
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis()); statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
if (!isWorkerRunningTask(theWorker.getHost(), task.getId())) {
log.error(
"Something went wrong! %s never ran task %s after %s!",
theWorker.getHost(),
task.getId(),
config.getTaskAssignmentTimeoutDuration()
);
retryTask(runningTasks.get(task.getId()), theWorker.getHost());
break;
}
} }
} }
} }