Add a new metric query/segments/count that is not emitted by default (#11394)

* Add a new metric query/segments/count that is not emitted by default

* docs

* test the default implementation of the metric

* fix spelling error in docs

* document the fact that query retries will result in additional metric emissions

* update using recommended text from @jihoonson
Lucas Capistrant 2021-07-22 19:57:35 -05:00 committed by GitHub
parent ce1faa5635
commit 9767b42e85
12 changed files with 51 additions and 7 deletions


@@ -107,6 +107,7 @@ import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
@@ -342,7 +343,8 @@ public class CachingClusteredClientBenchmark
         processingConfig,
         forkJoinPool,
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }


@@ -59,6 +59,7 @@ Available Metrics
 |`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.||
 |`query/interrupted/count`|number of queries interrupted due to cancellation.|This metric is only available if the QueryCountStatsMonitor module is included.||
 |`query/timeout/count`|number of timed out queries.|This metric is only available if the QueryCountStatsMonitor module is included.||
+|`query/segments/count`|Number of segments that will be touched by the query. This metric is not emitted by default; see the `QueryMetrics` interface for how to enable it. The broker distributes a query to realtime tasks and historicals based on a snapshot of segment distribution. If segments move after that snapshot is taken, the affected servers can report them as missing, and the broker re-sends the query to the servers now serving those segments, so a segment can be counted more than once.||Varies.|
 |`sqlQuery/time`|Milliseconds taken to complete a SQL query.|id, nativeQueryIds, dataSource, remoteAddress, success.|< 1s|
 |`sqlQuery/bytes`|number of bytes returned in SQL query response.|id, nativeQueryIds, dataSource, remoteAddress, success.| |


@@ -70,6 +70,7 @@ import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.TimelineLookup;
 import org.hamcrest.core.IsInstanceOf;
@@ -367,7 +368,8 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
         },
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
 
     ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(


@@ -340,6 +340,13 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMetrics<QueryType>
     return this;
   }
 
+  @Override
+  public QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount)
+  {
+    // Don't emit by default.
+    return this;
+  }
+
   @Override
   public void emit(ServiceEmitter emitter)
   {

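Since the default implementation above intentionally drops the value, a deployment that wants `query/segments/count` has to plug in its own `QueryMetrics`. Here is a minimal sketch of such an override, assuming `DefaultQueryMetrics` exposes a no-arg constructor and the protected `reportMetric(String, Number)` helper it uses for its built-in metrics; the class name is hypothetical and not part of this commit:

import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;

// Hypothetical subclass: records the segment count so that a later
// emit(emitter) call actually sends query/segments/count.
public class SegmentCountingQueryMetrics<QueryType extends Query<?>>
    extends DefaultQueryMetrics<QueryType>
{
  @Override
  public QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount)
  {
    // reportMetric is assumed to stash the value for emission, as it does
    // for the metrics DefaultQueryMetrics emits by default.
    return reportMetric("query/segments/count", segmentCount);
  }
}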

@@ -270,6 +270,11 @@ public interface QueryMetrics<QueryType extends Query<?>>
    */
   QueryMetrics<QueryType> reportQueryBytes(long byteCount);
 
+  /**
+   * Registers "segments queried count" metric.
+   */
+  QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount);
+
   /**
    * Registers "wait time" metric.
    */

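To get a custom implementation of this interface used for native queries, the metrics factory can be swapped out. A rough sketch, reusing the hypothetical `SegmentCountingQueryMetrics` from above and assuming the `GenericQueryMetricsFactory` extension point; the wiring (e.g. the `druid.query.generic.queryMetricsFactory` property) and the exact factory contract should be checked against your Druid version:

import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;

// Hypothetical factory, not part of this commit.
public class SegmentCountingQueryMetricsFactory implements GenericQueryMetricsFactory
{
  @Override
  public QueryMetrics<Query<?>> makeMetrics(Query<?> query)
  {
    SegmentCountingQueryMetrics<Query<?>> metrics = new SegmentCountingQueryMetrics<>();
    // Attach the standard per-query dimensions, mirroring what
    // DefaultGenericQueryMetricsFactory does for DefaultQueryMetrics.
    metrics.query(query);
    return metrics;
  }
}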

@@ -291,6 +291,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
     return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs);
   }
 
+  @Override
+  public QueryMetrics reportQueriedSegmentCount(long segmentCount)
+  {
+    return delegateQueryMetrics.reportQueriedSegmentCount(segmentCount);
+  }
+
   @Override
   public void emit(ServiceEmitter emitter)
   {


@@ -148,5 +148,12 @@ public class DefaultQueryMetricsTest
     actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
     Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
     Assert.assertEquals(10L, actualEvent.get("value"));
+
+    // Test that the queried segment count is not emitted by DefaultQueryMetrics:
+    // the last emitted metric should remain query/node/bytes.
+    queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter);
+    actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+    Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
+    Assert.assertEquals(10L, actualEvent.get("value"));
   }
 }


@@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.BySegmentResultValueClass;
 import org.apache.druid.query.CacheStrategy;
 import org.apache.druid.query.DruidProcessingConfig;
@@ -129,6 +130,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
   private final ForkJoinPool pool;
   private final QueryScheduler scheduler;
   private final JoinableFactoryWrapper joinableFactoryWrapper;
+  private final ServiceEmitter emitter;
 
   @Inject
   public CachingClusteredClient(
@@ -142,7 +144,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
       DruidProcessingConfig processingConfig,
       @Merging ForkJoinPool pool,
       QueryScheduler scheduler,
-      JoinableFactory joinableFactory
+      JoinableFactory joinableFactory,
+      ServiceEmitter emitter
   )
   {
     this.warehouse = warehouse;
@@ -156,6 +159,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
     this.pool = pool;
     this.scheduler = scheduler;
     this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
+    this.emitter = emitter;
 
     if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
       log.warn(
@ -369,6 +373,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
query = scheduler.prioritizeAndLaneQuery(queryPlus, segmentServers);
queryPlus = queryPlus.withQuery(query);
queryPlus = queryPlus.withQueryMetrics(toolChest);
queryPlus.getQueryMetrics().reportQueriedSegmentCount(segmentServers.size()).emit(emitter);
final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segmentServers);
LazySequence<T> mergedResultSequence = new LazySequence<>(() -> {

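For reference, when a `QueryMetrics` implementation that records the count reaches `emit`, the value arrives at the injected `ServiceEmitter` as a `ServiceMetricEvent`. Below is an illustration of that emission, mirroring the builder pattern Druid's monitors use; the dimension value is made up and the helper is not code from this commit. It also shows why query retries produce additional emissions, as the commit message notes: the reporting line in the hunk above runs once per (re)issued query.

import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

// Illustrative helper, not part of this commit.
public class SegmentCountEmission
{
  // If the broker re-sends the query because segments moved after the plan
  // snapshot, this runs again, so a segment can be counted more than once,
  // matching the caveat in the docs table.
  static void emitSegmentCount(ServiceEmitter emitter, long segmentCount)
  {
    emitter.emit(
        new ServiceMetricEvent.Builder()
            .setDimension("dataSource", "wikipedia") // hypothetical dimension
            .build("query/segments/count", segmentCount)
    );
  }
}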

@@ -51,6 +51,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -335,7 +336,8 @@ public class CachingClusteredClientFunctionalityTest
         },
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }


@@ -53,6 +53,7 @@ import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.coordination.ServerManagerTest;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -138,7 +139,8 @@ public class CachingClusteredClientPerfTest
         Mockito.mock(DruidProcessingConfig.class),
         ForkJoinPool.commonPool(),
         queryScheduler,
-        NoopJoinableFactory.INSTANCE
+        NoopJoinableFactory.INSTANCE,
+        new NoopServiceEmitter()
     );
 
     Query<SegmentDescriptor> fakeQuery = makeFakeQuery(interval);


@@ -125,6 +125,7 @@ import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.ServerTestHelper;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
 import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.DataSegment;
@@ -2850,7 +2851,8 @@ public class CachingClusteredClientTest
             NoQueryLaningStrategy.INSTANCE,
             new ServerConfig()
         ),
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }


@@ -52,6 +52,7 @@ import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.joda.time.Interval;
@@ -145,7 +146,8 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
         ),
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
 
     servers = new ArrayList<>();
   }