From 72d08a0e41efda635baa985d55d67cb059a7c07c Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 24 Jun 2015 15:29:11 +0000 Subject: [PATCH] MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed too early. Contributed by Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov. --- hadoop-mapreduce-project/CHANGES.txt | 4 ++++ .../hadoop/mapreduce/task/reduce/Fetcher.java | 1 + .../task/reduce/IFileWrappedMapOutput.java | 10 ++-------- .../mapreduce/task/reduce/LocalFetcher.java | 15 +++++---------- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5eae44eb27d..6c650329e3d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -505,6 +505,10 @@ Release 2.8.0 - UNRELEASED multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira AJISAKA via jlowe) + MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed + too early (Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov via + jlowe) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 1e0338787ad..fb0ac0a7154 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -335,6 +335,7 @@ class Fetcher extends Thread { try { failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled); } catch (IOException e) { + IOUtils.cleanup(LOG, input); // // Setup connection again if disconnected by NM connection.disconnect(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java index 119db155879..6051c34d8d5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java @@ -60,13 +60,7 @@ public abstract class IFileWrappedMapOutput extends MapOutput { long compressedLength, long decompressedLength, ShuffleClientMetrics metrics, Reporter reporter) throws IOException { - IFileInputStream iFin = - new IFileInputStream(input, compressedLength, conf); - try { - this.doShuffle(host, iFin, compressedLength, - decompressedLength, metrics, reporter); - } finally { - iFin.close(); - } + doShuffle(host, new IFileInputStream(input, compressedLength, conf), + compressedLength, decompressedLength, metrics, reporter); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java index de2382cf92f..f45742fe5dd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.IndexRecord; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapOutputFile; @@ -149,19 +150,13 @@ class LocalFetcher extends Fetcher { // now read the file, seek to the appropriate section, and send it. FileSystem localFs = FileSystem.getLocal(job).getRaw(); FSDataInputStream inStream = localFs.open(mapOutputFileName); - - inStream = CryptoUtils.wrapIfNecessary(job, inStream); - try { + inStream = CryptoUtils.wrapIfNecessary(job, inStream); inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job)); - mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter); + mapOutput.shuffle(LOCALHOST, inStream, compressedLength, + decompressedLength, metrics, reporter); } finally { - try { - inStream.close(); - } catch (IOException ioe) { - LOG.warn("IOException closing inputstream from map output: " - + ioe.toString()); - } + IOUtils.cleanup(LOG, inStream); } scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,