HDFS-7528. Consolidate symlink-related implementation into a single class. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-12-17 14:48:30 -08:00
parent 316613bdae
commit 0da1330bfd
6 changed files with 151 additions and 121 deletions

View File

@ -462,6 +462,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7536. Remove unused CryptoCodec in org.apache.hadoop.fs.Hdfs.
(Yi Liu via wheat9)
HDFS-7528. Consolidate symlink-related implementation into a single class.
(wheat9)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import java.io.IOException;
import static org.apache.hadoop.util.Time.now;
class FSDirSymlinkOp {
static HdfsFileStatus createSymlinkInt(
FSNamesystem fsn, String target, final String linkArg,
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException {
FSDirectory fsd = fsn.getFSDirectory();
String link = linkArg;
if (!DFSUtil.isValidName(link)) {
throw new InvalidPathException("Invalid link name: " + link);
}
if (FSDirectory.isReservedName(target) || target.isEmpty()) {
throw new InvalidPathException("Invalid target name: " + target);
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
+ target + " link=" + link);
}
FSPermissionChecker pc = fsn.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(link);
fsd.writeLock();
try {
link = fsd.resolvePath(pc, link, pathComponents);
final INodesInPath iip = fsd.getINodesInPath4Write(link, false);
if (!createParent) {
fsd.verifyParentDir(iip, link);
}
if (!fsd.isValidToCreate(link, iip)) {
throw new IOException(
"failed to create link " + link +
" either because the filename is invalid or the file exists");
}
if (fsd.isPermissionEnabled()) {
fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
}
// validate that we have enough inodes.
fsn.checkFsObjectLimit();
// add symbolic link to namespace
addSymlink(fsd, link, iip, target, dirPerms, createParent, logRetryCache);
} finally {
fsd.writeUnlock();
}
NameNode.getNameNodeMetrics().incrCreateSymlinkOps();
return fsd.getAuditFileInfo(link, false);
}
static INodeSymlink unprotectedAddSymlink(
FSDirectory fsd, INodesInPath iip, long id, String target, long mtime,
long atime, PermissionStatus perm)
throws UnresolvedLinkException, QuotaExceededException {
assert fsd.hasWriteLock();
final INodeSymlink symlink = new INodeSymlink(id, null, perm, mtime, atime,
target);
return fsd.addINode(iip, symlink) ? symlink : null;
}
/**
* Add the given symbolic link to the fs. Record it in the edits log.
*/
private static INodeSymlink addSymlink(
FSDirectory fsd, String path, INodesInPath iip, String target,
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException {
final long mtime = now();
if (createParent) {
INodesInPath parentIIP = iip.getParentINodesInPath();
if (parentIIP == null || (parentIIP = FSDirMkdirOp.mkdirsRecursively(
fsd,
parentIIP, dirPerms, true, mtime)) == null) {
return null;
} else {
iip = INodesInPath.append(parentIIP, null, iip.getLastLocalName());
}
}
final String userName = dirPerms.getUserName();
long id = fsd.allocateNewInodeId();
PermissionStatus perm = new PermissionStatus(
userName, null, FsPermission.getDefault());
INodeSymlink newNode =
unprotectedAddSymlink(fsd, iip, id, target, mtime, mtime, perm);
if (newNode == null) {
NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
return null;
}
fsd.getEditLog().logSymlink(path, target, mtime, mtime, newNode,
logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("addSymlink: " + path + " is added");
}
return newNode;
}
}

View File

@ -860,7 +860,7 @@ public class FSDirectory implements Closeable {
* @param iip the INodesInPath instance containing all the ancestral INodes
* @throws QuotaExceededException is thrown if it violates quota limit
*/
private boolean addINode(INodesInPath iip, INode child)
boolean addINode(INodesInPath iip, INode child)
throws QuotaExceededException, UnresolvedLinkException {
child.setLocalName(iip.getLastLocalName());
cacheName(child);
@ -1192,29 +1192,6 @@ public class FSDirectory implements Closeable {
}
}
/**
* Add the specified path into the namespace.
*/
INodeSymlink addSymlink(INodesInPath iip, long id, String target,
long mtime, long atime, PermissionStatus perm)
throws UnresolvedLinkException, QuotaExceededException {
writeLock();
try {
return unprotectedAddSymlink(iip, id, target, mtime, atime, perm);
} finally {
writeUnlock();
}
}
INodeSymlink unprotectedAddSymlink(INodesInPath iip, long id, String target,
long mtime, long atime, PermissionStatus perm)
throws UnresolvedLinkException, QuotaExceededException {
assert hasWriteLock();
final INodeSymlink symlink = new INodeSymlink(id, null, perm, mtime, atime,
target);
return addINode(iip, symlink) ? symlink : null;
}
boolean isInAnEZ(INodesInPath iip)
throws UnresolvedLinkException, SnapshotAccessControlException {
readLock();

View File

@ -360,12 +360,12 @@ public class FSEditLogLoader {
// add to the file tree
inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion,
lastInodeId);
newFile = fsDir.unprotectedAddFile(inodeId,
iip, addCloseOp.permissions, addCloseOp.aclEntries,
addCloseOp.xAttrs,
replication, addCloseOp.mtime, addCloseOp.atime,
addCloseOp.blockSize, true, addCloseOp.clientName,
addCloseOp.clientMachine, addCloseOp.storagePolicyId);
newFile = fsDir.unprotectedAddFile(
inodeId, iip, addCloseOp.permissions, addCloseOp.aclEntries,
addCloseOp.xAttrs, replication, addCloseOp.mtime,
addCloseOp.atime, addCloseOp.blockSize, true,
addCloseOp.clientName, addCloseOp.clientMachine,
addCloseOp.storagePolicyId);
iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
@ -587,9 +587,9 @@ public class FSEditLogLoader {
final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
logVersion);
final INodesInPath iip = fsDir.getINodesInPath(path, false);
fsDir.unprotectedAddSymlink(iip, inodeId,
symlinkOp.value, symlinkOp.mtime, symlinkOp.atime,
symlinkOp.permissionStatus);
FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip, inodeId, symlinkOp.value,
symlinkOp.mtime, symlinkOp.atime,
symlinkOp.permissionStatus);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);

View File

@ -168,7 +168,6 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@ -1918,60 +1917,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException {
if (!DFSUtil.isValidName(link)) {
throw new InvalidPathException("Invalid link name: " + link);
}
if (FSDirectory.isReservedName(target)) {
throw new InvalidPathException("Invalid target name: " + target);
}
try {
createSymlinkInt(target, link, dirPerms, createParent, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
throw e;
}
}
private void createSymlinkInt(String target, final String linkArg,
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException {
String link = linkArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
+ target + " link=" + link);
}
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage();
HdfsFileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(link);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create symlink " + link);
link = dir.resolvePath(pc, link, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(link, false);
if (!createParent) {
dir.verifyParentDir(iip, link);
}
if (!dir.isValidToCreate(link, iip)) {
throw new IOException("failed to create link " + link
+" either because the filename is invalid or the file exists");
}
if (isPermissionEnabled) {
dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
}
// validate that we have enough inodes.
checkFsObjectLimit();
// add symbolic link to namespace
addSymlink(link, iip, target, dirPerms, createParent, logRetryCache);
resultingStat = getAuditFileInfo(link, false);
auditStat = FSDirSymlinkOp.createSymlinkInt(this, target, link, dirPerms,
createParent, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
logAuditEvent(true, "createSymlink", linkArg, target, resultingStat);
logAuditEvent(true, "createSymlink", link, target, auditStat);
}
/**
@ -4360,43 +4322,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
/**
* Add the given symbolic link to the fs. Record it in the edits log.
*/
private INodeSymlink addSymlink(String path, INodesInPath iip, String target,
PermissionStatus dirPerms,
boolean createParent, boolean logRetryCache)
throws UnresolvedLinkException, FileAlreadyExistsException,
QuotaExceededException, SnapshotAccessControlException, AclException {
waitForLoadingFSImage();
final long modTime = now();
if (createParent) {
INodesInPath parentIIP = iip.getParentINodesInPath();
if (parentIIP == null || (parentIIP = FSDirMkdirOp.mkdirsRecursively(dir,
parentIIP, dirPerms, true, modTime)) == null) {
return null;
} else {
iip = INodesInPath.append(parentIIP, null, iip.getLastLocalName());
}
}
final String userName = dirPerms.getUserName();
long id = dir.allocateNewInodeId();
INodeSymlink newNode = dir.addSymlink(iip, id, target, modTime, modTime,
new PermissionStatus(userName, null, FsPermission.getDefault()));
if (newNode == null) {
NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
return null;
}
getEditLog().logSymlink(path, target, modTime, modTime, newNode,
logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("addSymlink: " + path + " is added");
}
return newNode;
}
/**
* Periodically calls hasAvailableResources of NameNodeResourceChecker, and if
* there are found to be insufficient resources available, causes the NN to

View File

@ -1142,7 +1142,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
return; // Return previous response
}
metrics.incrCreateSymlinkOps();
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target
* URI may refer to a non-HDFS file system.
*/
@ -1151,9 +1150,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
" character limit");
}
if ("".equals(target)) {
throw new IOException("Invalid symlink target");
}
final UserGroupInformation ugi = getRemoteUser();
boolean success = false;