Merge pull request #2508 from gianm/ftr-shutdown-logging

ForkingTaskRunner: Better logging during orderly shutdown.
This commit is contained in:
Fangjin Yang 2016-02-19 10:02:24 -08:00
commit ddf913d626
1 changed file with 23 additions and 4 deletions

View File

@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -77,6 +78,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -98,7 +100,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final PortFinder portFinder; private final PortFinder portFinder;
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newHashMap(); // Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting.
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newConcurrentMap();
private volatile boolean stopping = false; private volatile boolean stopping = false;
@ -479,11 +482,27 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
final long timeout = new Interval(start, taskConfig.getGracefulShutdownTimeout()).toDurationMillis(); final long timeout = new Interval(start, taskConfig.getGracefulShutdownTimeout()).toDurationMillis();
// Things should be terminating now. Wait for it to happen so logs can be uploaded and all that good stuff. // Things should be terminating now. Wait for it to happen so logs can be uploaded and all that good stuff.
log.info("Waiting %,dms for shutdown.", timeout); log.info("Waiting up to %,dms for shutdown.", timeout);
if (timeout > 0) { if (timeout > 0) {
try { try {
exec.awaitTermination(timeout, TimeUnit.MILLISECONDS); final boolean terminated = exec.awaitTermination(timeout, TimeUnit.MILLISECONDS);
log.info("Finished stopping in %,dms.", System.currentTimeMillis() - start.getMillis()); final long elapsed = System.currentTimeMillis() - start.getMillis();
if (terminated) {
log.info("Finished stopping in %,dms.", elapsed);
} else {
final Set<String> stillRunning = ImmutableSet.copyOf(tasks.keySet());
log.makeAlert("Failed to stop forked tasks")
.addData("stillRunning", stillRunning)
.addData("elapsed", elapsed)
.emit();
log.warn(
"Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]",
elapsed,
Joiner.on("; ").join(stillRunning)
);
}
} }
catch (InterruptedException e) { catch (InterruptedException e) {
log.warn(e, "Interrupted while waiting for executor to finish."); log.warn(e, "Interrupted while waiting for executor to finish.");