From feeb4f0fb03fce90e523c7e1c10e71a19478400c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 14 Dec 2023 16:18:39 +0530 Subject: [PATCH] Allocate pending segments at latest committed version (#15459) The segment allocation algorithm reuses an already allocated pending segment if the new allocation request is made for the same parameters: same datasource; same sequence name; same interval; same value of skipSegmentLineageCheck (false for batch append, true for streaming append); and same previous segment id (used only when skipSegmentLineageCheck = false). The above parameters can thus uniquely identify a pending segment (enforced by the UNIQUE constraint on the sequence_name_prev_id_sha1 column in the druid_pendingSegments metadata table). This reuse is done in order to: (1) allow replica tasks (in case of streaming ingestion) to use the same set of segment IDs; and (2) allow re-run of a failed batch task to use the same segment ID and prevent unnecessary allocations. --- .../actions/SegmentAllocateActionTest.java | 71 ++- .../IndexerSQLMetadataStorageCoordinator.java | 524 ++++++++++-------- ...exerSQLMetadataStorageCoordinatorTest.java | 2 - 3 files changed, 366 insertions(+), 231 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index 13c499e47e2..4ccb8707750 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -55,11 +55,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import 
java.util.List; import java.util.Map; @@ -70,9 +70,6 @@ import java.util.stream.Collectors; @RunWith(Parameterized.class) public class SegmentAllocateActionTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); @@ -403,6 +400,72 @@ public class SegmentAllocateActionTest assertSameIdentifier(id2, id7); } + @Test + public void testSegmentIsAllocatedForLatestUsedSegmentVersion() throws IOException + { + final Task task = NoopTask.create(); + taskActionTestKit.getTaskLockbox().add(task); + + final String sequenceName = "sequence_1"; + + // Allocate segments when there are no committed segments + final SegmentIdWithShardSpec pendingSegmentV01 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + final SegmentIdWithShardSpec pendingSegmentV02 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + + assertSameIdentifier(pendingSegmentV01, pendingSegmentV02); + + // Commit a segment for version V1 + final DataSegment segmentV1 + = DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularities.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.plusDays(1).toString()) + .shardSpec(new LinearShardSpec(0)) + .size(100) + .build(); + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( + Collections.singleton(segmentV1) + ); + + // Verify that new allocations use version V1 + final SegmentIdWithShardSpec pendingSegmentV11 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + final SegmentIdWithShardSpec pendingSegmentV12 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + + assertSameIdentifier(pendingSegmentV11, pendingSegmentV12); + Assert.assertEquals(segmentV1.getVersion(), pendingSegmentV11.getVersion()); + + Assert.assertNotEquals(pendingSegmentV01, pendingSegmentV11); + + // Commit a segment 
for version V2 to overshadow V1 + final DataSegment segmentV2 + = DataSegment.builder() + .dataSource(DATA_SOURCE) + .interval(Granularities.HOUR.bucket(PARTY_TIME)) + .version(PARTY_TIME.plusDays(2).toString()) + .shardSpec(new LinearShardSpec(0)) + .size(100) + .build(); + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( + Collections.singleton(segmentV2) + ); + Assert.assertTrue(segmentV2.getVersion().compareTo(segmentV1.getVersion()) > 0); + + // Verify that new segment allocations use version V2 + final SegmentIdWithShardSpec pendingSegmentV21 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + final SegmentIdWithShardSpec pendingSegmentV22 = + allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, sequenceName, null); + assertSameIdentifier(pendingSegmentV21, pendingSegmentV22); + Assert.assertEquals(segmentV2.getVersion(), pendingSegmentV21.getVersion()); + + Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV01); + Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV11); + } + @Test public void testMultipleSequences() { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index c62e59c0b25..9e4fc578eda 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -645,10 +645,23 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor Preconditions.checkNotNull(sequenceName, "sequenceName"); Preconditions.checkNotNull(interval, "interval"); Preconditions.checkNotNull(maxVersion, "version"); - Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC()); + final Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC()); return 
connector.retryWithHandle( handle -> { + // Get the time chunk and associated data segments for the given interval, if any + final List> existingChunks = + getTimelineForIntervalsWithHandle(handle, dataSource, ImmutableList.of(interval)) + .lookup(interval); + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s] as it already has [%,d] versions.", + dataSource, interval, existingChunks.size() + ); + return null; + } + if (skipSegmentLineageCheck) { return allocatePendingSegment( handle, @@ -656,7 +669,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor sequenceName, allocateInterval, partialShardSpec, - maxVersion + maxVersion, + existingChunks ); } else { return allocatePendingSegmentWithSegmentLineageCheck( @@ -666,7 +680,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor previousSegmentId, allocateInterval, partialShardSpec, - maxVersion + maxVersion, + existingChunks ); } } @@ -803,26 +818,32 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor @Nullable final String previousSegmentId, final Interval interval, final PartialShardSpec partialShardSpec, - final String maxVersion + final String maxVersion, + final List> existingChunks ) throws IOException { final String previousSegmentIdNotNull = previousSegmentId == null ? 
"" : previousSegmentId; - final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( - handle.createQuery( - StringUtils.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "sequence_prev_id = :sequence_prev_id", - dbTables.getPendingSegmentsTable() - ) - ), + + final String sql = StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ); + final Query> query + = handle.createQuery(sql) + .bind("dataSource", dataSource) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull); + + final String usedSegmentVersion = existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion(); + final CheckExistingSegmentIdResult result = findExistingPendingSegment( + query, interval, sequenceName, previousSegmentIdNotNull, - Pair.of("dataSource", dataSource), - Pair.of("sequence_name", sequenceName), - Pair.of("sequence_prev_id", previousSegmentIdNotNull) + usedSegmentVersion ); if (result.found) { @@ -835,7 +856,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor dataSource, interval, partialShardSpec, - maxVersion + maxVersion, + existingChunks ); if (newIdentifier == null) { return null; @@ -854,6 +876,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .putBytes(StringUtils.toUtf8(sequenceName)) .putByte((byte) 0xff) .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(newIdentifier.getVersion())) .hash() .asBytes() ); @@ -878,11 +902,26 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final List requests ) throws IOException { + // Get the time chunk and associated data segments for the given interval, if any + final List> 
existingChunks = + getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)) + .lookup(interval); + if (existingChunks.size() > 1) { + log.warn( + "Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.", + dataSource, interval, existingChunks.size() + ); + return Collections.emptyMap(); + } + + final String existingVersion = existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion(); final Map existingSegmentIds; if (skipSegmentLineageCheck) { - existingSegmentIds = getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, requests); + existingSegmentIds = + getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, existingVersion, requests); } else { - existingSegmentIds = getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, requests); + existingSegmentIds = + getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, existingVersion, requests); } // For every request see if a segment id already exists @@ -901,8 +940,14 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } // For each of the remaining requests, create a new segment - final Map createdSegments = - createNewSegments(handle, dataSource, interval, skipSegmentLineageCheck, requestsForNewSegments); + final Map createdSegments = createNewSegments( + handle, + dataSource, + interval, + skipSegmentLineageCheck, + existingChunks, + requestsForNewSegments + ); // SELECT -> INSERT can fail due to races; callers must be prepared to retry. // Avoiding ON DUPLICATE KEY since it's not portable. 
@@ -925,14 +970,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor @SuppressWarnings("UnstableApiUsage") private String getSequenceNameAndPrevIdSha( SegmentCreateRequest request, - Interval interval, + SegmentIdWithShardSpec pendingSegmentId, boolean skipSegmentLineageCheck ) { final Hasher hasher = Hashing.sha1().newHasher() .putBytes(StringUtils.toUtf8(request.getSequenceName())) .putByte((byte) 0xff); + if (skipSegmentLineageCheck) { + final Interval interval = pendingSegmentId.getInterval(); hasher .putLong(interval.getStartMillis()) .putLong(interval.getEndMillis()); @@ -941,6 +988,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .putBytes(StringUtils.toUtf8(request.getPreviousSegmentId())); } + hasher.putByte((byte) 0xff); + hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion())); + return BaseEncoding.base16().encode(hasher.hash().asBytes()); } @@ -951,28 +1001,32 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final String sequenceName, final Interval interval, final PartialShardSpec partialShardSpec, - final String maxVersion + final String maxVersion, + final List> existingChunks ) throws IOException { - final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( - handle.createQuery( - StringUtils.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "start = :start AND " - + "%2$send%2$s = :end", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() - ) - ), + final String sql = StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "start = :start AND " + + "%2$send%2$s = :end", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ); + final Query> query + = handle.createQuery(sql) + .bind("dataSource", dataSource) + .bind("sequence_name", 
sequenceName) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()); + + final CheckExistingSegmentIdResult result = findExistingPendingSegment( + query, interval, sequenceName, null, - Pair.of("dataSource", dataSource), - Pair.of("sequence_name", sequenceName), - Pair.of("start", interval.getStart().toString()), - Pair.of("end", interval.getEnd().toString()) + existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion() ); if (result.found) { @@ -984,7 +1038,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor dataSource, interval, partialShardSpec, - maxVersion + maxVersion, + existingChunks ); if (newIdentifier == null) { return null; @@ -1004,6 +1059,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .putByte((byte) 0xff) .putLong(interval.getStartMillis()) .putLong(interval.getEndMillis()) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(newIdentifier.getVersion())) .hash() .asBytes() ); @@ -1011,7 +1068,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor // always insert empty previous sequence id insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); - log.info("Allocated pending segment [%s] for sequence[%s] in DB", newIdentifier, sequenceName); + log.info( + "Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].", + newIdentifier, dataSource, sequenceName, interval + ); return newIdentifier; } @@ -1023,6 +1083,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor Handle handle, String dataSource, Interval interval, + String usedSegmentVersion, List requests ) throws IOException { @@ -1052,7 +1113,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final PendingSegmentsRecord record = dbSegments.next(); final SegmentIdWithShardSpec segmentId = 
jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class); - sequenceToSegmentId.put(record.getSequenceName(), segmentId); + + // Consider only the pending segments allocated for the latest used segment version + if (usedSegmentVersion == null || segmentId.getVersion().equals(usedSegmentVersion)) { + sequenceToSegmentId.put(record.getSequenceName(), segmentId); + } } final Map requestToResult = new HashMap<>(); @@ -1071,6 +1136,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor Handle handle, String dataSource, Interval interval, + String usedSegmentVersion, List requests ) throws IOException { @@ -1090,14 +1156,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final Map requestToResult = new HashMap<>(); for (SegmentCreateRequest request : requests) { - CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( + CheckExistingSegmentIdResult result = findExistingPendingSegment( handle.createQuery(sql) .bind("dataSource", dataSource) .bind("sequence_name", request.getSequenceName()) .bind("sequence_prev_id", request.getPreviousSegmentId()), interval, request.getSequenceName(), - request.getPreviousSegmentId() + request.getPreviousSegmentId(), + usedSegmentVersion ); requestToResult.put(request, result); } @@ -1105,50 +1172,43 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor return requestToResult; } - private CheckExistingSegmentIdResult checkAndGetExistingSegmentId( + private CheckExistingSegmentIdResult findExistingPendingSegment( final Query> query, final Interval interval, final String sequenceName, final @Nullable String previousSegmentId, - final Pair... 
queryVars + final @Nullable String usedSegmentVersion ) throws IOException { - Query> boundQuery = query; - for (Pair var : queryVars) { - boundQuery = boundQuery.bind(var.lhs, var.rhs); - } - final List existingBytes = boundQuery.map(ByteArrayMapper.FIRST).list(); - - if (existingBytes.isEmpty()) { + final List records = query.map(ByteArrayMapper.FIRST).list(); + if (records.isEmpty()) { return new CheckExistingSegmentIdResult(false, null); - } else { - final SegmentIdWithShardSpec existingIdentifier = jsonMapper.readValue( - Iterables.getOnlyElement(existingBytes), - SegmentIdWithShardSpec.class - ); + } - if (existingIdentifier.getInterval().isEqual(interval)) { - log.info( - "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - existingIdentifier, - sequenceName, - previousSegmentId - ); + for (byte[] record : records) { + final SegmentIdWithShardSpec pendingSegment + = jsonMapper.readValue(record, SegmentIdWithShardSpec.class); - return new CheckExistingSegmentIdResult(true, existingIdentifier); - } else { - log.warn( - "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " - + "does not match requested interval[%s]", - existingIdentifier, - sequenceName, - previousSegmentId, - interval - ); - - return new CheckExistingSegmentIdResult(true, null); + // Consider only pending segments matching the expected version + if (usedSegmentVersion == null || pendingSegment.getVersion().equals(usedSegmentVersion)) { + if (pendingSegment.getInterval().isEqual(interval)) { + log.info( + "Found existing pending segment[%s] for sequence[%s], previous segment[%s], version[%s] in DB", + pendingSegment, sequenceName, previousSegmentId, usedSegmentVersion + ); + return new CheckExistingSegmentIdResult(true, pendingSegment); + } else { + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s], previous segment[%s] in DB" + + " as it does not match requested interval[%s], version[%s].", + pendingSegment, 
sequenceName, previousSegmentId, interval, usedSegmentVersion + ); + return new CheckExistingSegmentIdResult(true, null); + } } } + + return new CheckExistingSegmentIdResult(false, null); } private static class CheckExistingSegmentIdResult @@ -1164,6 +1224,52 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } } + private static class UniqueAllocateRequest + { + private final Interval interval; + private final String previousSegmentId; + private final String sequenceName; + private final boolean skipSegmentLineageCheck; + + private final int hashCode; + + public UniqueAllocateRequest( + Interval interval, + SegmentCreateRequest request, + boolean skipSegmentLineageCheck + ) + { + this.interval = interval; + this.sequenceName = request.getSequenceName(); + this.previousSegmentId = request.getPreviousSegmentId(); + this.skipSegmentLineageCheck = skipSegmentLineageCheck; + + this.hashCode = Objects.hash(interval, sequenceName, previousSegmentId, skipSegmentLineageCheck); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UniqueAllocateRequest that = (UniqueAllocateRequest) o; + return skipSegmentLineageCheck == that.skipSegmentLineageCheck + && Objects.equals(interval, that.interval) + && Objects.equals(sequenceName, that.sequenceName) + && Objects.equals(previousSegmentId, that.previousSegmentId); + } + + @Override + public int hashCode() + { + return hashCode; + } + } + private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( Set appendSegments, Map appendSegmentToReplaceLock, @@ -1264,7 +1370,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .bind("sequence_prev_id", request.getPreviousSegmentId()) .bind( "sequence_name_prev_id_sha1", - getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck) + getSequenceNameAndPrevIdSha(request, segmentId, 
skipSegmentLineageCheck) ) .bind("payload", jsonMapper.writeValueAsBytes(segmentId)); } @@ -1480,6 +1586,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor String dataSource, Interval interval, boolean skipSegmentLineageCheck, + List> existingChunks, List requests ) throws IOException { @@ -1487,22 +1594,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor return Collections.emptyMap(); } - // Get the time chunk and associated data segments for the given interval, if any - final List> existingChunks = - getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)) - .lookup(interval); - - if (existingChunks.size() > 1) { - // Not possible to expand more than one chunk with a single segment. - log.warn( - "Cannot allocate new segments for dataSource[%s], interval[%s]: already have [%,d] chunks.", - dataSource, - interval, - existingChunks.size() - ); - return Collections.emptyMap(); - } - // Shard spec of any of the requests (as they are all compatible) can be used to // identify existing shard specs that share partition space with the requested ones. 
final PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec(); @@ -1542,15 +1633,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet()); final Map createdSegments = new HashMap<>(); - final Map sequenceHashToSegment = new HashMap<>(); + final Map uniqueRequestToSegment = new HashMap<>(); for (SegmentCreateRequest request : requests) { // Check if the required segment has already been created in this batch - final String sequenceHash = getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck); + final UniqueAllocateRequest uniqueRequest = + new UniqueAllocateRequest(interval, request, skipSegmentLineageCheck); final SegmentIdWithShardSpec createdSegment; - if (sequenceHashToSegment.containsKey(sequenceHash)) { - createdSegment = sequenceHashToSegment.get(sequenceHash); + if (uniqueRequestToSegment.containsKey(uniqueRequest)) { + createdSegment = uniqueRequestToSegment.get(uniqueRequest); } else { createdSegment = createNewSegment( request, @@ -1564,8 +1656,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor // Add to pendingSegments to consider for partitionId if (createdSegment != null) { pendingSegments.add(createdSegment); - sequenceHashToSegment.put(sequenceHash, createdSegment); - log.info("Created new segment [%s]", createdSegment); + uniqueRequestToSegment.put(uniqueRequest, createdSegment); + log.info("Created new segment[%s]", createdSegment); } } @@ -1574,7 +1666,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } } - log.info("Created [%d] new segments for [%d] allocate requests.", sequenceHashToSegment.size(), requests.size()); + log.info("Created [%d] new segments for [%d] allocate requests.", uniqueRequestToSegment.size(), requests.size()); return createdSegments; } @@ -1694,140 +1786,122 @@ public class 
IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final String dataSource, final Interval interval, final PartialShardSpec partialShardSpec, - final String existingVersion + final String existingVersion, + final List> existingChunks ) throws IOException { - // Get the time chunk and associated data segments for the given interval, if any - final List> existingChunks = getTimelineForIntervalsWithHandle( - handle, - dataSource, - ImmutableList.of(interval) - ).lookup(interval); - - if (existingChunks.size() > 1) { - // Not possible to expand more than one chunk with a single segment. - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s]: already have [%,d] chunks.", - dataSource, - interval, - existingChunks.size() - ); - return null; + // max partitionId of published data segments which share the same partition space. + SegmentIdWithShardSpec committedMaxId = null; + @Nullable + final String versionOfExistingChunk; + if (existingChunks.isEmpty()) { + versionOfExistingChunk = null; } else { - // max partitionId of published data segments which share the same partition space. - SegmentIdWithShardSpec committedMaxId = null; + TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + versionOfExistingChunk = existingHolder.getVersion(); - @Nullable - final String versionOfExistingChunk; - if (existingChunks.isEmpty()) { - versionOfExistingChunk = null; - } else { - TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); - versionOfExistingChunk = existingHolder.getVersion(); - - // Don't use the stream API for performance. - for (DataSegment segment : FluentIterable - .from(existingHolder.getObject()) - .transform(PartitionChunk::getObject) - // Here we check only the segments of the shardSpec which shares the same partition space with the given - // partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others. - // See PartitionIds. 
- .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { - if (committedMaxId == null - || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { - committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment); - } + // Don't use the stream API for performance. + for (DataSegment segment : FluentIterable + .from(existingHolder.getObject()) + .transform(PartitionChunk::getObject) + // Here we check only the segments of the shardSpec which shares the same partition space with the given + // partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others. + // See PartitionIds. + .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { + if (committedMaxId == null + || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { + committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment); } } + } - // Fetch the pending segments for this interval to determine max partitionId - // across all shard specs (published + pending). - // A pending segment having a higher partitionId must also be considered - // to avoid clashes when inserting the pending segment created here. - final Set pendings = new HashSet<>( - getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet() + // Fetch the pending segments for this interval to determine max partitionId + // across all shard specs (published + pending). + // A pending segment having a higher partitionId must also be considered + // to avoid clashes when inserting the pending segment created here. + final Set pendings = new HashSet<>( + getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet() + ); + if (committedMaxId != null) { + pendings.add(committedMaxId); + } + + // If there is an existing chunk, find the max id with the same version as the existing chunk. 
+ // There may still be a pending segment with a higher version (but no corresponding used segments) + // which may generate a clash with an existing segment once the new id is generated + final SegmentIdWithShardSpec overallMaxId; + overallMaxId = pendings.stream() + .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)) + .filter(id -> versionOfExistingChunk == null + || id.getVersion().equals(versionOfExistingChunk)) + .max(Comparator.comparing(SegmentIdWithShardSpec::getVersion) + .thenComparing(id -> id.getShardSpec().getPartitionNum())) + .orElse(null); + + + // Determine the version of the new segment + final String newSegmentVersion; + if (versionOfExistingChunk != null) { + newSegmentVersion = versionOfExistingChunk; + } else if (overallMaxId != null) { + newSegmentVersion = overallMaxId.getVersion(); + } else { + // this is the first segment for this interval + newSegmentVersion = null; + } + + if (overallMaxId == null) { + // When appending segments, null overallMaxId means that we are allocating the very initial + // segment for this time chunk. + // This code is executed when the Overlord coordinates segment allocation, which is either you append segments + // or you use segment lock. Since the core partitions set is not determined for appended segments, we set + // it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the + // OvershadowableManager handles the atomic segment update. + final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() + ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + : PartitionIds.ROOT_GEN_START_PARTITION_ID; + String version = newSegmentVersion == null ? 
existingVersion : newSegmentVersion; + return new SegmentIdWithShardSpec( + dataSource, + interval, + version, + partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); - if (committedMaxId != null) { - pendings.add(committedMaxId); - } + } else if (!overallMaxId.getInterval().equals(interval)) { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", + dataSource, + interval, + existingVersion, + overallMaxId + ); + return null; + } else if (committedMaxId != null + && committedMaxId.getShardSpec().getNumCorePartitions() + == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) { + log.warn( + "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", + committedMaxId, + committedMaxId.getShardSpec() + ); + return null; + } else { + // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. + // When the core partitions have been dropped, using pending segments may lead to an incorrect state + // where the chunk is believed to have core partitions and queries results are incorrect. - // If there is an existing chunk, find the max id with the same version as the existing chunk. 
- // There may still be a pending segment with a higher version (but no corresponding used segments) - // which may generate a clash with an existing segment once the new id is generated - final SegmentIdWithShardSpec overallMaxId; - overallMaxId = pendings.stream() - .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)) - .filter(id -> versionOfExistingChunk == null - || id.getVersion().equals(versionOfExistingChunk)) - .max(Comparator.comparing(SegmentIdWithShardSpec::getVersion) - .thenComparing(id -> id.getShardSpec().getPartitionNum())) - .orElse(null); - - - // Determine the version of the new segment - final String newSegmentVersion; - if (versionOfExistingChunk != null) { - newSegmentVersion = versionOfExistingChunk; - } else if (overallMaxId != null) { - newSegmentVersion = overallMaxId.getVersion(); - } else { - // this is the first segment for this interval - newSegmentVersion = null; - } - - if (overallMaxId == null) { - // When appending segments, null overallMaxId means that we are allocating the very initial - // segment for this time chunk. - // This code is executed when the Overlord coordinates segment allocation, which is either you append segments - // or you use segment lock. Since the core partitions set is not determined for appended segments, we set - // it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the - // OvershadowableManager handles the atomic segment update. - final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() - ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID - : PartitionIds.ROOT_GEN_START_PARTITION_ID; - String version = newSegmentVersion == null ? 
existingVersion : newSegmentVersion; - return new SegmentIdWithShardSpec( - dataSource, - interval, - version, - partialShardSpec.complete(jsonMapper, newPartitionId, 0) - ); - } else if (!overallMaxId.getInterval().equals(interval)) { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", - dataSource, - interval, - existingVersion, - overallMaxId - ); - return null; - } else if (committedMaxId != null - && committedMaxId.getShardSpec().getNumCorePartitions() - == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) { - log.warn( - "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", - committedMaxId, - committedMaxId.getShardSpec() - ); - return null; - } else { - // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. - // When the core partitions have been dropped, using pending segments may lead to an incorrect state - // where the chunk is believed to have core partitions and queries results are incorrect. - - return new SegmentIdWithShardSpec( - dataSource, - interval, - Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), - partialShardSpec.complete( - jsonMapper, - overallMaxId.getShardSpec().getPartitionNum() + 1, - committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions() - ) - ); - } + return new SegmentIdWithShardSpec( + dataSource, + interval, + Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), + partialShardSpec.complete( + jsonMapper, + overallMaxId.getShardSpec().getPartitionNum() + 1, + committedMaxId == null ? 
0 : committedMaxId.getShardSpec().getNumCorePartitions() + ) + ); } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 9e977dec3e8..4ee72e74f92 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -2078,7 +2078,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest * - verify that the id for segment5 is correct * - Later, after the above was dropped, another segment on same interval was created by the stream but this * time there was an integrity violation in the pending segments table because the - * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)} * method returned a segment id that already existed in the pending segments table */ @Test @@ -2178,7 +2177,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString()); // Since all core partitions have been dropped Assert.assertEquals(0, identifier4.getShardSpec().getNumCorePartitions()); - } /**