Set `numCorePartitions` to 0 for tombstones (#15379)

* Set numCorePartitions to 0 in the TombstoneShardSpec.

* fix up test

* Add tombstone core partition tests

* review comment

* Need to register the test shard type to make jackson happy
This commit is contained in:
Abhishek Radhakrishnan 2023-11-20 09:42:51 -08:00 committed by GitHub
parent ba1b6fa5a9
commit 470c8ed7b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 129 additions and 7 deletions

View File

@ -29,7 +29,9 @@ import java.util.Map;
import java.util.Objects;
/**
* A shard spec to represent tombstones. Its partition number is always zero and contains 1 core partitions.
* A shard spec to represent tombstones. Its partition number is always zero and contains zero core partitions as it
* contains no data. This allows other shard types appending to an existing {@link TombstoneShardSpec} to exist independently
* in the timeline even if the {@link TombstoneShardSpec} is dropped.
*/
public class TombstoneShardSpec implements ShardSpec
{
@ -69,7 +71,7 @@ public class TombstoneShardSpec implements ShardSpec
@JsonProperty("partitions")
public int getNumCorePartitions()
{
return 1;
return 0;
}
@Override
@ -88,8 +90,8 @@ public class TombstoneShardSpec implements ShardSpec
public String toString()
{
return "TombstoneShardSpec{" +
"partitionNum=" + 0 +
", partitions=" + 1 +
"partitionNum=" + getPartitionNum() +
", partitions=" + getNumCorePartitions() +
'}';
}

View File

@ -60,7 +60,7 @@ public class TombstoneShardSpecTest
@Test
public void getNumCorePartitions()
{
assertEquals(1, tombstoneShardSpec.getNumCorePartitions());
assertEquals(0, tombstoneShardSpec.getNumCorePartitions());
}
@Test

View File

@ -19,6 +19,7 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -42,6 +43,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@ -55,6 +57,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -1965,7 +1968,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
* - Later, after the above was dropped, another segment on same interval was created by the stream but this
* time there was an integrity violation in the pending segments table because the
* {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
* method returned an segment id that already existed in the pending segments table
* method returned a segment id that already existed in the pending segments table
*/
@Test
public void testAllocatePendingSegmentAfterDroppingExistingSegment()
@ -2907,6 +2910,110 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(3, resultForEternity.size());
}
/**
 * Verifies that a data segment appended to a {@link TombstoneShardSpec} tombstone is still returned by a
 * timeline lookup after the tombstone itself has been marked unused.
 */
@Test
public void testTimelineVisibilityWith0CorePartitionTombstone() throws IOException
{
  final Interval year2020 = Intervals.of("2020/2021");

  // Step 1: commit a tombstone covering the interval.
  final DataSegment tombstone = createSegment(year2020, "version", new TombstoneShardSpec());
  final Set<DataSegment> tombstoneSet = new HashSet<>(Collections.singleton(tombstone));
  final Set<DataSegment> committedTombstones = coordinator.commitSegments(tombstoneSet);
  Assert.assertTrue(committedTombstones.containsAll(tombstoneSet));

  // Step 2: allocate a pending segment that appends to the tombstone's interval and version.
  final SegmentIdWithShardSpec appendedId = coordinator.allocatePendingSegment(
      DS.WIKI,
      "seq",
      tombstone.getVersion(),
      year2020,
      NumberedPartialShardSpec.instance(),
      "version",
      false
  );
  Assert.assertEquals(
      "wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1",
      appendedId.toString()
  );
  // The allocated shard spec is expected to report the same core partition count as the tombstone: zero.
  Assert.assertEquals(0, appendedId.getShardSpec().getNumCorePartitions());

  // Step 3: commit a data segment using the allocated shard spec.
  final DataSegment appendedSegment = createSegment(year2020, "version", appendedId.getShardSpec());
  final Set<DataSegment> appendedSet = new HashSet<>(Collections.singleton(appendedSegment));
  final Set<DataSegment> committedAppended = coordinator.commitSegments(appendedSet);
  Assert.assertTrue(committedAppended.containsAll(appendedSet));

  // Step 4: drop the tombstone by marking it unused.
  markAllSegmentsUnused(tombstoneSet);

  final Collection<DataSegment> visibleSegments = coordinator.retrieveAllUsedSegments(
      DS.WIKI,
      Segments.ONLY_VISIBLE
  );

  // Because the tombstone contains 0 core partitions, the appended data segment
  // must still show up in the timeline on its own.
  final SegmentTimeline timeline = SegmentTimeline.forSegments(visibleSegments);
  Assert.assertEquals(1, timeline.lookup(year2020).size());
  Assert.assertEquals(
      appendedSegment,
      timeline.lookup(year2020).get(0).getObject().getChunk(1).getObject()
  );
}
/**
 * Verifies the behavior of an "old generation" tombstone that reports 1 core partition: once such a tombstone
 * is marked unused, a data segment appended to it is no longer returned by a timeline lookup.
 */
@Test
public void testTimelineWith1CorePartitionTombstone() throws IOException
{
  // The old generation tombstone spec is a test-only subtype, so register it with the mapper first.
  mapper.registerSubtypes(TombstoneShardSpecWith1CorePartition.class);

  final Interval year2020 = Intervals.of("2020/2021");

  // Step 1: commit an old generation tombstone (1 core partition) covering the interval.
  final DataSegment legacyTombstone = createSegment(
      year2020,
      "version",
      new TombstoneShardSpecWith1CorePartition()
  );
  final Set<DataSegment> tombstoneSet = new HashSet<>(Collections.singleton(legacyTombstone));
  final Set<DataSegment> committedTombstones = coordinator.commitSegments(tombstoneSet);
  Assert.assertTrue(committedTombstones.containsAll(tombstoneSet));

  // Step 2: allocate a pending segment that appends to the tombstone's interval and version.
  final SegmentIdWithShardSpec appendedId = coordinator.allocatePendingSegment(
      DS.WIKI,
      "seq",
      legacyTombstone.getVersion(),
      year2020,
      NumberedPartialShardSpec.instance(),
      "version",
      false
  );
  Assert.assertEquals(
      "wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1",
      appendedId.toString()
  );
  // The allocated shard spec is expected to report the tombstone's core partition count: one.
  Assert.assertEquals(1, appendedId.getShardSpec().getNumCorePartitions());

  // Step 3: commit a data segment using the allocated shard spec.
  final DataSegment appendedSegment = createSegment(year2020, "version", appendedId.getShardSpec());
  final Set<DataSegment> appendedSet = new HashSet<>(Collections.singleton(appendedSegment));
  final Set<DataSegment> committedAppended = coordinator.commitSegments(appendedSet);
  Assert.assertTrue(committedAppended.containsAll(appendedSet));

  // Step 4: drop the tombstone by marking it unused.
  markAllSegmentsUnused(tombstoneSet);

  final Collection<DataSegment> visibleSegments = coordinator.retrieveAllUsedSegments(
      DS.WIKI,
      Segments.ONLY_VISIBLE
  );

  // Because the old generation tombstone contains 1 core partition, the appended
  // data segment is not visible in the timeline once the tombstone is gone.
  final SegmentTimeline timeline = SegmentTimeline.forSegments(visibleSegments);
  Assert.assertEquals(0, timeline.lookup(year2020).size());
}
private static class DS
{
static final String WIKI = "wiki";
@ -2936,7 +3043,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
final Set<DataSegment> segmentsSet = new HashSet<>(segments);
final Set<DataSegment> committedSegments = coordinator.commitSegments(segmentsSet);
Assert.assertTrue(committedSegments.containsAll(new HashSet<>(segments)));
Assert.assertTrue(committedSegments.containsAll(segmentsSet));
return segments;
}
@ -2961,4 +3068,17 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
);
}
/**
 * Test-only shard spec that stands in for an "old generation" tombstone. It differs from
 * {@link TombstoneShardSpec} only in that it reports 1 core partition.
 */
private static class TombstoneShardSpecWith1CorePartition extends TombstoneShardSpec
{
  /** Core partition count reported by old generation tombstones. */
  private static final int OLD_GENERATION_NUM_CORE_PARTITIONS = 1;

  /**
   * @return the old generation core partition count, always 1
   */
  @Override
  @JsonProperty("partitions")
  public int getNumCorePartitions()
  {
    return OLD_GENERATION_NUM_CORE_PARTITIONS;
  }
}
}