Merge pull request #476 from metamx/non-blocking-persist-n-merge

Non blocking persist n merge
This commit is contained in:
fjy 2014-04-15 10:58:47 -06:00
commit 6f1f21cfd6

View File

@ -68,7 +68,6 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumber implements Plumber
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
@ -84,16 +83,15 @@ public class RealtimePlumber implements Plumber
private final SegmentPublisher segmentPublisher;
private final ServerView serverView;
private final int maxPendingPersists;
private final Object handoffCondition = new Object();
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
private volatile boolean shuttingDown = false;
private volatile boolean stopped = false;
private volatile ExecutorService persistExecutor = null;
private volatile ExecutorService mergeExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null;
public RealtimePlumber(
@ -300,13 +298,13 @@ public class RealtimePlumber implements Plumber
);
}
// Submits persist-n-merge task for a Sink to the persistExecutor
// Submits persist-n-merge task for a Sink to the mergeExecutor
private void persistAndMerge(final long truncatedTime, final Sink sink)
{
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
);
persistExecutor.execute(
mergeExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
@Override
@ -431,6 +429,13 @@ public class RealtimePlumber implements Plumber
"plumber_persist_%d", maxPendingPersists
);
}
if (mergeExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
mergeExecutor = Execs.newBlockingSingleThreaded(
"plumber_merge_%d", 1
);
}
if (scheduledExecutor == null) {
scheduledExecutor = Executors.newScheduledThreadPool(
1,
@ -592,7 +597,11 @@ public class RealtimePlumber implements Plumber
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
} else {
log.warn("[%s] < [%s] Skipping persist and merge.", new DateTime(intervalStart), minTimestampAsDate);
log.warn(
"[%s] < [%s] Skipping persist and merge.",
new DateTime(intervalStart),
minTimestampAsDate
);
}
}
@ -660,39 +669,46 @@ public class RealtimePlumber implements Plumber
*/
protected int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
{
if (indexToPersist.hasSwapped()) {
synchronized (indexToPersist) {
if (indexToPersist.hasSwapped()) {
log.info(
"DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
schema.getDataSource(), interval, indexToPersist
);
return 0;
}
log.info(
"DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
schema.getDataSource(), interval, indexToPersist
"DataSource[%s], Interval[%s], persisting Hydrant[%s]",
schema.getDataSource(),
interval,
indexToPersist
);
return 0;
}
try {
int numRows = indexToPersist.getIndex().size();
log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
try {
int numRows = indexToPersist.getIndex().size();
File persistedFile = IndexMerger.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
File persistedFile = IndexMerger.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
IndexIO.loadIndex(persistedFile)
)
);
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
IndexIO.loadIndex(persistedFile)
)
);
return numRows;
}
catch (IOException e) {
log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
.addData("interval", interval)
.addData("count", indexToPersist.getCount())
.emit();
return numRows;
}
catch (IOException e) {
log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
.addData("interval", interval)
.addData("count", indexToPersist.getCount())
.emit();
throw Throwables.propagate(e);
throw Throwables.propagate(e);
}
}
}