Change merged-segment-removing behavior in RealtimePlumber.

Old: Merged segments are deleted upon successful push, or on IOException.
New: Merged segments are deleted on any Exception, but NOT on successful push.

Deleting a merged segment means that the next merge-and-push run will try to
push it again. So we want that to happen if there was any sort of Exception.
We *don't* want it to happen if the merge-and-push was successful, since in
that case, we are just waiting for historical nodes to load the segment. It
might take a while, but there's no reason to re-push while waiting.
This commit is contained in:
Gian Merlino 2014-08-08 10:59:16 -07:00
parent e980904470
commit f547049929
1 changed files with 28 additions and 17 deletions

View File

@ -328,7 +328,6 @@ public class RealtimePlumber implements Plumber
return; return;
} }
File mergedFile = null;
try { try {
List<QueryableIndex> indexes = Lists.newArrayList(); List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) { for (FireHydrant fireHydrant : sink) {
@ -338,7 +337,7 @@ public class RealtimePlumber implements Plumber
indexes.add(queryableIndex); indexes.add(queryableIndex);
} }
mergedFile = IndexMerger.mergeQueryableIndex( final File mergedFile = IndexMerger.mergeQueryableIndex(
indexes, indexes,
schema.getAggregators(), schema.getAggregators(),
mergedTarget mergedTarget
@ -353,23 +352,17 @@ public class RealtimePlumber implements Plumber
segmentPublisher.publishSegment(segment); segmentPublisher.publishSegment(segment);
} }
catch (IOException e) { catch (Exception e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval) .addData("interval", interval)
.emit(); .emit();
if (shuttingDown) { if (shuttingDown) {
// We're trying to shut down, and this segment failed to push. Let's just get rid of it. // We're trying to shut down, and this segment failed to push. Let's just get rid of it.
// This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
abandonSegment(truncatedTime, sink); abandonSegment(truncatedTime, sink);
} } else {
} // Delete any possibly-partially-written files, so we can try again on the next push cycle.
removeMergedSegment(sink);
if (mergedFile != null) {
try {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
} }
} }
} }
@ -648,13 +641,15 @@ public class RealtimePlumber implements Plumber
} }
/** /**
* Unannounces a given sink and removes all local references to it. * Unannounces a given sink and removes all local references to it. It is important that this is only called
* from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while
* being created.
*/ */
protected void abandonSegment(final long truncatedTime, final Sink sink) protected void abandonSegment(final long truncatedTime, final Sink sink)
{ {
try { try {
segmentAnnouncer.unannounceSegment(sink.getSegment()); segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); removeMergedSegment(sink);
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime); sinks.remove(truncatedTime);
sinkTimeline.remove( sinkTimeline.remove(
@ -666,7 +661,7 @@ public class RealtimePlumber implements Plumber
handoffCondition.notifyAll(); handoffCondition.notifyAll();
} }
} }
catch (IOException e) { catch (Exception e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval()) .addData("interval", sink.getInterval())
.emit(); .emit();
@ -802,4 +797,20 @@ public class RealtimePlumber implements Plumber
} }
); );
} }
/**
 * Deletes the "merged" directory located under the persist directory computed for the given sink's
 * interval, if that directory exists. A failure while deleting is reported through an alert and is
 * not propagated to the caller.
 *
 * @param sink the sink whose merged directory should be removed
 */
private void removeMergedSegment(final Sink sink)
{
  final File mergedDir = new File(computePersistDir(schema, sink.getInterval()), "merged");
  if (!mergedDir.exists()) {
    // Nothing on disk to clean up.
    return;
  }

  try {
    log.info("Deleting Index File[%s]", mergedDir);
    FileUtils.deleteDirectory(mergedDir);
  }
  catch (Exception deleteFailure) {
    // Best-effort cleanup: raise an alert instead of failing the caller.
    log.makeAlert(deleteFailure, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource())
       .addData("interval", sink.getInterval())
       .emit();
  }
}
} }