diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java
new file mode 100644
index 00000000000..4604f23a982
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.RangeSet;
+import org.apache.druid.data.input.InputRow;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * This is a special shardSpec which is used temporarily during batch ingestion. In Druid, there is a concept
+ * of a core partition set: a set of segments that atomically become queryable together on Brokers. The core
+ * partition set is represented as a range of partitionIds. For {@link NumberedShardSpec}, the core partition set
+ * is [0, {@link NumberedShardSpec#partitions}).
+ *
+ * The NumberedShardSpec is used for dynamic partitioning, which is based on the number of rows in each segment.
+ * In streaming ingestion, the core partition set size cannot be determined since it's impossible to know how many
+ * segments will be created per time chunk. However, in batch ingestion with time chunk locking, the core partition
+ * set is the set of segments created by an initial task or an overwriting task. Since the core partition set is
+ * determined when the task publishes segments at the end, the task postpones creating a proper NumberedShardSpec
+ * until then.
+ *
+ * This shardSpec is used for such use cases. A non-appending batch task can use this shardSpec until it finally
+ * publishes segments. When it publishes segments, it should convert the shardSpec of those segments to
+ * NumberedShardSpec. See {@code SegmentPublisherHelper#annotateShardSpec} for converting to NumberedShardSpec.
+ * Note that, when the segment lock is used, the Overlord coordinates the segment allocation and this class is
+ * never used. Instead, the task sends {@link PartialShardSpec} to the Overlord to allocate a new segment. The
+ * result segment could have either a {@link ShardSpec} (for root generation segments) or an
+ * {@link OverwriteShardSpec} (for non-root generation segments).
+ *
+ * This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
+ *
+ * Finally, this shardSpec has only a partitionId, the same as {@link LinearShardSpec}. The difference between
+ * them is that this shardSpec should never be published, and so is never used in other places such as the
+ * Broker timeline.
+ *
+ * @see NumberedShardSpec
+ */
+public class BuildingNumberedShardSpec implements ShardSpec
+{
+  public static final String TYPE = "building_numbered";
+
+  private final int partitionId;
+
+  @JsonCreator
+  public BuildingNumberedShardSpec(@JsonProperty("partitionId") int partitionId)
+  {
+    Preconditions.checkArgument(partitionId >= 0, "partitionId >= 0");
+    this.partitionId = partitionId;
+  }
+
+  public NumberedShardSpec toNumberedShardSpec(int numTotalPartitions)
+  {
+    return new NumberedShardSpec(partitionId, numTotalPartitions);
+  }
+
+  @Override
+  public <T> PartitionChunk<T> createChunk(T obj)
+  {
+    // This method can be called in AppenderatorImpl to create a sinkTimeline.
+    // The sinkTimeline doesn't seem to be used in batch ingestion, so set 'chunks' to 0 for now.
+    return new NumberedPartitionChunk<>(partitionId, 0, obj);
+  }
+
+  @JsonProperty("partitionId")
+  @Override
+  public int getPartitionNum()
+  {
+    return partitionId;
+  }
+
+  @Override
+  public ShardSpecLookup getLookup(List<ShardSpec> shardSpecs)
+  {
+    return NumberedShardSpec.createLookup(shardSpecs);
+  }
+
+  // The below methods are used on the query side, and so must not be called for this shardSpec.
+
+  @Override
+  public boolean isInChunk(long timestamp, InputRow inputRow)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @JsonIgnore
+  @Override
+  public List<String> getDomainDimensions()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isCompatible(Class<? extends ShardSpec> other)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BuildingNumberedShardSpec shardSpec = (BuildingNumberedShardSpec) o;
+    return partitionId == shardSpec.partitionId;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(partitionId);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "BuildingNumberedShardSpec{" +
+           "partitionId=" + partitionId +
+           '}';
+  }
+}
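Reviewer note: to make the lifecycle described in the Javadoc concrete, here is a minimal sketch (not part of this patch; the three-segment count and class name are invented for illustration) of how a non-appending batch task is expected to use this class — hand out partition IDs while indexing, then stamp on the core partition set size at publish time via `toNumberedShardSpec`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;

class BuildingShardSpecLifecycleExample
{
  public static void main(String[] args)
  {
    // While indexing, partition IDs are handed out incrementally; the total is unknown.
    List<BuildingNumberedShardSpec> building = new ArrayList<>();
    for (int partitionId = 0; partitionId < 3; partitionId++) {
      building.add(new BuildingNumberedShardSpec(partitionId));
    }

    // At publish time the task finally knows the core partition set size and converts each spec.
    List<NumberedShardSpec> published = building
        .stream()
        .map(spec -> spec.toNumberedShardSpec(building.size()))
        .collect(Collectors.toList());

    // Each published spec now carries both its partitionId and the total: (0, 3), (1, 3), (2, 3).
    published.forEach(spec -> System.out.println(spec.getPartitionNum() + " of " + spec.getPartitions()));
  }
}
```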
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java
index d5b36576a65..dbbb8f66e93 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.RangeSet;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.timeline.DataSegment;
 
 import java.util.Collections;
 import java.util.List;
@@ -31,10 +32,28 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- * ShardSpec for segments which overshadow others with their minorVersion.
+ * This shardSpec is used only for the segments created by overwriting tasks with segment lock enabled.
+ * When the segment lock is used, there is a concept of an atomic update group: a set of segments that atomically
+ * become queryable together on Brokers. It is a similar concept to the core partition set (explained in
+ * {@link NumberedShardSpec}), but differs in that there is only one core partition set per time chunk
+ * while there can be multiple atomic update groups in one time chunk.
+ *
+ * The atomic update group has a root partition range and a minor version to determine the visibility between
+ * atomic update groups; the group with the highest minor version in the same root partition range becomes queryable
+ * when the groups have the same major version ({@link DataSegment#getVersion()}).
+ *
+ * Note that this shardSpec is used only when you overwrite existing segments with segment lock enabled.
+ * If the task doesn't overwrite segments, it will use NumberedShardSpec instead even when segment lock is used.
+ * Similar to NumberedShardSpec, the size of the atomic update group is determined when the task publishes segments
+ * at the end of ingestion. As a result, {@link #atomicUpdateGroupSize} is set to
+ * {@link PartitionIds#UNKNOWN_ATOMIC_UPDATE_GROUP_SIZE} first, and updated when publishing segments
+ * in {@code SegmentPublisherHelper#annotateShardSpec}.
+ *
+ * @see AtomicUpdateGroup
  */
 public class NumberedOverwriteShardSpec implements OverwriteShardSpec
 {
+  public static final String TYPE = "numbered_overwrite";
   private final int partitionId;
 
   private final short startRootPartitionId;
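Reviewer note: as an illustration of the fields described in this Javadoc, the sketch below (illustrative only; it assumes the five-argument constructor shown in the new serde test accepts the UNKNOWN sentinel) builds an overwriting spec whose atomic update group size is unknown at allocation time and is filled in at publish time:

```java
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.OverwriteShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;

class AtomicUpdateGroupExample
{
  public static void main(String[] args)
  {
    // An overwriting segment that replaces root partitions [0, 10) with minor version 1.
    // The atomic update group size is not known yet when the segment is allocated.
    OverwriteShardSpec allocated = new NumberedOverwriteShardSpec(
        PartitionIds.NON_ROOT_GEN_START_PARTITION_ID,
        0,                                             // startRootPartitionId
        10,                                            // endRootPartitionId
        (short) 1,                                     // minorVersion
        PartitionIds.UNKNOWN_ATOMIC_UPDATE_GROUP_SIZE
    );

    // At publish time, all segments published together form one atomic update group,
    // so the real group size can finally be filled in (say, 3 segments).
    OverwriteShardSpec published = allocated.withAtomicUpdateGroupSize((short) 3);
    System.out.println(published.getAtomicUpdateGroupSize()); // 3
  }
}
```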
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java
index 0e258ee786b..7c7b9753aaa 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedPartialShardSpec.java
@@ -40,6 +40,13 @@ public class NumberedPartialShardSpec implements PartialShardSpec
   public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
   {
     if (specOfPreviousMaxPartitionId == null) {
+      // The shardSpec is created by the Overlord.
+      // - For streaming ingestion tasks, the core partition set size is always 0.
+      // - For batch tasks, this code is executed only with segment locking (forceTimeChunkLock = false).
+      //   In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
+      //   the same datasource. Since there is no restriction on segment allocation for those tasks, the
+      //   allocated IDs for each task can interleave. As a result, the core partition set cannot be
+      //   represented as a range. We always set the core partition set size to 0.
       return new NumberedShardSpec(0, 0);
     } else {
       final NumberedShardSpec prevSpec = (NumberedShardSpec) specOfPreviousMaxPartitionId;
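Reviewer note: a small sketch of the allocation decision this comment describes (hedged: `instance()` and the "previous max + 1" behavior in the else branch are how the existing implementation works, but are not shown in this hunk). The first allocation in a time chunk yields partition 0 with core partition set size 0; later allocations advance past the previous maximum partition ID:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;

class PartialShardSpecCompletionExample
{
  public static void main(String[] args)
  {
    final ObjectMapper mapper = new ObjectMapper();
    final NumberedPartialShardSpec partial = NumberedPartialShardSpec.instance();

    // First allocation in the time chunk: no previous spec, so (partitionId = 0, partitions = 0).
    ShardSpec first = partial.complete(mapper, null);

    // Later allocation: the partition ID advances past the previous max.
    ShardSpec next = partial.complete(mapper, new NumberedShardSpec(3, 0));

    System.out.println(first.getPartitionNum()); // 0
    System.out.println(next.getPartitionNum());  // 4 (one past the previous max, per the existing implementation)
  }
}
```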
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
index d6f98d71751..6f8898e298f 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java
@@ -67,6 +67,11 @@ public class NumberedShardSpec implements ShardSpec
 
   @Override
   public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    return createLookup(shardSpecs);
+  }
+
+  static ShardSpecLookup createLookup(List<ShardSpec> shardSpecs)
   {
     return (long timestamp, InputRow row) -> shardSpecs.get(0);
   }
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/PartialShardSpec.java
index 9cbee2c7571..6afaa939471 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/PartialShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/PartialShardSpec.java
@@ -28,8 +28,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import javax.annotation.Nullable;
 
 /**
- * Class to contain all information of a {@link ShardSpec} except for the partition ID.
- * This class is mainly used by the indexing tasks to allocate new segments using the Overlord.
+ * This interface is used in the segment allocation protocol when it is coordinated by the Overlord, i.e., when
+ * appending segments to an existing datasource (either streaming ingestion or batch append) or in any case when
+ * segment lock is used. The implementations of this interface contain all information of the corresponding
+ * {@link ShardSpec} except the partition ID.
+ * Ingestion tasks send all information required for allocating a new segment using this interface, and the
+ * Overlord determines the partition ID when creating the new segment.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes({
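Reviewer note: to visualize the protocol described in the new PartialShardSpec Javadoc, here is a hedged sketch of the Overlord-side step (`findSpecOfMaxPartitionId` and the surrounding plumbing are invented names; only the `complete` call mirrors this patch): the task ships a PartialShardSpec over HTTP, and the Overlord, which alone knows the occupied partition IDs, completes it into a full ShardSpec:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;

// A sketch of the Overlord-side step of the segment allocation protocol.
// "specOfPreviousMaxPartitionId" stands in for the metadata-store lookup the Overlord performs.
class OverlordAllocationSketch
{
  private final ObjectMapper objectMapper = new ObjectMapper();

  ShardSpec allocate(PartialShardSpec partialShardSpec, @Nullable ShardSpec specOfPreviousMaxPartitionId)
  {
    // The task sent partialShardSpec with everything except the partition ID; only the Overlord
    // knows which partition IDs are already taken in the target time chunk, so only it can
    // complete the spec into a publishable ShardSpec.
    return partialShardSpec.complete(objectMapper, specOfPreviousMaxPartitionId);
  }
}
```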
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
index 43aaf701db3..06ff8dd6720 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.timeline.partition;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.collect.RangeSet;
@@ -38,36 +39,63 @@ import java.util.Map;
   @JsonSubTypes.Type(name = "linear", value = LinearShardSpec.class),
   @JsonSubTypes.Type(name = "numbered", value = NumberedShardSpec.class),
   @JsonSubTypes.Type(name = "hashed", value = HashBasedNumberedShardSpec.class),
-  @JsonSubTypes.Type(name = "numbered_overwrite", value = NumberedOverwriteShardSpec.class)
+  @JsonSubTypes.Type(name = NumberedOverwriteShardSpec.TYPE, value = NumberedOverwriteShardSpec.class),
+  @JsonSubTypes.Type(name = BuildingNumberedShardSpec.TYPE, value = BuildingNumberedShardSpec.class)
 })
 public interface ShardSpec
 {
+  @JsonIgnore
   <T> PartitionChunk<T> createChunk(T obj);
 
+  @JsonIgnore
   boolean isInChunk(long timestamp, InputRow inputRow);
 
+  /**
+   * Returns the partition ID of this segment.
+   */
   int getPartitionNum();
 
+  /**
+   * Returns the start root partition ID of the atomic update group to which this segment belongs.
+   *
+   * @see AtomicUpdateGroup
+   */
   default int getStartRootPartitionId()
   {
     return getPartitionNum();
   }
 
+  /**
+   * Returns the end root partition ID of the atomic update group to which this segment belongs.
+   *
+   * @see AtomicUpdateGroup
+   */
   default int getEndRootPartitionId()
   {
     return getPartitionNum() + 1;
   }
 
+  /**
+   * Returns the minor version associated with the atomic update group to which this segment belongs.
+   *
+   * @see AtomicUpdateGroup
+   */
   default short getMinorVersion()
   {
     return 0;
   }
 
+  /**
+   * Returns the size of the atomic update group to which this segment belongs.
+   *
+   * @see AtomicUpdateGroup
+   */
   default short getAtomicUpdateGroupSize()
   {
     return 1;
   }
 
+  @JsonIgnore
   ShardSpecLookup getLookup(List<ShardSpec> shardSpecs);
 
   /**
@@ -75,16 +103,19 @@ public interface ShardSpec
    *
    * @return list of dimensions who has its possible range. Dimensions with unknown possible range are not listed
    */
+  @JsonIgnore
   List<String> getDomainDimensions();
 
   /**
    * if given domain ranges are not possible in this shard, return false; otherwise return true;
    * @return possibility of in domain
    */
+  @JsonIgnore
   boolean possibleInDomain(Map<String, RangeSet<String>> domain);
 
   /**
-   * Returns true if two segments of this and other shardSpecs can exist in the same timeChunk.
+   * Returns true if two segments of this and other shardSpecs can exist in the same time chunk.
    */
+  @JsonIgnore
   boolean isCompatible(Class<? extends ShardSpec> other);
 }
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpecLookup.java b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpecLookup.java
index 25c785b542e..610e92a0ea6 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpecLookup.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpecLookup.java
@@ -23,5 +23,9 @@ import org.apache.druid.data.input.InputRow;
 
 public interface ShardSpecLookup
 {
+  /**
+   * Returns a {@link ShardSpec} for the given timestamp and the inputRow.
+   * The timestamp must be bucketed using {@code GranularitySpec#getQueryGranularity}.
+   */
   ShardSpec getShardSpec(long timestamp, InputRow row);
 }
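Reviewer note: the new default methods mean that any first-generation (root) shardSpec reports a degenerate atomic update group consisting of itself. A minimal sketch exercising those defaults, assuming the two-argument NumberedShardSpec constructor used elsewhere in this patch:

```java
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;

class RootGenerationDefaultsExample
{
  public static void main(String[] args)
  {
    // A root-generation segment: partition 2 out of a core partition set of 5.
    ShardSpec spec = new NumberedShardSpec(2, 5);

    // The interface defaults describe a single-segment "group" covering only this partition.
    System.out.println(spec.getStartRootPartitionId());  // 2
    System.out.println(spec.getEndRootPartitionId());    // 3
    System.out.println(spec.getMinorVersion());          // 0
    System.out.println(spec.getAtomicUpdateGroupSize()); // 1
  }
}
```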
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
new file mode 100644
index 00000000000..21c5a03644e
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpecTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class BuildingNumberedShardSpecTest
+{
+  @Test
+  public void testToNumberedShardSpec()
+  {
+    Assert.assertEquals(new NumberedShardSpec(5, 10), new BuildingNumberedShardSpec(5).toNumberedShardSpec(10));
+  }
+
+  @Test
+  public void testCreateChunk()
+  {
+    Assert.assertEquals(
+        new NumberedPartitionChunk<>(5, 0, "test"),
+        new BuildingNumberedShardSpec(5).createChunk("test")
+    );
+  }
+
+  @Test
+  public void testShardSpecLookup()
+  {
+    final List<ShardSpec> shardSpecs = ImmutableList.of(
+        new BuildingNumberedShardSpec(1),
+        new BuildingNumberedShardSpec(2),
+        new BuildingNumberedShardSpec(3)
+    );
+    final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
+    // Timestamp doesn't matter. It always returns the first shardSpec.
+    final long currentTime = DateTimes.nowUtc().getMillis();
+    Assert.assertEquals(
+        shardSpecs.get(0),
+        lookup.getShardSpec(
+            currentTime,
+            new MapBasedInputRow(
+                currentTime,
+                ImmutableList.of("dim"), ImmutableMap.of("dim", "val", "time", currentTime)
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    mapper.registerSubtypes(new NamedType(BuildingNumberedShardSpec.class, BuildingNumberedShardSpec.TYPE));
+    final BuildingNumberedShardSpec original = new BuildingNumberedShardSpec(5);
+    final String json = mapper.writeValueAsString(original);
+    final BuildingNumberedShardSpec fromJson = (BuildingNumberedShardSpec) mapper.readValue(json, ShardSpec.class);
+    Assert.assertEquals(original, fromJson);
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(BuildingNumberedShardSpec.class).usingGetClass().verify();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java
new file mode 100644
index 00000000000..d605105c48d
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NumberedOverwriteShardSpecTest
+{
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(NumberedOverwriteShardSpec.class).usingGetClass().verify();
+  }
+
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    final ObjectMapper mapper = new ObjectMapper();
+    mapper.registerSubtypes(new NamedType(NumberedOverwriteShardSpec.class, NumberedOverwriteShardSpec.TYPE));
+    final NumberedOverwriteShardSpec original = new NumberedOverwriteShardSpec(
+        PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 2,
+        0,
+        10,
+        (short) 1,
+        (short) 3
+    );
+    final String json = mapper.writeValueAsString(original);
+    final NumberedOverwriteShardSpec fromJson = (NumberedOverwriteShardSpec) mapper.readValue(json, ShardSpec.class);
+    Assert.assertEquals(original, fromJson);
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/LocalSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/LocalSegmentAllocator.java
index ead0f635599..2cb4db551c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/LocalSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/LocalSegmentAllocator.java
@@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -77,7 +77,7 @@ class LocalSegmentAllocator implements SegmentAllocator
       dataSource,
       interval,
       version,
-      new NumberedShardSpec(partitionId, 0)
+      new BuildingNumberedShardSpec(partitionId)
     );
   };
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 2d2ec3a0f2b..cb1bc398651 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -78,6 +78,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlers;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 import org.apache.druid.utils.CollectionUtils;
@@ -984,7 +985,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
       dataSource,
       interval,
       version,
-      new NumberedShardSpec(partitionNum, 0)
+      new BuildingNumberedShardSpec(partitionNum)
     );
   }
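Reviewer note: the two allocator changes above are the heart of the fix. Previously `new NumberedShardSpec(partitionId, 0)` froze the core partition set size at 0 before it was actually known. A hedged sketch of the new shape of the allocation (`nextPartitionId` stands in for the per-interval counter the real allocators maintain):

```java
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.joda.time.Interval;

// Illustrative only; not the real LocalSegmentAllocator.
class LocalAllocationSketch
{
  SegmentIdWithShardSpec allocate(String dataSource, Interval interval, String version, int nextPartitionId)
  {
    // The building spec records only the partition ID; the core partition set size is
    // deferred until publish, when SegmentPublisherHelper rewrites it via toNumberedShardSpec().
    return new SegmentIdWithShardSpec(dataSource, interval, version, new BuildingNumberedShardSpec(nextPartitionId));
  }
}
```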
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 2a5b02e1db1..e08361450fb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -252,7 +252,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
           segments.get(i).getShardSpec()
         );
       } else {
-        Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
+        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
       }
     }
 
@@ -301,7 +301,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
           segments.get(i).getShardSpec()
         );
       } else {
-        Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
+        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
       }
     }
 
@@ -334,7 +334,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
           segments.get(i).getShardSpec()
         );
       } else {
-        Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
+        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
       }
     }
   }
@@ -414,7 +414,11 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     for (int i = 0; i < 6; i++) {
       Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 3 + i / 2, 3 + i / 2 + 1), segments.get(i).getInterval());
-      Assert.assertEquals(new NumberedShardSpec(i % 2, 0), segments.get(i).getShardSpec());
+      if (lockGranularity == LockGranularity.SEGMENT) {
+        Assert.assertEquals(new NumberedShardSpec(i % 2, 0), segments.get(i).getShardSpec());
+      } else {
+        Assert.assertEquals(new NumberedShardSpec(i % 2, 2), segments.get(i).getShardSpec());
+      }
     }
 
     Assert.assertTrue(compactionFuture.get().lhs.isSuccess());
@@ -434,7 +438,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
           segments.get(i).getShardSpec()
         );
       } else {
-        Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
+        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
       }
     }
   }
@@ -472,7 +476,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     Assert.assertEquals(1, segments.size());
     Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
-    Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(0).getShardSpec());
+    Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec());
     Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(0).getLastCompactionState());
 
     // hour segmentGranularity
@@ -490,7 +494,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     for (int i = 0; i < 3; i++) {
       Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
-      Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
+      Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
       Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
     }
   }
@@ -594,7 +598,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
           segments.get(i).getShardSpec()
         );
       } else {
-        Assert.assertEquals(new NumberedShardSpec(i % 2, 0), segments.get(i).getShardSpec());
+        Assert.assertEquals(new NumberedShardSpec(i % 2, 2), segments.get(i).getShardSpec());
       }
     }
 
@@ -667,7 +671,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
           segments.get(i).getShardSpec()
         );
       } else {
-        Assert.assertEquals(new NumberedShardSpec(i % 2, 0), segments.get(i).getShardSpec());
+        Assert.assertEquals(new NumberedShardSpec(i % 2, 2), segments.get(i).getShardSpec());
      }
     }
 
@@ -757,7 +761,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
           segments.get(i).getShardSpec()
         );
       } else {
-        Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
+        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
       }
     }
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 7b1baa9bc1f..e1e21a56719 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -43,6 +43,8 @@ import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
@@ -57,10 +59,13 @@ import java.io.IOException;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -155,6 +160,19 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
     final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true);
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
     Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+    assertShardSpec(interval, appendToExisting);
+  }
+
+  private void runOverwriteTask(
+      @Nullable Interval interval,
+      Granularity segmentGranularity,
+      LockGranularity actualLockGranularity
+  )
+  {
+    final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, false, true);
+    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
+    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+    assertShardSpecAfterOverwrite(interval, actualLockGranularity);
   }
 
   private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity)
@@ -173,7 +191,11 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
     );
 
     // Reingest the same data. Each segment should get replaced by a segment with a newer version.
-    runTestTask(inputInterval, secondSegmentGranularity);
+    runOverwriteTask(
+        inputInterval,
+        secondSegmentGranularity,
+        secondSegmentGranularity.equals(Granularities.DAY) ? lockGranularity : LockGranularity.TIME_CHUNK
+    );
 
     // Verify that the segment has been replaced.
     final Collection<DataSegment> newSegments =
@@ -184,6 +206,62 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
     Assert.assertEquals(new HashSet<>(newSegments), visibles);
   }
 
+  private void assertShardSpec(@Nullable Interval interval, boolean appendToExisting)
+  {
+    final Interval nonNullInterval = interval == null ? Intervals.ETERNITY : interval;
+    final Collection<DataSegment> segments =
+        getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", nonNullInterval, Segments.ONLY_VISIBLE);
+    if (!appendToExisting && lockGranularity != LockGranularity.SEGMENT) {
+      // Check the core partition set in the shardSpec
+      final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
+      segments.forEach(
+          segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
+      );
+      for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
+        for (DataSegment segment : segmentsPerInterval) {
+          Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
+          final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
+          Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getPartitions());
+        }
+      }
+    } else {
+      for (DataSegment segment : segments) {
+        Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
+        final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
+        Assert.assertEquals(0, shardSpec.getPartitions());
+      }
+    }
+  }
+
+  private void assertShardSpecAfterOverwrite(@Nullable Interval interval, LockGranularity actualLockGranularity)
+  {
+    final Interval nonNullInterval = interval == null ? Intervals.ETERNITY : interval;
+    final Collection<DataSegment> segments =
+        getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", nonNullInterval, Segments.ONLY_VISIBLE);
+    final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
+    segments.forEach(
+        segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
+    );
+    if (actualLockGranularity != LockGranularity.SEGMENT) {
+      // Check the core partition set in the shardSpec
+      for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
+        for (DataSegment segment : segmentsPerInterval) {
+          Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
+          final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
+          Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getPartitions());
+        }
+      }
+    } else {
+      for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
+        for (DataSegment segment : segmentsPerInterval) {
+          Assert.assertSame(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass());
+          final NumberedOverwriteShardSpec shardSpec = (NumberedOverwriteShardSpec) segment.getShardSpec();
+          Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getAtomicUpdateGroupSize());
+        }
+      }
+    }
+  }
+
   @Test
   public void testWithoutInterval()
   {
@@ -213,9 +291,12 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
   @Test
   public void testRunInSequential()
   {
-    final ParallelIndexSupervisorTask task = newTask(Intervals.of("2017-12/P1M"), false, false);
+    final Interval interval = Intervals.of("2017-12/P1M");
+    final boolean appendToExisting = false;
+    final ParallelIndexSupervisorTask task = newTask(interval, appendToExisting, false);
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
     Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+    assertShardSpec(interval, appendToExisting);
   }
 
   @Test
@@ -229,10 +310,12 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
   @Test
   public void testWith1MaxNumConcurrentSubTasks()
   {
+    final Interval interval = Intervals.of("2017-12/P1M");
+    final boolean appendToExisting = false;
     final ParallelIndexSupervisorTask task = newTask(
-        Intervals.of("2017-12/P1M"),
+        interval,
         Granularities.DAY,
-        false,
+        appendToExisting,
         true,
         new ParallelIndexTuningConfig(
             null,
@@ -266,6 +349,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
     Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
     Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
+    assertShardSpec(interval, appendToExisting);
   }
 
   @Test
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
new file mode 100644
index 00000000000..76093beb8c8
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
+import org.apache.druid.timeline.partition.OverwriteShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public final class SegmentPublisherHelper
+{
+  /**
+   * This method fills in missing information in the shard spec if necessary when publishing segments.
+   *
+   * - When time chunk lock is used, the non-appending task should set the proper core partition set size for
+   *   dynamically-partitioned segments. See {@link #annotateNumberedShardSpecFn}.
+   * - When segment lock is used, the overwriting task should set the proper size of the atomic update group.
+   *   See {@link #annotateAtomicUpdateGroupFn}.
+   */
+  static Set<DataSegment> annotateShardSpec(Set<DataSegment> segments)
+  {
+    final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
+    segments.forEach(
+        segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
+    );
+
+    for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
+      final Interval interval = entry.getKey();
+      final List<DataSegment> segmentsPerInterval = entry.getValue();
+      final ShardSpec firstShardSpec = segmentsPerInterval.get(0).getShardSpec();
+      final boolean anyMismatch = segmentsPerInterval.stream().anyMatch(
+          segment -> segment.getShardSpec().getClass() != firstShardSpec.getClass()
+      );
+      if (anyMismatch) {
+        throw new ISE(
+            "Mismatched shardSpecs in interval[%s] for segments[%s]",
+            interval,
+            segmentsPerInterval
+        );
+      }
+      final Function<DataSegment, DataSegment> annotateFn;
+      if (firstShardSpec instanceof OverwriteShardSpec) {
+        annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
+      } else if (firstShardSpec instanceof BuildingNumberedShardSpec) {
+        annotateFn = annotateNumberedShardSpecFn(segmentsPerInterval.size());
+      } else {
+        annotateFn = null;
+      }
+
+      if (annotateFn != null) {
+        intervalToSegments.put(interval, segmentsPerInterval.stream().map(annotateFn).collect(Collectors.toList()));
+      }
+    }
+
+    return intervalToSegments.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+  }
+
+  private static Function<DataSegment, DataSegment> annotateAtomicUpdateGroupFn(int atomicUpdateGroupSize)
+  {
+    // The segments which are published together constitute an atomicUpdateGroup.
+    return segment -> {
+      final OverwriteShardSpec shardSpec = (OverwriteShardSpec) segment.getShardSpec();
+      return segment.withShardSpec(shardSpec.withAtomicUpdateGroupSize((short) atomicUpdateGroupSize));
+    };
+  }
+
+  private static Function<DataSegment, DataSegment> annotateNumberedShardSpecFn(int corePartitionSetSize)
+  {
+    return segment -> {
+      final BuildingNumberedShardSpec shardSpec = (BuildingNumberedShardSpec) segment.getShardSpec();
+      return segment.withShardSpec(shardSpec.toNumberedShardSpec(corePartitionSetSize));
+    };
+  }
+
+  private SegmentPublisherHelper()
+  {
+  }
+}
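Reviewer note: a quick usage sketch of the helper (the call site and method name below are hypothetical; `annotateShardSpec` is package-private, so the sketch lives in the same package). Segments published together for one interval get their publish-time information stamped on here, and anything else passes through unchanged:

```java
package org.apache.druid.segment.realtime.appenderator;

import java.util.Set;
import org.apache.druid.timeline.DataSegment;

// Illustrative call site, e.g. inside a publisher implementation.
class AnnotateShardSpecSketch
{
  Set<DataSegment> publishReady(Set<DataSegment> segmentsWithBuildingSpecs)
  {
    // If the three segments of an interval arrive with BuildingNumberedShardSpec(0..2), they
    // leave as NumberedShardSpec(0, 3), NumberedShardSpec(1, 3), NumberedShardSpec(2, 3).
    // Overwriting segments instead get their atomic update group size filled in; segments
    // whose shardSpec is neither building nor overwriting are returned as-is.
    return SegmentPublisherHelper.annotateShardSpec(segmentsWithBuildingSpecs);
  }
}
```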
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index cb9b9ffb444..26d8fac9dff 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -20,21 +20,11 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.OverwriteShardSpec;
-import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public interface TransactionalSegmentPublisher
 {
@@ -63,7 +53,7 @@ public interface TransactionalSegmentPublisher
   {
     return publishAnnotatedSegments(
         segmentsToBeOverwritten,
-        annotateAtomicUpdateGroupSize(segmentsToPublish),
+        SegmentPublisherHelper.annotateShardSpec(segmentsToPublish),
         commitMetadata
     );
   }
@@ -76,52 +66,4 @@ public interface TransactionalSegmentPublisher
   {
     return false;
   }
-
-  static Set<DataSegment> annotateAtomicUpdateGroupSize(Set<DataSegment> segments)
-  {
-    final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
-    segments.forEach(
-        segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
-    );
-
-    for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
-      final Interval interval = entry.getKey();
-      final List<DataSegment> segmentsPerInterval = entry.getValue();
-      final boolean isNonFirstGeneration = segmentsPerInterval.get(0).getShardSpec() instanceof OverwriteShardSpec;
-
-      final boolean anyMismatch = segmentsPerInterval.stream().anyMatch(
-          segment -> (segment.getShardSpec() instanceof OverwriteShardSpec) != isNonFirstGeneration
-      );
-      if (anyMismatch) {
-        throw new ISE(
-            "WTH? some segments have empty overshadwedSegments but others are not? "
-            + "segments with non-overwritingShardSpec: [%s],"
-            + "segments with overwritingShardSpec: [%s]",
-            segmentsPerInterval.stream()
-                               .filter(segment -> !(segment.getShardSpec() instanceof OverwriteShardSpec))
-                               .collect(Collectors.toList()),
-            segmentsPerInterval.stream()
-                               .filter(segment -> segment.getShardSpec() instanceof OverwriteShardSpec)
-                               .collect(Collectors.toList())
-        );
-      }
-
-      if (isNonFirstGeneration) {
-        // The segments which are published together consist an atomicUpdateGroup.
-
-        intervalToSegments.put(
-            interval,
-            segmentsPerInterval
-                .stream()
-                .map(segment -> {
-                  final OverwriteShardSpec shardSpec = (OverwriteShardSpec) segment.getShardSpec();
-                  return segment.withShardSpec(shardSpec.withAtomicUpdateGroupSize((short) segmentsPerInterval.size()));
-                })
-                .collect(Collectors.toList())
-        );
-      }
-    }
-
-    return intervalToSegments.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
-  }
 }
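Reviewer note: a hedged sketch of an implementor's view after this refactoring (the class below is invented for illustration; only the interface methods come from the source). Callers keep invoking the default `publishSegments()`, which now annotates shardSpecs via `SegmentPublisherHelper` before delegating, so implementations only ever see fully-annotated specs:

```java
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.Set;

class LoggingPublisher implements TransactionalSegmentPublisher
{
  @Override
  public SegmentPublishResult publishAnnotatedSegments(
      @Nullable Set<DataSegment> segmentsToBeOverwritten,
      Set<DataSegment> segmentsToPublish,
      @Nullable Object commitMetadata
  )
  {
    // By this point every BuildingNumberedShardSpec has become a NumberedShardSpec with its real
    // core partition set size, and every OverwriteShardSpec carries its atomic update group size.
    segmentsToPublish.forEach(segment -> System.out.println("publishing " + segment.getId()));
    return SegmentPublishResult.ok(segmentsToPublish);
  }
}
```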