mirror of https://github.com/apache/druid.git
Remove setParitionerClass call from SortableBytes (#5677)
* Remove setParitionerClass call from SortableBytes since callers override the paritioner class themselves * Get rid of SortableBytesPartitioner class * Make partitioner class a parameter
This commit is contained in:
parent
2f8904e25f
commit
13ba840653
|
@ -192,14 +192,13 @@ public class DeterminePartitionsJob implements Jobby
|
||||||
config.addInputPaths(dimSelectionJob);
|
config.addInputPaths(dimSelectionJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
SortableBytes.useSortableBytesAsMapOutputKey(dimSelectionJob);
|
SortableBytes.useSortableBytesAsMapOutputKey(dimSelectionJob, DeterminePartitionsDimSelectionPartitioner.class);
|
||||||
dimSelectionJob.setMapOutputValueClass(Text.class);
|
dimSelectionJob.setMapOutputValueClass(Text.class);
|
||||||
dimSelectionJob.setCombinerClass(DeterminePartitionsDimSelectionCombiner.class);
|
dimSelectionJob.setCombinerClass(DeterminePartitionsDimSelectionCombiner.class);
|
||||||
dimSelectionJob.setReducerClass(DeterminePartitionsDimSelectionReducer.class);
|
dimSelectionJob.setReducerClass(DeterminePartitionsDimSelectionReducer.class);
|
||||||
dimSelectionJob.setOutputKeyClass(BytesWritable.class);
|
dimSelectionJob.setOutputKeyClass(BytesWritable.class);
|
||||||
dimSelectionJob.setOutputValueClass(Text.class);
|
dimSelectionJob.setOutputValueClass(Text.class);
|
||||||
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
|
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
|
||||||
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
|
|
||||||
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
|
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
|
||||||
JobHelper.setupClasspath(
|
JobHelper.setupClasspath(
|
||||||
JobHelper.distributedClassPath(config.getWorkingPath()),
|
JobHelper.distributedClassPath(config.getWorkingPath()),
|
||||||
|
|
|
@ -173,7 +173,7 @@ public class IndexGeneratorJob implements Jobby
|
||||||
job.setMapperClass(IndexGeneratorMapper.class);
|
job.setMapperClass(IndexGeneratorMapper.class);
|
||||||
job.setMapOutputValueClass(BytesWritable.class);
|
job.setMapOutputValueClass(BytesWritable.class);
|
||||||
|
|
||||||
SortableBytes.useSortableBytesAsMapOutputKey(job);
|
SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class);
|
||||||
|
|
||||||
int numReducers = Iterables.size(config.getAllBuckets().get());
|
int numReducers = Iterables.size(config.getAllBuckets().get());
|
||||||
if (numReducers == 0) {
|
if (numReducers == 0) {
|
||||||
|
@ -186,7 +186,6 @@ public class IndexGeneratorJob implements Jobby
|
||||||
}
|
}
|
||||||
|
|
||||||
job.setNumReduceTasks(numReducers);
|
job.setNumReduceTasks(numReducers);
|
||||||
job.setPartitionerClass(IndexGeneratorPartitioner.class);
|
|
||||||
|
|
||||||
setReducerClass(job);
|
setReducerClass(job);
|
||||||
job.setOutputKeyClass(BytesWritable.class);
|
job.setOutputKeyClass(BytesWritable.class);
|
||||||
|
|
|
@ -93,12 +93,12 @@ public class SortableBytes
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void useSortableBytesAsMapOutputKey(Job job)
|
public static void useSortableBytesAsMapOutputKey(Job job, Class<? extends Partitioner> partitionerClass)
|
||||||
{
|
{
|
||||||
job.setMapOutputKeyClass(BytesWritable.class);
|
job.setMapOutputKeyClass(BytesWritable.class);
|
||||||
job.setGroupingComparatorClass(SortableBytesGroupingComparator.class);
|
job.setGroupingComparatorClass(SortableBytesGroupingComparator.class);
|
||||||
job.setSortComparatorClass(SortableBytesSortingComparator.class);
|
job.setSortComparatorClass(SortableBytesSortingComparator.class);
|
||||||
job.setPartitionerClass(SortableBytesPartitioner.class);
|
job.setPartitionerClass(partitionerClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SortableBytesGroupingComparator extends WritableComparator
|
public static class SortableBytesGroupingComparator extends WritableComparator
|
||||||
|
@ -153,16 +153,4 @@ public class SortableBytes
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SortableBytesPartitioner extends Partitioner<BytesWritable, Object>
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public int getPartition(BytesWritable bytesWritable, Object o, int numPartitions)
|
|
||||||
{
|
|
||||||
final byte[] bytes = bytesWritable.getBytes();
|
|
||||||
int length = ByteBuffer.wrap(bytes).getInt();
|
|
||||||
|
|
||||||
return (ByteBuffer.wrap(bytes, 4, length).hashCode() & Integer.MAX_VALUE) % numPartitions;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue