diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 0064a4363d5..edba581b2db 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -192,14 +192,13 @@ public class DeterminePartitionsJob implements Jobby
         config.addInputPaths(dimSelectionJob);
       }
 
-      SortableBytes.useSortableBytesAsMapOutputKey(dimSelectionJob);
+      SortableBytes.useSortableBytesAsMapOutputKey(dimSelectionJob, DeterminePartitionsDimSelectionPartitioner.class);
       dimSelectionJob.setMapOutputValueClass(Text.class);
       dimSelectionJob.setCombinerClass(DeterminePartitionsDimSelectionCombiner.class);
       dimSelectionJob.setReducerClass(DeterminePartitionsDimSelectionReducer.class);
       dimSelectionJob.setOutputKeyClass(BytesWritable.class);
       dimSelectionJob.setOutputValueClass(Text.class);
       dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
-      dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
       dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
       JobHelper.setupClasspath(
           JobHelper.distributedClassPath(config.getWorkingPath()),
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 077a642d44d..31799cb65b1 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -173,7 +173,7 @@ public class IndexGeneratorJob implements Jobby
       job.setMapperClass(IndexGeneratorMapper.class);
       job.setMapOutputValueClass(BytesWritable.class);
 
-      SortableBytes.useSortableBytesAsMapOutputKey(job);
+      SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class);
 
       int numReducers = Iterables.size(config.getAllBuckets().get());
       if (numReducers == 0) {
@@ -186,7 +186,6 @@ public class IndexGeneratorJob implements Jobby
       }
 
       job.setNumReduceTasks(numReducers);
-      job.setPartitionerClass(IndexGeneratorPartitioner.class);
 
       setReducerClass(job);
       job.setOutputKeyClass(BytesWritable.class);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/SortableBytes.java b/indexing-hadoop/src/main/java/io/druid/indexer/SortableBytes.java
index 9be7aadb8a7..ba84cd3c16e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/SortableBytes.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/SortableBytes.java
@@ -93,12 +93,12 @@ public class SortableBytes
     );
   }
 
-  public static void useSortableBytesAsMapOutputKey(Job job)
+  public static void useSortableBytesAsMapOutputKey(Job job, Class<? extends Partitioner> partitionerClass)
   {
     job.setMapOutputKeyClass(BytesWritable.class);
     job.setGroupingComparatorClass(SortableBytesGroupingComparator.class);
     job.setSortComparatorClass(SortableBytesSortingComparator.class);
-    job.setPartitionerClass(SortableBytesPartitioner.class);
+    job.setPartitionerClass(partitionerClass);
   }
 
   public static class SortableBytesGroupingComparator extends WritableComparator
@@ -153,16 +153,4 @@ public class SortableBytes
       return retVal;
     }
   }
-
-  public static class SortableBytesPartitioner extends Partitioner<BytesWritable, Object>
-  {
-    @Override
-    public int getPartition(BytesWritable bytesWritable, Object o, int numPartitions)
-    {
-      final byte[] bytes = bytesWritable.getBytes();
-      int length = ByteBuffer.wrap(bytes).getInt();
-
-      return (ByteBuffer.wrap(bytes, 4, length).hashCode() & Integer.MAX_VALUE) % numPartitions;
-    }
-  }
 }
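
Usage sketch (illustrative, not part of the patch): after this change, every caller of SortableBytes.useSortableBytesAsMapOutputKey must supply its own Partitioner class, since the built-in SortableBytesPartitioner is removed. The example below shows how a hypothetical caller could keep the old default behavior of partitioning on the group key only; the names ExampleSortableBytesPartitioner and configureExampleJob are invented for illustration, and the key layout (a 4-byte group-key length prefix followed by the group-key bytes) is assumed from the removed SortableBytesPartitioner code above.

import java.nio.ByteBuffer;

import io.druid.indexer.SortableBytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner mirroring the removed SortableBytesPartitioner:
// it hashes only the group-key portion of the SortableBytes key, so all
// records with the same group key go to the same reducer.
public class ExampleSortableBytesPartitioner extends Partitioner<BytesWritable, Object>
{
  @Override
  public int getPartition(BytesWritable key, Object value, int numPartitions)
  {
    final byte[] bytes = key.getBytes();
    // Assumed layout: the first 4 bytes hold the group-key length, the group-key bytes follow.
    final int groupKeyLength = ByteBuffer.wrap(bytes).getInt();
    return (ByteBuffer.wrap(bytes, 4, groupKeyLength).hashCode() & Integer.MAX_VALUE) % numPartitions;
  }

  // Hypothetical wiring: the partitioner class is now passed to
  // useSortableBytesAsMapOutputKey instead of being hard-coded inside it.
  public static void configureExampleJob(Job job)
  {
    SortableBytes.useSortableBytesAsMapOutputKey(job, ExampleSortableBytesPartitioner.class);
    job.setMapOutputValueClass(BytesWritable.class);
  }
}

Partitioning on just the group key keeps all values for one group on the same reducer, which is what the grouping and sort comparators set by useSortableBytesAsMapOutputKey expect; the callers updated in this diff pass DeterminePartitionsDimSelectionPartitioner and IndexGeneratorPartitioner in the same way.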