Optimize unused segment query for segment allocation (#16623)

This commit is contained in:
AmatyaAvadhanula 2024-06-18 20:45:04 +05:30 committed by GitHub
parent a10310388f
commit be3593f099
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 98 additions and 15 deletions

View File

@ -249,6 +249,44 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
List<String> retrieveUnusedSegmentIdsForExactIntervalAndVersion(
String dataSource,
Interval interval,
String version
)
{
final String sql = "SELECT id FROM %1$s"
+ " WHERE used = :used"
+ " AND dataSource = :dataSource"
+ " AND version = :version"
+ " AND start = :start AND %2$send%2$s = :end";
final List<String> matchingSegments = connector.inReadOnlyTransaction(
(handle, status) -> {
final Query<Map<String, Object>> query = handle
.createQuery(StringUtils.format(
sql,
dbTables.getSegmentsTable(),
connector.getQuoteString()
))
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", false)
.bind("dataSource", dataSource)
.bind("version", version)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());
try (final ResultIterator<String> iterator = query.map((index, r, ctx) -> r.getString(1)).iterator()) {
return ImmutableList.copyOf(iterator);
}
}
);
log.debug("Found [%,d] unused segments for datasource[%s] for interval[%s] and version[%s].",
matchingSegments.size(), dataSource, interval, version);
return matchingSegments;
}
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(
String dataSource,
@ -1881,7 +1919,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
// If yes, try to compute allocated partition num using the max unused segment shard spec
SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId(
SegmentId unusedMaxId = getUnusedMaxId(
allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion()
@ -1893,7 +1931,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
int maxPartitionNum = Math.max(
allocatedId.getShardSpec().getPartitionNum(),
unusedMaxId.getShardSpec().getPartitionNum() + 1
unusedMaxId.getPartitionNum() + 1
);
return new SegmentIdWithShardSpec(
allocatedId.getDataSource(),
@ -1906,25 +1944,25 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval interval, String version)
private SegmentId getUnusedMaxId(String datasource, Interval interval, String version)
{
List<DataSegment> unusedSegments = retrieveUnusedSegmentsForInterval(
List<String> unusedSegmentIds = retrieveUnusedSegmentIdsForExactIntervalAndVersion(
datasource,
interval,
ImmutableList.of(version),
null,
null
version
);
SegmentIdWithShardSpec unusedMaxId = null;
SegmentId unusedMaxId = null;
int maxPartitionNum = -1;
for (DataSegment unusedSegment : unusedSegments) {
if (unusedSegment.getInterval().equals(interval)) {
int partitionNum = unusedSegment.getShardSpec().getPartitionNum();
if (maxPartitionNum < partitionNum) {
maxPartitionNum = partitionNum;
unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment);
}
for (String id : unusedSegmentIds) {
final SegmentId segmentId = SegmentId.tryParse(datasource, id);
if (segmentId == null) {
continue;
}
int partitionNum = segmentId.getPartitionNum();
if (maxPartitionNum < partitionNum) {
maxPartitionNum = partitionNum;
unusedMaxId = segmentId;
}
}
return unusedMaxId;

View File

@ -3275,4 +3275,49 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
}
@Test
public void testRetrieveUnusedSegmentsForExactIntervalAndVersion() throws Exception
{
DataSegment unusedForDifferentVersion = createSegment(
Intervals.of("2024/2025"),
"v0",
new NumberedShardSpec(0, 0)
);
DataSegment unusedSegmentForExactIntervalAndVersion = createSegment(
Intervals.of("2024/2025"),
"v1",
new NumberedShardSpec(0, 0)
);
DataSegment unusedSegmentForDifferentInterval = createSegment(
Intervals.of("2023/2024"),
"v1",
new NumberedShardSpec(0, 0)
);
coordinator.commitSegments(
ImmutableSet.of(
unusedForDifferentVersion,
unusedSegmentForDifferentInterval,
unusedSegmentForExactIntervalAndVersion
),
null
);
coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY);
DataSegment usedSegmentForExactIntervalAndVersion = createSegment(
Intervals.of("2024/2025"),
"v1",
new NumberedShardSpec(1, 0)
);
coordinator.commitSegments(ImmutableSet.of(usedSegmentForExactIntervalAndVersion), null);
List<String> unusedSegmentIdsForIntervalAndVersion =
coordinator.retrieveUnusedSegmentIdsForExactIntervalAndVersion(DS.WIKI, Intervals.of("2024/2025"), "v1");
Assert.assertEquals(1, unusedSegmentIdsForIntervalAndVersion.size());
Assert.assertEquals(
unusedSegmentForExactIntervalAndVersion.getId().toString(),
unusedSegmentIdsForIntervalAndVersion.get(0)
);
}
}