From d1aa844dc690ae43f4e73667d765bee2dc45d7bc Mon Sep 17 00:00:00 2001 From: Daniel Templeton Date: Thu, 5 Jan 2017 17:55:05 -0800 Subject: [PATCH] MAPREDUCE-6715. Fix Several Unsafe Practices (Contributed by Yufei Gu via Daniel Templeton) (cherry picked from commit 0b8a7c18ddbe73b356b3c9baf4460659ccaee095) --- .../apache/hadoop/mapred/CleanupQueue.java | 7 +++- .../org/apache/hadoop/mapred/MapTask.java | 10 ++++- .../lib/output/TextOutputFormat.java | 15 ++++--- .../task/reduce/ShuffleSchedulerImpl.java | 40 +++++++++++-------- .../hadoop/examples/dancing/Pentomino.java | 5 +++ .../examples/terasort/TeraScheduler.java | 14 +++---- 6 files changed, 56 insertions(+), 35 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/CleanupQueue.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/CleanupQueue.java index 456ed7c689c..2282b5410e6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/CleanupQueue.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/CleanupQueue.java @@ -136,7 +136,12 @@ class CleanupQueue { LOG.debug("DELETED " + context.fullPath); } } catch (InterruptedException t) { - LOG.warn("Interrupted deletion of " + context.fullPath); + if (context == null) { + LOG.warn("Interrupted deletion of an invalid path: Path deletion " + + "context is null."); + } else { + LOG.warn("Interrupted deletion of " + context.fullPath); + } return; } catch (Exception e) { LOG.warn("Error deleting path " + context.fullPath + ": " + e); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index e839aafd2d2..306c728b65e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -411,8 +411,14 @@ public class MapTask extends Task { LOG.warn(msg, e); } } - throw new IOException("Initialization of all the collectors failed. " + - "Error in last collector was :" + lastException.getMessage(), lastException); + + if (lastException != null) { + throw new IOException("Initialization of all the collectors failed. " + + "Error in last collector was:" + lastException.toString(), + lastException); + } else { + throw new IOException("Initialization of all the collectors failed."); + } } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java index 1c8ea720065..2e49f68edab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java @@ -113,19 +113,18 @@ public class TextOutputFormat extends FileOutputFormat { if (isCompressed) { Class codecClass = getOutputCompressorClass(job, GzipCodec.class); - codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); + codec = ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); - if (!isCompressed) { - FSDataOutputStream fileOut = fs.create(file, false); - return new LineRecordWriter(fileOut, keyValueSeparator); + FSDataOutputStream fileOut = fs.create(file, false); + if (isCompressed) { + return new LineRecordWriter<>( + new DataOutputStream(codec.createOutputStream(fileOut)), + keyValueSeparator); } else { - FSDataOutputStream fileOut = fs.create(file, false); - return new LineRecordWriter(new DataOutputStream - (codec.createOutputStream(fileOut)), - keyValueSeparator); + return new LineRecordWriter<>(fileOut, keyValueSeparator); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java index a8197716107..2b6dc57c349 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java @@ -433,25 +433,29 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { public synchronized MapHost getHost() throws InterruptedException { - while(pendingHosts.isEmpty()) { - wait(); - } + while(pendingHosts.isEmpty()) { + wait(); + } - MapHost host = null; - Iterator iter = pendingHosts.iterator(); - int numToPick = random.nextInt(pendingHosts.size()); - for (int i=0; i <= numToPick; ++i) { - host = iter.next(); - } + Iterator iter = pendingHosts.iterator(); + // Safe to take one because we know pendingHosts isn't empty + MapHost host = iter.next(); + int numToPick = random.nextInt(pendingHosts.size()); + for (int i = 0; i < numToPick; ++i) { + host = iter.next(); + } - pendingHosts.remove(host); - host.markBusy(); + pendingHosts.remove(host); + host.markBusy(); - LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() + - " to " + Thread.currentThread().getName()); - SHUFFLE_START.set(Time.monotonicNow()); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Assigning " + host + " with " + host.getNumKnownMapOutputs() + " to " + + Thread.currentThread().getName()); + } + SHUFFLE_START.set(Time.monotonicNow()); - return host; + return host; } public synchronized List getMapsForHost(MapHost host) { @@ -477,8 +481,10 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { host.addKnownMap(id); } } - LOG.debug("assigned " + includedMaps + " of " + totalSize + " to " + - host + " to " + Thread.currentThread().getName()); + if (LOG.isDebugEnabled()) { + LOG.debug("assigned " + includedMaps + " of " + totalSize + " to " + host + + " to " + Thread.currentThread().getName()); + } return result; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/dancing/Pentomino.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/dancing/Pentomino.java index 5e636b901e7..2485728b079 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/dancing/Pentomino.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/dancing/Pentomino.java @@ -153,6 +153,11 @@ public class Pentomino { break; } } + + if (piece == null) { + continue; + } + // for each point where the piece was placed, mark it with the piece name for(ColumnName item: row) { if (item instanceof Point) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraScheduler.java index 30b50d8cf15..3e12a3d9619 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraScheduler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraScheduler.java @@ -73,14 +73,14 @@ class TeraScheduler { List readFile(String filename) throws IOException { List result = new ArrayList(10000); - BufferedReader in = new BufferedReader( - new InputStreamReader(new FileInputStream(filename), Charsets.UTF_8)); - String line = in.readLine(); - while (line != null) { - result.add(line); - line = in.readLine(); + try (BufferedReader in = new BufferedReader( + new InputStreamReader(new FileInputStream(filename), Charsets.UTF_8))) { + String line = in.readLine(); + while (line != null) { + result.add(line); + line = in.readLine(); + } } - in.close(); return result; }