mirror of https://github.com/apache/druid.git
parent
f4ab1c2e52
commit
79a95f7789
|
@ -47,8 +47,10 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
|
||||
|
@ -57,6 +59,7 @@ import java.util.concurrent.LinkedBlockingDeque;
|
|||
public class WorkerTaskMonitor
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);
|
||||
private static final int STOP_WARNING_SECONDS = 10;
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final PathChildrenCache pathChildrenCache;
|
||||
|
@ -68,6 +71,7 @@ public class WorkerTaskMonitor
|
|||
private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
|
||||
private final Map<String, TaskDetails> running = new ConcurrentHashMap<>();
|
||||
|
||||
private final CountDownLatch doneStopping = new CountDownLatch(1);
|
||||
private final Object lifecycleLock = new Object();
|
||||
private volatile boolean started = false;
|
||||
|
||||
|
@ -149,6 +153,9 @@ public class WorkerTaskMonitor
|
|||
catch (InterruptedException e) {
|
||||
log.info("WorkerTaskMonitor interrupted, exiting.");
|
||||
}
|
||||
finally {
|
||||
doneStopping.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
private void restoreRestorableTasks()
|
||||
|
@ -237,19 +244,27 @@ public class WorkerTaskMonitor
|
|||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
public void stop() throws InterruptedException
|
||||
{
|
||||
synchronized (lifecycleLock) {
|
||||
Preconditions.checkState(started, "not started");
|
||||
|
||||
try {
|
||||
started = false;
|
||||
exec.shutdownNow();
|
||||
pathChildrenCache.close();
|
||||
taskRunner.stop();
|
||||
|
||||
started = false;
|
||||
if (!doneStopping.await(STOP_WARNING_SECONDS, TimeUnit.SECONDS)) {
|
||||
log.warn("WorkerTaskMonitor taking longer than %s seconds to exit. Still waiting...", STOP_WARNING_SECONDS);
|
||||
doneStopping.await();
|
||||
}
|
||||
|
||||
log.info("Stopped WorkerTaskMonitor.");
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw e;
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Exception stopping WorkerTaskMonitor")
|
||||
.emit();
|
||||
|
|
Loading…
Reference in New Issue