diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 8af8b31dd2b..670ee124601 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -17,7 +17,14 @@ package io.druid.server; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.inject.Inject; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachingClusteredClient; +import io.druid.query.CPUTimeMetricQueryRunner; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.PostProcessingOperator; import io.druid.query.Query; @@ -28,16 +35,12 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.SegmentDescriptor; - import io.druid.query.UnionQueryRunner; -import java.util.Map; - import org.joda.time.Interval; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import com.metamx.emitter.service.ServiceEmitter; +import javax.annotation.Nullable; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -80,23 +83,37 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private QueryRunner makeRunner(final Query query) { final QueryToolChest> toolChest = warehouse.getToolChest(query); - final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner( - toolChest.postMergeQueryDecoration( - toolChest.mergeResults( - new UnionQueryRunner( - toolChest.preMergeQueryDecoration( - new RetryQueryRunner( - baseClient, - toolChest, - retryConfig, - objectMapper - ) - ), - toolChest + final QueryRunner baseRunner = CPUTimeMetricQueryRunner.safeBuild( + new FinalizeResultsQueryRunner( + toolChest.postMergeQueryDecoration( + toolChest.mergeResults( + new UnionQueryRunner( + toolChest.preMergeQueryDecoration( + new RetryQueryRunner( + baseClient, + toolChest, + retryConfig, + objectMapper + ) + ), + toolChest + ) ) - ) + ), + toolChest ), - toolChest + new Function, ServiceMetricEvent.Builder>() + { + @Nullable + @Override + public ServiceMetricEvent.Builder apply(Query tQuery) + { + return toolChest.makeMetricBuilder(tQuery); + } + }, + emitter, + new AtomicLong(0L), + true ); final Map context = query.getContext();