diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
index 62dea7b0fa9..a4b15458997 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
@@ -29,7 +29,9 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- * A shard spec to represent tombstones. Its partition number is always zero and contains 1 core partitions.
+ * A shard spec to represent tombstones. Its partition number is always zero and it has zero core partitions since it
+ * contains no data. This allows segments of other shard types appended on top of an existing {@link TombstoneShardSpec}
+ * to remain visible in the timeline even if the {@link TombstoneShardSpec} is dropped.
  */
 public class TombstoneShardSpec implements ShardSpec
 {
@@ -69,7 +71,7 @@ public class TombstoneShardSpec implements ShardSpec
   @JsonProperty("partitions")
   public int getNumCorePartitions()
   {
-    return 1;
+    return 0;
   }
 
   @Override
@@ -88,8 +90,8 @@ public class TombstoneShardSpec implements ShardSpec
   public String toString()
   {
     return "TombstoneShardSpec{" +
-           "partitionNum=" + 0 +
-           ", partitions=" + 1 +
+           "partitionNum=" + getPartitionNum() +
+           ", partitions=" + getNumCorePartitions() +
            '}';
   }
 
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
index 2cdfcdf937a..0a08869dbfc 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
@@ -60,7 +60,7 @@ public class TombstoneShardSpecTest
   @Test
   public void getNumCorePartitions()
   {
-    assertEquals(1, tombstoneShardSpec.getNumCorePartitions());
+    assertEquals(0, tombstoneShardSpec.getNumCorePartitions());
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 888c1006717..5d76296d67b 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.metadata;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -42,6 +43,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -55,6 +57,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.apache.druid.timeline.partition.PartitionIds;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
+import org.apache.druid.timeline.partition.TombstoneShardSpec;
 import org.assertj.core.api.Assertions;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -1965,7 +1968,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
    * - Later, after the above was dropped, another segment on same interval was created by the stream but this
    *   time there was an integrity violation in the pending segments table because the
    *   {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
-   *   method returned an segment id that already existed in the pending segments table
+   *   method returned a segment id that already existed in the pending segments table
    */
   @Test
   public void testAllocatePendingSegmentAfterDroppingExistingSegment()
@@ -2907,6 +2910,110 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertEquals(3, resultForEternity.size());
   }
 
+  @Test
+  public void testTimelineVisibilityWith0CorePartitionTombstone() throws IOException
+  {
+    final Interval interval = Intervals.of("2020/2021");
+    // Create and commit a tombstone segment
+    final DataSegment tombstoneSegment = createSegment(
+        interval,
+        "version",
+        new TombstoneShardSpec()
+    );
+
+    final Set<DataSegment> tombstones = new HashSet<>(Collections.singleton(tombstoneSegment));
+    Assert.assertTrue(coordinator.commitSegments(tombstones).containsAll(tombstones));
+
+    // Allocate and commit a data segment by appending to the same interval
+    final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
+        DS.WIKI,
+        "seq",
+        tombstoneSegment.getVersion(),
+        interval,
+        NumberedPartialShardSpec.instance(),
+        "version",
+        false
+    );
+
+    Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
+    Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions());
+
+    final DataSegment dataSegment = createSegment(
+        interval,
+        "version",
+        identifier.getShardSpec()
+    );
+    final Set<DataSegment> dataSegments = new HashSet<>(Collections.singleton(dataSegment));
+    Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
+
+    // Mark the tombstone as unused
+    markAllSegmentsUnused(tombstones);
+
+    final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
+        DS.WIKI,
+        Segments.ONLY_VISIBLE
+    );
+
+    // The appended data segment will still be visible in the timeline since the
+    // tombstone contains 0 core partitions
+    SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(allUsedSegments);
+    Assert.assertEquals(1, segmentTimeline.lookup(interval).size());
+    Assert.assertEquals(dataSegment, segmentTimeline.lookup(interval).get(0).getObject().getChunk(1).getObject());
+  }
+
+  @Test
+  public void testTimelineWith1CorePartitionTombstone() throws IOException
+  {
+    // Register the old generation tombstone spec for this test.
+    mapper.registerSubtypes(TombstoneShardSpecWith1CorePartition.class);
+
+    final Interval interval = Intervals.of("2020/2021");
+    // Create and commit an old generation tombstone with 1 core partition
+    final DataSegment tombstoneSegment = createSegment(
+        interval,
+        "version",
+        new TombstoneShardSpecWith1CorePartition()
+    );
+
+    final Set<DataSegment> tombstones = new HashSet<>(Collections.singleton(tombstoneSegment));
+    Assert.assertTrue(coordinator.commitSegments(tombstones).containsAll(tombstones));
+
+    // Allocate and commit a data segment by appending to the same interval
+    final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
+        DS.WIKI,
+        "seq",
+        tombstoneSegment.getVersion(),
+        interval,
+        NumberedPartialShardSpec.instance(),
+        "version",
+        false
+    );
+
+    Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
+    Assert.assertEquals(1, identifier.getShardSpec().getNumCorePartitions());
+
+    final DataSegment dataSegment = createSegment(
+        interval,
+        "version",
+        identifier.getShardSpec()
+    );
+    final Set<DataSegment> dataSegments = new HashSet<>(Collections.singleton(dataSegment));
+    Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
+
+    // Mark the tombstone as unused
+    markAllSegmentsUnused(tombstones);
+
+    final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
+        DS.WIKI,
+        Segments.ONLY_VISIBLE
+    );
+
+    // The appended data segment will not be visible in the timeline since the old generation
+    // tombstone contains 1 core partition
+    SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(allUsedSegments);
+    Assert.assertEquals(0, segmentTimeline.lookup(interval).size());
+  }
+
   private static class DS
   {
     static final String WIKI = "wiki";
@@ -2936,7 +3043,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     }
     final Set<DataSegment> segmentsSet = new HashSet<>(segments);
    final Set<DataSegment> committedSegments = coordinator.commitSegments(segmentsSet);
-    Assert.assertTrue(committedSegments.containsAll(new HashSet<>(segments)));
+    Assert.assertTrue(committedSegments.containsAll(segmentsSet));
     return segments;
   }
 
@@ -2961,4 +3068,17 @@
         }
     );
   }
+
+  /**
+   * This test-only shard type is used to test the behavior of "old generation" tombstones with 1 core partition.
+   */
+  private static class TombstoneShardSpecWith1CorePartition extends TombstoneShardSpec
+  {
+    @Override
+    @JsonProperty("partitions")
+    public int getNumCorePartitions()
+    {
+      return 1;
+    }
+  }
 }
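A minimal sketch of the convention the new tests rely on (illustration only, not part of the patch above). It uses only shard-spec classes that appear in this diff, TombstoneShardSpec and NumberedShardSpec with its (partitionNum, corePartitions) constructor; the class name TombstoneCorePartitionSketch is made up for this illustration. With this change both the tombstone and any segment appended to its interval report zero core partitions, so the timeline no longer needs the tombstone's chunk to keep the interval visible.

import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;

public class TombstoneCorePartitionSketch
{
  public static void main(String[] args)
  {
    // With this patch, a tombstone reports zero core partitions...
    final ShardSpec tombstone = new TombstoneShardSpec();
    // ...and a segment appended to the same interval is allocated with zero core partitions as well,
    // as asserted in testTimelineVisibilityWith0CorePartitionTombstone above.
    final ShardSpec appended = new NumberedShardSpec(1, 0);

    // A version whose shard specs declare zero core partitions is complete without any particular
    // chunk being present, so dropping the tombstone no longer hides the appended segment.
    System.out.println("tombstone core partitions: " + tombstone.getNumCorePartitions()); // prints 0
    System.out.println("appended core partitions:  " + appended.getNumCorePartitions());  // prints 0
  }
}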