From 6cef4980bfbeed6c26f49f55de47d575c35bb639 Mon Sep 17 00:00:00 2001 From: "jianran.tfh" Date: Thu, 11 Aug 2016 10:25:04 +0800 Subject: [PATCH] fix the pre middle manager peon no stop --- .../io/druid/indexing/overlord/TaskQueue.java | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 5eed155fd66..fa433ef8dbe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -46,10 +46,7 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.metadata.EntryExistsException; import io.druid.query.DruidMetrics; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -195,6 +192,25 @@ public class TaskQueue giant.lock(); try { + if (taskRunner.getRunningTasks() != null) { + int i = 0; + for (TaskRunnerWorkItem item : taskRunner.getRunningTasks()) { + if (++ i >= taskRunner.getRunningTasks().size()) { + break; + } + Iterator iter = tasks.iterator(); + while (iter.hasNext()) { + Task task = iter.next(); + if (task.getId().equals(item.getTaskId())) { + log.info("start notify success task:%s", task.getId()); + notifyStatus(task, TaskStatus.success(task.getId())); + log.info("end notify success task:%s", task.getId()); + break; + } + } + + } + } tasks.clear(); taskFutures.clear(); active = false;