Fix attempts to publish the same pending segments multiple times (#16605)

* Fix attempts to publish the same pending segments multiple times
This commit is contained in:
AmatyaAvadhanula 2024-06-18 12:02:13 +05:30 committed by GitHub
parent 51b2f6cb45
commit 4c8932e00e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 34 additions and 0 deletions

View File

@ -1460,8 +1460,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
)); ));
final String now = DateTimes.nowUtc().toString(); final String now = DateTimes.nowUtc().toString();
final Set<SegmentIdWithShardSpec> processedSegmentIds = new HashSet<>();
for (PendingSegmentRecord pendingSegment : pendingSegments) { for (PendingSegmentRecord pendingSegment : pendingSegments) {
final SegmentIdWithShardSpec segmentId = pendingSegment.getId(); final SegmentIdWithShardSpec segmentId = pendingSegment.getId();
if (processedSegmentIds.contains(segmentId)) {
continue;
}
final Interval interval = segmentId.getInterval(); final Interval interval = segmentId.getInterval();
insertBatch.add() insertBatch.add()
@ -1479,6 +1483,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("payload", jsonMapper.writeValueAsBytes(segmentId)) .bind("payload", jsonMapper.writeValueAsBytes(segmentId))
.bind("task_allocator_id", pendingSegment.getTaskAllocatorId()) .bind("task_allocator_id", pendingSegment.getTaskAllocatorId())
.bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId()); .bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId());
processedSegmentIds.add(segmentId);
} }
int[] updated = insertBatch.execute(); int[] updated = insertBatch.execute();
return Arrays.stream(updated).sum(); return Arrays.stream(updated).sum();

View File

@ -330,6 +330,34 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
); );
} }
@Test
public void testDuplicatePendingSegmentEntriesAreNotInserted()
{
final PendingSegmentRecord pendingSegment0 = new PendingSegmentRecord(
new SegmentIdWithShardSpec("foo", Intervals.ETERNITY, "version", new NumberedShardSpec(0, 0)),
"sequenceName0",
"sequencePrevId0",
null,
"taskAllocatorId"
);
final PendingSegmentRecord pendingSegment1 = new PendingSegmentRecord(
new SegmentIdWithShardSpec("foo", Intervals.ETERNITY, "version", new NumberedShardSpec(1, 0)),
"sequenceName1",
"sequencePrevId1",
null,
"taskAllocatorId"
);
final int actualInserted = derbyConnector.retryWithHandle(
handle -> coordinator.insertPendingSegmentsIntoMetastore(
handle,
ImmutableList.of(pendingSegment0, pendingSegment0, pendingSegment1, pendingSegment1, pendingSegment1),
"foo",
true
)
);
Assert.assertEquals(2, actualInserted);
}
@Test @Test
public void testSimpleAnnounce() throws IOException public void testSimpleAnnounce() throws IOException
{ {