diff --git a/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java b/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java index 5b22bbbd41a..504a83738e6 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java @@ -8,6 +8,7 @@ import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.input.InputRow; import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory; import com.metamx.druid.realtime.plumber.RejectionPolicy; +import org.apache.commons.lang.mutable.MutableBoolean; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -30,6 +31,7 @@ public class GracefulShutdownFirehose implements Firehose private final ScheduledExecutorService scheduledExecutor; private final RejectionPolicy rejectionPolicy; + private final MutableBoolean hasMore = new MutableBoolean(true); private volatile boolean shutdown = false; public GracefulShutdownFirehose( @@ -72,7 +74,7 @@ public class GracefulShutdownFirehose implements Firehose public ScheduledExecutors.Signal call() throws Exception { try { - firehose.close(); + hasMore.setValue(false); } catch (Exception e) { throw Throwables.propagate(e); @@ -89,7 +91,7 @@ public class GracefulShutdownFirehose implements Firehose @Override public boolean hasMore() { - return firehose.hasMore(); + return hasMore.booleanValue() && firehose.hasMore(); } @Override