diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 4887c907274..c5081f9a44e 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -253,13 +253,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor return numSegmentsMarkedUnused; } - private List getPendingSegmentsForIntervalWithHandle( + private Set getPendingSegmentsForIntervalWithHandle( final Handle handle, final String dataSource, final Interval interval ) throws IOException { - final List identifiers = new ArrayList<>(); + final Set identifiers = new HashSet<>(); final ResultIterator dbSegments = handle.createQuery( @@ -843,15 +843,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor .execute(); } + /** + * This function creates a new segment for the given datasource/interval/etc. A critical + * aspect of the creation is to make sure that the new version & new partition number will make + * sense given the existing segments & pending segments also very important is to avoid + * clashes with existing pending & used/unused segments. + * @param handle Database handle + * @param dataSource datasource for the new segment + * @param interval interval for the new segment + * @param partialShardSpec Shard spec info minus segment id stuff + * @param existingVersion Version of segments in interval, used to compute the version of the very first segment in + * interval + * @return + * @throws IOException + */ @Nullable private SegmentIdWithShardSpec createNewSegment( final Handle handle, final String dataSource, final Interval interval, final PartialShardSpec partialShardSpec, - final String maxVersion + final String existingVersion ) throws IOException { + // Get the time chunk and associated data segments for the given interval, if any final List> existingChunks = getTimelineForIntervalsWithHandle( handle, dataSource, @@ -884,66 +899,94 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor // See PartitionIds. .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { // Don't use the stream API for performance. + // Note that this will compute the max id of existing, visible, data segments in the time chunk: if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { maxId = SegmentIdWithShardSpec.fromDataSegment(segment); } } } - final List pendings = getPendingSegmentsForIntervalWithHandle( + // Get the version of the existing chunk, we might need it in some of the cases below + // to compute the new identifier's version + @Nullable + final String versionOfExistingChunk; + if (!existingChunks.isEmpty()) { + // remember only one chunk possible for given interval so get the first & only one + versionOfExistingChunk = existingChunks.get(0).getVersion(); + } else { + versionOfExistingChunk = null; + } + + // next, we need to enrich the maxId computed before with the information of the pending segments + // it is possible that a pending segment has a higher id in which case we need that, it will work, + // and it will avoid clashes when inserting the new pending segment later in the caller of this method + final Set pendings = getPendingSegmentsForIntervalWithHandle( handle, dataSource, interval ); - + // Make sure we add the maxId we obtained from the segments table: if (maxId != null) { pendings.add(maxId); } - + // Now compute the maxId with all the information: pendings + segments: + // The versionOfExistingChunks filter is ensure that we pick the max id with the version of the existing chunk + // in the case that there may be a pending segment with a higher version but no corresponding used segments + // which may generate a clash with an existing segment once the new id is generated maxId = pendings.stream() .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)) + .filter(id -> versionOfExistingChunk == null ? true : id.getVersion().equals(versionOfExistingChunk)) .max((id1, id2) -> { final int versionCompare = id1.getVersion().compareTo(id2.getVersion()); if (versionCompare != 0) { return versionCompare; } else { - return Integer.compare(id1.getShardSpec().getPartitionNum(), id2.getShardSpec().getPartitionNum()); + return Integer.compare( + id1.getShardSpec().getPartitionNum(), + id2.getShardSpec().getPartitionNum() + ); } }) .orElse(null); - // Find the major version of existing segments - @Nullable final String versionOfExistingChunks; - if (!existingChunks.isEmpty()) { - versionOfExistingChunks = existingChunks.get(0).getVersion(); - } else if (!pendings.isEmpty()) { - versionOfExistingChunks = pendings.get(0).getVersion(); + // The following code attempts to compute the new version, if this + // new version is not null at the end of next block then it will be + // used as the new version in the case for initial or appended segment + final String newSegmentVersion; + if (versionOfExistingChunk != null) { + // segment version overrides, so pick that now that we know it exists + newSegmentVersion = versionOfExistingChunk; + } else if (!pendings.isEmpty() && maxId != null) { + // there is no visible segments in the time chunk, so pick the maxId of pendings, as computed above + newSegmentVersion = maxId.getVersion(); } else { - versionOfExistingChunks = null; + // no segments, no pendings, so this must be the very first segment created for this interval + newSegmentVersion = null; } if (maxId == null) { + // When appending segments, null maxId means that we are allocating the very initial + // segment for this time chunk. // This code is executed when the Overlord coordinates segment allocation, which is either you append segments - // or you use segment lock. When appending segments, null maxId means that we are allocating the very initial - // segment for this time chunk. Since the core partitions set is not determined for appended segments, we set + // or you use segment lock. Since the core partitions set is not determined for appended segments, we set // it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the // OvershadowableManager handles the atomic segment update. final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID : PartitionIds.ROOT_GEN_START_PARTITION_ID; - String version = versionOfExistingChunks == null ? maxVersion : versionOfExistingChunks; + String version = newSegmentVersion == null ? existingVersion : newSegmentVersion; return new SegmentIdWithShardSpec( dataSource, interval, version, partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); - } else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(maxVersion) > 0) { + } else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(existingVersion) > 0) { log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", + "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", dataSource, interval, - maxVersion, + existingVersion, maxId ); return null; @@ -958,7 +1001,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor return new SegmentIdWithShardSpec( dataSource, maxId.getInterval(), - Preconditions.checkNotNull(versionOfExistingChunks, "versionOfExistingChunks"), + Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), partialShardSpec.complete( jsonMapper, maxId.getShardSpec().getPartitionNum() + 1, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index bdd572cc1af..9deb6574e9b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -383,7 +383,7 @@ public abstract class BaseAppenderatorDriver implements Closeable * @param sequenceName sequenceName for this row's segment * @param committerSupplier supplier of a committer associated with all data that has been added, including this row * if {@param allowIncrementalPersists} is set to false then this will not be used - * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence. + * @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence. * Should be set to false if replica tasks would index events in same order * @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist period * threshold is hit diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java index 1bffd81a85c..644efa5a496 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentAllocator.java @@ -37,7 +37,7 @@ public interface SegmentAllocator * When skipSegmentLineageCheck is false, this can be null if it is the first call * for the same sequenceName. * When skipSegmentLineageCheck is true, this will be ignored. - * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence. + * @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence. * Should be set to false if replica tasks would index events in same order * * @return the pending segment identifier, or null if it was impossible to allocate a new segment diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 53212e29ac9..e4a84cb4123 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -167,8 +167,10 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver * @param sequenceName sequenceName for this row's segment * @param committerSupplier supplier of a committer associated with all data that has been added, including this row * if {@param allowIncrementalPersists} is set to false then this will not be used - * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence. - * Should be set to false if replica tasks would index events in same order + * @param skipSegmentLineageCheck Should be set {@code false} to perform lineage validation using previousSegmentId for this sequence. + * Note that for Kafka Streams we should disable this check and set this parameter to + * {@code true}. + * if {@code true}, skips, does not enforce, lineage validation. * @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist period * threshold is hit * diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 135c8936b6b..3f9431c289f 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -275,7 +275,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest private void markAllSegmentsUnused() { - for (final DataSegment segment : SEGMENTS) { + markAllSegmentsUnused(SEGMENTS); + } + + private void markAllSegmentsUnused(Set segments) + { + for (final DataSegment segment : segments) { Assert.assertEquals( 1, (int) derbyConnector.getDBI().withHandle( @@ -296,6 +301,45 @@ public class IndexerSQLMetadataStorageCoordinatorTest } } + private void markAllSegmentsUsed(Set segments) + { + for (final DataSegment segment : segments) { + Assert.assertEquals( + 1, + (int) derbyConnector.getDBI().withHandle( + new HandleCallback() + { + @Override + public Integer withHandle(Handle handle) + { + String request = StringUtils.format( + "UPDATE %s SET used = true WHERE id = :id", + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() + ); + return handle.createStatement(request).bind("id", segment.getId().toString()).execute(); + } + } + ) + ); + } + } + + private List retrievePendingSegmentIds() + { + final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable(); + return derbyConnector.retryWithHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) + { + return handle.createQuery("SELECT id FROM " + table + " ORDER BY id") + .map(StringMapper.FIRST) + .list(); + } + } + ); + } private List retrieveUsedSegmentIds() { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); @@ -313,6 +357,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest ); } + private List retrieveUnusedSegmentIds() + { + final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); + return derbyConnector.retryWithHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) + { + return handle.createQuery("SELECT id FROM " + table + " WHERE used = false ORDER BY id") + .map(StringMapper.FIRST) + .list(); + } + } + ); + } + + private Boolean insertUsedSegments(Set dataSegments) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); @@ -1203,6 +1265,349 @@ public class IndexerSQLMetadataStorageCoordinatorTest Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3", identifier4.toString()); } + /** + * This test simulates an issue detected on the field consisting of the following sequence of events: + * - A kafka stream segment was created on a given interval + * - Later, after the above was published, another segment on same interval was created by the stream + * - Later, after the above was published, another segment on same interval was created by the stream + * - Later a compaction was issued for the three segments above + * - Later, after the above was published, another segment on same interval was created by the stream + * - Later, the compacted segment got dropped due to a drop rule + * - Later, after the above was dropped, another segment on same interval was created by the stream but this + * time there was an integrity violation in the pending segments table because the + * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)} + * method returned an segment id that already existed in the pending segments table + */ + @Test + public void testAllocatePendingSegmentAfterDroppingExistingSegment() + { + String maxVersion = "version_newer_newer"; + + // simulate one load using kafka streaming + final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); + final String dataSource = "ds"; + final Interval interval = Intervals.of("2017-01-01/2017-02-01"); + final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + dataSource, + "seq", + null, + interval, + partialShardSpec, + "version", + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString()); + + // simulate one more load using kafka streaming (as if previous segment was published, note different sequence name) + final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment( + dataSource, + "seq2", + identifier.toString(), + interval, + partialShardSpec, + maxVersion, + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString()); + + // simulate one more load using kafka streaming (as if previous segment was published, note different sequence name) + final SegmentIdWithShardSpec identifier2 = coordinator.allocatePendingSegment( + dataSource, + "seq3", + identifier1.toString(), + interval, + partialShardSpec, + maxVersion, + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString()); + + // now simulate that one compaction was done (batch) ingestion for same interval (like reindex of the previous three): + DataSegment segment = new DataSegment( + "ds", + Intervals.of("2017-01-01T00Z/2017-02-01T00Z"), + "version_new", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment))); + List ids = retrieveUsedSegmentIds(); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new", ids.get(0)); + + // one more load on same interval: + final SegmentIdWithShardSpec identifier3 = coordinator.allocatePendingSegment( + dataSource, + "seq4", + identifier1.toString(), + interval, + partialShardSpec, + maxVersion, + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1", identifier3.toString()); + + // now drop the used segment previously loaded: + markAllSegmentsUnused(ImmutableSet.of(segment)); + + // and final load, this reproduces an issue that could happen with multiple streaming appends, + // followed by a reindex, followed by a drop, and more streaming data coming in for same interval + final SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment( + dataSource, + "seq5", + identifier1.toString(), + interval, + partialShardSpec, + maxVersion, + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString()); + + } + + /** + * Slightly different that the above test but that involves reverted compaction + 1) used segments of version = A, id = 0, 1, 2 + 2) overwrote segments of version = B, id = 0 <= compaction + 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing + 4) pending segment of version = B, id = 1 <= appending new data, aborted + 5) reverted compaction, mark segments used for version = A, id = 0, 1, 2, and mark compacted segments unused + 6) used segments of version = A, id = 0, 1, 2 + 7) pending segment of version = B, id = 1 + */ + @Test + public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() + { + String maxVersion = "Z"; + + // 1.0) simulate one append load + final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); + final String dataSource = "ds"; + final Interval interval = Intervals.of("2017-01-01/2017-02-01"); + final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + dataSource, + "seq", + null, + interval, + partialShardSpec, + "A", + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", identifier.toString()); + // Assume it publishes; create its corresponding segment + DataSegment segment = new DataSegment( + "ds", + Intervals.of("2017-01-01T00Z/2017-02-01T00Z"), + "A", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment))); + List ids = retrieveUsedSegmentIds(); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", ids.get(0)); + + + // 1.1) simulate one more append load (as if previous segment was published, note different sequence name) + final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment( + dataSource, + "seq2", + identifier.toString(), + interval, + partialShardSpec, + maxVersion, + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier1.toString()); + // Assume it publishes; create its corresponding segment + segment = new DataSegment( + "ds", + Intervals.of("2017-01-01T00Z/2017-02-01T00Z"), + "A", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(1), + 9, + 100 + ); + Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment))); + ids = retrieveUsedSegmentIds(); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", ids.get(1)); + + + // 1.2) simulate one more append load (as if previous segment was published, note different sequence name) + final SegmentIdWithShardSpec identifier2 = coordinator.allocatePendingSegment( + dataSource, + "seq3", + identifier1.toString(), + interval, + partialShardSpec, + maxVersion, + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", identifier2.toString()); + // Assume it publishes; create its corresponding segment + segment = new DataSegment( + "ds", + Intervals.of("2017-01-01T00Z/2017-02-01T00Z"), + "A", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(2), + 9, + 100 + ); + // state so far: + // pendings: A: 0,1,2 + // used segments A: 0,1,2 + // unused segments: + Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment))); + ids = retrieveUsedSegmentIds(); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", ids.get(2)); + + + // 2) + // now simulate that one compaction was done (batch) ingestion for same interval (like reindex of the previous three): + DataSegment compactedSegment = new DataSegment( + "ds", + Intervals.of("2017-01-01T00Z/2017-02-01T00Z"), + "B", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + Assert.assertTrue(insertUsedSegments(ImmutableSet.of(compactedSegment))); + ids = retrieveUsedSegmentIds(); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B", ids.get(3)); + // 3) When overshadowing, segments are still marked as "used" in the segments table + // state so far: + // pendings: A: 0,1,2 + // used segments: A: 0,1,2; B: 0 <- new compacted segment, overshadows previous version A + // unused segment: + + // 4) pending segment of version = B, id = 1 <= appending new data, aborted + final SegmentIdWithShardSpec identifier3 = coordinator.allocatePendingSegment( + dataSource, + "seq4", + identifier2.toString(), + interval, + partialShardSpec, + maxVersion, + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B_1", identifier3.toString()); + // no corresponding segment, pending aborted + // state so far: + // pendings: A: 0,1,2; B:1 (note that B_1 does not make it into segments since its task aborted) + // used segments: A: 0,1,2; B: 0 <- compacted segment, overshadows previous version A + // unused segment: + + // 5) reverted compaction (by marking B_0 as unused) + // Revert compaction a manual metadata update which is basically the following two steps: + markAllSegmentsUnused(ImmutableSet.of(compactedSegment)); // <- drop compacted segment + // pending: version = A, id = 0,1,2 + // version = B, id = 1 + // + // used segment: version = A, id = 0,1,2 + // unused segment: version = B, id = 0 + List pendings = retrievePendingSegmentIds(); + Assert.assertTrue(pendings.size() == 4); + + List used = retrieveUsedSegmentIds(); + Assert.assertTrue(used.size() == 3); + + List unused = retrieveUnusedSegmentIds(); + Assert.assertTrue(unused.size() == 1); + + // Simulate one more append load + final SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment( + dataSource, + "seq5", + identifier1.toString(), + interval, + partialShardSpec, + maxVersion, + true + ); + // maxid = B_1 -> new partno = 2 + // versionofexistingchunk=A + // ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2 + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3", identifier4.toString()); + // Assume it publishes; create its corresponding segment + segment = new DataSegment( + "ds", + Intervals.of("2017-01-01T00Z/2017-02-01T00Z"), + "A", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(3), + 9, + 100 + ); + // pending: version = A, id = 0,1,2,3 + // version = B, id = 1 + // + // used segment: version = A, id = 0,1,2,3 + // unused segment: version = B, id = 0 + Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment))); + ids = retrieveUsedSegmentIds(); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3", ids.get(3)); + + } + + @Test + public void testNoPendingSegmentsAndOneUsedSegment() + { + String maxVersion = "Z"; + + // create one used segment + DataSegment segment = new DataSegment( + "ds", + Intervals.of("2017-01-01T00Z/2017-02-01T00Z"), + "A", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment))); + List ids = retrieveUsedSegmentIds(); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", ids.get(0)); + + + // simulate one aborted append load + final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); + final String dataSource = "ds"; + final Interval interval = Intervals.of("2017-01-01/2017-02-01"); + final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + dataSource, + "seq", + null, + interval, + partialShardSpec, + maxVersion, + true + ); + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString()); + + } + + + @Test public void testDeletePendingSegment() throws InterruptedException {