HDFS-4209. Clean up the addNode/addChild/addChildNoQuotaCheck methods in FSDirectory and INodeDirectory.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1414447 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-11-27 22:36:20 +00:00
parent d67cdb1784
commit 9047eb5162
9 changed files with 126 additions and 156 deletions

View File

@ -170,6 +170,9 @@ Trunk (Unreleased)
HDFS-4200. Reduce the size of synchronized sections in PacketResponder. HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
(suresh) (suresh)
HDFS-4209. Clean up the addNode/addChild/addChildNoQuotaCheck methods in
FSDirectory and INodeDirectory. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -233,7 +233,7 @@ public class DFSUtil {
/** /**
* Given a list of path components returns a path as a UTF8 String * Given a list of path components returns a path as a UTF8 String
*/ */
public static String byteArray2String(byte[][] pathComponents) { public static String byteArray2PathString(byte[][] pathComponents) {
if (pathComponents.length == 0) if (pathComponents.length == 0)
return ""; return "";
if (pathComponents.length == 1 && pathComponents[0].length == 0) { if (pathComponents.length == 1 && pathComponents[0].length == 0) {
@ -254,6 +254,14 @@ public class DFSUtil {
return null; return null;
} }
/** Convert an object representing a path to a string. */
public static String path2String(final Object path) {
return path == null? null
: path instanceof String? (String)path
: path instanceof byte[][]? byteArray2PathString((byte[][])path)
: path.toString();
}
/** /**
* Splits the array of bytes into array of arrays of bytes * Splits the array of bytes into array of arrays of bytes
* on byte separator * on byte separator

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -83,7 +84,6 @@ public class FSDirectory implements Closeable {
FSImage fsImage; FSImage fsImage;
private final FSNamesystem namesystem; private final FSNamesystem namesystem;
private volatile boolean ready = false; private volatile boolean ready = false;
private static final long UNKNOWN_DISK_SPACE = -1;
private final int maxComponentLength; private final int maxComponentLength;
private final int maxDirItems; private final int maxDirItems;
private final int lsLimit; // max list limit private final int lsLimit; // max list limit
@ -257,13 +257,14 @@ public class FSDirectory implements Closeable {
permissions,replication, permissions,replication,
preferredBlockSize, modTime, clientName, preferredBlockSize, modTime, clientName,
clientMachine, clientNode); clientMachine, clientNode);
boolean added = false;
writeLock(); writeLock();
try { try {
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE); added = addINode(path, newNode);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
if (newNode == null) { if (!added) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path); NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
return null; return null;
} }
@ -283,7 +284,7 @@ public class FSDirectory implements Closeable {
boolean underConstruction, boolean underConstruction,
String clientName, String clientName,
String clientMachine) { String clientMachine) {
INode newNode; final INode newNode;
assert hasWriteLock(); assert hasWriteLock();
if (underConstruction) { if (underConstruction) {
newNode = new INodeFileUnderConstruction( newNode = new INodeFileUnderConstruction(
@ -296,16 +297,17 @@ public class FSDirectory implements Closeable {
} }
try { try {
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE); if (addINode(path, newNode)) {
return newNode;
}
} catch (IOException e) { } catch (IOException e) {
if(NameNode.stateChangeLog.isDebugEnabled()) { if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug( NameNode.stateChangeLog.debug(
"DIR* FSDirectory.unprotectedAddFile: exception when add " + path "DIR* FSDirectory.unprotectedAddFile: exception when add " + path
+ " to the file system", e); + " to the file system", e);
} }
return null;
} }
return newNode; return null;
} }
/** /**
@ -547,12 +549,12 @@ public class FSDirectory implements Closeable {
// Ensure dst has quota to accommodate rename // Ensure dst has quota to accommodate rename
verifyQuotaForRename(srcInodes, dstInodes); verifyQuotaForRename(srcInodes, dstInodes);
INode dstChild = null; boolean added = false;
INode srcChild = null; INode srcChild = null;
String srcChildName = null; String srcChildName = null;
try { try {
// remove src // remove src
srcChild = removeChild(srcInodesInPath, srcInodes.length-1); srcChild = removeLastINode(srcInodesInPath);
if (srcChild == null) { if (srcChild == null) {
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+ "failed to rename " + src + " to " + dst + "failed to rename " + src + " to " + dst
@ -563,9 +565,8 @@ public class FSDirectory implements Closeable {
srcChild.setLocalName(dstComponents[dstInodes.length-1]); srcChild.setLocalName(dstComponents[dstInodes.length-1]);
// add src to the destination // add src to the destination
dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length-1, added = addLastINodeNoQuotaCheck(dstInodesInPath, srcChild);
srcChild, UNKNOWN_DISK_SPACE); if (added) {
if (dstChild != null) {
srcChild = null; srcChild = null;
if (NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
@ -577,11 +578,10 @@ public class FSDirectory implements Closeable {
return true; return true;
} }
} finally { } finally {
if (dstChild == null && srcChild != null) { if (!added && srcChild != null) {
// put it back // put it back
srcChild.setLocalName(srcChildName); srcChild.setLocalName(srcChildName);
addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, srcChild, addLastINodeNoQuotaCheck(srcInodesInPath, srcChild);
UNKNOWN_DISK_SPACE);
} }
} }
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@ -701,7 +701,7 @@ public class FSDirectory implements Closeable {
// Ensure dst has quota to accommodate rename // Ensure dst has quota to accommodate rename
verifyQuotaForRename(srcInodes, dstInodes); verifyQuotaForRename(srcInodes, dstInodes);
INode removedSrc = removeChild(srcInodesInPath, srcInodes.length - 1); INode removedSrc = removeLastINode(srcInodesInPath);
if (removedSrc == null) { if (removedSrc == null) {
error = "Failed to rename " + src + " to " + dst error = "Failed to rename " + src + " to " + dst
+ " because the source can not be removed"; + " because the source can not be removed";
@ -714,18 +714,13 @@ public class FSDirectory implements Closeable {
INode removedDst = null; INode removedDst = null;
try { try {
if (dstInode != null) { // dst exists remove it if (dstInode != null) { // dst exists remove it
removedDst = removeChild(dstInodesInPath, dstInodes.length - 1); removedDst = removeLastINode(dstInodesInPath);
dstChildName = removedDst.getLocalName(); dstChildName = removedDst.getLocalName();
} }
INode dstChild = null;
removedSrc.setLocalName(dstComponents[dstInodes.length - 1]); removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
// add src as dst to complete rename // add src as dst to complete rename
dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1, if (addLastINodeNoQuotaCheck(dstInodesInPath, removedSrc)) {
removedSrc, UNKNOWN_DISK_SPACE);
int filesDeleted = 0;
if (dstChild != null) {
removedSrc = null; removedSrc = null;
if (NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug( NameNode.stateChangeLog.debug(
@ -736,6 +731,7 @@ public class FSDirectory implements Closeable {
dstInodes[dstInodes.length - 2].setModificationTime(timestamp); dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
// Collect the blocks and remove the lease for previous dst // Collect the blocks and remove the lease for previous dst
int filesDeleted = 0;
if (removedDst != null) { if (removedDst != null) {
INode rmdst = removedDst; INode rmdst = removedDst;
removedDst = null; removedDst = null;
@ -749,14 +745,12 @@ public class FSDirectory implements Closeable {
if (removedSrc != null) { if (removedSrc != null) {
// Rename failed - restore src // Rename failed - restore src
removedSrc.setLocalName(srcChildName); removedSrc.setLocalName(srcChildName);
addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, removedSrc, addLastINodeNoQuotaCheck(srcInodesInPath, removedSrc);
UNKNOWN_DISK_SPACE);
} }
if (removedDst != null) { if (removedDst != null) {
// Rename failed - restore dst // Rename failed - restore dst
removedDst.setLocalName(dstChildName); removedDst.setLocalName(dstChildName);
addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1, removedDst, addLastINodeNoQuotaCheck(dstInodesInPath, removedDst);
UNKNOWN_DISK_SPACE);
} }
} }
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@ -1055,14 +1049,13 @@ public class FSDirectory implements Closeable {
" because the root is not allowed to be deleted"); " because the root is not allowed to be deleted");
return 0; return 0;
} }
int pos = inodes.length - 1;
// Remove the node from the namespace // Remove the node from the namespace
targetNode = removeChild(inodesInPath, pos); targetNode = removeLastINode(inodesInPath);
if (targetNode == null) { if (targetNode == null) {
return 0; return 0;
} }
// set the parent's modification time // set the parent's modification time
inodes[pos-1].setModificationTime(mtime); inodes[inodes.length - 2].setModificationTime(mtime);
int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks); int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
if (NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: " NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
@ -1091,7 +1084,7 @@ public class FSDirectory implements Closeable {
/* Currently oldnode and newnode are assumed to contain the same /* Currently oldnode and newnode are assumed to contain the same
* blocks. Otherwise, blocks need to be removed from the blocksMap. * blocks. Otherwise, blocks need to be removed from the blocksMap.
*/ */
rootDir.addNode(path, newnode); rootDir.addINode(path, newnode);
int index = 0; int index = 0;
for (BlockInfo b : newnode.getBlocks()) { for (BlockInfo b : newnode.getBlocks()) {
@ -1193,21 +1186,6 @@ public class FSDirectory implements Closeable {
} }
} }
/**
* Get the parent node of path.
*
* @param path the path to explore
* @return its parent node
*/
INodeDirectory getParent(byte[][] path)
throws FileNotFoundException, UnresolvedLinkException {
readLock();
try {
return rootDir.getParent(path);
} finally {
readUnlock();
}
}
/** /**
* Check whether the filepath could be created * Check whether the filepath could be created
@ -1249,20 +1227,17 @@ public class FSDirectory implements Closeable {
* @param nsDelta the delta change of namespace * @param nsDelta the delta change of namespace
* @param dsDelta the delta change of diskspace * @param dsDelta the delta change of diskspace
* @throws QuotaExceededException if the new count violates any quota limit * @throws QuotaExceededException if the new count violates any quota limit
* @throws FileNotFound if path does not exist. * @throws FileNotFoundException if path does not exist.
*/ */
void updateSpaceConsumed(String path, long nsDelta, long dsDelta) void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
throws QuotaExceededException, throws QuotaExceededException, FileNotFoundException, UnresolvedLinkException {
FileNotFoundException,
UnresolvedLinkException {
writeLock(); writeLock();
try { try {
final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, false); final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, false);
final INode[] inodes = inodesInPath.getINodes(); final INode[] inodes = inodesInPath.getINodes();
int len = inodes.length; int len = inodes.length;
if (inodes[len - 1] == null) { if (inodes[len - 1] == null) {
throw new FileNotFoundException(path + throw new FileNotFoundException("Path not found: " + path);
" does not exist under rootDir.");
} }
updateCount(inodesInPath, len-1, nsDelta, dsDelta, true); updateCount(inodesInPath, len-1, nsDelta, dsDelta, true);
} finally { } finally {
@ -1486,15 +1461,17 @@ public class FSDirectory implements Closeable {
long timestamp) throws QuotaExceededException { long timestamp) throws QuotaExceededException {
assert hasWriteLock(); assert hasWriteLock();
final INodeDirectory dir = new INodeDirectory(name, permission, timestamp); final INodeDirectory dir = new INodeDirectory(name, permission, timestamp);
final INode inode = addChild(inodesInPath, pos, dir, -1, true); if (addChild(inodesInPath, pos, dir, true)) {
inodesInPath.setINode(pos, inode); inodesInPath.setINode(pos, dir);
}
} }
/** Add a node child to the namespace. The full path name of the node is src. /**
* childDiskspace should be -1, if unknown. * Add the given child to the namespace.
* @param src The full path name of the child node.
* @throw QuotaExceededException is thrown if it violates quota limit * @throw QuotaExceededException is thrown if it violates quota limit
*/ */
private <T extends INode> T addNode(String src, T child, long childDiskspace private boolean addINode(String src, INode child
) throws QuotaExceededException, UnresolvedLinkException { ) throws QuotaExceededException, UnresolvedLinkException {
byte[][] components = INode.getPathComponents(src); byte[][] components = INode.getPathComponents(src);
byte[] path = components[components.length-1]; byte[] path = components[components.length-1];
@ -1504,8 +1481,7 @@ public class FSDirectory implements Closeable {
try { try {
INodesInPath inodesInPath = rootDir.getExistingPathINodes(components, INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
components.length, false); components.length, false);
return addChild(inodesInPath, inodesInPath.getINodes().length-1, child, return addLastINode(inodesInPath, child, true);
childDiskspace, true);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
@ -1629,14 +1605,24 @@ public class FSDirectory implements Closeable {
} }
} }
/**
* The same as {@link #addChild(INodesInPath, int, INode, boolean)}
* with pos = length - 1.
*/
private boolean addLastINode(INodesInPath inodesInPath,
INode inode, boolean checkQuota) throws QuotaExceededException {
final int pos = inodesInPath.getINodes().length - 1;
return addChild(inodesInPath, pos, inode, checkQuota);
}
/** Add a node child to the inodes at index pos. /** Add a node child to the inodes at index pos.
* Its ancestors are stored at [0, pos-1]. * Its ancestors are stored at [0, pos-1].
* @return the added node. * @return false if the child with this name already exists;
* otherwise return true;
* @throw QuotaExceededException is thrown if it violates quota limit * @throw QuotaExceededException is thrown if it violates quota limit
*/ */
private <T extends INode> T addChild(INodesInPath inodesInPath, int pos, private boolean addChild(INodesInPath inodesInPath, int pos,
T child, long childDiskspace, INode child, boolean checkQuota) throws QuotaExceededException {
boolean checkQuota) throws QuotaExceededException {
final INode[] inodes = inodesInPath.getINodes(); final INode[] inodes = inodesInPath.getINodes();
// The filesystem limits are not really quotas, so this check may appear // The filesystem limits are not really quotas, so this check may appear
// odd. It's because a rename operation deletes the src, tries to add // odd. It's because a rename operation deletes the src, tries to add
@ -1650,38 +1636,34 @@ public class FSDirectory implements Closeable {
INode.DirCounts counts = new INode.DirCounts(); INode.DirCounts counts = new INode.DirCounts();
child.spaceConsumedInTree(counts); child.spaceConsumedInTree(counts);
if (childDiskspace < 0) { updateCount(inodesInPath, pos, counts.getNsCount(), counts.getDsCount(), checkQuota);
childDiskspace = counts.getDsCount();
}
updateCount(inodesInPath, pos, counts.getNsCount(), childDiskspace, checkQuota);
if (inodes[pos-1] == null) { if (inodes[pos-1] == null) {
throw new NullPointerException("Panic: parent does not exist"); throw new NullPointerException("Panic: parent does not exist");
} }
final T addedNode = ((INodeDirectory)inodes[pos-1]).addChild(child, true); final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
if (addedNode == null) { if (!added) {
updateCount(inodesInPath, pos, -counts.getNsCount(), -childDiskspace, true); updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true);
} }
return addedNode; return added;
} }
private <T extends INode> T addChildNoQuotaCheck(INodesInPath inodesInPath, private boolean addLastINodeNoQuotaCheck(INodesInPath inodesInPath, INode i) {
int pos, T child, long childDiskspace) {
T inode = null;
try { try {
inode = addChild(inodesInPath, pos, child, childDiskspace, false); return addLastINode(inodesInPath, i, false);
} catch (QuotaExceededException e) { } catch (QuotaExceededException e) {
NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e); NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e);
} }
return inode; return false;
} }
/** Remove an inode at index pos from the namespace. /**
* Its ancestors are stored at [0, pos-1]. * Remove the last inode in the path from the namespace.
* Count of each ancestor with quota is also updated. * Count of each ancestor with quota is also updated.
* Return the removed node; null if the removal fails. * @return the removed node; null if the removal fails.
*/ */
private INode removeChild(final INodesInPath inodesInPath, int pos) { private INode removeLastINode(final INodesInPath inodesInPath) {
final INode[] inodes = inodesInPath.getINodes(); final INode[] inodes = inodesInPath.getINodes();
final int pos = inodes.length - 1;
INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]); INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
if (removedNode != null) { if (removedNode != null) {
INode.DirCounts counts = new INode.DirCounts(); INode.DirCounts counts = new INode.DirCounts();
@ -1800,14 +1782,15 @@ public class FSDirectory implements Closeable {
* See {@link ClientProtocol#setQuota(String, long, long)} for the contract. * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
* Sets quota for for a directory. * Sets quota for for a directory.
* @returns INodeDirectory if any of the quotas have changed. null other wise. * @returns INodeDirectory if any of the quotas have changed. null other wise.
* @throws FileNotFoundException if the path does not exist or is a file * @throws FileNotFoundException if the path does not exist.
* @throws PathIsNotDirectoryException if the path is not a directory.
* @throws QuotaExceededException if the directory tree size is * @throws QuotaExceededException if the directory tree size is
* greater than the given quota * greater than the given quota
* @throws UnresolvedLinkException if a symlink is encountered in src. * @throws UnresolvedLinkException if a symlink is encountered in src.
*/ */
INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota) INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, QuotaExceededException, throws FileNotFoundException, PathIsNotDirectoryException,
UnresolvedLinkException { QuotaExceededException, UnresolvedLinkException {
assert hasWriteLock(); assert hasWriteLock();
// sanity check // sanity check
if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET && if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET &&
@ -1823,15 +1806,10 @@ public class FSDirectory implements Closeable {
final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true); final INodesInPath inodesInPath = rootDir.getExistingPathINodes(src, true);
final INode[] inodes = inodesInPath.getINodes(); final INode[] inodes = inodesInPath.getINodes();
INode targetNode = inodes[inodes.length-1]; INodeDirectory dirNode = INodeDirectory.valueOf(inodes[inodes.length-1], srcs);
if (targetNode == null) { if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
throw new FileNotFoundException("Directory does not exist: " + srcs);
} else if (!targetNode.isDirectory()) {
throw new FileNotFoundException("Cannot set quota on a file: " + srcs);
} else if (targetNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
throw new IllegalArgumentException("Cannot clear namespace quota on root."); throw new IllegalArgumentException("Cannot clear namespace quota on root.");
} else { // a directory inode } else { // a directory inode
INodeDirectory dirNode = (INodeDirectory)targetNode;
long oldNsQuota = dirNode.getNsQuota(); long oldNsQuota = dirNode.getNsQuota();
long oldDsQuota = dirNode.getDsQuota(); long oldDsQuota = dirNode.getDsQuota();
if (nsQuota == HdfsConstants.QUOTA_DONT_SET) { if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
@ -1865,13 +1843,12 @@ public class FSDirectory implements Closeable {
} }
/** /**
* See {@link ClientProtocol#setQuota(String, long, long)} for the * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
* contract.
* @see #unprotectedSetQuota(String, long, long) * @see #unprotectedSetQuota(String, long, long)
*/ */
void setQuota(String src, long nsQuota, long dsQuota) void setQuota(String src, long nsQuota, long dsQuota)
throws FileNotFoundException, QuotaExceededException, throws FileNotFoundException, PathIsNotDirectoryException,
UnresolvedLinkException { QuotaExceededException, UnresolvedLinkException {
writeLock(); writeLock();
try { try {
INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota); INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
@ -2080,7 +2057,7 @@ public class FSDirectory implements Closeable {
throws UnresolvedLinkException, QuotaExceededException { throws UnresolvedLinkException, QuotaExceededException {
assert hasWriteLock(); assert hasWriteLock();
final INodeSymlink symlink = new INodeSymlink(target, mtime, atime, perm); final INodeSymlink symlink = new INodeSymlink(target, mtime, atime, perm);
return addNode(path, symlink, UNKNOWN_DISK_SPACE); return addINode(path, symlink)? symlink: null;
} }
/** /**

View File

@ -288,7 +288,7 @@ class FSImageFormat {
} }
// check if the new inode belongs to the same parent // check if the new inode belongs to the same parent
if(!isParent(pathComponents, parentPath)) { if(!isParent(pathComponents, parentPath)) {
parentINode = fsDir.getParent(pathComponents); parentINode = fsDir.rootDir.getParent(pathComponents);
parentPath = getParent(pathComponents); parentPath = getParent(pathComponents);
} }
@ -305,7 +305,7 @@ class FSImageFormat {
*/ */
void addToParent(INodeDirectory parent, INode child) { void addToParent(INodeDirectory parent, INode child) {
// NOTE: This does not update space counts for parents // NOTE: This does not update space counts for parents
if (parent.addChild(child, false) == null) { if (!parent.addChild(child, false)) {
return; return;
} }
namesystem.dir.cacheName(child); namesystem.dir.cacheName(child);

View File

@ -18,12 +18,12 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
@ -36,13 +36,14 @@ import com.google.common.annotations.VisibleForTesting;
*/ */
class INodeDirectory extends INode { class INodeDirectory extends INode {
/** Cast INode to INodeDirectory. */ /** Cast INode to INodeDirectory. */
public static INodeDirectory valueOf(INode inode, String path public static INodeDirectory valueOf(INode inode, Object path
) throws IOException { ) throws FileNotFoundException, PathIsNotDirectoryException {
if (inode == null) { if (inode == null) {
throw new IOException("Directory does not exist: " + path); throw new FileNotFoundException("Directory does not exist: "
+ DFSUtil.path2String(path));
} }
if (!inode.isDirectory()) { if (!inode.isDirectory()) {
throw new IOException("Path is not a directory: " + path); throw new PathIsNotDirectoryException(DFSUtil.path2String(path));
} }
return (INodeDirectory)inode; return (INodeDirectory)inode;
} }
@ -277,16 +278,17 @@ class INodeDirectory extends INode {
* @param setModTime set modification time for the parent node * @param setModTime set modification time for the parent node
* not needed when replaying the addition and * not needed when replaying the addition and
* the parent already has the proper mod time * the parent already has the proper mod time
* @return null if the child with this name already exists; * @return false if the child with this name already exists;
* node, otherwise * otherwise, return true;
*/ */
<T extends INode> T addChild(final T node, boolean setModTime) { boolean addChild(final INode node, final boolean setModTime) {
if (children == null) { if (children == null) {
children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY); children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
} }
final int low = searchChildren(node); final int low = searchChildren(node);
if(low >= 0) if (low >= 0) {
return null; return false;
}
node.parent = this; node.parent = this;
children.add(-low - 1, node); children.add(-low - 1, node);
// update modification time of the parent directory // update modification time of the parent directory
@ -295,7 +297,7 @@ class INodeDirectory extends INode {
if (node.getGroupName() == null) { if (node.getGroupName() == null) {
node.setGroup(getGroupName()); node.setGroup(getGroupName());
} }
return node; return true;
} }
/** /**
@ -304,53 +306,32 @@ class INodeDirectory extends INode {
* *
* @param path file path * @param path file path
* @param newNode INode to be added * @param newNode INode to be added
* @return null if the node already exists; inserted INode, otherwise * @return false if the node already exists; otherwise, return true;
* @throws FileNotFoundException if parent does not exist or * @throws FileNotFoundException if parent does not exist or
* @throws UnresolvedLinkException if any path component is a symbolic link * @throws UnresolvedLinkException if any path component is a symbolic link
* is not a directory. * is not a directory.
*/ */
<T extends INode> T addNode(String path, T newNode boolean addINode(String path, INode newNode
) throws FileNotFoundException, UnresolvedLinkException { ) throws FileNotFoundException, PathIsNotDirectoryException,
UnresolvedLinkException {
byte[][] pathComponents = getPathComponents(path); byte[][] pathComponents = getPathComponents(path);
return addToParent(pathComponents, newNode, true) == null? null: newNode;
}
INodeDirectory getParent(byte[][] pathComponents
) throws FileNotFoundException, UnresolvedLinkException {
if (pathComponents.length < 2) // add root
return null;
// Gets the parent INode
INodesInPath inodes = getExistingPathINodes(pathComponents, 2, false);
INode inode = inodes.inodes[0];
if (inode == null) {
throw new FileNotFoundException("Parent path does not exist: "+
DFSUtil.byteArray2String(pathComponents));
}
if (!inode.isDirectory()) {
throw new FileNotFoundException("Parent path is not a directory: "+
DFSUtil.byteArray2String(pathComponents));
}
return (INodeDirectory)inode;
}
/**
* Add new inode
* Optimized version of addNode()
*
* @return parent INode if new inode is inserted
* or null if it already exists.
* @throws FileNotFoundException if parent does not exist or
* is not a directory.
*/
INodeDirectory addToParent(byte[][] pathComponents, INode newNode,
boolean propagateModTime) throws FileNotFoundException, UnresolvedLinkException {
if (pathComponents.length < 2) { // add root if (pathComponents.length < 2) { // add root
return null; return false;
} }
newNode.setLocalName(pathComponents[pathComponents.length - 1]); newNode.setLocalName(pathComponents[pathComponents.length - 1]);
// insert into the parent children list // insert into the parent children list
INodeDirectory parent = getParent(pathComponents); INodeDirectory parent = getParent(pathComponents);
return parent.addChild(newNode, propagateModTime) == null? null: parent; return parent.addChild(newNode, true);
}
INodeDirectory getParent(byte[][] pathComponents
) throws FileNotFoundException, PathIsNotDirectoryException,
UnresolvedLinkException {
if (pathComponents.length < 2) // add root
return null;
// Gets the parent INode
INodesInPath inodes = getExistingPathINodes(pathComponents, 2, false);
return INodeDirectory.valueOf(inodes.inodes[0], pathComponents);
} }
@Override @Override

View File

@ -33,7 +33,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@InterfaceAudience.Private @InterfaceAudience.Private
public class INodeFile extends INode implements BlockCollection { public class INodeFile extends INode implements BlockCollection {
/** Cast INode to INodeFile. */ /** Cast INode to INodeFile. */
public static INodeFile valueOf(INode inode, String path) throws IOException { public static INodeFile valueOf(INode inode, String path
) throws FileNotFoundException {
if (inode == null) { if (inode == null) {
throw new FileNotFoundException("File does not exist: " + path); throw new FileNotFoundException("File does not exist: " + path);
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -36,10 +37,10 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection { class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
/** Cast INode to INodeFileUnderConstruction. */ /** Cast INode to INodeFileUnderConstruction. */
public static INodeFileUnderConstruction valueOf(INode inode, String path public static INodeFileUnderConstruction valueOf(INode inode, String path
) throws IOException { ) throws FileNotFoundException {
final INodeFile file = INodeFile.valueOf(inode, path); final INodeFile file = INodeFile.valueOf(inode, path);
if (!file.isUnderConstruction()) { if (!file.isUnderConstruction()) {
throw new IOException("File is not under construction: " + path); throw new FileNotFoundException("File is not under construction: " + path);
} }
return (INodeFileUnderConstruction)file; return (INodeFileUnderConstruction)file;
} }

View File

@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@ -239,8 +240,8 @@ public class TestINodeFile {
try { try {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
fail(); fail();
} catch(IOException ioe) { } catch(FileNotFoundException e) {
assertTrue(ioe.getMessage().contains("Directory does not exist")); assertTrue(e.getMessage().contains("Directory does not exist"));
} }
} }
@ -264,8 +265,7 @@ public class TestINodeFile {
try { try {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
fail(); fail();
} catch(IOException ioe) { } catch(PathIsNotDirectoryException e) {
assertTrue(ioe.getMessage().contains("Path is not a directory"));
} }
} }
@ -286,8 +286,7 @@ public class TestINodeFile {
try { try {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
fail(); fail();
} catch(IOException ioe) { } catch(PathIsNotDirectoryException e) {
assertTrue(ioe.getMessage().contains("Path is not a directory"));
} }
} }

View File

@ -15488,7 +15488,7 @@
<comparators> <comparators>
<comparator> <comparator>
<type>SubstringComparator</type> <type>SubstringComparator</type>
<expected-output>Cannot set quota on a file: /test/file1</expected-output> <expected-output>setQuota: `/test/file1': Is not a directory</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>