Use targetRowsPerSegment for single-dim partitions (#8624)

When using single-dimension partitioning, use targetRowsPerSegment (if
specified) to size segments. Previously, single-dimension partitioning
would always size segments as close to the max size as possible.

Also, change single-dimension partitioning to allow partitions whose size
equals the target or max size. Previously, it would only create partitions
up to one row smaller than those limits.

Also, fix some IntelliJ inspection warnings in HadoopDruidIndexerConfig.
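For context, a rough sketch of the new knob from the caller's side (illustrative only, not part of this commit): it uses the four-argument constructor that appears in the DeterminePartitionsJobTest changes below (targetRowsPerSegment, maxRowsPerSegment, partitionDimension, assumeGrouped). The 5,000,000 target and the wrapper class are made up for illustration.

import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;

public class SingleDimTargetExample
{
  public static void main(String[] args)
  {
    // Aim for roughly five million rows per segment instead of packing segments
    // as close to a max size as possible. maxRowsPerSegment and partitionDimension
    // are left null, and assumeGrouped is false, mirroring the test constructor call.
    SingleDimensionPartitionsSpec partitionsSpec =
        new SingleDimensionPartitionsSpec(5_000_000, null, null, false);

    System.out.println("targetRowsPerSegment = " + partitionsSpec.getTargetRowsPerSegment());
  }
}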
Chi Cao Minh 2019-10-17 15:55:12 -07:00 committed by Clint Wylie
parent 2c758ef5ff
commit 8b2afa5c49
8 changed files with 286 additions and 193 deletions

View File

@@ -19,6 +19,7 @@
package org.apache.druid.indexer.partitions;
import javax.annotation.Nullable;
import java.util.List;
/**
@@ -33,4 +34,7 @@ public interface DimensionBasedPartitionsSpec extends PartitionsSpec
String TARGET_PARTITION_SIZE = "targetPartitionSize";
List<String> getPartitionDimensions();
@Nullable
Integer getTargetRowsPerSegment();
}

View File

@@ -56,9 +56,9 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
// Deprecated properties preserved for backward compatibility:
@Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable
Integer targetPartitionSize,
Integer targetPartitionSize, // prefer targetRowsPerSegment
@Deprecated @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable
Integer maxRowsPerSegment
Integer maxRowsPerSegment // prefer targetRowsPerSegment
)
{
Integer adjustedTargetRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(targetRowsPerSegment);
@@ -114,6 +114,13 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
this(null, numShards, partitionDimensions, null, maxRowsPerSegment);
}
@Nullable
@Override
public Integer getTargetRowsPerSegment()
{
return null;
}
@Nullable
@Override
@JsonProperty

View File

@@ -30,8 +30,7 @@ import javax.annotation.Nullable;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.NAME, value = SingleDimensionPartitionsSpec.class),
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class),
// for backward compatibility
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class), // for backward compatibility
@JsonSubTypes.Type(name = HashedPartitionsSpec.NAME, value = HashedPartitionsSpec.class),
@JsonSubTypes.Type(name = DynamicPartitionsSpec.NAME, value = DynamicPartitionsSpec.class)
})

View File

@@ -129,6 +129,7 @@ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSpec
}
@JsonProperty
@Override
@Nullable
public Integer getTargetRowsPerSegment()
{

View File

@@ -661,7 +661,7 @@ public class DeterminePartitionsJob implements Jobby
}
// See if we need to cut a new partition ending immediately before this dimension value
if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize()) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
currentDimPartitionStart,
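To see the effect of relaxing the comparison from >= to >, here is a small self-contained sketch (an illustration, not the actual DeterminePartitionsJob code, which also enforces a max size and handles multiple dimensions): a partition is cut before any dimension value whose rows would push it past the target, so a partition may now reach exactly the target size, whereas the old >= check stopped one row short.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionCutSketch
{
  // rowCountsPerValue holds the number of rows for each consecutive dimension value.
  // A cut is made before a value whose rows would push the current partition past the target.
  static List<Integer> partitionSizes(int[] rowCountsPerValue, int targetRows)
  {
    List<Integer> sizes = new ArrayList<>();
    int current = 0;
    for (int rows : rowCountsPerValue) {
      // New rule: cut only when the partition would exceed the target.
      // The old rule used ">=", which also cut when the partition would land exactly on the target.
      if (current > 0 && current + rows > targetRows) {
        sizes.add(current);
        current = 0;
      }
      current += rows;
    }
    if (current > 0) {
      sizes.add(current);
    }
    return sizes;
  }

  public static void main(String[] args)
  {
    int[] oneRowPerValue = new int[10];
    Arrays.fill(oneRowPerValue, 1);

    // With a target of 2 and the new ">" rule this prints [2, 2, 2, 2, 2];
    // the old ">=" rule would have produced ten single-row partitions.
    System.out.println(partitionSizes(oneRowPerValue, 2));
  }
}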

View File

@@ -84,15 +84,15 @@ public class HadoopDruidIndexerConfig
{
private static final Injector INJECTOR;
public static final String CONFIG_PROPERTY = "druid.indexer.config";
public static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
public static final Splitter TAB_SPLITTER = Splitter.on("\t");
public static final Joiner TAB_JOINER = Joiner.on("\t");
static final String CONFIG_PROPERTY = "druid.indexer.config";
static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
static final Splitter TAB_SPLITTER = Splitter.on("\t");
static final Joiner TAB_JOINER = Joiner.on("\t");
public static final ObjectMapper JSON_MAPPER;
public static final IndexIO INDEX_IO;
public static final IndexMerger INDEX_MERGER_V9;
public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
static final IndexMerger INDEX_MERGER_V9;
static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
static final DataSegmentPusher DATA_SEGMENT_PUSHER;
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
@@ -133,7 +133,7 @@ public class HadoopDruidIndexerConfig
return new HadoopDruidIndexerConfig(spec);
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
private static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
// Eventually PathSpec needs to get rid of its Hadoop dependency, then maybe this can be ingested directly without
// the Map<> intermediary
@@ -226,14 +226,7 @@ public class HadoopDruidIndexerConfig
shardSpecLookups.put(
entry.getKey(), actualSpec.getLookup(
Lists.transform(
entry.getValue(), new Function<HadoopyShardSpec, ShardSpec>()
{
@Override
public ShardSpec apply(HadoopyShardSpec input)
{
return input.getActualSpec();
}
}
entry.getValue(), HadoopyShardSpec::getActualSpec
)
)
);
@@ -298,7 +291,7 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getIndexSpecForIntermediatePersists();
}
public boolean isOverwriteFiles()
boolean isOverwriteFiles()
{
return schema.getTuningConfig().isOverwriteFiles();
}
@@ -313,24 +306,30 @@ public class HadoopDruidIndexerConfig
{
Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
if (setOptional.isPresent()) {
return Optional.of((List<Interval>) JodaUtils.condenseIntervals(setOptional.get()));
return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
} else {
return Optional.absent();
}
}
public boolean isDeterminingPartitions()
boolean isDeterminingPartitions()
{
return schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true);
}
public int getTargetPartitionSize()
{
final Integer targetPartitionSize = schema.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment();
DimensionBasedPartitionsSpec spec = schema.getTuningConfig().getPartitionsSpec();
if (spec.getTargetRowsPerSegment() != null) {
return spec.getTargetRowsPerSegment();
}
final Integer targetPartitionSize = spec.getMaxRowsPerSegment();
return targetPartitionSize == null ? -1 : targetPartitionSize;
}
public boolean isForceExtendableShardSpecs()
boolean isForceExtendableShardSpecs()
{
return schema.getTuningConfig().isForceExtendableShardSpecs();
}
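For reference, the precedence that the reworked getTargetPartitionSize() applies (and that the new tests below exercise with the values 123 and 456) reduces to this standalone sketch; it is an illustration, not Druid code.

public class TargetSizeResolution
{
  // targetRowsPerSegment wins when present; maxRowsPerSegment is the fallback; -1 means "unset".
  static int resolveTargetPartitionSize(Integer targetRowsPerSegment, Integer maxRowsPerSegment)
  {
    if (targetRowsPerSegment != null) {
      return targetRowsPerSegment;
    }
    return maxRowsPerSegment == null ? -1 : maxRowsPerSegment;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTargetPartitionSize(123, null));  // 123: target preferred
    System.out.println(resolveTargetPartitionSize(null, 456));  // 456: fall back to max
    System.out.println(resolveTargetPartitionSize(null, null)); // -1: neither is set
  }
}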
@@ -355,7 +354,7 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).get(bucket.partitionNum);
}
public int getShardSpecCount(Bucket bucket)
int getShardSpecCount(Bucket bucket)
{
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).size();
}
@@ -370,18 +369,18 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getMaxParseExceptions();
}
public boolean isUseYarnRMJobStatusFallback()
boolean isUseYarnRMJobStatusFallback()
{
return schema.getTuningConfig().isUseYarnRMJobStatusFallback();
}
public void setHadoopJobIdFileName(String hadoopJobIdFileName)
void setHadoopJobIdFileName(String hadoopJobIdFileName)
{
this.hadoopJobIdFileName = hadoopJobIdFileName;
}
public String getHadoopJobIdFileName()
String getHadoopJobIdFileName()
{
return hadoopJobIdFileName;
}
@@ -390,9 +389,6 @@ public class HadoopDruidIndexerConfig
* Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
* or via injected system properties) before this method is called. The {@link PathSpec} may
* create objects which depend on the values of these configurations.
* @param job
* @return
* @throws IOException
*/
public Job addInputPaths(Job job) throws IOException
{
@@ -410,7 +406,7 @@ public class HadoopDruidIndexerConfig
*
* @return the Bucket that this row belongs to
*/
public Optional<Bucket> getBucket(InputRow inputRow)
Optional<Bucket> getBucket(InputRow inputRow)
{
final Optional<Interval> timeBucket = schema.getDataSchema().getGranularitySpec().bucketInterval(
DateTimes.utc(inputRow.getTimestampFromEpoch())
@@ -436,13 +432,13 @@ public class HadoopDruidIndexerConfig
}
public Optional<Set<Interval>> getSegmentGranularIntervals()
Optional<Set<Interval>> getSegmentGranularIntervals()
{
return Optional.fromNullable(
(Set<Interval>) schema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.orNull()
schema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.orNull()
);
}
@@ -453,40 +449,35 @@ public class HadoopDruidIndexerConfig
.inputIntervals();
}
public Optional<Iterable<Bucket>> getAllBuckets()
Optional<Iterable<Bucket>> getAllBuckets()
{
Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
if (intervals.isPresent()) {
return Optional.of(
(Iterable<Bucket>) FunctionalIterable
FunctionalIterable
.create(intervals.get())
.transformCat(
new Function<Interval, Iterable<Bucket>>()
{
@Override
public Iterable<Bucket> apply(Interval input)
{
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
if (specs == null) {
return ImmutableList.of();
}
return FunctionalIterable
.create(specs)
.transform(
new Function<HadoopyShardSpec, Bucket>()
{
int i = 0;
@Override
public Bucket apply(HadoopyShardSpec input)
{
return new Bucket(input.getShardNum(), bucketTime, i++);
}
}
);
input -> {
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
if (specs == null) {
return ImmutableList.of();
}
return FunctionalIterable
.create(specs)
.transform(
new Function<HadoopyShardSpec, Bucket>()
{
int i = 0;
@Override
public Bucket apply(HadoopyShardSpec input)
{
return new Bucket(input.getShardNum(), bucketTime, i++);
}
}
);
}
)
);
@@ -511,7 +502,7 @@ public class HadoopDruidIndexerConfig
* @return the intermediate path for this job run.
*/
public Path makeIntermediatePath()
Path makeIntermediatePath()
{
return new Path(
StringUtils.format(
@@ -524,7 +515,7 @@ public class HadoopDruidIndexerConfig
);
}
public Path makeSegmentPartitionInfoPath(Interval bucketInterval)
Path makeSegmentPartitionInfoPath(Interval bucketInterval)
{
return new Path(
StringUtils.format(
@@ -536,7 +527,7 @@ public class HadoopDruidIndexerConfig
);
}
public Path makeIntervalInfoPath()
Path makeIntervalInfoPath()
{
return new Path(
StringUtils.format(
@@ -546,27 +537,27 @@ public class HadoopDruidIndexerConfig
);
}
public Path makeDescriptorInfoDir()
Path makeDescriptorInfoDir()
{
return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
}
public Path makeGroupedDataDir()
Path makeGroupedDataDir()
{
return new Path(makeIntermediatePath(), "groupedData");
}
public Path makeDescriptorInfoPath(DataSegment segment)
Path makeDescriptorInfoPath(DataSegment segment)
{
return new Path(makeDescriptorInfoDir(), StringUtils.removeChar(segment.getId() + ".json", ':'));
}
public void addJobProperties(Job job)
void addJobProperties(Job job)
{
addJobProperties(job.getConfiguration());
}
public void addJobProperties(Configuration conf)
void addJobProperties(Configuration conf)
{
for (final Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
@@ -597,7 +588,7 @@ public class HadoopDruidIndexerConfig
Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version");
}
public List<String> getAllowedHadoopPrefix()
List<String> getAllowedHadoopPrefix()
{
return allowedHadoopPrefix;
}

View File

@@ -41,6 +41,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@@ -51,6 +52,11 @@ import java.util.Map;
@RunWith(Parameterized.class)
public class DeterminePartitionsJobTest
{
@Nullable
private static final Long NO_TARGET_ROWS_PER_SEGMENT = null;
@Nullable
private static final Long NO_MAX_ROWS_PER_SEGMENT = null;
private final HadoopDruidIndexerConfig config;
private final int expectedNumOfSegments;
private final int[] expectedNumOfShardsForEachSegment;
@@ -59,19 +65,22 @@ public class DeterminePartitionsJobTest
private final File tmpDir;
@Parameterized.Parameters(name = "assumeGrouped={0}, "
+ "maxRowsPerSegment={1}, "
+ "interval={2}"
+ "expectedNumOfSegments={3}, "
+ "expectedNumOfShardsForEachSegment={4}, "
+ "expectedStartEndForEachShard={5}, "
+ "data={6}")
+ "targetRowsPerSegment={1}, "
+ "maxRowsPerSegment={2}, "
+ "interval={3}"
+ "expectedNumOfSegments={4}, "
+ "expectedNumOfShardsForEachSegment={5}, "
+ "expectedStartEndForEachShard={6}, "
+ "data={7}")
public static Collection<Object[]> constructFeed()
{
return Arrays.asList(
new Object[][]{
{
// Test partitioning by targetRowsPerSegment
true,
3,
2,
NO_MAX_ROWS_PER_SEGMENT,
"2014-10-22T00:00:00Z/P1D",
1,
new int[]{5},
@@ -86,7 +95,36 @@
},
ImmutableList.of(
"2014102200,a.example.com,CN,100",
"2014102200,b.exmaple.com,US,50",
"2014102200,b.example.com,US,50",
"2014102200,c.example.com,US,200",
"2014102200,d.example.com,US,250",
"2014102200,e.example.com,US,123",
"2014102200,f.example.com,US,567",
"2014102200,g.example.com,US,11",
"2014102200,h.example.com,US,251",
"2014102200,i.example.com,US,963",
"2014102200,j.example.com,US,333"
)
},
{
true,
NO_TARGET_ROWS_PER_SEGMENT,
2,
"2014-10-22T00:00:00Z/P1D",
1,
new int[]{5},
new String[][][]{
{
{null, "c.example.com"},
{"c.example.com", "e.example.com"},
{"e.example.com", "g.example.com"},
{"g.example.com", "i.example.com"},
{"i.example.com", null}
}
},
ImmutableList.of(
"2014102200,a.example.com,CN,100",
"2014102200,b.example.com,US,50",
"2014102200,c.example.com,US,200",
"2014102200,d.example.com,US,250",
"2014102200,e.example.com,US,123",
@@ -99,7 +137,8 @@
},
{
false,
3,
NO_TARGET_ROWS_PER_SEGMENT,
2,
"2014-10-20T00:00:00Z/P1D",
1,
new int[]{5},
@@ -115,8 +154,8 @@
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,US,50",
"2014102000,b.exmaple.com,US,50",
"2014102000,b.example.com,US,50",
"2014102000,b.example.com,US,50",
"2014102000,c.example.com,US,200",
"2014102000,c.example.com,US,200",
"2014102000,d.example.com,US,250",
@@ -137,7 +176,8 @@
},
{
true,
6,
NO_TARGET_ROWS_PER_SEGMENT,
5,
"2014-10-20T00:00:00Z/P3D",
3,
new int[]{2, 2, 2},
@@ -157,7 +197,7 @@
},
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,CN,50",
"2014102000,b.example.com,CN,50",
"2014102000,c.example.com,CN,200",
"2014102000,d.example.com,US,250",
"2014102000,e.example.com,US,123",
@@ -166,9 +206,8 @@
"2014102000,h.example.com,US,251",
"2014102000,i.example.com,US,963",
"2014102000,j.example.com,US,333",
"2014102000,k.example.com,US,555",
"2014102100,a.example.com,CN,100",
"2014102100,b.exmaple.com,CN,50",
"2014102100,b.example.com,CN,50",
"2014102100,c.example.com,CN,200",
"2014102100,d.example.com,US,250",
"2014102100,e.example.com,US,123",
@@ -177,9 +216,8 @@
"2014102100,h.example.com,US,251",
"2014102100,i.example.com,US,963",
"2014102100,j.example.com,US,333",
"2014102100,k.example.com,US,555",
"2014102200,a.example.com,CN,100",
"2014102200,b.exmaple.com,CN,50",
"2014102200,b.example.com,CN,50",
"2014102200,c.example.com,CN,200",
"2014102200,d.example.com,US,250",
"2014102200,e.example.com,US,123",
@@ -187,12 +225,12 @@
"2014102200,g.example.com,US,11",
"2014102200,h.example.com,US,251",
"2014102200,i.example.com,US,963",
"2014102200,j.example.com,US,333",
"2014102200,k.example.com,US,555"
"2014102200,j.example.com,US,333"
)
},
{
true,
NO_TARGET_ROWS_PER_SEGMENT,
1000,
"2014-10-22T00:00:00Z/P1D",
1,
@@ -204,7 +242,7 @@
},
ImmutableList.of(
"2014102200,a.example.com,CN,100",
"2014102200,b.exmaple.com,US,50",
"2014102200,b.example.com,US,50",
"2014102200,c.example.com,US,200",
"2014102200,d.example.com,US,250",
"2014102200,e.example.com,US,123",
@@ -221,6 +259,7 @@
public DeterminePartitionsJobTest(
boolean assumeGrouped,
@Nullable Integer targetRowsPerSegment,
Integer maxRowsPerSegment,
String interval,
int expectedNumOfSegments,
@@ -284,7 +323,7 @@
new HadoopTuningConfig(
tmpDir.getCanonicalPath(),
null,
new SingleDimensionPartitionsSpec(null, maxRowsPerSegment, null, assumeGrouped),
new SingleDimensionPartitionsSpec(targetRowsPerSegment, maxRowsPerSegment, null, assumeGrouped),
null,
null,
null,

View File

@@ -24,6 +24,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@@ -36,13 +40,13 @@ import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopDruidIndexerConfigTest
{
private static final ObjectMapper JSON_MAPPER;
@@ -55,56 +59,18 @@ public class HadoopDruidIndexerConfigTest
@Test
public void testHashedBucketSelection()
{
List<HadoopyShardSpec> specs = new ArrayList<>();
List<HadoopyShardSpec> shardSpecs = new ArrayList<>();
final int partitionCount = 10;
for (int i = 0; i < partitionCount; i++) {
specs.add(new HadoopyShardSpec(
shardSpecs.add(new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, partitionCount, null, new DefaultObjectMapper()),
i
));
}
HadoopIngestionSpec spec = new HadoopIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularities.MINUTE,
Granularities.MINUTE,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
JSON_MAPPER
),
new HadoopIOConfig(ImmutableMap.of("paths", "bar", "type", "static"), null, null),
new HadoopTuningConfig(
null,
null,
null,
ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(), specs),
null,
null,
null,
null,
false,
false,
false,
false,
null,
false,
false,
null,
null,
null,
false,
false,
null,
null,
null,
null
)
);
HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
.shardSpecs(ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(), shardSpecs))
.build();
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);
final List<String> dims = Arrays.asList("diM1", "dIM2");
final ImmutableMap<String, Object> values = ImmutableMap.of(
@@ -133,57 +99,21 @@ public class HadoopDruidIndexerConfigTest
@Test
public void testNoneShardSpecBucketSelection()
{
HadoopIngestionSpec spec = new HadoopIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularities.MINUTE,
Granularities.MINUTE,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
JSON_MAPPER
),
new HadoopIOConfig(ImmutableMap.of("paths", "bar", "type", "static"), null, null),
new HadoopTuningConfig(
null,
null,
null,
ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(),
Collections.singletonList(new HadoopyShardSpec(
NoneShardSpec.instance(),
1
)),
DateTimes.of("2010-01-01T02:00:00").getMillis(),
Collections.singletonList(new HadoopyShardSpec(
NoneShardSpec.instance(),
2
))
),
null,
null,
null,
null,
false,
false,
false,
false,
null,
false,
false,
null,
null,
null,
false,
false,
null,
null,
null,
null
)
Map<Long, List<HadoopyShardSpec>> shardSpecs = ImmutableMap.of(
DateTimes.of("2010-01-01T01:00:00").getMillis(),
Collections.singletonList(new HadoopyShardSpec(
NoneShardSpec.instance(),
1
)),
DateTimes.of("2010-01-01T02:00:00").getMillis(),
Collections.singletonList(new HadoopyShardSpec(
NoneShardSpec.instance(),
2
))
);
HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
.shardSpecs(shardSpecs)
.build();
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);
final List<String> dims = Arrays.asList("diM1", "dIM2");
final ImmutableMap<String, Object> values = ImmutableMap.of(
@@ -197,10 +127,132 @@ public class HadoopDruidIndexerConfigTest
"4"
);
final long ts1 = DateTimes.of("2010-01-01T01:00:01").getMillis();
Assert.assertEquals(config.getBucket(new MapBasedInputRow(ts1, dims, values)).get().getShardNum(), 1);
Assert.assertEquals(1, config.getBucket(new MapBasedInputRow(ts1, dims, values)).get().getShardNum());
final long ts2 = DateTimes.of("2010-01-01T02:00:01").getMillis();
Assert.assertEquals(config.getBucket(new MapBasedInputRow(ts2, dims, values)).get().getShardNum(), 2);
Assert.assertEquals(2, config.getBucket(new MapBasedInputRow(ts2, dims, values)).get().getShardNum());
}
@Test
public void testGetTargetPartitionSizeWithHashedPartitions()
{
HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
.partitionsSpec(HashedPartitionsSpec.defaultSpec())
.build();
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(spec);
int targetPartitionSize = config.getTargetPartitionSize();
Assert.assertEquals(PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT, targetPartitionSize);
}
@Test
public void testGetTargetPartitionSizeWithSingleDimensionPartitionsTargetRowsPerSegment()
{
int targetRowsPerSegment = 123;
SingleDimensionPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(
targetRowsPerSegment,
null,
null,
false
);
HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
.partitionsSpec(partitionsSpec)
.build();
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(spec);
int targetPartitionSize = config.getTargetPartitionSize();
Assert.assertEquals(targetRowsPerSegment, targetPartitionSize);
}
@Test
public void testGetTargetPartitionSizeWithSingleDimensionPartitionsMaxRowsPerSegment()
{
int maxRowsPerSegment = 456;
SingleDimensionPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(
null,
maxRowsPerSegment,
null,
false
);
HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()
.partitionsSpec(partitionsSpec)
.build();
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(spec);
int targetPartitionSize = config.getTargetPartitionSize();
Assert.assertEquals(maxRowsPerSegment, targetPartitionSize);
}
private static class HadoopIngestionSpecBuilder
{
private static final DataSchema DATA_SCHEMA = new DataSchema(
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularities.MINUTE,
Granularities.MINUTE,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
HadoopDruidIndexerConfigTest.JSON_MAPPER
);
private static final HadoopIOConfig HADOOP_IO_CONFIG = new HadoopIOConfig(
ImmutableMap.of("paths", "bar", "type", "static"),
null,
null
);
@Nullable
private DimensionBasedPartitionsSpec partitionsSpec = null;
private Map<Long, List<HadoopyShardSpec>> shardSpecs = Collections.emptyMap();
HadoopIngestionSpecBuilder partitionsSpec(DimensionBasedPartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
return this;
}
HadoopIngestionSpecBuilder shardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
{
this.shardSpecs = shardSpecs;
return this;
}
HadoopIngestionSpec build()
{
HadoopTuningConfig hadoopTuningConfig = new HadoopTuningConfig(
null,
null,
partitionsSpec,
shardSpecs,
null,
null,
null,
null,
false,
false,
false,
false,
null,
false,
false,
null,
null,
null,
false,
false,
null,
null,
null,
null
);
return new HadoopIngestionSpec(
DATA_SCHEMA,
HADOOP_IO_CONFIG,
hadoopTuningConfig
);
}
}
}