diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index d64397447e4..dddf21bf324 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -32,6 +32,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -77,6 +78,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -98,7 +100,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer private final ObjectMapper jsonMapper; private final PortFinder portFinder; - private final Map tasks = Maps.newHashMap(); + // Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. + private final Map tasks = Maps.newConcurrentMap(); private volatile boolean stopping = false; @@ -479,11 +482,27 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer final long timeout = new Interval(start, taskConfig.getGracefulShutdownTimeout()).toDurationMillis(); // Things should be terminating now. Wait for it to happen so logs can be uploaded and all that good stuff. - log.info("Waiting %,dms for shutdown.", timeout); + log.info("Waiting up to %,dms for shutdown.", timeout); if (timeout > 0) { try { - exec.awaitTermination(timeout, TimeUnit.MILLISECONDS); - log.info("Finished stopping in %,dms.", System.currentTimeMillis() - start.getMillis()); + final boolean terminated = exec.awaitTermination(timeout, TimeUnit.MILLISECONDS); + final long elapsed = System.currentTimeMillis() - start.getMillis(); + if (terminated) { + log.info("Finished stopping in %,dms.", elapsed); + } else { + final Set stillRunning = ImmutableSet.copyOf(tasks.keySet()); + + log.makeAlert("Failed to stop forked tasks") + .addData("stillRunning", stillRunning) + .addData("elapsed", elapsed) + .emit(); + + log.warn( + "Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]", + elapsed, + Joiner.on("; ").join(stillRunning) + ); + } } catch (InterruptedException e) { log.warn(e, "Interrupted while waiting for executor to finish.");