diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5771ecc6a56..d9487733cc2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -70,6 +70,9 @@ Release 2.0.4-alpha - UNRELEASED BUG FIXES + MAPREDUCE-5075. DistCp leaks input file handles since ThrottledInputStream + does not close the wrapped InputStream. (Chris Nauroth via szetszwo) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index a7e8381be9e..ccd2eab34ae 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -124,7 +124,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { tmpTargetPath, true, BUFFER_SIZE, getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), getBlockSize(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), context)); - return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context); + return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context); } private void compareFileLengths(FileStatus sourceFileStatus, Path target, @@ -170,8 +170,8 @@ public class RetriableFileCopyCommand extends RetriableCommand { } private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream, - int bufferSize, boolean mustCloseStream, - Mapper.Context context) throws IOException { + int bufferSize, Mapper.Context context) + throws IOException { Path source = sourceFileStatus.getPath(); byte buf[] = new byte[bufferSize]; ThrottledInputStream inStream = null; @@ -187,8 +187,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { bytesRead = inStream.read(buf); } } finally { - if (mustCloseStream) - IOUtils.cleanup(LOG, outStream, inStream); + IOUtils.cleanup(LOG, outStream, inStream); } return totalBytesRead; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java index cf442f4a8ae..75ae86ad054 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java @@ -52,6 +52,11 @@ public class ThrottledInputStream extends InputStream { this.maxBytesPerSec = maxBytesPerSec; } + @Override + public void close() throws IOException { + rawStream.close(); + } + /** @inheritDoc */ @Override public int read() throws IOException { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java index ca08e25478a..6d9947703d8 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java @@ -101,7 +101,7 @@ public class TestIntegration { try { addEntries(listFile, "singlefile1/file1"); - createFiles("singlefile1/file1", target.toString()); + createFiles("singlefile1/file1", "target"); runTest(listFile, target, sync);