HDFS-6330. Merge r1602484 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1602485 13f79535-47bb-0310-9956-ffa450edef68
commit 99b18b0019
parent e58355f9aa
Author: Haohui Mai
Date:   2014-06-13 18:06:57 +00:00

4 changed files with 185 additions and 191 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -175,6 +175,8 @@ Release 2.5.0 - UNRELEASED
     HDFS-3493. Invalidate excess corrupted blocks as long as minimum
     replication is satisfied. (Juan Yu and Vinayakumar B via wang)
 
+    HDFS-6330. Move mkdirs() to FSNamesystem. (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -316,19 +316,7 @@ public class FSDirectory implements Closeable {
       UnresolvedLinkException, SnapshotAccessControlException, AclException {
     waitForReady();
 
-    // Always do an implicit mkdirs for parent directory tree.
     long modTime = now();
-
-    Path parent = new Path(path).getParent();
-    if (parent == null) {
-      // Trying to add "/" as a file - this path has no
-      // parent -- avoids an NPE below.
-      return null;
-    }
-
-    if (!mkdirs(parent.toString(), permissions, true, modTime)) {
-      return null;
-    }
     INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
         permissions, modTime, modTime, BlockInfo.EMPTY_ARRAY, replication,
         preferredBlockSize);
@@ -1808,112 +1796,6 @@ public class FSDirectory implements Closeable {
     return inodes == null ? "" : getFullPathName(inodes, inodes.length - 1);
   }
 
-  /**
-   * Create a directory
-   * If ancestor directories do not exist, automatically create them.
-   *
-   * @param src string representation of the path to the directory
-   * @param permissions the permission of the directory
-   * @param inheritPermission if the permission of the directory should inherit
-   *                          from its parent or not. u+wx is implicitly added to
-   *                          the automatically created directories, and to the
-   *                          given directory if inheritPermission is true
-   * @param now creation time
-   * @return true if the operation succeeds false otherwise
-   * @throws QuotaExceededException if directory creation violates
-   *                                any quota limit
-   * @throws UnresolvedLinkException if a symlink is encountered in src.
-   * @throws SnapshotAccessControlException if path is in RO snapshot
-   */
-  boolean mkdirs(String src, PermissionStatus permissions,
-      boolean inheritPermission, long now)
-      throws FileAlreadyExistsException, QuotaExceededException,
-             UnresolvedLinkException, SnapshotAccessControlException,
-             AclException {
-    src = normalizePath(src);
-    String[] names = INode.getPathNames(src);
-    byte[][] components = INode.getPathComponents(names);
-    final int lastInodeIndex = components.length - 1;
-
-    writeLock();
-    try {
-      INodesInPath iip = getExistingPathINodes(components);
-      if (iip.isSnapshot()) {
-        throw new SnapshotAccessControlException(
-            "Modification on RO snapshot is disallowed");
-      }
-      INode[] inodes = iip.getINodes();
-
-      // find the index of the first null in inodes[]
-      StringBuilder pathbuilder = new StringBuilder();
-      int i = 1;
-      for(; i < inodes.length && inodes[i] != null; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        if (!inodes[i].isDirectory()) {
-          throw new FileAlreadyExistsException("Parent path is not a directory: "
-              + pathbuilder + " " + inodes[i].getLocalName());
-        }
-      }
-
-      // default to creating parent dirs with the given perms
-      PermissionStatus parentPermissions = permissions;
-
-      // if not inheriting and it's the last inode, there's no use in
-      // computing perms that won't be used
-      if (inheritPermission || (i < lastInodeIndex)) {
-        // if inheriting (ie. creating a file or symlink), use the parent dir,
-        // else the supplied permissions
-        // NOTE: the permissions of the auto-created directories violate posix
-        FsPermission parentFsPerm = inheritPermission
-            ? inodes[i-1].getFsPermission() : permissions.getPermission();
-
-        // ensure that the permissions allow user write+execute
-        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
-          parentFsPerm = new FsPermission(
-              parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
-              parentFsPerm.getGroupAction(),
-              parentFsPerm.getOtherAction()
-          );
-        }
-
-        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
-          parentPermissions = new PermissionStatus(
-              parentPermissions.getUserName(),
-              parentPermissions.getGroupName(),
-              parentFsPerm
-          );
-          // when inheriting, use same perms for entire path
-          if (inheritPermission) permissions = parentPermissions;
-        }
-      }
-
-      // create directories beginning from the first null index
-      for(; i < inodes.length; i++) {
-        pathbuilder.append(Path.SEPARATOR).append(names[i]);
-        unprotectedMkdir(namesystem.allocateNewInodeId(), iip, i,
-            components[i], (i < lastInodeIndex) ? parentPermissions
-                : permissions, null, now);
-        if (inodes[i] == null) {
-          return false;
-        }
-        // Directory creation also count towards FilesCreated
-        // to match count of FilesDeleted metric.
-        if (getFSNamesystem() != null)
-          NameNode.getNameNodeMetrics().incrFilesCreated();
-
-        final String cur = pathbuilder.toString();
-        fsImage.getEditLog().logMkDir(cur, inodes[i]);
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "DIR* FSDirectory.mkdirs: created directory " + cur);
-        }
-      }
-    } finally {
-      writeUnlock();
-    }
-    return true;
-  }
-
   INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
       List<AclEntry> aclEntries, long timestamp)
       throws QuotaExceededException, UnresolvedLinkException, AclException {
@@ -1931,7 +1813,7 @@ public class FSDirectory implements Closeable {
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
+  void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
       int pos, byte[] name, PermissionStatus permission,
       List<AclEntry> aclEntries, long timestamp)
       throws QuotaExceededException, AclException {
@@ -2244,9 +2126,7 @@ public class FSDirectory implements Closeable {
     return 1;
   }
 
-  /**
-   */
-  String normalizePath(String src) {
+  static String normalizePath(String src) {
     if (src.length() > 1 && src.endsWith("/")) {
       src = src.substring(0, src.length() - 1);
     }
@@ -2585,47 +2465,19 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * Add the given symbolic link to the fs. Record it in the edits log.
+   * Add the specified path into the namespace.
    */
-  INodeSymlink addSymlink(String path, String target,
-      PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
-      throws UnresolvedLinkException, FileAlreadyExistsException,
-      QuotaExceededException, SnapshotAccessControlException, AclException {
-    waitForReady();
-
-    final long modTime = now();
-    if (createParent) {
-      final String parent = new Path(path).getParent().toString();
-      if (!mkdirs(parent, dirPerms, true, modTime)) {
-        return null;
-      }
-    }
-    final String userName = dirPerms.getUserName();
-    INodeSymlink newNode = null;
-    long id = namesystem.allocateNewInodeId();
+  INodeSymlink addSymlink(long id, String path, String target,
+      long mtime, long atime, PermissionStatus perm)
+      throws UnresolvedLinkException, QuotaExceededException {
     writeLock();
     try {
-      newNode = unprotectedAddSymlink(id, path, target, modTime, modTime,
-          new PermissionStatus(userName, null, FsPermission.getDefault()));
+      return unprotectedAddSymlink(id, path, target, mtime, atime, perm);
     } finally {
       writeUnlock();
     }
-    if (newNode == null) {
-      NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
-      return null;
-    }
-    fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode,
-        logRetryCache);
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");
-    }
-    return newNode;
   }
 
-  /**
-   * Add the specified path into the namespace. Invoked from edit log processing.
-   */
   INodeSymlink unprotectedAddSymlink(long id, String path, String target,
       long mtime, long atime, PermissionStatus perm)
       throws UnresolvedLinkException, QuotaExceededException {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -148,7 +148,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.XAttrHelper;
+import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -170,6 +170,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2069,7 +2070,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       checkFsObjectLimit();
 
       // add symbolic link to namespace
-      dir.addSymlink(link, target, dirPerms, createParent, logRetryCache);
+      addSymlink(link, target, dirPerms, createParent, logRetryCache);
       resultingStat = getAuditFileInfo(link, false);
     } finally {
       writeUnlock();
@@ -2321,8 +2322,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     final DatanodeDescriptor clientNode =
         blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
 
-    INodeFile newNode = dir.addFile(src, permissions, replication, blockSize,
-        holder, clientMachine, clientNode);
+    INodeFile newNode = null;
+
+    // Always do an implicit mkdirs for parent directory tree.
+    Path parent = new Path(src).getParent();
+    if (parent != null && mkdirsRecursively(parent.toString(),
+        permissions, true, now())) {
+      newNode = dir.addFile(src, permissions, replication, blockSize,
+          holder, clientMachine, clientNode);
+    }
+
     if (newNode == null) {
       throw new IOException("Unable to add " + src + " to namespace");
     }
@@ -3694,12 +3703,118 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     // create multiple inodes.
     checkFsObjectLimit();
 
-    if (!dir.mkdirs(src, permissions, false, now())) {
+    if (!mkdirsRecursively(src, permissions, false, now())) {
       throw new IOException("Failed to create directory: " + src);
     }
     return true;
   }
 
+  /**
+   * Create a directory
+   * If ancestor directories do not exist, automatically create them.
+   *
+   * @param src string representation of the path to the directory
+   * @param permissions the permission of the directory
+   * @param inheritPermission if the permission of the directory should inherit
+   *                          from its parent or not. u+wx is implicitly added to
+   *                          the automatically created directories, and to the
+   *                          given directory if inheritPermission is true
+   * @param now creation time
+   * @return true if the operation succeeds false otherwise
+   * @throws QuotaExceededException if directory creation violates
+   *                                any quota limit
+   * @throws UnresolvedLinkException if a symlink is encountered in src.
+   * @throws SnapshotAccessControlException if path is in RO snapshot
+   */
+  private boolean mkdirsRecursively(String src, PermissionStatus permissions,
+      boolean inheritPermission, long now)
+      throws FileAlreadyExistsException, QuotaExceededException,
+             UnresolvedLinkException, SnapshotAccessControlException,
+             AclException {
+    src = FSDirectory.normalizePath(src);
+    String[] names = INode.getPathNames(src);
+    byte[][] components = INode.getPathComponents(names);
+    final int lastInodeIndex = components.length - 1;
+
+    dir.writeLock();
+    try {
+      INodesInPath iip = dir.getExistingPathINodes(components);
+      if (iip.isSnapshot()) {
+        throw new SnapshotAccessControlException(
+            "Modification on RO snapshot is disallowed");
+      }
+      INode[] inodes = iip.getINodes();
+
+      // find the index of the first null in inodes[]
+      StringBuilder pathbuilder = new StringBuilder();
+      int i = 1;
+      for(; i < inodes.length && inodes[i] != null; i++) {
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
+        if (!inodes[i].isDirectory()) {
+          throw new FileAlreadyExistsException(
+              "Parent path is not a directory: "
+              + pathbuilder + " " + inodes[i].getLocalName());
+        }
+      }
+
+      // default to creating parent dirs with the given perms
+      PermissionStatus parentPermissions = permissions;
+
+      // if not inheriting and it's the last inode, there's no use in
+      // computing perms that won't be used
+      if (inheritPermission || (i < lastInodeIndex)) {
+        // if inheriting (ie. creating a file or symlink), use the parent dir,
+        // else the supplied permissions
+        // NOTE: the permissions of the auto-created directories violate posix
+        FsPermission parentFsPerm = inheritPermission
+            ? inodes[i-1].getFsPermission() : permissions.getPermission();
+
+        // ensure that the permissions allow user write+execute
+        if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
+          parentFsPerm = new FsPermission(
+              parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
+              parentFsPerm.getGroupAction(),
+              parentFsPerm.getOtherAction()
+          );
+        }
+
+        if (!parentPermissions.getPermission().equals(parentFsPerm)) {
+          parentPermissions = new PermissionStatus(
+              parentPermissions.getUserName(),
+              parentPermissions.getGroupName(),
+              parentFsPerm
+          );
+          // when inheriting, use same perms for entire path
+          if (inheritPermission) permissions = parentPermissions;
+        }
+      }
+
+      // create directories beginning from the first null index
+      for(; i < inodes.length; i++) {
+        pathbuilder.append(Path.SEPARATOR).append(names[i]);
+        dir.unprotectedMkdir(allocateNewInodeId(), iip, i, components[i],
+            (i < lastInodeIndex) ? parentPermissions : permissions, null,
+            now);
+        if (inodes[i] == null) {
+          return false;
+        }
+        // Directory creation also count towards FilesCreated
+        // to match count of FilesDeleted metric.
+        NameNode.getNameNodeMetrics().incrFilesCreated();
+
+        final String cur = pathbuilder.toString();
+        getEditLog().logMkDir(cur, inodes[i]);
+        if(NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug(
+              "mkdirs: created directory " + cur);
+        }
+      }
+    } finally {
+      dir.writeUnlock();
+    }
+    return true;
+  }
+
   /**
    * Get the content summary for a specific file/dir.
    *
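Note on the permission handling in the hunk above: when mkdirsRecursively has to auto-create intermediate directories, it widens the chosen permission so the owner keeps write+execute, as the NOTE comment in the method says. The snippet below is not part of the patch; it is a standalone sketch that reuses only the public FsPermission/FsAction calls visible above (implies, or) to show the effect on a restrictive inherited permission such as r-xr-x--- (0550).

import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

// Standalone illustration (not part of the patch) of the u+wx widening
// applied to auto-created parent directories.
public class MkdirsPermissionWideningExample {
  public static void main(String[] args) {
    // Suppose the inherited parent permission is r-xr-x--- (0550).
    FsPermission parentFsPerm = new FsPermission((short) 0550);

    // Same check and widening as in mkdirsRecursively: the owner must keep
    // write+execute so the ancestor chain can actually be populated.
    if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
      parentFsPerm = new FsPermission(
          parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
          parentFsPerm.getGroupAction(),
          parentFsPerm.getOtherAction());
    }

    // Prints rwxr-x--- : user write+execute added, group/other untouched.
    System.out.println(parentFsPerm);
  }
}

Group and other bits are left alone, which is exactly the POSIX deviation the in-line comment warns about.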
@@ -4403,6 +4518,46 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
+  /**
+   * Add the given symbolic link to the fs. Record it in the edits log.
+   * @param path
+   * @param target
+   * @param dirPerms
+   * @param createParent
+   * @param logRetryCache
+   * @param dir
+   */
+  private INodeSymlink addSymlink(String path, String target,
+                                  PermissionStatus dirPerms,
+                                  boolean createParent, boolean logRetryCache)
+      throws UnresolvedLinkException, FileAlreadyExistsException,
+      QuotaExceededException, SnapshotAccessControlException, AclException {
+    dir.waitForReady();
+
+    final long modTime = now();
+    if (createParent) {
+      final String parent = new Path(path).getParent().toString();
+      if (!mkdirsRecursively(parent, dirPerms, true, modTime)) {
+        return null;
+      }
+    }
+    final String userName = dirPerms.getUserName();
+    long id = allocateNewInodeId();
+    INodeSymlink newNode = dir.addSymlink(id, path, target, modTime, modTime,
+        new PermissionStatus(userName, null, FsPermission.getDefault()));
+    if (newNode == null) {
+      NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
+      return null;
+    }
+    getEditLog().logSymlink(path, target, modTime, modTime, newNode,
+        logRetryCache);
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("addSymlink: " + path + " is added");
+    }
+    return newNode;
+  }
+
   /**
    * Periodically calls hasAvailableResources of NameNodeResourceChecker, and if
    * there are found to be insufficient resources available, causes the NN to
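Taken together, the FSNamesystem hunks above keep the externally visible contract unchanged: a single mkdirs call still creates any missing ancestors, it is simply driven from FSNamesystem#mkdirsRecursively rather than FSDirectory#mkdirs. A hedged client-side sketch follows; the path, permission, and default-URI configuration are illustrative and not taken from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

// Illustrative only: the filesystem resolved from the default configuration
// and the /a/b/c path are made up for this sketch.
public class ImplicitMkdirsClientExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      // A single call creates /a and /a/b as needed before /a/b/c.
      // After this patch the ancestor creation runs on the NameNode in
      // FSNamesystem#mkdirsRecursively instead of FSDirectory#mkdirs,
      // but the observable result is the same.
      boolean created = fs.mkdirs(new Path("/a/b/c"),
          new FsPermission((short) 0755));
      System.out.println("mkdirs returned " + created);
    }
  }
}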

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java

@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.apache.hadoop.util.Time.now;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -45,45 +46,28 @@ import org.junit.Test;
 public class TestFsLimits {
   static Configuration conf;
-  static INode[] inodes;
-  static FSDirectory fs;
+  static FSNamesystem fs;
   static boolean fsIsReady;
 
   static final PermissionStatus perms
       = new PermissionStatus("admin", "admin", FsPermission.getDefault());
 
-  static private FSImage getMockFSImage() {
-    FSEditLog editLog = mock(FSEditLog.class);
+  static private FSNamesystem getMockNamesystem() throws IOException {
     FSImage fsImage = mock(FSImage.class);
-    when(fsImage.getEditLog()).thenReturn(editLog);
-    return fsImage;
-  }
-
-  static private FSNamesystem getMockNamesystem() {
-    FSNamesystem fsn = mock(FSNamesystem.class);
-    when(
-        fsn.createFsOwnerPermissions((FsPermission)anyObject())
-    ).thenReturn(
-        new PermissionStatus("root", "wheel", FsPermission.getDefault())
-    );
+    FSEditLog editLog = mock(FSEditLog.class);
+    doReturn(editLog).when(fsImage).getEditLog();
+    FSNamesystem fsn = new FSNamesystem(conf, fsImage);
+    fsn.getFSDirectory().setReady(fsIsReady);
     return fsn;
   }
 
-  private static class MockFSDirectory extends FSDirectory {
-    public MockFSDirectory() throws IOException {
-      super(getMockFSImage(), getMockNamesystem(), conf);
-      setReady(fsIsReady);
-      NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
-    }
-  }
-
   @Before
   public void setUp() throws IOException {
     conf = new Configuration();
     conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
              fileAsURI(new File(MiniDFSCluster.getBaseDirectory(),
                                 "namenode")).toString());
 
+    NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
     fs = null;
     fsIsReady = true;
   }
@@ -197,9 +181,10 @@ public class TestFsLimits {
     lazyInitFSDirectory();
     Class<?> generated = null;
     try {
-      fs.mkdirs(name, perms, false, now());
+      fs.mkdirs(name, perms, false);
     } catch (Throwable e) {
       generated = e.getClass();
+      e.printStackTrace();
     }
     assertEquals(expected, generated);
   }
@@ -209,7 +194,7 @@ public class TestFsLimits {
     lazyInitFSDirectory();
     Class<?> generated = null;
     try {
-      fs.renameTo(src, dst, now(), new Rename[] { });
+      fs.renameTo(src, dst, new Rename[] { });
     } catch (Throwable e) {
       generated = e.getClass();
     }
@@ -222,7 +207,7 @@ public class TestFsLimits {
     lazyInitFSDirectory();
     Class<?> generated = null;
    try {
-      fs.renameTo(src, dst, now());
+      fs.renameTo(src, dst);
     } catch (Throwable e) {
       generated = e.getClass();
     }
@@ -232,7 +217,7 @@ public class TestFsLimits {
   private static void lazyInitFSDirectory() throws IOException {
     // have to create after the caller has had a chance to set conf values
     if (fs == null) {
-      fs = new MockFSDirectory();
+      fs = getMockNamesystem();
     }
   }
 }
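One detail of the test refactoring worth calling out: getMockNamesystem() now stubs the mocked FSImage with doReturn(editLog).when(fsImage).getEditLog() instead of when(fsImage.getEditLog()).thenReturn(editLog). Both are standard Mockito; the doReturn form records the stub without dispatching the call through any previously attached behavior, which is the safer pattern once the object is handed to a real FSNamesystem (the patch itself does not state its reason, so treat this as a reading). A minimal, self-contained sketch with invented ImageLike/EditLogLike stand-in classes:

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// Hypothetical classes, used only to contrast the two stubbing styles.
class EditLogLike {}
class ImageLike {
  EditLogLike getEditLog() { return new EditLogLike(); }
}

public class StubbingStylesExample {
  public static void main(String[] args) {
    EditLogLike editLog = new EditLogLike();

    // Style 1: when(...).thenReturn(...) invokes getEditLog() on the mock
    // while recording the stub (harmless on a plain mock, but it would run
    // real code on a spy or trigger an earlier stub).
    ImageLike image1 = mock(ImageLike.class);
    when(image1.getEditLog()).thenReturn(editLog);

    // Style 2: doReturn(...).when(...) records the stub without executing
    // any existing behavior first; this is the form the updated test uses.
    ImageLike image2 = mock(ImageLike.class);
    doReturn(editLog).when(image2).getEditLog();

    System.out.println(image1.getEditLog() == editLog); // true
    System.out.println(image2.getEditLog() == editLog); // true
  }
}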