mirror of https://github.com/apache/druid.git
RealtimeIndexTask: Better handle early shutdowns
This commit is contained in:
parent
76f4d12059
commit
ed56bc52e9
|
@ -61,6 +61,12 @@ public class RealtimeIndexTask extends AbstractTask
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
private volatile GracefulShutdownFirehose firehose = null;
|
private volatile GracefulShutdownFirehose firehose = null;
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
private volatile boolean shutdown = false;
|
||||||
|
|
||||||
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
|
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
|
@ -126,7 +132,13 @@ public class RealtimeIndexTask extends AbstractTask
|
||||||
|
|
||||||
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
|
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
|
||||||
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
|
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
|
||||||
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
|
|
||||||
|
synchronized (lock) {
|
||||||
|
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
|
||||||
|
if (shutdown) {
|
||||||
|
firehose.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
|
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
|
||||||
// TODO -- the ServerView, which seems kind of odd?)
|
// TODO -- the ServerView, which seems kind of odd?)
|
||||||
|
@ -273,7 +285,12 @@ public class RealtimeIndexTask extends AbstractTask
|
||||||
public void shutdown()
|
public void shutdown()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
firehose.shutdown();
|
synchronized (lock) {
|
||||||
|
shutdown = true;
|
||||||
|
if (firehose != null) {
|
||||||
|
firehose.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
|
|
Loading…
Reference in New Issue