mirror of https://github.com/apache/druid.git
ForkingTaskRunner: Better logging during orderly shutdown.
This commit is contained in:
parent
7e5d9a84f9
commit
c0c6cf77fa
|
@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.base.Splitter;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -77,6 +78,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -98,7 +100,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
private final ObjectMapper jsonMapper;
|
||||
private final PortFinder portFinder;
|
||||
|
||||
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newHashMap();
|
||||
// Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting.
|
||||
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newConcurrentMap();
|
||||
|
||||
private volatile boolean stopping = false;
|
||||
|
||||
|
@ -479,11 +482,27 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
final long timeout = new Interval(start, taskConfig.getGracefulShutdownTimeout()).toDurationMillis();
|
||||
|
||||
// Things should be terminating now. Wait for it to happen so logs can be uploaded and all that good stuff.
|
||||
log.info("Waiting %,dms for shutdown.", timeout);
|
||||
log.info("Waiting up to %,dms for shutdown.", timeout);
|
||||
if (timeout > 0) {
|
||||
try {
|
||||
exec.awaitTermination(timeout, TimeUnit.MILLISECONDS);
|
||||
log.info("Finished stopping in %,dms.", System.currentTimeMillis() - start.getMillis());
|
||||
final boolean terminated = exec.awaitTermination(timeout, TimeUnit.MILLISECONDS);
|
||||
final long elapsed = System.currentTimeMillis() - start.getMillis();
|
||||
if (terminated) {
|
||||
log.info("Finished stopping in %,dms.", elapsed);
|
||||
} else {
|
||||
final Set<String> stillRunning = ImmutableSet.copyOf(tasks.keySet());
|
||||
|
||||
log.makeAlert("Failed to stop forked tasks")
|
||||
.addData("stillRunning", stillRunning)
|
||||
.addData("elapsed", elapsed)
|
||||
.emit();
|
||||
|
||||
log.warn(
|
||||
"Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]",
|
||||
elapsed,
|
||||
Joiner.on("; ").join(stillRunning)
|
||||
);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
log.warn(e, "Interrupted while waiting for executor to finish.");
|
||||
|
|
Loading…
Reference in New Issue