Merge pull request #1503 from metamx/fix-leaking-zk-nodes

Fix leaking Status Path nodes in ZK
This commit is contained in:
Himanshu 2015-07-10 17:40:18 -05:00
commit cac722968e
2 changed files with 16 additions and 11 deletions

View File

@ -878,17 +878,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
for (String assignedTask : tasksToFail) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
try {
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
if (cf.checkExists().forPath(statusPath) != null) {
cf.delete().guaranteed().forPath(statusPath);
}
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
catch (Exception e) {
throw Throwables.propagate(e);
if (cf.checkExists().forPath(statusPath) != null) {
cf.delete().guaranteed().forPath(statusPath);
}
log.info("Failing task[%s]", assignedTask);
@ -899,8 +894,17 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
// worker is gone, remove worker task status announcements path.
String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
if (cf.checkExists().forPath(workerStatusPath) != null) {
cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
}
}
finally {
catch (Exception e) {
log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
throw Throwables.propagate(e);
} finally {
removedWorkerCleanups.remove(worker);
}
}

View File

@ -379,6 +379,7 @@ public class RemoteTaskRunnerTest
}
)
);
Assert.assertNull(cf.checkExists().forPath(statusPath));
}
@Test