From d8d3a6958e3e1fd8c591313fe783a680ba8b06fb Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 2 Nov 2013 00:21:45 -0700 Subject: [PATCH 1/3] Realtime: Abandon segments that fail to push --- .../plumber/RealtimePlumberSchool.java | 60 +++++++++++-------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 74893edbf02..35abfcaf866 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -98,6 +98,8 @@ public class RealtimePlumberSchool implements PlumberSchool private final IndexGranularity segmentGranularity; private final Object handoffCondition = new Object(); + private volatile boolean shuttingDown = false; + @JacksonInject @NotNull private volatile ServiceEmitter emitter; @@ -410,14 +412,16 @@ public class RealtimePlumberSchool implements PlumberSchool log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) .addData("interval", interval) .emit(); + if (shuttingDown) { + // We're trying to shut down, and this segment failed to push. Let's just get rid of it. + abandonSegment(truncatedTime, sink); + } } if (mergedFile != null) { try { - if (mergedFile != null) { - log.info("Deleting Index File[%s]", mergedFile); - FileUtils.deleteDirectory(mergedFile); - } + log.info("Deleting Index File[%s]", mergedFile); + FileUtils.deleteDirectory(mergedFile); } catch (IOException e) { log.warn(e, "Error deleting directory[%s]", mergedFile); @@ -433,6 +437,8 @@ public class RealtimePlumberSchool implements PlumberSchool { log.info("Shutting down..."); + shuttingDown = true; + for (final Map.Entry entry : sinks.entrySet()) { persistAndMerge(entry.getKey(), entry.getValue()); } @@ -613,26 +619,7 @@ public class RealtimePlumberSchool implements PlumberSchool final String sinkVersion = sink.getSegment().getVersion(); if (segmentVersion.compareTo(sinkVersion) >= 0) { log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion); - try { - segmentAnnouncer.unannounceSegment(sink.getSegment()); - FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); - log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier()); - sinks.remove(sinkKey); - sinkTimeline.remove( - sink.getInterval(), - sink.getVersion(), - new SingleElementPartitionChunk(sink) - ); - - synchronized (handoffCondition) { - handoffCondition.notifyAll(); - } - } - catch (IOException e) { - log.makeAlert(e, "Unable to delete old segment for dataSource[%s].", schema.getDataSource()) - .addData("interval", sink.getInterval()) - .emit(); - } + abandonSegment(sinkKey, sink); } } } @@ -706,6 +693,31 @@ public class RealtimePlumberSchool implements PlumberSchool } ); } + + /** + * Unannounces a given sink and removes all local references to it. + */ + private void abandonSegment(final long truncatedTime, final Sink sink) { + try { + segmentAnnouncer.unannounceSegment(sink.getSegment()); + FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); + log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); + sinks.remove(truncatedTime); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk<>(sink) + ); + synchronized (handoffCondition) { + handoffCondition.notifyAll(); + } + } + catch (IOException e) { + log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); + } + } }; } From f8e564469f0cc215ab3ab4a57ae0d3a29dee777a Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 2 Nov 2013 00:30:48 -0700 Subject: [PATCH 2/3] S3Utils: More aggressive retries, S3 can be pretty unreliable sometimes --- .../src/main/java/io/druid/storage/s3/S3Utils.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index 0dfa685d3bd..b488d3e25f1 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -58,7 +58,7 @@ public class S3Utils public static T retryS3Operation(Callable f) throws IOException, S3ServiceException, InterruptedException { int nTry = 0; - final int maxTries = 3; + final int maxTries = 10; while (true) { try { nTry++; @@ -89,13 +89,10 @@ public class S3Utils private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException { final long baseSleepMillis = 1000; - final double fuzziness = 0.2; - final long sleepMillis = Math.max( - baseSleepMillis, - (long) (baseSleepMillis * Math.pow(2, nTry) * - (1 + new Random().nextGaussian() * fuzziness)) - ); - log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis); + final long maxSleepMillis = 60000; + final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * new Random().nextGaussian(), 0), 2); + final long sleepMillis = (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier); + System.out.println(String.format("S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis)); Thread.sleep(sleepMillis); } From 50a8c806c31ab6489444a7f199d8fc6eb4e5463a Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 2 Nov 2013 15:24:05 -0700 Subject: [PATCH 3/3] S3Utils: Replace S.o.println with log.warn --- s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index b488d3e25f1..598153ec778 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -92,7 +92,7 @@ public class S3Utils final long maxSleepMillis = 60000; final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * new Random().nextGaussian(), 0), 2); final long sleepMillis = (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier); - System.out.println(String.format("S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis)); + log.warn("S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis); Thread.sleep(sleepMillis); }