Merge pull request #1696 from metamx/cpuTimeReporting

Add CPU time to metrics for segment scanning.
This commit is contained in:
Himanshu 2015-09-14 10:53:55 -05:00
commit 5ff92664f8
3 changed files with 259 additions and 50 deletions

View File

@ -42,6 +42,7 @@ Available Metrics
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|< several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the historical node).|id, segment.|several hundred milliseconds|
|`query/cpu/time`|Microseconds of CPU time taken to complete a query|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|Varies|
### Real-time

View File

@ -0,0 +1,173 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
{
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
private final QueryRunner<T> delegate;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final ServiceEmitter emitter;
private final AtomicLong cpuTimeAccumulator;
private final boolean report;
private CPUTimeMetricQueryRunner(
QueryRunner<T> delegate,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
ServiceEmitter emitter,
AtomicLong cpuTimeAccumulator,
boolean report
)
{
if (!THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
throw new ISE("Cpu time must enabled");
}
this.delegate = delegate;
this.builderFn = builderFn;
this.emitter = emitter;
this.cpuTimeAccumulator = cpuTimeAccumulator == null ? new AtomicLong(0L) : cpuTimeAccumulator;
this.report = report;
}
@Override
public Sequence<T> run(
final Query<T> query, final Map<String, Object> responseContext
)
{
final Sequence<T> baseSequence = delegate.run(query, responseContext);
return Sequences.withEffect(
new Sequence<T>()
{
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
try {
return baseSequence.accumulate(initValue, accumulator);
}
finally {
cpuTimeAccumulator.addAndGet(THREAD_MX_BEAN.getCurrentThreadCpuTime() - start);
}
}
@Override
public <OutType> Yielder<OutType> toYielder(
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
final Yielder<OutType> delegateYielder = baseSequence.toYielder(initValue, accumulator);
return new Yielder<OutType>()
{
@Override
public OutType get()
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
try {
return delegateYielder.get();
}
finally {
cpuTimeAccumulator.addAndGet(
THREAD_MX_BEAN.getCurrentThreadCpuTime() - start
);
}
}
@Override
public Yielder<OutType> next(OutType initValue)
{
final long start = THREAD_MX_BEAN.getCurrentThreadCpuTime();
try {
return delegateYielder.next(initValue);
}
finally {
cpuTimeAccumulator.addAndGet(
THREAD_MX_BEAN.getCurrentThreadCpuTime() - start
);
}
}
@Override
public boolean isDone()
{
return delegateYielder.isDone();
}
@Override
public void close() throws IOException
{
delegateYielder.close();
}
};
}
},
new Runnable()
{
@Override
public void run()
{
if (report) {
final long cpuTime = cpuTimeAccumulator.get();
if (cpuTime > 0) {
final ServiceMetricEvent.Builder builder = Preconditions.checkNotNull(builderFn.apply(query));
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
emitter.emit(builder.build("query/cpu/time", cpuTimeAccumulator.get() / 1000));
}
}
}
},
MoreExecutors.sameThreadExecutor()
);
}
public static <T> QueryRunner<T> safeBuild(
QueryRunner<T> delegate,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
ServiceEmitter emitter,
AtomicLong accumulator,
boolean report
)
{
if (!THREAD_MX_BEAN.isThreadCpuTimeSupported() || !THREAD_MX_BEAN.isThreadCpuTimeEnabled()) {
return delegate;
} else {
return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report);
}
}
}

View File

@ -36,6 +36,7 @@ import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.DataSource;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
@ -72,6 +73,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
@ -254,6 +256,8 @@ public class ServerManager implements QuerySegmentWalker
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn = getBuilderFn(toolChest);
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
@ -307,7 +311,9 @@ public class ServerManager implements QuerySegmentWalker
holder.getInterval(),
holder.getVersion(),
input.getChunkNumber()
)
),
builderFn,
cpuTimeAccumulator
);
}
}
@ -316,9 +322,15 @@ public class ServerManager implements QuerySegmentWalker
}
);
return new FinalizeResultsQueryRunner<T>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
return CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<T>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
),
builderFn,
emitter,
cpuTimeAccumulator,
true
);
}
@ -363,6 +375,9 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<T>();
}
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn = getBuilderFn(toolChest);
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(specs)
.transformCat(
@ -388,15 +403,21 @@ public class ServerManager implements QuerySegmentWalker
final ReferenceCountingSegment adapter = chunk.getObject();
return Arrays.asList(
buildAndDecorateQueryRunner(factory, toolChest, adapter, input)
buildAndDecorateQueryRunner(factory, toolChest, adapter, input, builderFn, cpuTimeAccumulator)
);
}
}
);
return new FinalizeResultsQueryRunner<T>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
return CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
),
builderFn,
emitter,
cpuTimeAccumulator,
true
);
}
@ -404,52 +425,66 @@ public class ServerManager implements QuerySegmentWalker
final QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest,
final ReferenceCountingSegment adapter,
final SegmentDescriptor segmentDescriptor
final SegmentDescriptor segmentDescriptor,
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
final AtomicLong cpuTimeAccumulator
)
{
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
return new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
},
new BySegmentQueryRunner<T>(
adapter.getIdentifier(),
adapter.getDataInterval().getStart(),
new CachingQueryRunner<T>(
return CPUTimeMetricQueryRunner.safeBuild(
new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
builderFn,
new BySegmentQueryRunner<T>(
adapter.getIdentifier(),
segmentDescriptor,
objectMapper,
cache,
toolChest,
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
},
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
"query/segment/time",
ImmutableMap.of("segment", adapter.getIdentifier())
),
cachingExec,
cacheConfig
)
),
"query/segmentAndCache/time",
ImmutableMap.of("segment", adapter.getIdentifier())
).withWaitMeasuredFromNow(),
segmentSpec
adapter.getDataInterval().getStart(),
new CachingQueryRunner<T>(
adapter.getIdentifier(),
segmentDescriptor,
objectMapper,
cache,
toolChest,
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
},
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
"query/segment/time",
ImmutableMap.of("segment", adapter.getIdentifier())
),
cachingExec,
cacheConfig
)
),
"query/segmentAndCache/time",
ImmutableMap.of("segment", adapter.getIdentifier())
).withWaitMeasuredFromNow(),
segmentSpec
),
builderFn,
emitter,
cpuTimeAccumulator,
false
);
}
private static <T> Function<Query<T>, ServiceMetricEvent.Builder> getBuilderFn(final QueryToolChest<T, Query<T>> toolChest)
{
return new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Nullable
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
};
}
}