HADOOP-11283. SequenceFile.Writer can leak file descriptors in DistCpV1#setup(). (Contributed by Varun Saxena via ozawa)

(cherry picked from commit a164ce2ac9)
This commit is contained in:
Tsuyoshi Ozawa 2014-12-25 23:29:54 +09:00
parent 4ca49c8634
commit 7739f819a6
2 changed files with 30 additions and 33 deletions

View File

@ -302,6 +302,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11414. FileBasedIPList#readLines() can leak file descriptors.
(ozawa)
HADOOP-11283. SequenceFile.Writer can leak file descriptors in
DistCpV1#setup(). (Varun Saxena via ozawa)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -1258,42 +1258,40 @@ public class DistCpV1 implements Tool {
FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf,
srcfilelist, LongWritable.class, FilePair.class,
SequenceFile.CompressionType.NONE);
Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf,
dstfilelist, Text.class, Text.class,
SequenceFile.CompressionType.NONE);
Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf,
dstdirlist, Text.class, FilePair.class,
SequenceFile.CompressionType.NONE);
// handle the case where the destination directory doesn't exist
// and we've only a single src directory OR we're updating/overwriting
// the contents of the destination directory.
final boolean special =
(args.srcs.size() == 1 && !dstExists) || update || overwrite;
int srcCount = 0, cnsyncf = 0, dirsyn = 0;
long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L,
skipFileCount = 0L, skipByteCount = 0L;
Path basedir = null;
HashSet<Path> parentDirsToCopy = new HashSet<Path>();
if (args.basedir != null) {
FileSystem basefs = args.basedir.getFileSystem(conf);
basedir = args.basedir.makeQualified(basefs);
if (!basefs.isDirectory(basedir)) {
throw new IOException("Basedir " + basedir + " is not a directory.");
try (
SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs,
jobConf, srcfilelist, LongWritable.class, FilePair.class,
SequenceFile.CompressionType.NONE);
SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs,
jobConf, dstfilelist, Text.class, Text.class,
SequenceFile.CompressionType.NONE);
SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs,
jobConf, dstdirlist, Text.class, FilePair.class,
SequenceFile.CompressionType.NONE)
) {
// handle the case where the destination directory doesn't exist
// and we've only a single src directory OR we're updating/overwriting
// the contents of the destination directory.
final boolean special =
(args.srcs.size() == 1 && !dstExists) || update || overwrite;
Path basedir = null;
HashSet<Path> parentDirsToCopy = new HashSet<Path>();
if (args.basedir != null) {
FileSystem basefs = args.basedir.getFileSystem(conf);
basedir = args.basedir.makeQualified(basefs);
if (!basefs.isDirectory(basedir)) {
throw new IOException("Basedir " + basedir + " is not a directory.");
}
}
}
try {
for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
final Path src = srcItr.next();
FileSystem srcfs = src.getFileSystem(conf);
@ -1426,10 +1424,6 @@ public class DistCpV1 implements Tool {
}
}
}
} finally {
checkAndClose(src_writer);
checkAndClose(dst_writer);
checkAndClose(dir_writer);
}
LOG.info("sourcePathsCount(files+directories)=" + srcCount);
LOG.info("filesToCopyCount=" + fileCount);