persist-n-merge in separate pool

persist-n-merge in separate pool, do not block intermediate persists
Author: nishantmonu51 — 2014-04-11 12:18:11 +05:30
Parent: bfeed26765
Commit: db60353cac

@@ -68,7 +68,6 @@ import java.util.concurrent.ScheduledExecutorService;
 public class RealtimePlumber implements Plumber
 {
   private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
   private final Period windowPeriod;
   private final File basePersistDirectory;
   private final IndexGranularity segmentGranularity;
@@ -84,16 +83,15 @@ public class RealtimePlumber implements Plumber
   private final SegmentPublisher segmentPublisher;
   private final ServerView serverView;
   private final int maxPendingPersists;
   private final Object handoffCondition = new Object();
   private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
   private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
       String.CASE_INSENSITIVE_ORDER
   );
   private volatile boolean shuttingDown = false;
   private volatile boolean stopped = false;
   private volatile ExecutorService persistExecutor = null;
+  private volatile ExecutorService mergeExecutor = null;
   private volatile ScheduledExecutorService scheduledExecutor = null;
public RealtimePlumber( public RealtimePlumber(
@@ -306,7 +304,7 @@ public class RealtimePlumber implements Plumber
           final String threadName = String.format(
               "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
           );
-          persistExecutor.execute(
+          mergeExecutor.execute(
               new ThreadRenamingRunnable(threadName)
               {
                 @Override
@@ -315,10 +313,10 @@ public class RealtimePlumber implements Plumber
                   final Interval interval = sink.getInterval();
                   for (FireHydrant hydrant : sink) {
                     if (!hydrant.hasSwapped()) {
                       log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
                       final int rowCount = persistHydrant(hydrant, schema, interval);
                       metrics.incrementRowOutputCount(rowCount);
                     }
                   }
@@ -431,6 +429,13 @@ public class RealtimePlumber implements Plumber
             "plumber_persist_%d", maxPendingPersists
         );
       }
+      if (persistExecutor == null) {
+        // use a blocking single threaded executor to throttle the firehose when write to disk is slow
+        mergeExecutor = Execs.newBlockingSingleThreaded(
+            "plumber_persist_%d", maxPendingPersists
+        );
+      }
       if (scheduledExecutor == null) {
         scheduledExecutor = Executors.newScheduledThreadPool(
             1,
@@ -592,7 +597,11 @@ public class RealtimePlumber implements Plumber
             log.info("Adding entry[%s] for merge and push.", entry);
             sinksToPush.add(entry);
           } else {
-            log.warn("[%s] < [%s] Skipping persist and merge.", new DateTime(intervalStart), minTimestampAsDate);
+            log.warn(
+                "[%s] < [%s] Skipping persist and merge.",
+                new DateTime(intervalStart),
+                minTimestampAsDate
+            );
           }
         }
@@ -660,39 +669,46 @@ public class RealtimePlumber implements Plumber
    */
   protected int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
   {
-    if (indexToPersist.hasSwapped()) {
+    synchronized (indexToPersist) {
+      if (indexToPersist.hasSwapped()) {
+        log.info(
+            "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
+            schema.getDataSource(), interval, indexToPersist
+        );
+        return 0;
+      }
       log.info(
-          "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
-          schema.getDataSource(), interval, indexToPersist
+          "DataSource[%s], Interval[%s], persisting Hydrant[%s]",
+          schema.getDataSource(),
+          interval,
+          indexToPersist
       );
-      return 0;
-    }
-    log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
-    try {
-      int numRows = indexToPersist.getIndex().size();
-      File persistedFile = IndexMerger.persist(
-          indexToPersist.getIndex(),
-          new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
-      );
-      indexToPersist.swapSegment(
-          new QueryableIndexSegment(
-              indexToPersist.getSegment().getIdentifier(),
-              IndexIO.loadIndex(persistedFile)
-          )
-      );
-      return numRows;
-    }
-    catch (IOException e) {
-      log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
-         .addData("interval", interval)
-         .addData("count", indexToPersist.getCount())
-         .emit();
-      throw Throwables.propagate(e);
+      try {
+        int numRows = indexToPersist.getIndex().size();
+        File persistedFile = IndexMerger.persist(
+            indexToPersist.getIndex(),
+            new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
+        );
+        indexToPersist.swapSegment(
+            new QueryableIndexSegment(
+                indexToPersist.getSegment().getIdentifier(),
+                IndexIO.loadIndex(persistedFile)
+            )
+        );
+        return numRows;
+      }
+      catch (IOException e) {
+        log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
+           .addData("interval", interval)
+           .addData("count", indexToPersist.getCount())
+           .emit();
+        throw Throwables.propagate(e);
+      }
     }
   }