From 7cecc550454c7846a659fc942f35f84cbe24812c Mon Sep 17 00:00:00 2001 From: Nishant Date: Thu, 24 Sep 2015 17:00:14 +0530 Subject: [PATCH] Add segment merge time as a metric Add merge and persist cpu time Fix typo review comment move cpu time measuring to VMUtils review comments. --- .../java/io/druid/common/utils/VMUtils.java | 40 ++++++++++++++- docs/content/operations/metrics.md | 7 ++- .../druid/query/CPUTimeMetricQueryRunner.java | 18 +++---- .../realtime/FireDepartmentMetrics.java | 51 ++++++++++++++++--- .../realtime/RealtimeMetricsMonitor.java | 4 +- .../realtime/plumber/RealtimePlumber.java | 14 ++++- 6 files changed, 115 insertions(+), 19 deletions(-) diff --git a/common/src/main/java/io/druid/common/utils/VMUtils.java b/common/src/main/java/io/druid/common/utils/VMUtils.java index d4230a53e2c..69c9ad7b25e 100644 --- a/common/src/main/java/io/druid/common/utils/VMUtils.java +++ b/common/src/main/java/io/druid/common/utils/VMUtils.java @@ -17,10 +17,43 @@ package io.druid.common.utils; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.lang.reflect.InvocationTargetException; public class VMUtils { + private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean(); + + public static boolean isThreadCpuTimeEnabled() + { + return THREAD_MX_BEAN.isThreadCpuTimeSupported() && THREAD_MX_BEAN.isThreadCpuTimeEnabled(); + } + + public static long safeGetThreadCpuTime() + { + if (!isThreadCpuTimeEnabled()) { + return 0L; + } else { + return getCurrentThreadCpuTime(); + } + } + + /** + * Returns the total CPU time for current thread. + * This method should be called after verifying that cpu time measurement for current thread is supported by JVM + * + * @return total CPU time for the current thread in nanoseconds. + * + * @throws java.lang.UnsupportedOperationException if the Java + * virtual machine does not support CPU time measurement for + * the current thread. + */ + public static long getCurrentThreadCpuTime() + { + return THREAD_MX_BEAN.getCurrentThreadCpuTime(); + } + public static long getMaxDirectMemory() throws UnsupportedOperationException { try { @@ -28,7 +61,12 @@ public class VMUtils Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null); if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) { - throw new UnsupportedOperationException(String.format("Cannot determine maxDirectMemory from [%s]", maxDirectMemoryObj)); + throw new UnsupportedOperationException( + String.format( + "Cannot determine maxDirectMemory from [%s]", + maxDirectMemoryObj + ) + ); } else { return ((Number) maxDirectMemoryObj).longValue(); } diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index 0a445a29655..b8317ad78b3 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -89,10 +89,15 @@ Memcached client metrics are reported as per the following. These metrics come d |`ingest/events/processed`|Number of events successfully processed.|dataSource.|Equal to your # of events.| |`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.| |`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.| -|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration.|Generally a few minutes at most.| +|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.| +|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.| |`ingest/persists/backPressure`|Number of persists pending.|dataSource.|0| |`ingest/persists/failed`|Number of persists that failed.|dataSource.|0| |`ingest/handoff/failed`|Number of handoffs that failed.|dataSource.|0| +|`ingest/merge/time`|Milliseconds spent merging intermediate segments|dataSource.|Depends on configuration. Generally a few minutes at most.| +|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|dataSource.|Depends on configuration. Generally a few minutes at most.| + +Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0. ### Indexing Service diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index 7c8dc6180ff..734eb55c6a1 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -32,6 +32,7 @@ import com.metamx.common.guava.YieldingAccumulator; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.common.utils.VMUtils; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; @@ -40,7 +41,6 @@ import java.util.concurrent.atomic.AtomicLong; public class CPUTimeMetricQueryRunner implements QueryRunner { - private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean(); private final QueryRunner delegate; private final Function, ServiceMetricEvent.Builder> builderFn; private final ServiceEmitter emitter; @@ -55,7 +55,7 @@ public class CPUTimeMetricQueryRunner implements QueryRunner boolean report ) { - if (!THREAD_MX_BEAN.isThreadCpuTimeEnabled()) { + if (!VMUtils.isThreadCpuTimeEnabled()) { throw new ISE("Cpu time must enabled"); } this.delegate = delegate; @@ -78,12 +78,12 @@ public class CPUTimeMetricQueryRunner implements QueryRunner @Override public OutType accumulate(OutType initValue, Accumulator accumulator) { - final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + final long start = VMUtils.getCurrentThreadCpuTime(); try { return baseSequence.accumulate(initValue, accumulator); } finally { - cpuTimeAccumulator.addAndGet(THREAD_MX_BEAN.getCurrentThreadCpuTime() - start); + cpuTimeAccumulator.addAndGet(VMUtils.getCurrentThreadCpuTime() - start); } } @@ -98,13 +98,13 @@ public class CPUTimeMetricQueryRunner implements QueryRunner @Override public OutType get() { - final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + final long start = VMUtils.getCurrentThreadCpuTime(); try { return delegateYielder.get(); } finally { cpuTimeAccumulator.addAndGet( - THREAD_MX_BEAN.getCurrentThreadCpuTime() - start + VMUtils.getCurrentThreadCpuTime() - start ); } } @@ -112,13 +112,13 @@ public class CPUTimeMetricQueryRunner implements QueryRunner @Override public Yielder next(OutType initValue) { - final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime(); + final long start = VMUtils.getCurrentThreadCpuTime(); try { return delegateYielder.next(initValue); } finally { cpuTimeAccumulator.addAndGet( - THREAD_MX_BEAN.getCurrentThreadCpuTime() - start + VMUtils.getCurrentThreadCpuTime() - start ); } } @@ -164,7 +164,7 @@ public class CPUTimeMetricQueryRunner implements QueryRunner boolean report ) { - if (!THREAD_MX_BEAN.isThreadCpuTimeSupported() || !THREAD_MX_BEAN.isThreadCpuTimeEnabled()) { + if (!VMUtils.isThreadCpuTimeEnabled()) { return delegate; } else { return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report); diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java index 335aed6bda2..88c14a86eac 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetrics.java @@ -34,6 +34,9 @@ public class FireDepartmentMetrics private final AtomicLong persistBackPressureMillis = new AtomicLong(0); private final AtomicLong failedPersists = new AtomicLong(0); private final AtomicLong failedHandoffs = new AtomicLong(0); + private final AtomicLong mergeTimeMillis = new AtomicLong(0); + private final AtomicLong mergeCpuTime = new AtomicLong(0); + private final AtomicLong persistCpuTime = new AtomicLong(0); public void incrementProcessed() { @@ -71,14 +74,27 @@ public class FireDepartmentMetrics } public void incrementFailedPersists() - { - failedPersists.incrementAndGet(); - } + { + failedPersists.incrementAndGet(); + } public void incrementFailedHandoffs() - { - failedHandoffs.incrementAndGet(); - } + { + failedHandoffs.incrementAndGet(); + } + + public void incrementMergeTimeMillis(long millis) + { + mergeTimeMillis.addAndGet(millis); + } + + public void incrementMergeCpuTime(long mergeTime){ + mergeCpuTime.addAndGet(mergeTime); + } + + public void incrementPersistCpuTime(long persistTime){ + persistCpuTime.addAndGet(persistTime); + } public long processed() { @@ -125,6 +141,22 @@ public class FireDepartmentMetrics return failedHandoffs.get(); } + public long mergeTimeMillis() + { + return mergeTimeMillis.get(); + } + + public long mergeCpuTime() + { + return mergeCpuTime.get(); + } + + public long persistCpuTime() + { + return persistCpuTime.get(); + } + + public FireDepartmentMetrics snapshot() { final FireDepartmentMetrics retVal = new FireDepartmentMetrics(); @@ -137,6 +169,9 @@ public class FireDepartmentMetrics retVal.persistBackPressureMillis.set(persistBackPressureMillis.get()); retVal.failedPersists.set(failedPersists.get()); retVal.failedHandoffs.set(failedHandoffs.get()); + retVal.mergeTimeMillis.set(mergeTimeMillis.get()); + retVal.mergeCpuTime.set(mergeCpuTime.get()); + retVal.persistCpuTime.set(persistCpuTime.get()); return retVal; } @@ -158,6 +193,10 @@ public class FireDepartmentMetrics persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis()); failedPersists.addAndGet(otherSnapshot.failedPersists()); failedHandoffs.addAndGet(otherSnapshot.failedHandoffs()); + mergeTimeMillis.addAndGet(otherSnapshot.mergeTimeMillis()); + mergeCpuTime.addAndGet(otherSnapshot.mergeCpuTime()); + persistCpuTime.addAndGet(otherSnapshot.persistCpuTime()); return this; } + } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java index 57c25cc2bf4..4d9844d8da7 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java @@ -72,6 +72,7 @@ public class RealtimeMetricsMonitor extends AbstractMonitor emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput())); emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists())); emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis())); + emitter.emit(builder.build("ingest/persists/cpu", metrics.persistCpuTime() - previous.persistCpuTime())); emitter.emit( builder.build( "ingest/persists/backPressure", @@ -80,7 +81,8 @@ public class RealtimeMetricsMonitor extends AbstractMonitor ); emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previous.failedPersists())); emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previous.failedHandoffs())); - + emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previous.mergeTimeMillis())); + emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previous.mergeCpuTime())); previousValues.put(fireDepartment, metrics); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 00a89969e6f..f0ac6924733 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -43,6 +43,7 @@ import io.druid.client.FilteredServerView; import io.druid.client.ServerView; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; +import io.druid.common.utils.VMUtils; import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; @@ -76,6 +77,8 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.SingleElementPartitionChunk; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -400,6 +403,7 @@ public class RealtimePlumber implements Plumber handed off instead of individual segments being handed off (that is, if one of the set succeeds in handing off and the others fail, the real-time would believe that it needs to re-ingest the data). */ + long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime(); try { for (Pair pair : indexesToPersist) { metrics.incrementRowOutputCount( @@ -415,6 +419,7 @@ public class RealtimePlumber implements Plumber throw e; } finally { + metrics.incrementPersistCpuTime(VMUtils.safeGetThreadCpuTime() - persistThreadCpuTime); metrics.incrementNumPersists(); metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS)); persistStopwatch.stop(); @@ -482,7 +487,8 @@ public class RealtimePlumber implements Plumber } } } - + final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime(); + final Stopwatch mergeStopwatch = Stopwatch.createStarted(); try { List indexes = Lists.newArrayList(); for (FireHydrant fireHydrant : sink) { @@ -508,6 +514,9 @@ public class RealtimePlumber implements Plumber config.getIndexSpec() ); } + // emit merge metrics before publishing segment + metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime); + metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS)); QueryableIndex index = IndexIO.loadIndex(mergedFile); log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier()); @@ -539,6 +548,9 @@ public class RealtimePlumber implements Plumber abandonSegment(truncatedTime, sink); } } + finally { + mergeStopwatch.stop(); + } } } );