Schemaless metrics + additional metrics for things we care about

fjy 2015-03-16 15:57:10 -07:00
parent f5943ed494
commit 963e5765bf
28 changed files with 522 additions and 242 deletions


@ -3,11 +3,168 @@ layout: doc_page
---
# Druid Metrics
Druid generates metrics related to queries, ingestion, and coordination.
Metrics are emitted as JSON objects to a runtime log file or over HTTP (to a service such as Apache Kafka). Metric emission is disabled by default.
All Druid metrics share a common set of fields:
* `timestamp` - the time the metric was created
* `metric` - the name of the metric
* `service` - the service name that emitted the metric
* `host` - the host name that emitted the metric
* `value` - some numeric value associated with the metric
Metrics may have additional dimensions beyond those listed above.
Most metric values reset each emission period.
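For example, a single `query/time` event might be emitted as the following JSON (a hypothetical illustration; the extra dimensions present depend on the metric):
```json
{
  "timestamp": "2015-03-16T22:57:10.000Z",
  "service": "druid/broker",
  "host": "broker.example.com:8082",
  "metric": "query/time",
  "value": 742,
  "dataSource": "wikipedia",
  "type": "timeseries",
  "id": "hypothetical-query-id"
}
```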
Available Metrics
-----------------
## Query Metrics
### Broker
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|`query/node/time`|Milliseconds taken to query individual historical/realtime nodes.|id, status, server.|< 1s|
|`query/intervalChunk/time`|Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk.|id, status, chunkInterval (if interval chunking is enabled).|< 1s|
### Historical
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment.|several hundred milliseconds|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|< several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the historical node).|id, segment.|several hundred milliseconds|
### Real-time
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
### Cache
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/cache/delta/*`|Cache metrics since the last emission.||N/A|
|`query/cache/total/*`|Total cache metrics.||N/A|
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`*/numEntries`|Number of cache entries.||Varies.|
|`*/sizeBytes`|Size in bytes of cache entries.||Varies.|
|`*/hits`|Number of cache hits.||Varies.|
|`*/misses`|Number of cache misses.||Varies.|
|`*/evictions`|Number of cache evictions.||Varies.|
|`*/hitRate`|Cache hit rate.||~40%|
|`*/averageByte`|Average cache entry byte size.||Varies.|
|`*/timeouts`|Number of cache timeouts.||0|
|`*/errors`|Number of cache errors.||0|
## Ingestion Metrics
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/events/thrownAway`|Number of events rejected because they are outside the windowPeriod.|dataSource.|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|dataSource.|0|
|`ingest/events/processed`|Number of events successfully processed.|dataSource.|Equal to your # of events.|
|`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
|`ingest/persists/backPressure`|Number of persists pending.|dataSource.|0|
|`ingest/persists/failed`|Number of persists that failed.|dataSource.|0|
|`ingest/handoff/failed`|Number of handoffs that failed.|dataSource.|0|
### Indexing Service
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`task/run/time`|Milliseconds taken to run task.|dataSource, taskType, taskStatus.|Varies.|
|`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskType, interval.|Varies.|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.|dataSource, taskType, interval.|Varies.|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|dataSource, taskType, interval.|Varies.|
## Coordination
These metrics are for the Druid coordinator and are reset each time the coordinator runs the coordination logic.
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`segment/added/count`|Number of segments added to the cluster.|tier.|Varies.|
|`segment/moved/count`|Number of segments moved in the cluster.|tier.|Varies.|
|`segment/dropped/count`|Number of segments dropped due to being overshadowed.|tier.|Varies.|
|`segment/deleted/count`|Number of segments dropped due to rules.|tier.|Varies.|
|`segment/unneeded/count`|Number of segments dropped due to being marked as unused.|tier.|Varies.|
|`segment/cost/raw`|Used in cost balancing. The raw cost of hosting segments.|tier.|Varies.|
|`segment/cost/normalization`|Used in cost balancing. The normalization of hosting segments.|tier.|Varies.|
|`segment/cost/normalized`|Used in cost balancing. The normalized cost of hosting segments.|tier.|Varies.|
|`segment/loadQueue/size`|Size in bytes of segments to load.|server.|Varies.|
|`segment/loadQueue/failed`|Number of segments that failed to load.|server.|0|
|`segment/loadQueue/count`|Number of segments to load.|server.|Varies.|
|`segment/dropQueue/count`|Number of segments to drop.|server.|Varies.|
|`segment/size`|Size in bytes of available segments.|dataSource.|Varies.|
|`segment/count`|Number of available segments.|dataSource.|< max|
|`segment/overShadowed/count`|Number of overShadowed segments.||Varies.|
## General Health
### Historical
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`segment/max`|Maximum byte limit available for segments.||Varies.|
|`segment/used`|Bytes used for served segments.|dataSource, tier, priority.|< max|
|`segment/usedPercent`|Percentage of space used by served segments.|dataSource, tier, priority.|< 100%|
|`segment/count`|Number of served segments.|dataSource, tier, priority.|Varies.|
### JVM
These metrics are only available if the JVMMonitor module is included.
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`jvm/pool/committed`|Committed pool.|poolKind, poolName.|close to max pool|
|`jvm/pool/init`|Initial pool.|poolKind, poolName.|Varies.|
|`jvm/pool/max`|Max pool.|poolKind, poolName.|Varies.|
|`jvm/pool/used`|Pool used.|poolKind, poolName.|< max pool|
|`jvm/bufferpool/count`|Bufferpool count.|bufferPoolName.|Varies.|
|`jvm/bufferpool/used`|Bufferpool used.|bufferPoolName.|close to capacity|
|`jvm/bufferpool/capacity`|Bufferpool capacity.|bufferPoolName.|Varies.|
|`jvm/mem/init`|Initial memory.|memKind.|Varies.|
|`jvm/mem/max`|Max memory.|memKind.|Varies.|
|`jvm/mem/used`|Used memory.|memKind.|< max memory|
|`jvm/mem/committed`|Committed memory.|memKind.|close to max memory|
|`jvm/gc/count`|Garbage collection count.|gcName.|< 100|
|`jvm/gc/time`|Garbage collection time.|gcName.|< 1s|
### Sys
These metrics are only available if the SysMonitor module is included.
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`sys/swap/free`|Free swap.||Varies.|
|`sys/swap/max`|Max swap.||Varies.|
|`sys/swap/pageIn`|Paged in swap.||Varies.|
|`sys/swap/pageOut`|Paged out swap.||Varies.|
|`sys/disk/write/count`|Writes to disk.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
|`sys/disk/read/count`|Reads from disk.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
|`sys/disk/write/size`|Bytes written to disk. Can be used to determine how much paging of segments is occurring.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
|`sys/disk/read/size`|Bytes read from disk. Can be used to determine how much paging of segments is occurring.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
|`sys/net/write/size`|Bytes written to the network.|netName, netAddress, netHwaddr|Varies.|
|`sys/net/read/size`|Bytes read from the network.|netName, netAddress, netHwaddr|Varies.|
|`sys/fs/used`|Filesystem bytes used.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|< max|
|`sys/fs/max`|Filesystem bytes max.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
|`sys/mem/used`|Memory used.||< max|
|`sys/mem/max`|Memory max.||Varies.|
|`sys/storage/used`|Disk space used.|fsDirName.|Varies.|
|`sys/cpu`|CPU used.|cpuName, cpuTime.|Varies.|
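Monitors such as the JVMMonitor and SysMonitor referenced above, and metric emission itself, are enabled through runtime properties. A minimal sketch, assuming the standard `druid.emitter`, `druid.monitoring.emissionPeriod`, and `druid.monitoring.monitors` properties and the Metamx monitor class names of this era; adjust for your deployment:
```properties
# Emit metrics to the service log; "http" would send them to an external endpoint instead.
druid.emitter=logging
druid.monitoring.emissionPeriod=PT1m
# Optional monitors; JvmMonitor enables the jvm/* metrics, SysMonitor the sys/* metrics.
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor", "com.metamx.metrics.SysMonitor"]
```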


@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
+import io.druid.query.DruidMetrics;
 import io.druid.timeline.DataSegment;
 import java.io.IOException;
@@ -83,12 +84,12 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
     // Emit metrics
     final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
-        .setUser2(task.getDataSource())
-        .setUser4(task.getType());
+        .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+        .setDimension(DruidMetrics.TASK_TYPE, task.getType());
     for (DataSegment segment : segments) {
-      metricBuilder.setUser5(segment.getInterval().toString());
-      toolbox.getEmitter().emit(metricBuilder.build("indexer/segment/bytes", segment.getSize()));
+      metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
+      toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
     }
     return retVal;
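With this change, the task-action metrics are emitted under the new `segment/added/bytes`, `segment/moved/bytes`, and `segment/nuked/bytes` names with named dimensions instead of the positional `setUserN` slots. A sketch of what such an event could look like once emitted; all values are hypothetical:
```json
{
  "timestamp": "2015-03-16T22:57:10.000Z",
  "service": "druid/overlord",
  "host": "overlord.example.com:8090",
  "metric": "segment/added/bytes",
  "value": 52428800,
  "dataSource": "wikipedia",
  "taskType": "index_realtime",
  "interval": "2015-03-16T00:00:00.000Z/2015-03-17T00:00:00.000Z"
}
```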


@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
+import io.druid.query.DruidMetrics;
 import io.druid.timeline.DataSegment;
 import java.io.IOException;
@@ -63,12 +64,12 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
     // Emit metrics
     final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
-        .setUser2(task.getDataSource())
-        .setUser4(task.getType());
+        .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+        .setDimension(DruidMetrics.TASK_TYPE, task.getType());
     for (DataSegment segment : segments) {
-      metricBuilder.setUser5(segment.getInterval().toString());
-      toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentMoved/bytes", segment.getSize()));
+      metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
+      toolbox.getEmitter().emit(metricBuilder.build("segment/moved/bytes", segment.getSize()));
     }
     return null;


@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
+import io.druid.query.DruidMetrics;
 import io.druid.timeline.DataSegment;
 import java.io.IOException;
@@ -50,7 +51,9 @@ public class SegmentNukeAction implements TaskAction<Void>
   public TypeReference<Void> getReturnTypeReference()
   {
-    return new TypeReference<Void>() {};
+    return new TypeReference<Void>()
+    {
+    };
   }
   @Override
@@ -61,12 +64,12 @@ public class SegmentNukeAction implements TaskAction<Void>
     // Emit metrics
     final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
-        .setUser2(task.getDataSource())
-        .setUser4(task.getType());
+        .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+        .setDimension(DruidMetrics.TASK_TYPE, task.getType());
     for (DataSegment segment : segments) {
-      metricBuilder.setUser5(segment.getInterval().toString());
-      toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentNuked/bytes", segment.getSize()));
+      metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
+      toolbox.getEmitter().emit(metricBuilder.build("segment/nuked/bytes", segment.getSize()));
     }
     return null;


@@ -42,6 +42,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.config.TaskQueueConfig;
 import io.druid.metadata.EntryExistsException;
+import io.druid.query.DruidMetrics;
 import java.util.List;
 import java.util.Map;
@@ -55,11 +56,11 @@ import java.util.concurrent.locks.ReentrantLock;
 /**
  * Interface between task producers and the task runner.
- *
+ * <p/>
  * This object accepts tasks from producers using {@link #add} and manages delivery of these tasks to a
 * {@link TaskRunner}. Tasks will run in a mostly-FIFO order, with deviations when the natural next task is not ready
 * in time (based on its {@link Task#isReady} method).
- *
+ * <p/>
 * For persistence, we save all new tasks and task status changes using a {@link TaskStorage} object.
 */
 public class TaskQueue
@@ -270,7 +271,8 @@ public class TaskQueue
     for (final String taskId : tasksToKill) {
       try {
         taskRunner.shutdown(taskId);
-      } catch (Exception e) {
+      }
+      catch (Exception e) {
         log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
       }
     }
@@ -291,6 +293,7 @@ public class TaskQueue
   * @param task task to add
   *
   * @return true
+  *
   * @throws io.druid.metadata.EntryExistsException if the task already exists
   */
  public boolean add(final Task task) throws EntryExistsException
@@ -316,6 +319,7 @@ public class TaskQueue
  /**
   * Shuts down a task if it has not yet finished.
+  *
   * @param taskId task to kill
   */
  public void shutdown(final String taskId)
@@ -330,7 +334,8 @@ public class TaskQueue
          break;
        }
      }
-    } finally {
+    }
+    finally {
      giant.unlock();
    }
  }
@@ -364,14 +369,15 @@ public class TaskQueue
    // Inform taskRunner that this task can be shut down
    try {
      taskRunner.shutdown(task.getId());
-    } catch (Exception e) {
+    }
+    catch (Exception e) {
      log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
    }
    // Remove from running tasks
    int removed = 0;
-    for (int i = tasks.size() - 1 ; i >= 0 ; i--) {
+    for (int i = tasks.size() - 1; i >= 0; i--) {
      if (tasks.get(i).getId().equals(task.getId())) {
-        removed ++;
+        removed++;
        tasks.remove(i);
        break;
      }
@@ -420,8 +426,9 @@ public class TaskQueue
  private ListenableFuture<TaskStatus> attachCallbacks(final Task task, final ListenableFuture<TaskStatus> statusFuture)
  {
    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
-        .setUser2(task.getDataSource())
-        .setUser4(task.getType());
+        .setDimension("dataSource", task.getDataSource())
+        .setDimension("taskType", task.getType());
    Futures.addCallback(
        statusFuture,
        new FutureCallback<TaskStatus>()
@@ -458,8 +465,8 @@ public class TaskQueue
            // Emit event and log, if the task is done
            if (status.isComplete()) {
-              metricBuilder.setUser3(status.getStatusCode().toString());
-              emitter.emit(metricBuilder.build("indexer/time/run/millis", status.getDuration()));
+              metricBuilder.setDimension(DruidMetrics.TASK_STATUS, status.getStatusCode().toString());
+              emitter.emit(metricBuilder.build("task/run/time", status.getDuration()));
              log.info(
                  "Task %s: %s (%d run duration)",


@@ -23,18 +23,46 @@ import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.query.aggregation.AggregatorFactory;
 import org.joda.time.Interval;
+import java.util.List;
 /**
 */
-public class QueryMetricUtil
+public class DruidMetrics
 {
-  public static <T> ServiceMetricEvent.Builder makeQueryTimeMetric(Query<T> query)
+  public final static String DATASOURCE = "dataSource";
+  public final static String TYPE = "type";
+  public final static String INTERVAL = "interval";
+  public final static String ID = "id";
+  public final static String STATUS = "status";
+  // task metrics
+  public final static String TASK_TYPE = "taskType";
+  public final static String TASK_STATUS = "taskStatus";
+  public final static String SERVER = "server";
+  public static int findNumComplexAggs(List<AggregatorFactory> aggs)
+  {
+    int retVal = 0;
+    for (AggregatorFactory agg : aggs) {
+      // This needs to change when we have support column types better
+      if (!agg.getTypeName().equals("float") && !agg.getTypeName().equals("long")) {
+        retVal++;
+      }
+    }
+    return retVal;
+  }
+  public static <T> ServiceMetricEvent.Builder makePartialQueryTimeMetric(Query<T> query)
   {
     return new ServiceMetricEvent.Builder()
-        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
-        .setUser4(query.getType())
-        .setUser5(
+        .setDimension(DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()))
+        .setDimension(TYPE, query.getType())
+        .setDimension(
+            INTERVAL,
             Lists.transform(
                 query.getIntervals(),
                 new Function<Interval, String>()
@@ -47,23 +75,24 @@ public class QueryMetricUtil
                 }
             ).toArray(new String[query.getIntervals().size()])
         )
-        .setUser6(String.valueOf(query.hasFilters()))
-        .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString());
+        .setDimension("hasFilters", String.valueOf(query.hasFilters()))
+        .setDimension("duration", query.getDuration().toPeriod().toStandardMinutes().toString());
   }
-  public static <T> ServiceMetricEvent.Builder makeRequestTimeMetric(
+  public static <T> ServiceMetricEvent.Builder makeQueryTimeMetric(
       final ObjectMapper jsonMapper, final Query<T> query, final String remoteAddr
   ) throws JsonProcessingException
   {
-    return makeQueryTimeMetric(query)
-        .setUser3(
+    return makePartialQueryTimeMetric(query)
+        .setDimension(
+            "context",
             jsonMapper.writeValueAsString(
                 query.getContext() == null
                 ? ImmutableMap.of()
                 : query.getContext()
             )
         )
-        .setUser7(remoteAddr)
-        .setUser8(query.getId());
+        .setDimension("remoteAddress", remoteAddr)
+        .setDimension(ID, query.getId());
   }
 }
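The renamed `DruidMetrics` helper above is what the rest of this commit builds on. A minimal sketch of how a caller could use it, assuming a timeseries query; the `MetricEmissionSketch` class and `emitQueryTime` method are illustrative, not part of this commit:
```java
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.DruidMetrics;
import io.druid.query.timeseries.TimeseriesQuery;

class MetricEmissionSketch
{
  // Hypothetical helper: emits a query-time event using the new dimension-based builder.
  static void emitQueryTime(ServiceEmitter emitter, TimeseriesQuery query, long queryTimeMillis)
  {
    ServiceMetricEvent.Builder builder = DruidMetrics.makePartialQueryTimeMetric(query)
        .setDimension(DruidMetrics.ID, query.getId() == null ? "" : query.getId())
        .setDimension(
            "numComplexMetrics",
            String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
        );
    emitter.emit(builder.build("query/time", queryTimeMillis));
  }
}
```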


@@ -18,13 +18,13 @@
 package io.druid.query;
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.granularity.PeriodGranularity;
 import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.joda.time.Interval;
@@ -47,8 +47,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
   private final QueryWatcher queryWatcher;
   private final ServiceEmitter emitter;
-  public IntervalChunkingQueryRunner(QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest,
-      ExecutorService executor, QueryWatcher queryWatcher, ServiceEmitter emitter)
+  public IntervalChunkingQueryRunner(
+      QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest,
+      ExecutorService executor, QueryWatcher queryWatcher, ServiceEmitter emitter
+  )
   {
     this.baseRunner = baseRunner;
     this.toolChest = toolChest;
@@ -65,56 +67,62 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
       return baseRunner.run(query, responseContext);
     }
-    List<Interval> chunkIntervals = Lists.newArrayList(FunctionalIterable
-        .create(query.getIntervals())
-        .transformCat(
-            new Function<Interval, Iterable<Interval>>()
-            {
-              @Override
-              public Iterable<Interval> apply(Interval input)
-              {
-                return splitInterval(input, chunkPeriod);
-              }
-            }
-        ));
+    List<Interval> chunkIntervals = Lists.newArrayList(
+        FunctionalIterable
+            .create(query.getIntervals())
+            .transformCat(
+                new Function<Interval, Iterable<Interval>>()
+                {
+                  @Override
+                  public Iterable<Interval> apply(Interval input)
+                  {
+                    return splitInterval(input, chunkPeriod);
+                  }
+                }
+            )
+    );
-    if(chunkIntervals.size() <= 1) {
+    if (chunkIntervals.size() <= 1) {
       return baseRunner.run(query, responseContext);
     }
-    final QueryRunner<T> finalQueryRunner = new AsyncQueryRunner<T>(
-        //Note: it is assumed that toolChest.mergeResults(..) gives a query runner that is
-        //not lazy i.e. it does most of its work on call to run() method
-        toolChest.mergeResults(
-            new MetricsEmittingQueryRunner<T>(
-                emitter,
-                new Function<Query<T>, ServiceMetricEvent.Builder>()
-                {
-                  @Override
-                  public ServiceMetricEvent.Builder apply(Query<T> input)
-                  {
-                    return toolChest.makeMetricBuilder(input);
-                  }
-                },
-                baseRunner
-            ).withWaitMeasuredFromNow()),
-        executor, queryWatcher);
-    return Sequences.concat(
-        Lists.newArrayList(FunctionalIterable.create(chunkIntervals).transform(
-            new Function<Interval, Sequence<T>>()
-            {
-              @Override
-              public Sequence<T> apply(Interval singleInterval)
-              {
-                return finalQueryRunner.run(
-                    query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
-                    responseContext
-                );
-              }
-            }
-        ))
-    );
+    return Sequences.concat(
+        Lists.newArrayList(
+            FunctionalIterable.create(chunkIntervals).transform(
+                new Function<Interval, Sequence<T>>()
+                {
+                  @Override
+                  public Sequence<T> apply(Interval singleInterval)
+                  {
+                    return new AsyncQueryRunner<T>(
+                        //Note: it is assumed that toolChest.mergeResults(..) gives a query runner that is
+                        //not lazy i.e. it does most of its work on call to run() method
+                        toolChest.mergeResults(
+                            new MetricsEmittingQueryRunner<T>(
+                                emitter,
+                                new Function<Query<T>, ServiceMetricEvent.Builder>()
+                                {
+                                  @Override
+                                  public ServiceMetricEvent.Builder apply(Query<T> input)
+                                  {
+                                    return toolChest.makeMetricBuilder(input);
+                                  }
+                                },
+                                baseRunner,
+                                "query/intervalChunk/time",
+                                ImmutableMap.of("chunkInterval", singleInterval.toString())
+                            ).withWaitMeasuredFromNow()
+                        ),
+                        executor, queryWatcher
+                    ).run(
+                        query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))),
+                        responseContext
+                    );
+                  }
+                }
+            )
+        )
+    );
   }
   private Iterable<Interval> splitInterval(Interval interval, Period period)
@@ -143,7 +151,8 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
     return intervals;
   }
-  private Period getChunkPeriod(Query<T> query) {
+  private Period getChunkPeriod(Query<T> query)
+  {
     String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D");
     return Period.parse(p);
   }
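The chunked code path above, and the `query/intervalChunk/time` metric listed in the documentation, only apply when a query sets a chunk period via the `chunkPeriod` context key read by `getChunkPeriod`. A hedged example of such a query; the dataSource, intervals, aggregator, and one-day period are hypothetical:
```json
{
  "queryType": "timeseries",
  "dataSource": "wikipedia",
  "granularity": "hour",
  "intervals": ["2015-03-01/2015-03-16"],
  "aggregations": [{"type": "count", "name": "rows"}],
  "context": {"chunkPeriod": "P1D"}
}
```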


@@ -67,7 +67,7 @@ public class MetricsEmittingExecutorService extends ForwardingListeningExecutorS
   private void emitMetrics()
   {
     if (delegate instanceof PrioritizedExecutorService) {
-      emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) delegate).getQueueSize()));
+      emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate).getQueueSize()));
     }
   }
 }


@@ -18,6 +18,8 @@
 package io.druid.query;
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Yielder;
@@ -26,19 +28,21 @@ import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 /**
 */
 public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
 {
-  private static final String DEFAULT_METRIC_NAME = "query/time";
+  private static final String DEFAULT_METRIC_NAME = "query/partial/time";
   private final ServiceEmitter emitter;
   private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
   private final QueryRunner<T> queryRunner;
   private final long creationTime;
   private final String metricName;
+  private final Map<String, String> userDimensions;
   public MetricsEmittingQueryRunner(
       ServiceEmitter emitter,
@@ -46,7 +50,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
       QueryRunner<T> queryRunner
   )
   {
-    this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME);
+    this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME, Maps.<String, String>newHashMap());
   }
   public MetricsEmittingQueryRunner(
@@ -54,7 +58,8 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
       Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
       QueryRunner<T> queryRunner,
       long creationTime,
-      String metricName
+      String metricName,
+      Map<String, String> userDimensions
   )
   {
     this.emitter = emitter;
@@ -62,33 +67,47 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
     this.queryRunner = queryRunner;
     this.creationTime = creationTime;
     this.metricName = metricName;
+    this.userDimensions = userDimensions;
   }
   public MetricsEmittingQueryRunner(
       ServiceEmitter emitter,
       Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
       QueryRunner<T> queryRunner,
-      String metricName
+      String metricName,
+      Map<String, String> userDimensions
   )
   {
-    this(emitter, builderFn, queryRunner, -1, metricName);
+    this(emitter, builderFn, queryRunner, -1, metricName, userDimensions);
   }
   public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
   {
-    return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis(), metricName);
+    return new MetricsEmittingQueryRunner<T>(
+        emitter,
+        builderFn,
+        queryRunner,
+        System.currentTimeMillis(),
+        metricName,
+        userDimensions
+    );
   }
   @Override
   public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
   {
     final ServiceMetricEvent.Builder builder = builderFn.apply(query);
+    for (Map.Entry<String, String> userDimension : userDimensions.entrySet()) {
+      builder.setDimension(userDimension.getKey(), userDimension.getValue());
+    }
     String queryId = query.getId();
     if (queryId == null) {
       queryId = "";
     }
-    builder.setUser8(queryId);
+    builder.setDimension(DruidMetrics.ID, queryId);
     return new Sequence<T>()
     {
@@ -102,11 +121,11 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
           retVal = queryRunner.run(query, responseContext).accumulate(outType, accumulator);
         }
         catch (RuntimeException e) {
-          builder.setUser10("failed");
+          builder.setDimension(DruidMetrics.STATUS, "failed");
           throw e;
         }
         catch (Error e) {
-          builder.setUser10("failed");
+          builder.setDimension(DruidMetrics.STATUS, "failed");
           throw e;
         }
         finally {
@@ -115,7 +134,7 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
           emitter.emit(builder.build(metricName, timeTaken));
           if (creationTime > 0) {
-            emitter.emit(builder.build("query/wait", startTime - creationTime));
+            emitter.emit(builder.build("query/wait/time", startTime - creationTime));
           }
         }
@@ -132,11 +151,11 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
           retVal = queryRunner.run(query, responseContext).toYielder(initValue, accumulator);
         }
         catch (RuntimeException e) {
-          builder.setUser10("failed");
+          builder.setDimension(DruidMetrics.STATUS, "failed");
           throw e;
         }
         catch (Error e) {
-          builder.setUser10("failed");
+          builder.setDimension(DruidMetrics.STATUS, "failed");
           throw e;
         }
@@ -164,11 +183,11 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
             return makeYielder(startTime, yielder.next(initValue), builder);
           }
           catch (RuntimeException e) {
-            builder.setUser10("failed");
+            builder.setDimension(DruidMetrics.STATUS, "failed");
             throw e;
           }
           catch (Error e) {
-            builder.setUser10("failed");
+            builder.setDimension(DruidMetrics.STATUS, "failed");
             throw e;
           }
         }
@@ -183,15 +202,15 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
           public void close() throws IOException
           {
             try {
-              if (!isDone() && builder.getUser10() == null) {
-                builder.setUser10("short");
+              if (!isDone() && builder.getDimension(DruidMetrics.STATUS) == null) {
+                builder.setDimension(DruidMetrics.STATUS, "short");
               }
               long timeTaken = System.currentTimeMillis() - startTime;
               emitter.emit(builder.build(metricName, timeTaken));
               if (creationTime > 0) {
-                emitter.emit(builder.build("query/wait", startTime - creationTime));
+                emitter.emit(builder.build("query/wait/time", startTime - creationTime));
               }
             }
             finally {


@@ -159,7 +159,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory
   @Override
   public String getTypeName()
   {
-    throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()");
+    return "histogram";
   }
   @Override


@@ -118,9 +118,8 @@ public class DataSourceQueryQueryToolChest
   public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query)
   {
     return new ServiceMetricEvent.Builder()
-        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
-        .setUser4(query.getType())
-        .setUser6("false");
+        .setDimension("dataSource", DataSourceUtil.getMetricName(query.getDataSource()))
+        .setDimension("type", query.getType());
   }
   @Override


@@ -21,13 +21,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Supplier;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.metamx.common.ISE;
 import com.metamx.common.Pair;
@@ -49,7 +47,7 @@ import io.druid.query.IntervalChunkingQueryRunnerDecorator;
 import io.druid.query.Query;
 import io.druid.query.QueryCacheHelper;
 import io.druid.query.QueryDataSource;
-import io.druid.query.QueryMetricUtil;
+import io.druid.query.DruidMetrics;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.SubqueryQueryRunner;
@@ -62,12 +60,10 @@ import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.joda.time.DateTime;
-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 /**
 */
@@ -123,7 +119,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
       return mergeGroupByResults(
           (GroupByQuery) input,
           runner,
-          responseContext);
+          responseContext
+      );
     }
     return runner.run(input, responseContext);
   }
@@ -258,9 +255,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   @Override
   public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query)
   {
-    return QueryMetricUtil.makeQueryTimeMetric(query)
-        .setUser3(String.format("%,d dims", query.getDimensions().size()))
-        .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()));
+    return DruidMetrics.makePartialQueryTimeMetric(query)
+        .setDimension("numDimensions", String.valueOf(query.getDimensions().size()))
+        .setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()))
+        .setDimension(
+            "numComplexMetrics",
+            String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
+        );
   }
   @Override


@@ -33,7 +33,7 @@ import io.druid.collections.OrderedMergeSequence;
 import io.druid.common.utils.JodaUtils;
 import io.druid.query.CacheStrategy;
 import io.druid.query.Query;
-import io.druid.query.QueryMetricUtil;
+import io.druid.query.DruidMetrics;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.ResultMergeQueryRunner;
@@ -144,7 +144,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
   @Override
   public ServiceMetricEvent.Builder makeMetricBuilder(SegmentMetadataQuery query)
   {
-    return QueryMetricUtil.makeQueryTimeMetric(query);
+    return DruidMetrics.makePartialQueryTimeMetric(query);
   }
   @Override


@@ -38,7 +38,7 @@ import io.druid.collections.OrderedMergeSequence;
 import io.druid.query.CacheStrategy;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
 import io.druid.query.Query;
-import io.druid.query.QueryMetricUtil;
+import io.druid.query.DruidMetrics;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.Result;
@@ -122,7 +122,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
   @Override
   public ServiceMetricEvent.Builder makeMetricBuilder(SearchQuery query)
   {
-    return QueryMetricUtil.makeQueryTimeMetric(query);
+    return DruidMetrics.makePartialQueryTimeMetric(query);
   }
   @Override


@@ -34,7 +34,7 @@ import io.druid.granularity.QueryGranularity;
 import io.druid.query.CacheStrategy;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
 import io.druid.query.Query;
-import io.druid.query.QueryMetricUtil;
+import io.druid.query.DruidMetrics;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.Result;
@@ -121,7 +121,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
   @Override
   public ServiceMetricEvent.Builder makeMetricBuilder(SelectQuery query)
   {
-    return QueryMetricUtil.makeQueryTimeMetric(query);
+    return DruidMetrics.makePartialQueryTimeMetric(query);
   }
   @Override


@@ -32,6 +32,7 @@ import io.druid.collections.OrderedMergeSequence;
 import io.druid.query.BySegmentSkippingQueryRunner;
 import io.druid.query.CacheStrategy;
 import io.druid.query.DataSourceUtil;
+import io.druid.query.DruidMetrics;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
@@ -122,9 +123,8 @@ public class TimeBoundaryQueryQueryToolChest
   public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
   {
     return new ServiceMetricEvent.Builder()
-        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
-        .setUser4(query.getType())
-        .setUser6("false");
+        .setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()))
+        .setDimension(DruidMetrics.TYPE, query.getType());
   }
   @Override


@@ -17,32 +17,6 @@
 package io.druid.query.timeseries;
-import io.druid.collections.OrderedMergeSequence;
-import io.druid.granularity.QueryGranularity;
-import io.druid.query.CacheStrategy;
-import io.druid.query.IntervalChunkingQueryRunnerDecorator;
-import io.druid.query.Query;
-import io.druid.query.QueryCacheHelper;
-import io.druid.query.QueryMetricUtil;
-import io.druid.query.QueryRunner;
-import io.druid.query.QueryToolChest;
-import io.druid.query.Result;
-import io.druid.query.ResultGranularTimestampComparator;
-import io.druid.query.ResultMergeQueryRunner;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.MetricManipulationFn;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.filter.DimFilter;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.joda.time.DateTime;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -53,6 +27,29 @@ import com.metamx.common.guava.MergeSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.nary.BinaryFn;
 import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.collections.OrderedMergeSequence;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.CacheStrategy;
+import io.druid.query.IntervalChunkingQueryRunnerDecorator;
+import io.druid.query.Query;
+import io.druid.query.QueryCacheHelper;
+import io.druid.query.DruidMetrics;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryToolChest;
+import io.druid.query.Result;
+import io.druid.query.ResultGranularTimestampComparator;
+import io.druid.query.ResultMergeQueryRunner;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.MetricManipulationFn;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.filter.DimFilter;
+import org.joda.time.DateTime;
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 /**
 */
@@ -120,8 +117,15 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
   @Override
   public ServiceMetricEvent.Builder makeMetricBuilder(TimeseriesQuery query)
   {
-    return QueryMetricUtil.makeQueryTimeMetric(query)
-        .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()));
+    return DruidMetrics.makePartialQueryTimeMetric(query)
+        .setDimension(
+            "numMetrics",
+            String.valueOf(query.getAggregatorSpecs().size())
+        )
+        .setDimension(
+            "numComplexMetrics",
+            String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
+        );
   }
   @Override


@@ -38,7 +38,7 @@ import io.druid.query.CacheStrategy;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
 import io.druid.query.Query;
 import io.druid.query.QueryCacheHelper;
-import io.druid.query.QueryMetricUtil;
+import io.druid.query.DruidMetrics;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.Result;
@@ -82,7 +82,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
     this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
   }
-  protected static String[] extractFactoryName(final List<AggregatorFactory> aggregatorFactories){
+  protected static String[] extractFactoryName(final List<AggregatorFactory> aggregatorFactories)
+  {
     return Lists.transform(
         aggregatorFactories, new Function<AggregatorFactory, String>()
         {
@@ -153,15 +154,20 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
   @Override
   public ServiceMetricEvent.Builder makeMetricBuilder(TopNQuery query)
   {
-    return QueryMetricUtil.makeQueryTimeMetric(query)
-        .setUser4(
-            String.format(
-                "topN/%s/%s",
-                query.getThreshold(),
-                query.getDimensionSpec().getDimension()
-            )
-        )
-        .setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()));
+    return DruidMetrics.makePartialQueryTimeMetric(query)
+        .setDimension(
+            "threshold",
+            String.valueOf(query.getThreshold())
+        )
+        .setDimension("dimension", query.getDimensionSpec().getDimension())
+        .setDimension(
+            "numMetrics",
+            String.valueOf(query.getAggregatorSpecs().size())
+        )
+        .setDimension(
+            "numComplexMetrics",
+            String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
+        );
   }
   @Override
@@ -254,7 +260,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
             + 1
         );
-        for( int i = 0; i < aggFactoryNames.length; ++i){
+        for (int i = 0; i < aggFactoryNames.length; ++i) {
           final String name = aggFactoryNames[i];
           values.put(name, input.getMetric(name));
         }
@@ -267,7 +273,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
             values.put(postAgg.getName(), postAgg.compute(values));
           }
         }
-        for( int i = 0; i < aggFactoryNames.length; ++i){
+        for (int i = 0; i < aggFactoryNames.length; ++i) {
          final String name = aggFactoryNames[i];
          values.put(name, fn.manipulate(aggregatorFactories[i], input.getMetric(name)));
        }


@@ -43,6 +43,8 @@ import com.metamx.common.guava.LazySequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.selector.QueryableDruidServer;
@@ -51,6 +53,7 @@ import io.druid.guice.annotations.BackgroundCaching;
 import io.druid.guice.annotations.Smile;
 import io.druid.query.BySegmentResultValueClass;
 import io.druid.query.CacheStrategy;
+import io.druid.query.MetricsEmittingQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
@@ -66,6 +69,7 @@ import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.partition.PartitionChunk;
 import org.joda.time.Interval;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -90,6 +94,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
   private final ObjectMapper objectMapper;
   private final CacheConfig cacheConfig;
   private final ListeningExecutorService backgroundExecutorService;
+  private final ServiceEmitter emitter;
   @Inject
   public CachingClusteredClient(
@@ -98,7 +103,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       Cache cache,
       @Smile ObjectMapper objectMapper,
       @BackgroundCaching ExecutorService backgroundExecutorService,
-      CacheConfig cacheConfig
+      CacheConfig cacheConfig,
+      ServiceEmitter emitter
   )
   {
     this.warehouse = warehouse;
@@ -107,6 +113,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     this.objectMapper = objectMapper;
     this.cacheConfig = cacheConfig;
     this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);
+    this.emitter = emitter;
     serverView.registerSegmentCallback(
         Executors.newFixedThreadPool(
@@ -325,7 +332,21 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       final DruidServer server = entry.getKey();
       final List<SegmentDescriptor> descriptors = entry.getValue();
-      final QueryRunner clientQueryable = serverView.getQueryRunner(server);
+      final QueryRunner clientQueryable = new MetricsEmittingQueryRunner(
+          emitter,
+          new Function<Query<T>, ServiceMetricEvent.Builder>()
+          {
+            @Override
+            public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
+            {
+              return toolChest.makeMetricBuilder(input);
+            }
+          },
+          serverView.getQueryRunner(server),
+          "query/node/time",
+          ImmutableMap.of("server", server.getName())
+      );
       if (clientQueryable == null) {
         log.error("WTF!? server[%s] doesn't have a client Queryable?", server);
         continue;


@@ -45,8 +45,8 @@ public class CacheMonitor extends AbstractMonitor
     final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats);
     final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-    emitStats(emitter, "cache/delta", deltaCacheStats, builder);
-    emitStats(emitter, "cache/total", currCacheStats, builder);
+    emitStats(emitter, "query/cache/delta", deltaCacheStats, builder);
+    emitStats(emitter, "query/cache/total", currCacheStats, builder);
     prevCacheStats = currCacheStats;
     return true;


@@ -23,6 +23,7 @@ import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import com.metamx.metrics.AbstractMonitor;
+import io.druid.query.DruidMetrics;
 import java.util.List;
 import java.util.Map;
@@ -55,30 +56,30 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
      }
      final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
-          .setUser2(fireDepartment.getDataSchema().getDataSource());
+          .setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource());
      final long thrownAway = metrics.thrownAway() - previous.thrownAway();
      if (thrownAway > 0) {
        log.warn("[%,d] events thrown away because they are outside the window period!", thrownAway);
      }
-      emitter.emit(builder.build("events/thrownAway", thrownAway));
+      emitter.emit(builder.build("ingest/events/thrownAway", thrownAway));
      final long unparseable = metrics.unparseable() - previous.unparseable();
      if (unparseable > 0) {
        log.error("[%,d] Unparseable events! Turn on debug logging to see exception stack trace.", unparseable);
      }
-      emitter.emit(builder.build("events/unparseable", unparseable));
-      emitter.emit(builder.build("events/processed", metrics.processed() - previous.processed()));
-      emitter.emit(builder.build("rows/output", metrics.rowOutput() - previous.rowOutput()));
-      emitter.emit(builder.build("persists/num", metrics.numPersists() - previous.numPersists()));
-      emitter.emit(builder.build("persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis()));
+      emitter.emit(builder.build("ingest/events/unparseable", unparseable));
+      emitter.emit(builder.build("ingest/events/processed", metrics.processed() - previous.processed()));
+      emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput()));
+      emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists()));
+      emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis()));
      emitter.emit(
          builder.build(
-              "persists/backPressure",
+              "ingest/persists/backPressure",
              metrics.persistBackPressureMillis() - previous.persistBackPressureMillis()
          )
      );
-      emitter.emit(builder.build("failed/persists", metrics.failedPersists() - previous.failedPersists()));
-      emitter.emit(builder.build("failed/handoff", metrics.failedHandoffs() - previous.failedHandoffs()));
+      emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previous.failedPersists()));
+      emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previous.failedHandoffs()));
      previousValues.put(fireDepartment, metrics);
    }
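Two things change in the realtime monitor: the datasource moves from the opaque `user2` slot to a named dimension (`DruidMetrics.DATASOURCE`), and every ingestion counter is renamed under an `ingest/` prefix (`ingest/events/processed`, `ingest/rows/output`, `ingest/persists/count`, and so on). A small sketch of the before/after builder usage, with a simplified `EventBuilder` standing in for `ServiceMetricEvent.Builder`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for ServiceMetricEvent.Builder: dimensions are named key/value pairs.
final class EventBuilder {
  private final Map<String, String> dimensions = new LinkedHashMap<>();

  EventBuilder setDimension(String name, String value) {
    dimensions.put(name, value);
    return this;
  }

  // Builds a flat event: the dimensions set so far plus the metric name and value.
  Map<String, Object> build(String metric, Object value) {
    Map<String, Object> event = new LinkedHashMap<>(dimensions);
    event.put("metric", metric);
    event.put("value", value);
    return event;
  }
}

class IngestMetricsExample {
  public static void main(String[] args) {
    // Before: positional "user" slots left consumers guessing what user2 meant, e.g.
    //   builder.setUser2("wikipedia").build("events/processed", 1200)
    // After: a named dimension plus a namespaced metric name.
    Map<String, Object> event = new EventBuilder()
        .setDimension("dataSource", "wikipedia")
        .build("ingest/events/processed", 1200);
    System.out.println(event);  // {dataSource=wikipedia, metric=ingest/events/processed, value=1200}
  }
}
```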
View File
@@ -29,12 +29,11 @@ import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
 import io.druid.guice.http.DruidHttpClientConfig;
 import io.druid.query.Query;
-import io.druid.query.QueryMetricUtil;
+import io.druid.query.DruidMetrics;
 import io.druid.server.log.RequestLogger;
 import io.druid.server.router.QueryHostFinder;
 import io.druid.server.router.Router;
 import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.api.ContentProvider;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
 import org.eclipse.jetty.client.api.Result;
@@ -261,8 +260,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
      final long requestTime = System.currentTimeMillis() - start;
      try {
        emitter.emit(
-            QueryMetricUtil.makeRequestTimeMetric(jsonMapper, query, req.getRemoteAddr())
-                           .build("request/time", requestTime)
+            DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr())
+                        .build("query/time", requestTime)
        );
        requestLogger.log(
@@ -272,7 +271,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
            query,
            new QueryStats(
                ImmutableMap.<String, Object>of(
-                    "request/time",
+                    "query/time",
                    requestTime,
                    "success",
                    true
View File
@@ -34,7 +34,7 @@ import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
 import io.druid.query.Query;
 import io.druid.query.QueryInterruptedException;
-import io.druid.query.QueryMetricUtil;
+import io.druid.query.DruidMetrics;
 import io.druid.query.QuerySegmentWalker;
 import io.druid.server.initialization.ServerConfig;
 import io.druid.server.log.RequestLogger;
@@ -184,10 +184,10 @@ public class QueryResource
      jsonWriter.writeValue(outputStream, yielder);
      outputStream.close();
-      final long requestTime = System.currentTimeMillis() - start;
+      final long queryTime = System.currentTimeMillis() - start;
      emitter.emit(
-          QueryMetricUtil.makeRequestTimeMetric(jsonMapper, theQuery, req.getRemoteAddr())
-                         .build("request/time", requestTime)
+          DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr())
+                      .build("query/time", queryTime)
      );
      requestLogger.log(
@@ -197,7 +197,7 @@ public class QueryResource
          theQuery,
          new QueryStats(
              ImmutableMap.<String, Object>of(
-                  "request/time", requestTime,
+                  "query/time", queryTime,
                  "success", true
              )
          )
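Both the router's forwarding servlet above and `QueryResource` here now time the whole request and report it as `query/time` (previously `request/time`), putting the same key and value into the per-request `QueryStats` entry that goes to the request logger. A minimal sketch of that pattern; the `MetricSink` and `RequestLog` interfaces and the `handle` method are hypothetical stand-ins for the emitter, `RequestLogger`, and the servlet code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

interface MetricSink {
  void emit(String metric, long value, Map<String, Object> dimensions);
}

interface RequestLog {
  void log(String query, Map<String, Object> stats);
}

class QueryEndpointExample {
  private final MetricSink emitter;
  private final RequestLog requestLogger;

  QueryEndpointExample(MetricSink emitter, RequestLog requestLogger) {
    this.emitter = emitter;
    this.requestLogger = requestLogger;
  }

  String handle(String query, String remoteAddr) {
    final long start = System.currentTimeMillis();
    final String response = runQuery(query);  // stand-in for streaming results to the client
    final long queryTime = System.currentTimeMillis() - start;

    // Emit and log the same value under the same key, so dashboards and request logs agree.
    Map<String, Object> dims = new LinkedHashMap<>();
    dims.put("remoteAddress", remoteAddr);
    emitter.emit("query/time", queryTime, dims);

    Map<String, Object> stats = new LinkedHashMap<>();
    stats.put("query/time", queryTime);
    stats.put("success", true);
    requestLogger.log(query, stats);
    return response;
  }

  private String runQuery(String query) {
    return "[]";  // placeholder result
  }
}
```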
View File
@@ -92,12 +92,9 @@ public class SQLAuditManager implements AuditManager
  {
    emitter.emit(
        new ServiceMetricEvent.Builder()
-            .setUser1(auditEntry.getKey())
-            .setUser2(auditEntry.getType())
-            .setUser3(auditEntry.getAuditInfo().getAuthor())
-            .setUser5(jsonMapper.writeValueAsString(auditEntry.getPayload()))
-            .setUser6(auditEntry.getAuditInfo().getComment())
-            .setUser7(auditEntry.getAuditTime().toString())
+            .setDimension("key", auditEntry.getKey())
+            .setDimension("type", auditEntry.getType())
+            .setDimension("author", auditEntry.getAuditInfo().getAuthor())
            .build("config/audit", 1)
    );
View File
@@ -19,6 +19,7 @@ package io.druid.server.coordination;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
@@ -324,7 +325,7 @@ public class ServerManager implements QuerySegmentWalker
  private TimelineLookup<String, ReferenceCountingSegment> getTimelineLookup(DataSource dataSource)
  {
    final List<String> names = dataSource.getNames();
-    if(names.size() == 1){
+    if (names.size() == 1) {
      return dataSources.get(names.get(0));
    } else {
      return new UnionTimeLineLookup<>(
@@ -438,12 +439,15 @@
                  }
                },
                new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
-                "scan/time"
+                "query/segment/time",
+                ImmutableMap.of("segment", adapter.getIdentifier())
            ),
            cachingExec,
            cacheConfig
        )
-    )
+    ),
+    "query/segmentAndCache/time",
+    ImmutableMap.of("segment", adapter.getIdentifier())
    ).withWaitMeasuredFromNow(),
    segmentSpec
    );
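On the historical side the per-segment runner is wrapped twice: an inner timer for the segment scan itself (`query/segment/time`, formerly `scan/time`) and an outer timer that also covers the cache check around it (`query/segmentAndCache/time`), both carrying the segment identifier as a dimension. A simplified sketch of that layering, assuming a generic `timed` helper and stand-in types rather than Druid's actual runner classes:

```java
import java.util.Map;
import java.util.function.Supplier;

interface Metrics {
  void emit(String metric, long millis, Map<String, String> dims);
}

class SegmentTimersExample {
  // Runs the supplied work, emits how long it took, and returns the result.
  static <T> T timed(Metrics metrics, String metric, Map<String, String> dims, Supplier<T> work) {
    final long start = System.currentTimeMillis();
    try {
      return work.get();
    } finally {
      metrics.emit(metric, System.currentTimeMillis() - start, dims);
    }
  }

  static String query(Metrics metrics, String segmentId) {
    final Map<String, String> dims = Map.of("segment", segmentId);
    // Outer timer: the cache lookup plus, on a miss, the actual scan.
    return timed(metrics, "query/segmentAndCache/time", dims, () -> {
      String cached = lookupCache(segmentId);
      if (cached != null) {
        return cached;
      }
      // Inner timer: only the segment scan itself.
      return timed(metrics, "query/segment/time", dims, () -> scanSegment(segmentId));
    });
  }

  private static String lookupCache(String segmentId) { return null; }   // always a miss in this sketch
  private static String scanSegment(String segmentId) { return "rows"; } // placeholder scan
}
```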
View File
@@ -25,6 +25,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.client.DruidDataSource;
 import io.druid.client.ImmutableDruidServer;
 import io.druid.collections.CountingMap;
+import io.druid.query.DruidMetrics;
 import io.druid.server.coordinator.CoordinatorStats;
 import io.druid.server.coordinator.DruidCluster;
 import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -44,7 +45,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
  private <T extends Number> void emitTieredStats(
      final ServiceEmitter emitter,
-      final String formatString,
+      final String metricName,
      final Map<String, T> statMap
  )
  {
@@ -53,9 +54,11 @@
      String tier = entry.getKey();
      Number value = entry.getValue();
      emitter.emit(
-          new ServiceMetricEvent.Builder().build(
-              String.format(formatString, tier), value.doubleValue()
-          )
+          new ServiceMetricEvent.Builder()
+              .setDimension("tier", tier)
+              .build(
+                  metricName, value.doubleValue()
+              )
      );
    }
  }
@@ -78,6 +81,11 @@
      }
    }
+    emitTieredStats(
+        emitter, "segment/assigned/count",
+        assigned
+    );
    Map<String, AtomicLong> dropped = stats.getPerTierStats().get("droppedCount");
    if (dropped != null) {
      for (Map.Entry<String, AtomicLong> entry : dropped.entrySet()) {
@@ -89,29 +97,34 @@
    }
    emitTieredStats(
-        emitter, "coordinator/%s/cost/raw",
+        emitter, "segment/dropped/count",
+        dropped
+    );
+    emitTieredStats(
+        emitter, "segment/cost/raw",
        stats.getPerTierStats().get("initialCost")
    );
    emitTieredStats(
-        emitter, "coordinator/%s/cost/normalization",
+        emitter, "segment/cost/normalization",
        stats.getPerTierStats().get("normalization")
    );
    emitTieredStats(
-        emitter, "coordinator/%s/moved/count",
+        emitter, "segment/moved/count",
        stats.getPerTierStats().get("movedCount")
    );
    emitTieredStats(
-        emitter, "coordinator/%s/deleted/count",
+        emitter, "segment/deleted/count",
        stats.getPerTierStats().get("deletedCount")
    );
    Map<String, AtomicLong> normalized = stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand");
    if (normalized != null) {
      emitTieredStats(
-          emitter, "coordinator/%s/cost/normalized",
+          emitter, "segment/cost/normalized",
          Maps.transformEntries(
              normalized,
              new Maps.EntryTransformer<String, AtomicLong, Number>()
@@ -136,9 +149,14 @@
      }
    }
+    emitTieredStats(
+        emitter, "segment/unneeded/count",
+        stats.getPerTierStats().get("unneededCount")
+    );
    emitter.emit(
        new ServiceMetricEvent.Builder().build(
-            "coordinator/overShadowed/count", stats.getGlobalStats().get("overShadowedCount")
+            "segment/overShadowed/count", stats.getGlobalStats().get("overShadowedCount")
        )
    );
@@ -184,26 +202,26 @@
      LoadQueuePeon queuePeon = entry.getValue();
      emitter.emit(
          new ServiceMetricEvent.Builder()
-              .setUser1(serverName).build(
-                  "coordinator/loadQueue/size", queuePeon.getLoadQueueSize()
+              .setDimension(DruidMetrics.SERVER, serverName).build(
+                  "segment/loadQueue/size", queuePeon.getLoadQueueSize()
              )
      );
      emitter.emit(
          new ServiceMetricEvent.Builder()
-              .setUser1(serverName).build(
-                  "coordinator/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
+              .setDimension(DruidMetrics.SERVER, serverName).build(
+                  "segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
              )
      );
      emitter.emit(
          new ServiceMetricEvent.Builder()
-              .setUser1(serverName).build(
-                  "coordinator/loadQueue/count", queuePeon.getSegmentsToLoad().size()
+              .setDimension(DruidMetrics.SERVER, serverName).build(
+                  "segment/loadQueue/count", queuePeon.getSegmentsToLoad().size()
              )
      );
      emitter.emit(
          new ServiceMetricEvent.Builder()
-              .setUser1(serverName).build(
-                  "coordinator/dropQueue/count", queuePeon.getSegmentsToDrop().size()
+              .setDimension(DruidMetrics.SERVER, serverName).build(
+                  "segment/dropQueue/count", queuePeon.getSegmentsToDrop().size()
              )
      );
    }
@@ -222,8 +240,8 @@
      Long size = entry.getValue();
      emitter.emit(
          new ServiceMetricEvent.Builder()
-              .setUser1(dataSource).build(
-                  "coordinator/segment/size", size
+              .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
+                  "segment/size", size
              )
      );
    }
@@ -232,8 +250,8 @@
      Long count = entry.getValue();
      emitter.emit(
          new ServiceMetricEvent.Builder()
-              .setUser1(dataSource).build(
-                  "coordinator/segment/count", count
+              .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
+                  "segment/count", count
              )
      );
    }
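Coordinator metrics stop interpolating the tier into the metric name (`coordinator/%s/...`) and instead emit a fixed name such as `segment/moved/count` with a `tier` dimension; per-server and per-datasource values likewise move to `server` and `dataSource` dimensions. A small sketch of the refactored `emitTieredStats`-style helper, with a stand-in emitter interface:

```java
import java.util.LinkedHashMap;
import java.util.Map;

interface DimensionedEmitter {
  void emit(String metric, double value, Map<String, String> dimensions);
}

class TieredStatsExample {
  // Before: emitter.emit(String.format("coordinator/%s/moved/count", tier), value)
  // After:  one fixed metric name, with the tier carried as a dimension.
  static void emitTieredStats(DimensionedEmitter emitter, String metricName,
                              Map<String, ? extends Number> statsByTier) {
    if (statsByTier == null) {
      return;  // some per-tier stats may be absent for a given coordinator run
    }
    for (Map.Entry<String, ? extends Number> entry : statsByTier.entrySet()) {
      Map<String, String> dims = new LinkedHashMap<>();
      dims.put("tier", entry.getKey());
      emitter.emit(metricName, entry.getValue().doubleValue(), dims);
    }
  }

  public static void main(String[] args) {
    Map<String, Long> moved = Map.of("hot", 12L, "_default_tier", 3L);
    emitTieredStats(
        (metric, value, dims) -> System.out.println(metric + " " + value + " " + dims),
        "segment/moved/count",
        moved
    );
  }
}
```

Keeping the metric name fixed means downstream consumers no longer need to parse tier names out of metric strings; they can group or filter on the dimension instead.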
View File
@@ -22,17 +22,18 @@ import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import com.metamx.metrics.AbstractMonitor;
 import io.druid.client.DruidServerConfig;
+import io.druid.query.DruidMetrics;
 import io.druid.server.coordination.ServerManager;
 import java.util.Map;
-public class ServerMonitor extends AbstractMonitor
+public class HistoricalMetricsMonitor extends AbstractMonitor
 {
  private final DruidServerConfig serverConfig;
  private final ServerManager serverManager;
  @Inject
-  public ServerMonitor(
+  public HistoricalMetricsMonitor(
      DruidServerConfig serverConfig,
      ServerManager serverManager
  )
@@ -44,39 +45,37 @@ public class ServerMonitor extends AbstractMonitor
  @Override
  public boolean doMonitor(ServiceEmitter emitter)
  {
-    emitter.emit(new ServiceMetricEvent.Builder().build("server/segment/max", serverConfig.getMaxSize()));
-    long totalUsed = 0;
-    long totalCount = 0;
+    emitter.emit(new ServiceMetricEvent.Builder().build("segment/max", serverConfig.getMaxSize()));
    for (Map.Entry<String, Long> entry : serverManager.getDataSourceSizes().entrySet()) {
      String dataSource = entry.getKey();
      long used = entry.getValue();
-      totalUsed += used;
-      final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setUser1(dataSource)
-                                                                                 .setUser2(serverConfig.getTier());
-      emitter.emit(builder.build("server/segment/used", used));
+      final ServiceMetricEvent.Builder builder =
+          new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                          .setDimension("tier", serverConfig.getTier())
+                                          .setDimension("priority", String.valueOf(serverConfig.getPriority()));
+      emitter.emit(builder.build("segment/used", used));
      final double usedPercent = serverConfig.getMaxSize() == 0 ? 0 : used / (double) serverConfig.getMaxSize();
-      emitter.emit(builder.build("server/segment/usedPercent", usedPercent));
+      emitter.emit(builder.build("segment/usedPercent", usedPercent));
    }
    for (Map.Entry<String, Long> entry : serverManager.getDataSourceCounts().entrySet()) {
      String dataSource = entry.getKey();
      long count = entry.getValue();
-      totalCount += count;
-      final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setUser1(dataSource)
-                                                                                 .setUser2(serverConfig.getTier());
-      emitter.emit(builder.build("server/segment/count", count));
+      final ServiceMetricEvent.Builder builder =
+          new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                          .setDimension("tier", serverConfig.getTier())
+                                          .setDimension(
+                                              "priority",
+                                              String.valueOf(serverConfig.getPriority())
+                                          );
+      emitter.emit(builder.build("segment/count", count));
    }
-    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setUser2(serverConfig.getTier());
-    emitter.emit(builder.build("server/segment/totalUsed", totalUsed));
-    final double totalUsedPercent = serverConfig.getMaxSize() == 0 ? 0 : totalUsed / (double) serverConfig.getMaxSize();
-    emitter.emit(builder.build("server/segment/totalUsedPercent", totalUsedPercent));
-    emitter.emit(builder.build("server/segment/totalCount", totalCount));
    return true;
  }
 }
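The renamed `HistoricalMetricsMonitor` reports `segment/max` plus per-datasource `segment/used`, `segment/usedPercent`, and `segment/count`, attaching `dataSource`, `tier`, and `priority` as named dimensions; the old `server/segment/total*` aggregates are dropped. A rough sketch of the per-datasource loop, using simplified stand-in types and only the size-related metrics:

```java
import java.util.LinkedHashMap;
import java.util.Map;

interface SegmentMetricsEmitter {
  void emit(String metric, Number value, Map<String, String> dimensions);
}

class HistoricalMetricsExample {
  // Stand-ins for serverConfig.getMaxSize() / getTier() / getPriority().
  private final long maxSize;
  private final String tier;
  private final int priority;

  HistoricalMetricsExample(long maxSize, String tier, int priority) {
    this.maxSize = maxSize;
    this.tier = tier;
    this.priority = priority;
  }

  void doMonitor(SegmentMetricsEmitter emitter, Map<String, Long> bytesUsedByDataSource) {
    emitter.emit("segment/max", maxSize, Map.of());
    for (Map.Entry<String, Long> entry : bytesUsedByDataSource.entrySet()) {
      final long used = entry.getValue();
      // Every event carries the same named dimensions, so consumers can group or filter freely.
      Map<String, String> dims = new LinkedHashMap<>();
      dims.put("dataSource", entry.getKey());
      dims.put("tier", tier);
      dims.put("priority", String.valueOf(priority));
      emitter.emit("segment/used", used, dims);
      // Guard against division by zero when no max size is configured.
      final double usedPercent = maxSize == 0 ? 0 : used / (double) maxSize;
      emitter.emit("segment/usedPercent", usedPercent, dims);
    }
  }
}
```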
View File
@@ -17,6 +17,7 @@
 package io.druid.client;
+import com.amazonaws.services.support.model.Service;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
@@ -48,6 +49,7 @@ import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.nary.TrinaryFn;
+import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.MapCache;
@@ -215,6 +217,7 @@ public class CachingClusteredClientTest
  protected VersionedIntervalTimeline<String, ServerSelector> timeline;
  protected TimelineServerView serverView;
  protected Cache cache;
+  protected ServiceEmitter emitter;
  DruidServer[] servers;
  public CachingClusteredClientTest(int randomSeed)
@@ -244,6 +247,7 @@ public class CachingClusteredClientTest
    timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
    serverView = EasyMock.createStrictMock(TimelineServerView.class);
    cache = MapCache.create(100000);
+    emitter = EasyMock.createStrictMock(ServiceEmitter.class);
    client = makeClient(MoreExecutors.sameThreadExecutor());
    servers = new DruidServer[]{
@@ -2093,7 +2097,8 @@
      {
        return true;
      }
-    }
+    },
+    emitter
    );
  }
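`CachingClusteredClientTest` now builds the client with a strict EasyMock `ServiceEmitter`, since the broker client emits `query/node/time` as a side effect of running queries. A minimal sketch of that test setup; the `setUp`/`tearDown` structure and the component wiring are illustrative, not the test's actual code:

```java
import com.metamx.emitter.service.ServiceEmitter;
import org.easymock.EasyMock;

class EmitterMockingSketch {
  ServiceEmitter emitter;

  void setUp() {
    // A strict mock fails the test on any interaction that was not explicitly expected
    // before replay(), so accidental metric emissions surface immediately.
    emitter = EasyMock.createStrictMock(ServiceEmitter.class);
    EasyMock.replay(emitter);
    // ... pass `emitter` into the component under test, e.g. makeClient(..., emitter) ...
  }

  void tearDown() {
    EasyMock.verify(emitter);  // asserts the recorded expectations (here: none) were met
  }
}
```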