From ed56bc52e92c410472a6a083e36613d3f2cc4b2b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 19 Mar 2013 17:55:05 -0700 Subject: [PATCH] RealtimeIndexTask: Better handle early shutdowns --- .../merger/common/task/RealtimeIndexTask.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index f89f72b7eca..3c893a5a40b 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -61,6 +61,12 @@ public class RealtimeIndexTask extends AbstractTask @JsonIgnore private volatile GracefulShutdownFirehose firehose = null; + @JsonIgnore + private final Object lock = new Object(); + + @JsonIgnore + private volatile boolean shutdown = false; + private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); @JsonCreator @@ -126,7 +132,13 @@ public class RealtimeIndexTask extends AbstractTask final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod(); - firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod); + + synchronized (lock) { + firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod); + if (shutdown) { + firehose.shutdown(); + } + } // TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like // TODO -- the ServerView, which seems kind of odd?) @@ -273,7 +285,12 @@ public class RealtimeIndexTask extends AbstractTask public void shutdown() { try { - firehose.shutdown(); + synchronized (lock) { + shutdown = true; + if (firehose != null) { + firehose.shutdown(); + } + } } catch (IOException e) { throw Throwables.propagate(e);