QueryMetrics: abstraction layer of query metrics emitting (part of #3798) (#3954)

* QueryMetrics: abstraction layer of query metrics emitting

* Minor fixes

* QueryMetrics.emit() for bulk emit and improve Javadoc

* Fixes

* Fix

* Javadoc fixes

* Typo

* Use DefaultObjectMapper

* Add tests

* Address PR comments

* Remove QueryMetrics.userDimensions(); Rename QueryMetric.register() to report()

* Dedicated TopNQueryMetricsFactory, GroupByQueryMetricsFactory and TimeseriesQueryMetricsFactory

* Typo

* More elaborate Javadoc of QueryMetrics

* Formatting

* Replace QueryMetric enum with lambdas

* Add comments and VisibleForTesting annotations
This commit is contained in:
Roman Leventov 2017-03-23 18:23:59 -06:00 committed by Charles Allen
parent c9fc7d1709
commit 4b5ae31207
50 changed files with 2073 additions and 331 deletions

View File

@ -21,12 +21,13 @@ package io.druid.query.scan;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.google.inject.Inject;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.MetricManipulationFn;
@ -39,6 +40,14 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
{
};
private final GenericQueryMetricsFactory queryMetricsFactory;
@Inject
public ScanQueryQueryToolChest(GenericQueryMetricsFactory queryMetricsFactory)
{
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
{
@ -74,9 +83,9 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(ScanQuery query)
public QueryMetrics<Query<?>> makeMetrics(ScanQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query);
return queryMetricsFactory.makeMetrics(query);
}
@Override

View File

@ -27,6 +27,7 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
@ -63,7 +64,9 @@ import java.util.Map;
@RunWith(Parameterized.class)
public class MultiSegmentScanQueryTest
{
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest();
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(
DefaultGenericQueryMetricsFactory.instance()
);
private static final QueryRunnerFactory<ScanResultValue, ScanQuery> factory = new ScanQueryRunnerFactory(
toolChest,

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.TableDataSource;
@ -95,7 +96,9 @@ public class ScanQueryRunnerTest
);
public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class);
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest();
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(
DefaultGenericQueryMetricsFactory.instance()
);
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder() throws IOException

View File

@ -19,11 +19,8 @@
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.common.utils.VMUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
@ -36,14 +33,14 @@ import java.util.concurrent.atomic.AtomicLong;
public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> delegate;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryToolChest<?, ? super Query<T>> queryToolChest;
private final ServiceEmitter emitter;
private final AtomicLong cpuTimeAccumulator;
private final boolean report;
private CPUTimeMetricQueryRunner(
QueryRunner<T> delegate,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryToolChest<?, ? super Query<T>> queryToolChest,
ServiceEmitter emitter,
AtomicLong cpuTimeAccumulator,
boolean report
@ -53,7 +50,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
throw new ISE("Cpu time must enabled");
}
this.delegate = delegate;
this.builderFn = builderFn;
this.queryToolChest = queryToolChest;
this.emitter = emitter;
this.cpuTimeAccumulator = cpuTimeAccumulator == null ? new AtomicLong(0L) : cpuTimeAccumulator;
this.report = report;
@ -85,10 +82,9 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
public void after(boolean isDone, Throwable thrown) throws Exception
{
if (report) {
final long cpuTime = cpuTimeAccumulator.get();
if (cpuTime > 0) {
final ServiceMetricEvent.Builder builder = Preconditions.checkNotNull(builderFn.apply(query));
emitter.emit(builder.build("query/cpu/time", cpuTimeAccumulator.get() / 1000));
final long cpuTimeNs = cpuTimeAccumulator.get();
if (cpuTimeNs > 0) {
queryToolChest.makeMetrics(query).reportCpuTime(cpuTimeNs).emit(emitter);
}
}
}
@ -98,7 +94,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
public static <T> QueryRunner<T> safeBuild(
QueryRunner<T> delegate,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryToolChest<?, ? super Query<T>> queryToolChest,
ServiceEmitter emitter,
AtomicLong accumulator,
boolean report
@ -107,7 +103,7 @@ public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
if (!VMUtils.isThreadCpuTimeEnabled()) {
return delegate;
} else {
return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report);
return new CPUTimeMetricQueryRunner<>(delegate, queryToolChest, emitter, accumulator, report);
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.druid.jackson.DefaultObjectMapper;
public class DefaultGenericQueryMetricsFactory implements GenericQueryMetricsFactory
{
  private static final GenericQueryMetricsFactory INSTANCE =
      new DefaultGenericQueryMetricsFactory(new DefaultObjectMapper());

  /**
   * Returns a shared factory instance, backed by a {@link DefaultObjectMapper}. Should be used only in tests,
   * directly or indirectly (e. g. in {@link
   * io.druid.query.search.SearchQueryQueryToolChest#SearchQueryQueryToolChest(
   * io.druid.query.search.search.SearchQueryConfig, IntervalChunkingQueryRunnerDecorator)}).
   */
  @VisibleForTesting
  public static GenericQueryMetricsFactory instance()
  {
    return INSTANCE;
  }

  /** Mapper used by the created {@link DefaultQueryMetrics} objects to serialize the query context dimension. */
  private final ObjectMapper jsonMapper;

  @Inject
  public DefaultGenericQueryMetricsFactory(ObjectMapper jsonMapper)
  {
    this.jsonMapper = jsonMapper;
  }

  @Override
  public QueryMetrics<Query<?>> makeMetrics(Query<?> query)
  {
    // Per the GenericQueryMetricsFactory contract, query() must be called on the created object before returning.
    final DefaultQueryMetrics<Query<?>> metrics = new DefaultQueryMetrics<>(jsonMapper);
    metrics.query(query);
    return metrics;
  }
}

View File

@ -0,0 +1,226 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * The default implementation of {@link QueryMetrics}. Time metrics registered via report*() methods are emitted in
 * milliseconds, except query/cpu/time, which is emitted in microseconds.
 */
public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMetrics<QueryType>
{
  protected final ObjectMapper jsonMapper;
  protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
  // Metrics registered via report*() methods; emitted in bulk and cleared by emit().
  protected final Map<String, Number> metrics = new HashMap<>();

  public DefaultQueryMetrics(ObjectMapper jsonMapper)
  {
    this.jsonMapper = jsonMapper;
  }

  @Override
  public void query(QueryType query)
  {
    // Pulls all dimensions which are available directly from the query object.
    dataSource(query);
    queryType(query);
    interval(query);
    hasFilters(query);
    duration(query);
    queryId(query);
  }

  @Override
  public void dataSource(QueryType query)
  {
    builder.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()));
  }

  @Override
  public void queryType(QueryType query)
  {
    builder.setDimension(DruidMetrics.TYPE, query.getType());
  }

  @Override
  public void interval(QueryType query)
  {
    final String[] intervalStrings = query.getIntervals().stream().map(Interval::toString).toArray(String[]::new);
    builder.setDimension(DruidMetrics.INTERVAL, intervalStrings);
  }

  @Override
  public void hasFilters(QueryType query)
  {
    builder.setDimension("hasFilters", String.valueOf(query.hasFilters()));
  }

  @Override
  public void duration(QueryType query)
  {
    builder.setDimension("duration", query.getDuration().toString());
  }

  @Override
  public void queryId(QueryType query)
  {
    builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
  }

  @Override
  public void context(QueryType query)
  {
    // The whole query context is serialized into a single JSON string dimension.
    final Object context = query.getContext() == null ? ImmutableMap.of() : query.getContext();
    try {
      builder.setDimension("context", jsonMapper.writeValueAsString(context));
    }
    catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void server(String host)
  {
    builder.setDimension("server", host);
  }

  @Override
  public void remoteAddress(String remoteAddress)
  {
    builder.setDimension("remoteAddress", remoteAddress);
  }

  @Override
  public void status(String status)
  {
    builder.setDimension(DruidMetrics.STATUS, status);
  }

  @Override
  public void success(boolean success)
  {
    builder.setDimension("success", String.valueOf(success));
  }

  @Override
  public void segment(String segmentIdentifier)
  {
    builder.setDimension("segment", segmentIdentifier);
  }

  @Override
  public void chunkInterval(Interval interval)
  {
    builder.setDimension("chunkInterval", interval.toString());
  }

  @Override
  public QueryMetrics<QueryType> reportQueryTime(long timeNs)
  {
    return reportMillisTimeMetric("query/time", timeNs);
  }

  @Override
  public QueryMetrics<QueryType> reportQueryBytes(long byteCount)
  {
    return reportMetric("query/bytes", byteCount);
  }

  @Override
  public QueryMetrics<QueryType> reportWaitTime(long timeNs)
  {
    return reportMillisTimeMetric("query/wait/time", timeNs);
  }

  @Override
  public QueryMetrics<QueryType> reportSegmentTime(long timeNs)
  {
    return reportMillisTimeMetric("query/segment/time", timeNs);
  }

  @Override
  public QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs)
  {
    return reportMillisTimeMetric("query/segmentAndCache/time", timeNs);
  }

  @Override
  public QueryMetrics<QueryType> reportIntervalChunkTime(long timeNs)
  {
    return reportMillisTimeMetric("query/intervalChunk/time", timeNs);
  }

  @Override
  public QueryMetrics<QueryType> reportCpuTime(long timeNs)
  {
    // Unlike the other time metrics, cpu time is emitted in microseconds.
    return reportMetric("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs));
  }

  @Override
  public QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs)
  {
    return reportMillisTimeMetric("query/node/ttfb", timeNs);
  }

  @Override
  public QueryMetrics<QueryType> reportNodeTime(long timeNs)
  {
    return reportMillisTimeMetric("query/node/time", timeNs);
  }

  @Override
  public QueryMetrics<QueryType> reportNodeBytes(long byteCount)
  {
    return reportMetric("query/node/bytes", byteCount);
  }

  /** Registers a metric to be emitted later via {@link #emit(ServiceEmitter)}, returns this object for chaining. */
  private QueryMetrics<QueryType> reportMetric(String metricName, Number value)
  {
    metrics.put(metricName, value);
    return this;
  }

  /** Registers a time metric, rounding the given nanoseconds down to milliseconds. */
  private QueryMetrics<QueryType> reportMillisTimeMetric(String metricName, long timeNs)
  {
    return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs));
  }

  @Override
  public void emit(ServiceEmitter emitter)
  {
    // Emit all registered metrics with the dimensions accumulated in the builder, then reset the registered metrics,
    // so that this object could be reused.
    metrics.forEach((metricName, value) -> emitter.emit(builder.build(metricName, value)));
    metrics.clear();
  }
}

View File

@ -20,14 +20,7 @@
package io.druid.query;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Interval;
import java.util.List;
@ -61,50 +54,21 @@ public class DruidMetrics
return retVal;
}
public static <T> ServiceMetricEvent.Builder makePartialQueryTimeMetric(Query<T> query)
{
return new ServiceMetricEvent.Builder()
.setDimension(DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()))
.setDimension(TYPE, query.getType())
.setDimension(
INTERVAL,
Lists.transform(
query.getIntervals(),
new Function<Interval, String>()
{
@Override
public String apply(Interval input)
{
return input.toString();
}
}
).toArray(new String[query.getIntervals().size()])
)
.setDimension("hasFilters", String.valueOf(query.hasFilters()))
.setDimension("duration", query.getDuration().toString())
.setDimension(ID, Strings.nullToEmpty(query.getId()));
}
public static <T> ServiceMetricEvent.Builder makeQueryTimeMetric(
public static <T> QueryMetrics<?> makeRequestMetrics(
final GenericQueryMetricsFactory queryMetricsFactory,
final QueryToolChest<T, Query<T>> toolChest,
final ObjectMapper jsonMapper,
final Query<T> query,
final String remoteAddr
) throws JsonProcessingException
{
final ServiceMetricEvent.Builder baseMetric = toolChest == null
? makePartialQueryTimeMetric(query)
: toolChest.makeMetricBuilder(query);
return baseMetric
.setDimension(
"context",
jsonMapper.writeValueAsString(
query.getContext() == null
? ImmutableMap.of()
: query.getContext()
)
)
.setDimension("remoteAddress", remoteAddr);
QueryMetrics<? super Query<T>> queryMetrics;
if (toolChest != null) {
queryMetrics = toolChest.makeMetrics(query);
} else {
queryMetrics = queryMetricsFactory.makeMetrics(query);
}
queryMetrics.context(query);
queryMetrics.remoteAddress(remoteAddr);
return queryMetrics;
}
}

View File

@ -19,12 +19,9 @@
package io.druid.query;
import com.google.common.base.Function;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.guava.Sequence;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@ -90,15 +87,7 @@ public class FluentQueryRunnerBuilder<T>
return from(
CPUTimeMetricQueryRunner.safeBuild(
baseRunner,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Nullable
@Override
public ServiceMetricEvent.Builder apply(Query<T> tQuery)
{
return toolChest.makeMetricBuilder(tQuery);
}
},
toolChest,
emitter,
new AtomicLong(0L),
true

View File

@ -0,0 +1,33 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
/**
 * This factory is used for DI of custom {@link QueryMetrics} implementations for all query types which don't (yet)
 * need to emit custom dimensions and/or metrics, i. e. for which the generic {@link QueryMetrics} interface is
 * sufficient.
 */
public interface GenericQueryMetricsFactory
{
  /**
   * Creates a {@link QueryMetrics} for a query which doesn't have a predefined QueryMetrics subclass. This method
   * must call {@link QueryMetrics#query(Query)} with the given query on the created QueryMetrics object before
   * returning.
   */
  QueryMetrics<Query<?>> makeMetrics(Query<?> query);
}

View File

@ -20,10 +20,8 @@
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.granularity.PeriodGranularity;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.common.guava.Sequence;
@ -107,17 +105,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
toolChest.mergeResults(
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
},
toolChest,
baseRunner,
"query/intervalChunk/time",
ImmutableMap.of("chunkInterval", singleInterval.toString())
QueryMetrics::reportIntervalChunkTime,
queryMetrics -> queryMetrics.chunkInterval(singleInterval)
).withWaitMeasuredFromNow()
),
executor, queryWatcher

View File

@ -19,76 +19,74 @@
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.guava.LazySequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
import io.druid.java.util.common.guava.Sequences;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.ObjLongConsumer;
/**
*/
public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
{
private final ServiceEmitter emitter;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryToolChest<?, ? super Query<T>> queryToolChest;
private final QueryRunner<T> queryRunner;
private final long creationTime;
private final String metricName;
private final Map<String, String> userDimensions;
private final long creationTimeNs;
private final ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric;
private final Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions;
private MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
long creationTime,
String metricName,
Map<String, String> userDimensions
long creationTimeNs,
ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric,
Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions
)
{
this.emitter = emitter;
this.builderFn = builderFn;
this.queryToolChest = queryToolChest;
this.queryRunner = queryRunner;
this.creationTime = creationTime;
this.metricName = metricName;
this.userDimensions = userDimensions;
this.creationTimeNs = creationTimeNs;
this.reportMetric = reportMetric;
this.applyCustomDimensions = applyCustomDimensions;
}
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
String metricName,
Map<String, String> userDimensions
ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric,
Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions
)
{
this(emitter, builderFn, queryRunner, -1, metricName, userDimensions);
this(emitter, queryToolChest, queryRunner, -1, reportMetric, applyCustomDimensions);
}
public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
{
return new MetricsEmittingQueryRunner<T>(
return new MetricsEmittingQueryRunner<>(
emitter,
builderFn,
queryToolChest,
queryRunner,
System.currentTimeMillis(),
metricName,
userDimensions
System.nanoTime(),
reportMetric,
applyCustomDimensions
);
}
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final ServiceMetricEvent.Builder builder = builderFn.apply(query);
final QueryMetrics<? super Query<T>> queryMetrics = queryToolChest.makeMetrics(query);
for (Map.Entry<String, String> userDimension : userDimensions.entrySet()) {
builder.setDimension(userDimension.getKey(), userDimension.getValue());
}
applyCustomDimensions.accept(queryMetrics);
return Sequences.wrap(
// Use LazySequence because want to account execution time of queryRunner.run() (it prepares the underlying
@ -104,28 +102,29 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
}),
new SequenceWrapper()
{
private long startTime;
private long startTimeNs;
@Override
public void before()
{
startTime = System.currentTimeMillis();
startTimeNs = System.nanoTime();
}
@Override
public void after(boolean isDone, Throwable thrown)
{
if (thrown != null) {
builder.setDimension(DruidMetrics.STATUS, "failed");
queryMetrics.status("failed");
} else if (!isDone) {
builder.setDimension(DruidMetrics.STATUS, "short");
queryMetrics.status("short");
}
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build(metricName, timeTaken));
long timeTakenNs = System.nanoTime() - startTimeNs;
reportMetric.accept(queryMetrics, timeTakenNs);
if (creationTime > 0) {
emitter.emit(builder.build("query/wait/time", startTime - creationTime));
if (creationTimeNs > 0) {
queryMetrics.reportWaitTime(startTimeNs - creationTimeNs);
}
queryMetrics.emit(emitter);
}
}
);

View File

@ -0,0 +1,249 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.metamx.emitter.service.ServiceEmitter;
import org.joda.time.Interval;
/**
* Abstraction wrapping {@link com.metamx.emitter.service.ServiceMetricEvent.Builder} and allowing to control what
* metrics are actually emitted, what dimensions do they have, etc.
*
*
* Goals of QueryMetrics
* ---------------------
* 1. Skipping or partial filtering of particular dimensions or metrics entirely. Implementation could leave the body
* of the corresponding method empty, or implement random filtering like:
* public void reportCpuTime(long timeNs)
* {
* if (ThreadLocalRandom.current().nextDouble() < 0.1) {
* super.reportCpuTime(timeNs);
* }
* }
*
* 2. Ability to add new dimensions and metrics, possibly expensive to compute, or expensive to process (long string
* values, high cardinality, etc.) and not to affect existing Druid installations, by skipping (see 1.) those
* dimensions and metrics entirely in the default QueryMetrics implementations. Users who need those expensive
* dimensions and metrics, could explicitly emit them in their own QueryMetrics.
*
* 3. Control over the time unit, in which time metrics are emitted. By default (see {@link DefaultQueryMetrics} and
* its subclasses) it's milliseconds, but if queries are fast, that might not be precise enough.
*
* 4. Control over the dimension and metric names.
*
* Here, "control" is provided to the operator of a Druid cluster, who would exercise that control through a
* site-specific extension adding XxxQueryMetricsFactory impl(s).
*
*
* Types of methods in this interface
* ----------------------------------
* 1. Methods, pulling some dimensions from the query object. These methods are used to populate the metric before the
* query is run. These methods accept a single `QueryType query` parameter. {@link #query(Query)} calls all methods
* of this type, hence pulling all available information from the query object as dimensions.
*
* 2. Methods for setting dimensions, which become known in the process of the query execution or after the query is
* completed.
*
* 3. Methods to register metrics to be emitted later in bulk via {@link #emit(ServiceEmitter)}. These methods
* return this QueryMetrics object back for chaining. Names of these methods start with "report" prefix.
*
*
* Implementors expectations
* -------------------------
* QueryMetrics is expected to be changed often, in every Druid release (including "patch" releases). Users who create
* their custom implementations of QueryMetrics should be ready to fix the code of their QueryMetrics (implement new
* methods) when they update Druid. Broken builds of custom extensions containing custom QueryMetrics are the way to
* notify users that Druid core "wants" to emit a new dimension or metric, and the user handles it manually: if the new
* dimension or metric is useful and not very expensive to process and store, then emit it; otherwise skip it (see
* Goals, 1. above).
*
* <p>If implementors of custom QueryMetrics don't want to fix builds on every Druid release (e. g. if they want to add
* a single dimension to emitted events and don't want to alter other dimensions and emitted metrics), they could
* inherit their custom QueryMetrics from {@link DefaultQueryMetrics} or query-specific default implementation class,
* such as {@link io.druid.query.topn.DefaultTopNQueryMetrics}. Those classes are guaranteed to stay around and
* implement new methods, added to the QueryMetrics interface (or a query-specific subinterface). However, there is no
* 100% guarantee of compatibility, because methods could not only be added to QueryMetrics, existing methods could also
* be changed or removed.
*
* <p>QueryMetrics is designed for use from a single thread, implementations shouldn't care about thread-safety.
*
*
* Adding new methods to QueryMetrics
* ----------------------------------
* 1. When adding a new method for setting a dimension, which could be pulled from the query object, always make them
* accept a single `QueryType query` parameter, letting the implementations to do all the work of carving the dimension
* value out of the query object.
*
* 2. When adding a new method for setting a dimension, which becomes known in the process of the query execution or
* after the query is completed, design it so that as little work as possible is done for preparing arguments for this
* method, and as much work as possible is done in the implementations of this method, if they decide to actually emit
* this dimension.
*
* 3. When adding a new method for registering metrics, make it to accept the metric value in the smallest reasonable
* unit (i. e. nanoseconds for time metrics, bytes for metrics of data size, etc.), allowing the implementations of
* this method to round the value up to more coarse-grained units, if they don't need the maximum precision.
*
*
* Making subinterfaces of QueryMetrics for emitting custom dimensions and/or metrics for specific query types
* -----------------------------------------------------------------------------------------------------------
* If a query type (e. g. {@link io.druid.query.search.search.SearchQuery}, i. e. its runners) needs to emit custom
* dimensions and/or metrics which don't make sense for other query types, the following steps should be executed:
* 1. Create `interface SearchQueryMetrics extends QueryMetrics` (here and below "Search" is the query type) with
* additional methods (see "Adding new methods" section above).
*
* 2. Create `class DefaultSearchQueryMetrics implements SearchQueryMetrics`. This class should implement extra methods
* from SearchQueryMetrics interfaces with empty bodies, AND DELEGATE ALL OTHER METHODS TO A QueryMetrics OBJECT,
* provided as a sole parameter in DefaultSearchQueryMetrics constructor.
*
* 3. Create `interface SearchQueryMetricsFactory` with a single method
* `SearchQueryMetrics makeMetrics(SearchQuery query);`.
*
* 4. Create `class DefaultSearchQueryMetricsFactory implements SearchQueryMetricsFactory`, which accepts {@link
* GenericQueryMetricsFactory} as injected constructor parameter, and implements makeMetrics() as
* `return new DefaultSearchQueryMetrics(genericQueryMetricsFactory.makeMetrics(query));`
*
* 5. Inject and use SearchQueryMetricsFactory instead of {@link GenericQueryMetricsFactory} in {@link
* io.druid.query.search.SearchQueryQueryToolChest}.
*
* 6. Specify `binder.bind(SearchQueryMetricsFactory.class).to(DefaultSearchQueryMetricsFactory.class)` in
* QueryToolChestModule (if the query type belongs to the core druid-processing, e. g. SearchQuery) or in some
* extension-specific Guice module otherwise, if the query type is defined in an extension, e. g. ScanQuery.
*
* This complex procedure is needed to ensure custom {@link GenericQueryMetricsFactory} specified by users still works
* for the query type when query type decides to create their custom QueryMetrics subclass.
*
* {@link io.druid.query.topn.TopNQueryMetrics}, {@link io.druid.query.groupby.GroupByQueryMetrics}, and {@link
* io.druid.query.timeseries.TimeseriesQueryMetrics} are implemented differently, because they are introduced at the
* same time as the whole QueryMetrics abstraction and their default implementations have to actually emit more
* dimensions than the default generic QueryMetrics. So those subinterfaces shouldn't be taken as direct examples for
* following the plan specified above.
*
* @param <QueryType>
*/
public interface QueryMetrics<QueryType extends Query<?>>
{
  /**
   * Pulls all information from the query object into dimensions of future metrics.
   */
  void query(QueryType query);

  /**
   * Sets {@link Query#getDataSource()} of the given query as dimension.
   */
  void dataSource(QueryType query);

  /**
   * Sets {@link Query#getType()} of the given query as dimension.
   */
  void queryType(QueryType query);

  /**
   * Sets {@link Query#getIntervals()} of the given query as dimension.
   */
  void interval(QueryType query);

  /**
   * Sets {@link Query#hasFilters()} of the given query as dimension.
   */
  void hasFilters(QueryType query);

  /**
   * Sets {@link Query#getDuration()} of the given query as dimension.
   */
  void duration(QueryType query);

  /**
   * Sets {@link Query#getId()} of the given query as dimension.
   */
  void queryId(QueryType query);

  /**
   * Sets {@link Query#getContext()} of the given query as dimension.
   */
  void context(QueryType query);

  /**
   * Sets the given host, i.e. the server where the query is executed, as dimension.
   */
  void server(String host);

  /**
   * Sets the given remote address, i.e. the address the query was received from, as dimension.
   */
  void remoteAddress(String remoteAddress);

  /**
   * Sets the given query completion status as dimension.
   */
  void status(String status);

  /**
   * Sets whether the query completed successfully as dimension.
   */
  void success(boolean success);

  /**
   * Sets the given segment identifier as dimension.
   */
  void segment(String segmentIdentifier);

  /**
   * Sets the given interval, one chunk of the whole query interval, as dimension. See also {@link
   * #reportIntervalChunkTime(long)}, the corresponding metric.
   */
  void chunkInterval(Interval interval);

  /**
   * Registers "query time" metric.
   */
  QueryMetrics<QueryType> reportQueryTime(long timeNs);

  /**
   * Registers "query bytes" metric.
   */
  QueryMetrics<QueryType> reportQueryBytes(long byteCount);

  /**
   * Registers "wait time" metric.
   */
  QueryMetrics<QueryType> reportWaitTime(long timeNs);

  /**
   * Registers "segment time" metric.
   */
  QueryMetrics<QueryType> reportSegmentTime(long timeNs);

  /**
   * Registers "segmentAndCache time" metric.
   */
  QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs);

  /**
   * Registers "interval chunk time" metric.
   */
  QueryMetrics<QueryType> reportIntervalChunkTime(long timeNs);

  /**
   * Registers "cpu time" metric.
   */
  QueryMetrics<QueryType> reportCpuTime(long timeNs);

  /**
   * Registers "time to first byte" metric.
   */
  QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs);

  /**
   * Registers "node time" metric.
   */
  QueryMetrics<QueryType> reportNodeTime(long timeNs);

  /**
   * Registers "node bytes" metric.
   */
  QueryMetrics<QueryType> reportNodeBytes(long byteCount);

  /**
   * Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object.
   */
  void emit(ServiceEmitter emitter);
}

View File

@ -21,7 +21,6 @@ package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.timeline.LogicalSegment;
@ -46,16 +45,27 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
public abstract QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
/**
* Creates a builder that is used to generate a metric for this specific query type. This exists
* to allow for query-specific dimensions on metrics. That is, the ToolChest is expected to set some
* Creates a {@link QueryMetrics} object that is used to generate metrics for this specific query type. This exists
* to allow for query-specific dimensions and metrics. That is, the ToolChest is expected to set some
* meaningful dimensions for metrics given this query type. Examples might be the topN threshold for
* a TopN query or the number of dimensions included for a groupBy query.
*
* <p>QueryToolChests for query types in core (druid-processing) and public extensions (belonging to the Druid source
tree) should delegate this method to {@link GenericQueryMetricsFactory#makeMetrics(Query)} on an injected
* instance of {@link GenericQueryMetricsFactory}, as long as they don't need to emit custom dimensions and/or
* metrics.
*
* <p>If some custom dimensions and/or metrics should be emitted for a query type, a plan described in
* "Making subinterfaces of QueryMetrics" section in {@link QueryMetrics}'s class-level Javadocs should be followed.
*
* <p>One way or another, this method should ensure that {@link QueryMetrics#query(Query)} is called with the given
* query passed on the created QueryMetrics object before returning.
*
* @param query The query that is being processed
*
* @return A MetricEvent.Builder that can be used to make metrics for the provided query
* @return A QueryMetrics that can be used to make metrics for the provided query
*/
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
public abstract QueryMetrics<? super QueryType> makeMetrics(QueryType query);
/**
* Creates a Function that can take in a ResultType and return a new ResultType having applied

View File

@ -42,12 +42,16 @@ import java.util.concurrent.ExecutorService;
public class DataSourceMetadataQueryRunnerFactory
implements QueryRunnerFactory<Result<DataSourceMetadataResultValue>, DataSourceMetadataQuery>
{
private static final DataSourceQueryQueryToolChest toolChest = new DataSourceQueryQueryToolChest();
private final DataSourceQueryQueryToolChest toolChest;
private final QueryWatcher queryWatcher;
@Inject
public DataSourceMetadataQueryRunnerFactory(QueryWatcher queryWatcher)
public DataSourceMetadataQueryRunnerFactory(
DataSourceQueryQueryToolChest toolChest,
QueryWatcher queryWatcher
)
{
this.toolChest = toolChest;
this.queryWatcher = queryWatcher;
}

View File

@ -25,14 +25,14 @@ import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.google.inject.Inject;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -51,6 +51,14 @@ public class DataSourceQueryQueryToolChest
{
};
private final GenericQueryMetricsFactory queryMetricsFactory;
@Inject
public DataSourceQueryQueryToolChest(GenericQueryMetricsFactory queryMetricsFactory)
{
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
public <T extends LogicalSegment> List<T> filterSegments(DataSourceMetadataQuery query, List<T> segments)
{
@ -103,11 +111,9 @@ public class DataSourceQueryQueryToolChest
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(DataSourceMetadataQuery query)
public QueryMetrics<Query<?>> makeMetrics(DataSourceMetadataQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension("dataSource", DataSourceUtil.getMetricName(query.getDataSource()))
.setDimension("type", query.getType());
return queryMetricsFactory.makeMetrics(query);
}
@Override

View File

@ -0,0 +1,61 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.query.DefaultQueryMetrics;
import io.druid.query.DruidMetrics;
/**
 * Default implementation of {@link GroupByQueryMetrics}. Adds the groupBy-specific dimensions
 * (numDimensions, numMetrics, numComplexMetrics) on top of the generic ones set by {@link DefaultQueryMetrics}.
 */
public class DefaultGroupByQueryMetrics extends DefaultQueryMetrics<GroupByQuery> implements GroupByQueryMetrics
{
  public DefaultGroupByQueryMetrics(ObjectMapper jsonMapper)
  {
    super(jsonMapper);
  }

  @Override
  public void query(GroupByQuery query)
  {
    // Generic dimensions first, then the groupBy-specific ones.
    super.query(query);
    numDimensions(query);
    numMetrics(query);
    numComplexMetrics(query);
  }

  @Override
  public void numDimensions(GroupByQuery query)
  {
    setCountDimension("numDimensions", query.getDimensions().size());
  }

  @Override
  public void numMetrics(GroupByQuery query)
  {
    setCountDimension("numMetrics", query.getAggregatorSpecs().size());
  }

  @Override
  public void numComplexMetrics(GroupByQuery query)
  {
    setCountDimension("numComplexMetrics", DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()));
  }

  /** Records an integer count under the given dimension name; dimension values are strings. */
  private void setCountDimension(String dimension, int count)
  {
    builder.setDimension(dimension, String.valueOf(count));
  }
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.druid.jackson.DefaultObjectMapper;
/**
 * Default {@link GroupByQueryMetricsFactory}, creating {@link DefaultGroupByQueryMetrics} objects.
 */
public class DefaultGroupByQueryMetricsFactory implements GroupByQueryMetricsFactory
{
  private static final GroupByQueryMetricsFactory INSTANCE =
      new DefaultGroupByQueryMetricsFactory(new DefaultObjectMapper());

  private final ObjectMapper mapper;

  /**
   * Returns a shared default instance. Should be used only in tests, directly or indirectly (via {@link
   * GroupByQueryQueryToolChest#GroupByQueryQueryToolChest(io.druid.query.groupby.strategy.GroupByStrategySelector,
   * io.druid.query.IntervalChunkingQueryRunnerDecorator)}).
   */
  @VisibleForTesting
  public static GroupByQueryMetricsFactory instance()
  {
    return INSTANCE;
  }

  @Inject
  public DefaultGroupByQueryMetricsFactory(ObjectMapper jsonMapper)
  {
    this.mapper = jsonMapper;
  }

  @Override
  public GroupByQueryMetrics makeMetrics()
  {
    return new DefaultGroupByQueryMetrics(mapper);
  }
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby;
import io.druid.query.QueryMetrics;
/**
 * Specialization of {@link QueryMetrics} for {@link GroupByQuery}.
 */
public interface GroupByQueryMetrics extends QueryMetrics<GroupByQuery>
{
  /**
   * Sets the size of {@link GroupByQuery#getDimensions()} of the given query as dimension. The default
   * implementation ({@link DefaultGroupByQueryMetrics}) records it under the "numDimensions" dimension.
   */
  void numDimensions(GroupByQuery query);

  /**
   * Sets the number of metrics (aggregators) of the given groupBy query as dimension.
   */
  void numMetrics(GroupByQuery query);

  /**
   * Sets the number of "complex" metrics of the given groupBy query as dimension. By default it is assumed that
   * "complex" metric is a metric of not long or double type, but it could be redefined in the implementation of this
   * method.
   */
  void numComplexMetrics(GroupByQuery query);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby;
/**
 * Factory of {@link GroupByQueryMetrics} objects, used by {@link GroupByQueryQueryToolChest#makeMetrics} to create a
 * fresh metrics object per query. Implementations are injected (e.g. {@link DefaultGroupByQueryMetricsFactory});
 * presumably bound via Guice in QueryToolChestModule — verify against the module configuration.
 */
public interface GroupByQueryMetricsFactory
{
  /**
   * Creates a new, empty {@link GroupByQueryMetrics} object; the caller is expected to populate it, e.g. via
   * {@link GroupByQueryMetrics#query(GroupByQuery)}.
   */
  GroupByQueryMetrics makeMetrics();
}

View File

@ -20,6 +20,7 @@
package io.druid.query.groupby;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
@ -30,7 +31,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.java.util.common.ISE;
@ -41,7 +41,6 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
import io.druid.query.DruidMetrics;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
@ -85,15 +84,27 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private final GroupByStrategySelector strategySelector;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final GroupByQueryMetricsFactory queryMetricsFactory;
@Inject
@VisibleForTesting
public GroupByQueryQueryToolChest(
GroupByStrategySelector strategySelector,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{
this(strategySelector, intervalChunkingQueryRunnerDecorator, DefaultGroupByQueryMetricsFactory.instance());
}
@Inject
public GroupByQueryQueryToolChest(
GroupByStrategySelector strategySelector,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
GroupByQueryMetricsFactory queryMetricsFactory
)
{
this.strategySelector = strategySelector;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
@ -212,15 +223,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query)
public GroupByQueryMetrics makeMetrics(GroupByQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension("numDimensions", String.valueOf(query.getDimensions().size()))
.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()))
.setDimension(
"numComplexMetrics",
String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
);
GroupByQueryMetrics queryMetrics = queryMetricsFactory.makeMetrics();
queryMetrics.query(query);
return queryMetrics;
}
@Override

View File

@ -31,7 +31,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.common.guava.CombiningSequence;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.impl.TimestampSpec;
@ -40,8 +39,10 @@ import io.druid.java.util.common.guava.MappedSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner;
@ -78,13 +79,19 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
};
private final SegmentMetadataQueryConfig config;
private final GenericQueryMetricsFactory queryMetricsFactory;
@VisibleForTesting
public SegmentMetadataQueryQueryToolChest(SegmentMetadataQueryConfig config)
{
this(config, DefaultGenericQueryMetricsFactory.instance());
}
@Inject
public SegmentMetadataQueryQueryToolChest(
SegmentMetadataQueryConfig config
)
public SegmentMetadataQueryQueryToolChest(SegmentMetadataQueryConfig config, GenericQueryMetricsFactory queryMetricsFactory)
{
this.config = config;
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
@ -147,9 +154,9 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SegmentMetadataQuery query)
public QueryMetrics<Query<?>> makeMetrics(SegmentMetadataQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query);
return queryMetricsFactory.makeMetrics(query);
}
@Override

View File

@ -20,6 +20,7 @@
package io.druid.query.search;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
@ -29,7 +30,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
@ -37,9 +37,11 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -72,17 +74,28 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
};
private final SearchQueryConfig config;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final GenericQueryMetricsFactory queryMetricsFactory;
@Inject
@VisibleForTesting
public SearchQueryQueryToolChest(
SearchQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{
this(config, intervalChunkingQueryRunnerDecorator, DefaultGenericQueryMetricsFactory.instance());
}
@Inject
public SearchQueryQueryToolChest(
SearchQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
GenericQueryMetricsFactory queryMetricsFactory
)
{
this.config = config;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
@ -113,9 +126,9 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SearchQuery query)
public QueryMetrics<Query<?>> makeMetrics(SearchQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query);
return queryMetricsFactory.makeMetrics(query);
}
@Override

View File

@ -32,16 +32,17 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -81,17 +82,29 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
private final ObjectMapper jsonMapper;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final Supplier<SelectQueryConfig> configSupplier;
private final GenericQueryMetricsFactory queryMetricsFactory;
@Inject
public SelectQueryQueryToolChest(
ObjectMapper jsonMapper,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
Supplier<SelectQueryConfig> configSupplier
)
{
this(jsonMapper, intervalChunkingQueryRunnerDecorator, configSupplier, new DefaultGenericQueryMetricsFactory(jsonMapper));
}
@Inject
public SelectQueryQueryToolChest(
ObjectMapper jsonMapper,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
Supplier<SelectQueryConfig> configSupplier,
GenericQueryMetricsFactory queryMetricsFactory
)
{
this.jsonMapper = jsonMapper;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.configSupplier = configSupplier;
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
@ -125,9 +138,9 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SelectQuery query)
public QueryMetrics<Query<?>> makeMetrics(SelectQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query);
return queryMetricsFactory.makeMetrics(query);
}
@Override

View File

@ -20,19 +20,21 @@
package io.druid.query.timeboundary;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.google.inject.Inject;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.DruidMetrics;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -58,6 +60,20 @@ public class TimeBoundaryQueryQueryToolChest
{
};
private final GenericQueryMetricsFactory queryMetricsFactory;
@VisibleForTesting
public TimeBoundaryQueryQueryToolChest()
{
this(DefaultGenericQueryMetricsFactory.instance());
}
@Inject
public TimeBoundaryQueryQueryToolChest(GenericQueryMetricsFactory queryMetricsFactory)
{
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
public <T extends LogicalSegment> List<T> filterSegments(TimeBoundaryQuery query, List<T> segments)
{
@ -107,11 +123,9 @@ public class TimeBoundaryQueryQueryToolChest
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeBoundaryQuery query)
public QueryMetrics<Query<?>> makeMetrics(TimeBoundaryQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()))
.setDimension(DruidMetrics.TYPE, query.getType());
return queryMetricsFactory.makeMetrics(query);
}
@Override

View File

@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.timeseries;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.query.DefaultQueryMetrics;
import io.druid.query.DruidMetrics;
/**
 * Default implementation of {@link TimeseriesQueryMetrics}. Adds the timeseries-specific dimensions
 * (numMetrics, numComplexMetrics) on top of the generic ones set by {@link DefaultQueryMetrics}.
 */
public class DefaultTimeseriesQueryMetrics extends DefaultQueryMetrics<TimeseriesQuery>
    implements TimeseriesQueryMetrics
{
  public DefaultTimeseriesQueryMetrics(ObjectMapper jsonMapper)
  {
    super(jsonMapper);
  }

  @Override
  public void query(TimeseriesQuery query)
  {
    // Generic dimensions first, then the timeseries-specific ones.
    super.query(query);
    numMetrics(query);
    numComplexMetrics(query);
  }

  @Override
  public void numMetrics(TimeseriesQuery query)
  {
    setCountDimension("numMetrics", query.getAggregatorSpecs().size());
  }

  @Override
  public void numComplexMetrics(TimeseriesQuery query)
  {
    setCountDimension("numComplexMetrics", DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()));
  }

  /** Records an integer count under the given dimension name; dimension values are strings. */
  private void setCountDimension(String dimension, int count)
  {
    builder.setDimension(dimension, String.valueOf(count));
  }
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.timeseries;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.druid.jackson.DefaultObjectMapper;
/**
 * Default {@link TimeseriesQueryMetricsFactory}, creating {@link DefaultTimeseriesQueryMetrics} objects.
 */
public class DefaultTimeseriesQueryMetricsFactory implements TimeseriesQueryMetricsFactory
{
  private static final TimeseriesQueryMetricsFactory INSTANCE =
      new DefaultTimeseriesQueryMetricsFactory(new DefaultObjectMapper());

  private final ObjectMapper mapper;

  /**
   * Returns a shared default instance. Should be used only in tests, directly or indirectly (via {@link
   * TimeseriesQueryQueryToolChest#TimeseriesQueryQueryToolChest(io.druid.query.IntervalChunkingQueryRunnerDecorator)}).
   */
  @VisibleForTesting
  public static TimeseriesQueryMetricsFactory instance()
  {
    return INSTANCE;
  }

  @Inject
  public DefaultTimeseriesQueryMetricsFactory(ObjectMapper jsonMapper)
  {
    this.mapper = jsonMapper;
  }

  @Override
  public TimeseriesQueryMetrics makeMetrics()
  {
    return new DefaultTimeseriesQueryMetrics(mapper);
  }
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.timeseries;
import io.druid.query.QueryMetrics;
/**
 * Specialization of {@link QueryMetrics} for {@link TimeseriesQuery}.
 */
public interface TimeseriesQueryMetrics extends QueryMetrics<TimeseriesQuery>
{
  /**
   * Sets the number of metrics (aggregators) of the given timeseries query as dimension. The default
   * implementation ({@link DefaultTimeseriesQueryMetrics}) records it under the "numMetrics" dimension.
   */
  void numMetrics(TimeseriesQuery query);

  /**
   * Sets the number of "complex" metrics of the given timeseries query as dimension. By default it is assumed that
   * "complex" metric is a metric of not long or double type, but it could be redefined in the implementation of this
   * method.
   */
  void numComplexMetrics(TimeseriesQuery query);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.timeseries;
/**
 * Factory of {@link TimeseriesQueryMetrics} objects, used by {@link TimeseriesQueryQueryToolChest#makeMetrics} to
 * create a fresh metrics object per query. Implementations are injected (e.g. {@link
 * DefaultTimeseriesQueryMetricsFactory}); presumably bound via Guice in QueryToolChestModule — verify against the
 * module configuration.
 */
public interface TimeseriesQueryMetricsFactory
{
  /**
   * Creates a new, empty {@link TimeseriesQueryMetrics} object; the caller is expected to populate it, e.g. via
   * {@link TimeseriesQueryMetrics#query(TimeseriesQuery)}.
   */
  TimeseriesQueryMetrics makeMetrics();
}

View File

@ -20,17 +20,16 @@
package io.druid.query.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -64,11 +63,22 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
};
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final TimeseriesQueryMetricsFactory queryMetricsFactory;
@Inject
@VisibleForTesting
public TimeseriesQueryQueryToolChest(IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator)
{
this(intervalChunkingQueryRunnerDecorator, DefaultTimeseriesQueryMetricsFactory.instance());
}
@Inject
public TimeseriesQueryQueryToolChest(
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
TimeseriesQueryMetricsFactory queryMetricsFactory
)
{
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
@ -101,17 +111,11 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeseriesQuery query)
public TimeseriesQueryMetrics makeMetrics(TimeseriesQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension(
"numMetrics",
String.valueOf(query.getAggregatorSpecs().size())
)
.setDimension(
"numComplexMetrics",
String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
);
TimeseriesQueryMetrics queryMetrics = queryMetricsFactory.makeMetrics();
queryMetrics.query(query);
return queryMetrics;
}
@Override

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.query.DefaultQueryMetrics;
import io.druid.query.DruidMetrics;
/**
 * Default implementation of {@link TopNQueryMetrics}: on top of the common dimensions set by
 * {@link DefaultQueryMetrics}, records the topN-specific "threshold", "dimension", "numMetrics"
 * and "numComplexMetrics" dimensions.
 */
public class DefaultTopNQueryMetrics extends DefaultQueryMetrics<TopNQuery> implements TopNQueryMetrics
{
  public DefaultTopNQueryMetrics(ObjectMapper jsonMapper)
  {
    super(jsonMapper);
  }

  @Override
  public void query(TopNQuery query)
  {
    // Common dimensions (dataSource, type, interval, ...) first, then topN-specific ones.
    super.query(query);
    threshold(query);
    dimension(query);
    numMetrics(query);
    numComplexMetrics(query);
  }

  @Override
  public void threshold(TopNQuery query)
  {
    String threshold = String.valueOf(query.getThreshold());
    builder.setDimension("threshold", threshold);
  }

  @Override
  public void dimension(TopNQuery query)
  {
    String dimensionName = query.getDimensionSpec().getDimension();
    builder.setDimension("dimension", dimensionName);
  }

  @Override
  public void numMetrics(TopNQuery query)
  {
    String numMetrics = String.valueOf(query.getAggregatorSpecs().size());
    builder.setDimension("numMetrics", numMetrics);
  }

  @Override
  public void numComplexMetrics(TopNQuery query)
  {
    // "Complex" metrics are aggregators of neither long nor double type
    // (see TopNQueryMetrics#numComplexMetrics and DruidMetrics.findNumComplexAggs).
    String numComplexMetrics = String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()));
    builder.setDimension("numComplexMetrics", numComplexMetrics);
  }
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.druid.jackson.DefaultObjectMapper;
/**
 * Default {@link TopNQueryMetricsFactory}, producing {@link DefaultTopNQueryMetrics} objects.
 */
public class DefaultTopNQueryMetricsFactory implements TopNQueryMetricsFactory
{
  // Shared instance backed by a DefaultObjectMapper; intended for test use only, see instance().
  private static final TopNQueryMetricsFactory INSTANCE = new DefaultTopNQueryMetricsFactory(new DefaultObjectMapper());

  /**
   * Should be used only in tests, directly or indirectly (via {@link TopNQueryQueryToolChest#TopNQueryQueryToolChest(
   * TopNQueryConfig, io.druid.query.IntervalChunkingQueryRunnerDecorator)}).
   */
  @VisibleForTesting
  public static TopNQueryMetricsFactory instance()
  {
    return INSTANCE;
  }

  private final ObjectMapper jsonMapper;

  @Inject
  public DefaultTopNQueryMetricsFactory(ObjectMapper jsonMapper)
  {
    this.jsonMapper = jsonMapper;
  }

  @Override
  public TopNQueryMetrics makeMetrics()
  {
    // A fresh metrics object per call: metrics objects accumulate per-query state in their builder.
    return new DefaultTopNQueryMetrics(jsonMapper);
  }
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.query.QueryMetrics;
/**
 * Specialization of {@link QueryMetrics} for {@link TopNQuery}, adding topN-specific dimensions.
 * {@code query(TopNQuery)} implementations are expected to set all of the dimensions below.
 */
public interface TopNQueryMetrics extends QueryMetrics<TopNQuery>
{
  /**
   * Sets {@link TopNQuery#getThreshold()} of the given query as dimension.
   */
  void threshold(TopNQuery query);

  /**
   * Sets {@link TopNQuery#getDimensionSpec()}.{@link io.druid.query.dimension.DimensionSpec#getDimension()
   * getDimension()} of the given query as dimension.
   */
  void dimension(TopNQuery query);

  /**
   * Sets the number of metrics of the given topN query as dimension.
   */
  void numMetrics(TopNQuery query);

  /**
   * Sets the number of "complex" metrics of the given topN query as dimension. By default it is assumed that "complex"
   * metric is a metric of not long or double type, but it could be redefined in the implementation of this method.
   */
  void numComplexMetrics(TopNQuery query);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
/**
 * Factory of {@link TopNQueryMetrics} instances, allowing deployments to plug in a custom
 * metrics implementation for topN queries.
 */
public interface TopNQueryMetricsFactory
{
  /**
   * Creates a fresh, empty {@link TopNQueryMetrics} instance. Query-specific dimensions are
   * applied later by the caller (e.g. {@code TopNQueryQueryToolChest.makeMetrics(query)} calls
   * {@code queryMetrics.query(query)} on the returned object).
   */
  TopNQueryMetrics makeMetrics();
}

View File

@ -20,13 +20,13 @@
package io.druid.query.topn;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
@ -35,7 +35,6 @@ import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValue;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -68,18 +67,30 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final TopNQueryConfig config;
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final TopNQueryMetricsFactory queryMetricsFactory;
@Inject
@VisibleForTesting
public TopNQueryQueryToolChest(
TopNQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{
this(config, intervalChunkingQueryRunnerDecorator, DefaultTopNQueryMetricsFactory.instance());
}
@Inject
public TopNQueryQueryToolChest(
TopNQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
TopNQueryMetricsFactory queryMetricsFactory
)
{
this.config = config;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.queryMetricsFactory = queryMetricsFactory;
}
protected static String[] extractFactoryName(final List<AggregatorFactory> aggregatorFactories)
@ -140,22 +151,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TopNQuery query)
public TopNQueryMetrics makeMetrics(TopNQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension(
"threshold",
String.valueOf(query.getThreshold())
)
.setDimension("dimension", query.getDimensionSpec().getDimension())
.setDimension(
"numMetrics",
String.valueOf(query.getAggregatorSpecs().size())
)
.setDimension(
"numComplexMetrics",
String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
);
TopNQueryMetrics queryMetrics = queryMetricsFactory.makeMetrics();
queryMetrics.query(query);
return queryMetrics;
}
@Override

View File

@ -0,0 +1,56 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.Event;
import java.io.IOException;
/**
 * {@link Emitter} for tests that simply remembers the last {@link Event} passed to {@link #emit},
 * so tests can inspect exactly what was emitted (see e.g. DefaultQueryMetricsTest).
 * Not thread-safe; intended for single-threaded test use.
 */
public class CachingEmitter implements Emitter
{
  // The most recent event passed to emit(); null until the first emission.
  private Event lastEmittedEvent;

  public Event getLastEmittedEvent()
  {
    return lastEmittedEvent;
  }

  @Override
  public void start()
  {
    // No resources to set up.
  }

  @Override
  public void emit(Event event)
  {
    lastEmittedEvent = event;
  }

  @Override
  public void flush() throws IOException
  {
    // Nothing is buffered; no-op.
  }

  @Override
  public void close() throws IOException
  {
    // Nothing to release; no-op.
  }
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import com.google.common.collect.ImmutableSet;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.ListFilteredDimensionSpec;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class DefaultQueryMetricsTest
{
/**
* Tests that passed a query {@link DefaultQueryMetrics} produces events with a certain set of dimensions, no more,
* no less.
*/
@Test
public void testDefaultQueryMetricsQuery()
{
CachingEmitter cachingEmitter = new CachingEmitter();
ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
DefaultQueryMetrics<Query<?>> queryMetrics = new DefaultQueryMetrics<>(new DefaultObjectMapper());
TopNQuery query = new TopNQueryBuilder()
.dataSource("xx")
.granularity(Granularities.ALL)
.dimension(new ListFilteredDimensionSpec(
new DefaultDimensionSpec("tags", "tags"),
ImmutableSet.of("t3"),
null
))
.metric("count")
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.threshold(5)
.filters(new SelectorDimFilter("tags", "t3", null))
.build();
queryMetrics.query(query);
queryMetrics.reportQueryTime(0).emit(serviceEmitter);
Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals(12, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
Assert.assertTrue(actualEvent.containsKey("timestamp"));
Assert.assertEquals("", actualEvent.get("host"));
Assert.assertEquals("", actualEvent.get("service"));
Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE));
Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
List<Interval> expectedIntervals = QueryRunnerTestHelper.fullOnInterval.getIntervals();
List<String> expectedStringIntervals =
expectedIntervals.stream().map(Interval::toString).collect(Collectors.toList());
Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL));
Assert.assertEquals("true", actualEvent.get("hasFilters"));
Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration"));
Assert.assertEquals("", actualEvent.get(DruidMetrics.ID));
Assert.assertEquals("query/time", actualEvent.get("metric"));
Assert.assertEquals(0L, actualEvent.get("value"));
}
@Test
public void testDefaultQueryMetricsMetricNamesAndUnits()
{
CachingEmitter cachingEmitter = new CachingEmitter();
ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
DefaultQueryMetrics<Query<?>> queryMetrics = new DefaultQueryMetrics<>(new DefaultObjectMapper());
testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics);
}
public static void testQueryMetricsDefaultMetricNamesAndUnits(
CachingEmitter cachingEmitter,
ServiceEmitter serviceEmitter,
QueryMetrics<? extends Query<?>> queryMetrics
)
{
queryMetrics.reportQueryTime(1000001).emit(serviceEmitter);
Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/time", actualEvent.get("metric"));
// query/time and most metrics below are measured in milliseconds by default
Assert.assertEquals(1L, actualEvent.get("value"));
queryMetrics.reportWaitTime(2000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/wait/time", actualEvent.get("metric"));
Assert.assertEquals(2L, actualEvent.get("value"));
queryMetrics.reportSegmentTime(3000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/segment/time", actualEvent.get("metric"));
Assert.assertEquals(3L, actualEvent.get("value"));
queryMetrics.reportSegmentAndCacheTime(4000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/segmentAndCache/time", actualEvent.get("metric"));
Assert.assertEquals(4L, actualEvent.get("value"));
queryMetrics.reportIntervalChunkTime(5000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/intervalChunk/time", actualEvent.get("metric"));
Assert.assertEquals(5L, actualEvent.get("value"));
queryMetrics.reportCpuTime(6000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/cpu/time", actualEvent.get("metric"));
// CPU time is measured in microseconds by default
Assert.assertEquals(6000L, actualEvent.get("value"));
queryMetrics.reportNodeTimeToFirstByte(7000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/node/ttfb", actualEvent.get("metric"));
Assert.assertEquals(7L, actualEvent.get("value"));
queryMetrics.reportNodeTime(8000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/node/time", actualEvent.get("metric"));
Assert.assertEquals(8L, actualEvent.get("value"));
queryMetrics.reportQueryBytes(9).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/bytes", actualEvent.get("metric"));
Assert.assertEquals(9L, actualEvent.get("value"));
queryMetrics.reportNodeBytes(10).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
Assert.assertEquals(10L, actualEvent.get("value"));
}
}

View File

@ -29,7 +29,9 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.Druids;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
@ -117,6 +119,7 @@ public class DataSourceMetadataQueryTest
;
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
(QueryRunnerFactory) new DataSourceMetadataQueryRunnerFactory(
new DataSourceQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
), new IncrementalIndexSegment(rtIndex, "test"),
null
@ -147,7 +150,10 @@ public class DataSourceMetadataQueryTest
@Test
public void testFilterSegments()
{
List<LogicalSegment> segments = new DataSourceQueryQueryToolChest().filterSegments(
GenericQueryMetricsFactory queryMetricsFactory = DefaultGenericQueryMetricsFactory.instance();
DataSourceQueryQueryToolChest toolChest = new DataSourceQueryQueryToolChest(queryMetricsFactory);
List<LogicalSegment> segments = toolChest
.filterSegments(
null,
Arrays.asList(
new LogicalSegment()

View File

@ -0,0 +1,126 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.PeriodGranularity;
import io.druid.query.CachingEmitter;
import io.druid.query.DefaultQueryMetricsTest;
import io.druid.query.DruidMetrics;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ExtractionDimensionSpec;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.lookup.LookupExtractionFn;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
public class DefaultGroupByQueryMetricsTest
{
  /**
   * Tests that, given a query, {@link DefaultGroupByQueryMetrics} produces events with a certain set of dimensions,
   * no more, no less.
   */
  @Test
  public void testDefaultGroupByQueryMetricsQuery()
  {
    CachingEmitter cachingEmitter = new CachingEmitter();
    ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
    DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics(new DefaultObjectMapper());
    // A groupBy query with one (extraction) dimension, two aggregators and a filter, so that every
    // groupBy-specific dimension asserted below has a non-trivial value.
    GroupByQuery.Builder builder = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setInterval("2011-04-02/2011-04-04")
        .setDimensions(
            Lists.<DimensionSpec>newArrayList(
                new ExtractionDimensionSpec(
                    "quality",
                    "alias",
                    new LookupExtractionFn(
                        new MapLookupExtractor(
                            ImmutableMap.of(
                                "mezzanine",
                                "mezzanine0"
                            ),
                            false
                        ), false, null, true,
                        false
                    )
                )
            )
        )
        .setAggregatorSpecs(
            Arrays.asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("idx", "index")
            )
        )
        .setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
        .setDimFilter(new SelectorDimFilter("quality", "mezzanine", null))
        .setContext(ImmutableMap.<String, Object>of("bySegment", true));
    GroupByQuery query = builder.build();
    queryMetrics.query(query);
    queryMetrics.reportQueryTime(0).emit(serviceEmitter);
    Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
    // 15 fields = the 12 common fields checked in DefaultQueryMetricsTest + the 3 groupBy-specific
    // dimensions asserted below.
    Assert.assertEquals(15, actualEvent.size());
    Assert.assertTrue(actualEvent.containsKey("feed"));
    Assert.assertTrue(actualEvent.containsKey("timestamp"));
    Assert.assertEquals("", actualEvent.get("host"));
    Assert.assertEquals("", actualEvent.get("service"));
    Assert.assertEquals(QueryRunnerTestHelper.dataSource, actualEvent.get(DruidMetrics.DATASOURCE));
    Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
    Interval expectedInterval = new Interval("2011-04-02/2011-04-04");
    Assert.assertEquals(Collections.singletonList(expectedInterval.toString()), actualEvent.get(DruidMetrics.INTERVAL));
    Assert.assertEquals("true", actualEvent.get("hasFilters"));
    Assert.assertEquals(expectedInterval.toDuration().toString(), actualEvent.get("duration"));
    Assert.assertEquals("", actualEvent.get(DruidMetrics.ID));
    // GroupBy-specific dimensions
    Assert.assertEquals("1", actualEvent.get("numDimensions"));
    Assert.assertEquals("2", actualEvent.get("numMetrics"));
    Assert.assertEquals("0", actualEvent.get("numComplexMetrics"));
    // Metric
    Assert.assertEquals("query/time", actualEvent.get("metric"));
    Assert.assertEquals(0L, actualEvent.get("value"));
  }

  @Test
  public void testDefaultGroupByQueryMetricsMetricNamesAndUnits()
  {
    CachingEmitter cachingEmitter = new CachingEmitter();
    ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
    DefaultGroupByQueryMetrics queryMetrics = new DefaultGroupByQueryMetrics(new DefaultObjectMapper());
    // Reuses the shared checks: metric names and units must match the other QueryMetrics implementations.
    DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics);
  }
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.timeseries;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.CachingEmitter;
import io.druid.query.DefaultQueryMetricsTest;
import io.druid.query.DruidMetrics;
import io.druid.query.Druids;
import io.druid.query.QueryRunnerTestHelper;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class DefaultTimeseriesQueryMetricsTest
{
  /**
   * Tests that, given a query, {@link DefaultTimeseriesQueryMetrics} produces events with a certain set of dimensions,
   * no more, no less.
   */
  @Test
  public void testDefaultTimeseriesQueryMetricsQuery()
  {
    CachingEmitter cachingEmitter = new CachingEmitter();
    ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
    DefaultTimeseriesQueryMetrics queryMetrics = new DefaultTimeseriesQueryMetrics(new DefaultObjectMapper());
    // An unfiltered timeseries query with two aggregators (neither "complex", hence
    // numComplexMetrics is asserted to be "0" below).
    TimeseriesQuery query = Druids
        .newTimeseriesQueryBuilder()
        .dataSource(QueryRunnerTestHelper.dataSource)
        .granularity(QueryRunnerTestHelper.dayGran)
        .intervals(QueryRunnerTestHelper.fullOnInterval)
        .aggregators(
            Arrays.asList(
                QueryRunnerTestHelper.rowsCount,
                QueryRunnerTestHelper.indexDoubleSum
            )
        )
        .postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
        .descending(true)
        .build();
    queryMetrics.query(query);
    queryMetrics.reportQueryTime(0).emit(serviceEmitter);
    Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
    // 14 fields = the 12 common fields checked in DefaultQueryMetricsTest + the 2
    // timeseries-specific dimensions asserted below.
    Assert.assertEquals(14, actualEvent.size());
    Assert.assertTrue(actualEvent.containsKey("feed"));
    Assert.assertTrue(actualEvent.containsKey("timestamp"));
    Assert.assertEquals("", actualEvent.get("host"));
    Assert.assertEquals("", actualEvent.get("service"));
    Assert.assertEquals(QueryRunnerTestHelper.dataSource, actualEvent.get(DruidMetrics.DATASOURCE));
    Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
    List<Interval> expectedIntervals = QueryRunnerTestHelper.fullOnInterval.getIntervals();
    List<String> expectedStringIntervals =
        expectedIntervals.stream().map(Interval::toString).collect(Collectors.toList());
    Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL));
    Assert.assertEquals("false", actualEvent.get("hasFilters"));
    Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration"));
    Assert.assertEquals("", actualEvent.get(DruidMetrics.ID));
    // Timeseries-specific dimensions
    Assert.assertEquals("2", actualEvent.get("numMetrics"));
    Assert.assertEquals("0", actualEvent.get("numComplexMetrics"));
    // Metric
    Assert.assertEquals("query/time", actualEvent.get("metric"));
    Assert.assertEquals(0L, actualEvent.get("value"));
  }

  @Test
  public void testDefaultTimeseriesQueryMetricsMetricNamesAndUnits()
  {
    CachingEmitter cachingEmitter = new CachingEmitter();
    ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
    DefaultTimeseriesQueryMetrics queryMetrics = new DefaultTimeseriesQueryMetrics(new DefaultObjectMapper());
    // Reuses the shared checks: metric names and units must match the other QueryMetrics implementations.
    DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics);
  }
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import com.google.common.collect.ImmutableSet;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.CachingEmitter;
import io.druid.query.DefaultQueryMetricsTest;
import io.druid.query.DruidMetrics;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.ListFilteredDimensionSpec;
import io.druid.query.filter.SelectorDimFilter;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class DefaultTopNQueryMetricsTest
{
  /**
   * Tests that, given a query, {@link DefaultTopNQueryMetrics} produces events with a certain set of dimensions,
   * no more, no less.
   */
  @Test
  public void testDefaultTopNQueryMetricsQuery()
  {
    CachingEmitter cachingEmitter = new CachingEmitter();
    ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
    DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics(new DefaultObjectMapper());
    // A filtered topN query with one dimension and one (non-"complex") aggregator, so that every
    // topN-specific dimension asserted below has a non-trivial value.
    TopNQuery query = new TopNQueryBuilder()
        .dataSource("xx")
        .granularity(Granularities.ALL)
        .dimension(new ListFilteredDimensionSpec(
            new DefaultDimensionSpec("tags", "tags"),
            ImmutableSet.of("t3"),
            null
        ))
        .metric("count")
        .intervals(QueryRunnerTestHelper.fullOnInterval)
        .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
        .threshold(5)
        .filters(new SelectorDimFilter("tags", "t3", null))
        .build();
    queryMetrics.query(query);
    queryMetrics.reportQueryTime(0).emit(serviceEmitter);
    Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
    // 16 fields = the 12 common fields checked in DefaultQueryMetricsTest + the 4 topN-specific
    // dimensions asserted below.
    Assert.assertEquals(16, actualEvent.size());
    Assert.assertTrue(actualEvent.containsKey("feed"));
    Assert.assertTrue(actualEvent.containsKey("timestamp"));
    Assert.assertEquals("", actualEvent.get("host"));
    Assert.assertEquals("", actualEvent.get("service"));
    Assert.assertEquals("xx", actualEvent.get(DruidMetrics.DATASOURCE));
    Assert.assertEquals(query.getType(), actualEvent.get(DruidMetrics.TYPE));
    List<Interval> expectedIntervals = QueryRunnerTestHelper.fullOnInterval.getIntervals();
    List<String> expectedStringIntervals =
        expectedIntervals.stream().map(Interval::toString).collect(Collectors.toList());
    Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL));
    Assert.assertEquals("true", actualEvent.get("hasFilters"));
    Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration"));
    Assert.assertEquals("", actualEvent.get(DruidMetrics.ID));
    // TopN-specific dimensions
    Assert.assertEquals("5", actualEvent.get("threshold"));
    Assert.assertEquals("tags", actualEvent.get("dimension"));
    Assert.assertEquals("1", actualEvent.get("numMetrics"));
    Assert.assertEquals("0", actualEvent.get("numComplexMetrics"));
    // Metric
    Assert.assertEquals("query/time", actualEvent.get("metric"));
    Assert.assertEquals(0L, actualEvent.get("value"));
  }

  @Test
  public void testDefaultTopNQueryMetricsMetricNamesAndUnits()
  {
    CachingEmitter cachingEmitter = new CachingEmitter();
    ServiceEmitter serviceEmitter = new ServiceEmitter("", "", cachingEmitter);
    DefaultTopNQueryMetrics queryMetrics = new DefaultTopNQueryMetrics(new DefaultObjectMapper());
    // Reuses the shared checks: metric names and units must match the other QueryMetrics implementations.
    DefaultQueryMetricsTest.testQueryMetricsDefaultMetricNamesAndUnits(cachingEmitter, serviceEmitter, queryMetrics);
  }
}

View File

@ -36,14 +36,12 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
@ -56,6 +54,7 @@ import io.druid.query.BaseQuery;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
@ -83,6 +82,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -161,14 +161,14 @@ public class DirectDruidClient<T> implements QueryRunner<T>
try {
log.debug("Querying queryId[%s] url[%s]", query.getId(), url);
final long requestStartTime = System.currentTimeMillis();
final long requestStartTimeNs = System.nanoTime();
final ServiceMetricEvent.Builder builder = toolChest.makeMetricBuilder(query);
builder.setDimension("server", host);
final QueryMetrics<? super Query<T>> queryMetrics = toolChest.makeMetrics(query);
queryMetrics.server(host);
final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
private long responseStartTime;
private long responseStartTimeNs;
private final AtomicLong byteCount = new AtomicLong(0);
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
@ -177,8 +177,8 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public ClientResponse<InputStream> handleResponse(HttpResponse response)
{
log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
responseStartTime = System.currentTimeMillis();
emitter.emit(builder.build("query/node/ttfb", responseStartTime - requestStartTime));
responseStartTimeNs = System.nanoTime();
queryMetrics.reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);
try {
final String responseContext = response.headers().get("X-Druid-Response-Context");
@ -267,17 +267,19 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override
public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
{
long stopTime = System.currentTimeMillis();
long stopTimeNs = System.nanoTime();
long nodeTimeNs = stopTimeNs - responseStartTimeNs;
log.debug(
"Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
query.getId(),
url,
byteCount.get(),
stopTime - responseStartTime,
byteCount.get() / (0.0001 * (stopTime - responseStartTime))
TimeUnit.NANOSECONDS.toMillis(nodeTimeNs),
byteCount.get() / TimeUnit.NANOSECONDS.toSeconds(nodeTimeNs)
);
emitter.emit(builder.build("query/node/time", stopTime - requestStartTime));
emitter.emit(builder.build("query/node/bytes", byteCount.get()));
queryMetrics.reportNodeTime(nodeTimeNs);
queryMetrics.reportNodeBytes(byteCount.get());
queryMetrics.emit(emitter);
synchronized (done) {
try {
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out

View File

@ -23,14 +23,18 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.datasourcemetadata.DataSourceMetadataQuery;
import io.druid.query.datasourcemetadata.DataSourceQueryQueryToolChest;
import io.druid.query.groupby.DefaultGroupByQueryMetricsFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryMetricsFactory;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.metadata.SegmentMetadataQueryConfig;
import io.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
@ -43,10 +47,14 @@ import io.druid.query.select.SelectQueryConfig;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
import io.druid.query.timeseries.DefaultTimeseriesQueryMetricsFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryMetricsFactory;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.topn.DefaultTopNQueryMetricsFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryMetricsFactory;
import io.druid.query.topn.TopNQueryQueryToolChest;
import java.util.Map;
@ -79,6 +87,11 @@ public class QueryToolChestModule implements Module
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
binder.bind(GenericQueryMetricsFactory.class).to(DefaultGenericQueryMetricsFactory.class);
binder.bind(TopNQueryMetricsFactory.class).to(DefaultTopNQueryMetricsFactory.class);
binder.bind(GroupByQueryMetricsFactory.class).to(DefaultGroupByQueryMetricsFactory.class);
binder.bind(TimeseriesQueryMetricsFactory.class).to(DefaultTimeseriesQueryMetricsFactory.class);
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class);

View File

@ -22,12 +22,10 @@ package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
@ -40,6 +38,7 @@ import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -60,7 +59,6 @@ import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
@ -166,15 +164,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolChest.makeMetricBuilder(query);
}
};
final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
@ -262,7 +251,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
)
)
),
builderFn,
toolChest,
sinkSegmentIdentifier,
cpuTimeAccumulator
),
@ -273,7 +262,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
)
)
),
builderFn,
toolChest,
emitter,
cpuTimeAccumulator,
true
@ -286,14 +275,13 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
*/
private <T> QueryRunner<T> withPerSinkMetrics(
final QueryRunner<T> sinkRunner,
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
final QueryToolChest<?, ? super Query<T>> queryToolChest,
final String sinkSegmentIdentifier,
final AtomicLong cpuTimeAccumulator
)
{
final ImmutableMap<String, String> dims = ImmutableMap.of("segment", sinkSegmentIdentifier);
// Note: query/segmentAndCache/time and query/segment/time are effectively the same here. They don't split apart
// Note: reportSegmentAndCacheTime and reportSegmentTime are effectively the same here. They don't split apart
// cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this
// better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the
// sinkRunner.
@ -301,18 +289,18 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
return CPUTimeMetricQueryRunner.safeBuild(
new MetricsEmittingQueryRunner<>(
emitter,
builderFn,
queryToolChest,
new MetricsEmittingQueryRunner<>(
emitter,
builderFn,
queryToolChest,
sinkRunner,
"query/segment/time",
dims
QueryMetrics::reportSegmentTime,
queryMetrics -> queryMetrics.segment(sinkSegmentIdentifier)
),
"query/segmentAndCache/time",
dims
QueryMetrics::reportSegmentAndCacheTime,
queryMetrics -> queryMetrics.segment(sinkSegmentIdentifier)
).withWaitMeasuredFromNow(),
builderFn,
queryToolChest,
emitter,
cpuTimeAccumulator,
false

View File

@ -32,7 +32,9 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.log.RequestLogger;
import io.druid.server.metrics.QueryCountStatsProvider;
@ -103,6 +105,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
private final DruidHttpClientConfig httpClientConfig;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final GenericQueryMetricsFactory queryMetricsFactory;
private HttpClient broadcastClient;
@ -115,7 +118,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
@Router Provider<HttpClient> httpClientProvider,
DruidHttpClientConfig httpClientConfig,
ServiceEmitter emitter,
RequestLogger requestLogger
RequestLogger requestLogger,
GenericQueryMetricsFactory queryMetricsFactory
)
{
this.warehouse = warehouse;
@ -126,6 +130,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
this.httpClientConfig = httpClientConfig;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.queryMetricsFactory = queryMetricsFactory;
}
@Override
@ -278,7 +283,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
{
final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE);
if (query != null) {
return newMetricsEmittingProxyResponseListener(request, response, query, System.currentTimeMillis());
return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime());
} else {
return super.newProxyResponseListener(request, response);
}
@ -331,10 +336,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
HttpServletRequest request,
HttpServletResponse response,
Query query,
long start
long startNs
)
{
return new MetricsEmittingProxyResponseListener(request, response, query, start);
return new MetricsEmittingProxyResponseListener(request, response, query, startNs);
}
@Override
@ -361,13 +366,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
private final HttpServletRequest req;
private final HttpServletResponse res;
private final Query query;
private final long start;
private final long startNs;
public MetricsEmittingProxyResponseListener(
HttpServletRequest request,
HttpServletResponse response,
Query query,
long start
long startNs
)
{
super(request, response);
@ -375,13 +380,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
this.req = request;
this.res = response;
this.query = query;
this.start = start;
this.startNs = startNs;
}
@Override
public void onComplete(Result result)
{
final long requestTime = System.currentTimeMillis() - start;
final long requestTimeNs = System.nanoTime() - startNs;
try {
boolean success = result.isSucceeded();
if (success) {
@ -389,10 +394,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
} else {
failedQueryCount.incrementAndGet();
}
emitter.emit(
DruidMetrics.makeQueryTimeMetric(warehouse.getToolChest(query), jsonMapper, query, req.getRemoteAddr())
.build("query/time", requestTime)
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
warehouse.getToolChest(query),
query,
req.getRemoteAddr()
);
queryMetrics.reportQueryTime(requestTimeNs).emit(emitter);
requestLogger.log(
new RequestLogLine(
new DateTime(),
@ -401,7 +409,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
new QueryStats(
ImmutableMap.<String, Object>of(
"query/time",
requestTime,
TimeUnit.NANOSECONDS.toMillis(requestTimeNs),
"success",
success
&& result.getResponse().getStatus() == javax.ws.rs.core.Response.Status.OK.getStatusCode()

View File

@ -29,6 +29,7 @@ import io.druid.client.TimelineServerView;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.Query;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.http.security.StateResourceFilter;
@ -67,10 +68,22 @@ public class BrokerQueryResource extends QueryResource
RequestLogger requestLogger,
QueryManager queryManager,
AuthConfig authConfig,
GenericQueryMetricsFactory queryMetricsFactory,
TimelineServerView brokerServerView
)
{
super(warehouse, config, jsonMapper, smileMapper, texasRanger, emitter, requestLogger, queryManager, authConfig);
super(
warehouse,
config,
jsonMapper,
smileMapper,
texasRanger,
emitter,
requestLogger,
queryManager,
authConfig,
queryMetricsFactory
);
this.brokerServerView = brokerServerView;
}

View File

@ -38,9 +38,11 @@ import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
@ -74,6 +76,7 @@ import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -99,6 +102,7 @@ public class QueryResource implements QueryCountStatsProvider
protected final RequestLogger requestLogger;
protected final QueryManager queryManager;
protected final AuthConfig authConfig;
private final GenericQueryMetricsFactory queryMetricsFactory;
private final AtomicLong successfulQueryCount = new AtomicLong();
private final AtomicLong failedQueryCount = new AtomicLong();
private final AtomicLong interruptedQueryCount = new AtomicLong();
@ -113,7 +117,8 @@ public class QueryResource implements QueryCountStatsProvider
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryManager queryManager,
AuthConfig authConfig
AuthConfig authConfig,
GenericQueryMetricsFactory queryMetricsFactory
)
{
this.warehouse = warehouse;
@ -125,6 +130,7 @@ public class QueryResource implements QueryCountStatsProvider
this.requestLogger = requestLogger;
this.queryManager = queryManager;
this.authConfig = authConfig;
this.queryMetricsFactory = queryMetricsFactory;
}
@DELETE
@ -170,7 +176,7 @@ public class QueryResource implements QueryCountStatsProvider
@Context final HttpServletRequest req // used to get request content-type, remote address and AuthorizationInfo
) throws IOException
{
final long start = System.currentTimeMillis();
final long startNs = System.nanoTime();
Query query = null;
QueryToolChest toolChest = null;
String queryId = null;
@ -261,25 +267,31 @@ public class QueryResource implements QueryCountStatsProvider
os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
os.close();
successfulQueryCount.incrementAndGet();
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
.setDimension("success", "true")
.build("query/time", queryTime)
);
emitter.emit(
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
.build("query/bytes", os.getCount())
final long queryTimeNs = System.nanoTime() - startNs;
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
theToolChest,
theQuery,
req.getRemoteAddr()
);
queryMetrics.success(true);
queryMetrics.reportQueryTime(queryTimeNs).emit(emitter);
DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
theToolChest,
theQuery,
req.getRemoteAddr()
).reportQueryBytes(os.getCount()).emit(emitter);
requestLogger.log(
new RequestLogLine(
new DateTime(start),
new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)),
req.getRemoteAddr(),
theQuery,
new QueryStats(
ImmutableMap.<String, Object>of(
"query/time", queryTime,
"query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs),
"query/bytes", os.getCount(),
"success", true
)
@ -328,21 +340,24 @@ public class QueryResource implements QueryCountStatsProvider
try {
log.warn(e, "Exception while processing queryId [%s]", queryId);
interruptedQueryCount.incrementAndGet();
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr())
.setDimension("success", "false")
.build("query/time", queryTime)
final long queryTimeNs = System.nanoTime() - startNs;
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
toolChest,
query,
req.getRemoteAddr()
);
queryMetrics.success(false);
queryMetrics.reportQueryTime(queryTimeNs).emit(emitter);
requestLogger.log(
new RequestLogLine(
new DateTime(start),
new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)),
req.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"query/time",
queryTime,
TimeUnit.NANOSECONDS.toMillis(queryTimeNs),
"success",
false,
"interrupted",
@ -370,20 +385,23 @@ public class QueryResource implements QueryCountStatsProvider
failedQueryCount.incrementAndGet();
try {
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr())
.setDimension("success", "false")
.build("query/time", queryTime)
final long queryTimeNs = System.nanoTime() - startNs;
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
toolChest,
query,
req.getRemoteAddr()
);
queryMetrics.success(false);
queryMetrics.reportQueryTime(queryTimeNs).emit(emitter);
requestLogger.log(
new RequestLogLine(
new DateTime(start),
new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)),
req.getRemoteAddr(),
query,
new QueryStats(ImmutableMap.<String, Object>of(
"query/time",
queryTime,
TimeUnit.NANOSECONDS.toMillis(queryTimeNs),
"success",
false,
"exception",

View File

@ -21,13 +21,11 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
@ -44,6 +42,7 @@ import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -255,7 +254,6 @@ public class ServerManager implements QuerySegmentWalker
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn = getBuilderFn(toolChest);
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
DataSource dataSource = query.getDataSource();
@ -312,7 +310,6 @@ public class ServerManager implements QuerySegmentWalker
holder.getVersion(),
input.getChunkNumber()
),
builderFn,
cpuTimeAccumulator
);
}
@ -327,7 +324,7 @@ public class ServerManager implements QuerySegmentWalker
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
),
builderFn,
toolChest,
emitter,
cpuTimeAccumulator,
true
@ -362,7 +359,6 @@ public class ServerManager implements QuerySegmentWalker
return new NoopQueryRunner<T>();
}
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn = getBuilderFn(toolChest);
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
@ -390,7 +386,7 @@ public class ServerManager implements QuerySegmentWalker
final ReferenceCountingSegment adapter = chunk.getObject();
return Arrays.asList(
buildAndDecorateQueryRunner(factory, toolChest, adapter, input, builderFn, cpuTimeAccumulator)
buildAndDecorateQueryRunner(factory, toolChest, adapter, input, cpuTimeAccumulator)
);
}
}
@ -401,7 +397,7 @@ public class ServerManager implements QuerySegmentWalker
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
),
builderFn,
toolChest,
emitter,
cpuTimeAccumulator,
true
@ -413,65 +409,45 @@ public class ServerManager implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest,
final ReferenceCountingSegment adapter,
final SegmentDescriptor segmentDescriptor,
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
final AtomicLong cpuTimeAccumulator
)
{
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
String segmentId = adapter.getIdentifier();
return CPUTimeMetricQueryRunner.safeBuild(
new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
emitter,
builderFn,
toolChest,
new BySegmentQueryRunner<T>(
adapter.getIdentifier(),
segmentId,
adapter.getDataInterval().getStart(),
new CachingQueryRunner<T>(
adapter.getIdentifier(),
segmentId,
segmentDescriptor,
objectMapper,
cache,
toolChest,
new MetricsEmittingQueryRunner<T>(
emitter,
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Override
public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
},
toolChest,
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter, segmentDescriptor),
"query/segment/time",
ImmutableMap.of("segment", adapter.getIdentifier())
QueryMetrics::reportSegmentTime,
queryMetrics -> queryMetrics.segment(segmentId)
),
cachingExec,
cacheConfig
)
),
"query/segmentAndCache/time",
ImmutableMap.of("segment", adapter.getIdentifier())
QueryMetrics::reportSegmentAndCacheTime,
queryMetrics -> queryMetrics.segment(segmentId)
).withWaitMeasuredFromNow(),
segmentSpec
),
builderFn,
toolChest,
emitter,
cpuTimeAccumulator,
false
);
}
private static <T> Function<Query<T>, ServiceMetricEvent.Builder> getBuilderFn(final QueryToolChest<T, Query<T>> toolChest)
{
return new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Nullable
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
{
return toolChest.makeMetricBuilder(input);
}
};
}
}

View File

@ -41,6 +41,7 @@ import io.druid.guice.annotations.Smile;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.initialization.Initialization;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryToolChest;
@ -215,10 +216,11 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
}
};
ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class);
ServletHolder holder = new ServletHolder(
new AsyncQueryForwardingServlet(
new MapQueryToolChestWarehouse(ImmutableMap.<Class<? extends Query>, QueryToolChest>of()),
injector.getInstance(ObjectMapper.class),
jsonMapper,
injector.getInstance(Key.get(ObjectMapper.class, Smile.class)),
hostFinder,
injector.getProvider(org.eclipse.jetty.client.HttpClient.class),
@ -231,7 +233,8 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
{
// noop
}
}
},
new DefaultGenericQueryMetricsFactory(jsonMapper)
)
{
@Override

View File

@ -29,6 +29,7 @@ import io.druid.concurrent.Execs;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -139,7 +140,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
new NoopRequestLogger(),
queryManager,
new AuthConfig()
new AuthConfig(),
new DefaultGenericQueryMetricsFactory(jsonMapper)
);
}
@ -213,7 +215,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
new NoopRequestLogger(),
queryManager,
new AuthConfig(true)
new AuthConfig(true),
new DefaultGenericQueryMetricsFactory(jsonMapper)
);
Response response = queryResource.doPost(
@ -283,7 +286,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
new NoopRequestLogger(),
queryManager,
new AuthConfig(true)
new AuthConfig(true),
new DefaultGenericQueryMetricsFactory(jsonMapper)
);
final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\","
@ -379,7 +383,8 @@ public class QueryResourceTest
new NoopServiceEmitter(),
new NoopRequestLogger(),
queryManager,
new AuthConfig(true)
new AuthConfig(true),
new DefaultGenericQueryMetricsFactory(jsonMapper)
);
final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\","

View File

@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.LocalCacheProvider;
import io.druid.jackson.DefaultObjectMapper;
@ -43,9 +42,11 @@ import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.java.util.common.guava.YieldingSequenceBase;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.DefaultQueryMetrics;
import io.druid.query.Druids;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
@ -573,9 +574,9 @@ public class ServerManagerTest
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(QueryType query)
public QueryMetrics<Query<?>> makeMetrics(QueryType query)
{
return new ServiceMetricEvent.Builder();
return new DefaultQueryMetrics<>(new DefaultObjectMapper());
}
@Override