Mirror of https://github.com/apache/druid.git
Fix NullPointerException in DeterminePartitionsJob for Hadoop 3.0 and later versions (#5724)
In DeterminePartitionsJob, config.get("mapred.job.tracker").equals("local") throws a NullPointerException because the property name changed in Hadoop 3.0 to mapreduce.jobtracker.address. This patch extracts the logic for fetching the job tracker address into JobHelper and reuses it where needed.
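To make the failure concrete: on Hadoop 3.x the configuration may no longer carry a value under the old mapred.job.tracker key, so config.get(...) returns null and the subsequent .equals("local") call dereferences it. Below is a minimal, self-contained sketch of the null-safe fallback pattern the patch adopts; it is illustrative only, not Druid code, the class name JobTrackerLookupDemo is made up, and it assumes only Hadoop's Configuration class on the classpath:

import org.apache.hadoop.conf.Configuration;

public class JobTrackerLookupDemo
{
  public static void main(String[] args)
  {
    // Simulate a Hadoop 3.x job config: only the new property name carries a value.
    Configuration config = new Configuration(false);
    config.set("mapreduce.jobtracker.address", "local");

    // Pre-patch pattern: per this commit, the lookup below can return null on
    // Hadoop 3.x, so calling .equals() on the result throws a NullPointerException.
    // boolean localMode = config.get("mapred.job.tracker").equals("local");

    // Patched pattern: try the old key, fall back to the new one, and keep the
    // string literal on the left of equals() so a null result can never throw.
    String jobTrackerAddress = config.get("mapred.job.tracker");
    if (jobTrackerAddress == null) {
      jobTrackerAddress = config.get("mapreduce.jobtracker.address");
    }
    System.out.println("local mode: " + "local".equals(jobTrackerAddress)); // prints: local mode: true
  }
}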
Commit: a0c2ae7a38
Parent: 754c80e74a
@@ -440,7 +440,7 @@ public class DetermineHashedPartitionsJob implements Jobby
     public int getPartition(LongWritable interval, BytesWritable text, int numPartitions)
     {
-      if ("local".equals(config.get("mapred.job.tracker")) || determineIntervals) {
+      if ("local".equals(JobHelper.getJobTrackerAddress(config)) || determineIntervals) {
         return 0;
       } else {
         return reducerLookup.get(interval);
@@ -485,7 +485,8 @@ public class DeterminePartitionsJob implements Jobby
       final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
       bytes.position(4); // Skip length added by SortableBytes
       final int index = bytes.getInt();
-      if (config.get("mapred.job.tracker").equals("local")) {
+      String jobTrackerAddress = JobHelper.getJobTrackerAddress(config);
+      if ("local".equals(jobTrackerAddress)) {
         return index % numPartitions;
       } else {
         if (index >= numPartitions) {
@@ -535,7 +535,7 @@ public class IndexGeneratorJob implements Jobby
       final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
       bytes.position(4); // Skip length added by SortableBytes
       int shardNum = bytes.getInt();
-      if ("local".equals(config.get("mapreduce.jobtracker.address")) || "local".equals(config.get("mapred.job.tracker"))) {
+      if ("local".equals(JobHelper.getJobTrackerAddress(config))) {
         return shardNum % numPartitions;
       } else {
         if (shardNum >= numPartitions) {
@@ -831,4 +831,14 @@ public class JobHelper
       throw Throwables.propagate(e);
     }
   }
+
+  public static String getJobTrackerAddress(Configuration config)
+  {
+    String jobTrackerAddress = config.get("mapred.job.tracker");
+    if (jobTrackerAddress == null) {
+      // New Property name for Hadoop 3.0 and later versions
+      jobTrackerAddress = config.get("mapreduce.jobtracker.address");
+    }
+    return jobTrackerAddress;
+  }
 }
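If one wanted to guard the new helper with a regression test, something along these lines would exercise both property names. This is a sketch only, not part of the commit; it assumes JUnit 4 on the test classpath and a test class placed in the same package as JobHelper so the static method is visible:

import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

public class JobHelperJobTrackerAddressTest
{
  @Test
  public void resolvesPreHadoop3PropertyName()
  {
    // Only the old property name is set.
    Configuration config = new Configuration(false);
    config.set("mapred.job.tracker", "local");
    Assert.assertEquals("local", JobHelper.getJobTrackerAddress(config));
  }

  @Test
  public void fallsBackToHadoop3PropertyName()
  {
    // Only the Hadoop 3.x property name is set; the helper should fall back to it.
    Configuration config = new Configuration(false);
    config.set("mapreduce.jobtracker.address", "local");
    Assert.assertEquals("local", JobHelper.getJobTrackerAddress(config));
  }

  @Test
  public void returnsNullWhenNeitherPropertyIsSet()
  {
    // Callers compare with "local".equals(...), so a null return is safe.
    Configuration config = new Configuration(false);
    Assert.assertNull(JobHelper.getJobTrackerAddress(config));
  }
}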