diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java index d5bb8a4925a..4b4a5c8b995 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java @@ -500,6 +500,7 @@ public class DeterminePartitionsJob implements Jobby { private static final double SHARD_COMBINE_THRESHOLD = 0.25; private static final double SHARD_OVERSIZE_THRESHOLD = 1.5; + private static final int HIGH_CARDINALITY_THRESHOLD = 3000000; @Override protected void innerReduce( @@ -634,7 +635,9 @@ public class DeterminePartitionsJob implements Jobby final int totalRows = dimPartitionss.values().iterator().next().getRows(); - int maxCardinality = -1; + int maxCardinality = Integer.MIN_VALUE; + long minVariance = Long.MAX_VALUE; + DimPartitions minVariancePartitions = null; DimPartitions maxCardinalityPartitions = null; for(final DimPartitions dimPartitions : dimPartitionss.values()) { @@ -660,10 +663,18 @@ public class DeterminePartitionsJob implements Jobby continue; } - if(dimPartitions.getCardinality() > maxCardinality) { - maxCardinality = dimPartitions.getCardinality(); + final int cardinality = dimPartitions.getCardinality(); + final long variance = dimPartitions.getVariance(); + + if(cardinality > maxCardinality) { + maxCardinality = cardinality; maxCardinalityPartitions = dimPartitions; } + + if(variance < minVariance) { + minVariance = variance; + minVariancePartitions = dimPartitions; + } } if(maxCardinalityPartitions == null) { @@ -675,8 +686,12 @@ public class DeterminePartitionsJob implements Jobby context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles() ); + final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD + ? maxCardinalityPartitions + : minVariancePartitions; + final List chosenShardSpecs = Lists.transform( - maxCardinalityPartitions.partitions, new Function() + chosenPartitions.partitions, new Function() { @Override public ShardSpec apply(DimPartition dimPartition) @@ -752,6 +767,19 @@ public class DeterminePartitionsJob implements Jobby return sum; } + public long getVariance() + { + final long meanRows = getRows() / partitions.size(); + + long variance = 0; + for(final DimPartition dimPartition : partitions) { + variance += (dimPartition.rows - meanRows) * (dimPartition.rows - meanRows); + } + + variance /= partitions.size(); + return variance; + } + public int getRows() { int sum = 0;