diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 80bd59235c5..e9b0ca388a2 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -68,7 +68,6 @@ import java.util.concurrent.ScheduledExecutorService;
 public class RealtimePlumber implements Plumber
 {
   private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
-
   private final Period windowPeriod;
   private final File basePersistDirectory;
   private final IndexGranularity segmentGranularity;
@@ -84,16 +83,15 @@
   private final SegmentPublisher segmentPublisher;
   private final ServerView serverView;
   private final int maxPendingPersists;
-
   private final Object handoffCondition = new Object();
   private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
   private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
       String.CASE_INSENSITIVE_ORDER
   );
-
   private volatile boolean shuttingDown = false;
   private volatile boolean stopped = false;
   private volatile ExecutorService persistExecutor = null;
+  private volatile ExecutorService mergeExecutor = null;
   private volatile ScheduledExecutorService scheduledExecutor = null;
 
   public RealtimePlumber(
@@ -306,7 +304,7 @@
     final String threadName = String.format(
         "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
     );
-    persistExecutor.execute(
+    mergeExecutor.execute(
         new ThreadRenamingRunnable(threadName)
         {
           @Override
@@ -315,10 +313,10 @@
             final Interval interval = sink.getInterval();
 
             for (FireHydrant hydrant : sink) {
-                if (!hydrant.hasSwapped()) {
-                  log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
-                  final int rowCount = persistHydrant(hydrant, schema, interval);
-                  metrics.incrementRowOutputCount(rowCount);
+              if (!hydrant.hasSwapped()) {
+                log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
+                final int rowCount = persistHydrant(hydrant, schema, interval);
+                metrics.incrementRowOutputCount(rowCount);
               }
             }
 
@@ -431,6 +429,13 @@
           "plumber_persist_%d", maxPendingPersists
       );
     }
+    if (mergeExecutor == null) {
+      // use a separate blocking single threaded executor so a slow merge does not block persists
+      mergeExecutor = Execs.newBlockingSingleThreaded(
+          "plumber_merge_%d", maxPendingPersists
+      );
+    }
+
     if (scheduledExecutor == null) {
       scheduledExecutor = Executors.newScheduledThreadPool(
           1,
@@ -592,7 +597,11 @@
         log.info("Adding entry[%s] for merge and push.", entry);
         sinksToPush.add(entry);
       } else {
-        log.warn("[%s] < [%s] Skipping persist and merge.", new DateTime(intervalStart), minTimestampAsDate);
+        log.warn(
+            "[%s] < [%s] Skipping persist and merge.",
+            new DateTime(intervalStart),
+            minTimestampAsDate
+        );
       }
     }
 
@@ -660,39 +669,46 @@
    */
   protected int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
   {
-    if (indexToPersist.hasSwapped()) {
+    synchronized (indexToPersist) {
+      if (indexToPersist.hasSwapped()) {
+        log.info(
+            "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
+            schema.getDataSource(), interval, indexToPersist
+        );
+        return 0;
+      }
+
       log.info(
-          "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
-          schema.getDataSource(), interval, indexToPersist
+          "DataSource[%s], Interval[%s], persisting Hydrant[%s]",
+          schema.getDataSource(),
+          interval,
+          indexToPersist
       );
-      return 0;
-    }
+      try {
+        int numRows = indexToPersist.getIndex().size();
 
-    log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
-    try {
-      int numRows = indexToPersist.getIndex().size();
+        File persistedFile = IndexMerger.persist(
+            indexToPersist.getIndex(),
+            new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
+        );
 
-      File persistedFile = IndexMerger.persist(
-          indexToPersist.getIndex(),
-          new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
-      );
+        indexToPersist.swapSegment(
+            new QueryableIndexSegment(
+                indexToPersist.getSegment().getIdentifier(),
+                IndexIO.loadIndex(persistedFile)
+            )
+        );
 
-      indexToPersist.swapSegment(
-          new QueryableIndexSegment(
-              indexToPersist.getSegment().getIdentifier(),
-              IndexIO.loadIndex(persistedFile)
-          )
-      );
+        return numRows;
+      }
+      catch (IOException e) {
+        log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
+           .addData("interval", interval)
+           .addData("count", indexToPersist.getCount())
+           .emit();
 
-      return numRows;
-    }
-    catch (IOException e) {
-      log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
-         .addData("interval", interval)
-         .addData("count", indexToPersist.getCount())
-         .emit();
-
-      throw Throwables.propagate(e);
+        throw Throwables.propagate(e);
+      }
     }
   }
 
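
Note on the executor semantics this patch relies on: per the in-diff comment, Execs.newBlockingSingleThreaded returns a single-threaded executor over a bounded queue whose submission path blocks when the queue is full, so execute() applies back-pressure to the submitting thread (here, the thread driving the firehose) instead of throwing RejectedExecutionException. A minimal sketch of those assumed semantics is below; it is illustrative only, not Druid's actual implementation, and the class name BlockingExecs is hypothetical. It ignores shutdown races (a put() racing with shutdown could enqueue a task that never runs).

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class BlockingExecs
    {
      // Single worker thread over a bounded queue. When the worker is busy and
      // the queue is full, the rejection handler blocks the submitting thread
      // with put(), so callers of execute() are throttled rather than rejected.
      public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
      {
        final AtomicLong count = new AtomicLong(0);
        final ThreadFactory factory = new ThreadFactory()
        {
          @Override
          public Thread newThread(Runnable r)
          {
            Thread t = new Thread(r, String.format(nameFormat, count.getAndIncrement()));
            t.setDaemon(true);
            return t;
          }
        };
        return new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(capacity),
            factory,
            new RejectedExecutionHandler()
            {
              @Override
              public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
              {
                try {
                  // Block the caller (e.g. the firehose thread) until the queue has room.
                  executor.getQueue().put(r);
                }
                catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  throw new RejectedExecutionException("Interrupted while queuing task", e);
                }
              }
            }
        );
      }
    }

Giving merges their own single-threaded blocking executor means a long-running persist-n-merge job can no longer occupy persistExecutor and stall incremental persists, while the synchronized re-check of hasSwapped() in persistHydrant keeps the persist and merge threads from persisting the same hydrant twice.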