Merge pull request #2634 from gianm/stopGracefully-avoid-interrupt

ThreadPoolTaskRunner: Make graceful shutdown logs less scary.
This commit is contained in:
Nishant 2016-03-11 16:36:10 -08:00
commit cf7f6da392
1 changed files with 20 additions and 14 deletions

View File

@ -21,7 +21,6 @@ package io.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
@ -84,6 +83,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private final ServiceEmitter emitter;
private final TaskLocation location;
private volatile boolean stopping = false;
@Inject
public ThreadPoolTaskRunner(
TaskToolboxFactory toolboxFactory,
@ -132,6 +133,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
@LifecycleStop
public void stop()
{
stopping = true;
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
try {
entry.getValue().shutdown();
@ -159,11 +162,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
new Interval(new DateTime(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
TimeUnit.MILLISECONDS
);
// Ignore status, it doesn't matter for graceful shutdowns.
log.info(
"Graceful shutdown of task[%s] finished in %,dms with status[%s].",
"Graceful shutdown of task[%s] finished in %,dms.",
task.getId(),
System.currentTimeMillis() - start,
taskStatus.getStatusCode()
System.currentTimeMillis() - start
);
}
catch (Exception e) {
@ -407,8 +411,16 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
status = task.run(toolbox);
}
catch (InterruptedException e) {
log.error(e, "Interrupted while running task[%s]", task);
throw Throwables.propagate(e);
// Don't reset the interrupt flag of the thread, as we do want to continue to the end of this callable.
if (stopping) {
// Tasks may interrupt their own run threads to stop themselves gracefully; don't be too scary about this.
log.debug(e, "Interrupted while running task[%s] during graceful shutdown.", task);
} else {
// Not stopping, this is definitely unexpected.
log.warn(e, "Interrupted while running task[%s]", task);
}
status = TaskStatus.failure(task.getId());
}
catch (Exception e) {
log.error(e, "Exception while running task[%s]", task);
@ -416,16 +428,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
catch (Throwable t) {
log.error(t, "Uncaught Throwable while running task[%s]", task);
throw Throwables.propagate(t);
throw t;
}
try {
return status.withDuration(System.currentTimeMillis() - startTime);
}
catch (Exception e) {
log.error(e, "Uncaught Exception during callback for task[%s]", task);
throw Throwables.propagate(e);
}
return status.withDuration(System.currentTimeMillis() - startTime);
}
}
}