diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 47f53097022..2e094bf8964 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -418,7 +418,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider @Override public void run() { - cleanup(workerId, taskId); + runningTasks.remove(taskId); addPendingTask(taskRunnerWorkItem); } }, @@ -644,15 +644,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ZkWorker zkWorker = zkWorkers.get(worker.getHost()); if (zkWorker != null) { try { - Set tasksToRetry = Sets.newHashSet( - cf.getChildren() - .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())) - ); - tasksToRetry.addAll( - cf.getChildren() - .forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost())) - ); - log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size()); + List tasksToRetry = cf.getChildren() + .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())); + log.info("%s has %d pending tasks to retry", worker.getHost(), tasksToRetry.size()); for (String taskId : tasksToRetry) { TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);