MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed too early. Contributed by Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov.

This commit is contained in:
Jason Lowe 2015-06-24 15:29:11 +00:00
parent 2ba6465721
commit 72d08a0e41
4 changed files with 12 additions and 18 deletions

View File

@ -505,6 +505,10 @@ Release 2.8.0 - UNRELEASED
multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
AJISAKA via jlowe)
MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed
too early (Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov via
jlowe)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -335,6 +335,7 @@ class Fetcher<K,V> extends Thread {
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
} catch (IOException e) {
IOUtils.cleanup(LOG, input);
//
// Setup connection again if disconnected by NM
connection.disconnect();

View File

@ -60,13 +60,7 @@ public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
IFileInputStream iFin =
new IFileInputStream(input, compressedLength, conf);
try {
this.doShuffle(host, iFin, compressedLength,
decompressedLength, metrics, reporter);
} finally {
iFin.close();
}
doShuffle(host, new IFileInputStream(input, compressedLength, conf),
compressedLength, decompressedLength, metrics, reporter);
}
}

View File

@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.IndexRecord;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
@ -149,19 +150,13 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
// now read the file, seek to the appropriate section, and send it.
FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName);
inStream = CryptoUtils.wrapIfNecessary(job, inStream);
try {
inStream = CryptoUtils.wrapIfNecessary(job, inStream);
inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
decompressedLength, metrics, reporter);
} finally {
try {
inStream.close();
} catch (IOException ioe) {
LOG.warn("IOException closing inputstream from map output: "
+ ioe.toString());
}
IOUtils.cleanup(LOG, inStream);
}
scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,