mirror of https://github.com/apache/druid.git
Skip refresh for unused segments in metadata cache (#16990)
* Skip refresh for unused segments in metadata cache * Cover the condition where a used segment missing schema is marked for refresh * Fix test
This commit is contained in:
parent
428f58cf15
commit
a18f582ef0
|
@ -692,9 +692,16 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
|
|||
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
|
||||
mergeRowSignature(columnTypes, rowSignature);
|
||||
} else {
|
||||
// mark it for refresh, however, this case shouldn't arise by design
|
||||
markSegmentAsNeedRefresh(segmentId);
|
||||
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
|
||||
|
||||
ImmutableDruidDataSource druidDataSource =
|
||||
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());
|
||||
|
||||
if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
|
||||
// mark it for refresh only if it is used
|
||||
// however, this case shouldn't arise by design
|
||||
markSegmentAsNeedRefresh(segmentId);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -2290,6 +2290,102 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
|
|||
Assert.assertEquals(0, metadatas.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnusedSegmentIsNotRefreshed() throws InterruptedException, IOException
|
||||
{
|
||||
String dataSource = "xyz";
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
|
||||
getQueryLifecycleFactory(walker),
|
||||
serverView,
|
||||
SEGMENT_CACHE_CONFIG_DEFAULT,
|
||||
new NoopEscalator(),
|
||||
new InternalQueryConfig(),
|
||||
new NoopServiceEmitter(),
|
||||
segmentSchemaCache,
|
||||
backFillQueue,
|
||||
sqlSegmentsMetadataManager,
|
||||
segmentsMetadataManagerConfigSupplier
|
||||
) {
|
||||
@Override
|
||||
public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
|
||||
throws IOException
|
||||
{
|
||||
super.refresh(segmentsToRefresh, dataSourcesToRebuild);
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
List<DataSegment> segments = ImmutableList.of(
|
||||
newSegment(dataSource, 1),
|
||||
newSegment(dataSource, 2),
|
||||
newSegment(dataSource, 3)
|
||||
);
|
||||
|
||||
final DruidServer historicalServer = druidServers.stream()
|
||||
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
|
||||
.findAny()
|
||||
.orElse(null);
|
||||
|
||||
Assert.assertNotNull(historicalServer);
|
||||
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
|
||||
|
||||
ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
|
||||
segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp"));
|
||||
segmentStatsMap.put(segments.get(1).getId(), new SegmentMetadata(20L, "fp"));
|
||||
segmentStatsMap.put(segments.get(2).getId(), new SegmentMetadata(20L, "fp"));
|
||||
|
||||
ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
|
||||
schemaPayloadMap.put("fp", new SchemaPayload(RowSignature.builder().add("c1", ColumnType.DOUBLE).build()));
|
||||
segmentSchemaCache.updateFinalizedSegmentSchema(
|
||||
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
|
||||
);
|
||||
|
||||
schema.addSegment(historicalServerMetadata, segments.get(0));
|
||||
schema.addSegment(historicalServerMetadata, segments.get(1));
|
||||
schema.addSegment(historicalServerMetadata, segments.get(2));
|
||||
|
||||
serverView.addSegment(segments.get(0), ServerType.HISTORICAL);
|
||||
serverView.addSegment(segments.get(1), ServerType.HISTORICAL);
|
||||
serverView.addSegment(segments.get(2), ServerType.HISTORICAL);
|
||||
|
||||
schema.onLeaderStart();
|
||||
schema.awaitInitialization();
|
||||
|
||||
Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
|
||||
|
||||
// make segment3 unused
|
||||
segmentStatsMap = new ImmutableMap.Builder<>();
|
||||
segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp"));
|
||||
|
||||
segmentSchemaCache.updateFinalizedSegmentSchema(
|
||||
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
|
||||
);
|
||||
|
||||
Map<SegmentId, DataSegment> segmentMap = new HashMap<>();
|
||||
segmentMap.put(segments.get(0).getId(), segments.get(0));
|
||||
segmentMap.put(segments.get(1).getId(), segments.get(1));
|
||||
|
||||
ImmutableDruidDataSource druidDataSource =
|
||||
new ImmutableDruidDataSource(
|
||||
"xyz",
|
||||
Collections.emptyMap(),
|
||||
segmentMap
|
||||
);
|
||||
|
||||
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(ArgumentMatchers.anyString()))
|
||||
.thenReturn(druidDataSource);
|
||||
|
||||
Set<SegmentId> segmentsToRefresh = segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
|
||||
segmentsToRefresh.remove(segments.get(1).getId());
|
||||
segmentsToRefresh.remove(segments.get(2).getId());
|
||||
|
||||
schema.refresh(segmentsToRefresh, Sets.newHashSet(dataSource));
|
||||
|
||||
Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
|
||||
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
|
||||
}
|
||||
|
||||
private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
|
||||
{
|
||||
final DataSourceInformation fooDs = schema.getDatasource("foo");
|
||||
|
|
Loading…
Reference in New Issue