HDFS-7462. Consolidate implementation of mkdirs() into a single class. Contributed by Haohui Mai.

Haohui Mai 2014-12-02 14:53:45 -08:00
parent 52bcefca8b
commit 185e0c7b4c
17 changed files with 346 additions and 320 deletions

View File

@ -416,6 +416,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7438. Consolidate the implementation of rename() into a single class.
(wheat9)
HDFS-7462. Consolidate implementation of mkdirs() into a single class.
(wheat9)
OPTIMIZATIONS
BUG FIXES

View File

@ -0,0 +1,238 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.util.Time.now;
class FSDirMkdirOp {
static HdfsFileStatus mkdirs(
FSNamesystem fsn, String src, PermissionStatus permissions,
boolean createParent) throws IOException {
FSDirectory fsd = fsn.getFSDirectory();
final String srcArg = src;
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath
(src);
src = fsd.resolvePath(pc, src, pathComponents);
if (fsd.isPermissionEnabled()) {
fsd.checkTraverse(pc, src);
}
if (!isDirMutable(fsd, src)) {
if (fsd.isPermissionEnabled()) {
fsd.checkAncestorAccess(pc, src, FsAction.WRITE);
}
if (!createParent) {
fsd.verifyParentDir(src);
}
// validate that we have enough inodes. This is, at best, a
// heuristic because the mkdirs() operation might need to
// create multiple inodes.
fsn.checkFsObjectLimit();
if (!mkdirsRecursively(fsd, src, permissions, false, now())) {
throw new IOException("Failed to create directory: " + src);
}
}
return fsd.getAuditFileInfo(srcArg, false);
}
static INode unprotectedMkdir(
FSDirectory fsd, long inodeId, String src,
PermissionStatus permissions, List<AclEntry> aclEntries, long timestamp)
throws QuotaExceededException, UnresolvedLinkException, AclException {
assert fsd.hasWriteLock();
byte[][] components = INode.getPathComponents(src);
INodesInPath iip = fsd.getExistingPathINodes(components);
INode[] inodes = iip.getINodes();
final int pos = inodes.length - 1;
unprotectedMkdir(fsd, inodeId, iip, pos, components[pos], permissions,
aclEntries, timestamp);
return inodes[pos];
}
/**
* Create a directory
* If ancestor directories do not exist, automatically create them.
* @param fsd FSDirectory
* @param src string representation of the path to the directory
* @param permissions the permission of the directory
* @param inheritPermission
* if the permission of the directory should inherit from its parent or not.
* u+wx is implicitly added to the automatically created directories,
* and to the given directory if inheritPermission is true
* @param now creation time
* @return true if the operation succeeds, false otherwise
* @throws QuotaExceededException if directory creation violates
* any quota limit
* @throws UnresolvedLinkException if a symlink is encountered in src.
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
static boolean mkdirsRecursively(
FSDirectory fsd, String src, PermissionStatus permissions,
boolean inheritPermission, long now)
throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException,
AclException {
src = FSDirectory.normalizePath(src);
String[] names = INode.getPathNames(src);
byte[][] components = INode.getPathComponents(names);
final int lastInodeIndex = components.length - 1;
fsd.writeLock();
try {
INodesInPath iip = fsd.getExistingPathINodes(components);
if (iip.isSnapshot()) {
throw new SnapshotAccessControlException(
"Modification on RO snapshot is disallowed");
}
INode[] inodes = iip.getINodes();
// find the index of the first null in inodes[]
StringBuilder pathbuilder = new StringBuilder();
int i = 1;
for(; i < inodes.length && inodes[i] != null; i++) {
pathbuilder.append(Path.SEPARATOR).append(names[i]);
if (!inodes[i].isDirectory()) {
throw new FileAlreadyExistsException(
"Parent path is not a directory: "
+ pathbuilder + " "+inodes[i].getLocalName());
}
}
// default to creating parent dirs with the given perms
PermissionStatus parentPermissions = permissions;
// if not inheriting and it's the last inode, there's no use in
// computing perms that won't be used
if (inheritPermission || (i < lastInodeIndex)) {
// if inheriting (ie. creating a file or symlink), use the parent dir,
// else the supplied permissions
// NOTE: the permissions of the auto-created directories violate posix
FsPermission parentFsPerm = inheritPermission
? inodes[i-1].getFsPermission() : permissions.getPermission();
// ensure that the permissions allow user write+execute
if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
parentFsPerm = new FsPermission(
parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
parentFsPerm.getGroupAction(),
parentFsPerm.getOtherAction()
);
}
if (!parentPermissions.getPermission().equals(parentFsPerm)) {
parentPermissions = new PermissionStatus(
parentPermissions.getUserName(),
parentPermissions.getGroupName(),
parentFsPerm
);
// when inheriting, use same perms for entire path
if (inheritPermission) permissions = parentPermissions;
}
}
// create directories beginning from the first null index
for(; i < inodes.length; i++) {
pathbuilder.append(Path.SEPARATOR).append(names[i]);
unprotectedMkdir(fsd, fsd.allocateNewInodeId(), iip, i, components[i],
(i < lastInodeIndex) ? parentPermissions : permissions, null, now);
if (inodes[i] == null) {
return false;
}
// Directory creation also counts towards FilesCreated
// to match count of FilesDeleted metric.
NameNode.getNameNodeMetrics().incrFilesCreated();
final String cur = pathbuilder.toString();
fsd.getEditLog().logMkDir(cur, inodes[i]);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"mkdirs: created directory " + cur);
}
}
} finally {
fsd.writeUnlock();
}
return true;
}
/**
* Check whether the path specifies a directory
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
private static boolean isDirMutable(
FSDirectory fsd, String src) throws UnresolvedLinkException,
SnapshotAccessControlException {
src = FSDirectory.normalizePath(src);
fsd.readLock();
try {
INode node = fsd.getINode4Write(src, false);
return node != null && node.isDirectory();
} finally {
fsd.readUnlock();
}
}
/** create a directory at index pos.
* The parent path to the directory is at [0, pos-1].
* All ancestors exist. Newly created one stored at index pos.
*/
private static void unprotectedMkdir(
FSDirectory fsd, long inodeId, INodesInPath inodesInPath, int pos,
byte[] name, PermissionStatus permission, List<AclEntry> aclEntries,
long timestamp)
throws QuotaExceededException, AclException {
assert fsd.hasWriteLock();
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
timestamp);
if (fsd.addChild(inodesInPath, pos, dir, true)) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
}
inodesInPath.setINode(pos, dir);
}
}
}
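
For context, the caller side of this consolidation is the FSNamesystem.mkdirs() rewrite that appears in the FSNamesystem hunk further down. Reduced to its essentials, the call path after this patch looks roughly like the following simplified sketch (exception handling and the repeated OperationCategory/safe-mode checks are abbreviated from the actual hunk):

// Simplified sketch of FSNamesystem.mkdirs() after this change; see the
// FSNamesystem hunk below for the exact code.
boolean mkdirs(String src, PermissionStatus permissions, boolean createParent)
    throws IOException {
  HdfsFileStatus auditStat = null;
  checkOperation(OperationCategory.WRITE);
  writeLock();
  try {
    checkNameNodeSafeMode("Cannot create directory " + src);
    // Path resolution, permission checks and the recursive create now all
    // live in FSDirMkdirOp.mkdirs().
    auditStat = FSDirMkdirOp.mkdirs(this, src, permissions, createParent);
  } catch (AccessControlException e) {
    logAuditEvent(false, "mkdirs", src);
    throw e;
  } finally {
    writeUnlock();
  }
  getEditLog().logSync();
  logAuditEvent(true, "mkdirs", src, null, auditStat);
  return true;
}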

View File

@ -39,9 +39,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
@ -66,7 +66,6 @@
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@ -83,7 +82,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@ -147,6 +145,7 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
private final boolean isPermissionEnabled;
private final String fsOwnerShortUserName;
private final String supergroup;
private final INodeId inodeId;
private final FSEditLog editLog;
@ -194,6 +193,7 @@ public int getWriteHoldCount() {
FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
this.dirLock = new ReentrantReadWriteLock(true); // fair
this.inodeId = new INodeId();
rootDir = createRoot(ns);
inodeMap = INodeMap.newInstance(rootDir);
this.isPermissionEnabled = conf.getBoolean(
@ -329,8 +329,7 @@ INodeFile addFile(String path, PermissionStatus permissions,
UnresolvedLinkException, SnapshotAccessControlException, AclException {
long modTime = now();
INodeFile newNode = newINodeFile(namesystem.allocateNewInodeId(),
permissions, modTime, modTime, replication, preferredBlockSize);
INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime, modTime, replication, preferredBlockSize);
newNode.toUnderConstruction(clientName, clientMachine);
boolean added = false;
@ -930,22 +929,6 @@ boolean isDir(String src) throws UnresolvedLinkException {
}
}
/**
* Check whether the path specifies a directory
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
boolean isDirMutable(String src) throws UnresolvedLinkException,
SnapshotAccessControlException {
src = normalizePath(src);
readLock();
try {
INode node = getINode4Write(src, false);
return node != null && node.isDirectory();
} finally {
readUnlock();
}
}
/** Updates namespace and diskspace consumed for all
* directories until the parent directory of file represented by path.
*
@ -1081,38 +1064,6 @@ static String getFullPathName(INode inode) {
return inodes == null ? "" : getFullPathName(inodes, inodes.length - 1);
}
INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
List<AclEntry> aclEntries, long timestamp)
throws QuotaExceededException, UnresolvedLinkException, AclException {
assert hasWriteLock();
byte[][] components = INode.getPathComponents(src);
INodesInPath iip = getExistingPathINodes(components);
INode[] inodes = iip.getINodes();
final int pos = inodes.length - 1;
unprotectedMkdir(inodeId, iip, pos, components[pos], permissions, aclEntries,
timestamp);
return inodes[pos];
}
/** create a directory at index pos.
* The parent path to the directory is at [0, pos-1].
* All ancestors exist. Newly created one stored at index pos.
*/
void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
int pos, byte[] name, PermissionStatus permission,
List<AclEntry> aclEntries, long timestamp)
throws QuotaExceededException, AclException {
assert hasWriteLock();
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
timestamp);
if (addChild(inodesInPath, pos, dir, true)) {
if (aclEntries != null) {
AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID);
}
inodesInPath.setINode(pos, dir);
}
}
/**
* Add the given child to the namespace.
* @param src The full path name of the child node.
@ -1314,8 +1265,8 @@ private boolean addLastINode(INodesInPath inodesInPath,
* otherwise return true;
* @throws QuotaExceededException is thrown if it violates quota limit
*/
private boolean addChild(INodesInPath iip, int pos,
INode child, boolean checkQuota) throws QuotaExceededException {
boolean addChild(INodesInPath iip, int pos, INode child, boolean checkQuota)
throws QuotaExceededException {
final INode[] inodes = iip.getINodes();
// Disallow creation of /.reserved. This may be created when loading
// editlog/fsimage during upgrade since /.reserved was a valid name in older
@ -1626,6 +1577,7 @@ void reset() {
inodeMap.clear();
addToInodeMap(rootDir);
nameCache.reset();
inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
} finally {
writeUnlock();
}
@ -2381,7 +2333,7 @@ INode getNode(String path, boolean resolveLink)
* @throws UnresolvedLinkException if symlink can't be resolved
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
private INode getINode4Write(String src, boolean resolveLink)
INode getINode4Write(String src, boolean resolveLink)
throws UnresolvedLinkException, SnapshotAccessControlException {
return getINodesInPath4Write(src, resolveLink).getLastINode();
}
@ -2481,4 +2433,51 @@ HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
? FSDirStatAndListingOp.getFileInfo(this, path, resolveSymlink, false,
false) : null;
}
/**
* Verify that parent directory of src exists.
*/
void verifyParentDir(String src)
throws FileNotFoundException, ParentNotDirectoryException,
UnresolvedLinkException {
Path parent = new Path(src).getParent();
if (parent != null) {
final INode parentNode = getINode(parent.toString());
if (parentNode == null) {
throw new FileNotFoundException("Parent directory doesn't exist: "
+ parent);
} else if (!parentNode.isDirectory() && !parentNode.isSymlink()) {
throw new ParentNotDirectoryException("Parent path is not a directory: "
+ parent);
}
}
}
/** Allocate a new inode ID. */
long allocateNewInodeId() {
return inodeId.nextValue();
}
/** @return the last inode ID. */
public long getLastInodeId() {
return inodeId.getCurrentValue();
}
/**
* Set the last allocated inode id when fsimage or editlog is loaded.
* @param newValue
*/
void resetLastInodeId(long newValue) throws IOException {
try {
inodeId.skipTo(newValue);
} catch(IllegalStateException ise) {
throw new IOException(ise);
}
}
/** Should only be used for tests to reset to any value
* @param newValue*/
void resetLastInodeIdWithoutChecking(long newValue) {
inodeId.setCurrentValue(newValue);
}
}
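
A side effect of this refactoring, visible in the FSDirectory hunks above, is that the INodeId counter moves from FSNamesystem into FSDirectory, so callers now reach the allocator through the directory. The loader and image hunks below switch their call sites accordingly; here is a minimal illustrative sketch (the wrapper method itself is hypothetical, only the calls it contains come from this patch):

// Illustrative only: old versus new way to obtain a fresh inode id.
// After this patch the FSNamesystem methods are removed, so only the
// FSDirectory form still compiles.
static long nextInodeId(FSNamesystem fsn) {
  // Old (pre-patch): return fsn.allocateNewInodeId();
  return fsn.getFSDirectory().allocateNewInodeId();
}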

View File

@ -176,7 +176,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
long lastLogTime = now();
long lastInodeId = fsNamesys.getLastInodeId();
long lastInodeId = fsNamesys.dir.getLastInodeId();
try {
while (true) {
@ -276,7 +276,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
}
}
} finally {
fsNamesys.resetLastInodeId(lastInodeId);
fsNamesys.dir.resetLastInodeId(lastInodeId);
if(closeOnExit) {
in.close();
}
@ -305,12 +305,12 @@ private long getAndUpdateLastInodeId(long inodeIdFromOp, int logVersion,
throw new IOException("The layout version " + logVersion
+ " supports inodeId but gave bogus inodeId");
}
inodeId = fsNamesys.allocateNewInodeId();
inodeId = fsNamesys.dir.allocateNewInodeId();
} else {
// need to reset lastInodeId. fsnamesys gets lastInodeId firstly from
// fsimage but editlog captures more recent inodeId allocations
if (inodeId > lastInodeId) {
fsNamesys.resetLastInodeId(inodeId);
fsNamesys.dir.resetLastInodeId(inodeId);
}
}
return inodeId;
@ -530,7 +530,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
MkdirOp mkdirOp = (MkdirOp)op;
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId);
fsDir.unprotectedMkdir(inodeId,
FSDirMkdirOp.unprotectedMkdir(fsDir, inodeId,
renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
break;

View File

@ -379,7 +379,7 @@ public void load(File curFile) throws IOException {
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_INODE_ID, imgVersion)) {
long lastInodeId = in.readLong();
namesystem.resetLastInodeId(lastInodeId);
namesystem.dir.resetLastInodeId(lastInodeId);
if (LOG.isDebugEnabled()) {
LOG.debug("load last allocated InodeId from fsimage:" + lastInodeId);
}
@ -732,7 +732,7 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
long inodeId = NameNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_INODE_ID, imgVersion) ? in.readLong()
: namesystem.allocateNewInodeId();
: namesystem.dir.allocateNewInodeId();
final short replication = namesystem.getBlockManager().adjustReplication(
in.readShort());
@ -1260,7 +1260,7 @@ void save(File newFile, FSImageCompression compression) throws IOException {
out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedBlockId());
out.writeLong(context.getTxId());
out.writeLong(sourceNamesystem.getLastInodeId());
out.writeLong(sourceNamesystem.dir.getLastInodeId());
sourceNamesystem.getSnapshotManager().write(out);

View File

@ -211,7 +211,7 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
void loadINodeSection(InputStream in) throws IOException {
INodeSection s = INodeSection.parseDelimitedFrom(in);
fsn.resetLastInodeId(s.getLastInodeId());
fsn.dir.resetLastInodeId(s.getLastInodeId());
LOG.info("Loading " + s.getNumInodes() + " INodes.");
for (int i = 0; i < s.getNumInodes(); ++i) {
INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
@ -490,7 +490,7 @@ void serializeINodeSection(OutputStream out) throws IOException {
INodeMap inodesMap = fsn.dir.getINodeMap();
INodeSection.Builder b = INodeSection.newBuilder()
.setLastInodeId(fsn.getLastInodeId()).setNumInodes(inodesMap.size());
.setLastInodeId(fsn.dir.getLastInodeId()).setNumInodes(inodesMap.size());
INodeSection s = b.build();
s.writeDelimitedTo(out);

View File

@ -120,7 +120,7 @@ static INodeFile readINodeUnderConstruction(
byte[] name = readBytes(in);
long inodeId = NameNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_INODE_ID, imgVersion) ? in.readLong()
: fsNamesys.allocateNewInodeId();
: fsNamesys.dir.allocateNewInodeId();
short blockReplication = in.readShort();
long modificationTime = in.readLong();
long preferredBlockSize = in.readLong();

View File

@ -530,8 +530,6 @@ private void logAuditEvent(boolean succeeded,
*/
private volatile boolean startingActiveService = false;
private INodeId inodeId;
private final RetryCache retryCache;
private final NNConf nnConf;
@ -594,32 +592,6 @@ void waitForLoadingFSImage() {
}
}
/**
* Set the last allocated inode id when fsimage or editlog is loaded.
*/
public void resetLastInodeId(long newValue) throws IOException {
try {
inodeId.skipTo(newValue);
} catch(IllegalStateException ise) {
throw new IOException(ise);
}
}
/** Should only be used for tests to reset to any value */
void resetLastInodeIdWithoutChecking(long newValue) {
inodeId.setCurrentValue(newValue);
}
/** @return the last inode ID. */
public long getLastInodeId() {
return inodeId.getCurrentValue();
}
/** Allocate a new inode ID. */
public long allocateNewInodeId() {
return inodeId.nextValue();
}
/**
* Clear all loaded data
*/
@ -628,7 +600,6 @@ void clear() {
dtSecretManager.reset();
blockIdManager.clear();
leaseManager.removeAllLeases();
inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
snapshotManager.clearSnapshottableDirs();
cacheManager.clear();
setImageLoaded(false);
@ -852,7 +823,6 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
this.editLogRollerInterval = conf.getInt(
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS,
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
this.inodeId = new INodeId();
this.lazyPersistFileScrubIntervalSec = conf.getInt(
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
@ -2082,7 +2052,7 @@ private void createSymlinkInt(String target, final String linkArg,
checkNameNodeSafeMode("Cannot create symlink " + link);
link = dir.resolvePath(pc, link, pathComponents);
if (!createParent) {
verifyParentDir(link);
dir.verifyParentDir(link);
}
if (!dir.isValidToCreate(link)) {
throw new IOException("failed to create link " + link
@ -2257,25 +2227,6 @@ long getPreferredBlockSize(String filename)
}
}
/**
* Verify that parent directory of src exists.
*/
private void verifyParentDir(String src) throws FileNotFoundException,
ParentNotDirectoryException, UnresolvedLinkException {
assert hasReadLock();
Path parent = new Path(src).getParent();
if (parent != null) {
final INode parentNode = dir.getINode(parent.toString());
if (parentNode == null) {
throw new FileNotFoundException("Parent directory doesn't exist: "
+ parent);
} else if (!parentNode.isDirectory() && !parentNode.isSymlink()) {
throw new ParentNotDirectoryException("Parent path is not a directory: "
+ parent);
}
}
}
/**
* If the file is within an encryption zone, select the appropriate
* CryptoProtocolVersion from the list provided by the client. Since the
@ -2554,7 +2505,7 @@ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
}
if (!createParent) {
verifyParentDir(src);
dir.verifyParentDir(src);
}
try {
@ -2586,8 +2537,8 @@ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
// Always do an implicit mkdirs for parent directory tree.
Path parent = new Path(src).getParent();
if (parent != null && mkdirsRecursively(parent.toString(),
permissions, true, now())) {
if (parent != null && FSDirMkdirOp.mkdirsRecursively(dir,
parent.toString(), permissions, true, now())) {
newNode = dir.addFile(src, permissions, replication, blockSize,
holder, clientMachine);
}
@ -3875,186 +3826,22 @@ boolean isFileClosed(final String src) throws IOException {
* Create all the necessary directories
*/
boolean mkdirs(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
boolean ret = false;
try {
ret = mkdirsInt(src, permissions, createParent);
} catch (AccessControlException e) {
logAuditEvent(false, "mkdirs", src);
throw e;
}
return ret;
}
private boolean mkdirsInt(final String srcArg, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
String src = srcArg;
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
}
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException(src);
}
FSPermissionChecker pc = getPermissionChecker();
boolean createParent) throws IOException {
HdfsFileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
HdfsFileStatus resultingStat = null;
boolean status = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create directory " + src);
src = dir.resolvePath(pc, src, pathComponents);
status = mkdirsInternal(pc, src, permissions, createParent);
if (status) {
resultingStat = getAuditFileInfo(src, false);
}
auditStat = FSDirMkdirOp.mkdirs(this, src, permissions, createParent);
} catch (AccessControlException e) {
logAuditEvent(false, "mkdirs", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
if (status) {
logAuditEvent(true, "mkdirs", srcArg, null, resultingStat);
}
return status;
}
/**
* Create all the necessary directories
*/
private boolean mkdirsInternal(FSPermissionChecker pc, String src,
PermissionStatus permissions, boolean createParent)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
if (isPermissionEnabled) {
checkTraverse(pc, src);
}
if (dir.isDirMutable(src)) {
// all the users of mkdirs() are used to expect 'true' even if
// a new directory is not created.
return true;
}
if (isPermissionEnabled) {
checkAncestorAccess(pc, src, FsAction.WRITE);
}
if (!createParent) {
verifyParentDir(src);
}
// validate that we have enough inodes. This is, at best, a
// heuristic because the mkdirs() operation might need to
// create multiple inodes.
checkFsObjectLimit();
if (!mkdirsRecursively(src, permissions, false, now())) {
throw new IOException("Failed to create directory: " + src);
}
return true;
}
/**
* Create a directory
* If ancestor directories do not exist, automatically create them.
* @param src string representation of the path to the directory
* @param permissions the permission of the directory
* @param inheritPermission if the permission of the directory should inherit
* from its parent or not. u+wx is implicitly added to
* the automatically created directories, and to the
* given directory if inheritPermission is true
* @param now creation time
* @return true if the operation succeeds false otherwise
* @throws QuotaExceededException if directory creation violates
* any quota limit
* @throws UnresolvedLinkException if a symlink is encountered in src.
* @throws SnapshotAccessControlException if path is in RO snapshot
*/
private boolean mkdirsRecursively(String src, PermissionStatus permissions,
boolean inheritPermission, long now)
throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException,
AclException {
src = FSDirectory.normalizePath(src);
String[] names = INode.getPathNames(src);
byte[][] components = INode.getPathComponents(names);
final int lastInodeIndex = components.length - 1;
dir.writeLock();
try {
INodesInPath iip = dir.getExistingPathINodes(components);
if (iip.isSnapshot()) {
throw new SnapshotAccessControlException(
"Modification on RO snapshot is disallowed");
}
INode[] inodes = iip.getINodes();
// find the index of the first null in inodes[]
StringBuilder pathbuilder = new StringBuilder();
int i = 1;
for(; i < inodes.length && inodes[i] != null; i++) {
pathbuilder.append(Path.SEPARATOR).append(names[i]);
if (!inodes[i].isDirectory()) {
throw new FileAlreadyExistsException(
"Parent path is not a directory: "
+ pathbuilder + " "+inodes[i].getLocalName());
}
}
// default to creating parent dirs with the given perms
PermissionStatus parentPermissions = permissions;
// if not inheriting and it's the last inode, there's no use in
// computing perms that won't be used
if (inheritPermission || (i < lastInodeIndex)) {
// if inheriting (ie. creating a file or symlink), use the parent dir,
// else the supplied permissions
// NOTE: the permissions of the auto-created directories violate posix
FsPermission parentFsPerm = inheritPermission
? inodes[i-1].getFsPermission() : permissions.getPermission();
// ensure that the permissions allow user write+execute
if (!parentFsPerm.getUserAction().implies(FsAction.WRITE_EXECUTE)) {
parentFsPerm = new FsPermission(
parentFsPerm.getUserAction().or(FsAction.WRITE_EXECUTE),
parentFsPerm.getGroupAction(),
parentFsPerm.getOtherAction()
);
}
if (!parentPermissions.getPermission().equals(parentFsPerm)) {
parentPermissions = new PermissionStatus(
parentPermissions.getUserName(),
parentPermissions.getGroupName(),
parentFsPerm
);
// when inheriting, use same perms for entire path
if (inheritPermission) permissions = parentPermissions;
}
}
// create directories beginning from the first null index
for(; i < inodes.length; i++) {
pathbuilder.append(Path.SEPARATOR).append(names[i]);
dir.unprotectedMkdir(allocateNewInodeId(), iip, i, components[i],
(i < lastInodeIndex) ? parentPermissions : permissions, null,
now);
if (inodes[i] == null) {
return false;
}
// Directory creation also counts towards FilesCreated
// to match count of FilesDeleted metric.
NameNode.getNameNodeMetrics().incrFilesCreated();
final String cur = pathbuilder.toString();
getEditLog().logMkDir(cur, inodes[i]);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"mkdirs: created directory " + cur);
}
}
} finally {
dir.writeUnlock();
}
logAuditEvent(true, "mkdirs", src, null, auditStat);
return true;
}
@ -4763,12 +4550,13 @@ private INodeSymlink addSymlink(String path, String target,
final long modTime = now();
if (createParent) {
final String parent = new Path(path).getParent().toString();
if (!mkdirsRecursively(parent, dirPerms, true, modTime)) {
if (!FSDirMkdirOp.mkdirsRecursively(dir, parent, dirPerms, true,
modTime)) {
return null;
}
}
final String userName = dirPerms.getUserName();
long id = allocateNewInodeId();
long id = dir.allocateNewInodeId();
INodeSymlink newNode = dir.addSymlink(id, path, target, modTime, modTime,
new PermissionStatus(userName, null, FsPermission.getDefault()));
if (newNode == null) {

View File

@ -197,13 +197,11 @@ public static void setFakeHttpAddresses(Configuration conf,
logicalName, "nn2"), "127.0.0.1:12346");
}
public static void setEditLogForTesting(NameNode nn, FSEditLog newLog) {
Whitebox.setInternalState(nn.getFSImage(), "editLog", newLog);
Whitebox.setInternalState(nn.getNamesystem().getFSDirectory(), "editLog",
newLog);
public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) {
Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog);
Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog);
}
/** class MyFile contains enough information to recreate the contents of
* a single file.
*/

View File

@ -77,7 +77,7 @@ public void testWhileOpenRenameParent() throws IOException {
FSEditLog spyLog =
spy(cluster.getNameNode().getFSImage().getEditLog());
doNothing().when(spyLog).endCurrentLogSegment(Mockito.anyBoolean());
DFSTestUtil.setEditLogForTesting(cluster.getNameNode(), spyLog);
DFSTestUtil.setEditLogForTesting(cluster.getNamesystem(), spyLog);
final int nnport = cluster.getNameNodePort();

View File

@ -188,7 +188,7 @@ public static FSImage spyOnFsImage(NameNode nn1) {
public static FSEditLog spyOnEditLog(NameNode nn) {
FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
DFSTestUtil.setEditLogForTesting(nn, spyEditLog);
DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
if (tailer != null) {
tailer.setEditLog(spyEditLog);

View File

@ -204,7 +204,7 @@ public void run() {
FSEditLog editLog = namesystem.getEditLog();
for (int i = 0; i < numTransactions; i++) {
INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
INodeFile inode = new INodeFile(namesystem.dir.allocateNewInodeId(), null,
p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
inode.toUnderConstruction("", "");
@ -375,7 +375,7 @@ private void testEditLog(int initialSize) throws IOException {
// Remember the current lastInodeId and reset it back later to test
// loading editlog segments. The transactions in the following allocate new
// inode ids to write to editlogs but don't create inodes in the namespace
long originalLastInodeId = namesystem.getLastInodeId();
long originalLastInodeId = namesystem.dir.getLastInodeId();
// Create threads and make them run transactions concurrently.
Thread threadId[] = new Thread[NUM_THREADS];
@ -409,7 +409,7 @@ private void testEditLog(int initialSize) throws IOException {
// If there were any corruptions, it is likely that the reading in
// of these transactions will throw an exception.
//
namesystem.resetLastInodeIdWithoutChecking(originalLastInodeId);
namesystem.dir.resetLastInodeIdWithoutChecking(originalLastInodeId);
for (Iterator<StorageDirectory> it =
fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0);

View File

@ -458,7 +458,7 @@ public void testSaveRightBeforeSync() throws Exception {
try {
FSImage fsimage = namesystem.getFSImage();
FSEditLog editLog = spy(fsimage.getEditLog());
fsimage.editLog = editLog;
DFSTestUtil.setEditLogForTesting(namesystem, editLog);
final AtomicReference<Throwable> deferredException =
new AtomicReference<Throwable>();

View File

@ -389,7 +389,7 @@ public void testInodeId() throws IOException {
cluster.waitActive();
FSNamesystem fsn = cluster.getNamesystem();
long lastId = fsn.getLastInodeId();
long lastId = fsn.dir.getLastInodeId();
// Ensure root has the correct inode ID
// Last inode ID should be root inode ID and inode map size should be 1
@ -404,14 +404,14 @@ public void testInodeId() throws IOException {
FileSystem fs = cluster.getFileSystem();
Path path = new Path("/test1");
assertTrue(fs.mkdirs(path));
assertEquals(++expectedLastInodeId, fsn.getLastInodeId());
assertEquals(++expectedLastInodeId, fsn.dir.getLastInodeId());
assertEquals(++inodeCount, fsn.dir.getInodeMapSize());
// Create a file
// Last inode ID and inode map size should increase by 1
NamenodeProtocols nnrpc = cluster.getNameNodeRpc();
DFSTestUtil.createFile(fs, new Path("/test1/file"), 1024, (short) 1, 0);
assertEquals(++expectedLastInodeId, fsn.getLastInodeId());
assertEquals(++expectedLastInodeId, fsn.dir.getLastInodeId());
assertEquals(++inodeCount, fsn.dir.getInodeMapSize());
// Ensure right inode ID is returned in file status
@ -422,7 +422,7 @@ public void testInodeId() throws IOException {
// Last inode ID and inode map size should not change
Path renamedPath = new Path("/test2");
assertTrue(fs.rename(path, renamedPath));
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Delete test2/file and test2 and ensure inode map size decreases
@ -439,12 +439,12 @@ public void testInodeId() throws IOException {
inodeCount += 3; // test1, file1 and file2 are created
expectedLastInodeId += 3;
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
// Concat the /test1/file1 /test1/file2 into /test1/file2
nnrpc.concat(file2, new String[] {file1});
inodeCount--; // file1 and file2 are concatenated to file2
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
assertTrue(fs.delete(new Path("/test1"), true));
inodeCount -= 2; // test1 and file2 are deleted
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
@ -453,14 +453,14 @@ public void testInodeId() throws IOException {
cluster.restartNameNode();
cluster.waitActive();
fsn = cluster.getNamesystem();
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Create two inodes test2 and test2/file2
DFSTestUtil.createFile(fs, new Path("/test2/file2"), 1024, (short) 1, 0);
expectedLastInodeId += 2;
inodeCount += 2;
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// create /test3, and /test3/file.
@ -469,7 +469,7 @@ public void testInodeId() throws IOException {
assertTrue(outStream != null);
expectedLastInodeId += 2;
inodeCount += 2;
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Apply editlogs to fsimage, ensure inodeUnderConstruction is handled
@ -483,7 +483,7 @@ public void testInodeId() throws IOException {
cluster.restartNameNode();
cluster.waitActive();
fsn = cluster.getNamesystem();
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(expectedLastInodeId, fsn.dir.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
} finally {
if (cluster != null) {

View File

@ -541,7 +541,7 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize)
FSEditLog spyLog =
spy(cluster.getNameNode().getFSImage().getEditLog());
doNothing().when(spyLog).endCurrentLogSegment(true);
DFSTestUtil.setEditLogForTesting(cluster.getNameNode(), spyLog);
DFSTestUtil.setEditLogForTesting(cluster.getNamesystem(), spyLog);
}
fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();

View File

@ -142,7 +142,7 @@ private void testFailoverFinalizesAndReadsInProgress(
File sharedDir = new File(sharedUri.getPath(), "current");
FSNamesystem fsn = cluster.getNamesystem(0);
FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG, 1,
fsn.getLastInodeId() + 1);
fsn.getFSDirectory().getLastInodeId() + 1);
assertEditFiles(Collections.singletonList(sharedUri),
NNStorage.getInProgressEditsFileName(1));

View File

@ -251,7 +251,7 @@ public void testCheckpointCancellation() throws Exception {
"testCheckpointCancellation-tmp");
FSNamesystem fsn = cluster.getNamesystem(0);
FSImageTestUtil.createAbortedLogWithMkdirs(tmpDir, NUM_DIRS_IN_LOG, 3,
fsn.getLastInodeId() + 1);
fsn.getFSDirectory().getLastInodeId() + 1);
String fname = NNStorage.getInProgressEditsFileName(3);
new File(tmpDir, fname).renameTo(new File(sharedDir, fname));