Do not allocate ids conflicting with existing segment ids (#16380)

* Do not allocate ids conflicting with existing segment ids

* Parameterized tests

* Add doc and retain test for coverage
This commit is contained in:
AmatyaAvadhanula 2024-05-03 19:09:48 +05:30 committed by GitHub
parent b16401323b
commit 5fae20d287
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 174 additions and 32 deletions

View File

@ -29,6 +29,8 @@ import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@ -1062,6 +1064,45 @@ public class SegmentAllocateActionTest
Assert.assertEquals(Duration.ofDays(1).toMillis(), id2.getInterval().toDurationMillis());
}
@Test
public void testSegmentIdMustNotBeReused() throws IOException
{
final IndexerMetadataStorageCoordinator coordinator = taskActionTestKit.getMetadataStorageCoordinator();
final TaskLockbox lockbox = taskActionTestKit.getTaskLockbox();
final Task task0 = NoopTask.ofPriority(25);
lockbox.add(task0);
final NoopTask task1 = NoopTask.ofPriority(50);
lockbox.add(task1);
// Allocate and commit for older task task0
final SegmentIdWithShardSpec id0 =
allocate(task0, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "0");
final DataSegment dataSegment0 = getSegmentForIdentifier(id0);
coordinator.commitSegments(ImmutableSet.of(dataSegment0), null);
lockbox.unlock(task0, Intervals.ETERNITY);
// Allocate and commit for newer task task1. Pending segments are cleaned up
final SegmentIdWithShardSpec id1 =
allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "1");
final DataSegment dataSegment1 = getSegmentForIdentifier(id1);
final SegmentIdWithShardSpec id2 =
allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "2");
final DataSegment dataSegment2 = getSegmentForIdentifier(id2);
coordinator.commitSegments(ImmutableSet.of(dataSegment1, dataSegment2), null);
// Clean up pending segments corresponding to the last pending segment
coordinator.deletePendingSegmentsForTaskAllocatorId(task1.getDataSource(), task1.getTaskAllocatorId());
// Drop all segments
coordinator.markSegmentsAsUnusedWithinInterval(task0.getDataSource(), Intervals.ETERNITY);
// Allocate another id and ensure that it doesn't exist in the druid_segments table
final SegmentIdWithShardSpec theId =
allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "3");
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
lockbox.unlock(task1, Intervals.ETERNITY);
}
private SegmentIdWithShardSpec allocate(
final Task task,
final DateTime timestamp,
@ -1123,4 +1164,15 @@ public class SegmentAllocateActionTest
Assert.assertEquals(expected, actual);
Assert.assertEquals(expected.getShardSpec(), actual.getShardSpec());
}
private DataSegment getSegmentForIdentifier(SegmentIdWithShardSpec identifier)
{
return DataSegment.builder()
.dataSource(identifier.getDataSource())
.interval(identifier.getInterval())
.version(identifier.getVersion())
.shardSpec(identifier.getShardSpec())
.size(100)
.build();
}
}

View File

@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
@ -1013,33 +1012,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return allocatedSegmentIds;
}
@SuppressWarnings("UnstableApiUsage")
private String getSequenceNameAndPrevIdSha(
SegmentCreateRequest request,
SegmentIdWithShardSpec pendingSegmentId,
boolean skipSegmentLineageCheck
)
{
final Hasher hasher = Hashing.sha1().newHasher()
.putBytes(StringUtils.toUtf8(request.getSequenceName()))
.putByte((byte) 0xff);
if (skipSegmentLineageCheck) {
final Interval interval = pendingSegmentId.getInterval();
hasher
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis());
} else {
hasher
.putBytes(StringUtils.toUtf8(request.getPreviousSegmentId()));
}
hasher.putByte((byte) 0xff);
hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion()));
return BaseEncoding.base16().encode(hasher.hash().asBytes());
}
@Nullable
private SegmentIdWithShardSpec allocatePendingSegment(
final Handle handle,
@ -1727,7 +1699,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline.
// When the core partitions have been dropped, using pending segments may lead to an incorrect state
// where the chunk is believed to have core partitions and queries results are incorrect.
SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
dataSource,
interval,
@ -1739,7 +1710,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
)
);
return new PendingSegmentRecord(
pendingSegmentId,
getTrueAllocatedId(pendingSegmentId),
request.getSequenceName(),
request.getPreviousSegmentId(),
request.getUpgradedFromSegmentId(),
@ -1875,8 +1846,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline.
// When the core partitions have been dropped, using pending segments may lead to an incorrect state
// where the chunk is believed to have core partitions and queries results are incorrect.
return new SegmentIdWithShardSpec(
final SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec(
dataSource,
interval,
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
@ -1886,9 +1856,74 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
return getTrueAllocatedId(allocatedId);
}
}
/**
* Verifies that the allocated id doesn't already exist in the druid segments table.
* If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval
* Otherwise, use the same id.
* @param allocatedId The segment allcoted on the basis of used and pending segments
* @return a segment id that isn't already used by other unused segments
*/
private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocatedId)
{
// Check if there is a conflict with an existing entry in the segments table
if (retrieveSegmentForId(allocatedId.asSegmentId().toString(), true) == null) {
return allocatedId;
}
// If yes, try to compute allocated partition num using the max unused segment shard spec
SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId(
allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion()
);
// No unused segment. Just return the allocated id
if (unusedMaxId == null) {
return allocatedId;
}
int maxPartitionNum = Math.max(
allocatedId.getShardSpec().getPartitionNum(),
unusedMaxId.getShardSpec().getPartitionNum() + 1
);
return new SegmentIdWithShardSpec(
allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion(),
new NumberedShardSpec(
maxPartitionNum,
allocatedId.getShardSpec().getNumCorePartitions()
)
);
}
private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval interval, String version)
{
List<DataSegment> unusedSegments = retrieveUnusedSegmentsForInterval(
datasource,
interval,
ImmutableList.of(version),
null,
null
);
SegmentIdWithShardSpec unusedMaxId = null;
int maxPartitionNum = -1;
for (DataSegment unusedSegment : unusedSegments) {
if (unusedSegment.getInterval().equals(interval)) {
int partitionNum = unusedSegment.getShardSpec().getPartitionNum();
if (maxPartitionNum < partitionNum) {
maxPartitionNum = partitionNum;
unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment);
}
}
}
return unusedMaxId;
}
@Override
public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval)
{

View File

@ -3192,4 +3192,59 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(allUsedSegments);
Assert.assertEquals(0, segmentTimeline.lookup(interval).size());
}
@Test
public void testSegmentIdShouldNotBeReallocated() throws IOException
{
final SegmentIdWithShardSpec idWithNullTaskAllocator = coordinator.allocatePendingSegment(
DS.WIKI,
"seq",
"0",
Intervals.ETERNITY,
NumberedPartialShardSpec.instance(),
"version",
false,
null
);
final DataSegment dataSegment0 = createSegment(
idWithNullTaskAllocator.getInterval(),
idWithNullTaskAllocator.getVersion(),
idWithNullTaskAllocator.getShardSpec()
);
final SegmentIdWithShardSpec idWithValidTaskAllocator = coordinator.allocatePendingSegment(
DS.WIKI,
"seq",
"1",
Intervals.ETERNITY,
NumberedPartialShardSpec.instance(),
"version",
false,
"taskAllocatorId"
);
final DataSegment dataSegment1 = createSegment(
idWithValidTaskAllocator.getInterval(),
idWithValidTaskAllocator.getVersion(),
idWithValidTaskAllocator.getShardSpec()
);
// Insert pending segments
coordinator.commitSegments(ImmutableSet.of(dataSegment0, dataSegment1), null);
// Clean up pending segments corresponding to the valid task allocator id
coordinator.deletePendingSegmentsForTaskAllocatorId(DS.WIKI, "taskAllocatorId");
// Mark all segments as unused
coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY);
final SegmentIdWithShardSpec theId = coordinator.allocatePendingSegment(
DS.WIKI,
"seq",
"2",
Intervals.ETERNITY,
NumberedPartialShardSpec.instance(),
"version",
false,
"taskAllocatorId"
);
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
}
}