diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java index 56c3d73bc9a..137b84c7952 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java @@ -47,8 +47,10 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; /** * The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be @@ -57,6 +59,7 @@ import java.util.concurrent.LinkedBlockingDeque; public class WorkerTaskMonitor { private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class); + private static final int STOP_WARNING_SECONDS = 10; private final ObjectMapper jsonMapper; private final PathChildrenCache pathChildrenCache; @@ -68,6 +71,7 @@ public class WorkerTaskMonitor private final BlockingQueue notices = new LinkedBlockingDeque<>(); private final Map running = new ConcurrentHashMap<>(); + private final CountDownLatch doneStopping = new CountDownLatch(1); private final Object lifecycleLock = new Object(); private volatile boolean started = false; @@ -149,6 +153,9 @@ public class WorkerTaskMonitor catch (InterruptedException e) { log.info("WorkerTaskMonitor interrupted, exiting."); } + finally { + doneStopping.countDown(); + } } private void restoreRestorableTasks() @@ -237,19 +244,27 @@ public class WorkerTaskMonitor } @LifecycleStop - public void stop() + public void stop() throws InterruptedException { synchronized (lifecycleLock) { Preconditions.checkState(started, "not started"); try { + started = false; exec.shutdownNow(); pathChildrenCache.close(); taskRunner.stop(); - started = false; + if (!doneStopping.await(STOP_WARNING_SECONDS, TimeUnit.SECONDS)) { + log.warn("WorkerTaskMonitor taking longer than %s seconds to exit. Still waiting...", STOP_WARNING_SECONDS); + doneStopping.await(); + } + log.info("Stopped WorkerTaskMonitor."); } + catch (InterruptedException e) { + throw e; + } catch (Exception e) { log.makeAlert(e, "Exception stopping WorkerTaskMonitor") .emit();