From 4c8932e00e03ebc1cd86f9003fd84b0a3e217542 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Tue, 18 Jun 2024 12:02:13 +0530 Subject: [PATCH] Fix attempts to publish the same pending segments multiple times (#16605) * Fix attempts to publish the same pending segments multiple times --- .../IndexerSQLMetadataStorageCoordinator.java | 6 ++++ ...exerSQLMetadataStorageCoordinatorTest.java | 28 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 2b02f09926b..b9adcd01e14 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1460,8 +1460,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor )); final String now = DateTimes.nowUtc().toString(); + final Set processedSegmentIds = new HashSet<>(); for (PendingSegmentRecord pendingSegment : pendingSegments) { final SegmentIdWithShardSpec segmentId = pendingSegment.getId(); + if (processedSegmentIds.contains(segmentId)) { + continue; + } final Interval interval = segmentId.getInterval(); insertBatch.add() @@ -1479,6 +1483,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .bind("payload", jsonMapper.writeValueAsBytes(segmentId)) .bind("task_allocator_id", pendingSegment.getTaskAllocatorId()) .bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId()); + + processedSegmentIds.add(segmentId); } int[] updated = insertBatch.execute(); return Arrays.stream(updated).sum(); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index fc43d7126fe..91f3279eeb6 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -330,6 +330,34 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata ); } + @Test + public void testDuplicatePendingSegmentEntriesAreNotInserted() + { + final PendingSegmentRecord pendingSegment0 = new PendingSegmentRecord( + new SegmentIdWithShardSpec("foo", Intervals.ETERNITY, "version", new NumberedShardSpec(0, 0)), + "sequenceName0", + "sequencePrevId0", + null, + "taskAllocatorId" + ); + final PendingSegmentRecord pendingSegment1 = new PendingSegmentRecord( + new SegmentIdWithShardSpec("foo", Intervals.ETERNITY, "version", new NumberedShardSpec(1, 0)), + "sequenceName1", + "sequencePrevId1", + null, + "taskAllocatorId" + ); + final int actualInserted = derbyConnector.retryWithHandle( + handle -> coordinator.insertPendingSegmentsIntoMetastore( + handle, + ImmutableList.of(pendingSegment0, pendingSegment0, pendingSegment1, pendingSegment1, pendingSegment1), + "foo", + true + ) + ); + Assert.assertEquals(2, actualInserted); + } + @Test public void testSimpleAnnounce() throws IOException {