From 20acd6f450c955ccd56462f5243e0b4512002f52 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Thu, 25 Jul 2013 14:38:07 -0700
Subject: [PATCH] RealtimePlumberSchool: Persist and merge immediately upon
 finishJob

---
 .../plumber/RealtimePlumberSchool.java        | 150 ++++++++++--------
 1 file changed, 80 insertions(+), 70 deletions(-)

diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
index 894e8f1df28..654935b58f0 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
@@ -105,8 +105,6 @@ public class RealtimePlumberSchool implements PlumberSchool
   private volatile SegmentPublisher segmentPublisher = null;
   private volatile ServerView serverView = null;
 
-  private volatile boolean noMoreData = false;
-
   @JsonCreator
   public RealtimePlumberSchool(
       @JsonProperty("windowPeriod") Period windowPeriod,
@@ -321,12 +319,89 @@ public class RealtimePlumberSchool implements PlumberSchool
         );
       }
 
+      // Submits persist-n-merge task for a Sink to the persistExecutor
+      private void persistAndMerge(final long truncatedTime, final Sink sink)
+      {
+        final String threadName = String.format(
+            "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
+        );
+        persistExecutor.execute(
+            new ThreadRenamingRunnable(threadName)
+            {
+              @Override
+              public void doRun()
+              {
+                final Interval interval = sink.getInterval();
+
+                for (FireHydrant hydrant : sink) {
+                  if (!hydrant.hasSwapped()) {
+                    log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
+                    final int rowCount = persistHydrant(hydrant, schema, interval);
+                    metrics.incrementRowOutputCount(rowCount);
+                  }
+                }
+
+                final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
+                if (mergedTarget.exists()) {
+                  log.info("Skipping already-merged sink: %s", sink);
+                  return;
+                }
+
+                File mergedFile = null;
+                try {
+                  List<QueryableIndex> indexes = Lists.newArrayList();
+                  for (FireHydrant fireHydrant : sink) {
+                    Segment segment = fireHydrant.getSegment();
+                    final QueryableIndex queryableIndex = segment.asQueryableIndex();
+                    log.info("Adding hydrant[%s]", fireHydrant);
+                    indexes.add(queryableIndex);
+                  }
+
+                  mergedFile = IndexMerger.mergeQueryableIndex(
+                      indexes,
+                      schema.getAggregators(),
+                      mergedTarget
+                  );
+
+                  QueryableIndex index = IndexIO.loadIndex(mergedFile);
+
+                  DataSegment segment = dataSegmentPusher.push(
+                      mergedFile,
+                      sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+                  );
+
+                  segmentPublisher.publishSegment(segment);
+                }
+                catch (IOException e) {
+                  log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
+                     .addData("interval", interval)
+                     .emit();
+                }
+
+                if (mergedFile != null) {
+                  try {
+                    if (mergedFile != null) {
+                      log.info("Deleting Index File[%s]", mergedFile);
+                      FileUtils.deleteDirectory(mergedFile);
+                    }
+                  }
+                  catch (IOException e) {
+                    log.warn(e, "Error deleting directory[%s]", mergedFile);
+                  }
+                }
+              }
+            }
+        );
+      }
+
       @Override
       public void finishJob()
       {
         log.info("Shutting down...");
 
-        noMoreData = true;
+        for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
+          persistAndMerge(entry.getKey(), entry.getValue());
+        }
 
         while (!sinks.isEmpty()) {
           try {
@@ -557,79 +632,14 @@ public class RealtimePlumberSchool implements PlumberSchool
             List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
             for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
              final Long intervalStart = entry.getKey();
-              if (noMoreData || intervalStart < minTimestamp) {
+              if (intervalStart < minTimestamp) {
                log.info("Adding entry[%s] for merge and push.", entry);
                sinksToPush.add(entry);
              }
            }
 
            for (final Map.Entry<Long, Sink> entry : sinksToPush) {
-              final Sink sink = entry.getValue();
-
-              final String threadName = String.format(
-                  "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(entry.getKey())
-              );
-              persistExecutor.execute(
-                  new ThreadRenamingRunnable(threadName)
-                  {
-                    @Override
-                    public void doRun()
-                    {
-                      final Interval interval = sink.getInterval();
-
-                      for (FireHydrant hydrant : sink) {
-                        if (!hydrant.hasSwapped()) {
-                          log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
-                          final int rowCount = persistHydrant(hydrant, schema, interval);
-                          metrics.incrementRowOutputCount(rowCount);
-                        }
-                      }
-
-                      File mergedFile = null;
-                      try {
-                        List<QueryableIndex> indexes = Lists.newArrayList();
-                        for (FireHydrant fireHydrant : sink) {
-                          Segment segment = fireHydrant.getSegment();
-                          final QueryableIndex queryableIndex = segment.asQueryableIndex();
-                          log.info("Adding hydrant[%s]", fireHydrant);
-                          indexes.add(queryableIndex);
-                        }
-
-                        mergedFile = IndexMerger.mergeQueryableIndex(
-                            indexes,
-                            schema.getAggregators(),
-                            new File(computePersistDir(schema, interval), "merged")
-                        );
-
-                        QueryableIndex index = IndexIO.loadIndex(mergedFile);
-
-                        DataSegment segment = dataSegmentPusher.push(
-                            mergedFile,
-                            sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
-                        );
-
-                        segmentPublisher.publishSegment(segment);
-                      }
-                      catch (IOException e) {
-                        log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
-                           .addData("interval", interval)
-                           .emit();
-                      }
-
-                      if (mergedFile != null) {
-                        try {
-                          if (mergedFile != null) {
-                            log.info("Deleting Index File[%s]", mergedFile);
-                            FileUtils.deleteDirectory(mergedFile);
-                          }
-                        }
-                        catch (IOException e) {
-                          log.warn(e, "Error deleting directory[%s]", mergedFile);
-                        }
-                      }
-                    }
-                  }
-              );
+              persistAndMerge(entry.getKey(), entry.getValue());
            }
 
            if (stopped) {
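
Not part of the patch: a minimal, self-contained sketch of the refactoring pattern applied here. The inline persist-n-merge runnable is pulled into one helper that both the scheduled merge-and-push pass and finishJob() submit to the same executor, so shutdown persists and merges every remaining sink immediately instead of setting a noMoreData flag and waiting for the next scheduled pass. The names below (MiniPlumber, the String "sinks") are simplified stand-ins, not the actual Druid types.

    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Hypothetical, simplified stand-in for the plumber in this patch.
    public class MiniPlumber
    {
      // truncatedTime -> sink payload (a real Sink holds hydrants; a String stands in here)
      private final Map<Long, String> sinks = new ConcurrentSkipListMap<>();
      private final ExecutorService persistExecutor = Executors.newSingleThreadExecutor();

      public void addSink(long truncatedTime, String sink)
      {
        sinks.put(truncatedTime, sink);
      }

      // Shared helper, analogous to persistAndMerge(truncatedTime, sink) in the patch:
      // submits one persist-n-merge task per sink to the persist executor.
      private void persistAndMerge(final long truncatedTime, final String sink)
      {
        persistExecutor.execute(
            new Runnable()
            {
              @Override
              public void run()
              {
                System.out.printf("persist-n-merge for sink[%s] at %d%n", sink, truncatedTime);
                sinks.remove(truncatedTime);
              }
            }
        );
      }

      // Periodic pass: only sinks older than the window boundary are pushed.
      public void mergeAndPushOld(long minTimestamp)
      {
        for (Map.Entry<Long, String> entry : sinks.entrySet()) {
          if (entry.getKey() < minTimestamp) {
            persistAndMerge(entry.getKey(), entry.getValue());
          }
        }
      }

      // Shutdown: submit persist-n-merge for every remaining sink right away,
      // then drain the executor.
      public void finishJob() throws InterruptedException
      {
        for (Map.Entry<Long, String> entry : sinks.entrySet()) {
          persistAndMerge(entry.getKey(), entry.getValue());
        }
        persistExecutor.shutdown();
        persistExecutor.awaitTermination(1, TimeUnit.MINUTES);
      }

      public static void main(String[] args) throws InterruptedException
      {
        MiniPlumber plumber = new MiniPlumber();
        plumber.addSink(1000L, "sink-1000");
        plumber.addSink(2000L, "sink-2000");
        plumber.mergeAndPushOld(1500L); // pushes only sink-1000
        plumber.finishJob();            // pushes whatever is left, then waits for the executor
      }
    }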