diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index f89f72b7eca..3c893a5a40b 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -61,6 +61,12 @@ public class RealtimeIndexTask extends AbstractTask @JsonIgnore private volatile GracefulShutdownFirehose firehose = null; + @JsonIgnore + private final Object lock = new Object(); + + @JsonIgnore + private volatile boolean shutdown = false; + private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); @JsonCreator @@ -126,7 +132,13 @@ public class RealtimeIndexTask extends AbstractTask final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod(); - firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod); + + synchronized (lock) { + firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod); + if (shutdown) { + firehose.shutdown(); + } + } // TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like // TODO -- the ServerView, which seems kind of odd?) @@ -273,7 +285,12 @@ public class RealtimeIndexTask extends AbstractTask public void shutdown() { try { - firehose.shutdown(); + synchronized (lock) { + shutdown = true; + if (firehose != null) { + firehose.shutdown(); + } + } } catch (IOException e) { throw Throwables.propagate(e);