From e69d82a2b4d585a5c452ce2a52ec1e346e84dc27 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 4 May 2015 10:54:07 -0700 Subject: [PATCH] Realtime: Delay firehose connection until job is started. Some firehoses (like the Kafka firehose) acquire input resources when they connect, so it helps to delay this until after plumber.startJob() runs. --- .../common/task/RealtimeIndexTask.java | 12 ++-- .../segment/realtime/RealtimeManager.java | 66 +++++++++---------- .../segment/realtime/RealtimeManagerTest.java | 1 - 3 files changed, 41 insertions(+), 38 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 2fdbe98a769..4cef65fcc5e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -167,10 +167,6 @@ public class RealtimeIndexTask extends AbstractTask boolean normalExit = true; - // Set up firehose - final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod(); - final Firehose firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser()); - // It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for // stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced. @@ -280,12 +276,19 @@ public class RealtimeIndexTask extends AbstractTask this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics()); + // Delay firehose connection to avoid claiming input resources while the plumber is starting up. + Firehose firehose = null; + try { plumber.startJob(); // Set up metrics emission toolbox.getMonitorScheduler().addMonitor(metricsMonitor); + // Set up firehose + final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod(); + firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser()); + // Time to read data! long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); while (firehose.hasMore()) { @@ -338,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask log.makeAlert(e, "Failed to finish realtime task").emit(); } finally { + // firehose will be non-null since normalExit is true CloseQuietly.close(firehose); toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 2a1f0679841..5a68e516f3a 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -18,9 +18,7 @@ package io.druid.segment.realtime; -import com.fasterxml.jackson.annotation.JacksonInject; import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -33,7 +31,6 @@ import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; -import io.druid.guice.annotations.Processing; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; @@ -58,7 +55,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; /** */ @@ -68,24 +64,20 @@ public class RealtimeManager implements QuerySegmentWalker private final List fireDepartments; private final QueryRunnerFactoryConglomerate conglomerate; - private ExecutorService executorService; /** * key=data source name,value=FireChiefs of all partition of that data source */ private final Map> chiefs; - @Inject public RealtimeManager( List fireDepartments, - QueryRunnerFactoryConglomerate conglomerate, - @JacksonInject @Processing ExecutorService executorService + QueryRunnerFactoryConglomerate conglomerate ) { this.fireDepartments = fireDepartments; this.conglomerate = conglomerate; - this.executorService = executorService; this.chiefs = Maps.newHashMap(); } @@ -112,7 +104,6 @@ public class RealtimeManager implements QuerySegmentWalker ) ); chief.setDaemon(true); - chief.init(); chief.start(); } } @@ -188,8 +179,8 @@ public class RealtimeManager implements QuerySegmentWalker { private final FireDepartment fireDepartment; private final FireDepartmentMetrics metrics; + private final RealtimeTuningConfig config; - private volatile RealtimeTuningConfig config = null; private volatile Firehose firehose = null; private volatile Plumber plumber = null; private volatile boolean normalExit = true; @@ -199,26 +190,42 @@ public class RealtimeManager implements QuerySegmentWalker ) { this.fireDepartment = fireDepartment; - + this.config = fireDepartment.getTuningConfig(); this.metrics = fireDepartment.getMetrics(); } - public void init() throws IOException + public Firehose initFirehose() { - config = fireDepartment.getTuningConfig(); - synchronized (this) { - try { - log.info("Calling the FireDepartment and getting a Firehose."); - firehose = fireDepartment.connect(); - log.info("Firehose acquired!"); + if (firehose == null) { + try { + log.info("Calling the FireDepartment and getting a Firehose."); + firehose = fireDepartment.connect(); + log.info("Firehose acquired!"); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } else { + log.warn("Firehose already connected, skipping initFirehose()."); + } + + return firehose; + } + } + + public Plumber initPlumber() + { + synchronized (this) { + if (plumber == null) { log.info("Someone get us a plumber!"); plumber = fireDepartment.findPlumber(); log.info("We have our plumber!"); + } else { + log.warn("Plumber already trained, skipping initPlumber()."); } - catch (IOException e) { - throw Throwables.propagate(e); - } + + return plumber; } } @@ -230,13 +237,15 @@ public class RealtimeManager implements QuerySegmentWalker @Override public void run() { - verifyState(); - + plumber = initPlumber(); final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod(); try { plumber.startJob(); + // Delay firehose connection to avoid claiming input resources while the plumber is starting up. + firehose = initFirehose(); + long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); while (firehose.hasMore()) { InputRow inputRow = null; @@ -310,15 +319,6 @@ public class RealtimeManager implements QuerySegmentWalker } } - private void verifyState() - { - Preconditions.checkNotNull(config, "config is null, init() must be called first."); - Preconditions.checkNotNull(firehose, "firehose is null, init() must be called first."); - Preconditions.checkNotNull(plumber, "plumber is null, init() must be called first."); - - log.info("FireChief[%s] state ok.", fireDepartment.getDataSchema().getDataSource()); - } - public QueryRunner getQueryRunner(Query query) { QueryRunnerFactory> factory = conglomerate.findFactory(query); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 6d8de85679e..99f1aca4463 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -118,7 +118,6 @@ public class RealtimeManagerTest tuningConfig ) ), - null, null ); }