From 79a95f7789f7ecbfd77726d5f35936ccd57603da Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 11 Mar 2016 09:07:28 -0800 Subject: [PATCH] WorkerTaskMonitor: stop() waits for mainLoop to exit. Fixes #2637. --- .../indexing/worker/WorkerTaskMonitor.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java index 56c3d73bc9a..137b84c7952 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java @@ -47,8 +47,10 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; /** * The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be @@ -57,6 +59,7 @@ import java.util.concurrent.LinkedBlockingDeque; public class WorkerTaskMonitor { private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class); + private static final int STOP_WARNING_SECONDS = 10; private final ObjectMapper jsonMapper; private final PathChildrenCache pathChildrenCache; @@ -68,6 +71,7 @@ public class WorkerTaskMonitor private final BlockingQueue notices = new LinkedBlockingDeque<>(); private final Map running = new ConcurrentHashMap<>(); + private final CountDownLatch doneStopping = new CountDownLatch(1); private final Object lifecycleLock = new Object(); private volatile boolean started = false; @@ -149,6 +153,9 @@ public class WorkerTaskMonitor catch (InterruptedException e) { log.info("WorkerTaskMonitor interrupted, exiting."); } + finally { + doneStopping.countDown(); + } } private void restoreRestorableTasks() @@ -237,19 +244,27 @@ public class WorkerTaskMonitor } @LifecycleStop - public void stop() + public void stop() throws InterruptedException { synchronized (lifecycleLock) { Preconditions.checkState(started, "not started"); try { + started = false; exec.shutdownNow(); pathChildrenCache.close(); taskRunner.stop(); - started = false; + if (!doneStopping.await(STOP_WARNING_SECONDS, TimeUnit.SECONDS)) { + log.warn("WorkerTaskMonitor taking longer than %s seconds to exit. Still waiting...", STOP_WARNING_SECONDS); + doneStopping.await(); + } + log.info("Stopped WorkerTaskMonitor."); } + catch (InterruptedException e) { + throw e; + } catch (Exception e) { log.makeAlert(e, "Exception stopping WorkerTaskMonitor") .emit();