diff --git a/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java b/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java index ad7284ffcf2..5b22bbbd41a 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java @@ -3,6 +3,7 @@ package com.metamx.druid.realtime; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.logger.Logger; import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.input.InputRow; import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory; @@ -21,6 +22,8 @@ import java.util.concurrent.ScheduledExecutorService; */ public class GracefulShutdownFirehose implements Firehose { + private static final Logger log = new Logger(GracefulShutdownFirehose.class); + private final Firehose firehose; private final IndexGranularity segmentGranularity; private final long windowMillis; @@ -55,10 +58,14 @@ public class GracefulShutdownFirehose implements Firehose public void shutdown() throws IOException { final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis(); + final long end = segmentGranularity.increment(truncatedNow) + windowMillis; + final Duration timeUntilShutdown = new Duration(System.currentTimeMillis(), end); + + log.info("Shutting down in %s", timeUntilShutdown); ScheduledExecutors.scheduleWithFixedDelay( scheduledExecutor, - new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis), + timeUntilShutdown, new Callable() { @Override