Allocate pending segments at latest committed version (#15459)

The segment allocation algorithm reuses an already allocated pending segment if the new allocation request is made for the same parameters:

- datasource
- sequence name
- same interval
- same value of skipSegmentLineageCheck (false for batch append, true for streaming append)
- same previous segment id (used only when skipSegmentLineageCheck = false)

The above parameters can thus uniquely identify a pending segment (enforced by the UNIQUE constraint on the sequence_name_prev_id_sha1 column in druid_pendingSegments metadata table).

This reuse is done in order to:

- allow replica tasks (in case of streaming ingestion) to use the same set of segment IDs.
- allow re-run of a failed batch task to use the same segment ID and prevent unnecessary allocations.
This commit is contained in:
Kashif Faraz 2023-12-14 16:18:39 +05:30 committed by GitHub
parent e43bb74c3a
commit feeb4f0fb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 366 additions and 231 deletions

View File

@ -55,11 +55,11 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -70,9 +70,6 @@ import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class SegmentAllocateActionTest
{
@Rule
public ExpectedException thrown = ExpectedException.none();
@Rule
public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
@ -403,6 +400,72 @@ public class SegmentAllocateActionTest
assertSameIdentifier(id2, id7);
}
/**
 * Checks that repeated allocations for one sequence return the same pending segment,
 * and that once a segment with a newer version is committed for the interval, the
 * next allocations return a different pending segment carrying that committed version.
 */
@Test
public void testSegmentIsAllocatedForLatestUsedSegmentVersion() throws IOException
{
  final Task appendTask = NoopTask.create();
  taskActionTestKit.getTaskLockbox().add(appendTask);

  final String sequence = "sequence_1";

  // Phase 0: nothing is committed yet, both requests must resolve to one pending segment
  final SegmentIdWithShardSpec firstIdBeforeCommit =
      allocate(appendTask, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequence, null);
  final SegmentIdWithShardSpec secondIdBeforeCommit =
      allocate(appendTask, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequence, null);
  assertSameIdentifier(firstIdBeforeCommit, secondIdBeforeCommit);

  // Phase 1: a segment of version V1 is committed, allocations must move to V1
  final DataSegment committedV1 = commitPartyTimeHourSegment(PARTY_TIME.plusDays(1).toString());

  final SegmentIdWithShardSpec firstIdOnV1 =
      allocate(appendTask, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequence, null);
  final SegmentIdWithShardSpec secondIdOnV1 =
      allocate(appendTask, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequence, null);
  assertSameIdentifier(firstIdOnV1, secondIdOnV1);
  Assert.assertEquals(committedV1.getVersion(), firstIdOnV1.getVersion());
  Assert.assertNotEquals(firstIdBeforeCommit, firstIdOnV1);

  // Phase 2: a segment of the higher version V2 is committed, allocations must move to V2
  final DataSegment committedV2 = commitPartyTimeHourSegment(PARTY_TIME.plusDays(2).toString());
  Assert.assertTrue(committedV2.getVersion().compareTo(committedV1.getVersion()) > 0);

  final SegmentIdWithShardSpec firstIdOnV2 =
      allocate(appendTask, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequence, null);
  final SegmentIdWithShardSpec secondIdOnV2 =
      allocate(appendTask, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequence, null);
  assertSameIdentifier(firstIdOnV2, secondIdOnV2);
  Assert.assertEquals(committedV2.getVersion(), firstIdOnV2.getVersion());
  Assert.assertNotEquals(firstIdOnV2, firstIdBeforeCommit);
  Assert.assertNotEquals(firstIdOnV2, firstIdOnV1);
}

/**
 * Builds a segment of size 100 with {@code LinearShardSpec(0)} for the hour bucket of
 * PARTY_TIME in DATA_SOURCE using the given version, commits it through the metadata
 * storage coordinator of the test kit and returns the built segment.
 */
private DataSegment commitPartyTimeHourSegment(String version) throws IOException
{
  final DataSegment segment = DataSegment.builder()
                                         .dataSource(DATA_SOURCE)
                                         .interval(Granularities.HOUR.bucket(PARTY_TIME))
                                         .version(version)
                                         .shardSpec(new LinearShardSpec(0))
                                         .size(100)
                                         .build();
  taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
      Collections.singleton(segment)
  );
  return segment;
}
@Test
public void testMultipleSequences()
{

View File

@ -645,10 +645,23 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "version");
Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC());
final Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC());
return connector.retryWithHandle(
handle -> {
// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(handle, dataSource, ImmutableList.of(interval))
.lookup(interval);
if (existingChunks.size() > 1) {
// Not possible to expand more than one chunk with a single segment.
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s] as it already has [%,d] versions.",
dataSource, interval, existingChunks.size()
);
return null;
}
if (skipSegmentLineageCheck) {
return allocatePendingSegment(
handle,
@ -656,7 +669,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
sequenceName,
allocateInterval,
partialShardSpec,
maxVersion
maxVersion,
existingChunks
);
} else {
return allocatePendingSegmentWithSegmentLineageCheck(
@ -666,7 +680,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
previousSegmentId,
allocateInterval,
partialShardSpec,
maxVersion
maxVersion,
existingChunks
);
}
}
@ -803,26 +818,32 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Nullable final String previousSegmentId,
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion
final String maxVersion,
final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
{
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
handle.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
)
),
final String sql = StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
);
final Query<Map<String, Object>> query
= handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentIdNotNull);
final String usedSegmentVersion = existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion();
final CheckExistingSegmentIdResult result = findExistingPendingSegment(
query,
interval,
sequenceName,
previousSegmentIdNotNull,
Pair.of("dataSource", dataSource),
Pair.of("sequence_name", sequenceName),
Pair.of("sequence_prev_id", previousSegmentIdNotNull)
usedSegmentVersion
);
if (result.found) {
@ -835,7 +856,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
dataSource,
interval,
partialShardSpec,
maxVersion
maxVersion,
existingChunks
);
if (newIdentifier == null) {
return null;
@ -854,6 +876,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.putBytes(StringUtils.toUtf8(sequenceName))
.putByte((byte) 0xff)
.putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
.putByte((byte) 0xff)
.putBytes(StringUtils.toUtf8(newIdentifier.getVersion()))
.hash()
.asBytes()
);
@ -878,11 +902,26 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final List<SegmentCreateRequest> requests
) throws IOException
{
// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
.lookup(interval);
if (existingChunks.size() > 1) {
log.warn(
"Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.",
dataSource, interval, existingChunks.size()
);
return Collections.emptyMap();
}
final String existingVersion = existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion();
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> existingSegmentIds;
if (skipSegmentLineageCheck) {
existingSegmentIds = getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, requests);
existingSegmentIds =
getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, existingVersion, requests);
} else {
existingSegmentIds = getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, requests);
existingSegmentIds =
getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, existingVersion, requests);
}
// For every request see if a segment id already exists
@ -901,8 +940,14 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
// For each of the remaining requests, create a new segment
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments =
createNewSegments(handle, dataSource, interval, skipSegmentLineageCheck, requestsForNewSegments);
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = createNewSegments(
handle,
dataSource,
interval,
skipSegmentLineageCheck,
existingChunks,
requestsForNewSegments
);
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
@ -925,14 +970,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@SuppressWarnings("UnstableApiUsage")
private String getSequenceNameAndPrevIdSha(
SegmentCreateRequest request,
Interval interval,
SegmentIdWithShardSpec pendingSegmentId,
boolean skipSegmentLineageCheck
)
{
final Hasher hasher = Hashing.sha1().newHasher()
.putBytes(StringUtils.toUtf8(request.getSequenceName()))
.putByte((byte) 0xff);
if (skipSegmentLineageCheck) {
final Interval interval = pendingSegmentId.getInterval();
hasher
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis());
@ -941,6 +988,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.putBytes(StringUtils.toUtf8(request.getPreviousSegmentId()));
}
hasher.putByte((byte) 0xff);
hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion()));
return BaseEncoding.base16().encode(hasher.hash().asBytes());
}
@ -951,28 +1001,32 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final String sequenceName,
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion
final String maxVersion,
final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
{
final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
handle.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "start = :start AND "
+ "%2$send%2$s = :end",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
),
final String sql = StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "start = :start AND "
+ "%2$send%2$s = :end",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
);
final Query<Map<String, Object>> query
= handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());
final CheckExistingSegmentIdResult result = findExistingPendingSegment(
query,
interval,
sequenceName,
null,
Pair.of("dataSource", dataSource),
Pair.of("sequence_name", sequenceName),
Pair.of("start", interval.getStart().toString()),
Pair.of("end", interval.getEnd().toString())
existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion()
);
if (result.found) {
@ -984,7 +1038,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
dataSource,
interval,
partialShardSpec,
maxVersion
maxVersion,
existingChunks
);
if (newIdentifier == null) {
return null;
@ -1004,6 +1059,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.putByte((byte) 0xff)
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis())
.putByte((byte) 0xff)
.putBytes(StringUtils.toUtf8(newIdentifier.getVersion()))
.hash()
.asBytes()
);
@ -1011,7 +1068,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// always insert empty previous sequence id
insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
log.info("Allocated pending segment [%s] for sequence[%s] in DB", newIdentifier, sequenceName);
log.info(
"Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].",
newIdentifier, dataSource, sequenceName, interval
);
return newIdentifier;
}
@ -1023,6 +1083,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Handle handle,
String dataSource,
Interval interval,
String usedSegmentVersion,
List<SegmentCreateRequest> requests
) throws IOException
{
@ -1052,7 +1113,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final PendingSegmentsRecord record = dbSegments.next();
final SegmentIdWithShardSpec segmentId =
jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class);
sequenceToSegmentId.put(record.getSequenceName(), segmentId);
// Consider only the pending segments allocated for the latest used segment version
if (usedSegmentVersion == null || segmentId.getVersion().equals(usedSegmentVersion)) {
sequenceToSegmentId.put(record.getSequenceName(), segmentId);
}
}
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> requestToResult = new HashMap<>();
@ -1071,6 +1136,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Handle handle,
String dataSource,
Interval interval,
String usedSegmentVersion,
List<SegmentCreateRequest> requests
) throws IOException
{
@ -1090,14 +1156,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> requestToResult = new HashMap<>();
for (SegmentCreateRequest request : requests) {
CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
CheckExistingSegmentIdResult result = findExistingPendingSegment(
handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("sequence_name", request.getSequenceName())
.bind("sequence_prev_id", request.getPreviousSegmentId()),
interval,
request.getSequenceName(),
request.getPreviousSegmentId()
request.getPreviousSegmentId(),
usedSegmentVersion
);
requestToResult.put(request, result);
}
@ -1105,50 +1172,43 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return requestToResult;
}
private CheckExistingSegmentIdResult checkAndGetExistingSegmentId(
private CheckExistingSegmentIdResult findExistingPendingSegment(
final Query<Map<String, Object>> query,
final Interval interval,
final String sequenceName,
final @Nullable String previousSegmentId,
final Pair<String, String>... queryVars
final @Nullable String usedSegmentVersion
) throws IOException
{
Query<Map<String, Object>> boundQuery = query;
for (Pair<String, String> var : queryVars) {
boundQuery = boundQuery.bind(var.lhs, var.rhs);
}
final List<byte[]> existingBytes = boundQuery.map(ByteArrayMapper.FIRST).list();
if (existingBytes.isEmpty()) {
final List<byte[]> records = query.map(ByteArrayMapper.FIRST).list();
if (records.isEmpty()) {
return new CheckExistingSegmentIdResult(false, null);
} else {
final SegmentIdWithShardSpec existingIdentifier = jsonMapper.readValue(
Iterables.getOnlyElement(existingBytes),
SegmentIdWithShardSpec.class
);
}
if (existingIdentifier.getInterval().isEqual(interval)) {
log.info(
"Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
existingIdentifier,
sequenceName,
previousSegmentId
);
for (byte[] record : records) {
final SegmentIdWithShardSpec pendingSegment
= jsonMapper.readValue(record, SegmentIdWithShardSpec.class);
return new CheckExistingSegmentIdResult(true, existingIdentifier);
} else {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ "does not match requested interval[%s]",
existingIdentifier,
sequenceName,
previousSegmentId,
interval
);
return new CheckExistingSegmentIdResult(true, null);
// Consider only pending segments matching the expected version
if (usedSegmentVersion == null || pendingSegment.getVersion().equals(usedSegmentVersion)) {
if (pendingSegment.getInterval().isEqual(interval)) {
log.info(
"Found existing pending segment[%s] for sequence[%s], previous segment[%s], version[%s] in DB",
pendingSegment, sequenceName, previousSegmentId, usedSegmentVersion
);
return new CheckExistingSegmentIdResult(true, pendingSegment);
} else {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s], previous segment[%s] in DB"
+ " as it does not match requested interval[%s], version[%s].",
pendingSegment, sequenceName, previousSegmentId, interval, usedSegmentVersion
);
return new CheckExistingSegmentIdResult(true, null);
}
}
}
return new CheckExistingSegmentIdResult(false, null);
}
private static class CheckExistingSegmentIdResult
@ -1164,6 +1224,52 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
/**
 * Key that identifies an allocate request by its interval, sequence name, previous
 * segment id and the skipSegmentLineageCheck flag. Two keys are equal only when all
 * four values are equal.
 * <p>
 * All fields are final, so the hash code is computed once in the constructor and reused.
 */
private static class UniqueAllocateRequest
{
  private final Interval interval;
  private final String sequenceName;
  private final String previousSegmentId;
  private final boolean skipSegmentLineageCheck;

  // Cached result of hashing the four identifying values above
  private final int hashCode;

  public UniqueAllocateRequest(
      Interval interval,
      SegmentCreateRequest request,
      boolean skipSegmentLineageCheck
  )
  {
    final String requestedSequence = request.getSequenceName();
    final String requestedPreviousId = request.getPreviousSegmentId();

    this.interval = interval;
    this.sequenceName = requestedSequence;
    this.previousSegmentId = requestedPreviousId;
    this.skipSegmentLineageCheck = skipSegmentLineageCheck;
    this.hashCode = Objects.hash(
        interval,
        requestedSequence,
        requestedPreviousId,
        skipSegmentLineageCheck
    );
  }

  @Override
  public boolean equals(Object o)
  {
    if (o == this) {
      return true;
    }
    if (o == null || o.getClass() != getClass()) {
      return false;
    }

    final UniqueAllocateRequest other = (UniqueAllocateRequest) o;
    if (skipSegmentLineageCheck != other.skipSegmentLineageCheck) {
      return false;
    }
    if (!Objects.equals(interval, other.interval)) {
      return false;
    }
    if (!Objects.equals(sequenceName, other.sequenceName)) {
      return false;
    }
    return Objects.equals(previousSegmentId, other.previousSegmentId);
  }

  @Override
  public int hashCode()
  {
    return hashCode;
  }
}
private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@ -1264,7 +1370,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("sequence_prev_id", request.getPreviousSegmentId())
.bind(
"sequence_name_prev_id_sha1",
getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck)
getSequenceNameAndPrevIdSha(request, segmentId, skipSegmentLineageCheck)
)
.bind("payload", jsonMapper.writeValueAsBytes(segmentId));
}
@ -1480,6 +1586,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
List<TimelineObjectHolder<String, DataSegment>> existingChunks,
List<SegmentCreateRequest> requests
) throws IOException
{
@ -1487,22 +1594,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return Collections.emptyMap();
}
// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
.lookup(interval);
if (existingChunks.size() > 1) {
// Not possible to expand more than one chunk with a single segment.
log.warn(
"Cannot allocate new segments for dataSource[%s], interval[%s]: already have [%,d] chunks.",
dataSource,
interval,
existingChunks.size()
);
return Collections.emptyMap();
}
// Shard spec of any of the requests (as they are all compatible) can be used to
// identify existing shard specs that share partition space with the requested ones.
final PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec();
@ -1542,15 +1633,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet());
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<>();
final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new HashMap<>();
final Map<UniqueAllocateRequest, SegmentIdWithShardSpec> uniqueRequestToSegment = new HashMap<>();
for (SegmentCreateRequest request : requests) {
// Check if the required segment has already been created in this batch
final String sequenceHash = getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck);
final UniqueAllocateRequest uniqueRequest =
new UniqueAllocateRequest(interval, request, skipSegmentLineageCheck);
final SegmentIdWithShardSpec createdSegment;
if (sequenceHashToSegment.containsKey(sequenceHash)) {
createdSegment = sequenceHashToSegment.get(sequenceHash);
if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
createdSegment = uniqueRequestToSegment.get(uniqueRequest);
} else {
createdSegment = createNewSegment(
request,
@ -1564,8 +1656,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// Add to pendingSegments to consider for partitionId
if (createdSegment != null) {
pendingSegments.add(createdSegment);
sequenceHashToSegment.put(sequenceHash, createdSegment);
log.info("Created new segment [%s]", createdSegment);
uniqueRequestToSegment.put(uniqueRequest, createdSegment);
log.info("Created new segment[%s]", createdSegment);
}
}
@ -1574,7 +1666,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
log.info("Created [%d] new segments for [%d] allocate requests.", sequenceHashToSegment.size(), requests.size());
log.info("Created [%d] new segments for [%d] allocate requests.", uniqueRequestToSegment.size(), requests.size());
return createdSegments;
}
@ -1694,140 +1786,122 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final String dataSource,
final Interval interval,
final PartialShardSpec partialShardSpec,
final String existingVersion
final String existingVersion,
final List<TimelineObjectHolder<String, DataSegment>> existingChunks
) throws IOException
{
// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
ImmutableList.of(interval)
).lookup(interval);
if (existingChunks.size() > 1) {
// Not possible to expand more than one chunk with a single segment.
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s]: already have [%,d] chunks.",
dataSource,
interval,
existingChunks.size()
);
return null;
// max partitionId of published data segments which share the same partition space.
SegmentIdWithShardSpec committedMaxId = null;
@Nullable
final String versionOfExistingChunk;
if (existingChunks.isEmpty()) {
versionOfExistingChunk = null;
} else {
// max partitionId of published data segments which share the same partition space.
SegmentIdWithShardSpec committedMaxId = null;
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
versionOfExistingChunk = existingHolder.getVersion();
@Nullable
final String versionOfExistingChunk;
if (existingChunks.isEmpty()) {
versionOfExistingChunk = null;
} else {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
versionOfExistingChunk = existingHolder.getVersion();
// Don't use the stream API for performance.
for (DataSegment segment : FluentIterable
.from(existingHolder.getObject())
.transform(PartitionChunk::getObject)
// Here we check only the segments of the shardSpec which shares the same partition space with the given
// partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others.
// See PartitionIds.
.filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
if (committedMaxId == null
|| committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
// Don't use the stream API for performance.
for (DataSegment segment : FluentIterable
.from(existingHolder.getObject())
.transform(PartitionChunk::getObject)
// Here we check only the segments of the shardSpec which shares the same partition space with the given
// partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others.
// See PartitionIds.
.filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
if (committedMaxId == null
|| committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
}
}
// Fetch the pending segments for this interval to determine max partitionId
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet()
// Fetch the pending segments for this interval to determine max partitionId
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet()
);
if (committedMaxId != null) {
pendings.add(committedMaxId);
}
// If there is an existing chunk, find the max id with the same version as the existing chunk.
// There may still be a pending segment with a higher version (but no corresponding used segments)
// which may generate a clash with an existing segment once the new id is generated
final SegmentIdWithShardSpec overallMaxId;
overallMaxId = pendings.stream()
.filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
.filter(id -> versionOfExistingChunk == null
|| id.getVersion().equals(versionOfExistingChunk))
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
.thenComparing(id -> id.getShardSpec().getPartitionNum()))
.orElse(null);
// Determine the version of the new segment
final String newSegmentVersion;
if (versionOfExistingChunk != null) {
newSegmentVersion = versionOfExistingChunk;
} else if (overallMaxId != null) {
newSegmentVersion = overallMaxId.getVersion();
} else {
// this is the first segment for this interval
newSegmentVersion = null;
}
if (overallMaxId == null) {
// When appending segments, null overallMaxId means that we are allocating the very initial
// segment for this time chunk.
// This code is executed when the Overlord coordinates segment allocation, which is either you append segments
// or you use segment lock. Since the core partitions set is not determined for appended segments, we set
// it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the
// OvershadowableManager handles the atomic segment update.
final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace()
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
return new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
if (committedMaxId != null) {
pendings.add(committedMaxId);
}
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
existingVersion,
overallMaxId
);
return null;
} else if (committedMaxId != null
&& committedMaxId.getShardSpec().getNumCorePartitions()
== SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
log.warn(
"Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]",
committedMaxId,
committedMaxId.getShardSpec()
);
return null;
} else {
// The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline.
// When the core partitions have been dropped, using pending segments may lead to an incorrect state
// where the chunk is believed to have core partitions and queries results are incorrect.
// If there is an existing chunk, find the max id with the same version as the existing chunk.
// There may still be a pending segment with a higher version (but no corresponding used segments)
// which may generate a clash with an existing segment once the new id is generated
final SegmentIdWithShardSpec overallMaxId;
overallMaxId = pendings.stream()
.filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
.filter(id -> versionOfExistingChunk == null
|| id.getVersion().equals(versionOfExistingChunk))
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
.thenComparing(id -> id.getShardSpec().getPartitionNum()))
.orElse(null);
// Determine the version of the new segment
final String newSegmentVersion;
if (versionOfExistingChunk != null) {
newSegmentVersion = versionOfExistingChunk;
} else if (overallMaxId != null) {
newSegmentVersion = overallMaxId.getVersion();
} else {
// this is the first segment for this interval
newSegmentVersion = null;
}
if (overallMaxId == null) {
// When appending segments, null overallMaxId means that we are allocating the very initial
// segment for this time chunk.
// This code is executed when the Overlord coordinates segment allocation, which is either you append segments
// or you use segment lock. Since the core partitions set is not determined for appended segments, we set
// it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the
// OvershadowableManager handles the atomic segment update.
final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace()
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
return new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
existingVersion,
overallMaxId
);
return null;
} else if (committedMaxId != null
&& committedMaxId.getShardSpec().getNumCorePartitions()
== SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
log.warn(
"Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]",
committedMaxId,
committedMaxId.getShardSpec()
);
return null;
} else {
// The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline.
// When the core partitions have been dropped, using pending segments may lead to an incorrect state
// where the chunk is believed to have core partitions and queries results are incorrect.
return new SegmentIdWithShardSpec(
dataSource,
interval,
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
partialShardSpec.complete(
jsonMapper,
overallMaxId.getShardSpec().getPartitionNum() + 1,
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
}
return new SegmentIdWithShardSpec(
dataSource,
interval,
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
partialShardSpec.complete(
jsonMapper,
overallMaxId.getShardSpec().getPartitionNum() + 1,
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
}
}

View File

@ -2078,7 +2078,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
* - verify that the id for segment5 is correct
* - Later, after the above was dropped, another segment on same interval was created by the stream but this
* time there was an integrity violation in the pending segments table because the
* {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
* method returned a segment id that already existed in the pending segments table
*/
@Test
@ -2178,7 +2177,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString());
// Since all core partitions have been dropped
Assert.assertEquals(0, identifier4.getShardSpec().getNumCorePartitions());
}
/**