From 5fae20d2878380374a0ac132f00a10e0e01c80ba Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Fri, 3 May 2024 19:09:48 +0530 Subject: [PATCH] Do not allocate ids conflicting with existing segment ids (#16380) * Do not allocate ids conflicting with existing segment ids * Parameterized tests * Add doc and retain test for coverage --- .../actions/SegmentAllocateActionTest.java | 52 ++++++++++ .../IndexerSQLMetadataStorageCoordinator.java | 99 +++++++++++++------ ...exerSQLMetadataStorageCoordinatorTest.java | 55 +++++++++++ 3 files changed, 174 insertions(+), 32 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index f2da105d269..1857f6d67f7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -29,6 +29,8 @@ import org.apache.druid.indexing.common.SegmentLock; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -1062,6 +1064,45 @@ public class SegmentAllocateActionTest Assert.assertEquals(Duration.ofDays(1).toMillis(), id2.getInterval().toDurationMillis()); } + @Test + public void testSegmentIdMustNotBeReused() throws IOException + { + final IndexerMetadataStorageCoordinator coordinator = taskActionTestKit.getMetadataStorageCoordinator(); + final TaskLockbox lockbox = taskActionTestKit.getTaskLockbox(); + final Task 
task0 = NoopTask.ofPriority(25); + lockbox.add(task0); + final NoopTask task1 = NoopTask.ofPriority(50); + lockbox.add(task1); + + // Allocate and commit for older task task0 + final SegmentIdWithShardSpec id0 = + allocate(task0, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "0"); + final DataSegment dataSegment0 = getSegmentForIdentifier(id0); + coordinator.commitSegments(ImmutableSet.of(dataSegment0), null); + lockbox.unlock(task0, Intervals.ETERNITY); + + // Allocate and commit for newer task task1. Pending segments are cleaned up + final SegmentIdWithShardSpec id1 = + allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "1"); + final DataSegment dataSegment1 = getSegmentForIdentifier(id1); + final SegmentIdWithShardSpec id2 = + allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "2"); + final DataSegment dataSegment2 = getSegmentForIdentifier(id2); + coordinator.commitSegments(ImmutableSet.of(dataSegment1, dataSegment2), null); + // Clean up pending segments corresponding to the last pending segment + coordinator.deletePendingSegmentsForTaskAllocatorId(task1.getDataSource(), task1.getTaskAllocatorId()); + + // Drop all segments + coordinator.markSegmentsAsUnusedWithinInterval(task0.getDataSource(), Intervals.ETERNITY); + + // Allocate another id and ensure that it doesn't exist in the druid_segments table + final SegmentIdWithShardSpec theId = + allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "3"); + Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true)); + + lockbox.unlock(task1, Intervals.ETERNITY); + } + private SegmentIdWithShardSpec allocate( final Task task, final DateTime timestamp, @@ -1123,4 +1164,15 @@ public class SegmentAllocateActionTest Assert.assertEquals(expected, actual); Assert.assertEquals(expected.getShardSpec(), actual.getShardSpec()); } + + private DataSegment 
getSegmentForIdentifier(SegmentIdWithShardSpec identifier) + { + return DataSegment.builder() + .dataSource(identifier.getDataSource()) + .interval(identifier.getInterval()) + .version(identifier.getVersion()) + .shardSpec(identifier.getShardSpec()) + .size(100) + .build(); + } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 618173a5db7..2b02f09926b 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; import com.google.inject.Inject; @@ -1013,33 +1012,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor return allocatedSegmentIds; } - @SuppressWarnings("UnstableApiUsage") - private String getSequenceNameAndPrevIdSha( - SegmentCreateRequest request, - SegmentIdWithShardSpec pendingSegmentId, - boolean skipSegmentLineageCheck - ) - { - final Hasher hasher = Hashing.sha1().newHasher() - .putBytes(StringUtils.toUtf8(request.getSequenceName())) - .putByte((byte) 0xff); - - if (skipSegmentLineageCheck) { - final Interval interval = pendingSegmentId.getInterval(); - hasher - .putLong(interval.getStartMillis()) - .putLong(interval.getEndMillis()); - } else { - hasher - .putBytes(StringUtils.toUtf8(request.getPreviousSegmentId())); - } - - hasher.putByte((byte) 0xff); - hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion())); - - return BaseEncoding.base16().encode(hasher.hash().asBytes()); - } - @Nullable private 
SegmentIdWithShardSpec allocatePendingSegment( final Handle handle, @@ -1727,7 +1699,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. // When the core partitions have been dropped, using pending segments may lead to an incorrect state // where the chunk is believed to have core partitions and queries results are incorrect. - SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec( dataSource, interval, @@ -1739,7 +1710,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor ) ); return new PendingSegmentRecord( - pendingSegmentId, + getTrueAllocatedId(pendingSegmentId), request.getSequenceName(), request.getPreviousSegmentId(), request.getUpgradedFromSegmentId(), @@ -1875,8 +1846,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. // When the core partitions have been dropped, using pending segments may lead to an incorrect state // where the chunk is believed to have core partitions and queries results are incorrect. - - return new SegmentIdWithShardSpec( + final SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec( dataSource, interval, Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), @@ -1886,9 +1856,74 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions() ) ); + return getTrueAllocatedId(allocatedId); } } + /** + * Verifies that the allocated id doesn't already exist in the druid segments table. + * If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval + * Otherwise, use the same id. 
+ * @param allocatedId The segment allocated on the basis of used and pending segments + * @return a segment id that isn't already used by other unused segments + */ + private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocatedId) + { + // Check if there is a conflict with an existing entry in the segments table + if (retrieveSegmentForId(allocatedId.asSegmentId().toString(), true) == null) { + return allocatedId; + } + + // If yes, try to compute allocated partition num using the max unused segment shard spec + SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId( + allocatedId.getDataSource(), + allocatedId.getInterval(), + allocatedId.getVersion() + ); + // No unused segment. Just return the allocated id + if (unusedMaxId == null) { + return allocatedId; + } + + int maxPartitionNum = Math.max( + allocatedId.getShardSpec().getPartitionNum(), + unusedMaxId.getShardSpec().getPartitionNum() + 1 + ); + return new SegmentIdWithShardSpec( + allocatedId.getDataSource(), + allocatedId.getInterval(), + allocatedId.getVersion(), + new NumberedShardSpec( + maxPartitionNum, + allocatedId.getShardSpec().getNumCorePartitions() + ) + ); + } + + private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval interval, String version) + { + List unusedSegments = retrieveUnusedSegmentsForInterval( + datasource, + interval, + ImmutableList.of(version), + null, + null + ); + + SegmentIdWithShardSpec unusedMaxId = null; + int maxPartitionNum = -1; + for (DataSegment unusedSegment : unusedSegments) { + if (unusedSegment.getInterval().equals(interval)) { + int partitionNum = unusedSegment.getShardSpec().getPartitionNum(); + if (maxPartitionNum < partitionNum) { + maxPartitionNum = partitionNum; + unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment); + } + } + } + return unusedMaxId; + } + @Override public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval) { diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 29b7f92e8e5..fc43d7126fe 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -3192,4 +3192,59 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(allUsedSegments); Assert.assertEquals(0, segmentTimeline.lookup(interval).size()); } + + @Test + public void testSegmentIdShouldNotBeReallocated() throws IOException + { + final SegmentIdWithShardSpec idWithNullTaskAllocator = coordinator.allocatePendingSegment( + DS.WIKI, + "seq", + "0", + Intervals.ETERNITY, + NumberedPartialShardSpec.instance(), + "version", + false, + null + ); + final DataSegment dataSegment0 = createSegment( + idWithNullTaskAllocator.getInterval(), + idWithNullTaskAllocator.getVersion(), + idWithNullTaskAllocator.getShardSpec() + ); + + final SegmentIdWithShardSpec idWithValidTaskAllocator = coordinator.allocatePendingSegment( + DS.WIKI, + "seq", + "1", + Intervals.ETERNITY, + NumberedPartialShardSpec.instance(), + "version", + false, + "taskAllocatorId" + ); + final DataSegment dataSegment1 = createSegment( + idWithValidTaskAllocator.getInterval(), + idWithValidTaskAllocator.getVersion(), + idWithValidTaskAllocator.getShardSpec() + ); + + // Insert pending segments + coordinator.commitSegments(ImmutableSet.of(dataSegment0, dataSegment1), null); + // Clean up pending segments corresponding to the valid task allocator id + coordinator.deletePendingSegmentsForTaskAllocatorId(DS.WIKI, "taskAllocatorId"); + // Mark all segments as unused + coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY); + + final SegmentIdWithShardSpec theId = 
coordinator.allocatePendingSegment( + DS.WIKI, + "seq", + "2", + Intervals.ETERNITY, + NumberedPartialShardSpec.instance(), + "version", + false, + "taskAllocatorId" + ); + Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true)); + } }