svn merge -c 1458741 from trunk for MAPREDUCE-5075. DistCp leaks input file handles since ThrottledInputStream does not close the wrapped InputStream.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1458747 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-03-20 10:43:21 +00:00
parent e25308f415
commit 84664ccde2
4 changed files with 13 additions and 6 deletions

View File

@ -70,6 +70,9 @@ Release 2.0.4-alpha - UNRELEASED
BUG FIXES
MAPREDUCE-5075. DistCp leaks input file handles since ThrottledInputStream
does not close the wrapped InputStream. (Chris Nauroth via szetszwo)
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES

View File

@ -124,7 +124,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
tmpTargetPath, true, BUFFER_SIZE,
getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath),
getBlockSize(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), context));
return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context);
return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
}
private void compareFileLengths(FileStatus sourceFileStatus, Path target,
@ -170,8 +170,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
}
private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
int bufferSize, boolean mustCloseStream,
Mapper.Context context) throws IOException {
int bufferSize, Mapper.Context context)
throws IOException {
Path source = sourceFileStatus.getPath();
byte buf[] = new byte[bufferSize];
ThrottledInputStream inStream = null;
@ -187,8 +187,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
bytesRead = inStream.read(buf);
}
} finally {
if (mustCloseStream)
IOUtils.cleanup(LOG, outStream, inStream);
IOUtils.cleanup(LOG, outStream, inStream);
}
return totalBytesRead;

View File

@ -52,6 +52,11 @@ public class ThrottledInputStream extends InputStream {
this.maxBytesPerSec = maxBytesPerSec;
}
@Override
public void close() throws IOException {
rawStream.close();
}
/** @inheritDoc */
@Override
public int read() throws IOException {

View File

@ -101,7 +101,7 @@ public class TestIntegration {
try {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1", target.toString());
createFiles("singlefile1/file1", "target");
runTest(listFile, target, sync);