Merge pull request #1765 from metamx/brokerCpuMetrics

Add CPUTimeMetricQueryRunner to ClientQuerySegmentWalker
This commit is contained in:
Xavier Léauté 2015-10-05 10:51:30 -07:00
commit d8a1dd8d68
1 changed files with 39 additions and 22 deletions

View File

@ -17,7 +17,14 @@
package io.druid.server; package io.druid.server;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingClusteredClient; import io.druid.client.CachingClusteredClient;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.PostProcessingOperator; import io.druid.query.PostProcessingOperator;
import io.druid.query.Query; import io.druid.query.Query;
@ -28,16 +35,12 @@ import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.query.UnionQueryRunner; import io.druid.query.UnionQueryRunner;
import java.util.Map;
import org.joda.time.Interval; import org.joda.time.Interval;
import com.fasterxml.jackson.core.type.TypeReference; import javax.annotation.Nullable;
import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Map;
import com.google.inject.Inject; import java.util.concurrent.atomic.AtomicLong;
import com.metamx.emitter.service.ServiceEmitter;
/** /**
*/ */
@ -80,23 +83,37 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private <T> QueryRunner<T> makeRunner(final Query<T> query) private <T> QueryRunner<T> makeRunner(final Query<T> query)
{ {
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query); final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>( final QueryRunner<T> baseRunner = CPUTimeMetricQueryRunner.safeBuild(
toolChest.postMergeQueryDecoration( new FinalizeResultsQueryRunner<T>(
toolChest.mergeResults( toolChest.postMergeQueryDecoration(
new UnionQueryRunner<T>( toolChest.mergeResults(
toolChest.preMergeQueryDecoration( new UnionQueryRunner<T>(
new RetryQueryRunner<T>( toolChest.preMergeQueryDecoration(
baseClient, new RetryQueryRunner<T>(
toolChest, baseClient,
retryConfig, toolChest,
objectMapper retryConfig,
) objectMapper
), )
toolChest ),
toolChest
)
) )
) ),
toolChest
), ),
toolChest new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Nullable
@Override
public ServiceMetricEvent.Builder apply(Query<T> tQuery)
{
return toolChest.makeMetricBuilder(tQuery);
}
},
emitter,
new AtomicLong(0L),
true
); );
final Map<String, Object> context = query.getContext(); final Map<String, Object> context = query.getContext();