RealtimePlumberSchool: Faster graceful shutdown

This commit is contained in:
Gian Merlino 2013-03-19 18:29:00 -07:00
parent 017278a5d5
commit dc615ab795
2 changed files with 13 additions and 18 deletions

View File

@ -146,7 +146,7 @@ public class IndexerCoordinatorResource
return Response.serverError().build();
}
return Response.ok().build();
return Response.ok(ImmutableMap.of("task", taskid)).build();
}
// Legacy endpoint

View File

@ -89,6 +89,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object();
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@ -304,13 +305,12 @@ public class RealtimePlumberSchool implements PlumberSchool
@Override
public void finishJob()
{
log.info("Shutting down....");
log.info("Shutting down...");
while (!sinks.isEmpty()) {
try {
Duration retry = new Duration("PT60S");
log.info(
"Cannot shut down yet! Sinks for %s remain! Next retry in %s",
"Cannot shut down yet! Sinks remain: %s",
Joiner.on(", ").join(
Iterables.transform(
sinks.values(),
@ -323,27 +323,18 @@ public class RealtimePlumberSchool implements PlumberSchool
}
}
)
),
retry
)
);
Thread.sleep(retry.getMillis());
synchronized (handoffCondition) {
handoffCondition.wait();
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
for (final Sink sink : sinks.values()) {
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
}
catch (Exception e) {
log.makeAlert("Failed to unannounce segment on shutdown")
.addData("segment", sink.getSegment())
.emit();
}
}
// scheduledExecutor is shutdown here, but persistExecutor is shutdown when the
// ServerView sends it a new segment callback
@ -459,6 +450,10 @@ public class RealtimePlumberSchool implements PlumberSchool
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier());
sinks.remove(sinkKey);
synchronized (handoffCondition) {
handoffCondition.notifyAll();
}
}
catch (IOException e) {
log.makeAlert(e, "Unable to delete old segment for dataSource[%s].", schema.getDataSource())