svn merge -c 1189546 from trunk for HDFS-1869.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189547 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-10-27 00:03:51 +00:00
parent ea7c670969
commit 77b6810f49
6 changed files with 83 additions and 63 deletions

View File

@ -736,6 +736,9 @@ Release 0.23.0 - Unreleased
HDFS-2501. Add version prefix and root methods to webhdfs. (szetszwo)
HDFS-1869. mkdirs should use the supplied permission for all of the created
directories. (Daryn Sharp via szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@ -1104,7 +1107,7 @@ Release 0.23.0 - Unreleased
webhdfs responses. (szetszwo)
HDFS-2428. Convert com.sun.jersey.api.ParamException$QueryParamException
to IllegalArgumentException and response it http BAD_REQUEST in webhdfs.
to IllegalArgumentException and response it as http BAD_REQUEST in webhdfs.
(szetszwo)
HDFS-2424. Added a root element "HdfsFileStatuses" for the response

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -258,7 +259,7 @@ public class FSDirectory implements Closeable {
clientMachine, clientNode);
writeLock();
try {
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
} finally {
writeUnlock();
}
@ -301,7 +302,7 @@ public class FSDirectory implements Closeable {
writeLock();
try {
try {
newNode = addNode(path, newNode, diskspace, false);
newNode = addNode(path, newNode, diskspace);
if(newNode != null && blocks != null) {
int nrBlocks = blocks.length;
// Add file->block mapping
@ -328,7 +329,7 @@ public class FSDirectory implements Closeable {
try {
try {
newParent = rootDir.addToParent(src, newNode, parentINode,
false, propagateModTime);
propagateModTime);
cacheName(newNode);
} catch (FileNotFoundException e) {
return null;
@ -601,7 +602,7 @@ public class FSDirectory implements Closeable {
// add src to the destination
dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
srcChild, UNKNOWN_DISK_SPACE, false);
srcChild, UNKNOWN_DISK_SPACE);
if (dstChild != null) {
srcChild = null;
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -618,7 +619,7 @@ public class FSDirectory implements Closeable {
// put it back
srcChild.setLocalName(srcChildName);
addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild,
UNKNOWN_DISK_SPACE, false);
UNKNOWN_DISK_SPACE);
}
}
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@ -756,7 +757,7 @@ public class FSDirectory implements Closeable {
removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
// add src as dst to complete rename
dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
removedSrc, UNKNOWN_DISK_SPACE, false);
removedSrc, UNKNOWN_DISK_SPACE);
int filesDeleted = 0;
if (dstChild != null) {
@ -784,13 +785,13 @@ public class FSDirectory implements Closeable {
// Rename failed - restore src
removedSrc.setLocalName(srcChildName);
addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc,
UNKNOWN_DISK_SPACE, false);
UNKNOWN_DISK_SPACE);
}
if (removedDst != null) {
// Rename failed - restore dst
removedDst.setLocalName(dstChildName);
addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst,
UNKNOWN_DISK_SPACE, false);
UNKNOWN_DISK_SPACE);
}
}
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@ -1461,9 +1462,10 @@ public class FSDirectory implements Closeable {
* @param src string representation of the path to the directory
* @param permissions the permission of the directory
* @param inheritPermission if the permission of the directory should inherit
* from its parent or not. The automatically created
* ones always inherit its permission from its parent
* @param isAutocreate if the permission of the directory should inherit
* from its parent or not. u+wx is implicitly added to
* the automatically created directories, and to the
* given directory if inheritPermission is true
* @param now creation time
* @return true if the operation succeeds false otherwise
* @throws FileNotFoundException if an ancestor or itself is a file
@ -1479,6 +1481,7 @@ public class FSDirectory implements Closeable {
String[] names = INode.getPathNames(src);
byte[][] components = INode.getPathComponents(names);
INode[] inodes = new INode[components.length];
final int lastInodeIndex = inodes.length - 1;
writeLock();
try {
@ -1495,12 +1498,44 @@ public class FSDirectory implements Closeable {
}
}
// default to creating parent dirs with the given perms
PermissionStatus parentPermissions = permissions;
// if not inheriting and it's the last inode, there's no use in
// computing perms that won't be used
if (inheritPermission || (i < lastInodeIndex)) {
// if inheriting (ie. creating a file or symlink), use the parent dir,
// else the supplied permissions
// NOTE: the permissions of the auto-created directories violate posix
FsPermission parentFsPerm = inheritPermission
? inodes[i-1].getFsPermission() : permissions.getPermission();
// ensure that the permissions allow user write+execute
if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
parentFsPerm = new FsPermission(
parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
parentFsPerm.getGroupAction(),
parentFsPerm.getOtherAction()
);
}
if (!parentPermissions.getPermission().equals(parentFsPerm)) {
parentPermissions = new PermissionStatus(
parentPermissions.getUserName(),
parentPermissions.getGroupName(),
parentFsPerm
);
// when inheriting, use same perms for entire path
if (inheritPermission) permissions = parentPermissions;
}
}
// create directories beginning from the first null index
for(; i < inodes.length; i++) {
pathbuilder.append(Path.SEPARATOR + names[i]);
String cur = pathbuilder.toString();
unprotectedMkdir(inodes, i, components[i], permissions,
inheritPermission || i != components.length-1, now);
unprotectedMkdir(inodes, i, components[i],
(i < lastInodeIndex) ? parentPermissions : permissions, now);
if (inodes[i] == null) {
return false;
}
@ -1531,7 +1566,7 @@ public class FSDirectory implements Closeable {
rootDir.getExistingPathINodes(components, inodes, false);
unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
permissions, false, timestamp);
permissions, timestamp);
return inodes[inodes.length-1];
}
@ -1540,19 +1575,19 @@ public class FSDirectory implements Closeable {
* All ancestors exist. Newly created one stored at index pos.
*/
private void unprotectedMkdir(INode[] inodes, int pos,
byte[] name, PermissionStatus permission, boolean inheritPermission,
byte[] name, PermissionStatus permission,
long timestamp) throws QuotaExceededException {
assert hasWriteLock();
inodes[pos] = addChild(inodes, pos,
new INodeDirectory(name, permission, timestamp),
-1, inheritPermission );
-1);
}
/** Add a node child to the namespace. The full path name of the node is src.
* childDiskspace should be -1, if unknown.
* QuotaExceededException is thrown if it violates quota limit */
private <T extends INode> T addNode(String src, T child,
long childDiskspace, boolean inheritPermission)
long childDiskspace)
throws QuotaExceededException, UnresolvedLinkException {
byte[][] components = INode.getPathComponents(src);
byte[] path = components[components.length-1];
@ -1562,8 +1597,7 @@ public class FSDirectory implements Closeable {
writeLock();
try {
rootDir.getExistingPathINodes(components, inodes, false);
return addChild(inodes, inodes.length-1, child, childDiskspace,
inheritPermission);
return addChild(inodes, inodes.length-1, child, childDiskspace);
} finally {
writeUnlock();
}
@ -1691,7 +1725,7 @@ public class FSDirectory implements Closeable {
* Its ancestors are stored at [0, pos-1].
* QuotaExceededException is thrown if it violates quota limit */
private <T extends INode> T addChild(INode[] pathComponents, int pos,
T child, long childDiskspace, boolean inheritPermission,
T child, long childDiskspace,
boolean checkQuota) throws QuotaExceededException {
// The filesystem limits are not really quotas, so this check may appear
// odd. It's because a rename operation deletes the src, tries to add
@ -1714,7 +1748,7 @@ public class FSDirectory implements Closeable {
throw new NullPointerException("Panic: parent does not exist");
}
T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
child, inheritPermission, true);
child, true);
if (addedNode == null) {
updateCount(pathComponents, pos, -counts.getNsCount(),
-childDiskspace, true);
@ -1723,18 +1757,16 @@ public class FSDirectory implements Closeable {
}
private <T extends INode> T addChild(INode[] pathComponents, int pos,
T child, long childDiskspace, boolean inheritPermission)
T child, long childDiskspace)
throws QuotaExceededException {
return addChild(pathComponents, pos, child, childDiskspace,
inheritPermission, true);
return addChild(pathComponents, pos, child, childDiskspace, true);
}
private <T extends INode> T addChildNoQuotaCheck(INode[] pathComponents,
int pos, T child, long childDiskspace, boolean inheritPermission) {
int pos, T child, long childDiskspace) {
T inode = null;
try {
inode = addChild(pathComponents, pos, child, childDiskspace,
inheritPermission, false);
inode = addChild(pathComponents, pos, child, childDiskspace, false);
} catch (QuotaExceededException e) {
NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e);
}
@ -2144,7 +2176,7 @@ public class FSDirectory implements Closeable {
assert hasWriteLock();
INodeSymlink newNode = new INodeSymlink(target, modTime, atime, perm);
try {
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
} catch (UnresolvedLinkException e) {
/* All UnresolvedLinkExceptions should have been resolved by now, but we
* should re-throw them in case that changes so they are not swallowed

View File

@ -261,25 +261,13 @@ class INodeDirectory extends INode {
* Add a child inode to the directory.
*
* @param node INode to insert
* @param inheritPermission inherit permission from parent?
* @param setModTime set modification time for the parent node
* not needed when replaying the addition and
* the parent already has the proper mod time
* @return null if the child with this name already exists;
* node, otherwise
*/
<T extends INode> T addChild(final T node, boolean inheritPermission,
boolean setModTime) {
if (inheritPermission) {
FsPermission p = getFsPermission();
//make sure the permission has wx for the user
if (!p.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
p = new FsPermission(p.getUserAction().or(FsAction.WRITE_EXECUTE),
p.getGroupAction(), p.getOtherAction());
}
node.setPermission(p);
}
<T extends INode> T addChild(final T node, boolean setModTime) {
if (children == null) {
children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
}
@ -297,31 +285,22 @@ class INodeDirectory extends INode {
return node;
}
/**
* Equivalent to addNode(path, newNode, false).
* @see #addNode(String, INode, boolean)
*/
<T extends INode> T addNode(String path, T newNode)
throws FileNotFoundException, UnresolvedLinkException {
return addNode(path, newNode, false);
}
/**
* Add new INode to the file tree.
* Find the parent and insert
*
* @param path file path
* @param newNode INode to be added
* @param inheritPermission If true, copy the parent's permission to newNode.
* @return null if the node already exists; inserted INode, otherwise
* @throws FileNotFoundException if parent does not exist or
* @throws UnresolvedLinkException if any path component is a symbolic link
* is not a directory.
*/
<T extends INode> T addNode(String path, T newNode, boolean inheritPermission
<T extends INode> T addNode(String path, T newNode
) throws FileNotFoundException, UnresolvedLinkException {
byte[][] pathComponents = getPathComponents(path);
if(addToParent(pathComponents, newNode,
inheritPermission, true) == null)
true) == null)
return null;
return newNode;
}
@ -338,13 +317,12 @@ class INodeDirectory extends INode {
INodeDirectory addToParent( byte[] localname,
INode newNode,
INodeDirectory parent,
boolean inheritPermission,
boolean propagateModTime
) throws FileNotFoundException,
UnresolvedLinkException {
// insert into the parent children list
newNode.name = localname;
if(parent.addChild(newNode, inheritPermission, propagateModTime) == null)
if(parent.addChild(newNode, propagateModTime) == null)
return null;
return parent;
}
@ -380,7 +358,6 @@ class INodeDirectory extends INode {
*/
INodeDirectory addToParent( byte[][] pathComponents,
INode newNode,
boolean inheritPermission,
boolean propagateModTime
) throws FileNotFoundException,
UnresolvedLinkException {
@ -391,7 +368,7 @@ class INodeDirectory extends INode {
newNode.name = pathComponents[pathLen-1];
// insert into the parent children list
INodeDirectory parent = getParent(pathComponents);
if(parent.addChild(newNode, inheritPermission, propagateModTime) == null)
if(parent.addChild(newNode, propagateModTime) == null)
return null;
return parent;
}

View File

@ -165,7 +165,7 @@ public class TestFsLimits {
Class<?> generated = null;
try {
fs.verifyFsLimits(inodes, 1, child);
rootInode.addChild(child, false, false);
rootInode.addChild(child, false);
} catch (QuotaExceededException e) {
generated = e.getClass();
}

View File

@ -111,10 +111,18 @@ public class TestPermission extends TestCase {
FsPermission dirPerm = new FsPermission((short)0777);
fs.mkdirs(new Path("/a1/a2/a3"), dirPerm);
checkPermission(fs, "/a1", inheritPerm);
checkPermission(fs, "/a1/a2", inheritPerm);
checkPermission(fs, "/a1", dirPerm);
checkPermission(fs, "/a1/a2", dirPerm);
checkPermission(fs, "/a1/a2/a3", dirPerm);
dirPerm = new FsPermission((short)0123);
FsPermission permission = FsPermission.createImmutable(
(short)(dirPerm.toShort() | 0300));
fs.mkdirs(new Path("/aa/1/aa/2/aa/3"), dirPerm);
checkPermission(fs, "/aa/1", permission);
checkPermission(fs, "/aa/1/aa/2", permission);
checkPermission(fs, "/aa/1/aa/2/aa/3", dirPerm);
FsPermission filePerm = new FsPermission((short)0444);
FSDataOutputStream out = fs.create(new Path("/b1/b2/b3.txt"), filePerm,
true, conf.getInt("io.file.buffer.size", 4096),
@ -126,7 +134,7 @@ public class TestPermission extends TestCase {
checkPermission(fs, "/b1/b2/b3.txt", filePerm);
conf.set(FsPermission.UMASK_LABEL, "022");
FsPermission permission =
permission =
FsPermission.createImmutable((short)0666);
FileSystem.mkdirs(fs, new Path("/c1"), new FsPermission(permission));
FileSystem.create(fs, new Path("/c1/c2.txt"),

View File

@ -139,11 +139,11 @@ public class TestINodeFile {
assertEquals("f", inf.getFullPathName());
assertEquals("", inf.getLocalParentDir());
dir.addChild(inf, false, false);
dir.addChild(inf, false);
assertEquals("d"+Path.SEPARATOR+"f", inf.getFullPathName());
assertEquals("d", inf.getLocalParentDir());
root.addChild(dir, false, false);
root.addChild(dir, false);
assertEquals(Path.SEPARATOR+"d"+Path.SEPARATOR+"f", inf.getFullPathName());
assertEquals(Path.SEPARATOR+"d", dir.getFullPathName());