svn merge -c 1442755 from trunk for HADOOP-9278. Fix the file handle leak in HarMetaData.parseMetaData() in HarFileSystem.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1442756 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7126958734
commit
2d5f195730
|
@ -286,6 +286,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
HADOOP-9260. Hadoop version may be not correct when starting name node or
|
HADOOP-9260. Hadoop version may be not correct when starting name node or
|
||||||
data node. (Chris Nauroth via jlowe)
|
data node. (Chris Nauroth via jlowe)
|
||||||
|
|
||||||
|
HADOOP-9278. Fix the file handle leak in HarMetaData.parseMetaData() in
|
||||||
|
HarFileSystem. (Chris Nauroth via szetszwo)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -30,8 +30,11 @@ import java.util.TreeMap;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.util.LineReader;
|
import org.apache.hadoop.util.LineReader;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
@ -50,6 +53,9 @@ import org.apache.hadoop.util.Progressable;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public class HarFileSystem extends FilterFileSystem {
|
public class HarFileSystem extends FilterFileSystem {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(HarFileSystem.class);
|
||||||
|
|
||||||
public static final int VERSION = 3;
|
public static final int VERSION = 3;
|
||||||
|
|
||||||
private static final Map<URI, HarMetaData> harMetaCache =
|
private static final Map<URI, HarMetaData> harMetaCache =
|
||||||
|
@ -1025,68 +1031,69 @@ public class HarFileSystem extends FilterFileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void parseMetaData() throws IOException {
|
private void parseMetaData() throws IOException {
|
||||||
FSDataInputStream in = fs.open(masterIndexPath);
|
Text line;
|
||||||
FileStatus masterStat = fs.getFileStatus(masterIndexPath);
|
long read;
|
||||||
masterIndexTimestamp = masterStat.getModificationTime();
|
FSDataInputStream in = null;
|
||||||
LineReader lin = new LineReader(in, getConf());
|
LineReader lin = null;
|
||||||
Text line = new Text();
|
|
||||||
long read = lin.readLine(line);
|
|
||||||
|
|
||||||
// the first line contains the version of the index file
|
|
||||||
String versionLine = line.toString();
|
|
||||||
String[] arr = versionLine.split(" ");
|
|
||||||
version = Integer.parseInt(arr[0]);
|
|
||||||
// make it always backwards-compatible
|
|
||||||
if (this.version > HarFileSystem.VERSION) {
|
|
||||||
throw new IOException("Invalid version " +
|
|
||||||
this.version + " expected " + HarFileSystem.VERSION);
|
|
||||||
}
|
|
||||||
|
|
||||||
// each line contains a hashcode range and the index file name
|
|
||||||
String[] readStr = null;
|
|
||||||
while(read < masterStat.getLen()) {
|
|
||||||
int b = lin.readLine(line);
|
|
||||||
read += b;
|
|
||||||
readStr = line.toString().split(" ");
|
|
||||||
int startHash = Integer.parseInt(readStr[0]);
|
|
||||||
int endHash = Integer.parseInt(readStr[1]);
|
|
||||||
stores.add(new Store(Long.parseLong(readStr[2]),
|
|
||||||
Long.parseLong(readStr[3]), startHash,
|
|
||||||
endHash));
|
|
||||||
line.clear();
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
// close the master index
|
in = fs.open(masterIndexPath);
|
||||||
lin.close();
|
FileStatus masterStat = fs.getFileStatus(masterIndexPath);
|
||||||
} catch(IOException io){
|
masterIndexTimestamp = masterStat.getModificationTime();
|
||||||
// do nothing just a read.
|
lin = new LineReader(in, getConf());
|
||||||
|
line = new Text();
|
||||||
|
read = lin.readLine(line);
|
||||||
|
|
||||||
|
// the first line contains the version of the index file
|
||||||
|
String versionLine = line.toString();
|
||||||
|
String[] arr = versionLine.split(" ");
|
||||||
|
version = Integer.parseInt(arr[0]);
|
||||||
|
// make it always backwards-compatible
|
||||||
|
if (this.version > HarFileSystem.VERSION) {
|
||||||
|
throw new IOException("Invalid version " +
|
||||||
|
this.version + " expected " + HarFileSystem.VERSION);
|
||||||
|
}
|
||||||
|
|
||||||
|
// each line contains a hashcode range and the index file name
|
||||||
|
String[] readStr = null;
|
||||||
|
while(read < masterStat.getLen()) {
|
||||||
|
int b = lin.readLine(line);
|
||||||
|
read += b;
|
||||||
|
readStr = line.toString().split(" ");
|
||||||
|
int startHash = Integer.parseInt(readStr[0]);
|
||||||
|
int endHash = Integer.parseInt(readStr[1]);
|
||||||
|
stores.add(new Store(Long.parseLong(readStr[2]),
|
||||||
|
Long.parseLong(readStr[3]), startHash,
|
||||||
|
endHash));
|
||||||
|
line.clear();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, lin, in);
|
||||||
}
|
}
|
||||||
|
|
||||||
FSDataInputStream aIn = fs.open(archiveIndexPath);
|
FSDataInputStream aIn = fs.open(archiveIndexPath);
|
||||||
FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
|
|
||||||
archiveIndexTimestamp = archiveStat.getModificationTime();
|
|
||||||
LineReader aLin;
|
|
||||||
|
|
||||||
// now start reading the real index file
|
|
||||||
for (Store s: stores) {
|
|
||||||
read = 0;
|
|
||||||
aIn.seek(s.begin);
|
|
||||||
aLin = new LineReader(aIn, getConf());
|
|
||||||
while (read + s.begin < s.end) {
|
|
||||||
int tmp = aLin.readLine(line);
|
|
||||||
read += tmp;
|
|
||||||
String lineFeed = line.toString();
|
|
||||||
String[] parsed = lineFeed.split(" ");
|
|
||||||
parsed[0] = decodeFileName(parsed[0]);
|
|
||||||
archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
|
|
||||||
line.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
// close the archive index
|
FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
|
||||||
aIn.close();
|
archiveIndexTimestamp = archiveStat.getModificationTime();
|
||||||
} catch(IOException io) {
|
LineReader aLin;
|
||||||
// do nothing just a read.
|
|
||||||
|
// now start reading the real index file
|
||||||
|
for (Store s: stores) {
|
||||||
|
read = 0;
|
||||||
|
aIn.seek(s.begin);
|
||||||
|
aLin = new LineReader(aIn, getConf());
|
||||||
|
while (read + s.begin < s.end) {
|
||||||
|
int tmp = aLin.readLine(line);
|
||||||
|
read += tmp;
|
||||||
|
String lineFeed = line.toString();
|
||||||
|
String[] parsed = lineFeed.split(" ");
|
||||||
|
parsed[0] = decodeFileName(parsed[0]);
|
||||||
|
archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
|
||||||
|
line.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, aIn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.util;
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
@ -39,7 +40,7 @@ import org.apache.hadoop.io.Text;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({"MapReduce"})
|
@InterfaceAudience.LimitedPrivate({"MapReduce"})
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class LineReader {
|
public class LineReader implements Closeable {
|
||||||
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
|
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
|
||||||
private int bufferSize = DEFAULT_BUFFER_SIZE;
|
private int bufferSize = DEFAULT_BUFFER_SIZE;
|
||||||
private InputStream in;
|
private InputStream in;
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.net.URI;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -46,8 +47,18 @@ public class TestHarFileSystemBasics {
|
||||||
|
|
||||||
private static final String ROOT_PATH = System.getProperty("test.build.data",
|
private static final String ROOT_PATH = System.getProperty("test.build.data",
|
||||||
"build/test/data");
|
"build/test/data");
|
||||||
private static final Path rootPath = new Path(
|
private static final Path rootPath;
|
||||||
new File(ROOT_PATH).getAbsolutePath() + "/localfs");
|
static {
|
||||||
|
String root = new Path(new File(ROOT_PATH).getAbsolutePath(), "localfs")
|
||||||
|
.toUri().getPath();
|
||||||
|
// Strip drive specifier on Windows, which would make the HAR URI invalid and
|
||||||
|
// cause tests to fail.
|
||||||
|
if (Shell.WINDOWS) {
|
||||||
|
root = root.substring(root.indexOf(':') + 1);
|
||||||
|
}
|
||||||
|
rootPath = new Path(root);
|
||||||
|
}
|
||||||
|
|
||||||
// NB: .har suffix is necessary
|
// NB: .har suffix is necessary
|
||||||
private static final Path harPath = new Path(rootPath, "path1/path2/my.har");
|
private static final Path harPath = new Path(rootPath, "path1/path2/my.har");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue