Pass metrics object for Scan, Timeseries and GroupBy queries during cursor creation (#12484)

* Pass metrics object for Scan, Timeseries and GroupBy queries during cursor creation

* fixup! Pass metrics object for Scan, Timeseries and GroupBy queries during cursor creation

* Document vectorized dimension
This commit is contained in:
Rohan Garg 2022-05-09 23:10:17 +05:30 committed by GitHub
parent eb6de94e1f
commit 2dd073c2cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 169 additions and 45 deletions

View File

@ -64,7 +64,7 @@ Metrics may have additional dimensions beyond those listed above.
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment.|several hundred milliseconds|
|`query/segment/time`|Milliseconds taken to query individual segment. Includes time to page in the segment from disk.|id, status, segment, vectorized.|several hundred milliseconds|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|< several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the Historical process).|id, segment.|several hundred milliseconds|

View File

@ -28,6 +28,7 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@ -100,7 +101,7 @@ public class DistinctCountTimeseriesQueryTest extends InitializedNullHandlingTes
.build();
final Iterable<Result<TimeseriesResultValue>> results =
engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
engine.process(query, new IncrementalIndexStorageAdapter(index), new DefaultTimeseriesQueryMetrics()).toList();
List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
new Result<>(

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
@ -46,7 +47,8 @@ public class QueryRunnerHelper
final VirtualColumns virtualColumns,
final boolean descending,
final Granularity granularity,
final Function<Cursor, Result<T>> mapFn
final Function<Cursor, Result<T>> mapFn,
@Nullable final QueryMetrics<?> queryMetrics
)
{
Preconditions.checkArgument(
@ -55,7 +57,7 @@ public class QueryRunnerHelper
return Sequences.filter(
Sequences.map(
adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, null),
adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, queryMetrics),
mapFn
),
Objects::nonNull

View File

@ -85,7 +85,11 @@ public class GroupByQueryEngine
this.intermediateResultsBufferPool = intermediateResultsBufferPool;
}
public Sequence<Row> process(final GroupByQuery query, final StorageAdapter storageAdapter)
public Sequence<Row> process(
final GroupByQuery query,
final StorageAdapter storageAdapter,
@Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (storageAdapter == null) {
throw new ISE(
@ -112,7 +116,7 @@ public class GroupByQueryEngine
query.getVirtualColumns(),
query.getGranularity(),
false,
null
groupByQueryMetrics
);
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();

View File

@ -102,7 +102,9 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<ResultRow,
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), GroupByQuery.class);
}
return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter);
return strategySelector
.strategize((GroupByQuery) query)
.process((GroupByQuery) query, adapter, (GroupByQueryMetrics) queryPlus.getQueryMetrics());
}
}

View File

@ -41,6 +41,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.column.ArrayDoubleGroupByColumnSelectorStrategy;
import org.apache.druid.query.groupby.epinephelinae.column.ArrayLongGroupByColumnSelectorStrategy;
@ -90,7 +91,7 @@ import java.util.stream.Stream;
* This code runs on data servers, like Historicals.
*
* Used by
* {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter)}.
* {@link GroupByStrategyV2#process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)}.
*/
public class GroupByQueryEngineV2
{
@ -119,7 +120,8 @@ public class GroupByQueryEngineV2
final GroupByQuery query,
@Nullable final StorageAdapter storageAdapter,
final NonBlockingPool<ByteBuffer> intermediateResultsBufferPool,
final GroupByQueryConfig querySpecificConfig
final GroupByQueryConfig querySpecificConfig,
@Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (storageAdapter == null) {
@ -161,7 +163,8 @@ public class GroupByQueryEngineV2
fudgeTimestamp,
filter,
interval,
querySpecificConfig
querySpecificConfig,
groupByQueryMetrics
);
} else {
result = processNonVectorized(
@ -171,7 +174,8 @@ public class GroupByQueryEngineV2
fudgeTimestamp,
querySpecificConfig,
filter,
interval
interval,
groupByQueryMetrics
);
}
@ -190,7 +194,8 @@ public class GroupByQueryEngineV2
@Nullable final DateTime fudgeTimestamp,
final GroupByQueryConfig querySpecificConfig,
@Nullable final Filter filter,
final Interval interval
final Interval interval,
@Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
final Sequence<Cursor> cursors = storageAdapter.makeCursors(
@ -199,7 +204,7 @@ public class GroupByQueryEngineV2
query.getVirtualColumns(),
query.getGranularity(),
false,
null
groupByQueryMetrics
);
return cursors.flatMap(

View File

@ -33,6 +33,7 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.AggregateResult;
import org.apache.druid.query.groupby.epinephelinae.BufferArrayGrouper;
@ -128,7 +129,8 @@ public class VectorGroupByEngine
@Nullable final DateTime fudgeTimestamp,
@Nullable final Filter filter,
final Interval interval,
final GroupByQueryConfig config
final GroupByQueryConfig config,
@Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
if (!canVectorize(query, storageAdapter, filter)) {
@ -147,7 +149,7 @@ public class VectorGroupByEngine
query.getVirtualColumns(),
false,
QueryContexts.getVectorSize(query),
null
groupByQueryMetrics
);
if (cursor == null) {

View File

@ -27,6 +27,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@ -164,7 +165,7 @@ public interface GroupByStrategy
/**
* Merge a variety of single-segment query runners into a combined runner. Used by
* {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. In
* that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter)} (the runners created
* that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter, GroupByQueryMetrics)} (the runners created
* by that method will be fed into this method).
* <p>
* This method is only called on data servers, like Historicals (not the Broker).
@ -187,7 +188,10 @@ public interface GroupByStrategy
*
* @return result sequence for the storage adapter
*/
Sequence<ResultRow> process(GroupByQuery query, StorageAdapter storageAdapter);
Sequence<ResultRow> process(
GroupByQuery query,
StorageAdapter storageAdapter,
@Nullable GroupByQueryMetrics groupByQueryMetrics);
/**
* Returns whether this strategy supports pushing down outer queries. This is used by

View File

@ -41,6 +41,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.groupby.GroupByQueryHelper;
import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
@ -51,6 +52,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@ -233,7 +235,8 @@ public class GroupByStrategyV1 implements GroupByStrategy
outerQuery.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
),
new IncrementalIndexStorageAdapter(innerQueryResultIndex)
new IncrementalIndexStorageAdapter(innerQueryResultIndex),
null
);
}
}
@ -269,10 +272,14 @@ public class GroupByStrategyV1 implements GroupByStrategy
}
@Override
public Sequence<ResultRow> process(final GroupByQuery query, final StorageAdapter storageAdapter)
public Sequence<ResultRow> process(
final GroupByQuery query,
final StorageAdapter storageAdapter,
@Nullable final GroupByQueryMetrics groupByQueryMetrics
)
{
return Sequences.map(
engine.process(query, storageAdapter),
engine.process(query, storageAdapter, groupByQueryMetrics),
row -> GroupByQueryHelper.toResultRow(query, row)
);
}

View File

@ -59,6 +59,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryMetrics;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
@ -73,6 +74,7 @@ import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
@ -690,13 +692,18 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
@Override
public Sequence<ResultRow> process(GroupByQuery query, StorageAdapter storageAdapter)
public Sequence<ResultRow> process(
GroupByQuery query,
StorageAdapter storageAdapter,
@Nullable GroupByQueryMetrics groupByQueryMetrics
)
{
return GroupByQueryEngineV2.process(
query,
storageAdapter,
bufferPool,
configSupplier.get().withOverrides(query)
configSupplier.get().withOverrides(query),
groupByQueryMetrics
);
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.Filter;
@ -44,6 +45,7 @@ import org.apache.druid.segment.filter.Filters;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -61,7 +63,8 @@ public class ScanQueryEngine
public Sequence<ScanResultValue> process(
final ScanQuery query,
final Segment segment,
final ResponseContext responseContext
final ResponseContext responseContext,
@Nullable final QueryMetrics<?> queryMetrics
)
{
if (segment.asQueryableIndex() != null && segment.asQueryableIndex().isFromTombstone()) {
@ -135,7 +138,7 @@ public class ScanQueryEngine
Granularities.ALL,
query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
(query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
null
queryMetrics
)
.map(cursor -> new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()

View File

@ -373,7 +373,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
if (timeoutAt == null || timeoutAt == 0L) {
responseContext.putTimeoutTime(JodaUtils.MAX_INSTANT);
}
return engine.process((ScanQuery) query, segment, responseContext);
return engine.process((ScanQuery) query, segment, responseContext, queryPlus.getQueryMetrics());
}
}
}

View File

@ -120,7 +120,8 @@ public class TimeBoundaryQueryRunnerFactory
VirtualColumns.EMPTY,
descending,
Granularities.ALL,
this.skipToFirstMatching
this.skipToFirstMatching,
null
);
final List<Result<DateTime>> resultList = resultSequence.limit(1).toList();
if (resultList.size() > 0) {

View File

@ -82,7 +82,11 @@ public class TimeseriesQueryEngine
* Run a single-segment, single-interval timeseries query on a particular adapter. The query must have been
* scoped down to a single interval before calling this method.
*/
public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
public Sequence<Result<TimeseriesResultValue>> process(
final TimeseriesQuery query,
final StorageAdapter adapter,
@Nullable final TimeseriesQueryMetrics timeseriesQueryMetrics
)
{
if (adapter == null) {
throw new SegmentMissingException(
@ -106,9 +110,9 @@ public class TimeseriesQueryEngine
final Sequence<Result<TimeseriesResultValue>> result;
if (doVectorize) {
result = processVectorized(query, adapter, filter, interval, gran, descending);
result = processVectorized(query, adapter, filter, interval, gran, descending, timeseriesQueryMetrics);
} else {
result = processNonVectorized(query, adapter, filter, interval, gran, descending);
result = processNonVectorized(query, adapter, filter, interval, gran, descending, timeseriesQueryMetrics);
}
final int limit = query.getLimit();
@ -125,7 +129,8 @@ public class TimeseriesQueryEngine
@Nullable final Filter filter,
final Interval queryInterval,
final Granularity gran,
final boolean descending
final boolean descending,
final TimeseriesQueryMetrics timeseriesQueryMetrics
)
{
final boolean skipEmptyBuckets = query.isSkipEmptyBuckets();
@ -137,7 +142,7 @@ public class TimeseriesQueryEngine
query.getVirtualColumns(),
descending,
QueryContexts.getVectorSize(query),
null
timeseriesQueryMetrics
);
if (cursor == null) {
@ -251,7 +256,8 @@ public class TimeseriesQueryEngine
@Nullable final Filter filter,
final Interval queryInterval,
final Granularity gran,
final boolean descending
final boolean descending,
final TimeseriesQueryMetrics timeseriesQueryMetrics
)
{
final boolean skipEmptyBuckets = query.isSkipEmptyBuckets();
@ -299,7 +305,8 @@ public class TimeseriesQueryEngine
agg.close();
}
}
}
},
timeseriesQueryMetrics
);
}
}

View File

@ -99,7 +99,7 @@ public class TimeseriesQueryRunnerFactory
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeseriesQuery.class);
}
return engine.process((TimeseriesQuery) input, adapter);
return engine.process((TimeseriesQuery) input, adapter, (TimeseriesQueryMetrics) queryPlus.getQueryMetrics());
}
}

View File

@ -262,6 +262,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return Sequences.empty();
}
if (queryMetrics != null) {
queryMetrics.vectorized(false);
}
final Interval dataInterval = new Interval(getMinTime(), gran.bucketEnd(getMaxTime()));
if (!interval.overlaps(dataInterval)) {

View File

@ -31,6 +31,7 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@ -140,11 +141,12 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
)
);
final DefaultTimeseriesQueryMetrics defaultTimeseriesQueryMetrics = new DefaultTimeseriesQueryMetrics();
final Iterable<Result<TimeseriesResultValue>> iiResults =
engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList();
engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex), defaultTimeseriesQueryMetrics).toList();
final Iterable<Result<TimeseriesResultValue>> qiResults =
engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList();
engine.process(query, new QueryableIndexStorageAdapter(queryableIndex), defaultTimeseriesQueryMetrics).toList();
TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index");
TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index");

View File

@ -31,6 +31,7 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@ -139,11 +140,12 @@ public class StringLastTimeseriesQueryTest
)
);
final DefaultTimeseriesQueryMetrics defaultTimeseriesQueryMetrics = new DefaultTimeseriesQueryMetrics();
final Iterable<Result<TimeseriesResultValue>> iiResults =
engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList();
engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex), defaultTimeseriesQueryMetrics).toList();
final Iterable<Result<TimeseriesResultValue>> qiResults =
engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList();
engine.process(query, new QueryableIndexStorageAdapter(queryableIndex), defaultTimeseriesQueryMetrics).toList();
TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index");
TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index");

View File

@ -50,6 +50,7 @@ import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentResultValue;
@ -207,6 +208,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
private static final Closer RESOURCE_CLOSER = Closer.create();
private final QueryRunner<ResultRow> runner;
private final QueryRunner<ResultRow> originalRunner;
private final GroupByQueryRunnerFactory factory;
private final GroupByQueryConfig config;
private final boolean vectorize;
@ -449,7 +451,9 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
final String testName = StringUtils.format("config=%s, runner=%s, vectorize=%s", config, runner, vectorize);
// Add vectorization tests for any indexes that support it.
if (!vectorize || QueryRunnerTestHelper.isTestRunnerVectorizable(runner)) {
if (!vectorize ||
(QueryRunnerTestHelper.isTestRunnerVectorizable(runner) &&
config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2))) {
constructors.add(new Object[]{testName, config, factory, runner, vectorize});
}
}
@ -476,6 +480,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
this.config = config;
this.factory = factory;
this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));
this.originalRunner = runner;
String runnerName = runner.toString();
this.vectorize = vectorize;
}
@ -752,7 +757,15 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
StubServiceEmitter serviceEmitter = new StubServiceEmitter("", "");
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
factory,
originalRunner,
query,
serviceEmitter
);
Assert.assertEquals(1, serviceEmitter.getEvents().size());
Assert.assertEquals(vectorize, serviceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null));
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}

View File

@ -22,7 +22,9 @@ package org.apache.druid.query.groupby;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@ -54,6 +56,30 @@ public class GroupByQueryRunnerTestHelper
return queryResult.toList();
}
public static <T> Iterable<T> runQueryWithEmitter(
QueryRunnerFactory factory,
QueryRunner runner,
Query<T> query,
ServiceEmitter serviceEmitter
)
{
MetricsEmittingQueryRunner<ResultRow> metricsEmittingQueryRunner =
new MetricsEmittingQueryRunner<ResultRow>(
serviceEmitter,
factory.getToolchest(),
runner,
(obj, lng) -> {},
(metrics) -> {}
).withWaitMeasuredFromNow();
QueryToolChest toolChest = factory.getToolchest();
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(metricsEmittingQueryRunner)),
toolChest
);
return theRunner.run(QueryPlus.wrap(query)).toList();
}
public static ResultRow createExpectedRow(final GroupByQuery query, final String timestamp, Object... vals)
{
return createExpectedRow(query, DateTimes.of(timestamp), vals);

View File

@ -36,9 +36,11 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@ -211,7 +213,16 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
.virtualColumns(EXPR_COLUMN)
.build();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query)).toList();
StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
MetricsEmittingQueryRunner<ScanResultValue> metricsEmittingQueryRunner =
new MetricsEmittingQueryRunner<ScanResultValue>(
stubServiceEmitter,
TOOL_CHEST,
runner,
(obj, lng) -> {},
(metrics) -> {}
).withWaitMeasuredFromNow();
Iterable<ScanResultValue> results = metricsEmittingQueryRunner.run(QueryPlus.wrap(query)).toList();
List<ScanResultValue> expectedResults = toExpected(
toFullEvents(V_0112_0114),
@ -219,6 +230,8 @@ public class ScanQueryRunnerTest extends InitializedNullHandlingTest
0,
3
);
Assert.assertEquals(1, stubServiceEmitter.getEvents().size());
Assert.assertEquals(false, stubServiceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null));
verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
}

View File

@ -34,8 +34,10 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@ -220,7 +222,16 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
.context(makeContext())
.build();
Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
MetricsEmittingQueryRunner<Result<TimeseriesResultValue>> metricsEmittingQueryRunner =
new MetricsEmittingQueryRunner<Result<TimeseriesResultValue>>(
stubServiceEmitter,
new TimeseriesQueryQueryToolChest(),
runner,
(obj, lng) -> {},
(metrics) -> {}
).withWaitMeasuredFromNow();
Iterable<Result<TimeseriesResultValue>> results = metricsEmittingQueryRunner.run(QueryPlus.wrap(query)).toList();
final String[] expectedIndex = descending ?
QueryRunnerTestHelper.EXPECTED_FULL_ON_INDEX_VALUES_DESC :
@ -306,6 +317,11 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
++count;
}
Assert.assertEquals(1, stubServiceEmitter.getEvents().size());
Assert.assertEquals(
vectorize,
stubServiceEmitter.getEvents().get(0).toMap().getOrDefault("vectorized", null)
);
Assert.assertEquals(lastResult.toString(), expectedLast, lastResult.getTimestamp());
}

View File

@ -158,7 +158,8 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
.addDimension("sally")
.addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
.build(),
new IncrementalIndexStorageAdapter(index)
new IncrementalIndexStorageAdapter(index),
null
);
final List<Row> results = rows.toList();
@ -236,7 +237,8 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
)
)
.build(),
new IncrementalIndexStorageAdapter(index)
new IncrementalIndexStorageAdapter(index),
null
);
final List<Row> results = rows.toList();
@ -406,7 +408,8 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
.addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
.setDimFilter(DimFilters.dimEquals("sally", (String) null))
.build(),
new IncrementalIndexStorageAdapter(index)
new IncrementalIndexStorageAdapter(index),
null
);
final List<Row> results = rows.toList();