diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 16aaca90837..f85cbec77df 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -66,6 +66,9 @@ Metrics may have additional dimensions beyond those listed above.
 |`sqlQuery/bytes`|Number of bytes returned in the SQL query response.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`| |
 |`init/serverview/time`|Time taken to initialize the broker server view. Useful to detect if brokers are taking too long to start.||Depends on the number of segments.|
 |`init/metadatacache/time`|Time taken to initialize the broker segment metadata cache. Useful to detect if brokers are taking too long to start.||Depends on the number of segments.|
+|`segment/metadatacache/refresh/count`|Number of segments to refresh in the broker segment metadata cache.|`dataSource`| |
+|`segment/metadatacache/refresh/time`|Time taken to refresh segments in the broker segment metadata cache.|`dataSource`| |
+
 ### Historical
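Note on the implementation that follows: the patch replaces a raw `System.currentTimeMillis()` delta with Guava's `Stopwatch`, which is backed by `System.nanoTime()` and therefore monotonic, so wall-clock corrections (e.g. NTP adjustments) cannot skew the measured refresh duration. A minimal, self-contained sketch of that timing pattern; the class name is illustrative and `Thread.sleep` stands in for the cache refresh work:

```java
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

public class StopwatchExample
{
  public static void main(String[] args) throws InterruptedException
  {
    // Stopwatch uses System.nanoTime() internally, so the measurement is
    // monotonic and unaffected by wall-clock changes while it is running.
    final Stopwatch stopwatch = Stopwatch.createStarted();

    Thread.sleep(250); // stand-in for the work being timed (the metadata refresh)

    // Read the elapsed time in the unit the metric is reported in.
    long elapsedMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
    System.out.println("refresh took " + elapsedMillis + " ms");
  }
}
```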
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
index 71c19734ec0..ea3dc395671 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Interner;
@@ -48,6 +49,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.TableDataSource;
@@ -88,6 +90,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -707,6 +710,8 @@ public class SegmentMetadataCache
   private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, final Set<SegmentId> segments)
       throws IOException
   {
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+
     if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) {
       // Sanity check. We definitely expect this to pass.
       throw new ISE("'segments' must all match 'dataSource'!");
     }
@@ -714,7 +719,10 @@ public class SegmentMetadataCache
 
     log.debug("Refreshing metadata for dataSource[%s].", dataSource);
 
-    final long startTime = System.currentTimeMillis();
+    final ServiceMetricEvent.Builder builder =
+        new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource);
+
+    emitter.emit(builder.build("segment/metadatacache/refresh/count", segments.size()));
 
     // Segment id string -> SegmentId object.
     final Map<String, SegmentId> segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString);
@@ -783,10 +791,14 @@ public class SegmentMetadataCache
       yielder.close();
     }
 
+    long refreshDurationMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+    emitter.emit(builder.build("segment/metadatacache/refresh/time", refreshDurationMillis));
+
     log.debug(
         "Refreshed metadata for dataSource [%s] in %,d ms (%d segments queried, %d segments left).",
         dataSource,
-        System.currentTimeMillis() - startTime,
+        refreshDurationMillis,
         retVal.size(),
         segments.size() - retVal.size()
     );
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
index fb55d227109..0414878b4a5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java
@@ -39,6 +39,8 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.TableDataSource;
@@ -1447,6 +1449,52 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
     Assert.assertNull(schema.getDatasource("wat"));
   }
 
+  @Test
+  public void testRefreshShouldEmitMetrics() throws InterruptedException, IOException
+  {
+    String datasource = "xyz";
+    CountDownLatch addSegmentLatch = new CountDownLatch(2);
+    StubServiceEmitter emitter = new StubServiceEmitter("broker", "host");
+    SegmentMetadataCache schema = new SegmentMetadataCache(
+        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        serverView,
+        segmentManager,
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig(),
+        emitter
+    )
+    {
+      @Override
+      protected void addSegment(final DruidServerMetadata server, final DataSegment segment)
+      {
+        super.addSegment(server, segment);
+        if (datasource.equals(segment.getDataSource())) {
+          addSegmentLatch.countDown();
+        }
+      }
+
+      @Override
+      void removeSegment(final DataSegment segment)
+      {
+        super.removeSegment(segment);
+      }
+    };
+
+    List<DataSegment> segments = ImmutableList.of(
+        newSegment(datasource, 1),
+        newSegment(datasource, 2)
+    );
+    serverView.addSegment(segments.get(0), ServerType.HISTORICAL);
+    serverView.addSegment(segments.get(1), ServerType.REALTIME);
+    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
+    schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), Sets.newHashSet(datasource));
+
+    emitter.verifyEmitted("segment/metadatacache/refresh/time", ImmutableMap.of(DruidMetrics.DATASOURCE, datasource), 1);
+    emitter.verifyEmitted("segment/metadatacache/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, datasource), 1);
+  }
+
 private static DataSegment newSegment(String datasource, int partitionId)
 {
   return new DataSegment(
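The test verifies emission through `StubServiceEmitter`, which records every event it receives in memory so the test can assert on them afterwards via `verifyEmitted()`. A self-contained sketch of that stub-emitter pattern, using hypothetical stand-in types (`MetricEvent`, `RecordingEmitter`) rather than Druid's actual classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an emitted metric event, for illustration only.
record MetricEvent(String metricName, Map<String, Object> dimensions, Number value) {}

class RecordingEmitter
{
  private final List<MetricEvent> events = new ArrayList<>();

  // Production code calls emit(); the stub simply records the event
  // instead of sending it to a monitoring backend.
  public void emit(MetricEvent event)
  {
    events.add(event);
  }

  // Assert that a metric with the given name and dimensions was emitted
  // exactly expectedCount times, mirroring verifyEmitted() in the test above.
  public void verifyEmitted(String metricName, Map<String, Object> dimensions, int expectedCount)
  {
    long actual = events.stream()
                        .filter(e -> e.metricName().equals(metricName))
                        .filter(e -> e.dimensions().entrySet().containsAll(dimensions.entrySet()))
                        .count();
    if (actual != expectedCount) {
      throw new AssertionError(
          "Expected " + expectedCount + " emission(s) of [" + metricName + "], got " + actual
      );
    }
  }
}
```

Druid's real `StubServiceEmitter` follows the same idea: buffer events in memory instead of shipping them anywhere, then let the test query the buffer.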