Merge pull request #94 from metamx/fix-dpj

Fix DeterminePartitionsJob ISE for dimensions not present in all rows
This commit is contained in:
gianm 2013-02-25 12:21:48 -08:00
commit 4811b50055
1 changed file with 23 additions and 7 deletions

View File

@ -393,6 +393,9 @@ public class DeterminePartitionsJob implements Jobby
final Interval interval = maybeInterval.get();
final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8);
// Emit row-counter value.
write(context, groupKey, new DimValueCount("", "", 1));
for(final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
final String dim = dimAndValues.getKey();
@ -509,9 +512,23 @@ public class DeterminePartitionsJob implements Jobby
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
) throws IOException, InterruptedException
{
PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
final PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
// "iterator" will take us over many candidate dimensions
log.info(
"Determining partitions for interval: %s",
config.getGranularitySpec().bucketInterval(bucket).orNull()
);
// First DVC should be the total row count indicator
final DimValueCount firstDvc = iterator.next();
final int totalRows = firstDvc.numRows;
if(!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!");
}
// "iterator" will now take us over many candidate dimensions
DimPartitions currentDimPartitions = null;
DimPartition currentDimPartition = null;
String currentDimPartitionStart = null;
@ -635,8 +652,6 @@ public class DeterminePartitionsJob implements Jobby
throw new ISE("No suitable partitioning dimension found!");
}
final int totalRows = dimPartitionss.values().iterator().next().getRows();
int maxCardinality = Integer.MIN_VALUE;
long minVariance = Long.MAX_VALUE;
DimPartitions minVariancePartitions = null;
@ -644,12 +659,14 @@ public class DeterminePartitionsJob implements Jobby
for(final DimPartitions dimPartitions : dimPartitionss.values()) {
if(dimPartitions.getRows() != totalRows) {
throw new ISE(
"WTF?! Dimension[%s] row count %,d != expected row count %,d",
log.info(
"Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
dimPartitions.dim,
dimPartitions.getRows(),
totalRows
);
continue;
}
// Make sure none of these shards are oversized
@ -683,7 +700,6 @@ public class DeterminePartitionsJob implements Jobby
throw new ISE("No suitable partitioning dimension found!");
}
final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
final OutputStream out = Utils.makePathAndOutputStream(
context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
);