From 6ba0a0df8b86dca9fd4d37a496b243f6301ba7ec Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 14 Aug 2014 13:56:36 -0700 Subject: [PATCH] address cr --- .../realtime/plumber/RealtimePlumber.java | 45 ++++++++----------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index fd06b54502c..82ebf9f5d3e 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -313,10 +313,16 @@ public class RealtimePlumber implements Plumber { final Interval interval = sink.getInterval(); - // use a marker file to indicate that merging has completed - final File marker = new File(computePersistDir(schema, sink.getInterval()), "marker"); - if (marker.exists()) { - removeMergedSegment(sink); + // use a file to indicate that pushing has completed + final File persistDir = computePersistDir(schema, interval); + final File mergedTarget = new File(persistDir, "merged"); + final File isPushedMarker = new File(persistDir, "isPushedMarker"); + + if (!isPushedMarker.exists()) { + removeSegment(sink, mergedTarget); + } else { + log.info("Skipping already-merged sink: %s", sink); + return; } for (FireHydrant hydrant : sink) { @@ -329,12 +335,6 @@ public class RealtimePlumber implements Plumber } } - final File mergedTarget = new File(computePersistDir(schema, interval), "merged"); - if (mergedTarget.exists()) { - log.info("Skipping already-merged sink: %s", sink); - return; - } - try { List indexes = Lists.newArrayList(); for (FireHydrant fireHydrant : sink) { @@ -349,9 +349,6 @@ public class RealtimePlumber implements Plumber schema.getAggregators(), mergedTarget ); - if (!marker.createNewFile()) { - log.makeAlert("Unable to make marker file[%s]! WTF?!", marker).emit(); - } QueryableIndex index = IndexIO.loadIndex(mergedFile); @@ -360,11 +357,13 @@ public class RealtimePlumber implements Plumber sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) ); - if (!marker.delete()) { - log.makeAlert("Unable to remove merged marker", marker).emit(); - } - segmentPublisher.publishSegment(segment); + + if (!isPushedMarker.createNewFile()) { + log.makeAlert("Unable to make marker file! WTF?!") + .addData("marker", isPushedMarker) + .emit(); + } } catch (Exception e) { log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) @@ -374,9 +373,6 @@ public class RealtimePlumber implements Plumber // We're trying to shut down, and this segment failed to push. Let's just get rid of it. // This call will also delete possibly-partially-written files, so we don't need to do it explicitly. abandonSegment(truncatedTime, sink); - } else { - // Delete any possibly-partially-written files, so we can try again on the next push cycle. - removeMergedSegment(sink); } } } @@ -812,12 +808,6 @@ public class RealtimePlumber implements Plumber ); } - private void removeMergedSegment(final Sink sink) - { - final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged"); - removeSegment(sink, mergedTarget); - } - private void removeSegment(final Sink sink, final File target) { if (target.exists()) { @@ -826,7 +816,8 @@ public class RealtimePlumber implements Plumber FileUtils.deleteDirectory(target); } catch (Exception e) { - log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource()) + log.makeAlert(e, "Unable to remove file for dataSource[%s]", schema.getDataSource()) + .addData("file", target) .addData("interval", sink.getInterval()) .emit(); }