diff --git a/docs/content/Metrics.md b/docs/content/Metrics.md
index 4f35587eb23..6d9f4326002 100644
--- a/docs/content/Metrics.md
+++ b/docs/content/Metrics.md
@@ -3,11 +3,168 @@ layout: doc_page
 ---
 # Druid Metrics
 
-Druid emits a variety of query, ingestion, and coordination metrics.
+Druid generates metrics related to queries, ingestion, and coordination.
 
-Metrics can be displayed in the runtime log file or over http (to a service such as Apache Kafka). Metric emission is disabled by default.
+Metrics are emitted as JSON objects to a runtime log file or over HTTP (to a service such as Apache Kafka). Metric emission is disabled by default.
+
+All Druid metrics share a common set of fields:
+
+* `timestamp` - the time the metric was created
+* `metric` - the name of the metric
+* `service` - the name of the service that emitted the metric
+* `host` - the name of the host that emitted the metric
+* `value` - the numeric value associated with the metric
+
+Metrics may have additional dimensions beyond those listed above.
+
+Most metric values reset each emission period.
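+
+For example, a single emitted metric event might look like the following. This is an illustrative sample rather than captured output, and the exact dimensions present vary by metric:
+
+```json
+{
+  "timestamp" : "2015-01-01T00:00:00.000Z",
+  "service" : "druid/broker",
+  "host" : "localhost:8082",
+  "metric" : "query/time",
+  "value" : 25,
+  "dataSource" : "wikipedia",
+  "id" : "e3e45271-aa51-4a4b-9c61-c63ede97ac73"
+}
+```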
 
 Available Metrics
 -----------------
-Please refer to the following [spreadsheet](https://docs.google.com/spreadsheets/d/15XxGrGv2ggdt4SnCoIsckqUNJBZ9ludyhC9hNQa3l-M/edit#gid=0) for available Druid metrics. In the near future, we will be reworking Druid metrics and fully listing out important metrics to monitor and their expected values.
+## Query Metrics
+
+### Broker
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
+|`query/node/time`|Milliseconds taken to query individual historical/realtime nodes.|id, status, server.|< 1s|
+|`query/intervalChunk/time`|Milliseconds taken to query an interval chunk. Only emitted if interval chunking is enabled.|id, status, chunkInterval.|< 1s|
+
+### Historical
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
+|`query/segment/time`|Milliseconds taken to query an individual segment. Includes time to page in the segment from disk.|id, status, segment.|several hundred milliseconds|
+|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|< several hundred milliseconds|
+|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
+|`query/segmentAndCache/time`|Milliseconds taken to query an individual segment or hit the cache (if it is enabled on the historical node).|id, segment.|several hundred milliseconds|
+
+### Real-time
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
+|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|several hundred milliseconds|
+|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
+
+### Cache
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`query/cache/delta/*`|Cache metrics since the last emission.||N/A|
+|`query/cache/total/*`|Total cache metrics.||N/A|
+
+In the following table, `*` stands for either `query/cache/delta` or `query/cache/total`:
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`*/numEntries`|Number of cache entries.||Varies.|
+|`*/sizeBytes`|Size in bytes of cache entries.||Varies.|
+|`*/hits`|Number of cache hits.||Varies.|
+|`*/misses`|Number of cache misses.||Varies.|
+|`*/evictions`|Number of cache evictions.||Varies.|
+|`*/hitRate`|Cache hit rate.||~40%|
+|`*/averageByte`|Average cache entry byte size.||Varies.|
+|`*/timeouts`|Number of cache timeouts.||0|
+|`*/errors`|Number of cache errors.||0|
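+
+As a sanity check, the hit rate can always be recomputed from the hit and miss counters (the numbers below are illustrative, not measured values):
+
+```
+hitRate = hits / (hits + misses) = 400 / (400 + 600) = 0.4  (~40%)
+```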
+
+## Ingestion Metrics
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`ingest/events/thrownAway`|Number of events rejected because they are outside the windowPeriod.|dataSource.|0|
+|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|dataSource.|0|
+|`ingest/events/processed`|Number of events successfully processed.|dataSource.|Equal to your # of events.|
+|`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events, reduced by rollup.|
+|`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.|
+|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|dataSource.|Depends on configuration. Generally a few minutes at most.|
+|`ingest/persists/backPressure`|Milliseconds spent blocked on pending persists.|dataSource.|0|
+|`ingest/persists/failed`|Number of persists that failed.|dataSource.|0|
+|`ingest/handoff/failed`|Number of handoffs that failed.|dataSource.|0|
+
+### Indexing Service
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`task/run/time`|Milliseconds taken to run task.|dataSource, taskType, taskStatus.|Varies.|
+|`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskType, interval.|Varies.|
+|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.|dataSource, taskType, interval.|Varies.|
+|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|dataSource, taskType, interval.|Varies.|
+
+## Coordination
+
+These metrics are for the Druid coordinator and are reset each time the coordinator runs the coordination logic.
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`segment/assigned/count`|Number of segments assigned to be loaded in the cluster.|tier.|Varies.|
+|`segment/moved/count`|Number of segments moved in the cluster.|tier.|Varies.|
+|`segment/dropped/count`|Number of segments dropped due to being overshadowed.|tier.|Varies.|
+|`segment/deleted/count`|Number of segments dropped due to rules.|tier.|Varies.|
+|`segment/unneeded/count`|Number of segments dropped due to being marked as unused.|tier.|Varies.|
+|`segment/cost/raw`|Used in cost balancing. The raw cost of hosting segments.|tier.|Varies.|
+|`segment/cost/normalization`|Used in cost balancing. The normalization factor applied to the raw cost.|tier.|Varies.|
+|`segment/cost/normalized`|Used in cost balancing. The normalized cost of hosting segments.|tier.|Varies.|
+|`segment/loadQueue/size`|Size in bytes of segments to load.|server.|Varies.|
+|`segment/loadQueue/failed`|Number of segments that failed to load.|server.|0|
+|`segment/loadQueue/count`|Number of segments to load.|server.|Varies.|
+|`segment/dropQueue/count`|Number of segments to drop.|server.|Varies.|
+|`segment/size`|Size in bytes of available segments.|dataSource.|Varies.|
+|`segment/count`|Number of available segments.|dataSource.|< max|
+|`segment/overShadowed/count`|Number of overshadowed segments.||Varies.|
+
+## General Health
+
+### Historical
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`segment/max`|Maximum byte limit available for segments.||Varies.|
+|`segment/used`|Bytes used for served segments.|dataSource, tier, priority.|< max|
+|`segment/usedPercent`|Percentage of space used by served segments.|dataSource, tier, priority.|< 100%|
+|`segment/count`|Number of served segments.|dataSource, tier, priority.|Varies.|
+
+### JVM
+
+These metrics are only available if the JVMMonitor module is included.
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`jvm/pool/committed`|Committed pool.|poolKind, poolName.|close to max pool|
+|`jvm/pool/init`|Initial pool.|poolKind, poolName.|Varies.|
+|`jvm/pool/max`|Max pool.|poolKind, poolName.|Varies.|
+|`jvm/pool/used`|Pool used.|poolKind, poolName.|< max pool|
+|`jvm/bufferpool/count`|Bufferpool count.|bufferPoolName.|Varies.|
+|`jvm/bufferpool/used`|Bufferpool used.|bufferPoolName.|close to capacity|
+|`jvm/bufferpool/capacity`|Bufferpool capacity.|bufferPoolName.|Varies.|
+|`jvm/mem/init`|Initial memory.|memKind.|Varies.|
+|`jvm/mem/max`|Max memory.|memKind.|Varies.|
+|`jvm/mem/used`|Used memory.|memKind.|< max memory|
+|`jvm/mem/committed`|Committed memory.|memKind.|close to max memory|
+|`jvm/gc/count`|Garbage collection count.|gcName.|< 100|
+|`jvm/gc/time`|Garbage collection time.|gcName.|< 1s|
+
+## Sys
+
+These metrics are only available if the SysMonitor module is included.
+
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`sys/swap/free`|Free swap.||Varies.|
+|`sys/swap/max`|Max swap.||Varies.|
+|`sys/swap/pageIn`|Paged in swap.||Varies.|
+|`sys/swap/pageOut`|Paged out swap.||Varies.|
+|`sys/disk/write/count`|Writes to disk.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
+|`sys/disk/read/count`|Reads from disk.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
+|`sys/disk/write/size`|Bytes written to disk. Can be used to determine how much paging of segments is occurring.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
+|`sys/disk/read/size`|Bytes read from disk. Can be used to determine how much paging of segments is occurring.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
+|`sys/net/write/size`|Bytes written to the network.|netName, netAddress, netHwaddr.|Varies.|
+|`sys/net/read/size`|Bytes read from the network.|netName, netAddress, netHwaddr.|Varies.|
+|`sys/fs/used`|Filesystem bytes used.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|< max|
+|`sys/fs/max`|Filesystem bytes max.|fsDevName, fsDirName, fsTypeName, fsSysTypeName, fsOptions.|Varies.|
+|`sys/mem/used`|Memory used.||< max|
+|`sys/mem/max`|Memory max.||Varies.|
+|`sys/storage/used`|Disk space used.|fsDirName.|Varies.|
+|`sys/cpu`|CPU used.|cpuName, cpuTime.|Varies.|
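+
+Monitors such as the JVM and Sys monitors must be included in the monitor list of the runtime configuration before their metrics are emitted. For example, assuming the `druid.monitoring.monitors` property and the metamx monitor class names (adjust both to your deployment):
+
+```
+druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor", "com.metamx.metrics.SysMonitor"]
+```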
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
index 5f1e94eb7d6..3a9e73746ea 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
+import io.druid.query.DruidMetrics;
 import io.druid.timeline.DataSegment;
 
 import java.io.IOException;
@@ -83,12 +84,12 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
 
     // Emit metrics
     final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
-        .setUser2(task.getDataSource())
-        .setUser4(task.getType());
+        .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+        .setDimension(DruidMetrics.TASK_TYPE, task.getType());
 
     for (DataSegment segment : segments) {
-      metricBuilder.setUser5(segment.getInterval().toString());
-      toolbox.getEmitter().emit(metricBuilder.build("indexer/segment/bytes", segment.getSize()));
+      metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
+      toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
     }
 
     return retVal;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
index 0b8ca6c81e3..13f0323f87e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
+import io.druid.query.DruidMetrics;
 import io.druid.timeline.DataSegment;
 
 import java.io.IOException;
@@ -63,12 +64,12 @@ public class SegmentMetadataUpdateAction implements TaskAction
 
     // Emit metrics
     final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
-        .setUser2(task.getDataSource())
-        .setUser4(task.getType());
+        .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
+        .setDimension(DruidMetrics.TASK_TYPE, task.getType());
 
     for (DataSegment segment : segments) {
-      metricBuilder.setUser5(segment.getInterval().toString());
-      toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentMoved/bytes", segment.getSize()));
+
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString()); + toolbox.getEmitter().emit(metricBuilder.build("segment/moved/bytes", segment.getSize())); } return null; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java index dba95fc1f56..242371c58a2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableSet; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.indexing.common.task.Task; +import io.druid.query.DruidMetrics; import io.druid.timeline.DataSegment; import java.io.IOException; @@ -50,7 +51,9 @@ public class SegmentNukeAction implements TaskAction public TypeReference getReturnTypeReference() { - return new TypeReference() {}; + return new TypeReference() + { + }; } @Override @@ -61,12 +64,12 @@ public class SegmentNukeAction implements TaskAction // Emit metrics final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder() - .setUser2(task.getDataSource()) - .setUser4(task.getType()); + .setDimension(DruidMetrics.DATASOURCE, task.getDataSource()) + .setDimension(DruidMetrics.TASK_TYPE, task.getType()); for (DataSegment segment : segments) { - metricBuilder.setUser5(segment.getInterval().toString()); - toolbox.getEmitter().emit(metricBuilder.build("indexer/segmentNuked/bytes", segment.getSize())); + metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString()); + toolbox.getEmitter().emit(metricBuilder.build("segment/nuked/bytes", segment.getSize())); } return null; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 1ebd764d138..61b60644cfe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -42,6 +42,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.metadata.EntryExistsException; +import io.druid.query.DruidMetrics; import java.util.List; import java.util.Map; @@ -55,11 +56,11 @@ import java.util.concurrent.locks.ReentrantLock; /** * Interface between task producers and the task runner. - * + *

* This object accepts tasks from producers using {@link #add} and manages delivery of these tasks to a * {@link TaskRunner}. Tasks will run in a mostly-FIFO order, with deviations when the natural next task is not ready * in time (based on its {@link Task#isReady} method). - * + *

* For persistence, we save all new tasks and task status changes using a {@link TaskStorage} object. */ public class TaskQueue @@ -270,7 +271,8 @@ public class TaskQueue for (final String taskId : tasksToKill) { try { taskRunner.shutdown(taskId); - } catch (Exception e) { + } + catch (Exception e) { log.warn(e, "TaskRunner failed to clean up task: %s", taskId); } } @@ -291,6 +293,7 @@ public class TaskQueue * @param task task to add * * @return true + * * @throws io.druid.metadata.EntryExistsException if the task already exists */ public boolean add(final Task task) throws EntryExistsException @@ -316,6 +319,7 @@ public class TaskQueue /** * Shuts down a task if it has not yet finished. + * * @param taskId task to kill */ public void shutdown(final String taskId) @@ -330,7 +334,8 @@ public class TaskQueue break; } } - } finally { + } + finally { giant.unlock(); } } @@ -364,14 +369,15 @@ public class TaskQueue // Inform taskRunner that this task can be shut down try { taskRunner.shutdown(task.getId()); - } catch (Exception e) { + } + catch (Exception e) { log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); } // Remove from running tasks int removed = 0; - for (int i = tasks.size() - 1 ; i >= 0 ; i--) { + for (int i = tasks.size() - 1; i >= 0; i--) { if (tasks.get(i).getId().equals(task.getId())) { - removed ++; + removed++; tasks.remove(i); break; } @@ -420,8 +426,9 @@ public class TaskQueue private ListenableFuture attachCallbacks(final Task task, final ListenableFuture statusFuture) { final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder() - .setUser2(task.getDataSource()) - .setUser4(task.getType()); + .setDimension("dataSource", task.getDataSource()) + .setDimension("taskType", task.getType()); + Futures.addCallback( statusFuture, new FutureCallback() @@ -458,8 +465,8 @@ public class TaskQueue // Emit event and log, if the task is done if (status.isComplete()) { - metricBuilder.setUser3(status.getStatusCode().toString()); - emitter.emit(metricBuilder.build("indexer/time/run/millis", status.getDuration())); + metricBuilder.setDimension(DruidMetrics.TASK_STATUS, status.getStatusCode().toString()); + emitter.emit(metricBuilder.build("task/run/time", status.getDuration())); log.info( "Task %s: %s (%d run duration)", diff --git a/processing/src/main/java/io/druid/query/QueryMetricUtil.java b/processing/src/main/java/io/druid/query/DruidMetrics.java similarity index 55% rename from processing/src/main/java/io/druid/query/QueryMetricUtil.java rename to processing/src/main/java/io/druid/query/DruidMetrics.java index f268c8b81b5..4687b054a7a 100644 --- a/processing/src/main/java/io/druid/query/QueryMetricUtil.java +++ b/processing/src/main/java/io/druid/query/DruidMetrics.java @@ -23,18 +23,46 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.query.aggregation.AggregatorFactory; import org.joda.time.Interval; +import java.util.List; + /** */ -public class QueryMetricUtil +public class DruidMetrics { - public static ServiceMetricEvent.Builder makeQueryTimeMetric(Query query) + public final static String DATASOURCE = "dataSource"; + public final static String TYPE = "type"; + public final static String INTERVAL = "interval"; + public final static String ID = "id"; + public final static String STATUS = "status"; + + // task metrics + public final static String TASK_TYPE = "taskType"; + public 
final static String TASK_STATUS = "taskStatus";
+
+  public final static String SERVER = "server";
+
+  public static int findNumComplexAggs(List<AggregatorFactory> aggs)
+  {
+    int retVal = 0;
+    for (AggregatorFactory agg : aggs) {
+      // This needs to change when we have better support for column types
+      if (!agg.getTypeName().equals("float") && !agg.getTypeName().equals("long")) {
+        retVal++;
+      }
+    }
+    return retVal;
+  }
+
+  public static ServiceMetricEvent.Builder makePartialQueryTimeMetric(Query query)
   {
     return new ServiceMetricEvent.Builder()
-        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
-        .setUser4(query.getType())
-        .setUser5(
+        .setDimension(DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()))
+        .setDimension(TYPE, query.getType())
+        .setDimension(
+            INTERVAL,
             Lists.transform(
                 query.getIntervals(),
                 new Function<Interval, String>()
                 {
                   @Override
                   public String apply(Interval input)
                   {
                     return input.toString();
                   }
                 }
             ).toArray(new String[query.getIntervals().size()])
         )
-        .setUser6(String.valueOf(query.hasFilters()))
-        .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString());
+        .setDimension("hasFilters", String.valueOf(query.hasFilters()))
+        .setDimension("duration", query.getDuration().toPeriod().toStandardMinutes().toString());
   }
 
-  public static ServiceMetricEvent.Builder makeRequestTimeMetric(
+  public static ServiceMetricEvent.Builder makeQueryTimeMetric(
       final ObjectMapper jsonMapper,
       final Query query,
       final String remoteAddr
   ) throws JsonProcessingException
   {
-    return makeQueryTimeMetric(query)
-        .setUser3(
+    return makePartialQueryTimeMetric(query)
+        .setDimension(
+            "context",
             jsonMapper.writeValueAsString(
                 query.getContext() == null
                 ? ImmutableMap.of()
                 : query.getContext()
             )
         )
-        .setUser7(remoteAddr)
-        .setUser8(query.getId());
+        .setDimension("remoteAddress", remoteAddr)
+        .setDimension(ID, query.getId());
   }
 }
diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java
index 243c73efc20..351d0fea9b1 100644
--- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java
@@ -18,13 +18,13 @@
 package io.druid.query;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
-
 import io.druid.granularity.PeriodGranularity;
 import io.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.joda.time.Interval;
@@ -47,8 +47,10 @@ public class IntervalChunkingQueryRunner implements QueryRunner
   private final QueryWatcher queryWatcher;
   private final ServiceEmitter emitter;
 
-  public IntervalChunkingQueryRunner(QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest,
-      ExecutorService executor, QueryWatcher queryWatcher, ServiceEmitter emitter)
+  public IntervalChunkingQueryRunner(
+      QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest,
+      ExecutorService executor, QueryWatcher queryWatcher, ServiceEmitter emitter
+  )
   {
     this.baseRunner = baseRunner;
     this.toolChest = toolChest;
@@ -65,56 +67,62 @@ public class IntervalChunkingQueryRunner implements QueryRunner
       return baseRunner.run(query, responseContext);
     }
 
-    List<Interval> chunkIntervals = Lists.newArrayList(FunctionalIterable
-        .create(query.getIntervals())
-        .transformCat(
-            new Function<Interval, Iterable<Interval>>()
-
{ - @Override - public Iterable apply(Interval input) - { - return splitInterval(input, chunkPeriod); - } - } - )); + List chunkIntervals = Lists.newArrayList( + FunctionalIterable + .create(query.getIntervals()) + .transformCat( + new Function>() + { + @Override + public Iterable apply(Interval input) + { + return splitInterval(input, chunkPeriod); + } + } + ) + ); - if(chunkIntervals.size() <= 1) { + if (chunkIntervals.size() <= 1) { return baseRunner.run(query, responseContext); } - final QueryRunner finalQueryRunner = new AsyncQueryRunner( - //Note: it is assumed that toolChest.mergeResults(..) gives a query runner that is - //not lazy i.e. it does most of its work on call to run() method - toolChest.mergeResults( - new MetricsEmittingQueryRunner( - emitter, - new Function, ServiceMetricEvent.Builder>() + return Sequences.concat( + Lists.newArrayList( + FunctionalIterable.create(chunkIntervals).transform( + new Function>() { @Override - public ServiceMetricEvent.Builder apply(Query input) + public Sequence apply(Interval singleInterval) { - return toolChest.makeMetricBuilder(input); - } - }, - baseRunner - ).withWaitMeasuredFromNow()), - executor, queryWatcher); - - return Sequences.concat( - Lists.newArrayList(FunctionalIterable.create(chunkIntervals).transform( - new Function>() - { - @Override - public Sequence apply(Interval singleInterval) - { - return finalQueryRunner.run( - query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), - responseContext + return new AsyncQueryRunner( + //Note: it is assumed that toolChest.mergeResults(..) gives a query runner that is + //not lazy i.e. it does most of its work on call to run() method + toolChest.mergeResults( + new MetricsEmittingQueryRunner( + emitter, + new Function, ServiceMetricEvent.Builder>() + { + @Override + public ServiceMetricEvent.Builder apply(Query input) + { + return toolChest.makeMetricBuilder(input); + } + }, + baseRunner, + "query/intervalChunk/time", + ImmutableMap.of("chunkInterval", singleInterval.toString()) + ).withWaitMeasuredFromNow() + ), + executor, queryWatcher + ).run( + query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), + responseContext ); - } - } - )) - ); + } + } + ) + ) + ); } private Iterable splitInterval(Interval interval, Period period) @@ -143,7 +151,8 @@ public class IntervalChunkingQueryRunner implements QueryRunner return intervals; } - private Period getChunkPeriod(Query query) { + private Period getChunkPeriod(Query query) + { String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D"); return Period.parse(p); } diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java b/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java index 172b7a84c72..e6f88b674d8 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingExecutorService.java @@ -67,7 +67,7 @@ public class MetricsEmittingExecutorService extends ForwardingListeningExecutorS private void emitMetrics() { if (delegate instanceof PrioritizedExecutorService) { - emitter.emit(metricBuilder.build("exec/backlog", ((PrioritizedExecutorService) delegate).getQueueSize())); + emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate).getQueueSize())); } } } diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java 
b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 722a3a2a400..960a63b2722 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -18,6 +18,8 @@ package io.druid.query; import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Yielder; @@ -26,19 +28,21 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import java.io.IOException; +import java.util.List; import java.util.Map; /** */ public class MetricsEmittingQueryRunner implements QueryRunner { - private static final String DEFAULT_METRIC_NAME = "query/time"; + private static final String DEFAULT_METRIC_NAME = "query/partial/time"; private final ServiceEmitter emitter; private final Function, ServiceMetricEvent.Builder> builderFn; private final QueryRunner queryRunner; private final long creationTime; private final String metricName; + private final Map userDimensions; public MetricsEmittingQueryRunner( ServiceEmitter emitter, @@ -46,7 +50,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner QueryRunner queryRunner ) { - this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME); + this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME, Maps.newHashMap()); } public MetricsEmittingQueryRunner( @@ -54,7 +58,8 @@ public class MetricsEmittingQueryRunner implements QueryRunner Function, ServiceMetricEvent.Builder> builderFn, QueryRunner queryRunner, long creationTime, - String metricName + String metricName, + Map userDimensions ) { this.emitter = emitter; @@ -62,33 +67,47 @@ public class MetricsEmittingQueryRunner implements QueryRunner this.queryRunner = queryRunner; this.creationTime = creationTime; this.metricName = metricName; + this.userDimensions = userDimensions; } public MetricsEmittingQueryRunner( ServiceEmitter emitter, Function, ServiceMetricEvent.Builder> builderFn, QueryRunner queryRunner, - String metricName + String metricName, + Map userDimensions ) { - this(emitter, builderFn, queryRunner, -1, metricName); + this(emitter, builderFn, queryRunner, -1, metricName, userDimensions); } public MetricsEmittingQueryRunner withWaitMeasuredFromNow() { - return new MetricsEmittingQueryRunner(emitter, builderFn, queryRunner, System.currentTimeMillis(), metricName); + return new MetricsEmittingQueryRunner( + emitter, + builderFn, + queryRunner, + System.currentTimeMillis(), + metricName, + userDimensions + ); } @Override public Sequence run(final Query query, final Map responseContext) { final ServiceMetricEvent.Builder builder = builderFn.apply(query); + + for (Map.Entry userDimension : userDimensions.entrySet()) { + builder.setDimension(userDimension.getKey(), userDimension.getValue()); + } + String queryId = query.getId(); if (queryId == null) { queryId = ""; } - builder.setUser8(queryId); + builder.setDimension(DruidMetrics.ID, queryId); return new Sequence() { @@ -102,11 +121,11 @@ public class MetricsEmittingQueryRunner implements QueryRunner retVal = queryRunner.run(query, responseContext).accumulate(outType, accumulator); } catch (RuntimeException e) { - builder.setUser10("failed"); + builder.setDimension(DruidMetrics.STATUS, "failed"); throw e; } catch (Error e) { - builder.setUser10("failed"); + builder.setDimension(DruidMetrics.STATUS, 
"failed"); throw e; } finally { @@ -115,7 +134,7 @@ public class MetricsEmittingQueryRunner implements QueryRunner emitter.emit(builder.build(metricName, timeTaken)); if (creationTime > 0) { - emitter.emit(builder.build("query/wait", startTime - creationTime)); + emitter.emit(builder.build("query/wait/time", startTime - creationTime)); } } @@ -132,11 +151,11 @@ public class MetricsEmittingQueryRunner implements QueryRunner retVal = queryRunner.run(query, responseContext).toYielder(initValue, accumulator); } catch (RuntimeException e) { - builder.setUser10("failed"); + builder.setDimension(DruidMetrics.STATUS, "failed"); throw e; } catch (Error e) { - builder.setUser10("failed"); + builder.setDimension(DruidMetrics.STATUS, "failed"); throw e; } @@ -164,11 +183,11 @@ public class MetricsEmittingQueryRunner implements QueryRunner return makeYielder(startTime, yielder.next(initValue), builder); } catch (RuntimeException e) { - builder.setUser10("failed"); + builder.setDimension(DruidMetrics.STATUS, "failed"); throw e; } catch (Error e) { - builder.setUser10("failed"); + builder.setDimension(DruidMetrics.STATUS, "failed"); throw e; } } @@ -183,15 +202,15 @@ public class MetricsEmittingQueryRunner implements QueryRunner public void close() throws IOException { try { - if (!isDone() && builder.getUser10() == null) { - builder.setUser10("short"); + if (!isDone() && builder.getDimension(DruidMetrics.STATUS) == null) { + builder.setDimension(DruidMetrics.STATUS, "short"); } long timeTaken = System.currentTimeMillis() - startTime; emitter.emit(builder.build(metricName, timeTaken)); if (creationTime > 0) { - emitter.emit(builder.build("query/wait", startTime - creationTime)); + emitter.emit(builder.build("query/wait/time", startTime - creationTime)); } } finally { diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java index a67f8d068c6..3319b0987f7 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java @@ -159,7 +159,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory @Override public String getTypeName() { - throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()"); + return "histogram"; } @Override diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java index 05418096f38..4120f78998e 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java @@ -118,9 +118,8 @@ public class DataSourceQueryQueryToolChest public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query) { return new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) - .setUser4(query.getType()) - .setUser6("false"); + .setDimension("dataSource", DataSourceUtil.getMetricName(query.getDataSource())) + .setDimension("type", query.getType()); } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 38b9d482509..fe2132afa3f 100644 --- 
a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -21,13 +21,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Supplier; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; -import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.Pair; @@ -49,7 +47,7 @@ import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryCacheHelper; import io.druid.query.QueryDataSource; -import io.druid.query.QueryMetricUtil; +import io.druid.query.DruidMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SubqueryQueryRunner; @@ -62,12 +60,10 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; import org.joda.time.DateTime; -import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; /** */ @@ -123,7 +119,8 @@ public class GroupByQueryQueryToolChest extends QueryToolChest aggregatorFactories){ + protected static String[] extractFactoryName(final List aggregatorFactories) + { return Lists.transform( aggregatorFactories, new Function() { @@ -153,15 +154,20 @@ public class TopNQueryQueryToolChest extends QueryToolChest implements QueryRunner private final ObjectMapper objectMapper; private final CacheConfig cacheConfig; private final ListeningExecutorService backgroundExecutorService; + private final ServiceEmitter emitter; @Inject public CachingClusteredClient( @@ -98,7 +103,8 @@ public class CachingClusteredClient implements QueryRunner Cache cache, @Smile ObjectMapper objectMapper, @BackgroundCaching ExecutorService backgroundExecutorService, - CacheConfig cacheConfig + CacheConfig cacheConfig, + ServiceEmitter emitter ) { this.warehouse = warehouse; @@ -107,6 +113,7 @@ public class CachingClusteredClient implements QueryRunner this.objectMapper = objectMapper; this.cacheConfig = cacheConfig; this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService); + this.emitter = emitter; serverView.registerSegmentCallback( Executors.newFixedThreadPool( @@ -325,7 +332,21 @@ public class CachingClusteredClient implements QueryRunner final DruidServer server = entry.getKey(); final List descriptors = entry.getValue(); - final QueryRunner clientQueryable = serverView.getQueryRunner(server); + final QueryRunner clientQueryable = new MetricsEmittingQueryRunner( + emitter, + new Function, ServiceMetricEvent.Builder>() + { + @Override + public ServiceMetricEvent.Builder apply(@Nullable final Query input) + { + return toolChest.makeMetricBuilder(input); + } + }, + serverView.getQueryRunner(server), + "query/node/time", + ImmutableMap.of("server",server.getName()) + ); + if (clientQueryable == null) { log.error("WTF!? 
server[%s] doesn't have a client Queryable?", server); continue; diff --git a/server/src/main/java/io/druid/client/cache/CacheMonitor.java b/server/src/main/java/io/druid/client/cache/CacheMonitor.java index ad6e906e18e..a2129690aa2 100644 --- a/server/src/main/java/io/druid/client/cache/CacheMonitor.java +++ b/server/src/main/java/io/druid/client/cache/CacheMonitor.java @@ -45,8 +45,8 @@ public class CacheMonitor extends AbstractMonitor final CacheStats deltaCacheStats = currCacheStats.delta(prevCacheStats); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - emitStats(emitter, "cache/delta", deltaCacheStats, builder); - emitStats(emitter, "cache/total", currCacheStats, builder); + emitStats(emitter, "query/cache/delta", deltaCacheStats, builder); + emitStats(emitter, "query/cache/total", currCacheStats, builder); prevCacheStats = currCacheStats; return true; diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java index 9f8bb49b8ba..57c25cc2bf4 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java @@ -23,6 +23,7 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.AbstractMonitor; +import io.druid.query.DruidMetrics; import java.util.List; import java.util.Map; @@ -55,30 +56,30 @@ public class RealtimeMetricsMonitor extends AbstractMonitor } final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder() - .setUser2(fireDepartment.getDataSchema().getDataSource()); + .setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource()); final long thrownAway = metrics.thrownAway() - previous.thrownAway(); if (thrownAway > 0) { log.warn("[%,d] events thrown away because they are outside the window period!", thrownAway); } - emitter.emit(builder.build("events/thrownAway", thrownAway)); + emitter.emit(builder.build("ingest/events/thrownAway", thrownAway)); final long unparseable = metrics.unparseable() - previous.unparseable(); if (unparseable > 0) { log.error("[%,d] Unparseable events! 
Turn on debug logging to see exception stack trace.", unparseable); } - emitter.emit(builder.build("events/unparseable", unparseable)); - emitter.emit(builder.build("events/processed", metrics.processed() - previous.processed())); - emitter.emit(builder.build("rows/output", metrics.rowOutput() - previous.rowOutput())); - emitter.emit(builder.build("persists/num", metrics.numPersists() - previous.numPersists())); - emitter.emit(builder.build("persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis())); + emitter.emit(builder.build("ingest/events/unparseable", unparseable)); + emitter.emit(builder.build("ingest/events/processed", metrics.processed() - previous.processed())); + emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput())); + emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists())); + emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previous.persistTimeMillis())); emitter.emit( builder.build( - "persists/backPressure", + "ingest/persists/backPressure", metrics.persistBackPressureMillis() - previous.persistBackPressureMillis() ) ); - emitter.emit(builder.build("failed/persists", metrics.failedPersists() - previous.failedPersists())); - emitter.emit(builder.build("failed/handoff", metrics.failedHandoffs() - previous.failedHandoffs())); + emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previous.failedPersists())); + emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previous.failedHandoffs())); previousValues.put(fireDepartment, metrics); } diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index b07ad878297..bbfedf60fd6 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -29,12 +29,11 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; import io.druid.query.Query; -import io.druid.query.QueryMetricUtil; +import io.druid.query.DruidMetrics; import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; @@ -261,8 +260,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet final long requestTime = System.currentTimeMillis() - start; try { emitter.emit( - QueryMetricUtil.makeRequestTimeMetric(jsonMapper, query, req.getRemoteAddr()) - .build("request/time", requestTime) + DruidMetrics.makeQueryTimeMetric(jsonMapper, query, req.getRemoteAddr()) + .build("query/time", requestTime) ); requestLogger.log( @@ -272,7 +271,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet query, new QueryStats( ImmutableMap.of( - "request/time", + "query/time", requestTime, "success", true diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index c6007abff02..4da5d76725f 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -34,7 +34,7 @@ import 
io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.query.Query; import io.druid.query.QueryInterruptedException; -import io.druid.query.QueryMetricUtil; +import io.druid.query.DruidMetrics; import io.druid.query.QuerySegmentWalker; import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; @@ -184,10 +184,10 @@ public class QueryResource jsonWriter.writeValue(outputStream, yielder); outputStream.close(); - final long requestTime = System.currentTimeMillis() - start; + final long queryTime = System.currentTimeMillis() - start; emitter.emit( - QueryMetricUtil.makeRequestTimeMetric(jsonMapper, theQuery, req.getRemoteAddr()) - .build("request/time", requestTime) + DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr()) + .build("query/time", queryTime) ); requestLogger.log( @@ -197,7 +197,7 @@ public class QueryResource theQuery, new QueryStats( ImmutableMap.of( - "request/time", requestTime, + "query/time", queryTime, "success", true ) ) diff --git a/server/src/main/java/io/druid/server/audit/SQLAuditManager.java b/server/src/main/java/io/druid/server/audit/SQLAuditManager.java index caf7177d474..246b318a917 100644 --- a/server/src/main/java/io/druid/server/audit/SQLAuditManager.java +++ b/server/src/main/java/io/druid/server/audit/SQLAuditManager.java @@ -92,12 +92,9 @@ public class SQLAuditManager implements AuditManager { emitter.emit( new ServiceMetricEvent.Builder() - .setUser1(auditEntry.getKey()) - .setUser2(auditEntry.getType()) - .setUser3(auditEntry.getAuditInfo().getAuthor()) - .setUser5(jsonMapper.writeValueAsString(auditEntry.getPayload())) - .setUser6(auditEntry.getAuditInfo().getComment()) - .setUser7(auditEntry.getAuditTime().toString()) + .setDimension("key", auditEntry.getKey()) + .setDimension("type", auditEntry.getType()) + .setDimension("author", auditEntry.getAuditInfo().getAuthor()) .build("config/audit", 1) ); diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index e89922492c4..3ec790ec9a4 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -19,6 +19,7 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; @@ -324,7 +325,7 @@ public class ServerManager implements QuerySegmentWalker private TimelineLookup getTimelineLookup(DataSource dataSource) { final List names = dataSource.getNames(); - if(names.size() == 1){ + if (names.size() == 1) { return dataSources.get(names.get(0)); } else { return new UnionTimeLineLookup<>( @@ -438,12 +439,15 @@ public class ServerManager implements QuerySegmentWalker } }, new ReferenceCountingSegmentQueryRunner(factory, adapter), - "scan/time" + "query/segment/time", + ImmutableMap.of("segment", adapter.getIdentifier()) ), cachingExec, cacheConfig ) - ) + ), + "query/segmentAndCache/time", + ImmutableMap.of("segment", adapter.getIdentifier()) ).withWaitMeasuredFromNow(), segmentSpec ); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java index c47af202cc6..4942e449363 
100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java @@ -25,6 +25,7 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.DruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.collections.CountingMap; +import io.druid.query.DruidMetrics; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; @@ -44,7 +45,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper private void emitTieredStats( final ServiceEmitter emitter, - final String formatString, + final String metricName, final Map statMap ) { @@ -53,9 +54,11 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper String tier = entry.getKey(); Number value = entry.getValue(); emitter.emit( - new ServiceMetricEvent.Builder().build( - String.format(formatString, tier), value.doubleValue() - ) + new ServiceMetricEvent.Builder() + .setDimension("tier", tier) + .build( + metricName, value.doubleValue() + ) ); } } @@ -78,6 +81,11 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper } } + emitTieredStats( + emitter, "segment/assigned/count", + assigned + ); + Map dropped = stats.getPerTierStats().get("droppedCount"); if (dropped != null) { for (Map.Entry entry : dropped.entrySet()) { @@ -89,29 +97,34 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper } emitTieredStats( - emitter, "coordinator/%s/cost/raw", + emitter, "segment/dropped/count", + dropped + ); + + emitTieredStats( + emitter, "segment/cost/raw", stats.getPerTierStats().get("initialCost") ); emitTieredStats( - emitter, "coordinator/%s/cost/normalization", + emitter, "segment/cost/normalization", stats.getPerTierStats().get("normalization") ); emitTieredStats( - emitter, "coordinator/%s/moved/count", + emitter, "segment/moved/count", stats.getPerTierStats().get("movedCount") ); emitTieredStats( - emitter, "coordinator/%s/deleted/count", + emitter, "segment/deleted/count", stats.getPerTierStats().get("deletedCount") ); Map normalized = stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand"); if (normalized != null) { emitTieredStats( - emitter, "coordinator/%s/cost/normalized", + emitter, "segment/cost/normalized", Maps.transformEntries( normalized, new Maps.EntryTransformer() @@ -136,9 +149,14 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper } } + emitTieredStats( + emitter, "segment/unneeded/count", + stats.getPerTierStats().get("unneededCount") + ); + emitter.emit( new ServiceMetricEvent.Builder().build( - "coordinator/overShadowed/count", stats.getGlobalStats().get("overShadowedCount") + "segment/overShadowed/count", stats.getGlobalStats().get("overShadowedCount") ) ); @@ -184,26 +202,26 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper LoadQueuePeon queuePeon = entry.getValue(); emitter.emit( new ServiceMetricEvent.Builder() - .setUser1(serverName).build( - "coordinator/loadQueue/size", queuePeon.getLoadQueueSize() + .setDimension(DruidMetrics.SERVER, serverName).build( + "segment/loadQueue/size", queuePeon.getLoadQueueSize() ) ); emitter.emit( new ServiceMetricEvent.Builder() - .setUser1(serverName).build( - "coordinator/loadQueue/failed", queuePeon.getAndResetFailedAssignCount() + .setDimension(DruidMetrics.SERVER, serverName).build( + 
"segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount() ) ); emitter.emit( new ServiceMetricEvent.Builder() - .setUser1(serverName).build( - "coordinator/loadQueue/count", queuePeon.getSegmentsToLoad().size() + .setDimension(DruidMetrics.SERVER, serverName).build( + "segment/loadQueue/count", queuePeon.getSegmentsToLoad().size() ) ); emitter.emit( new ServiceMetricEvent.Builder() - .setUser1(serverName).build( - "coordinator/dropQueue/count", queuePeon.getSegmentsToDrop().size() + .setDimension(DruidMetrics.SERVER, serverName).build( + "segment/dropQueue/count", queuePeon.getSegmentsToDrop().size() ) ); } @@ -222,8 +240,8 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper Long size = entry.getValue(); emitter.emit( new ServiceMetricEvent.Builder() - .setUser1(dataSource).build( - "coordinator/segment/size", size + .setDimension(DruidMetrics.DATASOURCE, dataSource).build( + "segment/size", size ) ); } @@ -232,8 +250,8 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper Long count = entry.getValue(); emitter.emit( new ServiceMetricEvent.Builder() - .setUser1(dataSource).build( - "coordinator/segment/count", count + .setDimension(DruidMetrics.DATASOURCE, dataSource).build( + "segment/count", count ) ); } diff --git a/server/src/main/java/io/druid/server/metrics/ServerMonitor.java b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java similarity index 56% rename from server/src/main/java/io/druid/server/metrics/ServerMonitor.java rename to server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java index d73cf8442ea..900f595e7bb 100644 --- a/server/src/main/java/io/druid/server/metrics/ServerMonitor.java +++ b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java @@ -22,17 +22,18 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.AbstractMonitor; import io.druid.client.DruidServerConfig; +import io.druid.query.DruidMetrics; import io.druid.server.coordination.ServerManager; import java.util.Map; -public class ServerMonitor extends AbstractMonitor +public class HistoricalMetricsMonitor extends AbstractMonitor { private final DruidServerConfig serverConfig; private final ServerManager serverManager; @Inject - public ServerMonitor( + public HistoricalMetricsMonitor( DruidServerConfig serverConfig, ServerManager serverManager ) @@ -44,39 +45,37 @@ public class ServerMonitor extends AbstractMonitor @Override public boolean doMonitor(ServiceEmitter emitter) { - emitter.emit(new ServiceMetricEvent.Builder().build("server/segment/max", serverConfig.getMaxSize())); - long totalUsed = 0; - long totalCount = 0; + emitter.emit(new ServiceMetricEvent.Builder().build("segment/max", serverConfig.getMaxSize())); for (Map.Entry entry : serverManager.getDataSourceSizes().entrySet()) { String dataSource = entry.getKey(); long used = entry.getValue(); - totalUsed += used; - final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setUser1(dataSource) - .setUser2(serverConfig.getTier()); + final ServiceMetricEvent.Builder builder = + new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension("tier", serverConfig.getTier()) + .setDimension("priority", String.valueOf(serverConfig.getPriority())); - emitter.emit(builder.build("server/segment/used", used)); + + emitter.emit(builder.build("segment/used", used)); final double usedPercent = serverConfig.getMaxSize() == 0 
? 0 : used / (double) serverConfig.getMaxSize();
-      emitter.emit(builder.build("server/segment/usedPercent", usedPercent));
+      emitter.emit(builder.build("segment/usedPercent", usedPercent));
     }
 
     for (Map.Entry entry : serverManager.getDataSourceCounts().entrySet()) {
       String dataSource = entry.getKey();
       long count = entry.getValue();
-      totalCount += count;
-      final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setUser1(dataSource)
-                                                                                 .setUser2(serverConfig.getTier());
+      final ServiceMetricEvent.Builder builder =
+          new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource)
+                                          .setDimension("tier", serverConfig.getTier())
+                                          .setDimension(
+                                              "priority",
+                                              String.valueOf(serverConfig.getPriority())
+                                          );
 
-      emitter.emit(builder.build("server/segment/count", count));
+      emitter.emit(builder.build("segment/count", count));
     }
 
-    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setUser2(serverConfig.getTier());
-    emitter.emit(builder.build("server/segment/totalUsed", totalUsed));
-    final double totalUsedPercent = serverConfig.getMaxSize() == 0 ? 0 : totalUsed / (double) serverConfig.getMaxSize();
-    emitter.emit(builder.build("server/segment/totalUsedPercent", totalUsedPercent));
-    emitter.emit(builder.build("server/segment/totalCount", totalCount));
-
     return true;
   }
 }
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 47d92e63261..baec61b7422 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -48,6 +48,7 @@ import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.nary.TrinaryFn;
 
+import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.cache.Cache;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.MapCache;
@@ -215,6 +216,7 @@
   protected VersionedIntervalTimeline timeline;
   protected TimelineServerView serverView;
   protected Cache cache;
+  protected ServiceEmitter emitter;
   DruidServer[] servers;
 
   public CachingClusteredClientTest(int randomSeed)
@@ -244,6+246,7 @@
     timeline = new VersionedIntervalTimeline<>(Ordering.natural());
     serverView = EasyMock.createStrictMock(TimelineServerView.class);
     cache = MapCache.create(100000);
+    emitter = EasyMock.createStrictMock(ServiceEmitter.class);
     client = makeClient(MoreExecutors.sameThreadExecutor());
 
     servers = new DruidServer[]{
@@ -2093,7 +2096,8 @@
         {
           return true;
         }
-      }
+      },
+      emitter
     );
   }
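The runner-level changes above all follow the same pattern: the metric name is passed explicitly and extra dimensions ride along as a map, instead of occupying positional `setUserN` slots. Below is a minimal sketch of wiring a measured runner under that pattern, assuming the `MetricsEmittingQueryRunner` constructor and `withWaitMeasuredFromNow()` shown in this patch; `instrument` and `segmentIdentifier` are illustrative names, not part of the change:

```java
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;

public class MetricsEmittingExample
{
  // Wraps a runner so each run emits "query/segment/time" tagged with the
  // segment identifier, plus "query/wait/time" measured from construction.
  public static <T> QueryRunner<T> instrument(
      final ServiceEmitter emitter,
      final QueryToolChest<T, Query<T>> toolChest,
      final QueryRunner<T> baseRunner,
      final String segmentIdentifier
  )
  {
    return new MetricsEmittingQueryRunner<T>(
        emitter,
        new Function<Query<T>, ServiceMetricEvent.Builder>()
        {
          @Override
          public ServiceMetricEvent.Builder apply(Query<T> input)
          {
            // dataSource, type, interval, etc. come from the tool chest
            return toolChest.makeMetricBuilder(input);
          }
        },
        baseRunner,
        "query/segment/time",                          // metric name is now a parameter
        ImmutableMap.of("segment", segmentIdentifier)  // extra per-metric dimensions
    ).withWaitMeasuredFromNow();                       // additionally emits query/wait/time
  }
}
```

This mirrors what `ServerManager` does for `query/segment/time` and what `CachingClusteredClient` does for `query/node/time` in the hunks above.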