Fix DeterminePartitionsJob ISE for dimensions not present in all rows

This commit is contained in:
Gian Merlino 2013-02-25 10:57:26 -08:00
parent 8513a5ab2a
commit 7d7ce2b7fe
1 changed files with 23 additions and 7 deletions

View File

@ -393,6 +393,9 @@ public class DeterminePartitionsJob implements Jobby
final Interval interval = maybeInterval.get(); final Interval interval = maybeInterval.get();
final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8); final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8);
// Emit row-counter value.
write(context, groupKey, new DimValueCount("", "", 1));
for(final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) { for(final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
final String dim = dimAndValues.getKey(); final String dim = dimAndValues.getKey();
@ -509,9 +512,23 @@ public class DeterminePartitionsJob implements Jobby
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
) throws IOException, InterruptedException ) throws IOException, InterruptedException
{ {
PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator()); final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
final PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
// "iterator" will take us over many candidate dimensions log.info(
"Determining partitions for interval: %s",
config.getGranularitySpec().bucketInterval(bucket).orNull()
);
// First DVC should be the total row count indicator
final DimValueCount firstDvc = iterator.next();
final int totalRows = firstDvc.numRows;
if(!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!");
}
// "iterator" will now take us over many candidate dimensions
DimPartitions currentDimPartitions = null; DimPartitions currentDimPartitions = null;
DimPartition currentDimPartition = null; DimPartition currentDimPartition = null;
String currentDimPartitionStart = null; String currentDimPartitionStart = null;
@ -635,8 +652,6 @@ public class DeterminePartitionsJob implements Jobby
throw new ISE("No suitable partitioning dimension found!"); throw new ISE("No suitable partitioning dimension found!");
} }
final int totalRows = dimPartitionss.values().iterator().next().getRows();
int maxCardinality = Integer.MIN_VALUE; int maxCardinality = Integer.MIN_VALUE;
long minVariance = Long.MAX_VALUE; long minVariance = Long.MAX_VALUE;
DimPartitions minVariancePartitions = null; DimPartitions minVariancePartitions = null;
@ -644,12 +659,14 @@ public class DeterminePartitionsJob implements Jobby
for(final DimPartitions dimPartitions : dimPartitionss.values()) { for(final DimPartitions dimPartitions : dimPartitionss.values()) {
if(dimPartitions.getRows() != totalRows) { if(dimPartitions.getRows() != totalRows) {
throw new ISE( log.info(
"WTF?! Dimension[%s] row count %,d != expected row count %,d", "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
dimPartitions.dim, dimPartitions.dim,
dimPartitions.getRows(), dimPartitions.getRows(),
totalRows totalRows
); );
continue;
} }
// Make sure none of these shards are oversized // Make sure none of these shards are oversized
@ -683,7 +700,6 @@ public class DeterminePartitionsJob implements Jobby
throw new ISE("No suitable partitioning dimension found!"); throw new ISE("No suitable partitioning dimension found!");
} }
final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
final OutputStream out = Utils.makePathAndOutputStream( final OutputStream out = Utils.makePathAndOutputStream(
context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles() context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
); );