From 7739f819a6c30252b42a2062dc8558ef1ca05d66 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Thu, 25 Dec 2014 23:29:54 +0900 Subject: [PATCH] HADOOP-11283. SequenceFile.Writer can leak file descriptors in DistCpV1#setup(). (Contributed by Varun Saxena via ozawa) (cherry picked from commit a164ce2ac985ecac957362fc717640ad45449371) --- .../hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/tools/DistCpV1.java | 60 +++++++++---------- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 63718d5aa15..75cfcc36e39 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -302,6 +302,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11414. FileBasedIPList#readLines() can leak file descriptors. (ozawa) + HADOOP-11283. SequenceFile.Writer can leak file descriptors in + DistCpV1#setup(). (Varun Saxena via ozawa) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java index 6801d6f2758..c44b67b7328 100644 --- a/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java +++ b/hadoop-tools/hadoop-extras/src/main/java/org/apache/hadoop/tools/DistCpV1.java @@ -1258,42 +1258,40 @@ public class DistCpV1 implements Tool { FileSystem jobfs = jobDirectory.getFileSystem(jobConf); Path srcfilelist = new Path(jobDirectory, "_distcp_src_files"); - jobConf.set(SRC_LIST_LABEL, srcfilelist.toString()); - SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf, - srcfilelist, LongWritable.class, FilePair.class, - SequenceFile.CompressionType.NONE); - Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files"); - SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf, - dstfilelist, Text.class, Text.class, - SequenceFile.CompressionType.NONE); - Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs"); + jobConf.set(SRC_LIST_LABEL, srcfilelist.toString()); jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString()); - SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf, - dstdirlist, Text.class, FilePair.class, - SequenceFile.CompressionType.NONE); - - // handle the case where the destination directory doesn't exist - // and we've only a single src directory OR we're updating/overwriting - // the contents of the destination directory. - final boolean special = - (args.srcs.size() == 1 && !dstExists) || update || overwrite; int srcCount = 0, cnsyncf = 0, dirsyn = 0; long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L, skipFileCount = 0L, skipByteCount = 0L; - - Path basedir = null; - HashSet parentDirsToCopy = new HashSet(); - if (args.basedir != null) { - FileSystem basefs = args.basedir.getFileSystem(conf); - basedir = args.basedir.makeQualified(basefs); - if (!basefs.isDirectory(basedir)) { - throw new IOException("Basedir " + basedir + " is not a directory."); + try ( + SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, + jobConf, srcfilelist, LongWritable.class, FilePair.class, + SequenceFile.CompressionType.NONE); + SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, + jobConf, dstfilelist, Text.class, Text.class, + SequenceFile.CompressionType.NONE); + SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, + jobConf, dstdirlist, Text.class, FilePair.class, + SequenceFile.CompressionType.NONE) + ) { + // handle the case where the destination directory doesn't exist + // and we've only a single src directory OR we're updating/overwriting + // the contents of the destination directory. + final boolean special = + (args.srcs.size() == 1 && !dstExists) || update || overwrite; + + Path basedir = null; + HashSet parentDirsToCopy = new HashSet(); + if (args.basedir != null) { + FileSystem basefs = args.basedir.getFileSystem(conf); + basedir = args.basedir.makeQualified(basefs); + if (!basefs.isDirectory(basedir)) { + throw new IOException("Basedir " + basedir + " is not a directory."); + } } - } - - try { + for(Iterator srcItr = args.srcs.iterator(); srcItr.hasNext(); ) { final Path src = srcItr.next(); FileSystem srcfs = src.getFileSystem(conf); @@ -1426,10 +1424,6 @@ public class DistCpV1 implements Tool { } } } - } finally { - checkAndClose(src_writer); - checkAndClose(dst_writer); - checkAndClose(dir_writer); } LOG.info("sourcePathsCount(files+directories)=" + srcCount); LOG.info("filesToCopyCount=" + fileCount);