Merge pull request #1337 from gianm/delay-firehose-connect

Realtime: Delay firehose connection until job is started.
This commit is contained in:
Fangjin Yang 2015-05-04 17:18:28 -07:00
commit d7562fd4d1
3 changed files with 48 additions and 39 deletions

View File

@ -167,10 +167,6 @@ public class RealtimeIndexTask extends AbstractTask
boolean normalExit = true; boolean normalExit = true;
// Set up firehose
final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod();
final Firehose firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for // It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
// stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced. // stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced.
@ -280,12 +276,19 @@ public class RealtimeIndexTask extends AbstractTask
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics()); this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
Firehose firehose = null;
try { try {
plumber.startJob(); plumber.startJob();
// Set up metrics emission // Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor); toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
// Set up firehose
final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod();
firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
// Time to read data! // Time to read data!
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) { while (firehose.hasMore()) {
@ -338,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask
log.makeAlert(e, "Failed to finish realtime task").emit(); log.makeAlert(e, "Failed to finish realtime task").emit();
} }
finally { finally {
// firehose will be non-null since normalExit is true
CloseQuietly.close(firehose); CloseQuietly.close(firehose);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
} }

View File

@ -18,9 +18,7 @@
package io.druid.segment.realtime; package io.druid.segment.realtime;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -33,7 +31,6 @@ import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.guice.annotations.Processing;
import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner; import io.druid.query.NoopQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
@ -58,7 +55,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
/** /**
*/ */
@ -68,24 +64,20 @@ public class RealtimeManager implements QuerySegmentWalker
private final List<FireDepartment> fireDepartments; private final List<FireDepartment> fireDepartments;
private final QueryRunnerFactoryConglomerate conglomerate; private final QueryRunnerFactoryConglomerate conglomerate;
private ExecutorService executorService;
/** /**
* key=data source name,value=FireChiefs of all partition of that data source * key=data source name,value=FireChiefs of all partition of that data source
*/ */
private final Map<String, List<FireChief>> chiefs; private final Map<String, List<FireChief>> chiefs;
@Inject @Inject
public RealtimeManager( public RealtimeManager(
List<FireDepartment> fireDepartments, List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate, QueryRunnerFactoryConglomerate conglomerate
@JacksonInject @Processing ExecutorService executorService
) )
{ {
this.fireDepartments = fireDepartments; this.fireDepartments = fireDepartments;
this.conglomerate = conglomerate; this.conglomerate = conglomerate;
this.executorService = executorService;
this.chiefs = Maps.newHashMap(); this.chiefs = Maps.newHashMap();
} }
@ -104,9 +96,14 @@ public class RealtimeManager implements QuerySegmentWalker
} }
chiefs.add(chief); chiefs.add(chief);
chief.setName(String.format("chief-%s", schema.getDataSource())); chief.setName(
String.format(
"chief-%s[%s]",
schema.getDataSource(),
fireDepartment.getTuningConfig().getShardSpec().getPartitionNum()
)
);
chief.setDaemon(true); chief.setDaemon(true);
chief.init();
chief.start(); chief.start();
} }
} }
@ -182,8 +179,8 @@ public class RealtimeManager implements QuerySegmentWalker
{ {
private final FireDepartment fireDepartment; private final FireDepartment fireDepartment;
private final FireDepartmentMetrics metrics; private final FireDepartmentMetrics metrics;
private final RealtimeTuningConfig config;
private volatile RealtimeTuningConfig config = null;
private volatile Firehose firehose = null; private volatile Firehose firehose = null;
private volatile Plumber plumber = null; private volatile Plumber plumber = null;
private volatile boolean normalExit = true; private volatile boolean normalExit = true;
@ -193,26 +190,42 @@ public class RealtimeManager implements QuerySegmentWalker
) )
{ {
this.fireDepartment = fireDepartment; this.fireDepartment = fireDepartment;
this.config = fireDepartment.getTuningConfig();
this.metrics = fireDepartment.getMetrics(); this.metrics = fireDepartment.getMetrics();
} }
public void init() throws IOException public Firehose initFirehose()
{ {
config = fireDepartment.getTuningConfig();
synchronized (this) { synchronized (this) {
try { if (firehose == null) {
log.info("Calling the FireDepartment and getting a Firehose."); try {
firehose = fireDepartment.connect(); log.info("Calling the FireDepartment and getting a Firehose.");
log.info("Firehose acquired!"); firehose = fireDepartment.connect();
log.info("Firehose acquired!");
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("Firehose already connected, skipping initFirehose().");
}
return firehose;
}
}
public Plumber initPlumber()
{
synchronized (this) {
if (plumber == null) {
log.info("Someone get us a plumber!"); log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber(); plumber = fireDepartment.findPlumber();
log.info("We have our plumber!"); log.info("We have our plumber!");
} else {
log.warn("Plumber already trained, skipping initPlumber().");
} }
catch (IOException e) {
throw Throwables.propagate(e); return plumber;
}
} }
} }
@ -224,13 +237,15 @@ public class RealtimeManager implements QuerySegmentWalker
@Override @Override
public void run() public void run()
{ {
verifyState(); plumber = initPlumber();
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod(); final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
try { try {
plumber.startJob(); plumber.startJob();
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
firehose = initFirehose();
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) { while (firehose.hasMore()) {
InputRow inputRow = null; InputRow inputRow = null;
@ -304,15 +319,6 @@ public class RealtimeManager implements QuerySegmentWalker
} }
} }
private void verifyState()
{
Preconditions.checkNotNull(config, "config is null, init() must be called first.");
Preconditions.checkNotNull(firehose, "firehose is null, init() must be called first.");
Preconditions.checkNotNull(plumber, "plumber is null, init() must be called first.");
log.info("FireChief[%s] state ok.", fireDepartment.getDataSchema().getDataSource());
}
public <T> QueryRunner<T> getQueryRunner(Query<T> query) public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{ {
QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query); QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);

View File

@ -118,7 +118,6 @@ public class RealtimeManagerTest
tuningConfig tuningConfig
) )
), ),
null,
null null
); );
} }