fix how flushing plumber flushes

This commit is contained in:
fjy 2013-12-06 10:25:50 -08:00
parent 932d688022
commit 767663af92
2 changed files with 101 additions and 9 deletions

View File

@ -1,9 +1,11 @@
package io.druid.segment.realtime.plumber; package io.druid.segment.realtime.plumber;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexGranularity; import io.druid.segment.IndexGranularity;
import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireDepartmentMetrics;
@ -14,6 +16,7 @@ import org.joda.time.Duration;
import org.joda.time.Period; import org.joda.time.Period;
import java.io.File; import java.io.File;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -28,13 +31,8 @@ public class FlushingPlumber extends RealtimePlumber
private final Duration flushDuration; private final Duration flushDuration;
private final ScheduledExecutorService flushScheduledExec = Executors.newScheduledThreadPool( private volatile ScheduledExecutorService flushScheduledExec = null;
1, private volatile boolean stopped = false;
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("flushing_scheduled_%d")
.build()
);
public FlushingPlumber( public FlushingPlumber(
Duration flushDuration, Duration flushDuration,
@ -78,7 +76,19 @@ public class FlushingPlumber extends RealtimePlumber
computeBaseDir(getSchema()).mkdirs(); computeBaseDir(getSchema()).mkdirs();
initializeExecutors(); initializeExecutors();
if (flushScheduledExec == null) {
flushScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("flushing_scheduled_%d")
.build()
);
}
bootstrapSinksFromDisk(); bootstrapSinksFromDisk();
startFlushThread();
} }
protected void flushAfterDuration(final long truncatedTime, final Sink sink) protected void flushAfterDuration(final long truncatedTime, final Sink sink)
@ -105,6 +115,67 @@ public class FlushingPlumber extends RealtimePlumber
); );
} }
private void startFlushThread()
{
final long truncatedNow = getSegmentGranularity().truncate(new DateTime()).getMillis();
final long windowMillis = getWindowPeriod().toStandardDuration().getMillis();
log.info(
"Expect to run at [%s]",
new DateTime().plus(
new Duration(System.currentTimeMillis(), getSegmentGranularity().increment(truncatedNow) + windowMillis)
)
);
ScheduledExecutors
.scheduleAtFixedRate(
flushScheduledExec,
new Duration(System.currentTimeMillis(), getSegmentGranularity().increment(truncatedNow) + windowMillis),
new Duration(truncatedNow, getSegmentGranularity().increment(truncatedNow)),
new ThreadRenamingCallable<ScheduledExecutors.Signal>(
String.format(
"%s-flusher-%d",
getSchema().getDataSource(),
getSchema().getShardSpec().getPartitionNum()
)
)
{
@Override
public ScheduledExecutors.Signal doCall()
{
if (stopped) {
log.info("Stopping flusher thread");
return ScheduledExecutors.Signal.STOP;
}
long minTimestamp = getSegmentGranularity().truncate(
getRejectionPolicy().getCurrMaxTime().minus(windowMillis)
).getMillis();
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : getSinks().entrySet()) {
final Long intervalStart = entry.getKey();
if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
}
}
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
flushAfterDuration(entry.getKey(), entry.getValue());
}
if (stopped) {
log.info("Stopping merge-n-push overseer thread");
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}
@Override @Override
public void finishJob() public void finishJob()
{ {
@ -114,5 +185,11 @@ public class FlushingPlumber extends RealtimePlumber
flushAfterDuration(entry.getKey(), entry.getValue()); flushAfterDuration(entry.getKey(), entry.getValue());
} }
shutdownExecutors(); shutdownExecutors();
if (flushScheduledExec != null) {
flushScheduledExec.shutdown();
}
stopped = true;
} }
} }

View File

@ -134,9 +134,24 @@ public class RealtimePlumber implements Plumber
return schema; return schema;
} }
public ScheduledExecutorService getScheduledExecutor() public Period getWindowPeriod()
{ {
return scheduledExecutor; return windowPeriod;
}
public IndexGranularity getSegmentGranularity()
{
return segmentGranularity;
}
public VersioningPolicy getVersioningPolicy()
{
return versioningPolicy;
}
public RejectionPolicy getRejectionPolicy()
{
return rejectionPolicy;
} }
public Map<Long, Sink> getSinks() public Map<Long, Sink> getSinks()