diff --git a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
index 425b33cedff..a0165b47517 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/DeterminePartitionsJob.java
@@ -28,6 +28,7 @@ import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
@@ -55,6 +56,7 @@ import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -71,6 +73,7 @@ import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -189,6 +192,8 @@ public class DeterminePartitionsJob implements Jobby
       dimSelectionJob.setOutputValueClass(Text.class);
       dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
       dimSelectionJob.setJarByClass(DeterminePartitionsJob.class);
+      dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
+      dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().size());
 
       config.intoConfiguration(dimSelectionJob);
       FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
@@ -371,11 +376,21 @@ public class DeterminePartitionsJob implements Jobby
   {
     private final HadoopDruidIndexerConfig config;
     private final String partitionDimension;
+    private final Map<DateTime, Integer> intervalIndexes;
 
     public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
     {
       this.config = config;
       this.partitionDimension = partitionDimension;
+
+      final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
+      int idx = 0;
+      for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
+        timeIndexBuilder.put(bucketInterval.getStart(), idx);
+        idx++;
+      }
+
+      this.intervalIndexes = timeIndexBuilder.build();
     }
 
     public void emitDimValueCounts(
@@ -391,7 +406,12 @@ public class DeterminePartitionsJob implements Jobby
       }
 
       final Interval interval = maybeInterval.get();
-      final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8);
+      final int intervalIndex = intervalIndexes.get(interval.getStart());
+
+      final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
+      buf.putInt(intervalIndex);
+      buf.putLong(interval.getStartMillis());
+      final byte[] groupKey = buf.array();
 
       // Emit row-counter value.
       write(context, groupKey, new DimValueCount("", "", 1));
@@ -414,6 +434,24 @@ public class DeterminePartitionsJob implements Jobby
     }
   }
 
+  public static class DeterminePartitionsDimSelectionPartitioner
+      extends Partitioner<BytesWritable, Text>
+  {
+    @Override
+    public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
+    {
+      final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
+      bytes.position(4); // Skip length added by SortableBytes
+      final int index = bytes.getInt();
+
+      if (index >= numPartitions) {
+        throw new ISE("Not enough partitions, index[%,d] >= numPartitions[%,d]", index, numPartitions);
+      }
+
+      return index;
+    }
+  }
+
   private static abstract class DeterminePartitionsDimSelectionBaseReducer
       extends Reducer<BytesWritable, Text, BytesWritable, Text>
   {
@@ -511,7 +549,9 @@ public class DeterminePartitionsJob implements Jobby
         Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
     ) throws IOException, InterruptedException
     {
-      final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
+      final ByteBuffer groupKey = ByteBuffer.wrap(keyBytes.getGroupKey());
+      groupKey.position(4); // Skip partition
+      final DateTime bucket = new DateTime(groupKey.getLong());
       final PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
 
       log.info(
diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index 874a30b7ad4..2c8cf82d822 100644
--- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -220,7 +220,7 @@ public class IndexGeneratorJob implements Jobby
     public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
     {
       final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
-      bytes.position(4); // Skip length added by BytesWritable
+      bytes.position(4); // Skip length added by SortableBytes
      int shardNum = bytes.getInt();

      if (shardNum >= numPartitions) {
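
The patch replaces the old string-encoded group key (the bucket's ISO-8601 start time) with a fixed-width binary layout: a 4-byte interval index consumed by the new partitioner, then the 8-byte bucket start in millis consumed by the reducer. Below is a minimal standalone sketch of that round trip, not part of the patch: class and method names are hypothetical, and it assumes SortableBytes frames the serialized key as a 4-byte group-key length followed by the group key, which is what the `bytes.position(4)` calls in the patch skip over.

import java.nio.ByteBuffer;

// Hypothetical sketch of the group-key layout used by the patch:
// [int intervalIndex][long bucketStartMillis]
public class GroupKeySketch
{
  // Mirrors the mapper side: pack the interval index and bucket start.
  static byte[] encodeGroupKey(int intervalIndex, long bucketStartMillis)
  {
    final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
    buf.putInt(intervalIndex);      // read by the partitioner
    buf.putLong(bucketStartMillis); // read by the reducer
    return buf.array();
  }

  // Mirrors DeterminePartitionsDimSelectionPartitioner.getPartition: the
  // serialized key is assumed to start with a 4-byte group-key length
  // (added by SortableBytes), so skip it before reading the index.
  static int partitionFor(byte[] serializedKey, int numPartitions)
  {
    final ByteBuffer bytes = ByteBuffer.wrap(serializedKey);
    bytes.position(4); // skip assumed SortableBytes length prefix
    final int index = bytes.getInt();
    if (index >= numPartitions) {
      throw new IllegalStateException(
          String.format("Not enough partitions, index[%,d] >= numPartitions[%,d]", index, numPartitions)
      );
    }
    return index;
  }

  // Mirrors the reducer side: given the bare group key, skip the 4-byte
  // interval index and recover the bucket start millis.
  static long bucketStartMillis(byte[] groupKey)
  {
    return ByteBuffer.wrap(groupKey).getLong(4);
  }

  public static void main(String[] args)
  {
    final byte[] groupKey = encodeGroupKey(2, 1356998400000L); // 2013-01-01T00:00:00Z

    // Simulate the assumed SortableBytes framing: length prefix + group key.
    final ByteBuffer framed = ByteBuffer.allocate(4 + groupKey.length);
    framed.putInt(groupKey.length);
    framed.put(groupKey);

    System.out.println(partitionFor(framed.array(), 24)); // prints 2
    System.out.println(bucketStartMillis(groupKey));      // prints 1356998400000
  }
}

The design point worth noting: because the index is a dense 0..N-1 ordinal over bucketIntervals(), setNumReduceTasks(bucketIntervals().size()) gives each time bucket its own reducer, and the ISE guard in the patch fires if the index and reducer count ever disagree.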