mirror of https://github.com/apache/druid.git
Fix leaking Status Path nodes in ZK
- remove ZK status path nodes for workers after they are removed
This commit is contained in:
parent
66d105940d
commit
8d7a566bae
|
@ -878,17 +878,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
||||||
for (String assignedTask : tasksToFail) {
|
for (String assignedTask : tasksToFail) {
|
||||||
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
|
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
|
||||||
String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
|
String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
|
||||||
try {
|
if (cf.checkExists().forPath(taskPath) != null) {
|
||||||
if (cf.checkExists().forPath(taskPath) != null) {
|
cf.delete().guaranteed().forPath(taskPath);
|
||||||
cf.delete().guaranteed().forPath(taskPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cf.checkExists().forPath(statusPath) != null) {
|
|
||||||
cf.delete().guaranteed().forPath(statusPath);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
|
||||||
throw Throwables.propagate(e);
|
if (cf.checkExists().forPath(statusPath) != null) {
|
||||||
|
cf.delete().guaranteed().forPath(statusPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("Failing task[%s]", assignedTask);
|
log.info("Failing task[%s]", assignedTask);
|
||||||
|
@ -899,8 +894,17 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
||||||
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
|
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// worker is gone, remove worker task status announcements path.
|
||||||
|
String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker);
|
||||||
|
if (cf.checkExists().forPath(workerStatusPath) != null) {
|
||||||
|
cf.delete().guaranteed().forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
finally {
|
catch (Exception e) {
|
||||||
|
log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
} finally {
|
||||||
removedWorkerCleanups.remove(worker);
|
removedWorkerCleanups.remove(worker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -379,6 +379,7 @@ public class RemoteTaskRunnerTest
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
Assert.assertNull(cf.checkExists().forPath(statusPath));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue