From 9767b42e85a91d4e2b71b3f718d072a9b52d140d Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 22 Jul 2021 19:57:35 -0500 Subject: [PATCH] Add a new metric query/segments/count that is not emitted by default (#11394) * Add a new metric query/segments/count that is not emitted by default * docs * test the default implementation of the metric * fix spelling error in docs * document the fact that query retries will result in additional metric emissions * update using recommended text from @jihoonson --- .../benchmark/query/CachingClusteredClientBenchmark.java | 4 +++- docs/operations/metrics.md | 1 + .../druid/query/movingaverage/MovingAverageQueryTest.java | 4 +++- .../java/org/apache/druid/query/DefaultQueryMetrics.java | 7 +++++++ .../main/java/org/apache/druid/query/QueryMetrics.java | 5 +++++ .../druid/query/search/DefaultSearchQueryMetrics.java | 6 ++++++ .../org/apache/druid/query/DefaultQueryMetricsTest.java | 7 +++++++ .../org/apache/druid/client/CachingClusteredClient.java | 8 +++++++- .../client/CachingClusteredClientFunctionalityTest.java | 4 +++- .../druid/client/CachingClusteredClientPerfTest.java | 4 +++- .../apache/druid/client/CachingClusteredClientTest.java | 4 +++- .../query/QueryRunnerBasedOnClusteredClientTestBase.java | 4 +++- 12 files changed, 51 insertions(+), 7 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index fe89751c32f..36512ad4284 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -107,6 +107,7 @@ import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; @@ -342,7 +343,8 @@ public class CachingClusteredClientBenchmark processingConfig, forkJoinPool, QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); } diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 71cc33025a3..d4ca7ad0112 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -59,6 +59,7 @@ Available Metrics |`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.|| |`query/interrupted/count`|number of queries interrupted due to cancellation.|This metric is only available if the QueryCountStatsMonitor module is included.|| |`query/timeout/count`|number of timed out queries.|This metric is only available if the QueryCountStatsMonitor module is included.|| +|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will re-send the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.|Varies.| |`sqlQuery/time`|Milliseconds taken to complete a SQL query.|id, nativeQueryIds, dataSource, remoteAddress, success.|< 1s| |`sqlQuery/bytes`|number of bytes returned in SQL query response.|id, nativeQueryIds, dataSource, remoteAddress, success.| | diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index f1173aab03c..15fdfa76815 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -70,6 +70,7 @@ import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.TimelineLookup; import org.hamcrest.core.IsInstanceOf; @@ -367,7 +368,8 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest }, ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker( diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index a351eeb29ed..095fa17d03a 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -340,6 +340,13 @@ public class DefaultQueryMetrics> implements QueryMet return this; } + @Override + public QueryMetrics reportQueriedSegmentCount(long segmentCount) + { + // Don't emit by default. + return this; + } + @Override public void emit(ServiceEmitter emitter) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java index 9457fbd75f5..304ed6cd833 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java @@ -270,6 +270,11 @@ public interface QueryMetrics> */ QueryMetrics reportQueryBytes(long byteCount); + /** + * Registeres "segments queried count" metric. + */ + QueryMetrics reportQueriedSegmentCount(long segmentCount); + /** * Registers "wait time" metric. */ diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java index dd52bf5a7ff..108518a7948 100644 --- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java @@ -291,6 +291,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs); } + @Override + public QueryMetrics reportQueriedSegmentCount(long segmentCount) + { + return delegateQueryMetrics.reportQueriedSegmentCount(segmentCount); + } + @Override public void emit(ServiceEmitter emitter) { diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index e0fe0b4a70d..5091ae08a58 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -148,5 +148,12 @@ public class DefaultQueryMetricsTest actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); Assert.assertEquals("query/node/bytes", actualEvent.get("metric")); Assert.assertEquals(10L, actualEvent.get("value")); + + // Here we are testing that Queried Segment Count does not get emitted by the DefaultQueryMetrics and the last + // metric remains as query/node/bytes + queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/node/bytes", actualEvent.get("metric")); + Assert.assertEquals(10L, actualEvent.get("value")); } } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 44818495890..8175e2a3ce0 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.DruidProcessingConfig; @@ -129,6 +130,7 @@ public class CachingClusteredClient implements QuerySegmentWalker private final ForkJoinPool pool; private final QueryScheduler scheduler; private final JoinableFactoryWrapper joinableFactoryWrapper; + private final ServiceEmitter emitter; @Inject public CachingClusteredClient( @@ -142,7 +144,8 @@ public class CachingClusteredClient implements QuerySegmentWalker DruidProcessingConfig processingConfig, @Merging ForkJoinPool pool, QueryScheduler scheduler, - JoinableFactory joinableFactory + JoinableFactory joinableFactory, + ServiceEmitter emitter ) { this.warehouse = warehouse; @@ -156,6 +159,7 @@ public class CachingClusteredClient implements QuerySegmentWalker this.pool = pool; this.scheduler = scheduler; this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); + this.emitter = emitter; if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { log.warn( @@ -369,6 +373,8 @@ public class CachingClusteredClient implements QuerySegmentWalker query = scheduler.prioritizeAndLaneQuery(queryPlus, segmentServers); queryPlus = queryPlus.withQuery(query); + queryPlus = queryPlus.withQueryMetrics(toolChest); + queryPlus.getQueryMetrics().reportQueriedSegmentCount(segmentServers.size()).emit(emitter); final SortedMap> segmentsByServer = groupSegmentsByServer(segmentServers); LazySequence mergedResultSequence = new LazySequence<>(() -> { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 1d591e2148d..a897e09d643 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -51,6 +51,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -335,7 +336,8 @@ public class CachingClusteredClientFunctionalityTest }, ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java index ab1d6cd7b80..218cac521ba 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java @@ -53,6 +53,7 @@ import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.ServerManagerTest; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.LinearShardSpec; @@ -138,7 +139,8 @@ public class CachingClusteredClientPerfTest Mockito.mock(DruidProcessingConfig.class), ForkJoinPool.commonPool(), queryScheduler, - NoopJoinableFactory.INSTANCE + NoopJoinableFactory.INSTANCE, + new NoopServiceEmitter() ); Query fakeQuery = makeFakeQuery(interval); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index cda0170d5f0..6fe08c4622b 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -125,6 +125,7 @@ import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.ServerTestHelper; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; @@ -2850,7 +2851,8 @@ public class CachingClusteredClientTest NoQueryLaningStrategy.INSTANCE, new ServerConfig() ), - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); } diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java index ae0c269adf6..94c6c596090 100644 --- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java +++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java @@ -52,6 +52,7 @@ import org.apache.druid.segment.generator.GeneratorSchemaInfo; import org.apache.druid.segment.generator.SegmentGenerator; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Interval; @@ -145,7 +146,8 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase ), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), + new NoopServiceEmitter() ); servers = new ArrayList<>(); }