Allow append to existing datasources when dynamic partitioning is used (#10033)

* Fill in the core partition set size properly for batch ingestion with
dynamic partitioning

* incomplete javadoc

* Address comments

* fix tests

* fix json serde, add tests

* checkstyle

* Set core partition set size for hash-partitioned segments properly in
batch ingestion

* test for both parallel and single-threaded task

* unused variables

* fix test

* unused imports

* add hash/range buckets

* some test adjustment and missing json serde

* centralized partition id allocation in parallel and simple tasks

* remove string partition chunk

* revive string partition chunk

* fill numCorePartitions for hadoop

* clean up hash stuff

* resolved todos

* javadocs

* Fix tests

* add more tests

* doc

* unused imports

* Allow append to existing datasources when dynamic partitioning is used

* fix test

* checkstyle

* checkstyle

* fix test

* fix test

* fix other tests..

* checkstyle

* handle unknown core partition set size in overlord segment allocation

* fail to append when numCorePartitions is unknown

* log

* fix comment; rename to be more intuitive

* double append test

* cleanup complete(); add tests

* fix build

* add tests

* address comments

* checkstyle
Jihoon Son 2020-06-25 13:37:31 -07:00 committed by GitHub
parent 0f51b3c190
commit aaee72c781
57 changed files with 1507 additions and 509 deletions
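
As a quick orientation for the diff below, here is a minimal sketch (not code from this commit; the segment counts and IDs are made up) of what the change means for the shardSpecs of a time chunk: batch ingestion with dynamic partitioning now records the core partition set size in every NumberedShardSpec it publishes, appended segments are allocated partitionIds beyond that core set while keeping the same size, and an append now fails if the core partition set size of the existing segments is unknown.

import com.google.common.collect.ImmutableList;
import org.apache.druid.timeline.partition.NumberedShardSpec;

import java.util.List;

class CorePartitionSetSketch
{
  // A dynamic-partitioned batch task that publishes three segments in a time
  // chunk writes numCorePartitions = 3 into each of them; a later append task
  // (appendToExisting = true with a dynamic partitionsSpec) is allocated the
  // next partitionId while preserving that core partition set size.
  static List<NumberedShardSpec> timeChunkAfterOneAppend()
  {
    return ImmutableList.of(
        new NumberedShardSpec(0, 3), // core partition set
        new NumberedShardSpec(1, 3),
        new NumberedShardSpec(2, 3),
        new NumberedShardSpec(3, 3)  // appended segment, outside the core set
    );
  }
}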

View File

@ -29,6 +29,7 @@ import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
@ -36,9 +37,12 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Utility methods useful for implementing deep storage extensions.
@ -93,6 +97,15 @@ public class SegmentUtils
return Collections2.transform(segments, DataSegment::getId);
}
public static Map<Interval, List<DataSegment>> groupSegmentsByInterval(Collection<DataSegment> segments)
{
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
segments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
return intervalToSegments;
}
private SegmentUtils()
{
// no instantiation

View File

@ -168,7 +168,7 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
List<String> dimensions,
List<String> metrics,
ShardSpec shardSpec,
CompactionState lastCompactionState,
@Nullable CompactionState lastCompactionState,
Integer binaryVersion,
long size
)

View File

@ -97,10 +97,4 @@ public interface BucketNumberedShardSpec<T extends BuildingShardSpec> extends Sh
{
throw new UnsupportedOperationException();
}
@Override
default boolean isCompatible(Class<? extends ShardSpec> other)
{
throw new UnsupportedOperationException();
}
}

View File

@ -94,10 +94,4 @@ public interface BuildingShardSpec<T extends ShardSpec> extends ShardSpec
{
throw new UnsupportedOperationException();
}
@Override
default boolean isCompatible(Class<? extends ShardSpec> other)
{
throw new UnsupportedOperationException();
}
}

View File

@ -68,17 +68,11 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
// The shardSpec is created by the Overlord.
// For batch tasks, this code can be executed only with segment locking (forceTimeChunkLock = false).
// In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
// the same datasource. Since there is no restriction for those tasks in segment allocation, the
// allocated IDs for each task can interleave. As a result, the core partition set cannot be
// represented as a range. We always set 0 for the core partition set size if this is an initial segment.
return new HashBasedNumberedShardSpec(
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1,
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getNumCorePartitions(),
partitionId,
numCorePartitions,
bucketId,
numBuckets,
partitionDimensions,
@ -86,12 +80,6 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
);
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
{
return new HashBasedNumberedShardSpec(partitionId, 0, bucketId, numBuckets, partitionDimensions, objectMapper);
}
@Override
public Class<? extends ShardSpec> getShardSpecClass()
{

View File

@ -54,7 +54,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
@JsonCreator
public HashBasedNumberedShardSpec(
@JsonProperty("partitionNum") int partitionNum, // partitionId, hash bucketId
@JsonProperty("partitionNum") int partitionNum, // partitionId
@JsonProperty("partitions") int partitions, // core partition set size
@JsonProperty("bucketId") @Nullable Integer bucketId, // nullable for backward compatibility
@JsonProperty("numBuckets") @Nullable Integer numBuckets, // nullable for backward compatibility
@ -90,12 +90,6 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
return partitionDimensions;
}
@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == HashBasedNumberedShardSpec.class;
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{

View File

@ -21,8 +21,6 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
public class LinearPartialShardSpec implements PartialShardSpec
{
private static final LinearPartialShardSpec INSTANCE = new LinearPartialShardSpec();
@ -37,16 +35,9 @@ public class LinearPartialShardSpec implements PartialShardSpec
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
{
return new LinearShardSpec(
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1
);
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
// numCorePartitions is ignored
return new LinearShardSpec(partitionId);
}

View File

@ -73,12 +73,6 @@ public final class LinearShardSpec implements ShardSpec
return true;
}
@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == LinearShardSpec.class;
}
@Override
public <T> PartitionChunk<T> createChunk(T obj)
{

View File

@ -89,12 +89,6 @@ public class NoneShardSpec implements ShardSpec
return true;
}
@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == NoneShardSpec.class;
}
@Override
public boolean equals(Object obj)
{

View File

@ -22,8 +22,7 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
public class NumberedOverwritePartialShardSpec implements PartialShardSpec
{
@ -43,6 +42,12 @@ public class NumberedOverwritePartialShardSpec implements PartialShardSpec
this.minorVersion = minorVersion;
}
@VisibleForTesting
public NumberedOverwritePartialShardSpec(int startRootPartitionId, int endRootPartitionId, int minorVersion)
{
this(startRootPartitionId, endRootPartitionId, (short) minorVersion);
}
@JsonProperty
public int getStartRootPartitionId()
{
@ -62,29 +67,25 @@ public class NumberedOverwritePartialShardSpec implements PartialShardSpec
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
// specOfPreviousMaxPartitionId is the max partitionId of the same shardSpec
// and could be null if all existing segments are first-generation segments.
return new NumberedOverwriteShardSpec(
specOfPreviousMaxPartitionId == null
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: specOfPreviousMaxPartitionId.getPartitionNum() + 1,
partitionId,
startRootPartitionId,
endRootPartitionId,
minorVersion
);
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
{
return new NumberedOverwriteShardSpec(partitionId, startRootPartitionId, endRootPartitionId, minorVersion);
}
@Override
public Class<? extends ShardSpec> getShardSpecClass()
{
return NumberedOverwriteShardSpec.class;
}
@Override
public boolean useNonRootGenerationPartitionSpace()
{
return true;
}
}

View File

@ -205,12 +205,6 @@ public class NumberedOverwriteShardSpec implements OverwriteShardSpec
return true;
}
@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == NumberedOverwriteShardSpec.class || other == NumberedShardSpec.class;
}
@Override
public boolean equals(Object o)
{

View File

@ -21,8 +21,6 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
public class NumberedPartialShardSpec implements PartialShardSpec
{
private static final NumberedPartialShardSpec INSTANCE = new NumberedPartialShardSpec();
@ -37,27 +35,9 @@ public class NumberedPartialShardSpec implements PartialShardSpec
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
if (specOfPreviousMaxPartitionId == null) {
// The shardSpec is created by the Overlord.
// - For streaming ingestion tasks, the core partition set is always 0.
// - For batch tasks, this code is executed only with segment locking (forceTimeChunkLock = false).
// In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
// the same datasource. Since there is no restriction for those tasks in segment allocation, the
// allocated IDs for each task can interleave. As a result, the core partition set cannot be
// represented as a range. We always set 0 for the core partition set size.
return new NumberedShardSpec(0, 0);
} else {
final NumberedShardSpec prevSpec = (NumberedShardSpec) specOfPreviousMaxPartitionId;
return new NumberedShardSpec(prevSpec.getPartitionNum() + 1, prevSpec.getNumCorePartitions());
}
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
{
return new NumberedShardSpec(partitionId, 0);
return new NumberedShardSpec(partitionId, numCorePartitions);
}
@Override

View File

@ -93,12 +93,6 @@ public class NumberedShardSpec implements ShardSpec
return true;
}
@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == NumberedShardSpec.class || other == NumberedOverwriteShardSpec.class;
}
@Override
@JsonProperty("partitions")
public int getNumCorePartitions()

View File

@ -45,4 +45,16 @@ public interface OverwriteShardSpec extends ShardSpec
}
OverwriteShardSpec withAtomicUpdateGroupSize(short atomicUpdateGroupSize);
/**
* Returns true if this shardSpec and the given {@link PartialShardSpec} share the same partition space.
* This shardSpec uses non-root-generation partition space and thus does not share the space with other shardSpecs.
*
* @see PartitionIds
*/
@Override
default boolean sharePartitionSpace(PartialShardSpec partialShardSpec)
{
return partialShardSpec.useNonRootGenerationPartitionSpace();
}
}
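
A short usage sketch of this contract, mirroring the new NumberedOverwriteShardSpecTest added later in this commit (the constructor arguments are the same ones used there):

// An overwriting shardSpec lives in the non-root-generation partition space,
// so it shares the space only with overwriting partialShardSpecs.
OverwriteShardSpec overwriteSpec = new NumberedOverwriteShardSpec(
    PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 3, (short) 1, (short) 1
);
overwriteSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)); // true
overwriteSpec.sharePartitionSpace(NumberedPartialShardSpec.instance());            // false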

View File

@ -25,8 +25,6 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;
/**
* This interface is used in the segment allocation protocol when it is coordinated by the Overlord; when appending
* segments to an existing datasource (either streaming ingestion or batch append) or any case when segment
@ -45,21 +43,31 @@ import javax.annotation.Nullable;
public interface PartialShardSpec
{
/**
* Creates a new ShardSpec based on {@code specOfPreviousMaxPartitionId}. If it's null, it assumes that this is the
* first call for the time chunk where the new segment is created.
* Note that {@code specOfPreviousMaxPartitionId} can also be null for {@link OverwriteShardSpec} if all segments
* in the timeChunk are first-generation segments.
* Creates a new ShardSpec with given partitionId and numCorePartitions.
*
* @param objectMapper jsonMapper used only for {@link HashBasedNumberedShardSpec}
* @param partitionId partitionId of the shardSpec. must be carefully chosen to be unique in a time chunk
* @param numCorePartitions the core partition set size. Should be set properly to determine if this segment belongs
* to the core partitions.
*/
ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId);
/**
* Creates a new shardSpec having the given partitionId.
*/
ShardSpec complete(ObjectMapper objectMapper, int partitionId);
ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions);
/**
* Returns the class of the shardSpec created by this factory.
*/
@JsonIgnore
Class<? extends ShardSpec> getShardSpecClass();
/**
* Returns true if this partialShardSpec needs a partitionId of a non-root generation.
* Any partialShardSpec to overwrite a subset of segments in a time chunk such as
* {@link NumberedOverwritePartialShardSpec} should return true.
*
* @see PartitionIds
*/
default boolean useNonRootGenerationPartitionSpace()
{
return false;
}
}
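
A minimal usage sketch of the new two-argument complete() (the partitionId and numCorePartitions values below are arbitrary; in practice they are chosen by the Overlord during segment allocation):

ObjectMapper mapper = new ObjectMapper();
// equivalent to new NumberedShardSpec(1, 3)
ShardSpec numbered = NumberedPartialShardSpec.instance().complete(mapper, 1, 3);
// equivalent to new HashBasedNumberedShardSpec(1, 3, 2, 4, ImmutableList.of("dim"), mapper)
ShardSpec hashed =
    new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim"), 2, 4).complete(mapper, 1, 3);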

View File

@ -125,8 +125,14 @@ public interface ShardSpec
boolean possibleInDomain(Map<String, RangeSet<String>> domain);
/**
* Returns true if two segments of this and other shardSpecs can exist in the same time chunk.
* Returns true if this shardSpec and the given {@link PartialShardSpec} share the same partition space.
* All shardSpecs except {@link OverwriteShardSpec} use the root-generation partition space and thus share the same
* space.
*
* @see PartitionIds
*/
@JsonIgnore
boolean isCompatible(Class<? extends ShardSpec> other);
default boolean sharePartitionSpace(PartialShardSpec partialShardSpec)
{
return !partialShardSpec.useNonRootGenerationPartitionSpace();
}
}
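
And the root-generation side of the same contract, again only as a sketch with argument values borrowed from the tests in this commit:

// Ordinary (root-generation) shardSpecs share the partition space with any
// non-overwriting partialShardSpec, but not with overwriting ones.
ShardSpec numbered = new NumberedShardSpec(0, 1);
numbered.sharePartitionSpace(NumberedPartialShardSpec.instance());               // true
numbered.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)); // true
numbered.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1));    // false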

View File

@ -85,34 +85,9 @@ public class SingleDimensionPartialShardSpec implements PartialShardSpec
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
// The shardSpec is created by the Overlord.
// For batch tasks, this code can be executed only with segment locking (forceTimeChunkLock = false).
// In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
// the same datasource. Since there is no restriction for those tasks in segment allocation, the
// allocated IDs for each task can interleave. As a result, the core partition set cannot be
// represented as a range. We always set 0 for the core partition set size if this is an initial segment.
return new SingleDimensionShardSpec(
partitionDimension,
start,
end,
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1,
specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getNumCorePartitions()
);
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
{
// TODO: bucketId and numBuckets should be added to SingleDimensionShardSpec in a follow-up PR.
return new SingleDimensionShardSpec(
partitionDimension,
start,
end,
partitionId,
0
);
return new SingleDimensionShardSpec(partitionDimension, start, end, partitionId, numCorePartitions);
}
@Override

View File

@ -71,17 +71,6 @@ public class SingleDimensionShardSpec implements ShardSpec
this.numCorePartitions = numCorePartitions == null ? UNKNOWN_NUM_CORE_PARTITIONS : numCorePartitions;
}
public SingleDimensionShardSpec withNumCorePartitions(int numCorePartitions)
{
return new SingleDimensionShardSpec(
dimension,
start,
end,
partitionNum,
numCorePartitions
);
}
@JsonProperty("dimension")
public String getDimension()
{
@ -165,12 +154,6 @@ public class SingleDimensionShardSpec implements ShardSpec
return !rangeSet.subRangeSet(getRange()).isEmpty();
}
@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return other == SingleDimensionShardSpec.class;
}
@Override
public <T> PartitionChunk<T> createChunk(T obj)
{

View File

@ -19,8 +19,14 @@
package org.apache.druid.segment;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import org.apache.commons.io.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -28,6 +34,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
/**
*/
@ -57,4 +64,51 @@ public class SegmentUtilsTest
{
SegmentUtils.getVersionFromDir(tempFolder.newFolder());
}
@Test
public void testGroupSegmentsByInterval()
{
final List<DataSegment> segments = ImmutableList.of(
newSegment(Intervals.of("2020-01-01/P1D"), 0),
newSegment(Intervals.of("2020-01-02/P1D"), 0),
newSegment(Intervals.of("2020-01-01/P1D"), 1),
newSegment(Intervals.of("2020-01-03/P1D"), 0),
newSegment(Intervals.of("2020-01-02/P1D"), 1),
newSegment(Intervals.of("2020-01-02/P1D"), 2)
);
Assert.assertEquals(
ImmutableMap.of(
Intervals.of("2020-01-01/P1D"),
ImmutableList.of(
newSegment(Intervals.of("2020-01-01/P1D"), 0),
newSegment(Intervals.of("2020-01-01/P1D"), 1)
),
Intervals.of("2020-01-02/P1D"),
ImmutableList.of(
newSegment(Intervals.of("2020-01-02/P1D"), 0),
newSegment(Intervals.of("2020-01-02/P1D"), 1),
newSegment(Intervals.of("2020-01-02/P1D"), 2)
),
Intervals.of("2020-01-03/P1D"),
ImmutableList.of(newSegment(Intervals.of("2020-01-03/P1D"), 0))
),
SegmentUtils.groupSegmentsByInterval(segments)
);
}
private static DataSegment newSegment(Interval interval, int partitionId)
{
return new DataSegment(
"datasource",
interval,
"version",
null,
ImmutableList.of("dim"),
ImmutableList.of("met"),
new NumberedShardSpec(partitionId, 0),
null,
9,
10L
);
}
}

View File

@ -97,12 +97,6 @@ public class DataSegmentTest
{
return true;
}
@Override
public boolean isCompatible(Class<? extends ShardSpec> other)
{
return false;
}
};
}

View File

@ -31,7 +31,7 @@ import org.junit.Test;
public class BuildingHashBasedNumberedShardSpecTest
{
private final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
@Test
public void testConvert()
{

View File

@ -30,7 +30,7 @@ import java.util.Map;
public class HashBasedNumberedPartialShardSpecTest
{
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final ObjectMapper MAPPER = ShardSpecTestUtils.initObjectMapper();
@Test
public void testEquals()
@ -73,5 +73,21 @@ public class HashBasedNumberedPartialShardSpecTest
Assert.assertEquals(expected.getPartitionDimensions(), map.get("partitionDimensions"));
Assert.assertEquals(expected.getBucketId(), map.get("bucketId"));
Assert.assertEquals(expected.getNumBuckets(), map.get("numPartitions"));
Assert.assertEquals(expected.getBucketId(), map.get("bucketId"));
}
@Test
public void testComplete()
{
final HashBasedNumberedPartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec(
ImmutableList.of("dim"),
2,
4
);
final ShardSpec shardSpec = partialShardSpec.complete(MAPPER, 1, 3);
Assert.assertEquals(
new HashBasedNumberedShardSpec(1, 3, 2, 4, ImmutableList.of("dim"), MAPPER),
shardSpec
);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import org.junit.Assert;
import org.junit.Test;
public class NumberedOverwritePartialShardSpecTest
{
@Test
public void testUseNonRootGenerationPartitionSpace()
{
final NumberedOverwritePartialShardSpec partialShardSpec = new NumberedOverwritePartialShardSpec(0, 1, (short) 1);
Assert.assertTrue(partialShardSpec.useNonRootGenerationPartitionSpace());
}
@Test
public void testGetShardSpecClass()
{
final NumberedOverwritePartialShardSpec partialShardSpec = new NumberedOverwritePartialShardSpec(0, 1, (short) 1);
Assert.assertSame(NumberedOverwriteShardSpec.class, partialShardSpec.getShardSpecClass());
}
}

View File

@ -50,4 +50,20 @@ public class NumberedOverwriteShardSpecTest
final NumberedOverwriteShardSpec fromJson = (NumberedOverwriteShardSpec) mapper.readValue(json, ShardSpec.class);
Assert.assertEquals(original, fromJson);
}
@Test
public void testSharePartitionSpace()
{
final NumberedOverwriteShardSpec shardSpec = new NumberedOverwriteShardSpec(
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID,
0,
3,
(short) 1,
(short) 1
);
Assert.assertFalse(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance()));
Assert.assertFalse(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class NumberedPartialShardSpecTest
{
@Test
public void testSerde() throws IOException
{
final NumberedPartialShardSpec expected = NumberedPartialShardSpec.instance();
final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
final byte[] json = mapper.writeValueAsBytes(expected);
final PartialShardSpec fromJson = mapper.readValue(json, PartialShardSpec.class);
Assert.assertSame(NumberedPartialShardSpec.class, fromJson.getClass());
}
@Test
public void testComplete()
{
final NumberedPartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance();
final ShardSpec shardSpec = partialShardSpec.complete(new ObjectMapper(), 1, 3);
Assert.assertEquals(new NumberedShardSpec(1, 3), shardSpec);
}
}

View File

@ -38,8 +38,8 @@ public class PartitionHolderCompletenessTest
return ImmutableList.of(
new Object[]{
ImmutableList.of(
new NumberedShardSpec(0, 3),
new NumberedShardSpec(1, 3),
new NumberedShardSpec(0, 3),
new NumberedShardSpec(2, 3)
),
NumberedShardSpec.class.getSimpleName()
@ -47,9 +47,9 @@ public class PartitionHolderCompletenessTest
new Object[]{
// Simulate empty hash buckets
ImmutableList.of(
new HashBasedNumberedShardSpec(2, 3, 3, 5, null, new ObjectMapper()),
new HashBasedNumberedShardSpec(0, 3, 0, 5, null, new ObjectMapper()),
new HashBasedNumberedShardSpec(1, 3, 2, 5, null, new ObjectMapper()),
new HashBasedNumberedShardSpec(2, 3, 3, 5, null, new ObjectMapper())
new HashBasedNumberedShardSpec(1, 3, 2, 5, null, new ObjectMapper())
),
HashBasedNumberedShardSpec.class.getSimpleName()
},
@ -57,8 +57,8 @@ public class PartitionHolderCompletenessTest
// Simulate empty range buckets
ImmutableList.of(
new SingleDimensionShardSpec("dim", null, "aaa", 0, 3),
new SingleDimensionShardSpec("dim", "bbb", "fff", 1, 3),
new SingleDimensionShardSpec("dim", "ttt", "zzz", 2, 3)
new SingleDimensionShardSpec("dim", "ttt", "zzz", 2, 3),
new SingleDimensionShardSpec("dim", "bbb", "fff", 1, 3)
),
StringUtils.format(
"%s with empty buckets",
@ -68,9 +68,9 @@ public class PartitionHolderCompletenessTest
new Object[]{
// Simulate old format segments with missing numCorePartitions
ImmutableList.of(
new SingleDimensionShardSpec("dim", null, "bbb", 0, null),
new SingleDimensionShardSpec("dim", "bbb", "fff", 1, null),
new SingleDimensionShardSpec("dim", "fff", null, 2, null)
new SingleDimensionShardSpec("dim", "fff", null, 2, null),
new SingleDimensionShardSpec("dim", null, "bbb", 0, null)
),
StringUtils.format(
"%s with missing numCorePartitions",

View File

@ -47,7 +47,7 @@ public class SingleDimensionPartialShardSpecTest
"end",
10
);
final ObjectMapper mapper = new ObjectMapper();
final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
final byte[] json = mapper.writeValueAsBytes(expected);
final SingleDimensionPartialShardSpec fromJson = (SingleDimensionPartialShardSpec) mapper.readValue(
json,
@ -55,4 +55,18 @@ public class SingleDimensionPartialShardSpecTest
);
Assert.assertEquals(expected, fromJson);
}
@Test
public void testComplete()
{
final SingleDimensionPartialShardSpec partialShardSpec = new SingleDimensionPartialShardSpec(
"dim",
2,
"end2",
null,
3
);
final ShardSpec shardSpec = partialShardSpec.complete(new ObjectMapper(), 1, 2);
Assert.assertEquals(new SingleDimensionShardSpec("dim", "end2", null, 1, 2), shardSpec);
}
}

View File

@ -189,7 +189,7 @@ that range if there's some stray data with unexpected timestamps.
|--------|-----------|-------|---------|
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs.|false|no|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. You can append to a datasource regardless of its original partitioning scheme, but the appended segments must currently be partitioned using the `dynamic` partitionsSpec.|false|no|
### `tuningConfig`
@ -693,7 +693,7 @@ that range if there's some stray data with unexpected timestamps.
|--------|-----------|-------|---------|
|type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs.|false|no|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. You can append to a datasource regardless of its original partitioning scheme, but the appended segments must currently be partitioned using the `dynamic` partitionsSpec.|false|no|
### `tuningConfig`

View File

@ -59,6 +59,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -226,8 +227,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
}
}
boolean determineLockGranularityandTryLockWithSegments(TaskActionClient client, List<DataSegment> segments)
throws IOException
boolean determineLockGranularityandTryLockWithSegments(
TaskActionClient client,
List<DataSegment> segments,
BiConsumer<LockGranularity, List<DataSegment>> segmentCheckFunction
) throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
@ -236,6 +240,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
if (forceTimeChunkLock) {
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
taskLockHelper = new TaskLockHelper(false);
segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
return tryTimeChunkLock(
client,
new ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()))
@ -243,6 +248,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
} else {
final LockGranularityDetermineResult result = determineSegmentGranularity(segments);
taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT);
segmentCheckFunction.accept(result.lockGranularity, segments);
return tryLockWithDetermineResult(client, result);
}
}
@ -363,7 +369,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
*/
public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
{
Preconditions.checkState(
Preconditions.checkArgument(
!tuningConfig.isForceGuaranteedRollup() || !ioConfig.isAppendToExisting(),
"Perfect rollup cannot be guaranteed when appending to existing dataSources"
);

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -46,7 +47,8 @@ public interface CompactionInputSpec
* Validate the specified input against the most recent published segments.
* This method is used to check whether the specified input has gone stale.
*
* @param latestSegments most recent published segments in the interval returned by {@link #findInterval}
* @param lockGranularityInUse {@link LockGranularity} in use
* @param latestSegments most recent published segments in the interval returned by {@link #findInterval}
*/
boolean validateSegments(List<DataSegment> latestSegments);
boolean validateSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments);
}
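
A hedged sketch of how the new lockGranularityInUse argument changes validation, based on the SpecificSegmentsSpec implementation further down in this diff (`segments` and `segmentsPlusAppended` are hypothetical lists; the latter is the former plus newly appended segments):

CompactionInputSpec spec = SpecificSegmentsSpec.fromSegments(segments);
// Time-chunk locking requires the latest published segments to match exactly.
spec.validateSegments(LockGranularity.TIME_CHUNK, segments);             // true
spec.validateSegments(LockGranularity.TIME_CHUNK, segmentsPlusAppended); // false
// Segment locking tolerates extra (appended) segments as long as every
// originally specified segment is still present.
spec.validateSegments(LockGranularity.SEGMENT, segmentsPlusAppended);    // true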

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
@ -76,7 +77,7 @@ public class CompactionIntervalSpec implements CompactionInputSpec
}
@Override
public boolean validateSegments(List<DataSegment> latestSegments)
public boolean validateSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments)
{
final Interval segmentsInterval = JodaUtils.umbrellaInterval(
latestSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList())

View File

@ -47,6 +47,7 @@ import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
@ -342,8 +343,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
final List<DataSegment> segments = segmentProvider.checkAndGetSegments(taskActionClient);
return determineLockGranularityandTryLockWithSegments(taskActionClient, segments);
final List<DataSegment> segments = segmentProvider.findSegments(taskActionClient);
return determineLockGranularityandTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments);
}
@Override
@ -372,6 +373,7 @@ public class CompactionTask extends AbstractBatchIndexTask
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = createIngestionSchema(
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
segmentProvider,
partitionConfigurationManager,
dimensionsSpec,
@ -480,6 +482,7 @@ public class CompactionTask extends AbstractBatchIndexTask
@VisibleForTesting
static List<ParallelIndexIngestionSpec> createIngestionSchema(
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final SegmentProvider segmentProvider,
final PartitionConfigurationManager partitionConfigurationManager,
@Nullable final DimensionsSpec dimensionsSpec,
@ -493,7 +496,8 @@ public class CompactionTask extends AbstractBatchIndexTask
{
Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
toolbox,
segmentProvider
segmentProvider,
lockGranularityInUse
);
final Map<DataSegment, File> segmentFileMap = pair.lhs;
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
@ -631,10 +635,12 @@ public class CompactionTask extends AbstractBatchIndexTask
private static Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
TaskToolbox toolbox,
SegmentProvider segmentProvider
SegmentProvider segmentProvider,
LockGranularity lockGranularityInUse
) throws IOException, SegmentLoadingException
{
final List<DataSegment> usedSegments = segmentProvider.checkAndGetSegments(toolbox.getTaskActionClient());
final List<DataSegment> usedSegments = segmentProvider.findSegments(toolbox.getTaskActionClient());
segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
.forSegments(usedSegments)
@ -852,19 +858,21 @@ public class CompactionTask extends AbstractBatchIndexTask
this.interval = inputSpec.findInterval(dataSource);
}
List<DataSegment> checkAndGetSegments(TaskActionClient actionClient) throws IOException
List<DataSegment> findSegments(TaskActionClient actionClient) throws IOException
{
final List<DataSegment> latestSegments = new ArrayList<>(
return new ArrayList<>(
actionClient.submit(new RetrieveUsedSegmentsAction(dataSource, interval, null, Segments.ONLY_VISIBLE))
);
}
if (!inputSpec.validateSegments(latestSegments)) {
void checkSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments)
{
if (!inputSpec.validateSegments(lockGranularityInUse, latestSegments)) {
throw new ISE(
"Specified segments in the spec are different from the current used segments. "
+ "Possibly new segments would have been added or some segments have been unpublished."
);
}
return latestSegments;
}
}

View File

@ -1196,7 +1196,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
@Nullable
private static PartitionsSpec getDefaultPartitionsSpec(
private static PartitionsSpec getPartitionsSpec(
boolean forceGuaranteedRollup,
@Nullable PartitionsSpec partitionsSpec,
@Nullable Integer maxRowsPerSegment,
@ -1224,11 +1224,11 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
} else {
if (forceGuaranteedRollup) {
if (!partitionsSpec.isForceGuaranteedRollupCompatibleType()) {
throw new ISE(partitionsSpec.getClass().getSimpleName() + " cannot be used for perfect rollup");
throw new IAE(partitionsSpec.getClass().getSimpleName() + " cannot be used for perfect rollup");
}
} else {
if (!(partitionsSpec instanceof DynamicPartitionsSpec)) {
throw new ISE("DynamicPartitionsSpec must be used for best-effort rollup");
throw new IAE("DynamicPartitionsSpec must be used for best-effort rollup");
}
}
return partitionsSpec;
@ -1263,7 +1263,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
this(
maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
maxBytesInMemory != null ? maxBytesInMemory : 0,
getDefaultPartitionsSpec(
getPartitionsSpec(
forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup,
partitionsSpec,
maxRowsPerSegment == null ? targetPartitionSize : maxRowsPerSegment,

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@ -73,14 +74,18 @@ public class SpecificSegmentsSpec implements CompactionInputSpec
}
@Override
public boolean validateSegments(List<DataSegment> latestSegments)
public boolean validateSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments)
{
final List<String> thoseSegments = latestSegments
.stream()
.map(segment -> segment.getId().toString())
.sorted()
.collect(Collectors.toList());
return this.segments.equals(thoseSegments);
if (lockGranularityInUse == LockGranularity.TIME_CHUNK) {
return this.segments.equals(thoseSegments);
} else {
return thoseSegments.containsAll(segments);
}
}
@Override

View File

@ -182,7 +182,7 @@ public class TaskLockHelper
private boolean tryLockSegments(TaskActionClient actionClient, List<DataSegment> segments) throws IOException
{
final Map<Interval, List<DataSegment>> intervalToSegments = groupSegmentsByInterval(segments);
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
final Closer lockCloserOnError = Closer.create();
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
@ -304,13 +304,4 @@ public class TaskLockHelper
throw new ISE("All atomicUpdateGroup must be compacted together");
}
}
private static Map<Interval, List<DataSegment>> groupSegmentsByInterval(List<DataSegment> segments)
{
final Map<Interval, List<DataSegment>> map = new HashMap<>();
for (DataSegment segment : segments) {
map.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment);
}
return map;
}
}

View File

@ -195,7 +195,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
this.ingestionSchema = ingestionSchema;
if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
@ -429,7 +429,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
if (isParallelMode()) {
this.toolbox = toolbox;
if (getIngestionSchema().getTuningConfig().isForceGuaranteedRollup()) {
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
return runMultiPhaseParallel(toolbox);
} else {
return runSinglePhaseParallel(toolbox);

View File

@ -47,7 +47,6 @@ import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
@ -869,37 +868,6 @@ public class SegmentAllocateActionTest
Assert.assertNull(id1);
}
@Test
public void testCannotAddToExistingSingleDimensionShardSpecs() throws Exception
{
final Task task = NoopTask.create();
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularities.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new SingleDimensionShardSpec("foo", null, "bar", 0, 2))
.size(0)
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularities.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new SingleDimensionShardSpec("foo", "bar", null, 1, 2))
.size(0)
.build()
)
);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdWithShardSpec id1 = allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, "s1", null);
Assert.assertNull(id1);
}
@Test
public void testWithPartialShardSpecAndOvershadowingSegments() throws IOException
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
@ -101,7 +102,9 @@ public class CompactionInputSpecTest
@Test
public void testValidateSegments()
{
Assert.assertTrue(inputSpec.validateSegments(SEGMENTS));
Assert.assertTrue(inputSpec.validateSegments(LockGranularity.TIME_CHUNK, SEGMENTS));
Assert.assertTrue(inputSpec.validateSegments(LockGranularity.SEGMENT, SEGMENTS));
Assert.assertFalse(inputSpec.validateSegments(LockGranularity.SEGMENT, SEGMENTS.subList(0, SEGMENTS.size() - 1)));
}
@Test
@ -109,10 +112,10 @@ public class CompactionInputSpecTest
{
final List<DataSegment> someSegmentIsMissing = new ArrayList<>(SEGMENTS);
someSegmentIsMissing.remove(0);
Assert.assertFalse(inputSpec.validateSegments(someSegmentIsMissing));
Assert.assertFalse(inputSpec.validateSegments(LockGranularity.TIME_CHUNK, someSegmentIsMissing));
final List<DataSegment> someSegmentIsUnknown = new ArrayList<>(SEGMENTS);
someSegmentIsUnknown.add(newSegment(Intervals.of("2018-01-01/2018-01-02")));
Assert.assertFalse(inputSpec.validateSegments(someSegmentIsUnknown));
Assert.assertFalse(inputSpec.validateSegments(LockGranularity.TIME_CHUNK, someSegmentIsUnknown));
}
}

View File

@ -20,27 +20,47 @@
package org.apache.druid.indexing.common.task;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@ -48,12 +68,17 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@RunWith(Parameterized.class)
@ -76,6 +101,8 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
private final LockGranularity lockGranularity;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private File inputDir;
public CompactionTaskParallelRunTest(LockGranularity lockGranularity)
{
this.lockGranularity = lockGranularity;
@ -83,29 +110,33 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
}
@Before
public void setup()
public void setup() throws IOException
{
getObjectMapper().registerSubtypes(ParallelIndexTuningConfig.class, DruidInputSource.class);
inputDir = temporaryFolder.newFolder();
final File tmpFile = File.createTempFile("druid", "index", inputDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T00:00:10Z,b,2\n");
writer.write("2014-01-01T00:00:10Z,c,3\n");
writer.write("2014-01-01T01:00:20Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,2\n");
writer.write("2014-01-01T01:00:20Z,c,3\n");
writer.write("2014-01-01T02:00:30Z,a,1\n");
writer.write("2014-01-01T02:00:30Z,b,2\n");
writer.write("2014-01-01T02:00:30Z,c,3\n");
}
}
@Test
public void testRunParallel() throws Exception
public void testRunParallel()
{
runIndexTask();
runIndexTask(null, true);
final CompactionTask compactionTask = new CompactionTask(
null,
null,
final Builder builder = new Builder(
DATA_SOURCE,
null,
null,
new CompactionIOConfig(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)),
null,
null,
null,
null,
newTuningConfig(),
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
@ -116,14 +147,122 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
RETRY_POLICY_FACTORY,
appenderatorsManager
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();
runTask(compactionTask);
}
@Test
public void testDruidInputSourceCreateSplitsWithIndividualSplits() throws Exception
public void testCompactHashAndDynamicPartitionedSegments()
{
runIndexTask();
runIndexTask(new HashedPartitionsSpec(null, 2, null), false);
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
null,
null,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY,
appenderatorsManager
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(
runTask(compactionTask)
);
Assert.assertEquals(3, intervalToSegments.size());
Assert.assertEquals(
ImmutableSet.of(
Intervals.of("2014-01-01T00/PT1H"),
Intervals.of("2014-01-01T01/PT1H"),
Intervals.of("2014-01-01T02/PT1H")
),
intervalToSegments.keySet()
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final List<DataSegment> segmentsInInterval = entry.getValue();
Assert.assertEquals(1, segmentsInInterval.size());
final ShardSpec shardSpec = segmentsInInterval.get(0).getShardSpec();
if (lockGranularity == LockGranularity.TIME_CHUNK) {
Assert.assertSame(NumberedShardSpec.class, shardSpec.getClass());
final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) shardSpec;
Assert.assertEquals(0, numberedShardSpec.getPartitionNum());
Assert.assertEquals(1, numberedShardSpec.getNumCorePartitions());
} else {
Assert.assertSame(NumberedOverwriteShardSpec.class, shardSpec.getClass());
final NumberedOverwriteShardSpec numberedShardSpec = (NumberedOverwriteShardSpec) shardSpec;
Assert.assertEquals(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, numberedShardSpec.getPartitionNum());
Assert.assertEquals(1, numberedShardSpec.getAtomicUpdateGroupSize());
}
}
}
@Test
public void testCompactRangeAndDynamicPartitionedSegments()
{
runIndexTask(new SingleDimensionPartitionsSpec(2, null, "dim", false), false);
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
null,
null,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY,
appenderatorsManager
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.build();
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(
runTask(compactionTask)
);
Assert.assertEquals(3, intervalToSegments.size());
Assert.assertEquals(
ImmutableSet.of(
Intervals.of("2014-01-01T00/PT1H"),
Intervals.of("2014-01-01T01/PT1H"),
Intervals.of("2014-01-01T02/PT1H")
),
intervalToSegments.keySet()
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final List<DataSegment> segmentsInInterval = entry.getValue();
Assert.assertEquals(1, segmentsInInterval.size());
final ShardSpec shardSpec = segmentsInInterval.get(0).getShardSpec();
if (lockGranularity == LockGranularity.TIME_CHUNK) {
Assert.assertSame(NumberedShardSpec.class, shardSpec.getClass());
final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) shardSpec;
Assert.assertEquals(0, numberedShardSpec.getPartitionNum());
Assert.assertEquals(1, numberedShardSpec.getNumCorePartitions());
} else {
Assert.assertSame(NumberedOverwriteShardSpec.class, shardSpec.getClass());
final NumberedOverwriteShardSpec numberedShardSpec = (NumberedOverwriteShardSpec) shardSpec;
Assert.assertEquals(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, numberedShardSpec.getPartitionNum());
Assert.assertEquals(1, numberedShardSpec.getAtomicUpdateGroupSize());
}
}
}
@Test
public void testDruidInputSourceCreateSplitsWithIndividualSplits()
{
runIndexTask(null, true);
List<InputSplit<List<WindowedSegmentId>>> splits = Lists.newArrayList(
DruidInputSource.createSplits(
@ -152,42 +291,45 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
Assert.assertEquals(segmentIdsFromCoordinator, segmentIdsFromSplits);
}
private void runIndexTask() throws Exception
private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting)
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T00:00:10Z,b,2\n");
writer.write("2014-01-01T00:00:10Z,c,3\n");
writer.write("2014-01-01T01:00:20Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,2\n");
writer.write("2014-01-01T01:00:20Z,c,3\n");
writer.write("2014-01-01T02:00:30Z,a,1\n");
writer.write("2014-01-01T02:00:30Z,b,2\n");
writer.write("2014-01-01T02:00:30Z,c,3\n");
}
IndexTask indexTask = new IndexTask(
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
null,
null,
IndexTaskTest.createIngestionSpec(
getObjectMapper(),
tmpDir,
CompactionTaskRunTest.DEFAULT_PARSE_SPEC,
new LocalInputSource(inputDir, "druid*"),
new CsvInputFormat(
Arrays.asList("ts", "dim", "val"),
"|",
null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
false,
0
),
appendToExisting
);
ParallelIndexTuningConfig tuningConfig = newTuningConfig(partitionsSpec, 2, !appendToExisting);
ParallelIndexSupervisorTask indexTask = new ParallelIndexSupervisorTask(
null,
null,
null,
new ParallelIndexIngestionSpec(
new DataSchema(
DATA_SOURCE,
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim"))),
new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
ImmutableList.of(INTERVAL_TO_INDEX)
),
null
),
IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true),
false
ioConfig,
tuningConfig
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
new NoopChatHandlerProvider(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
rowIngestionMetersFactory,
appenderatorsManager
);
@ -195,41 +337,10 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
runTask(indexTask);
}
private void runTask(Task task)
private Set<DataSegment> runTask(Task task)
{
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
private static ParallelIndexTuningConfig newTuningConfig()
{
return new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
return getIndexingServiceClient().getPublishedSegments(task);
}
}

View File

@ -52,6 +52,7 @@ import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
@ -579,6 +580,7 @@ public class CompactionTaskTest
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -645,6 +647,7 @@ public class CompactionTaskTest
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
@ -712,6 +715,7 @@ public class CompactionTaskTest
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
@ -779,6 +783,7 @@ public class CompactionTaskTest
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
@ -840,6 +845,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
customSpec,
@ -881,6 +887,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -915,6 +922,7 @@ public class CompactionTaskTest
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -955,6 +963,7 @@ public class CompactionTaskTest
segments.remove(segments.size() / 2);
CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -978,6 +987,7 @@ public class CompactionTaskTest
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1019,6 +1029,7 @@ public class CompactionTaskTest
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1054,6 +1065,7 @@ public class CompactionTaskTest
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,

View File

@ -188,7 +188,7 @@ public class IndexTaskSerdeTest
@Test
public void testForceGuaranteedRollupWithDynamicPartitionsSpec()
{
expectedException.expect(IllegalStateException.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("DynamicPartitionsSpec cannot be used for perfect rollup");
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
@ -222,7 +222,7 @@ public class IndexTaskSerdeTest
@Test
public void testBestEffortRollupWithHashedPartitionsSpec()
{
expectedException.expect(IllegalStateException.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup");
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,

View File

@ -239,13 +239,13 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getNumCorePartitions());
Assert.assertEquals(2, segments.get(0).getShardSpec().getNumCorePartitions());
Assert.assertEquals("test", segments.get(1).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval());
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getNumCorePartitions());
Assert.assertEquals(2, segments.get(1).getShardSpec().getNumCorePartitions());
}
@Test

View File

@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.ParseSpec;
@ -29,10 +28,12 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.QueryPlus;
@ -69,6 +70,8 @@ import java.util.Set;
@SuppressWarnings("SameParameterValue")
abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
{
protected static final String DATASOURCE = "dataSource";
private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig().setLegacy(false),
@ -85,6 +88,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
{
this.lockGranularity = lockGranularity;
this.useInputFormatApi = useInputFormatApi;
getObjectMapper().registerSubtypes(ParallelIndexTuningConfig.class, DruidInputSource.class);
}
boolean isUseInputFormatApi()
@ -100,10 +104,39 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
Interval interval,
File inputDir,
String filter,
DimensionBasedPartitionsSpec partitionsSpec,
PartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks,
TaskState expectedTaskStatus
)
{
return runTestTask(
timestampSpec,
dimensionsSpec,
inputFormat,
parseSpec,
interval,
inputDir,
filter,
partitionsSpec,
maxNumConcurrentSubTasks,
expectedTaskStatus,
false
);
}
Set<DataSegment> runTestTask(
@Nullable TimestampSpec timestampSpec,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable InputFormat inputFormat,
@Nullable ParseSpec parseSpec,
Interval interval,
File inputDir,
String filter,
PartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks,
TaskState expectedTaskStatus,
boolean appendToExisting
)
{
final ParallelIndexSupervisorTask task = newTask(
timestampSpec,
@ -114,13 +147,14 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
inputDir,
filter,
partitionsSpec,
maxNumConcurrentSubTasks
maxNumConcurrentSubTasks,
appendToExisting
);
return runTask(task, expectedTaskStatus);
}
Set<DataSegment> runTask(ParallelIndexSupervisorTask task, TaskState expectedTaskStatus)
Set<DataSegment> runTask(Task task, TaskState expectedTaskStatus)
{
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
TaskStatus taskStatus = getIndexingServiceClient().runAndWait(task);
@ -128,41 +162,6 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
return getIndexingServiceClient().getPublishedSegments(task);
}
ParallelIndexTuningConfig newTuningConfig(
DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
)
{
return new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
new MaxSizeSplitHintSpec(1L), // set maxSplitSize to 1 so that each split has only one file.
partitionsSpec,
null,
null,
null,
true,
null,
null,
null,
null,
maxNumConcurrentSubTasks,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
private ParallelIndexSupervisorTask newTask(
@Nullable TimestampSpec timestampSpec,
@Nullable DimensionsSpec dimensionsSpec,
@ -171,8 +170,9 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
Interval interval,
File inputDir,
String filter,
DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
PartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks,
boolean appendToExisting
)
{
GranularitySpec granularitySpec = new UniformGranularitySpec(
@ -181,7 +181,11 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
interval == null ? null : Collections.singletonList(interval)
);
ParallelIndexTuningConfig tuningConfig = newTuningConfig(partitionsSpec, maxNumConcurrentSubTasks);
ParallelIndexTuningConfig tuningConfig = newTuningConfig(
partitionsSpec,
maxNumConcurrentSubTasks,
!appendToExisting
);
final ParallelIndexIngestionSpec ingestionSpec;
@ -191,11 +195,11 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
null,
new LocalInputSource(inputDir, filter),
inputFormat,
false
appendToExisting
);
ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
DATASOURCE,
timestampSpec,
dimensionsSpec,
new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
@ -209,7 +213,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
Preconditions.checkArgument(inputFormat == null);
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
new LocalFirehoseFactory(inputDir, filter, null),
false
appendToExisting
);
//noinspection unchecked
ingestionSpec = new ParallelIndexIngestionSpec(

View File

@ -37,6 +37,7 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -46,6 +47,7 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
@ -139,34 +141,35 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
false,
0
);
static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
public static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING =
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
private static final Logger LOG = new Logger(AbstractParallelIndexSupervisorTaskTest.class);
@ -223,6 +226,42 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
temporaryFolder.delete();
}
protected ParallelIndexTuningConfig newTuningConfig(
PartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks,
boolean forceGuaranteedRollup
)
{
return new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
new MaxSizeSplitHintSpec(1L), // set maxSplitSize to 1 so that each split has only one file.
partitionsSpec,
null,
null,
null,
forceGuaranteedRollup,
null,
null,
null,
null,
maxNumConcurrentSubTasks,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
}
protected LocalIndexingServiceClient getIndexingServiceClient()
{
return indexingServiceClient;

View File

@ -27,13 +27,16 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@ -49,9 +52,12 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest
@ -128,9 +134,78 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
@Test
public void testRun() throws Exception
{
final Set<DataSegment> publishedSegments;
final Set<DataSegment> publishedSegments = runTestTask(
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
TaskState.SUCCESS,
false
);
assertHashedPartition(publishedSegments);
}
@Test
public void testAppendLinearlyPartitionedSegmentsToHashPartitionedDatasourceSuccessfullyAppend()
{
final Set<DataSegment> publishedSegments = new HashSet<>();
publishedSegments.addAll(
runTestTask(
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
TaskState.SUCCESS,
false
)
);
// Append
publishedSegments.addAll(
runTestTask(
new DynamicPartitionsSpec(5, null),
TaskState.SUCCESS,
true
)
);
// And append again
publishedSegments.addAll(
runTestTask(
new DynamicPartitionsSpec(10, null),
TaskState.SUCCESS,
true
)
);
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
publishedSegments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final List<DataSegment> segments = entry.getValue();
final List<DataSegment> hashedSegments = segments
.stream()
.filter(segment -> segment.getShardSpec().getClass() == HashBasedNumberedShardSpec.class)
.collect(Collectors.toList());
final List<DataSegment> linearSegments = segments
.stream()
.filter(segment -> segment.getShardSpec().getClass() == NumberedShardSpec.class)
.collect(Collectors.toList());
for (DataSegment hashedSegment : hashedSegments) {
final HashBasedNumberedShardSpec hashShardSpec = (HashBasedNumberedShardSpec) hashedSegment.getShardSpec();
for (DataSegment linearSegment : linearSegments) {
Assert.assertEquals(hashedSegment.getInterval(), linearSegment.getInterval());
Assert.assertEquals(hashedSegment.getVersion(), linearSegment.getVersion());
final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) linearSegment.getShardSpec();
Assert.assertEquals(hashShardSpec.getNumCorePartitions(), numberedShardSpec.getNumCorePartitions());
Assert.assertTrue(hashShardSpec.getPartitionNum() < numberedShardSpec.getPartitionNum());
}
}
}
}
private Set<DataSegment> runTestTask(
PartitionsSpec partitionsSpec,
TaskState expectedTaskState,
boolean appendToExisting
)
{
if (isUseInputFormatApi()) {
publishedSegments = runTestTask(
return runTestTask(
TIMESTAMP_SPEC,
DIMENSIONS_SPEC,
INPUT_FORMAT,
@ -138,12 +213,13 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
INTERVAL_TO_INDEX,
inputDir,
"test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
partitionsSpec,
maxNumConcurrentSubTasks,
TaskState.SUCCESS
expectedTaskState,
appendToExisting
);
} else {
publishedSegments = runTestTask(
return runTestTask(
null,
null,
null,
@ -151,12 +227,12 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
INTERVAL_TO_INDEX,
inputDir,
"test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
partitionsSpec,
maxNumConcurrentSubTasks,
TaskState.SUCCESS
expectedTaskState,
appendToExisting
);
}
assertHashedPartition(publishedSegments);
}
private void assertHashedPartition(Set<DataSegment> publishedSegments) throws IOException

View File

@ -21,14 +21,28 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Ordering;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -146,4 +160,82 @@ public class ParallelIndexSupervisorTaskTest
Assert.assertEquals(expectedIds, actualIds);
}
}
public static class ConstructorTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupAreSet()
{
final boolean appendToExisting = true;
final boolean forceGuaranteedRollup = true;
final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
null,
new InlineInputSource("test"),
new JsonInputFormat(null, null, null),
appendToExisting
);
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
10,
1000L,
null,
null,
null,
new HashedPartitionsSpec(null, 10, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.UNCOMPRESSED,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
new IndexSpec(),
1,
forceGuaranteedRollup,
true,
10000L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
null,
10,
100,
20L,
new Duration(3600),
128,
null,
null,
false,
null,
null
);
final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"datasource",
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
null,
null,
null
),
ioConfig,
tuningConfig
);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Perfect rollup cannot be guaranteed when appending to existing dataSources");
new ParallelIndexSupervisorTask(
null,
null,
null,
indexIngestionSpec,
null,
null,
null,
null,
null,
null
);
}
}
}
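The ConstructorTest above pins down the new guard that rejects appendToExisting together with forceGuaranteedRollup. A minimal sketch of that guard, assuming Guava's Preconditions (already used elsewhere in this patch); the standalone class and method names below are illustrative, not the actual constructor code:
import com.google.common.base.Preconditions;
// Sketch only: the real check runs inside the task/tuning-config constructors.
final class AppendRollupGuard
{
  private AppendRollupGuard() {}
  static void validate(boolean appendToExisting, boolean forceGuaranteedRollup)
  {
    // Appending adds segments on top of an existing core partition set, so a perfect
    // (guaranteed) rollup over the whole time chunk cannot be promised at the same time.
    Preconditions.checkArgument(
        !(appendToExisting && forceGuaranteedRollup),
        "Perfect rollup cannot be guaranteed when appending to existing dataSources"
    );
  }
}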

View File

@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
@ -226,4 +228,127 @@ public class ParallelIndexTuningConfigTest
null
);
}
@Test
public void testConstructorWithHashedPartitionsSpecAndNonForceGuaranteedRollupFailToCreate()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup");
final boolean forceGuaranteedRollup = false;
new ParallelIndexTuningConfig(
null,
null,
10,
1000L,
null,
null,
null,
new HashedPartitionsSpec(null, 10, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.UNCOMPRESSED,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
new IndexSpec(),
1,
forceGuaranteedRollup,
true,
10000L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
null,
10,
100,
20L,
new Duration(3600),
128,
null,
null,
false,
null,
null
);
}
@Test
public void testConstructorWithSingleDimensionPartitionsSpecAndNonForceGuaranteedRollupFailToCreate()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup");
final boolean forceGuaranteedRollup = false;
new ParallelIndexTuningConfig(
null,
null,
10,
1000L,
null,
null,
null,
new SingleDimensionPartitionsSpec(100, null, "dim", false),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.UNCOMPRESSED,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
new IndexSpec(),
1,
forceGuaranteedRollup,
true,
10000L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
null,
10,
100,
20L,
new Duration(3600),
128,
null,
null,
false,
null,
null
);
}
@Test
public void testConstructorWithDynamicPartitionsSpecAndForceGuaranteedRollupFailToCreate()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("cannot be used for perfect rollup");
final boolean forceGuaranteedRollup = true;
new ParallelIndexTuningConfig(
null,
null,
10,
1000L,
null,
null,
null,
new DynamicPartitionsSpec(100, null),
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.UNCOMPRESSED,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
new IndexSpec(),
1,
forceGuaranteedRollup,
true,
10000L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
null,
10,
100,
20L,
new Duration(3600),
128,
null,
null,
false,
null,
null
);
}
}
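The three constructor tests above encode how partitionsSpec must be paired with forceGuaranteedRollup. A condensed sketch of that pairing rule, reusing the error messages asserted above; the helper class is illustrative, not the actual tuning-config code:
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
// Sketch only: hashed/single-dimension specs require perfect rollup, while dynamic
// partitioning is the only option for best-effort rollup (and thus for appending).
final class PartitionsSpecRollupGuard
{
  private PartitionsSpecRollupGuard() {}
  static void validate(PartitionsSpec partitionsSpec, boolean forceGuaranteedRollup)
  {
    if (forceGuaranteedRollup) {
      Preconditions.checkArgument(
          !(partitionsSpec instanceof DynamicPartitionsSpec),
          "DynamicPartitionsSpec cannot be used for perfect rollup"
      );
    } else {
      Preconditions.checkArgument(
          partitionsSpec instanceof DynamicPartitionsSpec,
          "DynamicPartitionsSpec must be used for best-effort rollup"
      );
    }
  }
}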

View File

@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.SpecificSegmentsSpec;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTest
{
private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
);
private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
Arrays.asList("ts", "dim1", "dim2", "val"),
null,
false,
false,
0
);
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
private File inputDir;
public PartialCompactionTest()
{
super(LockGranularity.SEGMENT, true);
}
@Before
public void setup() throws IOException
{
inputDir = temporaryFolder.newFolder("data");
// set up data
for (int i = 0; i < 10; i++) {
try (final Writer writer =
Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
for (int j = 0; j < 10; j++) {
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
}
}
}
}
@Test
public void testPartialCompactHashAndDynamicPartitionedSegments()
{
final Map<Interval, List<DataSegment>> hashPartitionedSegments = SegmentUtils.groupSegmentsByInterval(
runTestTask(
new HashedPartitionsSpec(null, 3, null),
TaskState.SUCCESS,
false
)
);
final Map<Interval, List<DataSegment>> linearlyPartitionedSegments = SegmentUtils.groupSegmentsByInterval(
runTestTask(
new DynamicPartitionsSpec(10, null),
TaskState.SUCCESS,
true
)
);
// Pick half of each partition list to compact together
hashPartitionedSegments.values().forEach(
segmentsInInterval -> segmentsInInterval.sort(
Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum())
)
);
linearlyPartitionedSegments.values().forEach(
segmentsInInterval -> segmentsInInterval.sort(
Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum())
)
);
final List<DataSegment> segmentsToCompact = new ArrayList<>();
for (List<DataSegment> segmentsInInterval : hashPartitionedSegments.values()) {
segmentsToCompact.addAll(
segmentsInInterval.subList(segmentsInInterval.size() / 2, segmentsInInterval.size())
);
}
for (List<DataSegment> segmentsInInterval : linearlyPartitionedSegments.values()) {
segmentsToCompact.addAll(
segmentsInInterval.subList(0, segmentsInInterval.size() / 2)
);
}
final CompactionTask compactionTask = newCompactionTaskBuilder()
.inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact))
.tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false))
.build();
final Map<Interval, List<DataSegment>> compactedSegments = SegmentUtils.groupSegmentsByInterval(
runTask(compactionTask, TaskState.SUCCESS)
);
for (List<DataSegment> segmentsInInterval : compactedSegments.values()) {
final int expectedAtomicUpdateGroupSize = segmentsInInterval.size();
for (DataSegment segment : segmentsInInterval) {
Assert.assertEquals(expectedAtomicUpdateGroupSize, segment.getShardSpec().getAtomicUpdateGroupSize());
}
}
}
@Test
public void testPartialCompactRangeAndDynamicPartitionedSegments()
{
final Map<Interval, List<DataSegment>> rangePartitionedSegments = SegmentUtils.groupSegmentsByInterval(
runTestTask(
new SingleDimensionPartitionsSpec(10, null, "dim1", false),
TaskState.SUCCESS,
false
)
);
final Map<Interval, List<DataSegment>> linearlyPartitionedSegments = SegmentUtils.groupSegmentsByInterval(
runTestTask(
new DynamicPartitionsSpec(10, null),
TaskState.SUCCESS,
true
)
);
// Pick half of each partition list to compact together
rangePartitionedSegments.values().forEach(
segmentsInInterval -> segmentsInInterval.sort(
Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum())
)
);
linearlyPartitionedSegments.values().forEach(
segmentsInInterval -> segmentsInInterval.sort(
Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum())
)
);
final List<DataSegment> segmentsToCompact = new ArrayList<>();
for (List<DataSegment> segmentsInInterval : rangePartitionedSegments.values()) {
segmentsToCompact.addAll(
segmentsInInterval.subList(segmentsInInterval.size() / 2, segmentsInInterval.size())
);
}
for (List<DataSegment> segmentsInInterval : linearlyPartitionedSegments.values()) {
segmentsToCompact.addAll(
segmentsInInterval.subList(0, segmentsInInterval.size() / 2)
);
}
final CompactionTask compactionTask = newCompactionTaskBuilder()
.inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact))
.tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false))
.build();
final Map<Interval, List<DataSegment>> compactedSegments = SegmentUtils.groupSegmentsByInterval(
runTask(compactionTask, TaskState.SUCCESS)
);
for (List<DataSegment> segmentsInInterval : compactedSegments.values()) {
final int expectedAtomicUpdateGroupSize = segmentsInInterval.size();
for (DataSegment segment : segmentsInInterval) {
Assert.assertEquals(expectedAtomicUpdateGroupSize, segment.getShardSpec().getAtomicUpdateGroupSize());
}
}
}
private Set<DataSegment> runTestTask(
PartitionsSpec partitionsSpec,
TaskState expectedTaskState,
boolean appendToExisting
)
{
return runTestTask(
TIMESTAMP_SPEC,
DIMENSIONS_SPEC,
INPUT_FORMAT,
null,
INTERVAL_TO_INDEX,
inputDir,
"test_*",
partitionsSpec,
2,
expectedTaskState,
appendToExisting
);
}
private Builder newCompactionTaskBuilder()
{
return new Builder(
DATASOURCE,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
new DropwizardRowIngestionMetersFactory(),
getIndexingServiceClient(),
getCoordinatorClient(),
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY,
new TestAppenderatorsManager()
);
}
}

View File

@ -32,12 +32,15 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.hamcrest.Matchers;
import org.joda.time.Interval;
@ -59,8 +62,12 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
@ -201,9 +208,95 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
public void createsCorrectRangePartitions() throws Exception
{
int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
final Set<DataSegment> publishedSegments;
final Set<DataSegment> publishedSegments = runTestTask(
new SingleDimensionPartitionsSpec(
targetRowsPerSegment,
null,
DIM1,
false
),
useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS,
false
);
if (!useMultivalueDim) {
assertRangePartitions(publishedSegments);
}
}
@Test
public void testAppendLinearlyPartitionedSegmentsToRangePartitionedDatasourceSuccessfullyAppend()
{
if (useMultivalueDim) {
return;
}
final int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
final Set<DataSegment> publishedSegments = new HashSet<>();
publishedSegments.addAll(
runTestTask(
new SingleDimensionPartitionsSpec(
targetRowsPerSegment,
null,
DIM1,
false
),
TaskState.SUCCESS,
false
)
);
// Append
publishedSegments.addAll(
runTestTask(
new DynamicPartitionsSpec(5, null),
TaskState.SUCCESS,
true
)
);
// And append again
publishedSegments.addAll(
runTestTask(
new DynamicPartitionsSpec(10, null),
TaskState.SUCCESS,
true
)
);
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
publishedSegments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final List<DataSegment> segments = entry.getValue();
final List<DataSegment> rangedSegments = segments
.stream()
.filter(segment -> segment.getShardSpec().getClass() == SingleDimensionShardSpec.class)
.collect(Collectors.toList());
final List<DataSegment> linearSegments = segments
.stream()
.filter(segment -> segment.getShardSpec().getClass() == NumberedShardSpec.class)
.collect(Collectors.toList());
for (DataSegment rangedSegment : rangedSegments) {
final SingleDimensionShardSpec rangeShardSpec = (SingleDimensionShardSpec) rangedSegment.getShardSpec();
for (DataSegment linearSegment : linearSegments) {
Assert.assertEquals(rangedSegment.getInterval(), linearSegment.getInterval());
Assert.assertEquals(rangedSegment.getVersion(), linearSegment.getVersion());
final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) linearSegment.getShardSpec();
Assert.assertEquals(rangeShardSpec.getNumCorePartitions(), numberedShardSpec.getNumCorePartitions());
Assert.assertTrue(rangeShardSpec.getPartitionNum() < numberedShardSpec.getPartitionNum());
}
}
}
}
private Set<DataSegment> runTestTask(
PartitionsSpec partitionsSpec,
TaskState expectedTaskState,
boolean appendToExisting
)
{
if (isUseInputFormatApi()) {
publishedSegments = runTestTask(
return runTestTask(
TIMESTAMP_SPEC,
DIMENSIONS_SPEC,
INPUT_FORMAT,
@ -211,17 +304,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
INTERVAL_TO_INDEX,
inputDir,
TEST_FILE_NAME_PREFIX + "*",
new SingleDimensionPartitionsSpec(
targetRowsPerSegment,
null,
DIM1,
false
),
partitionsSpec,
maxNumConcurrentSubTasks,
useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
expectedTaskState,
appendToExisting
);
} else {
publishedSegments = runTestTask(
return runTestTask(
null,
null,
null,
@ -229,20 +318,12 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
INTERVAL_TO_INDEX,
inputDir,
TEST_FILE_NAME_PREFIX + "*",
new SingleDimensionPartitionsSpec(
targetRowsPerSegment,
null,
DIM1,
false
),
partitionsSpec,
maxNumConcurrentSubTasks,
useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
expectedTaskState,
appendToExisting
);
}
if (!useMultivalueDim) {
assertRangePartitions(publishedSegments);
}
}
private void assertRangePartitions(Set<DataSegment> publishedSegments) throws IOException

View File

@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
@ -59,10 +60,8 @@ import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -155,12 +154,23 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
}
private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity, boolean appendToExisting)
private void runTestTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
Collection<DataSegment> originalSegmentsIfAppend
)
{
// The task may run differently depending on whether appendToExisting is true or false, even when this is an initial write
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(interval, appendToExisting);
assertShardSpec(
task,
interval == null ? LockGranularity.TIME_CHUNK : lockGranularity,
appendToExisting,
originalSegmentsIfAppend
);
}
private void runOverwriteTask(
@ -172,18 +182,13 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, false, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpecAfterOverwrite(interval, actualLockGranularity);
}
private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity)
{
runTestTask(interval, segmentGranularity, false);
assertShardSpecAfterOverwrite(task, actualLockGranularity);
}
private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity secondSegmentGranularity)
{
// Ingest all data.
runTestTask(inputInterval, Granularities.DAY);
runTestTask(inputInterval, Granularities.DAY, false, Collections.emptyList());
final Interval interval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
final Collection<DataSegment> allSegments = new HashSet<>(
@ -191,11 +196,15 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
);
// Reingest the same data. Each segment should get replaced by a segment with a newer version.
runOverwriteTask(
inputInterval,
secondSegmentGranularity,
secondSegmentGranularity.equals(Granularities.DAY) ? lockGranularity : LockGranularity.TIME_CHUNK
);
final LockGranularity actualLockGranularity;
if (inputInterval == null) {
actualLockGranularity = LockGranularity.TIME_CHUNK;
} else {
actualLockGranularity = secondSegmentGranularity.equals(Granularities.DAY)
? lockGranularity
: LockGranularity.TIME_CHUNK;
}
runOverwriteTask(inputInterval, secondSegmentGranularity, actualLockGranularity);
// Verify that the segment has been replaced.
final Collection<DataSegment> newSegments =
@ -206,17 +215,17 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
private void assertShardSpec(@Nullable Interval interval, boolean appendToExisting)
private void assertShardSpec(
ParallelIndexSupervisorTask task,
LockGranularity actualLockGranularity,
boolean appendToExisting,
Collection<DataSegment> originalSegmentsIfAppend
)
{
final Interval nonNullInterval = interval == null ? Intervals.ETERNITY : interval;
final Collection<DataSegment> segments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", nonNullInterval, Segments.ONLY_VISIBLE);
if (!appendToExisting && lockGranularity != LockGranularity.SEGMENT) {
// Check the core partition set in the shardSpec
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
segments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
final Collection<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
if (!appendToExisting && actualLockGranularity == LockGranularity.TIME_CHUNK) {
// Initial write
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
for (DataSegment segment : segmentsPerInterval) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
@ -225,23 +234,27 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
}
} else {
// Append or initial write with segment lock
final Map<Interval, List<DataSegment>> intervalToOriginalSegments = SegmentUtils.groupSegmentsByInterval(
originalSegmentsIfAppend
);
for (DataSegment segment : segments) {
Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(0, shardSpec.getNumCorePartitions());
final List<DataSegment> originalSegmentsInInterval = intervalToOriginalSegments.get(segment.getInterval());
final int expectedNumCorePartitions =
originalSegmentsInInterval == null || originalSegmentsInInterval.isEmpty()
? 0
: originalSegmentsInInterval.get(0).getShardSpec().getNumCorePartitions();
Assert.assertEquals(expectedNumCorePartitions, shardSpec.getNumCorePartitions());
}
}
}
private void assertShardSpecAfterOverwrite(@Nullable Interval interval, LockGranularity actualLockGranularity)
private void assertShardSpecAfterOverwrite(ParallelIndexSupervisorTask task, LockGranularity actualLockGranularity)
{
final Interval nonNullInterval = interval == null ? Intervals.ETERNITY : interval;
final Collection<DataSegment> segments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", nonNullInterval, Segments.ONLY_VISIBLE);
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
segments.forEach(
segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
);
final Collection<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
if (actualLockGranularity != LockGranularity.SEGMENT) {
// Check the core partition set in the shardSpec
for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
@ -296,7 +309,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
final ParallelIndexSupervisorTask task = newTask(interval, appendToExisting, false);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
assertShardSpec(interval, appendToExisting);
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
@Test
@ -349,18 +362,18 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
assertShardSpec(interval, appendToExisting);
assertShardSpec(task, lockGranularity, appendToExisting, Collections.emptyList());
}
@Test
public void testAppendToExisting()
{
final Interval interval = Intervals.of("2017-12/P1M");
runTestTask(interval, Granularities.DAY, true);
runTestTask(interval, Granularities.DAY, true, Collections.emptyList());
final Collection<DataSegment> oldSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
runTestTask(interval, Granularities.DAY, true);
runTestTask(interval, Granularities.DAY, true, oldSegments);
final Collection<DataSegment> newSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
Assert.assertTrue(newSegments.containsAll(oldSegments));
@ -369,6 +382,29 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Assert.assertEquals(new HashSet<>(newSegments), visibles);
}
@Test
public void testOverwriteAndAppend()
{
final Interval interval = Intervals.of("2017-12/P1M");
testRunAndOverwrite(interval, Granularities.DAY);
final Collection<DataSegment> beforeAppendSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
runTestTask(
interval,
Granularities.DAY,
true,
beforeAppendSegments
);
final Collection<DataSegment> afterAppendSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
Assert.assertTrue(afterAppendSegments.containsAll(beforeAppendSegments));
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline
.forSegments(afterAppendSegments);
final Set<DataSegment> visibles = timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
Assert.assertEquals(new HashSet<>(afterAppendSegments), visibles);
}
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
boolean appendToExisting,

View File

@ -153,7 +153,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
dataSource,
interval,
maxVersion,
partialShardSpec.complete(objectMapper, 0)
partialShardSpec.complete(objectMapper, 0, 0)
);
}

View File

@ -51,7 +51,8 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.skife.jdbi.v2.Folder3;
@ -810,16 +811,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return null;
} else {
//noinspection ConstantConditions
if (FluentIterable
.from(existingChunks)
.transformAndConcat(TimelineObjectHolder::getObject)
.anyMatch(chunk -> !chunk.getObject().getShardSpec().isCompatible(partialShardSpec.getShardSpecClass()))) {
// All existing segments should have a compatible shardSpec with partialShardSpec.
return null;
}
// max partitionId of the SAME shardSpec
// max partitionId of the shardSpecs which share the same partition space.
SegmentIdWithShardSpec maxId = null;
if (!existingChunks.isEmpty()) {
@ -829,10 +821,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
for (DataSegment segment : FluentIterable
.from(existingHolder.getObject())
.transform(PartitionChunk::getObject)
// Here we check only the segments of the same shardSpec to find out the max partitionId.
// Note that OverwriteShardSpec has the higher range for partitionId than others.
// Here we check only the segments whose shardSpec shares the same partition space as the given
// partialShardSpec. Note that OverwriteShardSpec doesn't share its partition space with others.
// See PartitionIds.
.filter(segment -> segment.getShardSpec().getClass() == partialShardSpec.getShardSpecClass())) {
.filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
// Don't use the stream API for performance.
if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
maxId = SegmentIdWithShardSpec.fromDataSegment(segment);
@ -851,7 +843,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
maxId = pendings.stream()
.filter(id -> id.getShardSpec().getClass() == partialShardSpec.getShardSpecClass())
.filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
.max((id1, id2) -> {
final int versionCompare = id1.getVersion().compareTo(id2.getVersion());
if (versionCompare != 0) {
@ -873,9 +865,21 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
if (maxId == null) {
final ShardSpec shardSpec = partialShardSpec.complete(jsonMapper, null);
// This code is executed when the Overlord coordinates segment allocation, which happens either when you append
// segments or when you use segment lock. When appending segments, a null maxId means that we are allocating the
// very first segment for this time chunk. Since the core partition set is not determined for appended segments,
// we set it to 0. When you use segment lock, the core partition set is not used; we simply set it to 0 so that
// the OvershadowableManager can handle the atomic segment update.
final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace()
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = versionOfExistingChunks == null ? maxVersion : versionOfExistingChunks;
return new SegmentIdWithShardSpec(dataSource, interval, version, shardSpec);
return new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
} else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(maxVersion) > 0) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].",
@ -885,13 +889,23 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
maxId
);
return null;
} else if (maxId.getShardSpec().getNumCorePartitions() == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
log.warn(
"Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]",
maxId,
maxId.getShardSpec()
);
return null;
} else {
final ShardSpec newShardSpec = partialShardSpec.complete(jsonMapper, maxId.getShardSpec());
return new SegmentIdWithShardSpec(
dataSource,
maxId.getInterval(),
Preconditions.checkNotNull(versionOfExistingChunks, "versionOfExistingChunks"),
newShardSpec
partialShardSpec.complete(
jsonMapper,
maxId.getShardSpec().getPartitionNum() + 1,
maxId.getShardSpec().getNumCorePartitions()
)
);
}
}
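The hunks above centralize how the Overlord chooses both the new partitionNum and numCorePartitions during segment allocation. A condensed sketch of that decision, restricted to the calls visible in this diff (PartialShardSpec.complete(mapper, partitionId, corePartitions), PartitionIds, SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS); the standalone helper elides the interval/version conflict checks shown above and assumes the surrounding class's imports are in scope:
// Illustrative sketch only, not the actual method in IndexerSQLMetadataStorageCoordinator.
@Nullable
private static SegmentIdWithShardSpec completeAllocation(
    ObjectMapper jsonMapper,
    String dataSource,
    Interval interval,
    String version,
    PartialShardSpec partialShardSpec,
    @Nullable SegmentIdWithShardSpec maxId // max id among segments sharing the partition space
)
{
  if (maxId == null) {
    // Very first segment in this partition space: appended segments and segment-locked segments
    // both start with an "unknown" core partition set of size 0.
    final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace()
                               ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
                               : PartitionIds.ROOT_GEN_START_PARTITION_ID;
    return new SegmentIdWithShardSpec(
        dataSource,
        interval,
        version,
        partialShardSpec.complete(jsonMapper, newPartitionId, 0)
    );
  }
  if (maxId.getShardSpec().getNumCorePartitions() == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
    // Segments written by older Druid versions don't record their core partition set; refuse to append.
    return null;
  }
  // Append: next partition id in the same space, same core partition set size as the existing segments.
  return new SegmentIdWithShardSpec(
      dataSource,
      maxId.getInterval(),
      version,
      partialShardSpec.complete(
          jsonMapper,
          maxId.getShardSpec().getPartitionNum() + 1,
          maxId.getShardSpec().getNumCorePartitions()
      )
  );
}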

View File

@ -44,6 +44,7 @@ import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -60,6 +61,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@ -1113,4 +1115,49 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(0, shardSpec.getNumCorePartitions());
Assert.assertEquals(3, shardSpec.getNumBuckets());
}
@Test
public void testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCorePartitionSize() throws IOException
{
final String datasource = "datasource";
final Interval interval = Intervals.of("2020-01-01/P1D");
final String version = "version";
final List<String> dimensions = ImmutableList.of("dim");
final List<String> metrics = ImmutableList.of("met");
final Set<DataSegment> originalSegments = new HashSet<>();
for (int i = 0; i < 6; i++) {
final String start = i == 0 ? null : String.valueOf(i - 1);
final String end = i == 5 ? null : String.valueOf(i);
originalSegments.add(
new DataSegment(
datasource,
interval,
version,
ImmutableMap.of(),
dimensions,
metrics,
new SingleDimensionShardSpec(
"dim",
start,
end,
i,
null // emulate shardSpecs created in older versions of Druid
),
9,
10L
)
);
}
coordinator.announceHistoricalSegments(originalSegments);
final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
datasource,
"seq",
null,
interval,
NumberedPartialShardSpec.instance(),
version,
false
);
Assert.assertNull(id);
}
}

View File

@ -53,7 +53,7 @@ public class SegmentIdWithShardSpecTest
Assert.assertEquals(INTERVAL, id2.getInterval());
Assert.assertEquals(VERSION, id2.getVersion());
Assert.assertEquals(SHARD_SPEC_1.getPartitionNum(), id2.getShardSpec().getPartitionNum());
Assert.assertEquals(SHARD_SPEC_1.getNumCorePartitions(), ((NumberedShardSpec) id2.getShardSpec()).getNumCorePartitions());
Assert.assertEquals(SHARD_SPEC_1.getNumCorePartitions(), id2.getShardSpec().getNumCorePartitions());
}
@Test

View File

@ -30,10 +30,13 @@ import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionPartialShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -60,7 +63,7 @@ public class NumberedShardSpecTest
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
Assert.assertEquals(2, ((NumberedShardSpec) spec).getNumCorePartitions());
Assert.assertEquals(2, spec.getNumCorePartitions());
}
@Test
@ -71,7 +74,7 @@ public class NumberedShardSpecTest
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
Assert.assertEquals(2, ((NumberedShardSpec) spec).getNumCorePartitions());
Assert.assertEquals(2, spec.getNumCorePartitions());
}
@Test
@ -195,6 +198,16 @@ public class NumberedShardSpecTest
);
}
@Test
public void testSharePartitionSpace()
{
final NumberedShardSpec shardSpec = new NumberedShardSpec(0, 1);
Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance()));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
}
private void testVersionedIntervalTimelineBehaviorForNumberedShardSpec(
List<PartitionChunk<OvershadowableString>> chunks,
Set<OvershadowableString> expectedObjects
@ -217,14 +230,6 @@ public class NumberedShardSpecTest
Assert.assertEquals(expectedObjects, actualObjects);
}
@Test
public void testCompatible()
{
final NumberedShardSpec spec = new NumberedShardSpec(0, 0);
Assert.assertTrue(spec.isCompatible(NumberedShardSpec.class));
Assert.assertTrue(spec.isCompatible(NumberedOverwriteShardSpec.class));
}
private static final class OvershadowableString implements Overshadowable<OvershadowableString>
{
private final int partitionId;

View File

@ -30,7 +30,11 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionPartialShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.junit.Assert;
import org.junit.Test;
@ -142,6 +146,16 @@ public class SingleDimensionShardSpecTest
Assert.assertTrue(shard7.possibleInDomain(domain2));
}
@Test
public void testSharePartitionSpace()
{
final SingleDimensionShardSpec shardSpec = makeSpec("start", "end");
Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance()));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
}
private static RangeSet<String> rangeSet(List<Range<String>> ranges)
{
ImmutableRangeSet.Builder<String> builder = ImmutableRangeSet.builder();

View File

@ -85,14 +85,14 @@ public class HashBasedNumberedShardSpecTest
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getNumCorePartitions());
Assert.assertEquals(2, spec.getNumCorePartitions());
final ShardSpec specWithPartitionDimensions = ServerTestHelper.MAPPER.readValue(
"{\"type\": \"hashed\", \"partitions\": 2, \"partitionNum\": 1, \"partitionDimensions\":[\"visitor_id\"]}",
ShardSpec.class
);
Assert.assertEquals(1, specWithPartitionDimensions.getPartitionNum());
Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getNumCorePartitions());
Assert.assertEquals(2, specWithPartitionDimensions.getNumCorePartitions());
Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getNumBuckets());
Assert.assertEquals(
ImmutableList.of("visitor_id"),
@ -199,6 +199,23 @@ public class HashBasedNumberedShardSpecTest
);
}
@Test
public void testSharePartitionSpace()
{
final HashBasedNumberedShardSpec shardSpec = new HashBasedNumberedShardSpec(
1,
2,
1,
3,
ImmutableList.of("visitor_id"),
ServerTestHelper.MAPPER
);
Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance()));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
}
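The testSharePartitionSpace additions above (numbered, single-dimension, and hashed shard specs) all expect the same behavior: any partial spec targeting the root partition space is compatible, while the overwrite partial spec is not. A plausible default implementation consistent with those assertions, written as an interface fragment; this is an assumption, not necessarily the actual ShardSpec code:
import org.apache.druid.timeline.partition.PartialShardSpec;
// Sketch only (assumed default): ordinary shard specs live in the root-generation
// partition space, so they share a partition space with any partial spec that does
// not target the non-root generation (i.e. everything except overwrite specs).
interface ShardSpecSketch
{
  default boolean sharePartitionSpace(PartialShardSpec partialShardSpec)
  {
    return !partialShardSpec.useNonRootGenerationPartitionSpace();
  }
}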
public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row)
{
for (ShardSpec spec : specs) {