mirror of https://github.com/apache/druid.git
change shutdown logic for firehose
This commit is contained in:
parent
465b78b4bc
commit
a24274029f
|
@ -118,6 +118,12 @@ public class RealtimeIndexTask extends AbstractTask
|
||||||
@Override
|
@Override
|
||||||
public TaskStatus run(final TaskToolbox toolbox) throws Exception
|
public TaskStatus run(final TaskToolbox toolbox) throws Exception
|
||||||
{
|
{
|
||||||
|
synchronized (lock) {
|
||||||
|
if (shutdown) {
|
||||||
|
return TaskStatus.success(getId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (this.plumber != null) {
|
if (this.plumber != null) {
|
||||||
throw new IllegalStateException("WTF?!? run with non-null plumber??!");
|
throw new IllegalStateException("WTF?!? run with non-null plumber??!");
|
||||||
}
|
}
|
||||||
|
@ -133,12 +139,7 @@ public class RealtimeIndexTask extends AbstractTask
|
||||||
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
|
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
|
||||||
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
|
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
|
||||||
|
|
||||||
synchronized (lock) {
|
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
|
||||||
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
|
|
||||||
if (shutdown) {
|
|
||||||
firehose.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
|
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
|
||||||
// TODO -- the ServerView, which seems kind of odd?)
|
// TODO -- the ServerView, which seems kind of odd?)
|
||||||
|
|
Loading…
Reference in New Issue