Add segment merge time as a metric

Add merge and persist cpu time

Fix typo

review comment

move cpu time measuring to VMUtils

review comments.
This commit is contained in:
Nishant 2015-09-24 17:00:14 +05:30
parent e4ac78e43d
commit 7cecc55045
6 changed files with 115 additions and 19 deletions

View File

@ -17,10 +17,43 @@
package io.druid.common.utils;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
public class VMUtils
{
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
public static boolean isThreadCpuTimeEnabled()
{
return THREAD_MX_BEAN.isThreadCpuTimeSupported() && THREAD_MX_BEAN.isThreadCpuTimeEnabled();
}
public static long safeGetThreadCpuTime()
{
if (!isThreadCpuTimeEnabled()) {
return 0L;
} else {
return getCurrentThreadCpuTime();
}
}
/**
* Returns the total CPU time for current thread.
* This method should be called after verifying that cpu time measurement for current thread is supported by JVM
*
* @return total CPU time for the current thread in nanoseconds.
*
* @throws java.lang.UnsupportedOperationException if the Java
* virtual machine does not support CPU time measurement for
* the current thread.
*/
public static long getCurrentThreadCpuTime()
{
return THREAD_MX_BEAN.getCurrentThreadCpuTime();
}
public static long getMaxDirectMemory() throws UnsupportedOperationException
{
try {
@ -28,7 +61,12 @@ public class VMUtils
Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null);
if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) {
throw new UnsupportedOperationException(String.format("Cannot determine maxDirectMemory from [%s]", maxDirectMemoryObj));
throw new UnsupportedOperationException(
String.format(
"Cannot determine maxDirectMemory from [%s]",
maxDirectMemoryObj
)
);
} else {
return ((Number) maxDirectMemoryObj).longValue();
}

View File

@ -89,10 +89,15 @@ Memcached client metrics are reported as per the following. These metrics come d
|`ingest/events/processed`|Number of events successfully processed.|dataSource.|Equal to your # of events.|
|`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration.|Generally a few minutes at most.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/backPressure`|Number of persists pending.|dataSource.|0|
|`ingest/persists/failed`|Number of persists that failed.|dataSource.|0|
|`ingest/handoff/failed`|Number of handoffs that failed.|dataSource.|0|
|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.|
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
### Indexing Service

View File

@ -32,6 +32,7 @@ import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.common.utils.VMUtils;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
@ -40,7 +41,6 @@ import java.util.concurrent.atomic.AtomicLong;
public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
{
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
private final QueryRunner<T> delegate;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final ServiceEmitter emitter;
@ -55,7 +55,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
boolean report
)
{
if (!THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
if (!VMUtils.isThreadCpuTimeEnabled()) {
throw new ISE("Cpu time must enabled");
}
this.delegate = delegate;
@ -78,12 +78,12 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return baseSequence.accumulate(initValue, accumulator);
}
finally {
cpuTimeAccumulator.addAndGet(THREAD_MX_BEAN.getCurrentThreadCpuTime() - start);
cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start);
}
}
@ -98,13 +98,13 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
@Override
public OutType get()
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return delegateYielder.get();
}
finally {
cpuTimeAccumulator.addAndGet(
THREAD_MX_BEAN.getCurrentThreadCpuTime() - start
VMUtils.getCurrentThreadCpuTime() - start
);
}
}
@ -112,13 +112,13 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
@Override
public Yielder<OutType> next(OutType initValue)
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
final long start = VMUtils.getCurrentThreadCpuTime();
try {
return delegateYielder.next(initValue);
}
finally {
cpuTimeAccumulator.addAndGet(
THREAD_MX_BEAN.getCurrentThreadCpuTime() - start
VMUtils.getCurrentThreadCpuTime() - start
);
}
}
@ -164,7 +164,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
boolean report
)
{
if (!THREAD_MX_BEAN.isThreadCpuTimeSupported() || !THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
if (!VMUtils.isThreadCpuTimeEnabled()) {
return delegate;
} else {
return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report);

View File

@ -34,6 +34,9 @@ public class FireDepartmentMetrics
private final AtomicLong persistBackPressureMillis = new AtomicLong(0);
private final AtomicLong failedPersists = new AtomicLong(0);
private final AtomicLong failedHandoffs = new AtomicLong(0);
private final AtomicLong mergeTimeMillis = new AtomicLong(0);
private final AtomicLong mergeCpuTime = new AtomicLong(0);
private final AtomicLong persistCpuTime = new AtomicLong(0);
public void incrementProcessed()
{
@ -80,6 +83,19 @@ public class FireDepartmentMetrics
failedHandoffs.incrementAndGet();
}
public void incrementMergeTimeMillis(long millis)
{
mergeTimeMillis.addAndGet(millis);
}
public void incrementMergeCpuTime(long mergeTime){
mergeCpuTime.addAndGet(mergeTime);
}
public void incrementPersistCpuTime(long persistTime){
persistCpuTime.addAndGet(persistTime);
}
public long processed()
{
return processedCount.get();
@ -125,6 +141,22 @@ public class FireDepartmentMetrics
return failedHandoffs.get();
}
public long mergeTimeMillis()
{
return mergeTimeMillis.get();
}
public long mergeCpuTime()
{
return mergeCpuTime.get();
}
public long persistCpuTime()
{
return persistCpuTime.get();
}
public FireDepartmentMetrics snapshot()
{
final FireDepartmentMetrics retVal = new FireDepartmentMetrics();
@ -137,6 +169,9 @@ public class FireDepartmentMetrics
retVal.persistBackPressureMillis.set(persistBackPressureMillis.get());
retVal.failedPersists.set(failedPersists.get());
retVal.failedHandoffs.set(failedHandoffs.get());
retVal.mergeTimeMillis.set(mergeTimeMillis.get());
retVal.mergeCpuTime.set(mergeCpuTime.get());
retVal.persistCpuTime.set(persistCpuTime.get());
return retVal;
}
@ -158,6 +193,10 @@ public class FireDepartmentMetrics
persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis());
failedPersists.addAndGet(otherSnapshot.failedPersists());
failedHandoffs.addAndGet(otherSnapshot.failedHandoffs());
mergeTimeMillis.addAndGet(otherSnapshot.mergeTimeMillis());
mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime());
persistCpuTime.addAndGet(otherSnapshot.persistCpuTime());
return this;
}
}

View File

@ -72,6 +72,7 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput()));
emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists()));
emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis()));
emitter.emit(builder.build("ingest/persists/cpu", metrics.persistCpuTime() - previous.persistCpuTime()));
emitter.emit(
builder.build(
"ingest/persists/backPressure",
@ -80,7 +81,8 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
);
emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previous.failedPersists()));
emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previous.failedHandoffs()));
emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previous.mergeTimeMillis()));
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previous.mergeCpuTime()));
previousValues.put(fireDepartment, metrics);
}

View File

@ -43,6 +43,7 @@ import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.common.utils.VMUtils;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
@ -76,6 +77,8 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -400,6 +403,7 @@ public class RealtimePlumber implements Plumber
handed off instead of individual segments being handed off (that is, if one of the set succeeds in handing
off and the others fail, the real-time would believe that it needs to re-ingest the data).
*/
long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime();
try {
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(
@ -415,6 +419,7 @@ public class RealtimePlumber implements Plumber
throw e;
}
finally {
metrics.incrementPersistCpuTime(VMUtils.safeGetThreadCpuTime() - persistThreadCpuTime);
metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
persistStopwatch.stop();
@ -482,7 +487,8 @@ public class RealtimePlumber implements Plumber
}
}
}
final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime();
final Stopwatch mergeStopwatch = Stopwatch.createStarted();
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
@ -508,6 +514,9 @@ public class RealtimePlumber implements Plumber
config.getIndexSpec()
);
}
// emit merge metrics before publishing segment
metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));
QueryableIndex index = IndexIO.loadIndex(mergedFile);
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
@ -539,6 +548,9 @@ public class RealtimePlumber implements Plumber
abandonSegment(truncatedTime, sink);
}
}
finally {
mergeStopwatch.stop();
}
}
}
);