Merge pull request #2638 from gianm/attempt-fix-2637

WorkerTaskMonitor: stop() waits for mainLoop to exit.
This commit is contained in:
Fangjin Yang 2016-03-11 13:27:24 -08:00
commit f381c6066e
1 changed file with 17 additions and 2 deletions

View File

@ -47,8 +47,10 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/** /**
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be * The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
@ -57,6 +59,7 @@ import java.util.concurrent.LinkedBlockingDeque;
public class WorkerTaskMonitor public class WorkerTaskMonitor
{ {
private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class); private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);
private static final int STOP_WARNING_SECONDS = 10;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final PathChildrenCache pathChildrenCache; private final PathChildrenCache pathChildrenCache;
@ -68,6 +71,7 @@ public class WorkerTaskMonitor
private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>(); private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
private final Map<String, TaskDetails> running = new ConcurrentHashMap<>(); private final Map<String, TaskDetails> running = new ConcurrentHashMap<>();
private final CountDownLatch doneStopping = new CountDownLatch(1);
private final Object lifecycleLock = new Object(); private final Object lifecycleLock = new Object();
private volatile boolean started = false; private volatile boolean started = false;
@ -149,6 +153,9 @@ public class WorkerTaskMonitor
catch (InterruptedException e) { catch (InterruptedException e) {
log.info("WorkerTaskMonitor interrupted, exiting."); log.info("WorkerTaskMonitor interrupted, exiting.");
} }
finally {
doneStopping.countDown();
}
} }
private void restoreRestorableTasks() private void restoreRestorableTasks()
@ -237,19 +244,27 @@ public class WorkerTaskMonitor
} }
@LifecycleStop @LifecycleStop
public void stop() public void stop() throws InterruptedException
{ {
synchronized (lifecycleLock) { synchronized (lifecycleLock) {
Preconditions.checkState(started, "not started"); Preconditions.checkState(started, "not started");
try { try {
started = false;
exec.shutdownNow(); exec.shutdownNow();
pathChildrenCache.close(); pathChildrenCache.close();
taskRunner.stop(); taskRunner.stop();
started = false; if (!doneStopping.await(STOP_WARNING_SECONDS, TimeUnit.SECONDS)) {
log.warn("WorkerTaskMonitor taking longer than %s seconds to exit. Still waiting...", STOP_WARNING_SECONDS);
doneStopping.await();
}
log.info("Stopped WorkerTaskMonitor."); log.info("Stopped WorkerTaskMonitor.");
} }
catch (InterruptedException e) {
throw e;
}
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Exception stopping WorkerTaskMonitor") log.makeAlert(e, "Exception stopping WorkerTaskMonitor")
.emit(); .emit();