diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 54f75ccb920..ecfad572e74 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2954,28 +2954,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor return Collections.emptyMap(); } - final List segmentIdList = ImmutableList.copyOf(segmentIds); - final String sql = StringUtils.format( - "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s", - dbTables.getSegmentsTable(), - SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", segmentIdList) - ); final Map upgradedFromSegmentIds = new HashMap<>(); - connector.retryWithHandle( - handle -> { - Query> query = handle.createQuery(sql) - .bind("dataSource", dataSource); - SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", segmentIdList, query); - return query.map((index, r, ctx) -> { - final String id = r.getString(1); - final String upgradedFromSegmentId = r.getString(2); - if (upgradedFromSegmentId != null) { - upgradedFromSegmentIds.put(id, upgradedFromSegmentId); - } - return null; - }).list(); - } - ); + final List> partitions = Lists.partition(ImmutableList.copyOf(segmentIds), 100); + for (List partition : partitions) { + final String sql = StringUtils.format( + "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s", + dbTables.getSegmentsTable(), + SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", partition) + ); + connector.retryWithHandle( + handle -> { + Query> query = handle.createQuery(sql) + .bind("dataSource", dataSource); + SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", partition, query); + return query.map((index, r, ctx) -> { + final String id = r.getString(1); + final String upgradedFromSegmentId = r.getString(2); + if (upgradedFromSegmentId != null) { + upgradedFromSegmentIds.put(id, upgradedFromSegmentId); + } + return null; + }).list(); + } + ); + } return upgradedFromSegmentIds; } @@ -2989,39 +2991,40 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor return Collections.emptyMap(); } - final List upgradedFromSegmentIdList = ImmutableList.copyOf(segmentIds); - final String sql = StringUtils.format( - "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s", - dbTables.getSegmentsTable(), - SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn( - "upgraded_from_segment_id", - upgradedFromSegmentIdList - ) - ); final Map> upgradedToSegmentIds = new HashMap<>(); retrieveSegmentsById(dataSource, segmentIds) .stream() .map(DataSegment::getId) .map(SegmentId::toString) .forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()).add(id)); - connector.retryWithHandle( - handle -> { - Query> query = handle.createQuery(sql) - .bind("dataSource", dataSource); - SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition( - "upgraded_from_segment_id", - upgradedFromSegmentIdList, - query - ); - return query.map((index, r, ctx) -> { - final String upgradedToId = r.getString(1); - final String id = r.getString(2); - upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()) - .add(upgradedToId); - return null; - }).list(); - } - ); + + final List> partitions = Lists.partition(ImmutableList.copyOf(segmentIds), 100); + for (List partition : partitions) { + final String sql = StringUtils.format( + "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s", + dbTables.getSegmentsTable(), + SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("upgraded_from_segment_id", partition) + ); + + connector.retryWithHandle( + handle -> { + Query> query = handle.createQuery(sql) + .bind("dataSource", dataSource); + SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition( + "upgraded_from_segment_id", + partition, + query + ); + return query.map((index, r, ctx) -> { + final String upgradedToId = r.getString(1); + final String id = r.getString(2); + upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()) + .add(upgradedToId); + return null; + }).list(); + } + ); + } return upgradedToSegmentIds; } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index f352d5e2609..6eccbccaa84 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -3452,6 +3452,48 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata ); } + @Test + public void testRetrieveUpgradedFromSegmentIdsInBatches() + { + final int size = 500; + final int batchSize = 100; + + List segments = new ArrayList<>(); + for (int i = 0; i < size; i++) { + segments.add( + new DataSegment( + "DS", + Intervals.ETERNITY, + "v " + (i % 5), + ImmutableMap.of("num", i / 5), + ImmutableList.of("dim"), + ImmutableList.of("agg"), + new NumberedShardSpec(i / 5, 0), + 0, + 100L + ) + ); + } + Map expected = new HashMap<>(); + for (int i = 0; i < batchSize; i++) { + for (int j = 1; j < 5; j++) { + expected.put( + segments.get(5 * i + j).getId().toString(), + segments.get(5 * i).getId().toString() + ); + } + } + insertUsedSegments(ImmutableSet.copyOf(segments), expected); + + Map actual = coordinator.retrieveUpgradedFromSegmentIds( + "DS", + segments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet()) + ); + + Assert.assertEquals(400, actual.size()); + Assert.assertEquals(expected, actual); + } + @Test public void testRetrieveUpgradedToSegmentIds() { @@ -3478,6 +3520,57 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata ); } + @Test + public void testRetrieveUpgradedToSegmentIdsInBatches() + { + final int size = 500; + final int batchSize = 100; + + List segments = new ArrayList<>(); + for (int i = 0; i < size; i++) { + segments.add( + new DataSegment( + "DS", + Intervals.ETERNITY, + "v " + (i % 5), + ImmutableMap.of("num", i / 5), + ImmutableList.of("dim"), + ImmutableList.of("agg"), + new NumberedShardSpec(i / 5, 0), + 0, + 100L + ) + ); + } + + Map> expected = new HashMap<>(); + for (DataSegment segment : segments) { + final String id = segment.getId().toString(); + expected.put(id, new HashSet<>()); + expected.get(id).add(id); + } + Map upgradeMap = new HashMap<>(); + for (int i = 0; i < batchSize; i++) { + for (int j = 1; j < 5; j++) { + upgradeMap.put( + segments.get(5 * i + j).getId().toString(), + segments.get(5 * i).getId().toString() + ); + expected.get(segments.get(5 * i).getId().toString()) + .add(segments.get(5 * i + j).getId().toString()); + } + } + insertUsedSegments(ImmutableSet.copyOf(segments), upgradeMap); + + Map> actual = coordinator.retrieveUpgradedToSegmentIds( + "DS", + segments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet()) + ); + + Assert.assertEquals(500, actual.size()); + Assert.assertEquals(expected, actual); + } + private void insertUsedSegments(Set segments, Map upgradedFromSegmentIdMap) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();