Merge pull request #1021 from druid-io/log-mpp

Add more metrics and logs for when ingestion may be throttled
Xavier Léauté 2015-01-09 14:37:30 -08:00
commit 705e5ab0b1
4 changed files with 86 additions and 17 deletions

Execs.java

@ -59,7 +59,8 @@ public class Execs
/**
* @param nameFormat name format for the threadFactory
* @param capacity maximum capacity after which the executorService will block on accepting new tasks
*
* @return ExecutorService which blocks accepting new tasks when the capacity is reached
*/
public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
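
The blocking behavior this javadoc describes is the mechanism the rest of the commit measures. Below is a minimal illustrative sketch, not part of the diff: the capacity value and the sleeping task are made up, and the import for Execs is omitted because the diff does not show the file's path.

import com.google.common.base.Stopwatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockingExecutorSketch
{
  public static void main(String[] args)
  {
    // Execs is the Druid helper shown above: single worker thread, queue capacity 1
    final ExecutorService exec = Execs.newBlockingSingleThreaded("sketch-%d", 1);
    final Runnable slowTask = new Runnable()
    {
      @Override
      public void run()
      {
        try {
          Thread.sleep(2000);
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    };

    exec.execute(slowTask); // picked up by the single worker
    exec.execute(slowTask); // waits in the bounded queue

    final Stopwatch blocked = Stopwatch.createStarted();
    exec.execute(slowTask); // blocks the caller until a queue slot frees up
    System.out.println("caller blocked for " + blocked.elapsed(TimeUnit.MILLISECONDS) + " ms");

    exec.shutdown();
  }
}

The time spent inside that third execute() call is exactly the kind of stall the new persists/backPressure metric records further down in this commit.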

FireDepartmentMetrics.java

@ -19,10 +19,10 @@
package io.druid.segment.realtime;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class FireDepartmentMetrics
@ -31,6 +31,9 @@ public class FireDepartmentMetrics
private final AtomicLong thrownAwayCount = new AtomicLong(0);
private final AtomicLong unparseableCount = new AtomicLong(0);
private final AtomicLong rowOutputCount = new AtomicLong(0);
private final AtomicLong numPersists = new AtomicLong(0);
private final AtomicLong persistTimeMillis = new AtomicLong(0);
private final AtomicLong persistBackPressureMillis = new AtomicLong(0);
public void incrementProcessed()
{
@ -52,6 +55,21 @@ public class FireDepartmentMetrics
rowOutputCount.addAndGet(numRows);
}
public void incrementNumPersists()
{
numPersists.incrementAndGet();
}
public void incrementPersistTimeMillis(long millis)
{
persistTimeMillis.addAndGet(millis);
}
public void incrementPersistBackPressureMillis(long millis)
{
persistBackPressureMillis.addAndGet(millis);
}
public long processed()
{
return processedCount.get();
@ -72,6 +90,21 @@ public class FireDepartmentMetrics
return rowOutputCount.get();
}
public long numPersists()
{
return numPersists.get();
}
public long persistTimeMillis()
{
return persistTimeMillis.get();
}
public long persistBackPressureMillis()
{
return persistBackPressureMillis.get();
}
public FireDepartmentMetrics snapshot()
{
final FireDepartmentMetrics retVal = new FireDepartmentMetrics();
@ -79,6 +112,9 @@ public class FireDepartmentMetrics
retVal.thrownAwayCount.set(thrownAwayCount.get());
retVal.unparseableCount.set(unparseableCount.get());
retVal.rowOutputCount.set(rowOutputCount.get());
retVal.numPersists.set(numPersists.get());
retVal.persistTimeMillis.set(persistTimeMillis.get());
retVal.persistBackPressureMillis.set(persistBackPressureMillis.get());
return retVal;
}
@ -95,6 +131,9 @@ public class FireDepartmentMetrics
thrownAwayCount.addAndGet(otherSnapshot.thrownAway());
rowOutputCount.addAndGet(otherSnapshot.rowOutput());
unparseableCount.addAndGet(otherSnapshot.unparseable());
numPersists.addAndGet(otherSnapshot.numPersists());
persistTimeMillis.addAndGet(otherSnapshot.persistTimeMillis());
persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis());
return this;
}
}
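
The counters above are plain AtomicLongs, so the persist path can increment them without coordinating with whoever reports them; a reporter takes a snapshot() and diffs it against the previous one. A rough usage sketch using only the methods added in this file, written as a bare method body; the millisecond literals are hypothetical:

FireDepartmentMetrics metrics = new FireDepartmentMetrics();
FireDepartmentMetrics previous = metrics.snapshot();   // baseline for delta reporting

// persist path (e.g. inside the plumber's persist runnable)
metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(450L);              // hypothetical persist duration
metrics.incrementPersistBackPressureMillis(1200L);     // hypothetical time spent blocked

// reporting path, once per emission period
FireDepartmentMetrics current = metrics.snapshot();
long persists = current.numPersists() - previous.numPersists();
long persistMillis = current.persistTimeMillis() - previous.persistTimeMillis();
long backPressureMillis = current.persistBackPressureMillis() - previous.persistBackPressureMillis();
previous = current;                                    // carry forward for the next period

This snapshot-and-diff pattern is what RealtimeMetricsMonitor does in the next file.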

RealtimeMetricsMonitor.java

@ -29,7 +29,7 @@ import java.util.List;
import java.util.Map;
/**
*/
public class RealtimeMetricsMonitor extends AbstractMonitor
{
private final Map<FireDepartment, FireDepartmentMetrics> previousValues;
@ -60,6 +60,14 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
emitter.emit(builder.build("events/unparseable", metrics.unparseable() - previous.unparseable()));
emitter.emit(builder.build("events/processed", metrics.processed() - previous.processed()));
emitter.emit(builder.build("rows/output", metrics.rowOutput() - previous.rowOutput()));
emitter.emit(builder.build("persists/num", metrics.numPersists() - previous.numPersists()));
emitter.emit(builder.build("persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis()));
emitter.emit(
builder.build(
"persists/backPressure",
metrics.persistBackPressureMillis() - previous.persistBackPressureMillis()
)
);
previousValues.put(fireDepartment, metrics);
}
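
Each of the three new events is emitted as a delta since the previous monitor pass, so persists/backPressure reads as "milliseconds of ingestion stall accumulated during this period". One hedged way to turn that into an alertable ratio; periodMillis is hypothetical here, since the monitor in this diff does not expose its emission period:

// fraction of the reporting period that ingestion spent blocked on persists
long backPressureDelta = metrics.persistBackPressureMillis() - previous.persistBackPressureMillis();
double throttledFraction = (double) backPressureDelta / periodMillis;
if (throttledFraction > 0.5) {
  // more than half the period was spent waiting for a persist slot;
  // the persist-related settings in RealtimeTuningConfig are the usual knobs to revisit
}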

RealtimePlumber.java

@ -3,6 +3,7 @@ package io.druid.segment.realtime.plumber;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -69,12 +70,15 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
*/
public class RealtimePlumber implements Plumber
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private static final int WARN_DELAY = 1000;
private final DataSchema schema;
private final RealtimeTuningConfig config;
private final RejectionPolicy rejectionPolicy;
@ -91,6 +95,7 @@ public class RealtimePlumber implements Plumber
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
private volatile boolean shuttingDown = false;
private volatile boolean stopped = false;
private volatile ExecutorService persistExecutor = null;
@ -297,19 +302,35 @@ public class RealtimePlumber implements Plumber
log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
final Stopwatch persistStopwatch = Stopwatch.createStarted();
persistExecutor.execute(
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))
{
@Override
public void doRun()
{
try {
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs));
}
commitRunnable.run();
}
finally {
metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
persistStopwatch.stop();
}
}
}
);
final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
metrics.incrementPersistBackPressureMillis(startDelay);
if (startDelay > WARN_DELAY) {
log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
}
runExecStopwatch.stop();
}
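
Note that both stopwatches start before execute(), so the value recorded as persists/time includes any time the task spent queued, while persists/backPressure covers only the submit-side blocking. A made-up timeline, expressed as comments, shows how the two overlap:

// t=0ms     both stopwatches start; execute(...) is called while the persist queue is full
// t=1500ms  execute() returns once the task has been accepted
//           -> persistBackPressureMillis += 1500; the warning fires since 1500 > WARN_DELAY (1000)
// ...       the worker eventually runs the persist
// t=4500ms  the persist finishes and the finally block runs
//           -> numPersists += 1; persistTimeMillis += 4500 (queue wait included)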
// Submits persist-n-merge task for a Sink to the mergeExecutor
@ -696,7 +717,7 @@ public class RealtimePlumber implements Plumber
* being created.
*
* @param truncatedTime sink key
* @param sink sink to unannounce
*/
protected void abandonSegment(final long truncatedTime, final Sink sink)
{
@ -735,8 +756,8 @@ public class RealtimePlumber implements Plumber
* Persists the given hydrant and returns the number of rows persisted
*
* @param indexToPersist hydrant to persist
* @param schema datasource schema
* @param interval interval to persist
*
* @return the number of rows persisted
*/
@ -845,13 +866,13 @@ public class RealtimePlumber implements Plumber
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
&& Iterables.any(
sinks.keySet(), new Predicate<Long>()
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
);
}
}