From 5c1b2360d63aefc501ac65c478e766bcf483e620 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 20 Dec 2012 16:11:08 -0800 Subject: [PATCH] - Add optional query/wait metric to MetricsEmittingQueryRunner - Add MetricsEmittingExecutorService decorator, and use it on compute nodes --- .../druid/http/ClientQuerySegmentWalker.java | 2 +- .../query/MetricsEmittingExecutorService.java | 72 +++++++++++++++++++ .../query/MetricsEmittingQueryRunner.java | 25 +++++++ .../druid/coordination/ServerManager.java | 2 +- .../com/metamx/druid/http/ComputeNode.java | 17 +++-- 5 files changed, 110 insertions(+), 8 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java diff --git a/client/src/main/java/com/metamx/druid/http/ClientQuerySegmentWalker.java b/client/src/main/java/com/metamx/druid/http/ClientQuerySegmentWalker.java index d316e8fcab5..d053d6507a2 100644 --- a/client/src/main/java/com/metamx/druid/http/ClientQuerySegmentWalker.java +++ b/client/src/main/java/com/metamx/druid/http/ClientQuerySegmentWalker.java @@ -83,7 +83,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker } }, toolChest.preMergeQueryDecoration(baseClient) - ) + ).withWaitMeasuredFromNow() ) ), toolChest diff --git a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java new file mode 100644 index 00000000000..2e453ddbe0b --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java @@ -0,0 +1,72 @@ +package com.metamx.druid.query; + +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; + +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class MetricsEmittingExecutorService extends AbstractExecutorService +{ + private final ExecutorService base; + private final ServiceEmitter emitter; + private final ServiceMetricEvent.Builder metricBuilder; + + public MetricsEmittingExecutorService( + ExecutorService base, + ServiceEmitter emitter, + ServiceMetricEvent.Builder metricBuilder + ) + { + this.base = base; + this.emitter = emitter; + this.metricBuilder = metricBuilder; + } + + @Override + public void shutdown() + { + base.shutdown(); + } + + @Override + public List shutdownNow() + { + return base.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return base.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return base.isTerminated(); + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException + { + return base.awaitTermination(l, timeUnit); + } + + @Override + public void execute(Runnable runnable) + { + emitMetrics(); + base.execute(runnable); + } + + private void emitMetrics() + { + if (base instanceof ThreadPoolExecutor) { + emitter.emit(metricBuilder.build("exec/backlog", ((ThreadPoolExecutor) base).getQueue().size())); + } + } +} diff --git a/client/src/main/java/com/metamx/druid/query/MetricsEmittingQueryRunner.java b/client/src/main/java/com/metamx/druid/query/MetricsEmittingQueryRunner.java index 44db0ac3893..9a7111526de 100644 --- a/client/src/main/java/com/metamx/druid/query/MetricsEmittingQueryRunner.java +++ b/client/src/main/java/com/metamx/druid/query/MetricsEmittingQueryRunner.java @@ -37,16 +37,33 @@ public class MetricsEmittingQueryRunner implements QueryRunner private final ServiceEmitter emitter; private final Function, ServiceMetricEvent.Builder> builderFn; private final QueryRunner queryRunner; + private final long creationTime; public MetricsEmittingQueryRunner( ServiceEmitter emitter, Function, ServiceMetricEvent.Builder> builderFn, QueryRunner queryRunner ) + { + this(emitter, builderFn, queryRunner, -1); + } + + public MetricsEmittingQueryRunner( + ServiceEmitter emitter, + Function, ServiceMetricEvent.Builder> builderFn, + QueryRunner queryRunner, + long creationTime + ) { this.emitter = emitter; this.builderFn = builderFn; this.queryRunner = queryRunner; + this.creationTime = creationTime; + } + + public MetricsEmittingQueryRunner withWaitMeasuredFromNow() + { + return new MetricsEmittingQueryRunner(emitter, builderFn, queryRunner, System.currentTimeMillis()); } @Override @@ -77,6 +94,10 @@ public class MetricsEmittingQueryRunner implements QueryRunner long timeTaken = System.currentTimeMillis() - startTime; emitter.emit(builder.build("query/time", timeTaken)); + + if(creationTime > 0) { + emitter.emit(builder.build("query/wait", startTime - creationTime)); + } } return retVal; @@ -149,6 +170,10 @@ public class MetricsEmittingQueryRunner implements QueryRunner long timeTaken = System.currentTimeMillis() - startTime; emitter.emit(builder.build("query/time", timeTaken)); + if(creationTime > 0) { + emitter.emit(builder.build("query/wait", startTime - creationTime)); + } + yielder.close(); } }; diff --git a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java index 1d3675e570d..9d0e8bbdfa2 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java +++ b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java @@ -336,7 +336,7 @@ public class ServerManager implements QuerySegmentWalker adapter.getInterval().getStart(), factory.createRunner(adapter) ) - ), + ).withWaitMeasuredFromNow(), segmentSpec ); } diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 021ad751ad6..39f43bfcde2 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -40,9 +40,11 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.QueryableLoaderConfig; import com.metamx.druid.loading.StorageAdapterLoader; import com.metamx.druid.metrics.ServerMonitor; +import com.metamx.druid.query.MetricsEmittingExecutorService; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.Monitor; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.smile.SmileFactory; @@ -118,13 +120,16 @@ public class ComputeNode extends BaseServerNode final List monitors = getMonitors(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); - final ExecutorService executorService = ExecutorServices.create( - getLifecycle(), - getConfigFactory().buildWithReplacements( - ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") - ) + final ExecutorService executorService = new MetricsEmittingExecutorService( + ExecutorServices.create( + getLifecycle(), + getConfigFactory().buildWithReplacements( + ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") + ) + ), emitter, new ServiceMetricEvent.Builder() ); - ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService); + + final ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService); final ZkCoordinator coordinator = new ZkCoordinator( getJsonMapper(),