diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 39ad77fe8a4..c0107a18aa0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -44,6 +44,7 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -209,8 +210,9 @@ public class IndexGeneratorJob implements Jobby } } - public static class IndexGeneratorPartitioner extends Partitioner + public static class IndexGeneratorPartitioner extends Partitioner implements Configurable { + private Configuration config; @Override public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions) @@ -218,7 +220,7 @@ public class IndexGeneratorJob implements Jobby final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); bytes.position(4); // Skip length added by SortableBytes int shardNum = bytes.getInt(); - if (System.getProperty("mapred.job.tracker").equals("local")) { + if (config.get("mapred.job.tracker").equals("local")) { return shardNum % numPartitions; } else { if (shardNum >= numPartitions) { @@ -228,6 +230,18 @@ public class IndexGeneratorJob implements Jobby } } + + @Override + public Configuration getConf() + { + return config; + } + + @Override + public void setConf(Configuration config) + { + this.config = config; + } } public static class IndexGeneratorReducer extends Reducer