mirror of https://github.com/apache/druid.git
Merge pull request #149 from metamx/determine-partitions-reducers
DeterminePartitionsJob: Only create as many reducers as needed
commit 8c1004d8c4
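In short: the dim-selection mapper now packs each time bucket's index and start timestamp into the group key, the job runs exactly one reducer per bucket interval, and a new partitioner routes every key to the reducer named by that index. A minimal, self-contained sketch of the key round trip (illustrative only, not code from the patch; the index and timestamp values are made up):

    import java.nio.ByteBuffer;

    public class GroupKeyRoundTrip
    {
      public static void main(String[] args)
      {
        // Hypothetical values: the third bucket interval, starting 2013-01-01T00:00:00Z.
        final int intervalIndex = 2;
        final long startMillis = 1356998400000L;

        // Encode, as the mapper now does: 4-byte interval index followed by 8-byte start millis.
        final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
        buf.putInt(intervalIndex);
        buf.putLong(startMillis);
        final byte[] groupKey = buf.array();

        // Decode: the partitioner reads the index to pick a reducer, and the reducer reads the
        // start millis to rebuild the bucket's DateTime. (In the real job the serialized key
        // also carries a 4-byte length prefix, added by SortableBytes, that is skipped first.)
        final ByteBuffer read = ByteBuffer.wrap(groupKey);
        System.out.println("reducer index = " + read.getInt());
        System.out.println("bucket start millis = " + read.getLong());
      }
    }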
DeterminePartitionsJob.java

@@ -28,6 +28,7 @@ import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
@@ -55,6 +56,7 @@ import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -71,6 +73,7 @@ import org.joda.time.Interval;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -189,6 +192,8 @@ public class DeterminePartitionsJob implements Jobby
     dimSelectionJob.setOutputValueClass(Text.class);
     dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
     dimSelectionJob.setJarByClass(DeterminePartitionsJob.class);
+    dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
+    dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().size());
 
     config.intoConfiguration(dimSelectionJob);
     FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
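Setting the partitioner is only half of the contract: the reducer count has to cover every index the partitioner can return, which is why the job now sizes setNumReduceTasks() from bucketIntervals().size(). A rough standalone wiring sketch (the class and job name are illustrative, not from the patch; Hadoop 2 style API):

    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class PartitionerWiringSketch
    {
      // Toy partitioner: assumes the key always starts with a valid index in [0, numPartitions).
      public static class MyIndexPartitioner extends Partitioner<BytesWritable, Text>
      {
        @Override
        public int getPartition(BytesWritable key, Text value, int numPartitions)
        {
          return ByteBuffer.wrap(key.getBytes()).getInt();
        }
      }

      public static void main(String[] args) throws Exception
      {
        final Job job = Job.getInstance(new Configuration(), "partitioner-wiring-sketch");
        job.setPartitionerClass(MyIndexPartitioner.class);
        // One reducer per possible index; fewer reducers than indexes would mean some
        // getPartition() results point at reducers that do not exist, which is what the
        // ISE guard added further down in this patch catches.
        job.setNumReduceTasks(4);
      }
    }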
@@ -371,11 +376,21 @@ public class DeterminePartitionsJob implements Jobby
   {
     private final HadoopDruidIndexerConfig config;
     private final String partitionDimension;
+    private final Map<DateTime, Integer> intervalIndexes;
 
     public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
     {
       this.config = config;
       this.partitionDimension = partitionDimension;
+
+      final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
+      int idx = 0;
+      for(final Interval bucketInterval: config.getGranularitySpec().bucketIntervals()) {
+        timeIndexBuilder.put(bucketInterval.getStart(), idx);
+        idx ++;
+      }
+
+      this.intervalIndexes = timeIndexBuilder.build();
     }
 
     public void emitDimValueCounts(
@@ -391,7 +406,12 @@ public class DeterminePartitionsJob implements Jobby
       }
 
       final Interval interval = maybeInterval.get();
-      final byte[] groupKey = interval.getStart().toString().getBytes(Charsets.UTF_8);
+      final int intervalIndex = intervalIndexes.get(interval.getStart());
+
+      final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
+      buf.putInt(intervalIndex);
+      buf.putLong(interval.getStartMillis());
+      final byte[] groupKey = buf.array();
 
       // Emit row-counter value.
       write(context, groupKey, new DimValueCount("", "", 1));
@@ -414,6 +434,24 @@ public class DeterminePartitionsJob implements Jobby
     }
   }
 
+  public static class DeterminePartitionsDimSelectionPartitioner
+      extends Partitioner<BytesWritable, Text>
+  {
+    @Override
+    public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
+    {
+      final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
+      bytes.position(4); // Skip length added by SortableBytes
+      final int index = bytes.getInt();
+
+      if (index >= numPartitions) {
+        throw new ISE("Not enough partitions, index[%,d] >= numPartitions[%,d]", index, numPartitions);
+      }
+
+      return index;
+    }
+  }
+
   private static abstract class DeterminePartitionsDimSelectionBaseReducer
       extends Reducer<BytesWritable, Text, BytesWritable, Text>
   {
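The new partitioner can be exercised on its own. The sketch below hand-builds a key with the layout implied by the comment above (a 4-byte length prefix in front of the group key, then the 4-byte interval index and the 8-byte bucket start); the exact prefix written by SortableBytes is an assumption here, and the import of DeterminePartitionsJob is omitted because its package depends on the Druid version:

    import java.nio.ByteBuffer;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;

    public class PartitionerCheck
    {
      public static void main(String[] args)
      {
        // Hand-built key: [4-byte length][4-byte interval index][8-byte bucket start millis].
        final ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 8);
        buf.putInt(4 + 8);            // length prefix (assumed layout)
        buf.putInt(2);                // interval index -> expected partition
        buf.putLong(1356998400000L);  // hypothetical bucket start

        final BytesWritable key = new BytesWritable(buf.array());
        final int partition = new DeterminePartitionsJob.DeterminePartitionsDimSelectionPartitioner()
            .getPartition(key, new Text(), 3);

        System.out.println(partition); // expected: 2
      }
    }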
@@ -511,7 +549,9 @@ public class DeterminePartitionsJob implements Jobby
         Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
     ) throws IOException, InterruptedException
     {
-      final DateTime bucket = new DateTime(new String(keyBytes.getGroupKey(), Charsets.UTF_8));
+      final ByteBuffer groupKey = ByteBuffer.wrap(keyBytes.getGroupKey());
+      groupKey.position(4); // Skip partition
+      final DateTime bucket = new DateTime(groupKey.getLong());
       final PeekingIterator<DimValueCount> iterator = Iterators.peekingIterator(combinedIterable.iterator());
 
       log.info(
IndexGeneratorJob.java

@@ -220,7 +220,7 @@ public class IndexGeneratorJob implements Jobby
     public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
     {
       final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
-      bytes.position(4); // Skip length added by BytesWritable
+      bytes.position(4); // Skip length added by SortableBytes
       int shardNum = bytes.getInt();
 
       if (shardNum >= numPartitions) {