HADOOP-17362. Reduce RPC calls doing ls on HAR file (#2444). Contributed by Daryn Sharp and Ahmed Hussein.

(cherry picked from commit ebe1d1fbf7)
This commit is contained in:
Ahmed Hussein 2020-11-13 14:22:35 -06:00 committed by Jim Brennan
parent d78bf82722
commit 75ca0c0f23
3 changed files with 37 additions and 39 deletions

View File

@ -35,6 +35,7 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@ -513,41 +514,22 @@ public class HarFileSystem extends FileSystem {
if (!parentString.endsWith(Path.SEPARATOR)){ if (!parentString.endsWith(Path.SEPARATOR)){
parentString += Path.SEPARATOR; parentString += Path.SEPARATOR;
} }
Path harPath = new Path(parentString);
int harlen = harPath.depth();
final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();
for (HarStatus hstatus : metadata.archive.values()) { for (String child: parent.children) {
String child = hstatus.getName(); Path p = new Path(parentString + child);
if ((child.startsWith(parentString))) { statuses.add(toFileStatus(metadata.archive.get(p)));
Path thisPath = new Path(child);
if (thisPath.depth() == harlen + 1) {
statuses.add(toFileStatus(hstatus, cache));
}
}
} }
} }
/** /**
* Combine the status stored in the index and the underlying status. * Combine the status stored in the index and the underlying status.
* @param h status stored in the index * @param h status stored in the index
* @param cache caching the underlying file statuses
* @return the combined file status * @return the combined file status
* @throws IOException * @throws IOException
*/ */
private FileStatus toFileStatus(HarStatus h, private FileStatus toFileStatus(HarStatus h) throws IOException {
Map<String, FileStatus> cache) throws IOException { final Path p = h.isDir ? archivePath : new Path(archivePath, h.partName);
FileStatus underlying = null; FileStatus underlying = metadata.getPartFileStatus(p);
if (cache != null) {
underlying = cache.get(h.partName);
}
if (underlying == null) {
final Path p = h.isDir? archivePath: new Path(archivePath, h.partName);
underlying = fs.getFileStatus(p);
if (cache != null) {
cache.put(h.partName, underlying);
}
}
long modTime = 0; long modTime = 0;
int version = metadata.getVersion(); int version = metadata.getVersion();
@ -658,7 +640,7 @@ public class HarFileSystem extends FileSystem {
@Override @Override
public FileStatus getFileStatus(Path f) throws IOException { public FileStatus getFileStatus(Path f) throws IOException {
HarStatus hstatus = getFileHarStatus(f); HarStatus hstatus = getFileHarStatus(f);
return toFileStatus(hstatus, null); return toFileStatus(hstatus);
} }
private HarStatus getFileHarStatus(Path f) throws IOException { private HarStatus getFileHarStatus(Path f) throws IOException {
@ -815,7 +797,7 @@ public class HarFileSystem extends FileSystem {
if (hstatus.isDir()) { if (hstatus.isDir()) {
fileStatusesInIndex(hstatus, statuses); fileStatusesInIndex(hstatus, statuses);
} else { } else {
statuses.add(toFileStatus(hstatus, null)); statuses.add(toFileStatus(hstatus));
} }
return statuses.toArray(new FileStatus[statuses.size()]); return statuses.toArray(new FileStatus[statuses.size()]);
@ -1143,7 +1125,8 @@ public class HarFileSystem extends FileSystem {
List<Store> stores = new ArrayList<Store>(); List<Store> stores = new ArrayList<Store>();
Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>(); Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>();
private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>(); // keys are always the internal har path.
private Map<Path, FileStatus> partFileStatuses = new ConcurrentHashMap<>();
public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) { public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
this.fs = fs; this.fs = fs;
@ -1151,16 +1134,23 @@ public class HarFileSystem extends FileSystem {
this.archiveIndexPath = archiveIndexPath; this.archiveIndexPath = archiveIndexPath;
} }
public FileStatus getPartFileStatus(Path partPath) throws IOException { public FileStatus getPartFileStatus(Path path) throws IOException {
Path partPath = getPathInHar(path);
FileStatus status; FileStatus status;
status = partFileStatuses.get(partPath); status = partFileStatuses.get(partPath);
if (status == null) { if (status == null) {
status = fs.getFileStatus(partPath); status = fs.getFileStatus(path);
partFileStatuses.put(partPath, status); partFileStatuses.put(partPath, status);
} }
return status; return status;
} }
private void addPartFileStatuses(Path path) throws IOException {
for (FileStatus stat : fs.listStatus(path)) {
partFileStatuses.put(getPathInHar(stat.getPath()), stat);
}
}
public long getMasterIndexTimestamp() { public long getMasterIndexTimestamp() {
return masterIndexTimestamp; return masterIndexTimestamp;
} }
@ -1217,16 +1207,22 @@ public class HarFileSystem extends FileSystem {
try { try {
FileStatus archiveStat = fs.getFileStatus(archiveIndexPath); FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
archiveIndexTimestamp = archiveStat.getModificationTime(); archiveIndexTimestamp = archiveStat.getModificationTime();
LineReader aLin;
// pre-populate part cache.
addPartFileStatuses(archiveIndexPath.getParent());
LineReader aLin = null;
// now start reading the real index file // now start reading the real index file
long pos = -1;
for (Store s: stores) { for (Store s: stores) {
read = 0; if (pos != s.begin) {
aIn.seek(s.begin); pos = s.begin;
aLin = new LineReader(aIn, getConf()); aIn.seek(s.begin);
while (read + s.begin < s.end) { aLin = new LineReader(aIn, getConf());
int tmp = aLin.readLine(line); }
read += tmp;
while (pos < s.end) {
pos += aLin.readLine(line);
String lineFeed = line.toString(); String lineFeed = line.toString();
String[] parsed = lineFeed.split(" "); String[] parsed = lineFeed.split(" ");
parsed[0] = decodeFileName(parsed[0]); parsed[0] = decodeFileName(parsed[0]);

View File

@ -41,7 +41,6 @@ import java.util.EnumSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import static org.apache.hadoop.fs.Options.ChecksumOpt; import static org.apache.hadoop.fs.Options.ChecksumOpt;

View File

@ -33,7 +33,10 @@ import java.net.URI;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/** /**
* This test class checks basic operations with {@link HarFileSystem} including * This test class checks basic operations with {@link HarFileSystem} including