address cr

This commit is contained in:
fjy 2014-08-14 13:56:36 -07:00
parent 96d5d4a59a
commit 6ba0a0df8b
1 changed files with 18 additions and 27 deletions

View File

@ -313,10 +313,16 @@ public class RealtimePlumber implements Plumber
{ {
final Interval interval = sink.getInterval(); final Interval interval = sink.getInterval();
// use a marker file to indicate that merging has completed // use a file to indicate that pushing has completed
final File marker = new File(computePersistDir(schema, sink.getInterval()), "marker"); final File persistDir = computePersistDir(schema, interval);
if (marker.exists()) { final File mergedTarget = new File(persistDir, "merged");
removeMergedSegment(sink); final File isPushedMarker = new File(persistDir, "isPushedMarker");
if (!isPushedMarker.exists()) {
removeSegment(sink, mergedTarget);
} else {
log.info("Skipping already-merged sink: %s", sink);
return;
} }
for (FireHydrant hydrant : sink) { for (FireHydrant hydrant : sink) {
@ -329,12 +335,6 @@ public class RealtimePlumber implements Plumber
} }
} }
final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
if (mergedTarget.exists()) {
log.info("Skipping already-merged sink: %s", sink);
return;
}
try { try {
List<QueryableIndex> indexes = Lists.newArrayList(); List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) { for (FireHydrant fireHydrant : sink) {
@ -349,9 +349,6 @@ public class RealtimePlumber implements Plumber
schema.getAggregators(), schema.getAggregators(),
mergedTarget mergedTarget
); );
if (!marker.createNewFile()) {
log.makeAlert("Unable to make marker file[%s]! WTF?!", marker).emit();
}
QueryableIndex index = IndexIO.loadIndex(mergedFile); QueryableIndex index = IndexIO.loadIndex(mergedFile);
@ -360,11 +357,13 @@ public class RealtimePlumber implements Plumber
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
); );
if (!marker.delete()) {
log.makeAlert("Unable to remove merged marker", marker).emit();
}
segmentPublisher.publishSegment(segment); segmentPublisher.publishSegment(segment);
if (!isPushedMarker.createNewFile()) {
log.makeAlert("Unable to make marker file! WTF?!")
.addData("marker", isPushedMarker)
.emit();
}
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
@ -374,9 +373,6 @@ public class RealtimePlumber implements Plumber
// We're trying to shut down, and this segment failed to push. Let's just get rid of it. // We're trying to shut down, and this segment failed to push. Let's just get rid of it.
// This call will also delete possibly-partially-written files, so we don't need to do it explicitly. // This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
abandonSegment(truncatedTime, sink); abandonSegment(truncatedTime, sink);
} else {
// Delete any possibly-partially-written files, so we can try again on the next push cycle.
removeMergedSegment(sink);
} }
} }
} }
@ -812,12 +808,6 @@ public class RealtimePlumber implements Plumber
); );
} }
private void removeMergedSegment(final Sink sink)
{
final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged");
removeSegment(sink, mergedTarget);
}
private void removeSegment(final Sink sink, final File target) private void removeSegment(final Sink sink, final File target)
{ {
if (target.exists()) { if (target.exists()) {
@ -826,7 +816,8 @@ public class RealtimePlumber implements Plumber
FileUtils.deleteDirectory(target); FileUtils.deleteDirectory(target);
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource()) log.makeAlert(e, "Unable to remove file for dataSource[%s]", schema.getDataSource())
.addData("file", target)
.addData("interval", sink.getInterval()) .addData("interval", sink.getInterval())
.emit(); .emit();
} }