HDFS-10674. Optimize creating a full path from an inode. Contributed by Daryn Sharp.

(cherry picked from commit 22ef5286bc)
(cherry picked from commit a5d12d9c1f)
This commit is contained in:
Kihwal Lee 2016-08-03 13:15:28 -05:00 committed by Zhe Zhang
parent 7b5e12229d
commit e53f6fde46
5 changed files with 29 additions and 59 deletions

View File

@ -845,57 +845,6 @@ public EnumCounters<StorageType> getStorageTypeDeltas(byte storagePolicyID,
return typeSpaceDeltas;
}
/** Return the name of the path represented by inodes at [0, pos] */
static String getFullPathName(INode[] inodes, int pos) {
StringBuilder fullPathName = new StringBuilder();
if (inodes[0].isRoot()) {
if (pos == 0) return Path.SEPARATOR;
} else {
fullPathName.append(inodes[0].getLocalName());
}
for (int i=1; i<=pos; i++) {
fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
}
return fullPathName.toString();
}
/**
* @return the relative path of an inode from one of its ancestors,
* represented by an array of inodes.
*/
private static INode[] getRelativePathINodes(INode inode, INode ancestor) {
// calculate the depth of this inode from the ancestor
int depth = 0;
for (INode i = inode; i != null && !i.equals(ancestor); i = i.getParent()) {
depth++;
}
INode[] inodes = new INode[depth];
// fill up the inodes in the path from this inode to root
for (int i = 0; i < depth; i++) {
if (inode == null) {
NameNode.stateChangeLog.warn("Could not get full path."
+ " Corresponding file might have deleted already.");
return null;
}
inodes[depth-i-1] = inode;
inode = inode.getParent();
}
return inodes;
}
private static INode[] getFullPathINodes(INode inode) {
return getRelativePathINodes(inode, null);
}
/** Return the full path name of the specified inode */
static String getFullPathName(INode inode) {
INode[] inodes = getFullPathINodes(inode);
// inodes can be null only when its called without holding lock
return inodes == null ? "" : getFullPathName(inodes, inodes.length - 1);
}
/**
* Add the given child to the namespace.
* @param existing the INodesInPath containing all the ancestral INodes
@ -947,9 +896,7 @@ static void verifyQuota(INodesInPath iip, int pos, QuotaCounts deltas,
try {
q.verifyQuota(deltas);
} catch (QuotaExceededException e) {
List<INode> inodes = iip.getReadOnlyINodes();
final String path = getFullPathName(inodes.toArray(new INode[inodes.size()]), i);
e.setPathName(path);
e.setPathName(iip.getPath(i));
throw e;
}
}

View File

@ -5740,7 +5740,7 @@ Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
final INodeFile inode = getBlockCollection(blk);
skip++;
if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
String src = FSDirectory.getFullPathName(inode);
String src = inode.getFullPathName();
if (src.startsWith(path)){
corruptFiles.add(new CorruptFileBlockInfo(src, blk));
count++;

View File

@ -574,7 +574,25 @@ public final byte[] getKey() {
public String getFullPathName() {
// Get the full path name of this inode.
return FSDirectory.getFullPathName(this);
if (isRoot()) {
return Path.SEPARATOR;
}
// compute size of needed bytes for the path
int idx = 0;
for (INode inode = this; inode != null; inode = inode.getParent()) {
// add component + delimiter (if not tail component)
idx += inode.getLocalNameBytes().length + (inode != this ? 1 : 0);
}
byte[] path = new byte[idx];
for (INode inode = this; inode != null; inode = inode.getParent()) {
if (inode != this) {
path[--idx] = Path.SEPARATOR_CHAR;
}
byte[] name = inode.getLocalNameBytes();
idx -= name.length;
System.arraycopy(name, 0, path, idx, name.length);
}
return DFSUtil.bytes2String(path);
}
@Override

View File

@ -347,11 +347,11 @@ public String getPath() {
}
public String getParentPath() {
return getPath(path.length - 1);
return getPath(path.length - 2);
}
public String getPath(int pos) {
return DFSUtil.byteArray2PathString(path, 0, pos);
return DFSUtil.byteArray2PathString(path, 0, pos + 1); // it's a length...
}
/**

View File

@ -156,6 +156,11 @@ public void testNonSnapshotPathINodes() throws Exception {
assertEquals(nodesInPath.getINode(components.length - 3).getFullPathName(),
dir.toString());
assertEquals(Path.SEPARATOR, nodesInPath.getPath(0));
assertEquals(dir.toString(), nodesInPath.getPath(1));
assertEquals(sub1.toString(), nodesInPath.getPath(2));
assertEquals(file1.toString(), nodesInPath.getPath(3));
nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, false);
assertEquals(nodesInPath.length(), components.length);
assertSnapshot(nodesInPath, false, null, -1);