diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 7955aae220b..a85f5660260 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -440,7 +440,7 @@ public class DetermineHashedPartitionsJob implements Jobby public int getPartition(LongWritable interval, BytesWritable text, int numPartitions) { - if ("local".equals(config.get("mapred.job.tracker")) || determineIntervals) { + if ("local".equals(JobHelper.getJobTrackerAddress(config)) || determineIntervals) { return 0; } else { return reducerLookup.get(interval); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index 8052469daa2..0064a4363d5 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -485,7 +485,8 @@ public class DeterminePartitionsJob implements Jobby final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); bytes.position(4); // Skip length added by SortableBytes final int index = bytes.getInt(); - if (config.get("mapred.job.tracker").equals("local")) { + String jobTrackerAddress = JobHelper.getJobTrackerAddress(config); + if ("local".equals(jobTrackerAddress)) { return index % numPartitions; } else { if (index >= numPartitions) { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index b5708b94354..a848714c8eb 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -535,7 +535,7 @@ public class IndexGeneratorJob implements Jobby final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); bytes.position(4); // Skip length added by SortableBytes int shardNum = bytes.getInt(); - if ("local".equals(config.get("mapreduce.jobtracker.address")) || "local".equals(config.get("mapred.job.tracker"))) { + if ("local".equals(JobHelper.getJobTrackerAddress(config))) { return shardNum % numPartitions; } else { if (shardNum >= numPartitions) { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 3adbf1b78cd..bbe4f2bc37e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -831,4 +831,14 @@ public class JobHelper throw Throwables.propagate(e); } } + + public static String getJobTrackerAddress(Configuration config) + { + String jobTrackerAddress = config.get("mapred.job.tracker"); + if (jobTrackerAddress == null) { + // New Property name for Hadoop 3.0 and later versions + jobTrackerAddress = config.get("mapreduce.jobtracker.address"); + } + return jobTrackerAddress; + } }