From f547049929ff97decdad6fbb5730aac8f41463a2 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 8 Aug 2014 10:59:16 -0700 Subject: [PATCH 1/2] Change merged-segment-removing behavior in RealtimePlumber. Old: Merged segments are deleted upon successful push, or on IOException. New: Merged segments are deleted on any Exception, but NOT successful push. Deleting a merged segment means that the next merge-and-push run will try to push it again. So we want that to happen if there was any sort of Exception. We *don't* want it to happen if the merge-and-push was successful, since in that case, we are just waiting for historical nodes to load the segment. It might take a while, but there's no reason to re-push while waiting. --- .../realtime/plumber/RealtimePlumber.java | 45 ++++++++++++------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index fb28f6c8ae1..0da1a9a376d 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -328,7 +328,6 @@ public class RealtimePlumber implements Plumber return; } - File mergedFile = null; try { List indexes = Lists.newArrayList(); for (FireHydrant fireHydrant : sink) { @@ -338,7 +337,7 @@ public class RealtimePlumber implements Plumber indexes.add(queryableIndex); } - mergedFile = IndexMerger.mergeQueryableIndex( + final File mergedFile = IndexMerger.mergeQueryableIndex( indexes, schema.getAggregators(), mergedTarget @@ -353,23 +352,17 @@ public class RealtimePlumber implements Plumber segmentPublisher.publishSegment(segment); } - catch (IOException e) { + catch (Exception e) { log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) .addData("interval", interval) .emit(); - if (shuttingDown) { + if (!shuttingDown) { // We're trying to shut down, and this segment failed to push. Let's just get rid of it. + // This call will also delete possibly-partially-written files, so we don't need to do it explicitly. abandonSegment(truncatedTime, sink); - } - } - - if (mergedFile != null) { - try { - log.info("Deleting Index File[%s]", mergedFile); - FileUtils.deleteDirectory(mergedFile); - } - catch (IOException e) { - log.warn(e, "Error deleting directory[%s]", mergedFile); + } else { + // Delete any possibly-partially-written files, so we can try again on the next push cycle. + removeMergedSegment(sink); } } } @@ -648,13 +641,15 @@ public class RealtimePlumber implements Plumber } /** - * Unannounces a given sink and removes all local references to it. + * Unannounces a given sink and removes all local references to it. It is important that this is only called + * from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while + * being created. */ protected void abandonSegment(final long truncatedTime, final Sink sink) { try { segmentAnnouncer.unannounceSegment(sink.getSegment()); - FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); + removeMergedSegment(sink); log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); sinks.remove(truncatedTime); sinkTimeline.remove( @@ -666,7 +661,7 @@ public class RealtimePlumber implements Plumber handoffCondition.notifyAll(); } } - catch (IOException e) { + catch (Exception e) { log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) .addData("interval", sink.getInterval()) .emit(); @@ -802,4 +797,20 @@ public class RealtimePlumber implements Plumber } ); } + + private void removeMergedSegment(final Sink sink) + { + final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged"); + if (mergedTarget.exists()) { + try { + log.info("Deleting Index File[%s]", mergedTarget); + FileUtils.deleteDirectory(mergedTarget); + } + catch (Exception e) { + log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); + } + } + } } From 179170224d6f8b547a7b6ba2c2fe3d1b17bde65b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 8 Aug 2014 11:20:36 -0700 Subject: [PATCH 2/2] Fix conditional. --- .../java/io/druid/segment/realtime/plumber/RealtimePlumber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 0da1a9a376d..bf1b575ff41 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -356,7 +356,7 @@ public class RealtimePlumber implements Plumber log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) .addData("interval", interval) .emit(); - if (!shuttingDown) { + if (shuttingDown) { // We're trying to shut down, and this segment failed to push. Let's just get rid of it. // This call will also delete possibly-partially-written files, so we don't need to do it explicitly. abandonSegment(truncatedTime, sink);