mirror of https://github.com/apache/druid.git
fix RTR closing PCC too early
This commit is contained in:
parent
a3793eeba4
commit
d02be16245
|
@ -656,14 +656,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
|
|||
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
|
||||
if (zkWorker != null) {
|
||||
try {
|
||||
for (String assignedTask : cf.getChildren()
|
||||
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) {
|
||||
List<String> tasksToFail = Lists.newArrayList(
|
||||
cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
|
||||
);
|
||||
tasksToFail.addAll(zkWorker.getRunningTaskIds());
|
||||
for (String assignedTask : tasksToFail) {
|
||||
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask);
|
||||
if (taskRunnerWorkItem != null) {
|
||||
String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask);
|
||||
if (cf.checkExists().forPath(taskPath) != null) {
|
||||
cf.delete().guaranteed().forPath(taskPath);
|
||||
}
|
||||
|
||||
log.info("Failing task %s", assignedTask);
|
||||
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
|
||||
} else {
|
||||
log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask);
|
||||
|
|
Loading…
Reference in New Issue