fix bug in RTR with statuses getting removed

This commit is contained in:
fjy 2013-05-31 13:59:04 -07:00
parent 9cb9f4753d
commit f1f465fcab
1 changed files with 4 additions and 10 deletions

View File

@ -418,7 +418,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
@Override @Override
public void run() public void run()
{ {
cleanup(workerId, taskId); runningTasks.remove(taskId);
addPendingTask(taskRunnerWorkItem); addPendingTask(taskRunnerWorkItem);
} }
}, },
@ -644,15 +644,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
ZkWorker zkWorker = zkWorkers.get(worker.getHost()); ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) { if (zkWorker != null) {
try { try {
Set<String> tasksToRetry = Sets.newHashSet( List<String> tasksToRetry = cf.getChildren()
cf.getChildren() .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()));
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())) log.info("%s has %d pending tasks to retry", worker.getHost(), tasksToRetry.size());
);
tasksToRetry.addAll(
cf.getChildren()
.forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost()))
);
log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
for (String taskId : tasksToRetry) { for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);