diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java index 68aa23577ff..9f6e4c41acc 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java @@ -1,9 +1,11 @@ package io.druid.segment.realtime.plumber; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.common.guava.ThreadRenamingCallable; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexGranularity; import io.druid.segment.realtime.FireDepartmentMetrics; @@ -14,6 +16,7 @@ import org.joda.time.Duration; import org.joda.time.Period; import java.io.File; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -28,13 +31,8 @@ public class FlushingPlumber extends RealtimePlumber private final Duration flushDuration; - private final ScheduledExecutorService flushScheduledExec = Executors.newScheduledThreadPool( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("flushing_scheduled_%d") - .build() - ); + private volatile ScheduledExecutorService flushScheduledExec = null; + private volatile boolean stopped = false; public FlushingPlumber( Duration flushDuration, @@ -78,7 +76,19 @@ public class FlushingPlumber extends RealtimePlumber computeBaseDir(getSchema()).mkdirs(); initializeExecutors(); + + if (flushScheduledExec == null) { + flushScheduledExec = Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("flushing_scheduled_%d") + .build() + ); + } + bootstrapSinksFromDisk(); + startFlushThread(); } protected void flushAfterDuration(final long truncatedTime, final Sink sink) @@ -105,6 +115,67 @@ public class FlushingPlumber extends RealtimePlumber ); } + private void startFlushThread() + { + final long truncatedNow = getSegmentGranularity().truncate(new DateTime()).getMillis(); + final long windowMillis = getWindowPeriod().toStandardDuration().getMillis(); + + log.info( + "Expect to run at [%s]", + new DateTime().plus( + new Duration(System.currentTimeMillis(), getSegmentGranularity().increment(truncatedNow) + windowMillis) + ) + ); + + ScheduledExecutors + .scheduleAtFixedRate( + flushScheduledExec, + new Duration(System.currentTimeMillis(), getSegmentGranularity().increment(truncatedNow) + windowMillis), + new Duration(truncatedNow, getSegmentGranularity().increment(truncatedNow)), + new ThreadRenamingCallable( + String.format( + "%s-flusher-%d", + getSchema().getDataSource(), + getSchema().getShardSpec().getPartitionNum() + ) + ) + { + @Override + public ScheduledExecutors.Signal doCall() + { + if (stopped) { + log.info("Stopping flusher thread"); + return ScheduledExecutors.Signal.STOP; + } + + long minTimestamp = getSegmentGranularity().truncate( + getRejectionPolicy().getCurrMaxTime().minus(windowMillis) + ).getMillis(); + + List> sinksToPush = Lists.newArrayList(); + for (Map.Entry entry : getSinks().entrySet()) { + final Long intervalStart = entry.getKey(); + if (intervalStart < minTimestamp) { + log.info("Adding entry[%s] for merge and push.", entry); + sinksToPush.add(entry); + } + } + + for (final Map.Entry entry : sinksToPush) { + flushAfterDuration(entry.getKey(), entry.getValue()); + } + + if (stopped) { + log.info("Stopping merge-n-push overseer thread"); + return ScheduledExecutors.Signal.STOP; + } else { + return ScheduledExecutors.Signal.REPEAT; + } + } + } + ); + } + @Override public void finishJob() { @@ -114,5 +185,11 @@ public class FlushingPlumber extends RealtimePlumber flushAfterDuration(entry.getKey(), entry.getValue()); } shutdownExecutors(); + + if (flushScheduledExec != null) { + flushScheduledExec.shutdown(); + } + + stopped = true; } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 104316c0c01..e7a09df689a 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -134,9 +134,24 @@ public class RealtimePlumber implements Plumber return schema; } - public ScheduledExecutorService getScheduledExecutor() + public Period getWindowPeriod() { - return scheduledExecutor; + return windowPeriod; + } + + public IndexGranularity getSegmentGranularity() + { + return segmentGranularity; + } + + public VersioningPolicy getVersioningPolicy() + { + return versioningPolicy; + } + + public RejectionPolicy getRejectionPolicy() + { + return rejectionPolicy; } public Map getSinks()