From f67ff92d07d4838e6acd5fb4761baf3c9de6c3e2 Mon Sep 17 00:00:00 2001
From: Rishabh Singh <6513075+findingrish@users.noreply.github.com>
Date: Tue, 13 Aug 2024 11:44:01 +0530
Subject: [PATCH] [bugfix] Run cold schema refresh thread periodically (#16873)

* Fix build

* Run coldSchemaExec thread periodically

* Bugfix: Run cold schema refresh periodically

* Rename metrics for deep storage only segment schema process
---
 docs/operations/metrics.md                    |  3 +
 .../CoordinatorSegmentMetadataCache.java      | 82 +++++++++++++------
 .../CoordinatorSegmentMetadataCacheTest.java  | 64 +++++++++++++--
 3 files changed, 121 insertions(+), 28 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ec97f44fe39..83cc9550690 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -382,6 +382,9 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon
 |`metadatacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.|
 |`metadatacache/temporaryMetadataQueryResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.|
 |`metadatacache/temporaryPublishedMetadataQueryResults/count`|Number of segments for which schema is cached after back filling in the database.||This value gets reset after each database poll. Eventually it should be 0.|
+|`metadatacache/deepStorageOnly/segment/count`|Number of available segments present only in deep storage.|`dataSource`||
+|`metadatacache/deepStorageOnly/refresh/count`|Number of deep storage only segments with cached schema.|`dataSource`||
+|`metadatacache/deepStorageOnly/process/time`|Time taken in milliseconds to process deep storage only segment schema.||Under a minute|
 
 ## General Health
 
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 3a4f548b8ba..0c03f7af73c 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -24,7 +24,6 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.ImmutableDruidDataSource;
@@ -35,12 +34,15 @@ import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
 import org.apache.druid.segment.SchemaPayloadPlus;
@@ -69,7 +71,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -100,12 +101,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
   private static final EmittingLogger log = new EmittingLogger(CoordinatorSegmentMetadataCache.class);
   private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;
   private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(50);
+  private static final String DEEP_STORAGE_ONLY_METRIC_PREFIX = "metadatacache/deepStorageOnly/";
 
   private final SegmentMetadataCacheConfig config;
   private final ColumnTypeMergePolicy columnTypeMergePolicy;
   private final SegmentSchemaCache segmentSchemaCache;
   private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
   private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private final Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
+  private final ServiceEmitter emitter;
   private volatile SegmentReplicationStatus segmentReplicationStatus = null;
 
   // Datasource schema built from only cold segments.
@@ -114,7 +118,6 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
   // Period for cold schema processing thread. This is a multiple of segment polling period.
   // Cold schema processing runs slower than the segment poll to save processing cost of all segments.
   // The downside is a delay in columns from cold segment reflecting in the datasource schema.
-  private final long coldSchemaExecPeriodMillis;
   private final ScheduledExecutorService coldSchemaExec;
   private @Nullable Future<?> cacheExecFuture = null;
   private @Nullable Future<?> coldSchemaExecFuture = null;
@@ -139,18 +142,19 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     this.segmentSchemaCache = segmentSchemaCache;
     this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
     this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
-    this.coldSchemaExecPeriodMillis =
-        segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis() * COLD_SCHEMA_PERIOD_MULTIPLIER;
-    coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder()
-            .setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
-            .setDaemon(false)
-            .build()
-    );
+    this.segmentsMetadataManagerConfigSupplier = segmentsMetadataManagerConfigSupplier;
+    this.emitter = emitter;
+    this.coldSchemaExec = Execs.scheduledSingleThreaded("DruidColdSchema-ScheduledExecutor-%d");
     initServerViewTimelineCallback(serverView);
   }
 
+  long getColdSchemaExecPeriodMillis()
+  {
+    return (segmentsMetadataManagerConfigSupplier.get().getPollDuration().toStandardDuration().getMillis())
+           * COLD_SCHEMA_PERIOD_MULTIPLIER;
+  }
+
   private void initServerViewTimelineCallback(final CoordinatorServerView serverView)
   {
     serverView.registerTimelineCallback(
@@ -232,9 +236,10 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     try {
       segmentSchemaBackfillQueue.onLeaderStart();
       cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
-      coldSchemaExecFuture = coldSchemaExec.schedule(
+      coldSchemaExecFuture = coldSchemaExec.scheduleWithFixedDelay(
           this::coldDatasourceSchemaExec,
-          coldSchemaExecPeriodMillis,
+          getColdSchemaExecPeriodMillis(),
+          getColdSchemaExecPeriodMillis(),
           TimeUnit.MILLISECONDS
       );
 
@@ -558,9 +563,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
 
     Set<String> dataSourceWithColdSegmentSet = new HashSet<>();
 
-    int datasources = 0;
-    int segments = 0;
-    int dataSourceWithColdSegments = 0;
+    int datasources = 0, dataSourceWithColdSegments = 0, totalColdSegments = 0;
 
     Collection<ImmutableDruidDataSource> immutableDataSources =
         sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
@@ -571,6 +574,9 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
 
       final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
 
+      int coldSegments = 0;
+      int coldSegmentWithSchema = 0;
+
       for (DataSegment segment : dataSegments) {
         Integer replicationFactor = getReplicationFactor(segment.getId());
         if (replicationFactor != null && replicationFactor != 0) {
@@ -580,36 +586,66 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
           if (optionalSchema.isPresent()) {
             RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
             mergeRowSignature(columnTypes, rowSignature);
+            coldSegmentWithSchema++;
           }
-          segments++;
+          coldSegments++;
         }
 
-      if (columnTypes.isEmpty()) {
+      if (coldSegments == 0) {
         // this datasource doesn't have any cold segment
         continue;
       }
 
+      totalColdSegments += coldSegments;
+
+      String dataSourceName = dataSource.getName();
+
+      ServiceMetricEvent.Builder metricBuilder =
+          new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSourceName);
+
+      emitter.emit(metricBuilder.setMetric(DEEP_STORAGE_ONLY_METRIC_PREFIX + "segment/count", coldSegments));
+
+      if (columnTypes.isEmpty()) {
+        // this datasource doesn't have schema for cold segments
+        continue;
+      }
+
       final RowSignature.Builder builder = RowSignature.builder();
       columnTypes.forEach(builder::add);
 
       RowSignature coldSignature = builder.build();
 
-      String dataSourceName = dataSource.getName();
       dataSourceWithColdSegmentSet.add(dataSourceName);
       dataSourceWithColdSegments++;
 
-      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature);
+      DataSourceInformation druidTable = new DataSourceInformation(dataSourceName, coldSignature);
+      DataSourceInformation oldTable = coldSchemaTable.put(dataSourceName, druidTable);
 
-      coldSchemaTable.put(dataSourceName, new DataSourceInformation(dataSourceName, coldSignature));
+      if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+        log.info("[%s] has new cold signature: %s.", dataSource, druidTable.getRowSignature());
+      } else {
+        log.debug("[%s] signature is unchanged.", dataSource);
+      }
+
+      emitter.emit(metricBuilder.setMetric(DEEP_STORAGE_ONLY_METRIC_PREFIX + "refresh/count", coldSegmentWithSchema));
+
+      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature);
     }
 
     // remove any stale datasource from the map
     coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet);
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().setMetric(
+            DEEP_STORAGE_ONLY_METRIC_PREFIX + "process/time",
+            stopwatch.millisElapsed()
+        )
+    );
+
     String executionStatsLog = StringUtils.format(
         "Cold schema processing took [%d] millis. "
-        + "Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segments.",
-        stopwatch.millisElapsed(), datasources, segments, dataSourceWithColdSegments
+        + "Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segment schema.",
+        stopwatch.millisElapsed(), datasources, totalColdSegments, dataSourceWithColdSegments
     );
     if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
       log.info(executionStatsLog);
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index ef1fb1e8edd..8fbc78a7412 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
@@ -1788,7 +1789,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
     Assert.assertEquals(existingMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
   }
 
-  private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
+  private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest(ServiceEmitter emitter)
   {
     // foo has both hot and cold segments
     DataSegment coldSegment =
@@ -1862,7 +1863,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
         SEGMENT_CACHE_CONFIG_DEFAULT,
         new NoopEscalator(),
         new InternalQueryConfig(),
-        new NoopServiceEmitter(),
+        emitter,
         segmentSchemaCache,
         backFillQueue,
         sqlSegmentsMetadataManager,
@@ -1893,10 +1894,17 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
   @Test
   public void testColdDatasourceSchema_refreshAfterColdSchemaExec() throws IOException
   {
-    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
+    StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest(emitter);
 
     schema.coldDatasourceSchemaExec();
 
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/process/time", 1);
+
     Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());
 
     // verify that cold schema for both foo and cold is present
@@ -1955,7 +1963,8 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
   @Test
   public void testColdDatasourceSchema_coldSchemaExecAfterRefresh() throws IOException
   {
-    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
+    StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest(emitter);
 
     Set<SegmentId> segmentIds = new HashSet<>();
     segmentIds.add(segment1.getId());
@@ -1971,7 +1980,13 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
 
     schema.coldDatasourceSchemaExec();
 
-    // could datasource should be present now
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/process/time", 1);
+
+    // cold datasource should be present now
     Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());
 
     RowSignature coldSignature = schema.getDatasource("cold").getRowSignature();
@@ -2160,6 +2175,45 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
     Assert.assertEquals(Collections.singleton("beta"), schema.getDataSourceInformationMap().keySet());
   }
 
+  @Test
+  public void testColdDatasourceSchemaExecRunsPeriodically() throws InterruptedException
+  {
+    // Make sure the thread runs more than once
+    CountDownLatch latch = new CountDownLatch(2);
+
+    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        segmentSchemaCache,
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
+    ) {
+      @Override
+      long getColdSchemaExecPeriodMillis()
+      {
+        return 10;
+      }
+
+      @Override
+      protected void coldDatasourceSchemaExec()
+      {
+        latch.countDown();
+        super.coldDatasourceSchemaExec();
+      }
+    };
+
+    schema.onLeaderStart();
+    schema.awaitInitialization();
+
+    latch.await(1, TimeUnit.SECONDS);
+    Assert.assertEquals(0, latch.getCount());
+  }
+
   private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
   {
     final DataSourceInformation fooDs = schema.getDatasource("foo");
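
Note on the core fix, separate from the patch above: ScheduledExecutorService.schedule runs its task exactly once after the initial delay, which is why the cold schema refresh previously ran a single time and then stopped; scheduleWithFixedDelay re-schedules the task after each completed run. The snippet below is a minimal, self-contained sketch of that difference; the class name and printed messages are illustrative only and do not appear in the patch.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class PeriodicVsOneShot
    {
      public static void main(String[] args) throws InterruptedException
      {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

        // One-shot: fires once, 100 ms after submission, and never again (the old behavior).
        exec.schedule(() -> System.out.println("one-shot"), 100, TimeUnit.MILLISECONDS);

        // Periodic: fires 100 ms after submission, then again 100 ms after each run completes (the fix).
        exec.scheduleWithFixedDelay(() -> System.out.println("periodic"), 100, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        exec.shutdownNow();
      }
    }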