[bugfix] Run cold schema refresh thread periodically (#16873)

* Fix build

* Run coldSchemaExec thread periodically

* Bugfix: Run cold schema refresh periodically

* Rename metrics for deep storage only segment schema process
Rishabh Singh 2024-08-13 11:44:01 +05:30 committed by GitHub
parent d7dfbebf97
commit f67ff92d07
3 changed files with 121 additions and 28 deletions
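The crux of the fix is visible in the second file below: `ScheduledExecutorService.schedule()` runs its task exactly once, so the cold schema refresh previously ran a single time after leader start and never again, while `scheduleWithFixedDelay()` reruns it after every completion. A minimal, self-contained sketch of the difference (the class name and printed messages are illustrative, not part of the patch):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: shows why the original schedule() call ran the refresh once.
public class SchedulePeriodicSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

    // Before the fix: the task fires once after the initial delay and is never rescheduled.
    exec.schedule(() -> System.out.println("cold schema refresh (once)"), 100, TimeUnit.MILLISECONDS);

    // After the fix: the task fires after the initial delay, then again each time
    // a full delay elapses after the previous run completes.
    exec.scheduleWithFixedDelay(
        () -> System.out.println("cold schema refresh (recurring)"),
        100,
        100,
        TimeUnit.MILLISECONDS
    );

    Thread.sleep(450);
    exec.shutdownNow();
  }
}
```

With fixed delay, the next run starts a full period after the previous one finishes, so a slow refresh does not cause back-to-back executions.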

docs/operations/metrics.md

@@ -382,6 +382,9 @@ These metrics are emitted by the Druid Coordinator in every run of the corresponding
 |`metadatacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.|
 |`metadatacache/temporaryMetadataQueryResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.|
 |`metadatacache/temporaryPublishedMetadataQueryResults/count`|Number of segments for which schema is cached after back filling in the database.||This value gets reset after each database poll. Eventually it should be 0.|
+|`metadatacache/deepStorageOnly/segment/count`|Number of available segments present only in deep storage.|`dataSource`||
+|`metadatacache/deepStorageOnly/refresh/count`|Number of deep storage only segments with cached schema.|`dataSource`||
+|`metadatacache/deepStorageOnly/process/time`|Time taken in milliseconds to process deep storage only segment schema.||Under a minute|

 ## General Health
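For reference, the per-datasource counters carry the `dataSource` dimension while the processing-time metric does not. A simplified sketch of the emission pattern this commit adds (the helper class and parameter names are hypothetical; the builder calls mirror the patch):

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;

// Hypothetical helper mirroring the emission pattern added in this commit.
final class DeepStorageOnlyMetricsSketch
{
  static void emitFor(ServiceEmitter emitter, String dataSourceName, int coldSegments, int coldSegmentWithSchema, long elapsedMillis)
  {
    // Per-datasource counts carry the dataSource dimension.
    ServiceMetricEvent.Builder metricBuilder =
        new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSourceName);
    emitter.emit(metricBuilder.setMetric("metadatacache/deepStorageOnly/segment/count", coldSegments));
    emitter.emit(metricBuilder.setMetric("metadatacache/deepStorageOnly/refresh/count", coldSegmentWithSchema));

    // The processing-time metric is emitted once per run, with no dimension.
    emitter.emit(new ServiceMetricEvent.Builder().setMetric("metadatacache/deepStorageOnly/process/time", elapsedMillis));
  }
}
```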

CoordinatorSegmentMetadataCache.java

@@ -24,7 +24,6 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.ImmutableDruidDataSource;
@@ -35,12 +34,15 @@ import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
 import org.apache.druid.segment.SchemaPayloadPlus;
@@ -69,7 +71,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -100,12 +101,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
   private static final EmittingLogger log = new EmittingLogger(CoordinatorSegmentMetadataCache.class);
   private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;
   private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(50);
+  private static final String DEEP_STORAGE_ONLY_METRIC_PREFIX = "metadatacache/deepStorageOnly/";

   private final SegmentMetadataCacheConfig config;
   private final ColumnTypeMergePolicy columnTypeMergePolicy;
   private final SegmentSchemaCache segmentSchemaCache;
   private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
   private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private final Supplier<SegmentsMetadataManagerConfig> segmentsMetadataManagerConfigSupplier;
+  private final ServiceEmitter emitter;
   private volatile SegmentReplicationStatus segmentReplicationStatus = null;

   // Datasource schema built from only cold segments.
@@ -114,7 +118,6 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
   // Period for cold schema processing thread. This is a multiple of segment polling period.
   // Cold schema processing runs slower than the segment poll to save processing cost of all segments.
   // The downside is a delay in columns from cold segment reflecting in the datasource schema.
-  private final long coldSchemaExecPeriodMillis;
   private final ScheduledExecutorService coldSchemaExec;
   private @Nullable Future<?> cacheExecFuture = null;
   private @Nullable Future<?> coldSchemaExecFuture = null;
@@ -139,18 +142,19 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
     this.segmentSchemaCache = segmentSchemaCache;
     this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
     this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
-    this.coldSchemaExecPeriodMillis =
-        segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis() * COLD_SCHEMA_PERIOD_MULTIPLIER;
-    coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder()
-            .setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
-            .setDaemon(false)
-            .build()
-    );
+    this.segmentsMetadataManagerConfigSupplier = segmentsMetadataManagerConfigSupplier;
+    this.emitter = emitter;
+    this.coldSchemaExec = Execs.scheduledSingleThreaded("DruidColdSchema-ScheduledExecutor-%d");
     initServerViewTimelineCallback(serverView);
   }

+  long getColdSchemaExecPeriodMillis()
+  {
+    return (segmentsMetadataManagerConfigSupplier.get().getPollDuration().toStandardDuration().getMillis())
+           * COLD_SCHEMA_PERIOD_MULTIPLIER;
+  }
+
   private void initServerViewTimelineCallback(final CoordinatorServerView serverView)
   {
     serverView.registerTimelineCallback(
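The refresh period is not configured directly; it is derived from the segment metadata poll period as pollDuration × COLD_SCHEMA_PERIOD_MULTIPLIER. A quick sketch of the arithmetic, assuming Druid's default pollDuration of PT1M (the class name is illustrative):

```java
import org.joda.time.Period;

// Illustrative arithmetic, assuming the default pollDuration of PT1M (one minute).
public class ColdSchemaPeriodSketch
{
  public static void main(String[] args)
  {
    Period pollDuration = Period.parse("PT1M");
    long coldSchemaExecPeriodMillis = pollDuration.toStandardDuration().getMillis() * 3L;
    System.out.println(coldSchemaExecPeriodMillis); // 180000 ms: the cold schema refresh runs every 3 minutes
  }
}
```

Note that `getPollDuration()` returns a Joda `Period`, hence the `toStandardDuration()` conversion before taking milliseconds.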
@@ -232,9 +236,10 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
     try {
       segmentSchemaBackfillQueue.onLeaderStart();
       cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
-      coldSchemaExecFuture = coldSchemaExec.schedule(
+      coldSchemaExecFuture = coldSchemaExec.scheduleWithFixedDelay(
           this::coldDatasourceSchemaExec,
+          getColdSchemaExecPeriodMillis(),
           getColdSchemaExecPeriodMillis(),
           TimeUnit.MILLISECONDS
       );
@@ -558,9 +563,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
     Set<String> dataSourceWithColdSegmentSet = new HashSet<>();
-    int datasources = 0;
-    int segments = 0;
-    int dataSourceWithColdSegments = 0;
+    int datasources = 0, dataSourceWithColdSegments = 0, totalColdSegments = 0;

     Collection<ImmutableDruidDataSource> immutableDataSources =
         sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
@@ -571,6 +574,9 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
       final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();

+      int coldSegments = 0;
+      int coldSegmentWithSchema = 0;
+
       for (DataSegment segment : dataSegments) {
         Integer replicationFactor = getReplicationFactor(segment.getId());
         if (replicationFactor != null && replicationFactor != 0) {
@@ -580,36 +586,66 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
         if (optionalSchema.isPresent()) {
           RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
           mergeRowSignature(columnTypes, rowSignature);
+          coldSegmentWithSchema++;
         }
-        segments++;
+        coldSegments++;
       }

-      if (columnTypes.isEmpty()) {
+      if (coldSegments == 0) {
         // this datasource doesn't have any cold segment
         continue;
       }

+      totalColdSegments += coldSegments;
+
+      String dataSourceName = dataSource.getName();
+      ServiceMetricEvent.Builder metricBuilder =
+          new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSourceName);
+      emitter.emit(metricBuilder.setMetric(DEEP_STORAGE_ONLY_METRIC_PREFIX + "segment/count", coldSegments));
+
+      if (columnTypes.isEmpty()) {
+        // this datasource doesn't have schema for cold segments
+        continue;
+      }
+
       final RowSignature.Builder builder = RowSignature.builder();
       columnTypes.forEach(builder::add);
       RowSignature coldSignature = builder.build();

-      String dataSourceName = dataSource.getName();
       dataSourceWithColdSegmentSet.add(dataSourceName);
       dataSourceWithColdSegments++;

-      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature);
-
-      coldSchemaTable.put(dataSourceName, new DataSourceInformation(dataSourceName, coldSignature));
+      DataSourceInformation druidTable = new DataSourceInformation(dataSourceName, coldSignature);
+      DataSourceInformation oldTable = coldSchemaTable.put(dataSourceName, druidTable);
+      if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+        log.info("[%s] has new cold signature: %s.", dataSource, druidTable.getRowSignature());
+      } else {
+        log.debug("[%s] signature is unchanged.", dataSource);
+      }
+
+      emitter.emit(metricBuilder.setMetric(DEEP_STORAGE_ONLY_METRIC_PREFIX + "refresh/count", coldSegmentWithSchema));
+      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, coldSignature);
     }

     // remove any stale datasource from the map
     coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet);

+    emitter.emit(
+        new ServiceMetricEvent.Builder().setMetric(
+            DEEP_STORAGE_ONLY_METRIC_PREFIX + "process/time",
+            stopwatch.millisElapsed()
+        )
+    );
+
     String executionStatsLog = StringUtils.format(
         "Cold schema processing took [%d] millis. "
-        + "Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segments.",
-        stopwatch.millisElapsed(), datasources, segments, dataSourceWithColdSegments
+        + "Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segment schema.",
+        stopwatch.millisElapsed(), datasources, totalColdSegments, dataSourceWithColdSegments
     );
     if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
       log.info(executionStatsLog);

CoordinatorSegmentMetadataCacheTest.java

@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
@@ -1788,7 +1789,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetadataCacheTestBase
     Assert.assertEquals(existingMetadata.getNumReplicas(), currentMetadata.getNumReplicas());
   }

-  private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
+  private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest(ServiceEmitter emitter)
   {
     // foo has both hot and cold segments
     DataSegment coldSegment =
@@ -1862,7 +1863,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetadataCacheTestBase
         SEGMENT_CACHE_CONFIG_DEFAULT,
         new NoopEscalator(),
         new InternalQueryConfig(),
-        new NoopServiceEmitter(),
+        emitter,
         segmentSchemaCache,
         backFillQueue,
         sqlSegmentsMetadataManager,
@@ -1893,10 +1894,17 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetadataCacheTestBase
   @Test
   public void testColdDatasourceSchema_refreshAfterColdSchemaExec() throws IOException
   {
-    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
+    StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest(emitter);

     schema.coldDatasourceSchemaExec();

+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/process/time", 1);
+
     Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());

     // verify that cold schema for both foo and cold is present
@@ -1955,7 +1963,8 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetadataCacheTestBase
   @Test
   public void testColdDatasourceSchema_coldSchemaExecAfterRefresh() throws IOException
   {
-    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest();
+    StubServiceEmitter emitter = new StubServiceEmitter("coordinator", "host");
+    CoordinatorSegmentMetadataCache schema = setupForColdDatasourceSchemaTest(emitter);

     Set<SegmentId> segmentIds = new HashSet<>();
     segmentIds.add(segment1.getId());
@@ -1971,7 +1980,13 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetadataCacheTestBase
     schema.coldDatasourceSchemaExec();

-    // could datasource should be present now
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "foo"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/segment/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, "cold"), 1);
+    emitter.verifyEmitted("metadatacache/deepStorageOnly/process/time", 1);
+
+    // cold datasource should be present now
     Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), schema.getDataSourceInformationMap().keySet());

     RowSignature coldSignature = schema.getDatasource("cold").getRowSignature();
@@ -2160,6 +2175,45 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetadataCacheTestBase
     Assert.assertEquals(Collections.singleton("beta"), schema.getDataSourceInformationMap().keySet());
   }

+  @Test
+  public void testColdDatasourceSchemaExecRunsPeriodically() throws InterruptedException
+  {
+    // Make sure the thread runs more than once
+    CountDownLatch latch = new CountDownLatch(2);
+    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        segmentSchemaCache,
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
+    ) {
+      @Override
+      long getColdSchemaExecPeriodMillis()
+      {
+        return 10;
+      }
+
+      @Override
+      protected void coldDatasourceSchemaExec()
+      {
+        latch.countDown();
+        super.coldDatasourceSchemaExec();
+      }
+    };
+
+    schema.onLeaderStart();
+    schema.awaitInitialization();
+
+    latch.await(1, TimeUnit.SECONDS);
+    Assert.assertEquals(0, latch.getCount());
+  }
+
   private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
   {
     final DataSourceInformation fooDs = schema.getDatasource("foo");