Allow variable maxPartitionSize in hadoop indexer

This commit is contained in:
Gian Merlino 2013-05-14 13:01:53 -07:00
parent 7d34710edf
commit 9ed80df03e
4 changed files with 79 additions and 3 deletions

View File

@ -504,7 +504,6 @@ public class DeterminePartitionsJob implements Jobby
public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
{
private static final double SHARD_COMBINE_THRESHOLD = 0.25;
private static final double SHARD_OVERSIZE_THRESHOLD = 1.5;
private static final int HIGH_CARDINALITY_THRESHOLD = 3000000;
@Override
@ -672,7 +671,7 @@ public class DeterminePartitionsJob implements Jobby
// Make sure none of these shards are oversized
boolean oversized = false;
for(final DimPartition partition : dimPartitions.partitions) {
if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) {
if(partition.rows > config.getMaxPartitionSize()) {
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
oversized = true;
}

View File

@ -236,7 +236,7 @@ public class HadoopDruidIndexerConfig
this.partitionsSpec = partitionsSpec;
} else {
// Backwards compatibility
this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, false);
this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
if(granularitySpec != null) {
@ -431,6 +431,11 @@ public class HadoopDruidIndexerConfig
return partitionsSpec.getTargetPartitionSize();
}
/**
 * Returns the maximum partition size, as reported by the configured partitions spec.
 *
 * @return the value of {@code partitionsSpec.getMaxPartitionSize()}
 */
public long getMaxPartitionSize()
{
  final long maxSize = partitionsSpec.getMaxPartitionSize();
  return maxSize;
}
public boolean isUpdaterJobSpecSet()
{
return (updaterJobSpec != null);

View File

@ -8,22 +8,30 @@ import javax.annotation.Nullable;
public class PartitionsSpec
{
private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
@Nullable
private final String partitionDimension;
private final long targetPartitionSize;
private final long maxPartitionSize;
private final boolean assumeGrouped;
/**
 * Builds a partitions spec from its (all optional) JSON properties.
 *
 * @param partitionDimension  stored as given; may be null
 * @param targetPartitionSize stored as given, or -1 when null
 * @param maxPartitionSize    stored as given; when null it is derived from the (already
 *                            defaulted) target size multiplied by
 *                            {@code DEFAULT_OVERSIZE_THRESHOLD} and truncated to a long
 * @param assumeGrouped       stored as given, or false when null
 */
@JsonCreator
public PartitionsSpec(
    @JsonProperty("partitionDimension") @Nullable String partitionDimension,
    @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
    @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
    @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
  this.partitionDimension = partitionDimension;

  if (targetPartitionSize != null) {
    this.targetPartitionSize = targetPartitionSize;
  } else {
    this.targetPartitionSize = -1;
  }

  // The fallback must be computed after the target size has been resolved above,
  // because it scales the stored (possibly defaulted) target rather than the raw argument.
  if (maxPartitionSize != null) {
    this.maxPartitionSize = maxPartitionSize;
  } else {
    this.maxPartitionSize = (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD);
  }

  this.assumeGrouped = assumeGrouped != null && assumeGrouped;
}
@ -46,6 +54,12 @@ public class PartitionsSpec
return targetPartitionSize;
}
/**
 * Returns the maximum partition size held by this spec; exposed as a JSON property.
 *
 * @return the stored {@code maxPartitionSize}
 */
@JsonProperty
public long getMaxPartitionSize()
{
  return this.maxPartitionSize;
}
@JsonProperty
public boolean isAssumeGrouped()
{

View File

@ -248,6 +248,12 @@ public class HadoopDruidIndexerConfigTest
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),
@ -285,6 +291,58 @@ public class HadoopDruidIndexerConfigTest
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
150
);
Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),
"foo"
);
}
@Test
public void testPartitionsSpecMaxPartitionSize() {
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonMapper.readValue(
"{"
+ "\"partitionsSpec\":{"
+ " \"targetPartitionSize\":100,"
+ " \"maxPartitionSize\":200,"
+ " \"partitionDimension\":\"foo\""
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
true
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
100
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
200
);
Assert.assertEquals(
"getPartitionDimension",
partitionsSpec.getPartitionDimension(),