Set the core partition set size properly for batch ingestion with dynamic partitioning (#10012)

* Fill in the core partition set size properly for batch ingestion with
dynamic partitioning

* incomplete javadoc

* Address comments

* fix tests

* fix json serde, add tests

* checkstyle
Jihoon Son, 2020-06-12 21:39:37 -07:00, committed by GitHub
parent 8a7e7e773a
commit 9a10f8352b
15 changed files with 588 additions and 81 deletions

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.InputRow;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* This is a special shardSpec which is used temporarily during batch ingestion. In Druid, there is a concept
* of a core partition set, which is a set of segments that atomically become queryable together in Brokers. The core
* partition set is represented as a range of partitionIds. For {@link NumberedShardSpec}, the core partition set
* is [0, {@link NumberedShardSpec#partitions}).
*
* The NumberedShardSpec is used for dynamic partitioning, which is based on the number of rows in each segment.
* In streaming ingestion, the core partition set size cannot be determined since it's impossible to know how many
* segments will be created per time chunk. However, in batch ingestion with time chunk locking, the core partition
* set is the set of segments created by an initial task or an overwriting task. Since the core partition set is
* determined only when the task publishes segments at the end, the task postpones creating a proper NumberedShardSpec
* until then.
*
* This shardSpec is used for that case. A non-appending batch task can use this shardSpec until it publishes
* segments at the end. When it publishes segments, it should convert the shardSpec of those segments to NumberedShardSpec.
* See {@code SegmentPublisherHelper#annotateShardSpec} for converting to NumberedShardSpec. Note that, when
* the segment lock is used, the Overlord coordinates the segment allocation and this class is never used. Instead,
* the task sends a {@link PartialShardSpec} to the Overlord to allocate a new segment. The resulting segment can have
* either a {@link ShardSpec} (for root generation segments) or an {@link OverwriteShardSpec} (for non-root
* generation segments).
*
* This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
*
* Finally, this shardSpec has only a partitionId, the same as {@link LinearShardSpec}. The difference between
* them is that this shardSpec should never be published and thus never used in other places such as the Broker timeline.
*
* @see NumberedShardSpec
*/
public class BuildingNumberedShardSpec implements ShardSpec
{
public static final String TYPE = "building_numbered";
private final int partitionId;
@JsonCreator
public BuildingNumberedShardSpec(@JsonProperty("partitionId") int partitionId)
{
Preconditions.checkArgument(partitionId >= 0, "partitionId >= 0");
this.partitionId = partitionId;
}
public NumberedShardSpec toNumberedShardSpec(int numTotalPartitions)
{
return new NumberedShardSpec(partitionId, numTotalPartitions);
}
@Override
public <T> PartitionChunk<T> createChunk(T obj)
{
// This method can be called in AppenderatorImpl to create a sinkTimeline.
// The sinkTimeline doesn't seem to be used in batch ingestion, so set 'chunks' to 0 for now.
return new NumberedPartitionChunk<>(partitionId, 0, obj);
}
@JsonProperty("partitionId")
@Override
public int getPartitionNum()
{
return partitionId;
}
@Override
public ShardSpecLookup getLookup(List<ShardSpec> shardSpecs)
{
return NumberedShardSpec.createLookup(shardSpecs);
}
// The below methods are used on the query side, and so must not be called for this shardSpec.
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
throw new UnsupportedOperationException();
}
@JsonIgnore
@Override
public List<String> getDomainDimensions()
{
throw new UnsupportedOperationException();
}
@Override
public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
{
throw new UnsupportedOperationException();
}
@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BuildingNumberedShardSpec shardSpec = (BuildingNumberedShardSpec) o;
return partitionId == shardSpec.partitionId;
}
@Override
public int hashCode()
{
return Objects.hash(partitionId);
}
@Override
public String toString()
{
return "BuildingNumberedShardSpec{" +
"partitionId=" + partitionId +
'}';
}
}
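
For illustration only (not part of this commit): a minimal sketch of how a non-appending batch task uses this shardSpec, holding BuildingNumberedShardSpec during ingestion and converting it to NumberedShardSpec once the core partition set size is known at publish time. The segment count of 3 is an arbitrary example value.

import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import java.util.ArrayList;
import java.util.List;
class BuildingNumberedShardSpecExample
{
  public static void main(String[] args)
  {
    // During ingestion only the partition ID is known, so the task keeps BuildingNumberedShardSpecs.
    final List<BuildingNumberedShardSpec> building = new ArrayList<>();
    for (int partitionId = 0; partitionId < 3; partitionId++) {
      building.add(new BuildingNumberedShardSpec(partitionId));
    }
    // At publish time the core partition set size equals the number of segments created for the
    // time chunk, so each shardSpec is converted to a proper NumberedShardSpec.
    for (BuildingNumberedShardSpec spec : building) {
      final NumberedShardSpec published = spec.toNumberedShardSpec(building.size());
      System.out.println(published); // partitionNum = 0..2, partitions = 3
    }
  }
}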

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.timeline.DataSegment;
import java.util.Collections;
import java.util.List;
@ -31,10 +32,28 @@ import java.util.Map;
import java.util.Objects;
/**
* ShardSpec for segments which overshadow others with their minorVersion.
* This shardSpec is used only for the segments created by overwriting tasks with segment lock enabled.
* When the segment lock is used, there is a concept of an atomic update group, which is a set of segments that
* atomically become queryable together in Brokers. It is similar to the core partition set (explained in
* {@link NumberedShardSpec}), but differs in that there is only one core partition set per time chunk
* while there can be multiple atomic update groups in one time chunk.
*
* The atomic update group has a root partition range and a minor version that determine visibility between
* atomic update groups; among groups with the same root partition range and the same major version
* ({@link DataSegment#getVersion()}), the group with the highest minor version becomes queryable.
*
* Note that this shardSpec is used only when you overwrite existing segments with segment lock enabled.
* If the task doesn't overwrite segments, it will use NumberedShardSpec instead even when segment lock is used.
* Similar to NumberedShardSpec, the size of the atomic update group is determined when the task publishes segments
* at the end of ingestion. As a result, {@link #atomicUpdateGroupSize} is initially set to
* {@link PartitionIds#UNKNOWN_ATOMIC_UPDATE_GROUP_SIZE} and updated when publishing segments
* in {@code SegmentPublisherHelper#annotateShardSpec}.
*
* @see AtomicUpdateGroup
*/
public class NumberedOverwriteShardSpec implements OverwriteShardSpec
{
public static final String TYPE = "numbered_overwrite";
private final int partitionId;
private final short startRootPartitionId;
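
Illustrative sketch (not part of this commit) of constructing this shardSpec, using the constructor argument order that appears in the serde test added below; the concrete values are arbitrary.

import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
class NumberedOverwriteShardSpecExample
{
  public static void main(String[] args)
  {
    // A non-root-generation segment overshadowing root partitions [0, 10) with minor version 1,
    // belonging to an atomic update group of 3 segments.
    final ShardSpec spec = new NumberedOverwriteShardSpec(
        PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, // partition ID in the non-root-generation range
        0,                                            // start root partition ID
        10,                                           // end root partition ID (exclusive)
        (short) 1,                                    // minor version
        (short) 3                                     // atomic update group size
    );
    System.out.println(spec);
  }
}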

View File

@ -40,6 +40,13 @@ public class NumberedPartialShardSpec implements PartialShardSpec
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
{
if (specOfPreviousMaxPartitionId == null) {
// The shardSpec is created by the Overlord.
// - For streaming ingestion tasks, the core partition set size is always 0.
// - For batch tasks, this code is executed only with segment locking (forceTimeChunkLock = false).
//   In this mode, two or more tasks can concurrently ingest into the same time chunk of
//   the same datasource. Since there is no restriction on segment allocation for those tasks, the
//   partition IDs allocated to each task can interleave. As a result, the core partition set cannot be
//   represented as a range, so we always set the core partition set size to 0.
return new NumberedShardSpec(0, 0);
} else {
final NumberedShardSpec prevSpec = (NumberedShardSpec) specOfPreviousMaxPartitionId;
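
Illustrative sketch (not part of this commit) of the allocation behavior described in the comment above, written as a standalone helper rather than the actual Overlord code path; the else branch is assumed here to bump the partition ID while carrying over the previous core partition set size.

import org.apache.druid.timeline.partition.NumberedShardSpec;
import javax.annotation.Nullable;
class NumberedAllocationSketch
{
  // First allocation in a time chunk: partition ID 0 and core partition set size 0.
  // Later allocations (assumed): next partition ID, same core partition set size as before.
  static NumberedShardSpec allocateNext(@Nullable NumberedShardSpec specOfPreviousMaxPartitionId)
  {
    if (specOfPreviousMaxPartitionId == null) {
      return new NumberedShardSpec(0, 0);
    } else {
      return new NumberedShardSpec(
          specOfPreviousMaxPartitionId.getPartitionNum() + 1,
          specOfPreviousMaxPartitionId.getPartitions()
      );
    }
  }
  public static void main(String[] args)
  {
    final NumberedShardSpec first = allocateNext(null);   // partitionNum = 0, partitions = 0
    final NumberedShardSpec second = allocateNext(first); // partitionNum = 1, partitions = 0
    System.out.println(first + " then " + second);
  }
}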

View File

@ -67,6 +67,11 @@ public class NumberedShardSpec implements ShardSpec
@Override
public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
{
return createLookup(shardSpecs);
}
static ShardSpecLookup createLookup(List<ShardSpec> shardSpecs)
{
return (long timestamp, InputRow row) -> shardSpecs.get(0);
}

View File

@ -28,8 +28,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
/**
* Class to contain all information of a {@link ShardSpec} except for the partition ID.
* This class is mainly used by the indexing tasks to allocate new segments using the Overlord.
* This interface is used in the segment allocation protocol when it is coordinated by the Overlord, i.e., when appending
* segments to an existing datasource (either streaming ingestion or batch append) or whenever segment
* lock is used. The implementations of this interface contain all information of the corresponding {@link ShardSpec}
* except the partition ID.
* Ingestion tasks send all information required to allocate a new segment using this interface, and the Overlord
* determines the partition ID when it creates the new segment.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({

View File

@ -19,6 +19,7 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.RangeSet;
@ -38,36 +39,63 @@ import java.util.Map;
@JsonSubTypes.Type(name = "linear", value = LinearShardSpec.class),
@JsonSubTypes.Type(name = "numbered", value = NumberedShardSpec.class),
@JsonSubTypes.Type(name = "hashed", value = HashBasedNumberedShardSpec.class),
@JsonSubTypes.Type(name = "numbered_overwrite", value = NumberedOverwriteShardSpec.class)
@JsonSubTypes.Type(name = NumberedOverwriteShardSpec.TYPE, value = NumberedOverwriteShardSpec.class),
@JsonSubTypes.Type(name = BuildingNumberedShardSpec.TYPE, value = BuildingNumberedShardSpec.class)
})
public interface ShardSpec
{
@JsonIgnore
<T> PartitionChunk<T> createChunk(T obj);
@JsonIgnore
boolean isInChunk(long timestamp, InputRow inputRow);
/**
* Returns the partition ID of this segment.
*/
int getPartitionNum();
/**
* Returns the start root partition ID of the atomic update group which this segment belongs to.
*
* @see AtomicUpdateGroup
*/
default int getStartRootPartitionId()
{
return getPartitionNum();
}
/**
* Returns the end root partition ID of the atomic update group which this segment belongs to.
*
* @see AtomicUpdateGroup
*/
default int getEndRootPartitionId()
{
return getPartitionNum() + 1;
}
/**
* Returns the minor version associated to the atomic update group which this segment belongs to.
*
* @see AtomicUpdateGroup
*/
default short getMinorVersion()
{
return 0;
}
/**
* Returns the atomic update group size which this segment belongs to.
*
* @see AtomicUpdateGroup
*/
default short getAtomicUpdateGroupSize()
{
return 1;
}
@JsonIgnore
ShardSpecLookup getLookup(List<ShardSpec> shardSpecs);
/**
@ -75,16 +103,19 @@ public interface ShardSpec
*
* @return list of dimensions that have a known possible range. Dimensions with an unknown possible range are not listed
*/
@JsonIgnore
List<String> getDomainDimensions();
/**
* Returns false if the given domain ranges are not possible in this shard; otherwise returns true.
* @return whether the given domain is possible in this shard
*/
@JsonIgnore
boolean possibleInDomain(Map<String, RangeSet<String>> domain);
/**
* Returns true if two segments of this and other shardSpecs can exist in the same timeChunk.
* Returns true if two segments of this and other shardSpecs can exist in the same time chunk.
*/
@JsonIgnore
boolean isCompatible(Class<? extends ShardSpec> other);
}
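
A short illustration (not part of this commit) of what the defaults above mean for a root-generation segment; the values follow directly from the default method implementations.

import org.apache.druid.timeline.partition.NumberedShardSpec;
class ShardSpecDefaultsExample
{
  public static void main(String[] args)
  {
    // A root-generation segment with partition ID 3 in a core partition set of size 5.
    final NumberedShardSpec spec = new NumberedShardSpec(3, 5);
    // Defaults from the ShardSpec interface: the segment forms its own single-segment
    // atomic update group with root partition range [3, 4) and minor version 0.
    System.out.println(spec.getStartRootPartitionId());  // 3
    System.out.println(spec.getEndRootPartitionId());    // 4
    System.out.println(spec.getMinorVersion());          // 0
    System.out.println(spec.getAtomicUpdateGroupSize()); // 1
  }
}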

View File

@ -23,5 +23,9 @@ import org.apache.druid.data.input.InputRow;
public interface ShardSpecLookup
{
/**
* Returns a {@link ShardSpec} for the given timestamp and the inputRow.
* The timestamp must be bucketed using {@code GranularitySpec#getQueryGranularity}.
*/
ShardSpec getShardSpec(long timestamp, InputRow row);
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class BuildingNumberedShardSpecTest
{
@Test
public void testToNumberedShardSpec()
{
Assert.assertEquals(new NumberedShardSpec(5, 10), new BuildingNumberedShardSpec(5).toNumberedShardSpec(10));
}
@Test
public void testCreateChunk()
{
Assert.assertEquals(
new NumberedPartitionChunk<>(5, 0, "test"),
new BuildingNumberedShardSpec(5).createChunk("test")
);
}
@Test
public void testShardSpecLookup()
{
final List<ShardSpec> shardSpecs = ImmutableList.of(
new BuildingNumberedShardSpec(1),
new BuildingNumberedShardSpec(2),
new BuildingNumberedShardSpec(3)
);
final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
// Timestamp doesn't matter. It always returns the first shardSpec.
final long currentTime = DateTimes.nowUtc().getMillis();
Assert.assertEquals(
shardSpecs.get(0),
lookup.getShardSpec(
currentTime,
new MapBasedInputRow(
currentTime,
ImmutableList.of("dim"), ImmutableMap.of("dim", "val", "time", currentTime)
)
)
);
}
@Test
public void testSerde() throws JsonProcessingException
{
final ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.registerSubtypes(new NamedType(BuildingNumberedShardSpec.class, BuildingNumberedShardSpec.TYPE));
final BuildingNumberedShardSpec original = new BuildingNumberedShardSpec(5);
final String json = mapper.writeValueAsString(original);
final BuildingNumberedShardSpec fromJson = (BuildingNumberedShardSpec) mapper.readValue(json, ShardSpec.class);
Assert.assertEquals(original, fromJson);
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(BuildingNumberedShardSpec.class).usingGetClass().verify();
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Assert;
import org.junit.Test;
public class NumberedOverwriteShardSpecTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(NumberedOverwriteShardSpec.class).usingGetClass().verify();
}
@Test
public void testSerde() throws JsonProcessingException
{
final ObjectMapper mapper = new ObjectMapper();
mapper.registerSubtypes(new NamedType(NumberedOverwriteShardSpec.class, NumberedOverwriteShardSpec.TYPE));
final NumberedOverwriteShardSpec original = new NumberedOverwriteShardSpec(
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 2,
0,
10,
(short) 1,
(short) 3
);
final String json = mapper.writeValueAsString(original);
final NumberedOverwriteShardSpec fromJson = (NumberedOverwriteShardSpec) mapper.readValue(json, ShardSpec.class);
Assert.assertEquals(original, fromJson);
}
}

View File

@ -30,7 +30,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -77,7 +77,7 @@ class LocalSegmentAllocator implements SegmentAllocator
dataSource,
interval,
version,
new NumberedShardSpec(partitionId, 0)
new BuildingNumberedShardSpec(partitionId)
);
};
}

View File

@ -78,6 +78,7 @@ import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.druid.utils.CollectionUtils;
@ -984,7 +985,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
dataSource,
interval,
version,
new NumberedShardSpec(partitionNum, 0)
new BuildingNumberedShardSpec(partitionNum)
);
}

View File

@ -252,7 +252,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
}
}
@ -301,7 +301,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
}
}
@ -334,7 +334,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
}
}
}
@ -414,7 +414,11 @@ public class CompactionTaskRunTest extends IngestionTestBase
for (int i = 0; i < 6; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 3 + i / 2, 3 + i / 2 + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(i % 2, 0), segments.get(i).getShardSpec());
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(new NumberedShardSpec(i % 2, 0), segments.get(i).getShardSpec());
} else {
Assert.assertEquals(new NumberedShardSpec(i % 2, 2), segments.get(i).getShardSpec());
}
}
Assert.assertTrue(compactionFuture.get().lhs.isSuccess());
@ -434,7 +438,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
}
}
}
@ -472,7 +476,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(0).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec());
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(0).getLastCompactionState());
// hour segmentGranularity
@ -490,7 +494,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
for (int i = 0; i < 3; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState());
}
}
@ -594,7 +598,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(i % 2, 0), segments.get(i).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(i % 2, 2), segments.get(i).getShardSpec());
}
}
@ -667,7 +671,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(i % 2, 0), segments.get(i).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(i % 2, 2), segments.get(i).getShardSpec());
}
}
@ -757,7 +761,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
}
}
}

View File

@ -43,6 +43,8 @@ import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@ -57,10 +59,13 @@ import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -155,6 +160,19 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(interval, appendToExisting);
}
private void runOverwriteTask(
@Nullable Interval interval,
Granularity segmentGranularity,
LockGranularity actualLockGranularity
)
{
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, false, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpecAfterOverwrite(interval, actualLockGranularity);
}
private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity)
@ -173,7 +191,11 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
);
// Reingest the same data. Each segment should get replaced by a segment with a newer version.
runTestTask(inputInterval, secondSegmentGranularity);
runOverwriteTask(
inputInterval,
secondSegmentGranularity,
secondSegmentGranularity.equals(Granularities.DAY) ? lockGranularity : LockGranularity.TIME_CHUNK
);
// Verify that the segment has been replaced.
final Collection<DataSegment> newSegments =
@ -184,6 +206,62 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
private void assertShardSpec(@Nullable Interval interval, boolean appendToExisting)
{
final Interval nonNullInterval = interval == null ? Intervals.ETERNITY : interval;
final Collection<DataSegment> segments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", nonNullInterval, Segments.ONLY_VISIBLE);
if (!appendToExisting && lockGranularity != LockGranularity.SEGMENT) {
// Check the core partition set in the shardSpec
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
segments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getPartitions());
}
}
} else {
for (DataSegment segment : segments) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(0, shardSpec.getPartitions());
}
}
}
private void assertShardSpecAfterOverwrite(@Nullable Interval interval, LockGranularity actualLockGranularity)
{
final Interval nonNullInterval = interval == null ? Intervals.ETERNITY : interval;
final Collection<DataSegment> segments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", nonNullInterval, Segments.ONLY_VISIBLE);
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
segments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
if (actualLockGranularity != LockGranularity.SEGMENT) {
// Check the core partition set in the shardSpec
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getPartitions());
}
}
} else {
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass());
final NumberedOverwriteShardSpec shardSpec = (NumberedOverwriteShardSpec) segment.getShardSpec();
Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getAtomicUpdateGroupSize());
}
}
}
}
@Test
public void testWithoutInterval()
{
@ -213,9 +291,12 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
@Test
public void testRunInSequential()
{
final ParallelIndexSupervisorTask task = newTask(Intervals.of("2017-12/P1M"), false, false);
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(interval, appendToExisting, false);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(interval, appendToExisting);
}
@Test
@ -229,10 +310,12 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
@Test
public void testWith1MaxNumConcurrentSubTasks()
{
final Interval interval = Intervals.of("2017-12/P1M");
final boolean appendToExisting = false;
final ParallelIndexSupervisorTask task = newTask(
Intervals.of("2017-12/P1M"),
interval,
Granularities.DAY,
false,
appendToExisting,
true,
new ParallelIndexTuningConfig(
null,
@ -266,6 +349,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
assertShardSpec(interval, appendToExisting);
}
@Test

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.OverwriteShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
public final class SegmentPublisherHelper
{
/**
* This method fills missing information in the shard spec if necessary when publishing segments.
*
* - When time chunk lock is used, the non-appending task should set the proper core partition set size for
*   dynamically-partitioned segments. See {@link #annotateNumberedShardSpecFn}.
* - When segment lock is used, the overwriting task should set the proper size of the atomic update group.
* See {@link #annotateAtomicUpdateGroupFn}.
*/
static Set<DataSegment> annotateShardSpec(Set<DataSegment> segments)
{
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
segments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<DataSegment> segmentsPerInterval = entry.getValue();
final ShardSpec firstShardSpec = segmentsPerInterval.get(0).getShardSpec();
final boolean anyMismatch = segmentsPerInterval.stream().anyMatch(
segment -> segment.getShardSpec().getClass() != firstShardSpec.getClass()
);
if (anyMismatch) {
throw new ISE(
"Mismatched shardSpecs in interval[%s] for segments[%s]",
interval,
segmentsPerInterval
);
}
final Function<DataSegment, DataSegment> annotateFn;
if (firstShardSpec instanceof OverwriteShardSpec) {
annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
} else if (firstShardSpec instanceof BuildingNumberedShardSpec) {
annotateFn = annotateNumberedShardSpecFn(segmentsPerInterval.size());
} else {
annotateFn = null;
}
if (annotateFn != null) {
intervalToSegments.put(interval, segmentsPerInterval.stream().map(annotateFn).collect(Collectors.toList()));
}
}
return intervalToSegments.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
}
private static Function<DataSegment, DataSegment> annotateAtomicUpdateGroupFn(int atomicUpdateGroupSize)
{
// The segments which are published together constitute an atomicUpdateGroup.
return segment -> {
final OverwriteShardSpec shardSpec = (OverwriteShardSpec) segment.getShardSpec();
return segment.withShardSpec(shardSpec.withAtomicUpdateGroupSize((short) atomicUpdateGroupSize));
};
}
private static Function<DataSegment, DataSegment> annotateNumberedShardSpecFn(int corePartitionSetSize)
{
return segment -> {
final BuildingNumberedShardSpec shardSpec = (BuildingNumberedShardSpec) segment.getShardSpec();
return segment.withShardSpec(shardSpec.toNumberedShardSpec(corePartitionSetSize));
};
}
private SegmentPublisherHelper()
{
}
}
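
Illustrative sketch (not part of this commit) of the segment-lock path handled by annotateAtomicUpdateGroupFn: the atomic update group size is set to the number of segments published together for an interval. DataSegment handling is omitted so the sketch stays self-contained, and the concrete shardSpec values are arbitrary.

import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.OverwriteShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
class AnnotateAtomicUpdateGroupSketch
{
  public static void main(String[] args)
  {
    // Two overwriting segments published together for the same interval; the group size of 1
    // passed to the constructor is just a placeholder that annotation overwrites.
    final OverwriteShardSpec[] specs = new OverwriteShardSpec[]{
        new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1),
        new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 1, 0, 2, (short) 1, (short) 1)
    };
    // annotateAtomicUpdateGroupFn sets the atomic update group size to the number of segments
    // published together for the interval (here 2).
    final short atomicUpdateGroupSize = (short) specs.length;
    for (OverwriteShardSpec spec : specs) {
      System.out.println(spec.withAtomicUpdateGroupSize(atomicUpdateGroupSize));
    }
  }
}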

View File

@ -20,21 +20,11 @@
package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.OverwriteShardSpec;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
public interface TransactionalSegmentPublisher
{
@ -63,7 +53,7 @@ public interface TransactionalSegmentPublisher
{
return publishAnnotatedSegments(
segmentsToBeOverwritten,
annotateAtomicUpdateGroupSize(segmentsToPublish),
SegmentPublisherHelper.annotateShardSpec(segmentsToPublish),
commitMetadata
);
}
@ -76,52 +66,4 @@ public interface TransactionalSegmentPublisher
{
return false;
}
static Set<DataSegment> annotateAtomicUpdateGroupSize(Set<DataSegment> segments)
{
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
segments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<DataSegment> segmentsPerInterval = entry.getValue();
final boolean isNonFirstGeneration = segmentsPerInterval.get(0).getShardSpec() instanceof OverwriteShardSpec;
final boolean anyMismatch = segmentsPerInterval.stream().anyMatch(
segment -> (segment.getShardSpec() instanceof OverwriteShardSpec) != isNonFirstGeneration
);
if (anyMismatch) {
throw new ISE(
"WTH? some segments have empty overshadwedSegments but others are not? "
+ "segments with non-overwritingShardSpec: [%s],"
+ "segments with overwritingShardSpec: [%s]",
segmentsPerInterval.stream()
.filter(segment -> !(segment.getShardSpec() instanceof OverwriteShardSpec))
.collect(Collectors.toList()),
segmentsPerInterval.stream()
.filter(segment -> segment.getShardSpec() instanceof OverwriteShardSpec)
.collect(Collectors.toList())
);
}
if (isNonFirstGeneration) {
// The segments which are published together consist an atomicUpdateGroup.
intervalToSegments.put(
interval,
segmentsPerInterval
.stream()
.map(segment -> {
final OverwriteShardSpec shardSpec = (OverwriteShardSpec) segment.getShardSpec();
return segment.withShardSpec(shardSpec.withAtomicUpdateGroupSize((short) segmentsPerInterval.size()));
})
.collect(Collectors.toList())
);
}
}
return intervalToSegments.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
}
}