Process retrieval of parent and child segment ids in batches (#16734)

This commit is contained in:
AmatyaAvadhanula 2024-07-15 18:24:23 +05:30 committed by GitHub
parent 78a4a09d01
commit 6891866c43
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 144 additions and 48 deletions

View File

@@ -2954,28 +2954,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return Collections.emptyMap(); return Collections.emptyMap();
} }
final List<String> segmentIdList = ImmutableList.copyOf(segmentIds);
final String sql = StringUtils.format(
"SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(),
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", segmentIdList)
);
final Map<String, String> upgradedFromSegmentIds = new HashMap<>(); final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
connector.retryWithHandle( final List<List<String>> partitions = Lists.partition(ImmutableList.copyOf(segmentIds), 100);
handle -> { for (List<String> partition : partitions) {
Query<Map<String, Object>> query = handle.createQuery(sql) final String sql = StringUtils.format(
.bind("dataSource", dataSource); "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", segmentIdList, query); dbTables.getSegmentsTable(),
return query.map((index, r, ctx) -> { SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", partition)
final String id = r.getString(1); );
final String upgradedFromSegmentId = r.getString(2); connector.retryWithHandle(
if (upgradedFromSegmentId != null) { handle -> {
upgradedFromSegmentIds.put(id, upgradedFromSegmentId); Query<Map<String, Object>> query = handle.createQuery(sql)
} .bind("dataSource", dataSource);
return null; SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", partition, query);
}).list(); return query.map((index, r, ctx) -> {
} final String id = r.getString(1);
); final String upgradedFromSegmentId = r.getString(2);
if (upgradedFromSegmentId != null) {
upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
}
return null;
}).list();
}
);
}
return upgradedFromSegmentIds; return upgradedFromSegmentIds;
} }
@@ -2989,39 +2991,40 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return Collections.emptyMap(); return Collections.emptyMap();
} }
final List<String> upgradedFromSegmentIdList = ImmutableList.copyOf(segmentIds);
final String sql = StringUtils.format(
"SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(),
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn(
"upgraded_from_segment_id",
upgradedFromSegmentIdList
)
);
final Map<String, Set<String>> upgradedToSegmentIds = new HashMap<>(); final Map<String, Set<String>> upgradedToSegmentIds = new HashMap<>();
retrieveSegmentsById(dataSource, segmentIds) retrieveSegmentsById(dataSource, segmentIds)
.stream() .stream()
.map(DataSegment::getId) .map(DataSegment::getId)
.map(SegmentId::toString) .map(SegmentId::toString)
.forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()).add(id)); .forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()).add(id));
connector.retryWithHandle(
handle -> { final List<List<String>> partitions = Lists.partition(ImmutableList.copyOf(segmentIds), 100);
Query<Map<String, Object>> query = handle.createQuery(sql) for (List<String> partition : partitions) {
.bind("dataSource", dataSource); final String sql = StringUtils.format(
SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition( "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
"upgraded_from_segment_id", dbTables.getSegmentsTable(),
upgradedFromSegmentIdList, SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("upgraded_from_segment_id", partition)
query );
);
return query.map((index, r, ctx) -> { connector.retryWithHandle(
final String upgradedToId = r.getString(1); handle -> {
final String id = r.getString(2); Query<Map<String, Object>> query = handle.createQuery(sql)
upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()) .bind("dataSource", dataSource);
.add(upgradedToId); SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition(
return null; "upgraded_from_segment_id",
}).list(); partition,
} query
); );
return query.map((index, r, ctx) -> {
final String upgradedToId = r.getString(1);
final String id = r.getString(2);
upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>())
.add(upgradedToId);
return null;
}).list();
}
);
}
return upgradedToSegmentIds; return upgradedToSegmentIds;
} }

View File

@@ -3452,6 +3452,48 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
); );
} }
@Test
public void testRetrieveUpgradedFromSegmentIdsInBatches()
{
  // Build 500 segments arranged in 100 groups of 5; within a group, segment 0 is
  // the "parent" and segments 1-4 are upgrades of it. 500 ids force the
  // coordinator to split the metadata lookup across multiple batches of 100.
  final int totalSegments = 500;
  final int groupCount = 100;
  final List<DataSegment> allSegments = new ArrayList<>();
  for (int index = 0; index < totalSegments; index++) {
    allSegments.add(
        new DataSegment(
            "DS",
            Intervals.ETERNITY,
            "v " + (index % 5),
            ImmutableMap.of("num", index / 5),
            ImmutableList.of("dim"),
            ImmutableList.of("agg"),
            new NumberedShardSpec(index / 5, 0),
            0,
            100L
        )
    );
  }

  // Expected mapping: each of the 4 child segments in a group points back at
  // the group's first segment, for all 100 groups (400 entries total).
  final Map<String, String> expectedUpgradeMap = new HashMap<>();
  for (int group = 0; group < groupCount; group++) {
    final String parentId = allSegments.get(5 * group).getId().toString();
    for (int child = 1; child < 5; child++) {
      expectedUpgradeMap.put(allSegments.get(5 * group + child).getId().toString(), parentId);
    }
  }

  insertUsedSegments(ImmutableSet.copyOf(allSegments), expectedUpgradeMap);

  final Set<String> queriedIds = allSegments.stream()
                                            .map(DataSegment::getId)
                                            .map(SegmentId::toString)
                                            .collect(Collectors.toSet());
  final Map<String, String> actual = coordinator.retrieveUpgradedFromSegmentIds("DS", queriedIds);

  Assert.assertEquals(400, actual.size());
  Assert.assertEquals(expectedUpgradeMap, actual);
}
@Test @Test
public void testRetrieveUpgradedToSegmentIds() public void testRetrieveUpgradedToSegmentIds()
{ {
@@ -3478,6 +3520,57 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
); );
} }
@Test
public void testRetrieveUpgradedToSegmentIdsInBatches()
{
  // Build 500 segments in 100 groups of 5 so the reverse (parent -> children)
  // lookup must be performed over several batches of 100 ids.
  final int totalSegments = 500;
  final int groupCount = 100;
  final List<DataSegment> allSegments = new ArrayList<>();
  for (int index = 0; index < totalSegments; index++) {
    allSegments.add(
        new DataSegment(
            "DS",
            Intervals.ETERNITY,
            "v " + (index % 5),
            ImmutableMap.of("num", index / 5),
            ImmutableList.of("dim"),
            ImmutableList.of("agg"),
            new NumberedShardSpec(index / 5, 0),
            0,
            100L
        )
    );
  }

  // Every segment id maps to a set that at minimum contains itself.
  final Map<String, Set<String>> expected = new HashMap<>();
  for (DataSegment segment : allSegments) {
    final String segmentId = segment.getId().toString();
    expected.computeIfAbsent(segmentId, k -> new HashSet<>()).add(segmentId);
  }

  // Within each of the 100 groups, segments 1-4 are upgrades of segment 0:
  // record the parent in the upgrade map and add each child to the parent's
  // expected "upgraded to" set.
  final Map<String, String> upgradeMap = new HashMap<>();
  for (int group = 0; group < groupCount; group++) {
    final String parentId = allSegments.get(5 * group).getId().toString();
    for (int child = 1; child < 5; child++) {
      final String childId = allSegments.get(5 * group + child).getId().toString();
      upgradeMap.put(childId, parentId);
      expected.get(parentId).add(childId);
    }
  }

  insertUsedSegments(ImmutableSet.copyOf(allSegments), upgradeMap);

  final Set<String> queriedIds = allSegments.stream()
                                            .map(DataSegment::getId)
                                            .map(SegmentId::toString)
                                            .collect(Collectors.toSet());
  final Map<String, Set<String>> actual = coordinator.retrieveUpgradedToSegmentIds("DS", queriedIds);

  Assert.assertEquals(500, actual.size());
  Assert.assertEquals(expected, actual);
}
private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap) private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
{ {
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();