diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index f26899bce04..8d78f5bce2e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -21,7 +21,6 @@ package io.druid.indexing.overlord; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FutureCallback; @@ -84,6 +83,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private final ServiceEmitter emitter; private final TaskLocation location; + private volatile boolean stopping = false; + @Inject public ThreadPoolTaskRunner( TaskToolboxFactory toolboxFactory, @@ -132,6 +133,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker @LifecycleStop public void stop() { + stopping = true; + for (Map.Entry entry : exec.entrySet()) { try { entry.getValue().shutdown(); @@ -159,11 +162,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker new Interval(new DateTime(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(), TimeUnit.MILLISECONDS ); + + // Ignore status, it doesn't matter for graceful shutdowns. log.info( - "Graceful shutdown of task[%s] finished in %,dms with status[%s].", + "Graceful shutdown of task[%s] finished in %,dms.", task.getId(), - System.currentTimeMillis() - start, - taskStatus.getStatusCode() + System.currentTimeMillis() - start ); } catch (Exception e) { @@ -407,8 +411,16 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker status = task.run(toolbox); } catch (InterruptedException e) { - log.error(e, "Interrupted while running task[%s]", task); - throw Throwables.propagate(e); + // Don't reset the interrupt flag of the thread, as we do want to continue to the end of this callable. + if (stopping) { + // Tasks may interrupt their own run threads to stop themselves gracefully; don't be too scary about this. + log.debug(e, "Interrupted while running task[%s] during graceful shutdown.", task); + } else { + // Not stopping, this is definitely unexpected. + log.warn(e, "Interrupted while running task[%s]", task); + } + + status = TaskStatus.failure(task.getId()); } catch (Exception e) { log.error(e, "Exception while running task[%s]", task); @@ -416,16 +428,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker } catch (Throwable t) { log.error(t, "Uncaught Throwable while running task[%s]", task); - throw Throwables.propagate(t); + throw t; } - try { - return status.withDuration(System.currentTimeMillis() - startTime); - } - catch (Exception e) { - log.error(e, "Uncaught Exception during callback for task[%s]", task); - throw Throwables.propagate(e); - } + return status.withDuration(System.currentTimeMillis() - startTime); } } }