From 9ab22f238e32d0a1bbd0613749a8bf6c5e08c3ca Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 13 May 2013 11:06:46 -0700 Subject: [PATCH] fix bug in RTR for not correctly exiting after timeout --- .../merger/coordinator/RemoteTaskRunner.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 2542d825422..fe0577a8c77 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -230,6 +230,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider return null; } + public boolean isWorkerRunningTask(String workerHost, String taskId) + { + ZkWorker zkWorker = zkWorkers.get(workerHost); + + return (zkWorker != null && zkWorker.getRunningTasks().contains(taskId)); + } + /** * A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task. * @@ -350,6 +357,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider /** * Adds a task to the pending queue + * * @param taskRunnerWorkItem */ private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem) @@ -506,8 +514,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running // on a worker - this avoids overflowing a worker with tasks synchronized (statusLock) { - while (findWorkerRunningTask(task.getId()) == null) { + while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) { statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis()); + if (!isWorkerRunningTask(theWorker.getHost(), task.getId())) { + log.error( + "Something went wrong! %s never ran task %s after %s!", + theWorker.getHost(), + task.getId(), + config.getTaskAssignmentTimeoutDuration() + ); + retryTask(runningTasks.get(task.getId()), theWorker.getHost()); + break; + } } } }