From a24274029f06f57670f63117641e586f468546d8 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Fri, 12 Apr 2013 11:49:01 -0700 Subject: [PATCH] change shutdown logic for firehose --- .../druid/merger/common/task/RealtimeIndexTask.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index 99528e84ee9..65bad47c1bc 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -118,6 +118,12 @@ public class RealtimeIndexTask extends AbstractTask @Override public TaskStatus run(final TaskToolbox toolbox) throws Exception { + synchronized (lock) { + if (shutdown) { + return TaskStatus.success(getId()); + } + } + if (this.plumber != null) { throw new IllegalStateException("WTF?!? run with non-null plumber??!"); } @@ -133,12 +139,7 @@ public class RealtimeIndexTask extends AbstractTask final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod(); - synchronized (lock) { - firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod); - if (shutdown) { - firehose.shutdown(); - } - } + firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod); // TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like // TODO -- the ServerView, which seems kind of odd?)