diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index b9d49396682..a4a00a4fdb8 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -48,8 +48,8 @@ import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.common.utils.VMUtils; import io.druid.concurrent.Execs; -import io.druid.data.input.Committer; import io.druid.concurrent.TaskThreadPriority; +import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.NoopQueryRunner; @@ -59,7 +59,6 @@ import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerHelper; import io.druid.query.QueryToolChest; -import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; @@ -69,7 +68,6 @@ import io.druid.segment.IndexSpec; import io.druid.segment.Metadata; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; -import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; @@ -90,7 +88,6 @@ import org.joda.time.Interval; import org.joda.time.Period; import javax.annotation.Nullable; -import javax.ws.rs.HEAD; import java.io.Closeable; import java.io.File; import java.io.FilenameFilter; @@ -481,50 +478,52 @@ public class RealtimePlumber implements Plumber mergeExecutor.execute( new ThreadRenamingRunnable(threadName) { + final Interval interval = sink.getInterval(); + Stopwatch mergeStopwatch = null; + @Override public void doRun() { - final Interval interval = sink.getInterval(); - - // Bail out if this sink has been abandoned by a previously-executed task. - if (sinks.get(truncatedTime) != sink) { - log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink); - return; - } - - // Use a file to indicate that pushing has completed. - final File persistDir = computePersistDir(schema, interval); - final File mergedTarget = new File(persistDir, "merged"); - final File isPushedMarker = new File(persistDir, "isPushedMarker"); - - if (!isPushedMarker.exists()) { - removeSegment(sink, mergedTarget); - if (mergedTarget.exists()) { - log.wtf("Merged target[%s] exists?!", mergedTarget); + try { + // Bail out if this sink has been abandoned by a previously-executed task. + if (sinks.get(truncatedTime) != sink) { + log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink); + return; + } + + // Use a file to indicate that pushing has completed. + final File persistDir = computePersistDir(schema, interval); + final File mergedTarget = new File(persistDir, "merged"); + final File isPushedMarker = new File(persistDir, "isPushedMarker"); + + if (!isPushedMarker.exists()) { + removeSegment(sink, mergedTarget); + if (mergedTarget.exists()) { + log.wtf("Merged target[%s] exists?!", mergedTarget); + return; + } + } else { + log.info("Already pushed sink[%s]", sink); return; } - } else { - log.info("Already pushed sink[%s]", sink); - return; - } /* Note: it the plumber crashes after persisting a subset of hydrants then might duplicate data as these hydrants will be read but older commitMetadata will be used. fixing this possibly needs structural changes to plumber. */ - for (FireHydrant hydrant : sink) { - synchronized (hydrant) { - if (!hydrant.hasSwapped()) { - log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); - final int rowCount = persistHydrant(hydrant, schema, interval, null); - metrics.incrementRowOutputCount(rowCount); + for (FireHydrant hydrant : sink) { + synchronized (hydrant) { + if (!hydrant.hasSwapped()) { + log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); + final int rowCount = persistHydrant(hydrant, schema, interval, null); + metrics.incrementRowOutputCount(rowCount); + } } } - } - final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime(); - final Stopwatch mergeStopwatch = Stopwatch.createStarted(); - try { + final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime(); + mergeStopwatch = Stopwatch.createStarted(); + List indexes = Lists.newArrayList(); for (FireHydrant fireHydrant : sink) { Segment segment = fireHydrant.getSegment(); @@ -575,7 +574,9 @@ public class RealtimePlumber implements Plumber } } finally { - mergeStopwatch.stop(); + if (mergeStopwatch != null) { + mergeStopwatch.stop(); + } } } }