Realtime: Delay firehose connection until job is started.

Some firehoses (like the Kafka firehose) acquire input resources when they
connect, so it helps to delay this until after plumber.startJob() runs.
This commit is contained in:
Gian Merlino 2015-05-04 10:54:07 -07:00
parent 8eb441ec08
commit e69d82a2b4
3 changed files with 41 additions and 38 deletions

View File

@ -167,10 +167,6 @@ public class RealtimeIndexTask extends AbstractTask
boolean normalExit = true;
// Set up firehose
final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod();
final Firehose firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
// stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced.
@ -280,12 +276,19 @@ public class RealtimeIndexTask extends AbstractTask
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
Firehose firehose = null;
try {
plumber.startJob();
// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
// Set up firehose
final Period intermediatePersistPeriod = spec.getTuningConfig().getIntermediatePersistPeriod();
firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
// Time to read data!
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
@ -338,6 +341,7 @@ public class RealtimeIndexTask extends AbstractTask
log.makeAlert(e, "Failed to finish realtime task").emit();
}
finally {
// firehose will be non-null since normalExit is true
CloseQuietly.close(firehose);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}

View File

@ -18,9 +18,7 @@
package io.druid.segment.realtime;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@ -33,7 +31,6 @@ import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.guice.annotations.Processing;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
@ -58,7 +55,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
*/
@ -68,24 +64,20 @@ public class RealtimeManager implements QuerySegmentWalker
private final List<FireDepartment> fireDepartments;
private final QueryRunnerFactoryConglomerate conglomerate;
private ExecutorService executorService;
/**
* key=data source name,value=FireChiefs of all partition of that data source
*/
private final Map<String, List<FireChief>> chiefs;
@Inject
public RealtimeManager(
List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject @Processing ExecutorService executorService
QueryRunnerFactoryConglomerate conglomerate
)
{
this.fireDepartments = fireDepartments;
this.conglomerate = conglomerate;
this.executorService = executorService;
this.chiefs = Maps.newHashMap();
}
@ -112,7 +104,6 @@ public class RealtimeManager implements QuerySegmentWalker
)
);
chief.setDaemon(true);
chief.init();
chief.start();
}
}
@ -188,8 +179,8 @@ public class RealtimeManager implements QuerySegmentWalker
{
private final FireDepartment fireDepartment;
private final FireDepartmentMetrics metrics;
private final RealtimeTuningConfig config;
private volatile RealtimeTuningConfig config = null;
private volatile Firehose firehose = null;
private volatile Plumber plumber = null;
private volatile boolean normalExit = true;
@ -199,26 +190,42 @@ public class RealtimeManager implements QuerySegmentWalker
)
{
this.fireDepartment = fireDepartment;
this.config = fireDepartment.getTuningConfig();
this.metrics = fireDepartment.getMetrics();
}
public void init() throws IOException
public Firehose initFirehose()
{
config = fireDepartment.getTuningConfig();
synchronized (this) {
try {
log.info("Calling the FireDepartment and getting a Firehose.");
firehose = fireDepartment.connect();
log.info("Firehose acquired!");
if (firehose == null) {
try {
log.info("Calling the FireDepartment and getting a Firehose.");
firehose = fireDepartment.connect();
log.info("Firehose acquired!");
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("Firehose already connected, skipping initFirehose().");
}
return firehose;
}
}
public Plumber initPlumber()
{
synchronized (this) {
if (plumber == null) {
log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber();
log.info("We have our plumber!");
} else {
log.warn("Plumber already trained, skipping initPlumber().");
}
catch (IOException e) {
throw Throwables.propagate(e);
}
return plumber;
}
}
@ -230,13 +237,15 @@ public class RealtimeManager implements QuerySegmentWalker
@Override
public void run()
{
verifyState();
plumber = initPlumber();
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
try {
plumber.startJob();
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
firehose = initFirehose();
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
InputRow inputRow = null;
@ -310,15 +319,6 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
private void verifyState()
{
Preconditions.checkNotNull(config, "config is null, init() must be called first.");
Preconditions.checkNotNull(firehose, "firehose is null, init() must be called first.");
Preconditions.checkNotNull(plumber, "plumber is null, init() must be called first.");
log.info("FireChief[%s] state ok.", fireDepartment.getDataSchema().getDataSource());
}
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);

View File

@ -118,7 +118,6 @@ public class RealtimeManagerTest
tuningConfig
)
),
null,
null
);
}