From bb83ddb564710742f4fa9efe626d45dd20f05f1e Mon Sep 17 00:00:00 2001
From: Fangjin Yang
Date: Fri, 1 Mar 2013 19:51:33 -0800
Subject: [PATCH] fix bug with worker disappearance

---
 .../merger/coordinator/RemoteTaskRunner.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index 5d3805f0ccd..42e70abef08 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -478,7 +478,7 @@ public class RemoteTaskRunner implements TaskRunner
                     log.info("Task %s just disappeared!", taskId);
                     retryTask(runningTasks.get(taskId), worker.getHost());
                   } else {
-                    log.info("Lost a task I didn't know about: %s", taskId);
+                    log.info("A task disappeared I didn't know about: %s", taskId);
                   }
                 }
               }
@@ -513,19 +513,23 @@ public class RemoteTaskRunner implements TaskRunner
     ZkWorker zkWorker = zkWorkers.get(worker.getHost());
     if (zkWorker != null) {
       try {
-        Set<String> tasksPending = Sets.newHashSet(
+        Set<String> tasksToRetry = Sets.newHashSet(
             cf.getChildren()
               .forPath(JOINER.join(config.getTaskPath(), worker.getHost()))
         );
-        log.info("%s had %d tasks pending", worker.getHost(), tasksPending.size());
+        tasksToRetry.addAll(
+            cf.getChildren()
+              .forPath(JOINER.join(config.getStatusPath(), worker.getHost()))
+        );
+        log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
 
-        for (String taskId : tasksPending) {
-          TaskRunnerWorkItem taskRunnerWorkItem = pendingTasks.get(taskId);
+        for (String taskId : tasksToRetry) {
+          TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
           if (taskRunnerWorkItem != null) {
             cf.delete().guaranteed().forPath(JOINER.join(config.getTaskPath(), worker.getHost(), taskId));
             retryTask(taskRunnerWorkItem, worker.getHost());
           } else {
-            log.warn("RemoteTaskRunner has no knowledge of pending task %s", taskId);
+            log.warn("RemoteTaskRunner has no knowledge of task %s", taskId);
           }
         }