diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
index f34ff2988f2..425b33cedff 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
@@ -504,7 +504,6 @@ public class DeterminePartitionsJob implements Jobby
   public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
   {
     private static final double SHARD_COMBINE_THRESHOLD = 0.25;
-    private static final double SHARD_OVERSIZE_THRESHOLD = 1.5;
     private static final int HIGH_CARDINALITY_THRESHOLD = 3000000;
 
     @Override
@@ -672,7 +671,7 @@ public class DeterminePartitionsJob implements Jobby
       // Make sure none of these shards are oversized
       boolean oversized = false;
       for(final DimPartition partition : dimPartitions.partitions) {
-        if(partition.rows > config.getTargetPartitionSize() * SHARD_OVERSIZE_THRESHOLD) {
+        if(partition.rows > config.getMaxPartitionSize()) {
           log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
           oversized = true;
         }
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index 1dfad9de181..364b880518c 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -236,7 +236,7 @@ public class HadoopDruidIndexerConfig
       this.partitionsSpec = partitionsSpec;
     } else {
       // Backwards compatibility
-      this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, false);
+      this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
     }
 
     if(granularitySpec != null) {
@@ -431,6 +431,11 @@ public class HadoopDruidIndexerConfig
     return partitionsSpec.getTargetPartitionSize();
   }
 
+  public long getMaxPartitionSize()
+  {
+    return partitionsSpec.getMaxPartitionSize();
+  }
+
   public boolean isUpdaterJobSpecSet()
   {
     return (updaterJobSpec != null);
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java b/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
index e30bad393f6..5571422585c 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/partitions/PartitionsSpec.java
@@ -8,22 +8,30 @@ import javax.annotation.Nullable;
 
 public class PartitionsSpec
 {
+  private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
+
   @Nullable
   private final String partitionDimension;
 
   private final long targetPartitionSize;
 
+  private final long maxPartitionSize;
+
   private final boolean assumeGrouped;
 
   @JsonCreator
   public PartitionsSpec(
       @JsonProperty("partitionDimension") @Nullable String partitionDimension,
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
+      @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
   )
   {
     this.partitionDimension = partitionDimension;
     this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
+    this.maxPartitionSize = maxPartitionSize == null
+                            ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
+                            : maxPartitionSize;
     this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
   }
 
@@ -46,6 +54,12 @@ public class PartitionsSpec
     return targetPartitionSize;
   }
 
+  @JsonProperty
+  public long getMaxPartitionSize()
+  {
+    return maxPartitionSize;
+  }
+
   @JsonProperty
   public boolean isAssumeGrouped()
   {
diff --git a/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
index 5fdff8ce8b8..87ee95fbfb0 100644
--- a/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexer/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -248,6 +248,12 @@ public class HadoopDruidIndexerConfigTest
         100
     );
 
+    Assert.assertEquals(
+        "getMaxPartitionSize",
+        partitionsSpec.getMaxPartitionSize(),
+        150
+    );
+
     Assert.assertEquals(
         "getPartitionDimension",
         partitionsSpec.getPartitionDimension(),
@@ -285,6 +291,58 @@ public class HadoopDruidIndexerConfigTest
         100
     );
 
+    Assert.assertEquals(
+        "getMaxPartitionSize",
+        partitionsSpec.getMaxPartitionSize(),
+        150
+    );
+
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        "foo"
+    );
+  }
+
+  @Test
+  public void testPartitionsSpecMaxPartitionSize() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonMapper.readValue(
+          "{"
+          + "\"partitionsSpec\":{"
+          + "   \"targetPartitionSize\":100,"
+          + "   \"maxPartitionSize\":200,"
+          + "   \"partitionDimension\":\"foo\""
+          + " }"
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+    Assert.assertEquals(
+        "isDeterminingPartitions",
+        partitionsSpec.isDeterminingPartitions(),
+        true
+    );
+
+    Assert.assertEquals(
+        "getTargetPartitionSize",
+        partitionsSpec.getTargetPartitionSize(),
+        100
+    );
+
+    Assert.assertEquals(
+        "getMaxPartitionSize",
+        partitionsSpec.getMaxPartitionSize(),
+        200
+    );
+
     Assert.assertEquals(
         "getPartitionDimension",
         partitionsSpec.getPartitionDimension(),
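
Reviewer note, not part of the patch: a minimal standalone sketch of the default-value behavior this diff introduces, using only the PartitionsSpec constructor and getter shown above. The class name PartitionsSpecSketch is hypothetical.

import com.metamx.druid.indexer.partitions.PartitionsSpec;

public class PartitionsSpecSketch
{
  public static void main(String[] args)
  {
    // Omitted maxPartitionSize: falls back to
    // targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD (100 * 1.5 = 150),
    // matching the old SHARD_OVERSIZE_THRESHOLD behavior.
    final PartitionsSpec implicitMax = new PartitionsSpec("foo", 100L, null, null);
    System.out.println(implicitMax.getMaxPartitionSize()); // 150

    // Explicit maxPartitionSize: used as-is, no threshold applied.
    final PartitionsSpec explicitMax = new PartitionsSpec("foo", 100L, 200L, null);
    System.out.println(explicitMax.getMaxPartitionSize()); // 200
  }
}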