fix partitioner for loca mode

This commit is contained in:
nishantmonu51 2014-01-31 21:59:17 +05:30
parent 82b748ad43
commit 569452121e
1 changed files with 16 additions and 2 deletions

View File

@ -44,6 +44,7 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -209,8 +210,9 @@ public class IndexGeneratorJob implements Jobby
} }
} }
public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Text> public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Text> implements Configurable
{ {
private Configuration config;
@Override @Override
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions) public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
@ -218,7 +220,7 @@ public class IndexGeneratorJob implements Jobby
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes bytes.position(4); // Skip length added by SortableBytes
int shardNum = bytes.getInt(); int shardNum = bytes.getInt();
if (System.getProperty("mapred.job.tracker").equals("local")) { if (config.get("mapred.job.tracker").equals("local")) {
return shardNum % numPartitions; return shardNum % numPartitions;
} else { } else {
if (shardNum >= numPartitions) { if (shardNum >= numPartitions) {
@ -228,6 +230,18 @@ public class IndexGeneratorJob implements Jobby
} }
} }
@Override
public Configuration getConf()
{
return config;
}
@Override
public void setConf(Configuration config)
{
this.config = config;
}
} }
public static class IndexGeneratorReducer extends Reducer<BytesWritable, Text, BytesWritable, Text> public static class IndexGeneratorReducer extends Reducer<BytesWritable, Text, BytesWritable, Text>