Realtime: Abandon segments that fail to push

This commit is contained in:
Gian Merlino 2013-11-02 00:21:45 -07:00
parent c2074744b5
commit d8d3a6958e
1 changed files with 36 additions and 24 deletions

View File

@ -98,6 +98,8 @@ public class RealtimePlumberSchool implements PlumberSchool
private final IndexGranularity segmentGranularity; private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object(); private final Object handoffCondition = new Object();
private volatile boolean shuttingDown = false;
@JacksonInject @JacksonInject
@NotNull @NotNull
private volatile ServiceEmitter emitter; private volatile ServiceEmitter emitter;
@ -410,15 +412,17 @@ public class RealtimePlumberSchool implements PlumberSchool
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval) .addData("interval", interval)
.emit(); .emit();
if (shuttingDown) {
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
abandonSegment(truncatedTime, sink);
}
} }
if (mergedFile != null) { if (mergedFile != null) {
try { try {
if (mergedFile != null) {
log.info("Deleting Index File[%s]", mergedFile); log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile); FileUtils.deleteDirectory(mergedFile);
} }
}
catch (IOException e) { catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile); log.warn(e, "Error deleting directory[%s]", mergedFile);
} }
@ -433,6 +437,8 @@ public class RealtimePlumberSchool implements PlumberSchool
{ {
log.info("Shutting down..."); log.info("Shutting down...");
shuttingDown = true;
for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) { for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
persistAndMerge(entry.getKey(), entry.getValue()); persistAndMerge(entry.getKey(), entry.getValue());
} }
@ -613,26 +619,7 @@ public class RealtimePlumberSchool implements PlumberSchool
final String sinkVersion = sink.getSegment().getVersion(); final String sinkVersion = sink.getSegment().getVersion();
if (segmentVersion.compareTo(sinkVersion) >= 0) { if (segmentVersion.compareTo(sinkVersion) >= 0) {
log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion); log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion);
try { abandonSegment(sinkKey, sink);
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier());
sinks.remove(sinkKey);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<Sink>(sink)
);
synchronized (handoffCondition) {
handoffCondition.notifyAll();
}
}
catch (IOException e) {
log.makeAlert(e, "Unable to delete old segment for dataSource[%s].", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
}
} }
} }
} }
@ -706,6 +693,31 @@ public class RealtimePlumberSchool implements PlumberSchool
} }
); );
} }
/**
* Unannounces a given sink and removes all local references to it.
*/
private void abandonSegment(final long truncatedTime, final Sink sink) {
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
sink.getInterval(),
sink.getVersion(),
new SingleElementPartitionChunk<>(sink)
);
synchronized (handoffCondition) {
handoffCondition.notifyAll();
}
}
catch (IOException e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
}
}
}; };
} }