diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 42293c66a93..7f6ef08ad29 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -286,6 +286,9 @@ Release 2.0.3-alpha - Unreleased
     HADOOP-9260. Hadoop version may be not correct when starting name node or
     data node. (Chris Nauroth via jlowe)
 
+    HADOOP-9278. Fix the file handle leak in HarMetaData.parseMetaData() in
+    HarFileSystem. (Chris Nauroth via szetszwo)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
index 1f5a2a12612..a82fba7068f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
@@ -30,8 +30,11 @@ import java.util.TreeMap;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Progressable;
@@ -50,6 +53,9 @@ import org.apache.hadoop.util.Progressable;
  */
 public class HarFileSystem extends FilterFileSystem {
+
+  private static final Log LOG = LogFactory.getLog(HarFileSystem.class);
+
   public static final int VERSION = 3;
 
   private static final Map<URI, HarMetaData> harMetaCache =
       new ConcurrentHashMap<URI, HarMetaData>();
@@ -1025,68 +1031,69 @@ public class HarFileSystem extends FilterFileSystem {
     }
 
     private void parseMetaData() throws IOException {
-      FSDataInputStream in = fs.open(masterIndexPath);
-      FileStatus masterStat = fs.getFileStatus(masterIndexPath);
-      masterIndexTimestamp = masterStat.getModificationTime();
-      LineReader lin = new LineReader(in, getConf());
-      Text line = new Text();
-      long read = lin.readLine(line);
+      Text line;
+      long read;
+      FSDataInputStream in = null;
+      LineReader lin = null;
 
-      // the first line contains the version of the index file
-      String versionLine = line.toString();
-      String[] arr = versionLine.split(" ");
-      version = Integer.parseInt(arr[0]);
-      // make it always backwards-compatible
-      if (this.version > HarFileSystem.VERSION) {
-        throw new IOException("Invalid version " +
-            this.version + " expected " + HarFileSystem.VERSION);
-      }
-
-      // each line contains a hashcode range and the index file name
-      String[] readStr = null;
-      while(read < masterStat.getLen()) {
-        int b = lin.readLine(line);
-        read += b;
-        readStr = line.toString().split(" ");
-        int startHash = Integer.parseInt(readStr[0]);
-        int endHash = Integer.parseInt(readStr[1]);
-        stores.add(new Store(Long.parseLong(readStr[2]),
-            Long.parseLong(readStr[3]), startHash,
-            endHash));
-        line.clear();
-      }
       try {
-        // close the master index
-        lin.close();
-      } catch(IOException io){
-        // do nothing just a read.
+        in = fs.open(masterIndexPath);
+        FileStatus masterStat = fs.getFileStatus(masterIndexPath);
+        masterIndexTimestamp = masterStat.getModificationTime();
+        lin = new LineReader(in, getConf());
+        line = new Text();
+        read = lin.readLine(line);
+
+        // the first line contains the version of the index file
+        String versionLine = line.toString();
+        String[] arr = versionLine.split(" ");
+        version = Integer.parseInt(arr[0]);
+        // make it always backwards-compatible
+        if (this.version > HarFileSystem.VERSION) {
+          throw new IOException("Invalid version " +
+              this.version + " expected " + HarFileSystem.VERSION);
+        }
+
+        // each line contains a hashcode range and the index file name
+        String[] readStr = null;
+        while(read < masterStat.getLen()) {
+          int b = lin.readLine(line);
+          read += b;
+          readStr = line.toString().split(" ");
+          int startHash = Integer.parseInt(readStr[0]);
+          int endHash = Integer.parseInt(readStr[1]);
+          stores.add(new Store(Long.parseLong(readStr[2]),
+              Long.parseLong(readStr[3]), startHash,
+              endHash));
+          line.clear();
+        }
+      } finally {
+        IOUtils.cleanup(LOG, lin, in);
       }
 
       FSDataInputStream aIn = fs.open(archiveIndexPath);
-      FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
-      archiveIndexTimestamp = archiveStat.getModificationTime();
-      LineReader aLin;
-
-      // now start reading the real index file
-      for (Store s: stores) {
-        read = 0;
-        aIn.seek(s.begin);
-        aLin = new LineReader(aIn, getConf());
-        while (read + s.begin < s.end) {
-          int tmp = aLin.readLine(line);
-          read += tmp;
-          String lineFeed = line.toString();
-          String[] parsed = lineFeed.split(" ");
-          parsed[0] = decodeFileName(parsed[0]);
-          archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
-          line.clear();
-        }
-      }
       try {
-        // close the archive index
-        aIn.close();
-      } catch(IOException io) {
-        // do nothing just a read.
+        FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
+        archiveIndexTimestamp = archiveStat.getModificationTime();
+        LineReader aLin;
+
+        // now start reading the real index file
+        for (Store s: stores) {
+          read = 0;
+          aIn.seek(s.begin);
+          aLin = new LineReader(aIn, getConf());
+          while (read + s.begin < s.end) {
+            int tmp = aLin.readLine(line);
+            read += tmp;
+            String lineFeed = line.toString();
+            String[] parsed = lineFeed.split(" ");
+            parsed[0] = decodeFileName(parsed[0]);
+            archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
+            line.clear();
+          }
+        }
+      } finally {
+        IOUtils.cleanup(LOG, aIn);
       }
     }
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 1681d6dfe70..682322dc640 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.util;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -39,7 +40,7 @@ import org.apache.hadoop.io.Text;
  */
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
-public class LineReader {
+public class LineReader implements Closeable {
   private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
   private int bufferSize = DEFAULT_BUFFER_SIZE;
   private InputStream in;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystemBasics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystemBasics.java
index c11afbadd27..de7c3fd1745 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystemBasics.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystemBasics.java
@@ -28,6 +28,7 @@ import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Shell;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,8 +47,18 @@ public class TestHarFileSystemBasics {
 
   private static final String ROOT_PATH = System.getProperty(
       "test.build.data", "build/test/data");
-  private static final Path rootPath = new Path(
-      new File(ROOT_PATH).getAbsolutePath() + "/localfs");
+  private static final Path rootPath;
+  static {
+    String root = new Path(new File(ROOT_PATH).getAbsolutePath(), "localfs")
+        .toUri().getPath();
+    // Strip drive specifier on Windows, which would make the HAR URI invalid and
+    // cause tests to fail.
+    if (Shell.WINDOWS) {
+      root = root.substring(root.indexOf(':') + 1);
+    }
+    rootPath = new Path(root);
+  }
+
   // NB: .har suffix is necessary
   private static final Path harPath = new Path(rootPath,
       "path1/path2/my.har");
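
Note on the cleanup idiom adopted above: IOUtils.cleanup(Log, Closeable...) is null-safe, closes each argument in turn, and logs rather than rethrows any IOException raised by close(). That is both why LineReader must now implement Closeable and why the call is safe in a finally block even when fs.open() fails before the references are assigned. A minimal standalone sketch of the same pattern follows; the CleanupExample class and readFirstLine helper are illustrative only, not part of this patch.

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class CleanupExample {
  private static final Log LOG = LogFactory.getLog(CleanupExample.class);

  // Reads the first line of a file, guaranteeing that both the reader and
  // the underlying stream are closed even if open() or readLine() throws.
  static String readFirstLine(FileSystem fs, Path path) throws IOException {
    FSDataInputStream in = null;
    LineReader reader = null;
    try {
      in = fs.open(path);
      reader = new LineReader(in, fs.getConf());
      Text line = new Text();
      reader.readLine(line);
      return line.toString();
    } finally {
      // Closes whichever of the two was actually opened; any IOException
      // raised by close() is logged against LOG instead of propagating.
      IOUtils.cleanup(LOG, reader, in);
    }
  }
}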