diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java index 66af2a196ba..c7f13883719 100644 --- a/common/src/main/java/io/druid/concurrent/Execs.java +++ b/common/src/main/java/io/druid/concurrent/Execs.java @@ -59,7 +59,8 @@ public class Execs /** * @param nameFormat nameformat for threadFactory - * @param capacity maximum capacity after which the executorService will block on accepting new tasks + * @param capacity maximum capacity after which the executorService will block on accepting new tasks + * * @return ExecutorService which blocks accepting new tasks when the capacity reached */ public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity) diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java index 81b1bc9e316..e48cdc389b3 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java @@ -19,10 +19,10 @@ package io.druid.segment.realtime; -import java.util.concurrent.atomic.AtomicLong; - import com.google.common.base.Preconditions; +import java.util.concurrent.atomic.AtomicLong; + /** */ public class FireDepartmentMetrics @@ -31,6 +31,9 @@ public class FireDepartmentMetrics private final AtomicLong thrownAwayCount = new AtomicLong(0); private final AtomicLong unparseableCount = new AtomicLong(0); private final AtomicLong rowOutputCount = new AtomicLong(0); + private final AtomicLong numPersists = new AtomicLong(0); + private final AtomicLong persistTimeMillis = new AtomicLong(0); + private final AtomicLong persistBackPressureMillis = new AtomicLong(0); public void incrementProcessed() { @@ -52,6 +55,21 @@ public class FireDepartmentMetrics rowOutputCount.addAndGet(numRows); } + public void incrementNumPersists() + { + numPersists.incrementAndGet(); + } + + public void incrementPersistTimeMillis(long millis) + { + persistTimeMillis.addAndGet(millis); + } + + public void incrementPersistBackPressureMillis(long millis) + { + persistBackPressureMillis.addAndGet(millis); + } + public long processed() { return processedCount.get(); @@ -72,6 +90,21 @@ public class FireDepartmentMetrics return rowOutputCount.get(); } + public long numPersists() + { + return numPersists.get(); + } + + public long persistTimeMillis() + { + return persistTimeMillis.get(); + } + + public long persistBackPressureMillis() + { + return persistBackPressureMillis.get(); + } + public FireDepartmentMetrics snapshot() { final FireDepartmentMetrics retVal = new FireDepartmentMetrics(); @@ -79,6 +112,9 @@ public class FireDepartmentMetrics retVal.thrownAwayCount.set(thrownAwayCount.get()); retVal.unparseableCount.set(unparseableCount.get()); retVal.rowOutputCount.set(rowOutputCount.get()); + retVal.numPersists.set(numPersists.get()); + retVal.persistTimeMillis.set(persistTimeMillis.get()); + retVal.persistBackPressureMillis.set(persistBackPressureMillis.get()); return retVal; } @@ -95,6 +131,9 @@ public class FireDepartmentMetrics thrownAwayCount.addAndGet(otherSnapshot.thrownAway()); rowOutputCount.addAndGet(otherSnapshot.rowOutput()); unparseableCount.addAndGet(otherSnapshot.unparseable()); + numPersists.addAndGet(otherSnapshot.numPersists()); + persistTimeMillis.addAndGet(otherSnapshot.persistTimeMillis()); + persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis()); return this; } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java index 649f2b5b157..bf8ce9bd669 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Map; /** -*/ + */ public class RealtimeMetricsMonitor extends AbstractMonitor { private final Map previousValues; @@ -60,6 +60,14 @@ public class RealtimeMetricsMonitor extends AbstractMonitor emitter.emit(builder.build("events/unparseable", metrics.unparseable() - previous.unparseable())); emitter.emit(builder.build("events/processed", metrics.processed() - previous.processed())); emitter.emit(builder.build("rows/output", metrics.rowOutput() - previous.rowOutput())); + emitter.emit(builder.build("persists/num", metrics.numPersists() - previous.numPersists())); + emitter.emit(builder.build("persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis())); + emitter.emit( + builder.build( + "persists/backPressure", + metrics.persistBackPressureMillis() - previous.persistBackPressureMillis() + ) + ); previousValues.put(fireDepartment, metrics); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index e64111e98b6..1d163dce09d 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -3,6 +3,7 @@ package io.druid.segment.realtime.plumber; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; +import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -69,12 +70,15 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** */ public class RealtimePlumber implements Plumber { private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); + private static final int WARN_DELAY = 1000; + private final DataSchema schema; private final RealtimeTuningConfig config; private final RejectionPolicy rejectionPolicy; @@ -91,6 +95,7 @@ public class RealtimePlumber implements Plumber private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( String.CASE_INSENSITIVE_ORDER ); + private volatile boolean shuttingDown = false; private volatile boolean stopped = false; private volatile ExecutorService persistExecutor = null; @@ -297,19 +302,35 @@ public class RealtimePlumber implements Plumber log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); + final Stopwatch runExecStopwatch = Stopwatch.createStarted(); + final Stopwatch persistStopwatch = Stopwatch.createStarted(); persistExecutor.execute( new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource())) { @Override public void doRun() { - for (Pair pair : indexesToPersist) { - metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs)); + try { + for (Pair pair : indexesToPersist) { + metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs)); + } + commitRunnable.run(); + } + finally { + metrics.incrementNumPersists(); + metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS)); + persistStopwatch.stop(); } - commitRunnable.run(); } } ); + + final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS); + metrics.incrementPersistBackPressureMillis(startDelay); + if (startDelay > WARN_DELAY) { + log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay); + } + runExecStopwatch.stop(); } // Submits persist-n-merge task for a Sink to the mergeExecutor @@ -696,7 +717,7 @@ public class RealtimePlumber implements Plumber * being created. * * @param truncatedTime sink key - * @param sink sink to unannounce + * @param sink sink to unannounce */ protected void abandonSegment(final long truncatedTime, final Sink sink) { @@ -735,8 +756,8 @@ public class RealtimePlumber implements Plumber * Persists the given hydrant and returns the number of rows persisted * * @param indexToPersist hydrant to persist - * @param schema datasource schema - * @param interval interval to persist + * @param schema datasource schema + * @param interval interval to persist * * @return the number of rows persisted */ @@ -845,13 +866,13 @@ public class RealtimePlumber implements Plumber && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() && Iterables.any( sinks.keySet(), new Predicate() - { - @Override - public boolean apply(Long sinkKey) - { - return segment.getInterval().contains(sinkKey); - } - } + { + @Override + public boolean apply(Long sinkKey) + { + return segment.getInterval().contains(sinkKey); + } + } ); } }