DeterminePartitionsJob: Select partition dimension to minimize segment size variance when cardinality is low

This commit is contained in:
Gian Merlino 2013-02-13 09:50:30 -08:00
parent a0b159fed7
commit a665bfa2ef
1 changed file with 32 additions and 4 deletions

View File

@ -500,6 +500,7 @@ public class DeterminePartitionsJob implements Jobby
{ {
private static final double SHARD_COMBINE_THRESHOLD = 0.25; private static final double SHARD_COMBINE_THRESHOLD = 0.25;
private static final double SHARD_OVERSIZE_THRESHOLD = 1.5; private static final double SHARD_OVERSIZE_THRESHOLD = 1.5;
private static final int HIGH_CARDINALITY_THRESHOLD = 3000000;
@Override @Override
protected void innerReduce( protected void innerReduce(
@ -634,7 +635,9 @@ public class DeterminePartitionsJob implements Jobby
final int totalRows = dimPartitionss.values().iterator().next().getRows(); final int totalRows = dimPartitionss.values().iterator().next().getRows();
int maxCardinality = -1; int maxCardinality = Integer.MIN_VALUE;
long minVariance = Long.MAX_VALUE;
DimPartitions minVariancePartitions = null;
DimPartitions maxCardinalityPartitions = null; DimPartitions maxCardinalityPartitions = null;
for(final DimPartitions dimPartitions : dimPartitionss.values()) { for(final DimPartitions dimPartitions : dimPartitionss.values()) {
@ -660,10 +663,18 @@ public class DeterminePartitionsJob implements Jobby
continue; continue;
} }
if(dimPartitions.getCardinality() > maxCardinality) { final int cardinality = dimPartitions.getCardinality();
maxCardinality = dimPartitions.getCardinality(); final long variance = dimPartitions.getVariance();
if(cardinality > maxCardinality) {
maxCardinality = cardinality;
maxCardinalityPartitions = dimPartitions; maxCardinalityPartitions = dimPartitions;
} }
if(variance < minVariance) {
minVariance = variance;
minVariancePartitions = dimPartitions;
}
} }
if(maxCardinalityPartitions == null) { if(maxCardinalityPartitions == null) {
@ -675,8 +686,12 @@ public class DeterminePartitionsJob implements Jobby
context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles() context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
); );
final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD
? maxCardinalityPartitions
: minVariancePartitions;
final List<ShardSpec> chosenShardSpecs = Lists.transform( final List<ShardSpec> chosenShardSpecs = Lists.transform(
maxCardinalityPartitions.partitions, new Function<DimPartition, ShardSpec>() chosenPartitions.partitions, new Function<DimPartition, ShardSpec>()
{ {
@Override @Override
public ShardSpec apply(DimPartition dimPartition) public ShardSpec apply(DimPartition dimPartition)
@ -752,6 +767,19 @@ public class DeterminePartitionsJob implements Jobby
return sum; return sum;
} }
/**
 * Computes the variance of the per-partition row counts around their mean.
 *
 * <p>All arithmetic is integral: the mean is the quotient of {@code getRows()}
 * by the number of partitions (truncated by integer division), and the final
 * result is the sum of squared deviations divided by the number of partitions
 * (again truncated).
 *
 * @return the truncated variance of {@code rows} across {@code partitions}
 * @throws ArithmeticException if {@code partitions} is empty (division by zero)
 */
public long getVariance()
{
  final int totalRows = getRows();
  final int partitionCount = partitions.size();
  // Integer division on purpose: the mean is truncated before being widened to long.
  final long mean = totalRows / partitionCount;

  long squaredDeviationSum = 0;
  for (final DimPartition partition : partitions) {
    final long deviation = partition.rows - mean;
    squaredDeviationSum += deviation * deviation;
  }

  return squaredDeviationSum / partitionCount;
}
public int getRows() public int getRows()
{ {
int sum = 0; int sum = 0;