diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java index 69801c4c6ed..8cc1c7e3ffc 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -21,12 +21,13 @@ package io.druid.query.scan; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import com.google.common.base.Functions; -import com.metamx.emitter.service.ServiceMetricEvent; +import com.google.inject.Inject; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; -import io.druid.query.DruidMetrics; import io.druid.query.Query; +import io.druid.query.QueryMetrics; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.aggregation.MetricManipulationFn; @@ -39,6 +40,14 @@ public class ScanQueryQueryToolChest extends QueryToolChest mergeResults(final QueryRunner runner) { @@ -74,9 +83,9 @@ public class ScanQueryQueryToolChest extends QueryToolChest> makeMetrics(ScanQuery query) { - return DruidMetrics.makePartialQueryTimeMetric(query); + return queryMetricsFactory.makeMetrics(query); } @Override diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java index b3ea2e8bdbd..8f088ad8a68 100644 --- a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java @@ -27,6 +27,7 @@ import 
io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -63,7 +64,9 @@ import java.util.Map; @RunWith(Parameterized.class) public class MultiSegmentScanQueryTest { - private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(); + private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest( + DefaultGenericQueryMetricsFactory.instance() + ); private static final QueryRunnerFactory factory = new ScanQueryRunnerFactory( toolChest, diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java index 21f9e7780b6..ceeae9e5251 100644 --- a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ObjectArrays; import com.google.common.collect.Sets; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.TableDataSource; @@ -95,7 +96,9 @@ public class ScanQueryRunnerTest ); public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class); - private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(); + private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest( + DefaultGenericQueryMetricsFactory.instance() + ); @Parameterized.Parameters(name 
= "{0}") public static Iterable constructorFeeder() throws IOException diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index 805c9b2e25e..7aefbb7ad47 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -19,11 +19,8 @@ package io.druid.query; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.common.utils.VMUtils; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; @@ -36,14 +33,14 @@ import java.util.concurrent.atomic.AtomicLong; public class CPUTimeMetricQueryRunner implements QueryRunner { private final QueryRunner delegate; - private final Function, ServiceMetricEvent.Builder> builderFn; + private final QueryToolChest> queryToolChest; private final ServiceEmitter emitter; private final AtomicLong cpuTimeAccumulator; private final boolean report; private CPUTimeMetricQueryRunner( QueryRunner delegate, - Function, ServiceMetricEvent.Builder> builderFn, + QueryToolChest> queryToolChest, ServiceEmitter emitter, AtomicLong cpuTimeAccumulator, boolean report @@ -53,7 +50,7 @@ public class CPUTimeMetricQueryRunner implements QueryRunner throw new ISE("Cpu time must enabled"); } this.delegate = delegate; - this.builderFn = builderFn; + this.queryToolChest = queryToolChest; this.emitter = emitter; this.cpuTimeAccumulator = cpuTimeAccumulator == null ? 
new AtomicLong(0L) : cpuTimeAccumulator; this.report = report; @@ -85,10 +82,9 @@ public class CPUTimeMetricQueryRunner implements QueryRunner public void after(boolean isDone, Throwable thrown) throws Exception { if (report) { - final long cpuTime = cpuTimeAccumulator.get(); - if (cpuTime > 0) { - final ServiceMetricEvent.Builder builder = Preconditions.checkNotNull(builderFn.apply(query)); - emitter.emit(builder.build("query/cpu/time", cpuTimeAccumulator.get() / 1000)); + final long cpuTimeNs = cpuTimeAccumulator.get(); + if (cpuTimeNs > 0) { + queryToolChest.makeMetrics(query).reportCpuTime(cpuTimeNs).emit(emitter); } } } @@ -98,7 +94,7 @@ public class CPUTimeMetricQueryRunner implements QueryRunner public static QueryRunner safeBuild( QueryRunner delegate, - Function, ServiceMetricEvent.Builder> builderFn, + QueryToolChest> queryToolChest, ServiceEmitter emitter, AtomicLong accumulator, boolean report @@ -107,7 +103,7 @@ public class CPUTimeMetricQueryRunner implements QueryRunner if (!VMUtils.isThreadCpuTimeEnabled()) { return delegate; } else { - return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report); + return new CPUTimeMetricQueryRunner<>(delegate, queryToolChest, emitter, accumulator, report); } } } diff --git a/processing/src/main/java/io/druid/query/DefaultGenericQueryMetricsFactory.java b/processing/src/main/java/io/druid/query/DefaultGenericQueryMetricsFactory.java new file mode 100644 index 00000000000..b71da6e4dbb --- /dev/null +++ b/processing/src/main/java/io/druid/query/DefaultGenericQueryMetricsFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import io.druid.jackson.DefaultObjectMapper; + +public class DefaultGenericQueryMetricsFactory implements GenericQueryMetricsFactory +{ + private static final GenericQueryMetricsFactory INSTANCE = + new DefaultGenericQueryMetricsFactory(new DefaultObjectMapper()); + + /** + * Should be used only in tests, directly or indirectly (e. g. in {@link + * io.druid.query.search.SearchQueryQueryToolChest#SearchQueryQueryToolChest( + * io.druid.query.search.search.SearchQueryConfig, IntervalChunkingQueryRunnerDecorator)}). + */ + @VisibleForTesting + public static GenericQueryMetricsFactory instance() + { + return INSTANCE; + } + + private final ObjectMapper jsonMapper; + + @Inject + public DefaultGenericQueryMetricsFactory(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public QueryMetrics> makeMetrics(Query query) + { + DefaultQueryMetrics> queryMetrics = new DefaultQueryMetrics<>(jsonMapper); + queryMetrics.query(query); + return queryMetrics; + } + +} diff --git a/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java new file mode 100644 index 00000000000..a19053be878 --- /dev/null +++ b/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java @@ -0,0 +1,226 @@ +/* + * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import org.joda.time.Interval; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class DefaultQueryMetrics> implements QueryMetrics +{ + protected final ObjectMapper jsonMapper; + protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + protected final Map metrics = new HashMap<>(); + + public DefaultQueryMetrics(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public void query(QueryType query) + { + dataSource(query); + queryType(query); + interval(query); + hasFilters(query); + duration(query); + queryId(query); + } + + @Override + public void dataSource(QueryType query) + { + builder.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource())); + } + + @Override + public void queryType(QueryType query) + { + 
builder.setDimension(DruidMetrics.TYPE, query.getType()); + } + + @Override + public void interval(QueryType query) + { + builder.setDimension( + DruidMetrics.INTERVAL, + query.getIntervals().stream().map(Interval::toString).toArray(String[]::new) + ); + } + + @Override + public void hasFilters(QueryType query) + { + builder.setDimension("hasFilters", String.valueOf(query.hasFilters())); + } + + @Override + public void duration(QueryType query) + { + builder.setDimension("duration", query.getDuration().toString()); + } + + @Override + public void queryId(QueryType query) + { + builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId())); + } + + @Override + public void context(QueryType query) + { + try { + builder.setDimension( + "context", + jsonMapper.writeValueAsString( + query.getContext() == null + ? ImmutableMap.of() + : query.getContext() + ) + ); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void server(String host) + { + builder.setDimension("server", host); + } + + @Override + public void remoteAddress(String remoteAddress) + { + builder.setDimension("remoteAddress", remoteAddress); + } + + @Override + public void status(String status) + { + builder.setDimension(DruidMetrics.STATUS, status); + } + + @Override + public void success(boolean success) + { + builder.setDimension("success", String.valueOf(success)); + } + + @Override + public void segment(String segmentIdentifier) + { + builder.setDimension("segment", segmentIdentifier); + } + + @Override + public void chunkInterval(Interval interval) + { + builder.setDimension("chunkInterval", interval.toString()); + } + + @Override + public QueryMetrics reportQueryTime(long timeNs) + { + return defaultTimeMetric("query/time", timeNs); + } + + @Override + public QueryMetrics reportQueryBytes(long byteCount) + { + metrics.put("query/bytes", byteCount); + return this; + } + + @Override + public QueryMetrics reportWaitTime(long timeNs) + { 
+ return defaultTimeMetric("query/wait/time", timeNs); + } + + @Override + public QueryMetrics reportSegmentTime(long timeNs) + { + return defaultTimeMetric("query/segment/time", timeNs); + } + + @Override + public QueryMetrics reportSegmentAndCacheTime(long timeNs) + { + return defaultTimeMetric("query/segmentAndCache/time", timeNs); + } + + @Override + public QueryMetrics reportIntervalChunkTime(long timeNs) + { + return defaultTimeMetric("query/intervalChunk/time", timeNs); + } + + @Override + public QueryMetrics reportCpuTime(long timeNs) + { + metrics.put("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs)); + return this; + } + + @Override + public QueryMetrics reportNodeTimeToFirstByte(long timeNs) + { + return defaultTimeMetric("query/node/ttfb", timeNs); + } + + @Override + public QueryMetrics reportNodeTime(long timeNs) + { + return defaultTimeMetric("query/node/time", timeNs); + } + + private QueryMetrics defaultTimeMetric(String metricName, long timeNs) + { + metrics.put(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs)); + return this; + } + + @Override + public QueryMetrics reportNodeBytes(long byteCount) + { + metrics.put("query/node/bytes", byteCount); + return this; + } + + @Override + public void emit(ServiceEmitter emitter) + { + for (Map.Entry metric : metrics.entrySet()) { + emitter.emit(builder.build(metric.getKey(), metric.getValue())); + } + metrics.clear(); + } +} diff --git a/processing/src/main/java/io/druid/query/DruidMetrics.java b/processing/src/main/java/io/druid/query/DruidMetrics.java index 4801f0227e5..6a57eec7513 100644 --- a/processing/src/main/java/io/druid/query/DruidMetrics.java +++ b/processing/src/main/java/io/druid/query/DruidMetrics.java @@ -20,14 +20,7 @@ package io.druid.query; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; 
-import com.google.common.collect.Lists; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.query.aggregation.AggregatorFactory; -import org.joda.time.Interval; import java.util.List; @@ -61,50 +54,21 @@ public class DruidMetrics return retVal; } - public static ServiceMetricEvent.Builder makePartialQueryTimeMetric(Query query) - { - return new ServiceMetricEvent.Builder() - .setDimension(DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource())) - .setDimension(TYPE, query.getType()) - .setDimension( - INTERVAL, - Lists.transform( - query.getIntervals(), - new Function() - { - @Override - public String apply(Interval input) - { - return input.toString(); - } - } - ).toArray(new String[query.getIntervals().size()]) - ) - .setDimension("hasFilters", String.valueOf(query.hasFilters())) - .setDimension("duration", query.getDuration().toString()) - .setDimension(ID, Strings.nullToEmpty(query.getId())); - } - - public static ServiceMetricEvent.Builder makeQueryTimeMetric( + public static QueryMetrics makeRequestMetrics( + final GenericQueryMetricsFactory queryMetricsFactory, final QueryToolChest> toolChest, - final ObjectMapper jsonMapper, final Query query, final String remoteAddr ) throws JsonProcessingException { - final ServiceMetricEvent.Builder baseMetric = toolChest == null - ? makePartialQueryTimeMetric(query) - : toolChest.makeMetricBuilder(query); - - return baseMetric - .setDimension( - "context", - jsonMapper.writeValueAsString( - query.getContext() == null - ? 
ImmutableMap.of() - : query.getContext() - ) - ) - .setDimension("remoteAddress", remoteAddr); + QueryMetrics> queryMetrics; + if (toolChest != null) { + queryMetrics = toolChest.makeMetrics(query); + } else { + queryMetrics = queryMetricsFactory.makeMetrics(query); + } + queryMetrics.context(query); + queryMetrics.remoteAddress(remoteAddr); + return queryMetrics; } } diff --git a/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java b/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java index fbc1150f3c4..71bf44451cd 100644 --- a/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java +++ b/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java @@ -19,12 +19,9 @@ package io.druid.query; -import com.google.common.base.Function; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.java.util.common.guava.Sequence; -import javax.annotation.Nullable; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -90,15 +87,7 @@ public class FluentQueryRunnerBuilder return from( CPUTimeMetricQueryRunner.safeBuild( baseRunner, - new Function, ServiceMetricEvent.Builder>() - { - @Nullable - @Override - public ServiceMetricEvent.Builder apply(Query tQuery) - { - return toolChest.makeMetricBuilder(tQuery); - } - }, + toolChest, emitter, new AtomicLong(0L), true diff --git a/processing/src/main/java/io/druid/query/GenericQueryMetricsFactory.java b/processing/src/main/java/io/druid/query/GenericQueryMetricsFactory.java new file mode 100644 index 00000000000..dd4297606da --- /dev/null +++ b/processing/src/main/java/io/druid/query/GenericQueryMetricsFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +/** + * This factory is used for DI of custom {@link QueryMetrics} implementations for all query types, which don't (yet) + * need to emit custom dimensions and/or metrics, i. e. they are good with the generic {@link QueryMetrics} interface. + */ +public interface GenericQueryMetricsFactory +{ + /** + * Creates a {@link QueryMetrics} for query, which doesn't have predefined QueryMetrics subclass. This method must + * call {@link QueryMetrics#query(Query)} with the given query on the created QueryMetrics object before returning. 
+ */ + QueryMetrics> makeMetrics(Query query); +} diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index ef82c5ce155..a42ce69fde0 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -20,10 +20,8 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.java.util.common.granularity.PeriodGranularity; import io.druid.java.util.common.guava.FunctionalIterable; import io.druid.java.util.common.guava.Sequence; @@ -107,17 +105,10 @@ public class IntervalChunkingQueryRunner implements QueryRunner toolChest.mergeResults( new MetricsEmittingQueryRunner( emitter, - new Function, ServiceMetricEvent.Builder>() - { - @Override - public ServiceMetricEvent.Builder apply(Query input) - { - return toolChest.makeMetricBuilder(input); - } - }, + toolChest, baseRunner, - "query/intervalChunk/time", - ImmutableMap.of("chunkInterval", singleInterval.toString()) + QueryMetrics::reportIntervalChunkTime, + queryMetrics -> queryMetrics.chunkInterval(singleInterval) ).withWaitMeasuredFromNow() ), executor, queryWatcher diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index c35547c051d..096fb39d45f 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -19,76 +19,74 @@ package io.druid.query; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.metamx.emitter.service.ServiceEmitter; -import 
com.metamx.emitter.service.ServiceMetricEvent; import io.druid.java.util.common.guava.LazySequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.SequenceWrapper; import io.druid.java.util.common.guava.Sequences; import java.util.Map; +import java.util.function.Consumer; +import java.util.function.ObjLongConsumer; /** */ public class MetricsEmittingQueryRunner implements QueryRunner { private final ServiceEmitter emitter; - private final Function, ServiceMetricEvent.Builder> builderFn; + private final QueryToolChest> queryToolChest; private final QueryRunner queryRunner; - private final long creationTime; - private final String metricName; - private final Map userDimensions; + private final long creationTimeNs; + private final ObjLongConsumer>> reportMetric; + private final Consumer>> applyCustomDimensions; private MetricsEmittingQueryRunner( ServiceEmitter emitter, - Function, ServiceMetricEvent.Builder> builderFn, + QueryToolChest> queryToolChest, QueryRunner queryRunner, - long creationTime, - String metricName, - Map userDimensions + long creationTimeNs, + ObjLongConsumer>> reportMetric, + Consumer>> applyCustomDimensions ) { this.emitter = emitter; - this.builderFn = builderFn; + this.queryToolChest = queryToolChest; this.queryRunner = queryRunner; - this.creationTime = creationTime; - this.metricName = metricName; - this.userDimensions = userDimensions; + this.creationTimeNs = creationTimeNs; + this.reportMetric = reportMetric; + this.applyCustomDimensions = applyCustomDimensions; } public MetricsEmittingQueryRunner( ServiceEmitter emitter, - Function, ServiceMetricEvent.Builder> builderFn, + QueryToolChest> queryToolChest, QueryRunner queryRunner, - String metricName, - Map userDimensions + ObjLongConsumer>> reportMetric, + Consumer>> applyCustomDimensions ) { - this(emitter, builderFn, queryRunner, -1, metricName, userDimensions); + this(emitter, queryToolChest, queryRunner, -1, reportMetric, applyCustomDimensions); 
} public MetricsEmittingQueryRunner withWaitMeasuredFromNow() { - return new MetricsEmittingQueryRunner( + return new MetricsEmittingQueryRunner<>( emitter, - builderFn, + queryToolChest, queryRunner, - System.currentTimeMillis(), - metricName, - userDimensions + System.nanoTime(), + reportMetric, + applyCustomDimensions ); } @Override public Sequence run(final Query query, final Map responseContext) { - final ServiceMetricEvent.Builder builder = builderFn.apply(query); + final QueryMetrics> queryMetrics = queryToolChest.makeMetrics(query); - for (Map.Entry userDimension : userDimensions.entrySet()) { - builder.setDimension(userDimension.getKey(), userDimension.getValue()); - } + applyCustomDimensions.accept(queryMetrics); return Sequences.wrap( // Use LazySequence because want to account execution time of queryRunner.run() (it prepares the underlying @@ -104,28 +102,29 @@ public class MetricsEmittingQueryRunner implements QueryRunner }), new SequenceWrapper() { - private long startTime; + private long startTimeNs; @Override public void before() { - startTime = System.currentTimeMillis(); + startTimeNs = System.nanoTime(); } @Override public void after(boolean isDone, Throwable thrown) { if (thrown != null) { - builder.setDimension(DruidMetrics.STATUS, "failed"); + queryMetrics.status("failed"); } else if (!isDone) { - builder.setDimension(DruidMetrics.STATUS, "short"); + queryMetrics.status("short"); } - long timeTaken = System.currentTimeMillis() - startTime; - emitter.emit(builder.build(metricName, timeTaken)); + long timeTakenNs = System.nanoTime() - startTimeNs; + reportMetric.accept(queryMetrics, timeTakenNs); - if (creationTime > 0) { - emitter.emit(builder.build("query/wait/time", startTime - creationTime)); + if (creationTimeNs > 0) { + queryMetrics.reportWaitTime(startTimeNs - creationTimeNs); } + queryMetrics.emit(emitter); } } ); diff --git a/processing/src/main/java/io/druid/query/QueryMetrics.java 
b/processing/src/main/java/io/druid/query/QueryMetrics.java new file mode 100644 index 00000000000..3afc3a122cb --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryMetrics.java @@ -0,0 +1,249 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.metamx.emitter.service.ServiceEmitter; +import org.joda.time.Interval; + +/** + * Abstraction wrapping {@link com.metamx.emitter.service.ServiceMetricEvent.Builder} and allowing to control what + * metrics are actually emitted, what dimensions do they have, etc. + * + * + * Goals of QueryMetrics + * --------------------- + * 1. Skipping or partial filtering of particular dimensions or metrics entirely. Implementation could leave the body + * of the corresponding method empty, or implement random filtering like: + * public void reportCpuTime(long timeNs) + * { + * if (ThreadLocalRandom.current().nextDouble() < 0.1) { + * super.reportCpuTime(timeNs); + * } + * } + * + * 2. Ability to add new dimensions and metrics, possibly expensive to compute, or expensive to process (long string + * values, high cardinality, etc.) and not to affect existing Druid installations, by skipping (see 1.) 
those + * dimensions and metrics entirely in the default QueryMetrics implementations. Users who need those expensive + * dimensions and metrics, could explicitly emit them in their own QueryMetrics. + * + * 3. Control over the time unit, in which time metrics are emitted. By default (see {@link DefaultQueryMetrics} and + * its subclasses) it's milliseconds, but if queries are fast, it could be not precise enough. + * + * 4. Control over the dimension and metric names. + * + * Here, "control" is provided to the operator of a Druid cluster, who would exercise that control through a + * site-specific extension adding XxxQueryMetricsFactory impl(s). + * + * + * Types of methods in this interface + * ---------------------------------- + * 1. Methods, pulling some dimensions from the query object. These methods are used to populate the metric before the + * query is run. These methods accept a single `QueryType query` parameter. {@link #query(Query)} calls all methods + * of this type, hence pulling all available information from the query object as dimensions. + * + * 2. Methods for setting dimensions, which become known in the process of the query execution or after the query is + * completed. + * + * 3. Methods to register metrics to be emitted later in bulk via {@link #emit(ServiceEmitter)}. These methods + * return this QueryMetrics object back for chaining. Names of these methods start with "report" prefix. + * + * + * Implementors expectations + * ------------------------- + * QueryMetrics is expected to be changed often, in every Druid release (including "patch" releases). Users who create + * their custom implementations of QueryMetrics should be ready to fix the code of their QueryMetrics (implement new + * methods) when they update Druid. 
Broken builds of custom extensions, containing custom QueryMetrics is the way to + * notify users that Druid core "wants" to emit new dimension or metric, and the user handles them manually: if the new + * dimension or metric is useful and not very expensive to process and store then emit, skip (see above Goals, 1.) + * otherwise. + * + *

If implementors of custom QueryMetrics don't want to fix builds on every Druid release (e. g. if they want to add + * a single dimension to emitted events and don't want to alter other dimensions and emitted metrics), they could + * inherit their custom QueryMetrics from {@link DefaultQueryMetrics} or query-specific default implementation class, + * such as {@link io.druid.query.topn.DefaultTopNQueryMetrics}. Those classes are guaranteed to stay around and + * implement new methods, added to the QueryMetrics interface (or a query-specific subinterface). However, there is no + * 100% guarantee of compatibility, because methods could not only be added to QueryMetrics, existing methods could also + * be changed or removed. + * + *

QueryMetrics is designed for use from a single thread, implementations shouldn't care about thread-safety. + * + * + * Adding new methods to QueryMetrics + * ---------------------------------- + * 1. When adding a new method for setting a dimension, which could be pulled from the query object, always make them + * accept a single `QueryType query` parameter, letting the implementations to do all the work of carving the dimension + * value out of the query object. + * + * 2. When adding a new method for setting a dimension, which becomes known in the process of the query execution or + * after the query is completed, design it so that as little work as possible is done for preparing arguments for this + * method, and as much work as possible is done in the implementations of this method, if they decide to actually emit + * this dimension. + * + * 3. When adding a new method for registering metrics, make it to accept the metric value in the smallest reasonable + * unit (i. e. nanoseconds for time metrics, bytes for metrics of data size, etc.), allowing the implementations of + * this method to round the value up to more coarse-grained units, if they don't need the maximum precision. + * + * + * Making subinterfaces of QueryMetrics for emitting custom dimensions and/or metrics for specific query types + * ----------------------------------------------------------------------------------------------------------- + * If a query type (e. g. {@link io.druid.query.search.search.SearchQuery} (it's runners) needs to emit custom + * dimensions and/or metrics which doesn't make sense for all other query types, the following steps should be executed: + * 1. Create `interface SearchQueryMetrics extends QueryMetrics` (here and below "Search" is the query type) with + * additional methods (see "Adding new methods" section above). + * + * 2. Create `class DefaultSearchQueryMetrics implements SearchQueryMetrics`. 
This class should implement extra methods + * from SearchQueryMetrics interfaces with empty bodies, AND DELEGATE ALL OTHER METHODS TO A QueryMetrics OBJECT, + * provided as a sole parameter in DefaultSearchQueryMetrics constructor. + * + * 3. Create `interface SearchQueryMetricsFactory` with a single method + * `SearchQueryMetrics makeMetrics(SearchQuery query);`. + * + * 4. Create `class DefaultSearchQueryMetricsFactory implements SearchQueryMetricsFactory`, which accepts {@link + * GenericQueryMetricsFactory} as injected constructor parameter, and implements makeMetrics() as + * `return new DefaultSearchQueryMetrics(genericQueryMetricsFactory.makeMetrics(query));` + * + * 5. Inject and use SearchQueryMetricsFactory instead of {@link GenericQueryMetricsFactory} in {@link + * io.druid.query.search.SearchQueryQueryToolChest}. + * + * 6. Specify `binder.bind(SearchQueryMetricsFactory.class).to(DefaultSearchQueryMetricsFactory.class)` in + * QueryToolChestModule (if the query type belongs to the core druid-processing, e. g. SearchQuery) or in some + * extension-specific Guice module otherwise, if the query type is defined in an extension, e. g. ScanQuery. + * + * This complex procedure is needed to ensure custom {@link GenericQueryMetricsFactory} specified by users still works + * for the query type when query type decides to create their custom QueryMetrics subclass. + * + * {@link io.druid.query.topn.TopNQueryMetrics}, {@link io.druid.query.groupby.GroupByQueryMetrics}, and {@link + * io.druid.query.timeseries.TimeseriesQueryMetrics} are implemented differently, because they are introduced at the + * same time as the whole QueryMetrics abstraction and their default implementations have to actually emit more + * dimensions than the default generic QueryMetrics. So those subinterfaces shouldn't be taken as direct examples for + * following the plan specified above. 
+ * + * @param + */ +public interface QueryMetrics> +{ + + /** + * Pulls all information from the query object into dimensions of future metrics. + */ + void query(QueryType query); + + /** + * Sets {@link Query#getDataSource()} of the given query as dimension. + */ + void dataSource(QueryType query); + + /** + * Sets {@link Query#getType()} of the given query as dimension. + */ + void queryType(QueryType query); + + /** + * Sets {@link Query#getIntervals()} of the given query as dimension. + */ + void interval(QueryType query); + + /** + * Sets {@link Query#hasFilters()} of the given query as dimension. + */ + void hasFilters(QueryType query); + + /** + * Sets {@link Query#getDuration()} of the given query as dimension. + */ + void duration(QueryType query); + + /** + * Sets {@link Query#getId()} of the given query as dimension. + */ + void queryId(QueryType query); + + /** + * Sets {@link Query#getContext()} of the given query as dimension. + */ + void context(QueryType query); + + void server(String host); + + void remoteAddress(String remoteAddress); + + void status(String status); + + void success(boolean success); + + void segment(String segmentIdentifier); + + void chunkInterval(Interval interval); + + /** + * Registers "query time" metric. + */ + QueryMetrics reportQueryTime(long timeNs); + + /** + * Registers "query bytes" metric. + */ + QueryMetrics reportQueryBytes(long byteCount); + + /** + * Registers "wait time" metric. + */ + QueryMetrics reportWaitTime(long timeNs); + + /** + * Registers "segment time" metric. + */ + QueryMetrics reportSegmentTime(long timeNs); + + /** + * Registers "segmentAndCache time" metric. + */ + QueryMetrics reportSegmentAndCacheTime(long timeNs); + + /** + * Registers "interval chunk time" metric. + */ + QueryMetrics reportIntervalChunkTime(long timeNs); + + /** + * Registers "cpu time" metric. + */ + QueryMetrics reportCpuTime(long timeNs); + + /** + * Registers "time to first byte" metric. 
+ */ + QueryMetrics reportNodeTimeToFirstByte(long timeNs); + + /** + * Registers "node time" metric. + */ + QueryMetrics reportNodeTime(long timeNs); + + /** + * Registers "node bytes" metric. + */ + QueryMetrics reportNodeBytes(long byteCount); + + /** + * Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object. + */ + void emit(ServiceEmitter emitter); +} diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java index e62348228fd..daabdfcf18e 100644 --- a/processing/src/main/java/io/druid/query/QueryToolChest.java +++ b/processing/src/main/java/io/druid/query/QueryToolChest.java @@ -21,7 +21,6 @@ package io.druid.query; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.timeline.LogicalSegment; @@ -46,16 +45,27 @@ public abstract class QueryToolChest mergeResults(QueryRunner runner); /** - * Creates a builder that is used to generate a metric for this specific query type. This exists - * to allow for query-specific dimensions on metrics. That is, the ToolChest is expected to set some + * Creates a {@link QueryMetrics} object that is used to generate metrics for this specific query type. This exists + * to allow for query-specific dimensions and metrics. That is, the ToolChest is expected to set some * meaningful dimensions for metrics given this query type. Examples might be the topN threshold for * a TopN query or the number of dimensions included for a groupBy query. + * + *

QueryToolChests for query types in core (druid-processing) and public extensions (belonging to the Druid source + * tree) should delegate this method to {@link GenericQueryMetricsFactory#makeMetrics(Query)} on an injected + * instance of {@link GenericQueryMetricsFactory}, as long as they don't need to emit custom dimensions and/or + * metrics. + * + *

If some custom dimensions and/or metrics should be emitted for a query type, the plan described in + * the "Making subinterfaces of QueryMetrics" section in {@link QueryMetrics}'s class-level Javadocs should be followed. + * + *

One way or another, this method should ensure that {@link QueryMetrics#query(Query)} is called with the given + * query passed on the created QueryMetrics object before returning. * * @param query The query that is being processed * - * @return A MetricEvent.Builder that can be used to make metrics for the provided query + * @return A QueryMetrics that can be used to make metrics for the provided query */ - public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query); + public abstract QueryMetrics makeMetrics(QueryType query); /** * Creates a Function that can take in a ResultType and return a new ResultType having applied diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java index 1e3b5c81bcb..c137f75c9b1 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java @@ -42,12 +42,16 @@ import java.util.concurrent.ExecutorService; public class DataSourceMetadataQueryRunnerFactory implements QueryRunnerFactory, DataSourceMetadataQuery> { - private static final DataSourceQueryQueryToolChest toolChest = new DataSourceQueryQueryToolChest(); + private final DataSourceQueryQueryToolChest toolChest; private final QueryWatcher queryWatcher; @Inject - public DataSourceMetadataQueryRunnerFactory(QueryWatcher queryWatcher) + public DataSourceMetadataQueryRunnerFactory( + DataSourceQueryQueryToolChest toolChest, + QueryWatcher queryWatcher + ) { + this.toolChest = toolChest; this.queryWatcher = queryWatcher; } diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java index 0ec071756c7..1c5b2eb00ea 
100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java @@ -25,14 +25,14 @@ import com.google.common.base.Functions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.metamx.emitter.service.ServiceMetricEvent; +import com.google.inject.Inject; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.BySegmentSkippingQueryRunner; import io.druid.query.CacheStrategy; -import io.druid.query.DataSourceUtil; -import io.druid.query.DruidMetrics; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; +import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -51,6 +51,14 @@ public class DataSourceQueryQueryToolChest { }; + private final GenericQueryMetricsFactory queryMetricsFactory; + + @Inject + public DataSourceQueryQueryToolChest(GenericQueryMetricsFactory queryMetricsFactory) + { + this.queryMetricsFactory = queryMetricsFactory; + } + @Override public List filterSegments(DataSourceMetadataQuery query, List segments) { @@ -103,11 +111,9 @@ public class DataSourceQueryQueryToolChest } @Override - public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query) + public QueryMetrics> makeMetrics(DataSourceMetadataQuery query) { - return DruidMetrics.makePartialQueryTimeMetric(query) - .setDimension("dataSource", DataSourceUtil.getMetricName(query.getDataSource())) - .setDimension("type", query.getType()); + return queryMetricsFactory.makeMetrics(query); } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetrics.java b/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetrics.java new file 
mode 100644 index 00000000000..5d8ee7321fc --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetrics.java @@ -0,0 +1,61 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.query.DefaultQueryMetrics; +import io.druid.query.DruidMetrics; + +public class DefaultGroupByQueryMetrics extends DefaultQueryMetrics implements GroupByQueryMetrics +{ + + public DefaultGroupByQueryMetrics(ObjectMapper jsonMapper) + { + super(jsonMapper); + } + + @Override + public void query(GroupByQuery query) + { + super.query(query); + numDimensions(query); + numMetrics(query); + numComplexMetrics(query); + } + + @Override + public void numDimensions(GroupByQuery query) + { + builder.setDimension("numDimensions", String.valueOf(query.getDimensions().size())); + } + + @Override + public void numMetrics(GroupByQuery query) + { + builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); + } + + @Override + public void numComplexMetrics(GroupByQuery query) + { + int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()); + 
builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetricsFactory.java b/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetricsFactory.java new file mode 100644 index 00000000000..f70c9c6c987 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetricsFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import io.druid.jackson.DefaultObjectMapper; + +public class DefaultGroupByQueryMetricsFactory implements GroupByQueryMetricsFactory +{ + private static final GroupByQueryMetricsFactory INSTANCE = + new DefaultGroupByQueryMetricsFactory(new DefaultObjectMapper()); + + /** + * Should be used only in tests, directly or indirectly (via {@link + * GroupByQueryQueryToolChest#GroupByQueryQueryToolChest(io.druid.query.groupby.strategy.GroupByStrategySelector, + * io.druid.query.IntervalChunkingQueryRunnerDecorator)}). 
+ */ + @VisibleForTesting + public static GroupByQueryMetricsFactory instance() + { + return INSTANCE; + } + + private final ObjectMapper jsonMapper; + + @Inject + public DefaultGroupByQueryMetricsFactory(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public GroupByQueryMetrics makeMetrics() + { + return new DefaultGroupByQueryMetrics(jsonMapper); + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryMetrics.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryMetrics.java new file mode 100644 index 00000000000..2d9ef261d19 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryMetrics.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby; + +import io.druid.query.QueryMetrics; + +/** + * Specialization of {@link QueryMetrics} for {@link GroupByQuery}. + */ +public interface GroupByQueryMetrics extends QueryMetrics +{ + /** + * Sets the size of {@link GroupByQuery#getDimensions()} of the given query as dimension. + */ + void numDimensions(GroupByQuery query); + + /** + * Sets the number of metrics of the given groupBy query as dimension. 
+ */ + void numMetrics(GroupByQuery query); + + /** + * Sets the number of "complex" metrics of the given groupBy query as dimension. By default it is assumed that + * "complex" metric is a metric of not long or double type, but it could be redefined in the implementation of this + * method. + */ + void numComplexMetrics(GroupByQuery query); +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryMetricsFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryMetricsFactory.java new file mode 100644 index 00000000000..da76ddc79e2 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryMetricsFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.groupby; + +public interface GroupByQueryMetricsFactory +{ + + GroupByQueryMetrics makeMetrics(); +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 9394c7d00a9..85f4cf559ad 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -20,6 +20,7 @@ package io.druid.query.groupby; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Predicate; @@ -30,7 +31,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; @@ -41,7 +41,6 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; -import io.druid.query.DruidMetrics; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryDataSource; @@ -85,15 +84,27 @@ public class GroupByQueryQueryToolChest extends QueryToolChest> makeMetrics(SegmentMetadataQuery query) { - return DruidMetrics.makePartialQueryTimeMetric(query); + return queryMetricsFactory.makeMetrics(query); } @Override diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 7943b84d6df..f5ab114f687 100644 --- 
a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -20,6 +20,7 @@ package io.druid.query.search; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Preconditions; @@ -29,7 +30,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.primitives.Ints; import com.google.inject.Inject; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; @@ -37,9 +37,11 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; -import io.druid.query.DruidMetrics; +import io.druid.query.DefaultGenericQueryMetricsFactory; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -72,17 +74,28 @@ public class SearchQueryQueryToolChest extends QueryToolChest> makeMetrics(SearchQuery query) { - return DruidMetrics.makePartialQueryTimeMetric(query); + return queryMetricsFactory.makeMetrics(query); } @Override diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 86b9c20fc27..45948efbe71 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -32,16 
+32,17 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.nary.BinaryFn; import io.druid.query.CacheStrategy; -import io.druid.query.DruidMetrics; +import io.druid.query.DefaultGenericQueryMetricsFactory; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -81,17 +82,29 @@ public class SelectQueryQueryToolChest extends QueryToolChest configSupplier; + private final GenericQueryMetricsFactory queryMetricsFactory; - @Inject public SelectQueryQueryToolChest( ObjectMapper jsonMapper, IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator, Supplier configSupplier ) + { + this(jsonMapper, intervalChunkingQueryRunnerDecorator, configSupplier, new DefaultGenericQueryMetricsFactory(jsonMapper)); + } + + @Inject + public SelectQueryQueryToolChest( + ObjectMapper jsonMapper, + IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator, + Supplier configSupplier, + GenericQueryMetricsFactory queryMetricsFactory + ) { this.jsonMapper = jsonMapper; this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator; this.configSupplier = configSupplier; + this.queryMetricsFactory = queryMetricsFactory; } @Override @@ -125,9 +138,9 @@ public class SelectQueryQueryToolChest extends QueryToolChest> makeMetrics(SelectQuery query) { - return DruidMetrics.makePartialQueryTimeMetric(query); + return 
queryMetricsFactory.makeMetrics(query); } @Override diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index 2a98bb205f8..a6569baed71 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -20,19 +20,21 @@ package io.druid.query.timeboundary; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.metamx.emitter.service.ServiceMetricEvent; +import com.google.inject.Inject; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.BySegmentSkippingQueryRunner; import io.druid.query.CacheStrategy; -import io.druid.query.DataSourceUtil; -import io.druid.query.DruidMetrics; +import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.Query; +import io.druid.query.QueryMetrics; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -58,6 +60,20 @@ public class TimeBoundaryQueryQueryToolChest { }; + private final GenericQueryMetricsFactory queryMetricsFactory; + + @VisibleForTesting + public TimeBoundaryQueryQueryToolChest() + { + this(DefaultGenericQueryMetricsFactory.instance()); + } + + @Inject + public TimeBoundaryQueryQueryToolChest(GenericQueryMetricsFactory queryMetricsFactory) + { + this.queryMetricsFactory = queryMetricsFactory; + } + @Override public List filterSegments(TimeBoundaryQuery query, List segments) { @@ -107,11 
+123,9 @@ public class TimeBoundaryQueryQueryToolChest } @Override - public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query) + public QueryMetrics> makeMetrics(TimeBoundaryQuery query) { - return DruidMetrics.makePartialQueryTimeMetric(query) - .setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource())) - .setDimension(DruidMetrics.TYPE, query.getType()); + return queryMetricsFactory.makeMetrics(query); } @Override diff --git a/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetrics.java b/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetrics.java new file mode 100644 index 00000000000..addaac5a76f --- /dev/null +++ b/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetrics.java @@ -0,0 +1,54 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.timeseries; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.query.DefaultQueryMetrics; +import io.druid.query.DruidMetrics; + +public class DefaultTimeseriesQueryMetrics extends DefaultQueryMetrics + implements TimeseriesQueryMetrics +{ + public DefaultTimeseriesQueryMetrics(ObjectMapper jsonMapper) + { + super(jsonMapper); + } + + @Override + public void query(TimeseriesQuery query) + { + super.query(query); + numMetrics(query); + numComplexMetrics(query); + } + + @Override + public void numMetrics(TimeseriesQuery query) + { + builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); + } + + @Override + public void numComplexMetrics(TimeseriesQuery query) + { + int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()); + builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); + } +} diff --git a/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetricsFactory.java b/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetricsFactory.java new file mode 100644 index 00000000000..f3f99d35645 --- /dev/null +++ b/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetricsFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.timeseries; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import io.druid.jackson.DefaultObjectMapper; + +public class DefaultTimeseriesQueryMetricsFactory implements TimeseriesQueryMetricsFactory +{ + private static final TimeseriesQueryMetricsFactory INSTANCE = + new DefaultTimeseriesQueryMetricsFactory(new DefaultObjectMapper()); + + /** + * Should be used only in tests, directly or indirectly (via {@link + * TimeseriesQueryQueryToolChest#TimeseriesQueryQueryToolChest(io.druid.query.IntervalChunkingQueryRunnerDecorator)}). + */ + @VisibleForTesting + public static TimeseriesQueryMetricsFactory instance() + { + return INSTANCE; + } + + private final ObjectMapper jsonMapper; + + @Inject + public DefaultTimeseriesQueryMetricsFactory(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public TimeseriesQueryMetrics makeMetrics() + { + return new DefaultTimeseriesQueryMetrics(jsonMapper); + } +} diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryMetrics.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryMetrics.java new file mode 100644 index 00000000000..db7553525dc --- /dev/null +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryMetrics.java @@ -0,0 +1,40 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.timeseries; + +import io.druid.query.QueryMetrics; + +/** + * Specialization of {@link QueryMetrics} for {@link TimeseriesQuery}. + */ +public interface TimeseriesQueryMetrics extends QueryMetrics +{ + /** + * Sets the number of metrics of the given timeseries query as dimension. + */ + void numMetrics(TimeseriesQuery query); + + /** + * Sets the number of "complex" metrics of the given timeseries query as dimension. By default it is assumed that + * "complex" metric is a metric of not long or double type, but it could be redefined in the implementation of this + * method. + */ + void numComplexMetrics(TimeseriesQuery query); +} diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryMetricsFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryMetricsFactory.java new file mode 100644 index 00000000000..d84f1542994 --- /dev/null +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryMetricsFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.timeseries; + +public interface TimeseriesQueryMetricsFactory +{ + + TimeseriesQueryMetrics makeMetrics(); +} diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 07bb0964a70..17fb626ad82 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -20,17 +20,16 @@ package io.druid.query.timeseries; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.inject.Inject; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.nary.BinaryFn; import io.druid.query.CacheStrategy; -import io.druid.query.DruidMetrics; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -64,11 +63,22 @@ public class TimeseriesQueryQueryToolChest extends 
QueryToolChest implements TopNQueryMetrics +{ + + public DefaultTopNQueryMetrics(ObjectMapper jsonMapper) + { + super(jsonMapper); + } + + @Override + public void query(TopNQuery query) + { + super.query(query); + threshold(query); + dimension(query); + numMetrics(query); + numComplexMetrics(query); + } + + @Override + public void threshold(TopNQuery query) + { + builder.setDimension("threshold", String.valueOf(query.getThreshold())); + } + + @Override + public void dimension(TopNQuery query) + { + builder.setDimension("dimension", query.getDimensionSpec().getDimension()); + } + + @Override + public void numMetrics(TopNQuery query) + { + builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); + } + + @Override + public void numComplexMetrics(TopNQuery query) + { + int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()); + builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); + } +} diff --git a/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetricsFactory.java b/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetricsFactory.java new file mode 100644 index 00000000000..c56e2321bf3 --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetricsFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import io.druid.jackson.DefaultObjectMapper; + +public class DefaultTopNQueryMetricsFactory implements TopNQueryMetricsFactory +{ + private static final TopNQueryMetricsFactory INSTANCE = new DefaultTopNQueryMetricsFactory(new DefaultObjectMapper()); + + /** + * Should be used only in tests, directly or indirectly (via {@link TopNQueryQueryToolChest#TopNQueryQueryToolChest( + * TopNQueryConfig, io.druid.query.IntervalChunkingQueryRunnerDecorator)}). + */ + @VisibleForTesting + public static TopNQueryMetricsFactory instance() + { + return INSTANCE; + } + + private final ObjectMapper jsonMapper; + + @Inject + public DefaultTopNQueryMetricsFactory(ObjectMapper jsonMapper) + { + this.jsonMapper = jsonMapper; + } + + @Override + public TopNQueryMetrics makeMetrics() + { + return new DefaultTopNQueryMetrics(jsonMapper); + } +} diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryMetrics.java b/processing/src/main/java/io/druid/query/topn/TopNQueryMetrics.java new file mode 100644 index 00000000000..a3492f71742 --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryMetrics.java @@ -0,0 +1,50 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +import io.druid.query.QueryMetrics; + +/** + * Specialization of {@link QueryMetrics} for {@link TopNQuery}. + */ +public interface TopNQueryMetrics extends QueryMetrics +{ + /** + * Sets {@link TopNQuery#getThreshold()} of the given query as dimension. + */ + void threshold(TopNQuery query); + + /** + * Sets {@link TopNQuery#getDimensionSpec()}.{@link io.druid.query.dimension.DimensionSpec#getDimension() + * getDimension()} of the given query as dimension. + */ + void dimension(TopNQuery query); + + /** + * Sets the number of metrics of the given topN query as dimension. + */ + void numMetrics(TopNQuery query); + + /** + * Sets the number of "complex" metrics of the given topN query as dimension. By default it is assumed that "complex" + * metric is a metric of not long or double type, but it could be redefined in the implementation of this method. + */ + void numComplexMetrics(TopNQuery query); +} diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryMetricsFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryMetricsFactory.java new file mode 100644 index 00000000000..e0ba452e61e --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryMetricsFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +public interface TopNQueryMetricsFactory +{ + + TopNQueryMetrics makeMetrics(); +} diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 8d3d06ab348..a7826dbdb9d 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -20,13 +20,13 @@ package io.druid.query.topn; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.inject.Inject; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.java.util.common.ISE; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; @@ -35,7 +35,6 @@ import io.druid.java.util.common.guava.nary.BinaryFn; import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValue; import io.druid.query.CacheStrategy; -import 
io.druid.query.DruidMetrics; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -68,18 +67,30 @@ public class TopNQueryQueryToolChest extends QueryToolChest OBJECT_TYPE_REFERENCE = new TypeReference() { }; + private final TopNQueryConfig config; - private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator; + private final TopNQueryMetricsFactory queryMetricsFactory; - @Inject + @VisibleForTesting public TopNQueryQueryToolChest( TopNQueryConfig config, IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator ) + { + this(config, intervalChunkingQueryRunnerDecorator, DefaultTopNQueryMetricsFactory.instance()); + } + + @Inject + public TopNQueryQueryToolChest( + TopNQueryConfig config, + IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator, + TopNQueryMetricsFactory queryMetricsFactory + ) { this.config = config; this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator; + this.queryMetricsFactory = queryMetricsFactory; } protected static String[] extractFactoryName(final List aggregatorFactories) @@ -140,22 +151,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest> queryMetrics = new DefaultQueryMetrics<>(new DefaultObjectMapper()); + TopNQuery query = new TopNQueryBuilder() + .dataSource("xx") + .granularity(Granularities.ALL) + .dimension(new ListFilteredDimensionSpec( + new DefaultDimensionSpec("tags", "tags"), + ImmutableSet.of("t3"), + null + )) + .metric("count") + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators(Collections.singletonList(new CountAggregatorFactory("count"))) + .threshold(5) + .filters(new SelectorDimFilter("tags", "t3", null)) + .build(); + queryMetrics.query(query); + + queryMetrics.reportQueryTime(0).emit(serviceEmitter); + Map actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals(12, actualEvent.size()); + 
Assert.assertTrue(actualEvent.containsKey("feed")); + Assert.assertTrue(actualEvent.containsKey("timestamp")); + Assert.assertEquals("", actualEvent.get("host")); + Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE)); + Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); + List expectedIntervals = QueryRunnerTestHelper.fullOnInterval.getIntervals(); + List expectedStringIntervals = + expectedIntervals.stream().map(Interval::toString).collect(Collectors.toList()); + Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL)); + Assert.assertEquals("true", actualEvent.get("hasFilters")); + Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration")); + Assert.assertEquals("", actualEvent.get(DruidMetrics.ID)); + Assert.assertEquals("query/time", actualEvent.get("metric")); + Assert.assertEquals(0L, actualEvent.get("value")); + } + + @Test + public void testDefaultQueryMetricsMetricNamesAndUnits() + { + CachingEmitter cachingEmitter = new CachingEmitter(); + ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter); + DefaultQueryMetrics> queryMetrics = new DefaultQueryMetrics<>(new DefaultObjectMapper()); + testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics); + } + + public static void testQueryMetricsDefaultMetricNamesAndUnits( + CachingEmitter cachingEmitter, + ServiceEmitter serviceEmitter, + QueryMetrics> queryMetrics + ) + { + queryMetrics.reportQueryTime(1000001).emit(serviceEmitter); + Map actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/time", actualEvent.get("metric")); + // query/time and most metrics below are measured in milliseconds by default + Assert.assertEquals(1L, actualEvent.get("value")); + + queryMetrics.reportWaitTime(2000001).emit(serviceEmitter); + actualEvent = 
cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/wait/time", actualEvent.get("metric")); + Assert.assertEquals(2L, actualEvent.get("value")); + + queryMetrics.reportSegmentTime(3000001).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/segment/time", actualEvent.get("metric")); + Assert.assertEquals(3L, actualEvent.get("value")); + + queryMetrics.reportSegmentAndCacheTime(4000001).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/segmentAndCache/time", actualEvent.get("metric")); + Assert.assertEquals(4L, actualEvent.get("value")); + + queryMetrics.reportIntervalChunkTime(5000001).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/intervalChunk/time", actualEvent.get("metric")); + Assert.assertEquals(5L, actualEvent.get("value")); + + queryMetrics.reportCpuTime(6000001).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/cpu/time", actualEvent.get("metric")); + // CPU time is measured in microseconds by default + Assert.assertEquals(6000L, actualEvent.get("value")); + + queryMetrics.reportNodeTimeToFirstByte(7000001).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/node/ttfb", actualEvent.get("metric")); + Assert.assertEquals(7L, actualEvent.get("value")); + + queryMetrics.reportNodeTime(8000001).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/node/time", actualEvent.get("metric")); + Assert.assertEquals(8L, actualEvent.get("value")); + + queryMetrics.reportQueryBytes(9).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/bytes", actualEvent.get("metric")); + Assert.assertEquals(9L, actualEvent.get("value")); 
+ + queryMetrics.reportNodeBytes(10).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/node/bytes", actualEvent.get("metric")); + Assert.assertEquals(10L, actualEvent.get("value")); + } +} diff --git a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index 99b91fd8d33..e05caa4336d 100644 --- a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -29,7 +29,9 @@ import io.druid.data.input.MapBasedInputRow; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.Druids; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -117,6 +119,7 @@ public class DataSourceMetadataQueryTest ; final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner( (QueryRunnerFactory) new DataSourceMetadataQueryRunnerFactory( + new DataSourceQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()), QueryRunnerTestHelper.NOOP_QUERYWATCHER ), new IncrementalIndexSegment(rtIndex, "test"), null @@ -147,7 +150,10 @@ public class DataSourceMetadataQueryTest @Test public void testFilterSegments() { - List segments = new DataSourceQueryQueryToolChest().filterSegments( + GenericQueryMetricsFactory queryMetricsFactory = DefaultGenericQueryMetricsFactory.instance(); + DataSourceQueryQueryToolChest toolChest = new DataSourceQueryQueryToolChest(queryMetricsFactory); + List segments = toolChest + .filterSegments( null, Arrays.asList( new LogicalSegment() diff --git 
a/processing/src/test/java/io/druid/query/groupby/DefaultGroupByQueryMetricsTest.java b/processing/src/test/java/io/druid/query/groupby/DefaultGroupByQueryMetricsTest.java new file mode 100644 index 00000000000..f04600c2818 --- /dev/null +++ b/processing/src/test/java/io/druid/query/groupby/DefaultGroupByQueryMetricsTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.groupby; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.granularity.PeriodGranularity; +import io.druid.query.CachingEmitter; +import io.druid.query.DefaultQueryMetricsTest; +import io.druid.query.DruidMetrics; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.dimension.ExtractionDimensionSpec; +import io.druid.query.extraction.MapLookupExtractor; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.query.lookup.LookupExtractionFn; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +public class DefaultGroupByQueryMetricsTest +{ + + /** + * Tests that passed a query {@link DefaultGroupByQueryMetrics} produces events with a certain set of dimensions, + * no more, no less. 
+ */ + @Test + public void testDefaultGroupByQueryMetricsQuery() + { + CachingEmitter cachingEmitter = new CachingEmitter(); + ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter); + DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics(new DefaultObjectMapper()); + GroupByQuery.Builder builder = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-04-02/2011-04-04") + .setDimensions( + Lists.newArrayList( + new ExtractionDimensionSpec( + "quality", + "alias", + new LookupExtractionFn( + new MapLookupExtractor( + ImmutableMap.of( + "mezzanine", + "mezzanine0" + ), + false + ), false, null, true, + false + ) + ) + ) + ) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) + .setDimFilter(new SelectorDimFilter("quality", "mezzanine", null)) + .setContext(ImmutableMap.of("bySegment", true)); + GroupByQuery query = builder.build(); + queryMetrics.query(query); + + queryMetrics.reportQueryTime(0).emit(serviceEmitter); + Map actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals(15, actualEvent.size()); + Assert.assertTrue(actualEvent.containsKey("feed")); + Assert.assertTrue(actualEvent.containsKey("timestamp")); + Assert.assertEquals("", actualEvent.get("host")); + Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals(QueryRunnerTestHelper.dataSource, actualEvent.get(DruidMetrics.DATASOURCE)); + Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); + Interval expectedInterval = new Interval("2011-04-02/2011-04-04"); + Assert.assertEquals(Collections.singletonList(expectedInterval.toString()), actualEvent.get(DruidMetrics.INTERVAL)); + Assert.assertEquals("true", actualEvent.get("hasFilters")); + Assert.assertEquals(expectedInterval.toDuration().toString(), 
actualEvent.get("duration")); + Assert.assertEquals("", actualEvent.get(DruidMetrics.ID)); + + // GroupBy-specific dimensions + Assert.assertEquals("1", actualEvent.get("numDimensions")); + Assert.assertEquals("2", actualEvent.get("numMetrics")); + Assert.assertEquals("0", actualEvent.get("numComplexMetrics")); + + // Metric + Assert.assertEquals("query/time", actualEvent.get("metric")); + Assert.assertEquals(0L, actualEvent.get("value")); + } + + @Test + public void testDefaultGroupByQueryMetricsMetricNamesAndUnits() + { + CachingEmitter cachingEmitter = new CachingEmitter(); + ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter); + DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics(new DefaultObjectMapper()); + DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics); + } +} diff --git a/processing/src/test/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java b/processing/src/test/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java new file mode 100644 index 00000000000..fb03645c067 --- /dev/null +++ b/processing/src/test/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetricsTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.timeseries; + +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.CachingEmitter; +import io.druid.query.DefaultQueryMetricsTest; +import io.druid.query.DruidMetrics; +import io.druid.query.Druids; +import io.druid.query.QueryRunnerTestHelper; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DefaultTimeseriesQueryMetricsTest +{ + + /** + * Tests that passed a query {@link DefaultTimeseriesQueryMetrics} produces events with a certain set of dimensions, + * no more, no less. + */ + @Test + public void testDefaultTimeseriesQueryMetricsQuery() + { + CachingEmitter cachingEmitter = new CachingEmitter(); + ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter); + DefaultTimeseriesQueryMetrics queryMetrics = new DefaultTimeseriesQueryMetrics(new DefaultObjectMapper()); + TimeseriesQuery query = Druids + .newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.dayGran) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + QueryRunnerTestHelper.indexDoubleSum + ) + ) + .postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant)) + .descending(true) + .build(); + queryMetrics.query(query); + + queryMetrics.reportQueryTime(0).emit(serviceEmitter); + Map actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals(14, actualEvent.size()); + Assert.assertTrue(actualEvent.containsKey("feed")); + Assert.assertTrue(actualEvent.containsKey("timestamp")); + Assert.assertEquals("", 
actualEvent.get("host")); + Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals(QueryRunnerTestHelper.dataSource, actualEvent.get(DruidMetrics.DATASOURCE)); + Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); + List expectedIntervals = QueryRunnerTestHelper.fullOnInterval.getIntervals(); + List expectedStringIntervals = + expectedIntervals.stream().map(Interval::toString).collect(Collectors.toList()); + Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL)); + Assert.assertEquals("false", actualEvent.get("hasFilters")); + Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration")); + Assert.assertEquals("", actualEvent.get(DruidMetrics.ID)); + + // Timeseries-specific dimensions + Assert.assertEquals("2", actualEvent.get("numMetrics")); + Assert.assertEquals("0", actualEvent.get("numComplexMetrics")); + + // Metric + Assert.assertEquals("query/time", actualEvent.get("metric")); + Assert.assertEquals(0L, actualEvent.get("value")); + } + + @Test + public void testDefaultTimeseriesQueryMetricsMetricNamesAndUnits() + { + CachingEmitter cachingEmitter = new CachingEmitter(); + ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter); + DefaultTimeseriesQueryMetrics queryMetrics = new DefaultTimeseriesQueryMetrics(new DefaultObjectMapper()); + DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics); + } +} diff --git a/processing/src/test/java/io/druid/query/topn/DefaultTopNQueryMetricsTest.java b/processing/src/test/java/io/druid/query/topn/DefaultTopNQueryMetricsTest.java new file mode 100644 index 00000000000..185795fc1db --- /dev/null +++ b/processing/src/test/java/io/druid/query/topn/DefaultTopNQueryMetricsTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +import com.google.common.collect.ImmutableSet; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.query.CachingEmitter; +import io.druid.query.DefaultQueryMetricsTest; +import io.druid.query.DruidMetrics; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.ListFilteredDimensionSpec; +import io.druid.query.filter.SelectorDimFilter; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DefaultTopNQueryMetricsTest +{ + + /** + * Tests that passed a query {@link DefaultTopNQueryMetrics} produces events with a certain set of dimensions, + * no more, no less. 
+ */ + @Test + public void testDefaultTopNQueryMetricsQuery() + { + CachingEmitter cachingEmitter = new CachingEmitter(); + ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter); + DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics(new DefaultObjectMapper()); + TopNQuery query = new TopNQueryBuilder() + .dataSource("xx") + .granularity(Granularities.ALL) + .dimension(new ListFilteredDimensionSpec( + new DefaultDimensionSpec("tags", "tags"), + ImmutableSet.of("t3"), + null + )) + .metric("count") + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators(Collections.singletonList(new CountAggregatorFactory("count"))) + .threshold(5) + .filters(new SelectorDimFilter("tags", "t3", null)) + .build(); + queryMetrics.query(query); + + queryMetrics.reportQueryTime(0).emit(serviceEmitter); + Map actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals(16, actualEvent.size()); + Assert.assertTrue(actualEvent.containsKey("feed")); + Assert.assertTrue(actualEvent.containsKey("timestamp")); + Assert.assertEquals("", actualEvent.get("host")); + Assert.assertEquals("", actualEvent.get("service")); + Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE)); + Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE)); + List expectedIntervals = QueryRunnerTestHelper.fullOnInterval.getIntervals(); + List expectedStringIntervals = + expectedIntervals.stream().map(Interval::toString).collect(Collectors.toList()); + Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL)); + Assert.assertEquals("true", actualEvent.get("hasFilters")); + Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration")); + Assert.assertEquals("", actualEvent.get(DruidMetrics.ID)); + + // TopN-specific dimensions + Assert.assertEquals("5", actualEvent.get("threshold")); + Assert.assertEquals("tags", actualEvent.get("dimension")); + 
Assert.assertEquals("1", actualEvent.get("numMetrics")); + Assert.assertEquals("0", actualEvent.get("numComplexMetrics")); + + // Metric + Assert.assertEquals("query/time", actualEvent.get("metric")); + Assert.assertEquals(0L, actualEvent.get("value")); + } + + @Test + public void testDefaultTopNQueryMetricsMetricNamesAndUnits() + { + CachingEmitter cachingEmitter = new CachingEmitter(); + ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter); + DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics(new DefaultObjectMapper()); + DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics); + } +} diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 358f2685f81..cd6401462a0 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -36,14 +36,12 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; - import io.druid.java.util.common.IAE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.RE; @@ -56,6 +54,7 @@ import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Query; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import 
io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -83,6 +82,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -161,14 +161,14 @@ public class DirectDruidClient implements QueryRunner try { log.debug("Querying queryId[%s] url[%s]", query.getId(), url); - final long requestStartTime = System.currentTimeMillis(); + final long requestStartTimeNs = System.nanoTime(); - final ServiceMetricEvent.Builder builder = toolChest.makeMetricBuilder(query); - builder.setDimension("server", host); + final QueryMetrics> queryMetrics = toolChest.makeMetrics(query); + queryMetrics.server(host); final HttpResponseHandler responseHandler = new HttpResponseHandler() { - private long responseStartTime; + private long responseStartTimeNs; private final AtomicLong byteCount = new AtomicLong(0); private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); @@ -177,8 +177,8 @@ public class DirectDruidClient implements QueryRunner public ClientResponse handleResponse(HttpResponse response) { log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId()); - responseStartTime = System.currentTimeMillis(); - emitter.emit(builder.build("query/node/ttfb", responseStartTime - requestStartTime)); + responseStartTimeNs = System.nanoTime(); + queryMetrics.reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter); try { final String responseContext = response.headers().get("X-Druid-Response-Context"); @@ -267,17 +267,19 @@ public class DirectDruidClient implements QueryRunner @Override public ClientResponse done(ClientResponse clientResponse) { - 
long stopTime = System.currentTimeMillis(); + long stopTimeNs = System.nanoTime(); + long nodeTimeNs = stopTimeNs - responseStartTimeNs; log.debug( "Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].", query.getId(), url, byteCount.get(), - stopTime - responseStartTime, - byteCount.get() / (0.0001 * (stopTime - responseStartTime)) + TimeUnit.NANOSECONDS.toMillis(nodeTimeNs), + byteCount.get() / TimeUnit.NANOSECONDS.toSeconds(nodeTimeNs) ); - emitter.emit(builder.build("query/node/time", stopTime - requestStartTime)); - emitter.emit(builder.build("query/node/bytes", byteCount.get())); + queryMetrics.reportNodeTime(nodeTimeNs); + queryMetrics.reportNodeBytes(byteCount.get()); + queryMetrics.emit(emitter); synchronized (done) { try { // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out diff --git a/server/src/main/java/io/druid/guice/QueryToolChestModule.java b/server/src/main/java/io/druid/guice/QueryToolChestModule.java index 3eac4509904..0b7d84fe003 100644 --- a/server/src/main/java/io/druid/guice/QueryToolChestModule.java +++ b/server/src/main/java/io/druid/guice/QueryToolChestModule.java @@ -23,14 +23,18 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.multibindings.MapBinder; +import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; import io.druid.query.datasourcemetadata.DataSourceMetadataQuery; import io.druid.query.datasourcemetadata.DataSourceQueryQueryToolChest; +import io.druid.query.groupby.DefaultGroupByQueryMetricsFactory; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; +import 
io.druid.query.groupby.GroupByQueryMetricsFactory; import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.metadata.SegmentMetadataQueryConfig; import io.druid.query.metadata.SegmentMetadataQueryQueryToolChest; @@ -43,10 +47,14 @@ import io.druid.query.select.SelectQueryConfig; import io.druid.query.select.SelectQueryQueryToolChest; import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest; +import io.druid.query.timeseries.DefaultTimeseriesQueryMetricsFactory; import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryMetricsFactory; import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import io.druid.query.topn.DefaultTopNQueryMetricsFactory; import io.druid.query.topn.TopNQuery; import io.druid.query.topn.TopNQueryConfig; +import io.druid.query.topn.TopNQueryMetricsFactory; import io.druid.query.topn.TopNQueryQueryToolChest; import java.util.Map; @@ -79,6 +87,11 @@ public class QueryToolChestModule implements Module binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); + binder.bind(GenericQueryMetricsFactory.class).to(DefaultGenericQueryMetricsFactory.class); + binder.bind(TopNQueryMetricsFactory.class).to(DefaultTopNQueryMetricsFactory.class); + binder.bind(GroupByQueryMetricsFactory.class).to(DefaultGroupByQueryMetricsFactory.class); + binder.bind(TimeseriesQueryMetricsFactory.class).to(DefaultTimeseriesQueryMetricsFactory.class); + JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 4b1ccef9553..37ef7489831 100644 --- 
a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -22,12 +22,10 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachingQueryRunner; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; @@ -40,6 +38,7 @@ import io.druid.query.CPUTimeMetricQueryRunner; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -60,7 +59,6 @@ import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.Closeable; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; @@ -166,15 +164,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker } final QueryToolChest> toolChest = factory.getToolchest(); - final Function, ServiceMetricEvent.Builder> builderFn = - new Function, ServiceMetricEvent.Builder>() - { - @Override - public ServiceMetricEvent.Builder apply(@Nullable Query input) - { - return toolChest.makeMetricBuilder(query); - } - }; final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false); final AtomicLong cpuTimeAccumulator = new 
AtomicLong(0L); @@ -262,7 +251,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker ) ) ), - builderFn, + toolChest, sinkSegmentIdentifier, cpuTimeAccumulator ), @@ -273,7 +262,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker ) ) ), - builderFn, + toolChest, emitter, cpuTimeAccumulator, true @@ -286,14 +275,13 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker */ private QueryRunner withPerSinkMetrics( final QueryRunner sinkRunner, - final Function, ServiceMetricEvent.Builder> builderFn, + final QueryToolChest> queryToolChest, final String sinkSegmentIdentifier, final AtomicLong cpuTimeAccumulator ) { - final ImmutableMap dims = ImmutableMap.of("segment", sinkSegmentIdentifier); - // Note: query/segmentAndCache/time and query/segment/time are effectively the same here. They don't split apart + // Note: reportSegmentAndCacheTime and reportSegmentTime are effectively the same here. They don't split apart // cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this // better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the // sinkRunner. 
@@ -301,18 +289,18 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker return CPUTimeMetricQueryRunner.safeBuild( new MetricsEmittingQueryRunner<>( emitter, - builderFn, + queryToolChest, new MetricsEmittingQueryRunner<>( emitter, - builderFn, + queryToolChest, sinkRunner, - "query/segment/time", - dims + QueryMetrics::reportSegmentTime, + queryMetrics -> queryMetrics.segment(sinkSegmentIdentifier) ), - "query/segmentAndCache/time", - dims + QueryMetrics::reportSegmentAndCacheTime, + queryMetrics -> queryMetrics.segment(sinkSegmentIdentifier) ).withWaitMeasuredFromNow(), - builderFn, + queryToolChest, emitter, cpuTimeAccumulator, false diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index be1a2cdad60..20a54c9f376 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -32,7 +32,9 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; import io.druid.query.DruidMetrics; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; +import io.druid.query.QueryMetrics; import io.druid.query.QueryToolChestWarehouse; import io.druid.server.log.RequestLogger; import io.druid.server.metrics.QueryCountStatsProvider; @@ -103,6 +105,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private final DruidHttpClientConfig httpClientConfig; private final ServiceEmitter emitter; private final RequestLogger requestLogger; + private final GenericQueryMetricsFactory queryMetricsFactory; private HttpClient broadcastClient; @@ -115,7 +118,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu @Router Provider httpClientProvider, DruidHttpClientConfig httpClientConfig, ServiceEmitter emitter, - 
RequestLogger requestLogger + RequestLogger requestLogger, + GenericQueryMetricsFactory queryMetricsFactory ) { this.warehouse = warehouse; @@ -126,6 +130,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu this.httpClientConfig = httpClientConfig; this.emitter = emitter; this.requestLogger = requestLogger; + this.queryMetricsFactory = queryMetricsFactory; } @Override @@ -278,7 +283,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu { final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE); if (query != null) { - return newMetricsEmittingProxyResponseListener(request, response, query, System.currentTimeMillis()); + return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime()); } else { return super.newProxyResponseListener(request, response); } @@ -331,10 +336,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu HttpServletRequest request, HttpServletResponse response, Query query, - long start + long startNs ) { - return new MetricsEmittingProxyResponseListener(request, response, query, start); + return new MetricsEmittingProxyResponseListener(request, response, query, startNs); } @Override @@ -361,13 +366,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private final HttpServletRequest req; private final HttpServletResponse res; private final Query query; - private final long start; + private final long startNs; public MetricsEmittingProxyResponseListener( HttpServletRequest request, HttpServletResponse response, Query query, - long start + long startNs ) { super(request, response); @@ -375,13 +380,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu this.req = request; this.res = response; this.query = query; - this.start = start; + this.startNs = startNs; } @Override public void onComplete(Result result) { - final long requestTime = 
System.currentTimeMillis() - start; + final long requestTimeNs = System.nanoTime() - startNs; try { boolean success = result.isSucceeded(); if (success) { @@ -389,10 +394,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu } else { failedQueryCount.incrementAndGet(); } - emitter.emit( - DruidMetrics.makeQueryTimeMetric(warehouse.getToolChest(query), jsonMapper, query, req.getRemoteAddr()) - .build("query/time", requestTime) + QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( + queryMetricsFactory, + warehouse.getToolChest(query), + query, + req.getRemoteAddr() ); + queryMetrics.reportQueryTime(requestTimeNs).emit(emitter); requestLogger.log( new RequestLogLine( new DateTime(), @@ -401,7 +409,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu new QueryStats( ImmutableMap.of( "query/time", - requestTime, + TimeUnit.NANOSECONDS.toMillis(requestTimeNs), "success", success && result.getResponse().getStatus() == javax.ws.rs.core.Response.Status.OK.getStatusCode() diff --git a/server/src/main/java/io/druid/server/BrokerQueryResource.java b/server/src/main/java/io/druid/server/BrokerQueryResource.java index 033b372ec62..d94966a902a 100644 --- a/server/src/main/java/io/druid/server/BrokerQueryResource.java +++ b/server/src/main/java/io/druid/server/BrokerQueryResource.java @@ -29,6 +29,7 @@ import io.druid.client.TimelineServerView; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.query.Query; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChestWarehouse; import io.druid.server.http.security.StateResourceFilter; @@ -67,10 +68,22 @@ public class BrokerQueryResource extends QueryResource RequestLogger requestLogger, QueryManager queryManager, AuthConfig authConfig, + GenericQueryMetricsFactory queryMetricsFactory, TimelineServerView brokerServerView ) { - super(warehouse, 
config, jsonMapper, smileMapper, texasRanger, emitter, requestLogger, queryManager, authConfig); + super( + warehouse, + config, + jsonMapper, + smileMapper, + texasRanger, + emitter, + requestLogger, + queryManager, + authConfig, + queryMetricsFactory + ); this.brokerServerView = brokerServerView; } diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 4720b67176f..2bd7d192cbb 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -38,9 +38,11 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.Yielders; import io.druid.query.DruidMetrics; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryContextKeys; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryMetrics; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -74,6 +76,7 @@ import java.io.OutputStream; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -99,6 +102,7 @@ public class QueryResource implements QueryCountStatsProvider protected final RequestLogger requestLogger; protected final QueryManager queryManager; protected final AuthConfig authConfig; + private final GenericQueryMetricsFactory queryMetricsFactory; private final AtomicLong successfulQueryCount = new AtomicLong(); private final AtomicLong failedQueryCount = new AtomicLong(); private final AtomicLong interruptedQueryCount = new AtomicLong(); @@ -113,7 +117,8 @@ public class QueryResource implements QueryCountStatsProvider ServiceEmitter emitter, RequestLogger requestLogger, QueryManager queryManager, - AuthConfig authConfig + AuthConfig 
authConfig, + GenericQueryMetricsFactory queryMetricsFactory ) { this.warehouse = warehouse; @@ -125,6 +130,7 @@ public class QueryResource implements QueryCountStatsProvider this.requestLogger = requestLogger; this.queryManager = queryManager; this.authConfig = authConfig; + this.queryMetricsFactory = queryMetricsFactory; } @DELETE @@ -170,7 +176,7 @@ public class QueryResource implements QueryCountStatsProvider @Context final HttpServletRequest req // used to get request content-type, remote address and AuthorizationInfo ) throws IOException { - final long start = System.currentTimeMillis(); + final long startNs = System.nanoTime(); Query query = null; QueryToolChest toolChest = null; String queryId = null; @@ -261,25 +267,31 @@ public class QueryResource implements QueryCountStatsProvider os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. os.close(); successfulQueryCount.incrementAndGet(); - final long queryTime = System.currentTimeMillis() - start; - emitter.emit( - DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) - .setDimension("success", "true") - .build("query/time", queryTime) - ); - emitter.emit( - DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) - .build("query/bytes", os.getCount()) + final long queryTimeNs = System.nanoTime() - startNs; + QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( + queryMetricsFactory, + theToolChest, + theQuery, + req.getRemoteAddr() ); + queryMetrics.success(true); + queryMetrics.reportQueryTime(queryTimeNs).emit(emitter); + + DruidMetrics.makeRequestMetrics( + queryMetricsFactory, + theToolChest, + theQuery, + req.getRemoteAddr() + ).reportQueryBytes(os.getCount()).emit(emitter); requestLogger.log( new RequestLogLine( - new DateTime(start), + new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)), req.getRemoteAddr(), theQuery, new QueryStats( ImmutableMap.of( - "query/time", queryTime, + 
"query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs), "query/bytes", os.getCount(), "success", true ) @@ -328,21 +340,24 @@ public class QueryResource implements QueryCountStatsProvider try { log.warn(e, "Exception while processing queryId [%s]", queryId); interruptedQueryCount.incrementAndGet(); - final long queryTime = System.currentTimeMillis() - start; - emitter.emit( - DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) - .setDimension("success", "false") - .build("query/time", queryTime) + final long queryTimeNs = System.nanoTime() - startNs; + QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( + queryMetricsFactory, + toolChest, + query, + req.getRemoteAddr() ); + queryMetrics.success(false); + queryMetrics.reportQueryTime(queryTimeNs).emit(emitter); requestLogger.log( new RequestLogLine( - new DateTime(start), + new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)), req.getRemoteAddr(), query, new QueryStats( ImmutableMap.of( "query/time", - queryTime, + TimeUnit.NANOSECONDS.toMillis(queryTimeNs), "success", false, "interrupted", @@ -370,20 +385,23 @@ public class QueryResource implements QueryCountStatsProvider failedQueryCount.incrementAndGet(); try { - final long queryTime = System.currentTimeMillis() - start; - emitter.emit( - DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) - .setDimension("success", "false") - .build("query/time", queryTime) + final long queryTimeNs = System.nanoTime() - startNs; + QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( + queryMetricsFactory, + toolChest, + query, + req.getRemoteAddr() ); + queryMetrics.success(false); + queryMetrics.reportQueryTime(queryTimeNs).emit(emitter); requestLogger.log( new RequestLogLine( - new DateTime(start), + new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)), req.getRemoteAddr(), query, new QueryStats(ImmutableMap.of( "query/time", - queryTime, + TimeUnit.NANOSECONDS.toMillis(queryTimeNs), "success", 
false, "exception", diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 9360c4e2171..65d94aab0d1 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -21,13 +21,11 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.CachingQueryRunner; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; @@ -44,6 +42,7 @@ import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -255,7 +254,6 @@ public class ServerManager implements QuerySegmentWalker } final QueryToolChest> toolChest = factory.getToolchest(); - final Function, ServiceMetricEvent.Builder> builderFn = getBuilderFn(toolChest); final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); DataSource dataSource = query.getDataSource(); @@ -312,7 +310,6 @@ public class ServerManager implements QuerySegmentWalker holder.getVersion(), input.getChunkNumber() ), - builderFn, cpuTimeAccumulator ); } @@ -327,7 +324,7 @@ public class ServerManager implements QuerySegmentWalker toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), toolChest ), - builderFn, + toolChest, emitter, cpuTimeAccumulator, 
true @@ -362,7 +359,6 @@ public class ServerManager implements QuerySegmentWalker return new NoopQueryRunner(); } - final Function, ServiceMetricEvent.Builder> builderFn = getBuilderFn(toolChest); final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); FunctionalIterable> queryRunners = FunctionalIterable @@ -390,7 +386,7 @@ public class ServerManager implements QuerySegmentWalker final ReferenceCountingSegment adapter = chunk.getObject(); return Arrays.asList( - buildAndDecorateQueryRunner(factory, toolChest, adapter, input, builderFn, cpuTimeAccumulator) + buildAndDecorateQueryRunner(factory, toolChest, adapter, input, cpuTimeAccumulator) ); } } @@ -401,7 +397,7 @@ public class ServerManager implements QuerySegmentWalker toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), toolChest ), - builderFn, + toolChest, emitter, cpuTimeAccumulator, true @@ -413,65 +409,45 @@ public class ServerManager implements QuerySegmentWalker final QueryToolChest> toolChest, final ReferenceCountingSegment adapter, final SegmentDescriptor segmentDescriptor, - final Function, ServiceMetricEvent.Builder> builderFn, final AtomicLong cpuTimeAccumulator ) { SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor); + String segmentId = adapter.getIdentifier(); return CPUTimeMetricQueryRunner.safeBuild( new SpecificSegmentQueryRunner( new MetricsEmittingQueryRunner( emitter, - builderFn, + toolChest, new BySegmentQueryRunner( - adapter.getIdentifier(), + segmentId, adapter.getDataInterval().getStart(), new CachingQueryRunner( - adapter.getIdentifier(), + segmentId, segmentDescriptor, objectMapper, cache, toolChest, new MetricsEmittingQueryRunner( emitter, - new Function, ServiceMetricEvent.Builder>() - { - @Override - public ServiceMetricEvent.Builder apply(@Nullable final Query input) - { - return toolChest.makeMetricBuilder(input); - } - }, + toolChest, new ReferenceCountingSegmentQueryRunner(factory, adapter, segmentDescriptor), - "query/segment/time", - 
ImmutableMap.of("segment", adapter.getIdentifier()) + QueryMetrics::reportSegmentTime, + queryMetrics -> queryMetrics.segment(segmentId) ), cachingExec, cacheConfig ) ), - "query/segmentAndCache/time", - ImmutableMap.of("segment", adapter.getIdentifier()) + QueryMetrics::reportSegmentAndCacheTime, + queryMetrics -> queryMetrics.segment(segmentId) ).withWaitMeasuredFromNow(), segmentSpec ), - builderFn, + toolChest, emitter, cpuTimeAccumulator, false ); } - - private static Function, ServiceMetricEvent.Builder> getBuilderFn(final QueryToolChest> toolChest) - { - return new Function, ServiceMetricEvent.Builder>() - { - @Nullable - @Override - public ServiceMetricEvent.Builder apply(@Nullable Query input) - { - return toolChest.makeMetricBuilder(input); - } - }; - } } diff --git a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java index b2d19a28bae..0ac353aba18 100644 --- a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java +++ b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java @@ -41,6 +41,7 @@ import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; import io.druid.initialization.Initialization; import io.druid.java.util.common.lifecycle.Lifecycle; +import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; import io.druid.query.QueryToolChest; @@ -215,10 +216,11 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest } }; + ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class); ServletHolder holder = new ServletHolder( new AsyncQueryForwardingServlet( new MapQueryToolChestWarehouse(ImmutableMap., QueryToolChest>of()), - injector.getInstance(ObjectMapper.class), + jsonMapper, injector.getInstance(Key.get(ObjectMapper.class, Smile.class)), hostFinder, 
injector.getProvider(org.eclipse.jetty.client.HttpClient.class), @@ -231,7 +233,8 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest { // noop } - } + }, + new DefaultGenericQueryMetricsFactory(jsonMapper) ) { @Override diff --git a/server/src/test/java/io/druid/server/QueryResourceTest.java b/server/src/test/java/io/druid/server/QueryResourceTest.java index 1e48d345394..f8ca466b97a 100644 --- a/server/src/test/java/io/druid/server/QueryResourceTest.java +++ b/server/src/test/java/io/druid/server/QueryResourceTest.java @@ -29,6 +29,7 @@ import io.druid.concurrent.Execs; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -139,7 +140,8 @@ public class QueryResourceTest new NoopServiceEmitter(), new NoopRequestLogger(), queryManager, - new AuthConfig() + new AuthConfig(), + new DefaultGenericQueryMetricsFactory(jsonMapper) ); } @@ -213,7 +215,8 @@ public class QueryResourceTest new NoopServiceEmitter(), new NoopRequestLogger(), queryManager, - new AuthConfig(true) + new AuthConfig(true), + new DefaultGenericQueryMetricsFactory(jsonMapper) ); Response response = queryResource.doPost( @@ -283,7 +286,8 @@ public class QueryResourceTest new NoopServiceEmitter(), new NoopRequestLogger(), queryManager, - new AuthConfig(true) + new AuthConfig(true), + new DefaultGenericQueryMetricsFactory(jsonMapper) ); final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\"," @@ -379,7 +383,8 @@ public class QueryResourceTest new NoopServiceEmitter(), new NoopRequestLogger(), queryManager, - new AuthConfig(true) + new AuthConfig(true), + new DefaultGenericQueryMetricsFactory(jsonMapper) ); final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\"," 
diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 7018d998025..7656e7583af 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.LocalCacheProvider; import io.druid.jackson.DefaultObjectMapper; @@ -43,9 +42,11 @@ import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.java.util.common.guava.YieldingSequenceBase; import io.druid.query.ConcatQueryRunner; +import io.druid.query.DefaultQueryMetrics; import io.druid.query.Druids; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -573,9 +574,9 @@ public class ServerManagerTest } @Override - public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query) + public QueryMetrics> makeMetrics(QueryType query) { - return new ServiceMetricEvent.Builder(); + return new DefaultQueryMetrics<>(new DefaultObjectMapper()); } @Override