HDFS-4434. Reverting the merge to branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1475972 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-04-25 21:28:51 +00:00
parent 874f58b23a
commit 14246f3d24
9 changed files with 91 additions and 771 deletions

View File

@ -4,8 +4,6 @@ Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
HDFS-4434. Provide a mapping from INodeId to INode. (suresh)
NEW FEATURES NEW FEATURES
HDFS-1804. Add a new block-volume device choosing policy that looks at HDFS-1804. Add a new block-volume device choosing policy that looks at

View File

@ -67,10 +67,7 @@ class BlocksMap {
void close() { void close() {
if (blocks != null) { blocks.clear();
blocks.clear();
blocks = null;
}
} }
BlockCollection getBlockCollection(Block b) { BlockCollection getBlockCollection(Block b) {

View File

@ -23,13 +23,11 @@ import java.io.Closeable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -43,7 +41,6 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -64,8 +61,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath; import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
import org.apache.hadoop.hdfs.util.ByteArray; import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.GSet;
import org.apache.hadoop.hdfs.util.LightWeightGSet;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -81,21 +76,11 @@ import com.google.common.base.Preconditions;
*************************************************/ *************************************************/
public class FSDirectory implements Closeable { public class FSDirectory implements Closeable {
private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) { private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
return new INodeDirectoryWithQuota(INodeId.ROOT_INODE_ID, return new INodeDirectoryWithQuota(namesystem.allocateNewInodeId(),
INodeDirectory.ROOT_NAME, INodeDirectory.ROOT_NAME,
namesystem.createFsOwnerPermissions(new FsPermission((short) 0755))); namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)));
} }
@VisibleForTesting
static boolean CHECK_RESERVED_FILE_NAMES = true;
public final static String DOT_RESERVED_STRING = ".reserved";
public final static String DOT_RESERVED_PATH_PREFIX = Path.SEPARATOR
+ DOT_RESERVED_STRING;
public final static byte[] DOT_RESERVED =
DFSUtil.string2Bytes(DOT_RESERVED_STRING);
public final static String DOT_INODES_STRING = ".inodes";
public final static byte[] DOT_INODES =
DFSUtil.string2Bytes(DOT_INODES_STRING);
INodeDirectoryWithQuota rootDir; INodeDirectoryWithQuota rootDir;
FSImage fsImage; FSImage fsImage;
private final FSNamesystem namesystem; private final FSNamesystem namesystem;
@ -103,7 +88,6 @@ public class FSDirectory implements Closeable {
private final int maxComponentLength; private final int maxComponentLength;
private final int maxDirItems; private final int maxDirItems;
private final int lsLimit; // max list limit private final int lsLimit; // max list limit
private GSet<INode, INode> inodeMap; // Synchronized by dirLock
// lock to protect the directory and BlockMap // lock to protect the directory and BlockMap
private ReentrantReadWriteLock dirLock; private ReentrantReadWriteLock dirLock;
@ -144,7 +128,6 @@ public class FSDirectory implements Closeable {
this.dirLock = new ReentrantReadWriteLock(true); // fair this.dirLock = new ReentrantReadWriteLock(true); // fair
this.cond = dirLock.writeLock().newCondition(); this.cond = dirLock.writeLock().newCondition();
rootDir = createRoot(ns); rootDir = createRoot(ns);
inodeMap = initInodeMap(rootDir);
this.fsImage = fsImage; this.fsImage = fsImage;
int configuredLimit = conf.getInt( int configuredLimit = conf.getInt(
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
@ -167,16 +150,6 @@ public class FSDirectory implements Closeable {
nameCache = new NameCache<ByteArray>(threshold); nameCache = new NameCache<ByteArray>(threshold);
namesystem = ns; namesystem = ns;
} }
@VisibleForTesting
static LightWeightGSet<INode, INode> initInodeMap(INodeDirectory rootDir) {
// Compute the map capacity by allocating 1% of total memory
int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
LightWeightGSet<INode, INode> map = new LightWeightGSet<INode, INode>(
capacity);
map.put(rootDir);
return map;
}
private FSNamesystem getFSNamesystem() { private FSNamesystem getFSNamesystem() {
return namesystem; return namesystem;
@ -280,8 +253,9 @@ public class FSDirectory implements Closeable {
if (!mkdirs(parent.toString(), permissions, true, modTime)) { if (!mkdirs(parent.toString(), permissions, true, modTime)) {
return null; return null;
} }
long id = namesystem.allocateNewInodeId();
INodeFileUnderConstruction newNode = new INodeFileUnderConstruction( INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
namesystem.allocateNewInodeId(), id,
permissions,replication, permissions,replication,
preferredBlockSize, modTime, clientName, preferredBlockSize, modTime, clientName,
clientMachine, clientNode); clientMachine, clientNode);
@ -1089,10 +1063,9 @@ public class FSDirectory implements Closeable {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: " NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+src+" is removed"); +src+" is removed");
} }
remvoedAllFromInodesFromMap(targetNode);
return filesRemoved; return filesRemoved;
} }
/** /**
* Replaces the specified inode with the specified one. * Replaces the specified inode with the specified one.
*/ */
@ -1117,13 +1090,11 @@ public class FSDirectory implements Closeable {
throw new IOException("FSDirectory.replaceNode: " + throw new IOException("FSDirectory.replaceNode: " +
"failed to remove " + path); "failed to remove " + path);
} }
removeFromInodeMap(oldnode);
// Parent should be non-null, otherwise oldnode.removeNode() will return // Parent should be non-null, otherwise oldnode.removeNode() will return
// false // false
newnode.setLocalName(oldnode.getLocalNameBytes()); newnode.setLocalName(oldnode.getLocalNameBytes());
parent.addChild(newnode, true); parent.addChild(newnode, true);
inodeMap.put(newnode);
/* Currently oldnode and newnode are assumed to contain the same /* Currently oldnode and newnode are assumed to contain the same
* blocks. Otherwise, blocks need to be removed from the blocksMap. * blocks. Otherwise, blocks need to be removed from the blocksMap.
@ -1484,9 +1455,9 @@ public class FSDirectory implements Closeable {
return true; return true;
} }
INode unprotectedMkdir(long inodeId, String src, INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
PermissionStatus permissions, long timestamp) long timestamp) throws QuotaExceededException,
throws QuotaExceededException, UnresolvedLinkException { UnresolvedLinkException {
assert hasWriteLock(); assert hasWriteLock();
byte[][] components = INode.getPathComponents(src); byte[][] components = INode.getPathComponents(src);
INodesInPath inodesInPath = rootDir.getExistingPathINodes(components, INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
@ -1513,22 +1484,13 @@ public class FSDirectory implements Closeable {
} }
} }
private INode getFromINodeMap(INode inode) {
readLock();
try {
return inodeMap.get(inode);
} finally {
readUnlock();
}
}
/** /**
* Add the given child to the namespace. * Add the given child to the namespace.
* @param src The full path name of the child node. * @param src The full path name of the child node.
* @throw QuotaExceededException is thrown if it violates quota limit * @throw QuotaExceededException is thrown if it violates quota limit
*/ */
private boolean addINode(String src, INode child) private boolean addINode(String src, INode child
throws QuotaExceededException, UnresolvedLinkException { ) throws QuotaExceededException, UnresolvedLinkException {
byte[][] components = INode.getPathComponents(src); byte[][] components = INode.getPathComponents(src);
byte[] path = components[components.length-1]; byte[] path = components[components.length-1];
child.setLocalName(path); child.setLocalName(path);
@ -1680,17 +1642,6 @@ public class FSDirectory implements Closeable {
private boolean addChild(INodesInPath inodesInPath, int pos, private boolean addChild(INodesInPath inodesInPath, int pos,
INode child, boolean checkQuota) throws QuotaExceededException { INode child, boolean checkQuota) throws QuotaExceededException {
final INode[] inodes = inodesInPath.getINodes(); final INode[] inodes = inodesInPath.getINodes();
// Disallow creation of /.reserved. This may be created when loading
// editlog/fsimage during upgrade since /.reserved was a valid name in older
// release. This may also be called when a user tries to create a file
// or directory /.reserved.
if (pos == 1 && inodes[0] == rootDir && isReservedName(child)) {
throw new HadoopIllegalArgumentException(
"File name \"" + child.getLocalName() + "\" is reserved and cannot "
+ "be created. If this is during upgrade change the name of the "
+ "existing file or directory to another name before upgrading "
+ "to the new release.");
}
// The filesystem limits are not really quotas, so this check may appear // The filesystem limits are not really quotas, so this check may appear
// odd. It's because a rename operation deletes the src, tries to add // odd. It's because a rename operation deletes the src, tries to add
// to the dest, if that fails, re-adds the src from whence it came. // to the dest, if that fails, re-adds the src from whence it came.
@ -1707,12 +1658,9 @@ public class FSDirectory implements Closeable {
if (inodes[pos-1] == null) { if (inodes[pos-1] == null) {
throw new NullPointerException("Panic: parent does not exist"); throw new NullPointerException("Panic: parent does not exist");
} }
final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true); final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
if (!added) { if (!added) {
updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true); updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true);
} else {
inodeMap.put(child);
} }
return added; return added;
} }
@ -1740,7 +1688,6 @@ public class FSDirectory implements Closeable {
removedNode.spaceConsumedInTree(counts); removedNode.spaceConsumedInTree(counts);
updateCountNoQuotaCheck(inodesInPath, pos, updateCountNoQuotaCheck(inodesInPath, pos,
-counts.getNsCount(), -counts.getDsCount()); -counts.getNsCount(), -counts.getDsCount());
removeFromInodeMap(removedNode);
} }
return removedNode; return removedNode;
} }
@ -1771,30 +1718,6 @@ public class FSDirectory implements Closeable {
} }
} }
/** This method is always called with writeLock held */
final void addToInodeMapUnprotected(INode inode) {
inodeMap.put(inode);
}
/* This method is always called with writeLock held */
private final void removeFromInodeMap(INode inode) {
inodeMap.remove(inode);
}
/** Remove all the inodes under given inode from the map */
private void remvoedAllFromInodesFromMap(INode inode) {
removeFromInodeMap(inode);
if (!inode.isDirectory()) {
return;
}
INodeDirectory dir = (INodeDirectory) inode;
for (INode child : dir.getChildrenList()) {
remvoedAllFromInodesFromMap(child);
}
dir.clearChildren();
}
/** Update the count of each directory with quota in the namespace /** Update the count of each directory with quota in the namespace
* A directory's count is defined as the total number inodes in the tree * A directory's count is defined as the total number inodes in the tree
* rooted at the directory. * rooted at the directory.
@ -1803,7 +1726,7 @@ public class FSDirectory implements Closeable {
* throw QuotaExceededException. * throw QuotaExceededException.
*/ */
void updateCountForINodeWithQuota() { void updateCountForINodeWithQuota() {
updateCountForINodeWithQuota(this, rootDir, new INode.DirCounts(), updateCountForINodeWithQuota(rootDir, new INode.DirCounts(),
new ArrayList<INode>(50)); new ArrayList<INode>(50));
} }
@ -1818,8 +1741,9 @@ public class FSDirectory implements Closeable {
* @param counters counters for name space and disk space * @param counters counters for name space and disk space
* @param nodesInPath INodes for the each of components in the path. * @param nodesInPath INodes for the each of components in the path.
*/ */
private static void updateCountForINodeWithQuota(FSDirectory fsd, private static void updateCountForINodeWithQuota(INodeDirectory dir,
INodeDirectory dir, INode.DirCounts counts, ArrayList<INode> nodesInPath) { INode.DirCounts counts,
ArrayList<INode> nodesInPath) {
long parentNamespace = counts.nsCount; long parentNamespace = counts.nsCount;
long parentDiskspace = counts.dsCount; long parentDiskspace = counts.dsCount;
@ -1831,9 +1755,8 @@ public class FSDirectory implements Closeable {
nodesInPath.add(dir); nodesInPath.add(dir);
for (INode child : dir.getChildrenList()) { for (INode child : dir.getChildrenList()) {
fsd.inodeMap.put(child);
if (child.isDirectory()) { if (child.isDirectory()) {
updateCountForINodeWithQuota(fsd, (INodeDirectory)child, updateCountForINodeWithQuota((INodeDirectory)child,
counts, nodesInPath); counts, nodesInPath);
} else if (child.isSymlink()) { } else if (child.isSymlink()) {
counts.nsCount += 1; counts.nsCount += 1;
@ -1972,7 +1895,7 @@ public class FSDirectory implements Closeable {
boolean status = false; boolean status = false;
writeLock(); writeLock();
try { try {
status = unprotectedSetTimes(inode, mtime, atime, force); status = unprotectedSetTimes(src, inode, mtime, atime, force);
} finally { } finally {
writeUnlock(); writeUnlock();
} }
@ -1985,10 +1908,10 @@ public class FSDirectory implements Closeable {
throws UnresolvedLinkException { throws UnresolvedLinkException {
assert hasWriteLock(); assert hasWriteLock();
INode inode = getINode(src); INode inode = getINode(src);
return unprotectedSetTimes(inode, mtime, atime, force); return unprotectedSetTimes(src, inode, mtime, atime, force);
} }
private boolean unprotectedSetTimes(INode inode, long mtime, private boolean unprotectedSetTimes(String src, INode inode, long mtime,
long atime, boolean force) { long atime, boolean force) {
assert hasWriteLock(); assert hasWriteLock();
boolean status = false; boolean status = false;
@ -2114,8 +2037,8 @@ public class FSDirectory implements Closeable {
*/ */
INodeSymlink addSymlink(String path, String target, INodeSymlink addSymlink(String path, String target,
PermissionStatus dirPerms, boolean createParent) PermissionStatus dirPerms, boolean createParent)
throws UnresolvedLinkException, throws UnresolvedLinkException, FileAlreadyExistsException,
FileAlreadyExistsException, QuotaExceededException { QuotaExceededException {
waitForReady(); waitForReady();
final long modTime = now(); final long modTime = now();
@ -2152,8 +2075,7 @@ public class FSDirectory implements Closeable {
*/ */
INodeSymlink unprotectedAddSymlink(long id, String path, String target, INodeSymlink unprotectedAddSymlink(long id, String path, String target,
long mtime, long atime, PermissionStatus perm) long mtime, long atime, PermissionStatus perm)
throws UnresolvedLinkException, throws UnresolvedLinkException, QuotaExceededException {
QuotaExceededException {
assert hasWriteLock(); assert hasWriteLock();
final INodeSymlink symlink = new INodeSymlink(id, target, mtime, atime, final INodeSymlink symlink = new INodeSymlink(id, target, mtime, atime,
perm); perm);
@ -2178,110 +2100,5 @@ public class FSDirectory implements Closeable {
void shutdown() { void shutdown() {
nameCache.reset(); nameCache.reset();
inodeMap.clear();
inodeMap = null;
}
@VisibleForTesting
INode getInode(long id) {
INode inode = new INode(id, new PermissionStatus("", "", new FsPermission(
(short) 0)), 0, 0) {
@Override
int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
return 0;
}
@Override
long[] computeContentSummary(long[] summary) {
return null;
}
@Override
DirCounts spaceConsumedInTree(DirCounts counts) {
return null;
}
};
return getFromINodeMap(inode);
}
/**
* Given an INode get all the path complents leading to it from the root.
* If an Inode corresponding to C is given in /A/B/C, the returned
* patch components will be {root, A, B, C}
*/
static byte[][] getPathComponents(INode inode) {
List<byte[]> components = new ArrayList<byte[]>();
components.add(0, inode.getLocalNameBytes());
while(inode.getParent() != null) {
components.add(0, inode.getParent().getLocalNameBytes());
inode = inode.getParent();
}
return components.toArray(new byte[components.size()][]);
}
/**
* @return path components for reserved path, else null.
*/
static byte[][] getPathComponentsForReservedPath(String src) {
return !isReservedName(src) ? null : INode.getPathComponents(src);
}
/**
* Resolve the path of /.reserved/.inodes/<inodeid>/... to a regular path
*
* @param src path that is being processed
* @param pathComponents path components corresponding to the path
* @param fsd FSDirectory
* @return if the path indicates an inode, return path after replacing upto
* <inodeid> with the corresponding path of the inode, else the path
* in {@code src} as is.
* @throws FileNotFoundException if inodeid is invalid
*/
static String resolvePath(String src, byte[][] pathComponents, FSDirectory fsd)
throws FileNotFoundException {
if (pathComponents == null || pathComponents.length <= 3) {
return src;
}
// Not /.reserved/.inodes
if (!Arrays.equals(DOT_RESERVED, pathComponents[1])
|| !Arrays.equals(DOT_INODES, pathComponents[2])) { // Not .inodes path
return src;
}
final String inodeId = DFSUtil.bytes2String(pathComponents[3]);
long id = 0;
try {
id = Long.valueOf(inodeId);
} catch (NumberFormatException e) {
throw new FileNotFoundException(
"File for given inode path does not exist: " + src);
}
if (id == INodeId.ROOT_INODE_ID && pathComponents.length == 4) {
return Path.SEPARATOR;
}
StringBuilder path = id == INodeId.ROOT_INODE_ID ? new StringBuilder()
: new StringBuilder(fsd.getInode(id).getFullPathName());
for (int i = 4; i < pathComponents.length; i++) {
path.append(Path.SEPARATOR).append(DFSUtil.bytes2String(pathComponents[i]));
}
if (NameNode.LOG.isDebugEnabled()) {
NameNode.LOG.debug("Resolved path is " + path);
}
return path.toString();
}
@VisibleForTesting
int getInodeMapSize() {
return inodeMap.size();
}
/** Check if a given inode name is reserved */
public static boolean isReservedName(INode inode) {
return CHECK_RESERVED_FILE_NAMES
&& Arrays.equals(inode.getLocalNameBytes(), DOT_RESERVED);
}
/** Check if a given path is reserved */
public static boolean isReservedName(String src) {
return src.startsWith(DOT_RESERVED_PATH_PREFIX);
} }
} }

View File

@ -33,7 +33,6 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -369,13 +368,6 @@ class FSImageFormat {
* modification time update and space count update are not needed. * modification time update and space count update are not needed.
*/ */
void addToParent(INodeDirectory parent, INode child) { void addToParent(INodeDirectory parent, INode child) {
FSDirectory fsDir = namesystem.dir;
if (parent == fsDir.rootDir && FSDirectory.isReservedName(child)) {
throw new HadoopIllegalArgumentException("File name \""
+ child.getLocalName() + "\" is reserved. Please "
+ " change the name of the existing file or directory to another "
+ "name before upgrading to this release.");
}
// NOTE: This does not update space counts for parents // NOTE: This does not update space counts for parents
if (!parent.addChild(child, false)) { if (!parent.addChild(child, false)) {
return; return;

View File

@ -887,7 +887,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
} }
@Override
public void checkOperation(OperationCategory op) throws StandbyException { public void checkOperation(OperationCategory op) throws StandbyException {
if (haContext != null) { if (haContext != null) {
// null in some unit tests // null in some unit tests
@ -1191,14 +1191,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException("Cannot set permission for " + src, safeMode); throw new SafeModeException("Cannot set permission for " + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
checkOwner(pc, src); checkOwner(pc, src);
dir.setPermission(src, permission); dir.setPermission(src, permission);
resultingStat = getAuditFileInfo(src, false); resultingStat = getAuditFileInfo(src, false);
@ -1230,14 +1228,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException("Cannot set owner for " + src, safeMode); throw new SafeModeException("Cannot set owner for " + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
checkOwner(pc, src); checkOwner(pc, src);
if (!pc.isSuperUser()) { if (!pc.isSuperUser()) {
if (username != null && !pc.getUser().equals(username)) { if (username != null && !pc.getUser().equals(username)) {
@ -1333,7 +1329,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws FileNotFoundException, throws FileNotFoundException,
UnresolvedLinkException, IOException { UnresolvedLinkException, IOException {
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
for (int attempt = 0; attempt < 2; attempt++) { for (int attempt = 0; attempt < 2; attempt++) {
boolean isReadOp = (attempt == 0); boolean isReadOp = (attempt == 0);
if (isReadOp) { // first attempt is with readlock if (isReadOp) { // first attempt is with readlock
@ -1343,7 +1338,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
writeLock(); // writelock is needed to set accesstime writeLock(); // writelock is needed to set accesstime
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
try { try {
if (isReadOp) { if (isReadOp) {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
@ -1389,8 +1383,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Moves all the blocks from srcs and appends them to trg * Moves all the blocks from srcs and appends them to trg
* To avoid rollbacks we will verify validitity of ALL of the args * To avoid rollbacks we will verify validitity of ALL of the args
* before we start actual move. * before we start actual move.
*
* This does not support ".inodes" relative path
* @param target * @param target
* @param srcs * @param srcs
* @throws IOException * @throws IOException
@ -1576,14 +1568,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException("Cannot set times " + src, safeMode); throw new SafeModeException("Cannot set times " + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
// Write access is required to set access and modification times // Write access is required to set access and modification times
if (isPermissionEnabled) { if (isPermissionEnabled) {
@ -1609,10 +1599,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
PermissionStatus dirPerms, boolean createParent) PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
if (!DFSUtil.isValidName(link)) { if (!DFSUtil.isValidName(link)) {
throw new InvalidPathException("Invalid link name: " + link); throw new InvalidPathException("Invalid file name: " + link);
}
if (FSDirectory.isReservedName(target)) {
throw new InvalidPathException("Invalid target name: " + target);
} }
try { try {
createSymlinkInt(target, link, dirPerms, createParent); createSymlinkInt(target, link, dirPerms, createParent);
@ -1632,14 +1619,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(link);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException("Cannot create symlink " + link, safeMode); throw new SafeModeException("Cannot create symlink " + link, safeMode);
} }
link = FSDirectory.resolvePath(link, pathComponents, dir);
if (!createParent) { if (!createParent) {
verifyParentDir(link); verifyParentDir(link);
} }
@ -1686,20 +1671,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
} }
private boolean setReplicationInt(String src, final short replication) private boolean setReplicationInt(final String src, final short replication)
throws IOException { throws IOException {
blockManager.verifyReplication(src, replication, null); blockManager.verifyReplication(src, replication, null);
final boolean isFile; final boolean isFile;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException("Cannot set replication for " + src, safeMode); throw new SafeModeException("Cannot set replication for " + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
if (isPermissionEnabled) { if (isPermissionEnabled) {
checkPathAccess(pc, src, FsAction.WRITE); checkPathAccess(pc, src, FsAction.WRITE);
} }
@ -1725,11 +1708,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(filename);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
filename = FSDirectory.resolvePath(filename, pathComponents, dir);
if (isPermissionEnabled) { if (isPermissionEnabled) {
checkTraverse(pc, filename); checkTraverse(pc, filename);
} }
@ -1802,14 +1783,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final HdfsFileStatus stat; final HdfsFileStatus stat;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot create file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
startFileInternal(pc, src, permissions, holder, clientMachine, flag, startFileInternal(pc, src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize); createParent, replication, blockSize);
stat = dir.getFileInfo(src, false); stat = dir.getFileInfo(src, false);
@ -1851,6 +1826,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
AccessControlException, UnresolvedLinkException, FileNotFoundException, AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException { ParentNotDirectoryException, IOException {
assert hasWriteLock(); assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot create file" + src, safeMode);
}
// Verify that the destination does not exist as a directory already. // Verify that the destination does not exist as a directory already.
boolean pathExists = dir.exists(src); boolean pathExists = dir.exists(src);
if (pathExists && dir.isDir(src)) { if (pathExists && dir.isDir(src)) {
@ -1992,7 +1971,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean skipSync = false; boolean skipSync = false;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
@ -2000,7 +1978,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException( throw new SafeModeException(
"Cannot recover the lease of " + src, safeMode); "Cannot recover the lease of " + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src); final INodeFile inode = INodeFile.valueOf(dir.getINode(src), src);
if (!inode.isUnderConstruction()) { if (!inode.isUnderConstruction()) {
return true; return true;
@ -2118,11 +2095,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws AccessControlException, SafeModeException, throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException, FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException { ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine);
}
boolean skipSync = false; boolean skipSync = false;
if (!supportAppends) { if (!supportAppends) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
@ -2141,14 +2113,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlock lb = null; LocatedBlock lb = null;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot append to file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
lb = startFileInternal(pc, src, null, holder, clientMachine, lb = startFileInternal(pc, src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND), EnumSet.of(CreateFlag.APPEND),
false, blockManager.maxReplication, 0); false, blockManager.maxReplication, 0);
@ -2212,11 +2178,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// Part I. Analyze the state of the file with respect to the input data. // Part I. Analyze the state of the file with respect to the input data.
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
src = FSDirectory.resolvePath(src, pathComponents, dir);
LocatedBlock[] onRetryBlock = new LocatedBlock[1]; LocatedBlock[] onRetryBlock = new LocatedBlock[1];
final INode[] inodes = analyzeFileState( final INode[] inodes = analyzeFileState(
src, fileId, clientName, previous, onRetryBlock).getINodes(); src, fileId, clientName, previous, onRetryBlock).getINodes();
@ -2389,7 +2353,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
/** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */ /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk, LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
final DatanodeInfo[] existings, final HashMap<Node, Node> excludes, final DatanodeInfo[] existings, final HashMap<Node, Node> excludes,
final int numAdditionalNodes, final String clientName final int numAdditionalNodes, final String clientName
) throws IOException { ) throws IOException {
@ -2400,7 +2364,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final long preferredblocksize; final long preferredblocksize;
final List<DatanodeDescriptor> chosen; final List<DatanodeDescriptor> chosen;
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
@ -2409,7 +2372,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot add datanode; src=" + src throw new SafeModeException("Cannot add datanode; src=" + src
+ ", blk=" + blk, safeMode); + ", blk=" + blk, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
//check lease //check lease
final INodeFileUnderConstruction file = checkLease(src, clientName); final INodeFileUnderConstruction file = checkLease(src, clientName);
@ -2449,7 +2411,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ "of file " + src); + "of file " + src);
} }
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
@ -2457,8 +2418,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot abandon block " + b + throw new SafeModeException("Cannot abandon block " + b +
" for fle" + src, safeMode); " for fle" + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
// //
// Remove the block from the pending creates list // Remove the block from the pending creates list
// //
@ -2530,16 +2489,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkBlock(last); checkBlock(last);
boolean success = false; boolean success = false;
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); success = completeFileInternal(src, holder,
if (isInSafeMode()) { ExtendedBlock.getLocalBlock(last));
throw new SafeModeException("Cannot complete file " + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
success = completeFileInternal(src, holder,
ExtendedBlock.getLocalBlock(last));
} finally { } finally {
writeUnlock(); writeUnlock();
} }
@ -2553,6 +2506,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
String holder, Block last) throws SafeModeException, String holder, Block last) throws SafeModeException,
UnresolvedLinkException, IOException { UnresolvedLinkException, IOException {
assert hasWriteLock(); assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot complete file " + src, safeMode);
}
INodeFileUnderConstruction pendingFile; INodeFileUnderConstruction pendingFile;
try { try {
pendingFile = checkLease(src, holder); pendingFile = checkLease(src, holder);
@ -2693,19 +2651,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
boolean status = false; boolean status = false;
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
}
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
checkOperation(OperationCategory.WRITE);
status = renameToInternal(pc, src, dst); status = renameToInternal(pc, src, dst);
if (status) { if (status) {
resultingStat = getAuditFileInfo(dst, false); resultingStat = getAuditFileInfo(dst, false);
@ -2725,6 +2674,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private boolean renameToInternal(FSPermissionChecker pc, String src, String dst) private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
assert hasWriteLock(); assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
}
if (isPermissionEnabled) { if (isPermissionEnabled) {
//We should not be doing this. This is move() not renameTo(). //We should not be doing this. This is move() not renameTo().
//but for now, //but for now,
@ -2755,17 +2708,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
}
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
renameToInternal(pc, src, dst, options); renameToInternal(pc, src, dst, options);
resultingStat = getAuditFileInfo(dst, false); resultingStat = getAuditFileInfo(dst, false);
} finally { } finally {
@ -2784,6 +2729,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private void renameToInternal(FSPermissionChecker pc, String src, String dst, private void renameToInternal(FSPermissionChecker pc, String src, String dst,
Options.Rename... options) throws IOException { Options.Rename... options) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot rename " + src, safeMode);
}
if (isPermissionEnabled) { if (isPermissionEnabled) {
checkParentAccess(pc, src, FsAction.WRITE); checkParentAccess(pc, src, FsAction.WRITE);
checkAncestorAccess(pc, dst, FsAction.WRITE); checkAncestorAccess(pc, dst, FsAction.WRITE);
@ -2844,14 +2793,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException("Cannot delete " + src, safeMode); throw new SafeModeException("Cannot delete " + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
if (!recursive && dir.isNonEmptyDirectory(src)) { if (!recursive && dir.isNonEmptyDirectory(src)) {
throw new IOException(src + " is non empty"); throw new IOException(src + " is non empty");
} }
@ -2978,14 +2925,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus stat = null; HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException("Invalid file name: " + src);
}
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
src = FSDirectory.resolvePath(src, pathComponents, dir);
if (isPermissionEnabled) { if (isPermissionEnabled) {
checkTraverse(pc, src); checkTraverse(pc, src);
} }
@ -3050,16 +2992,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;
boolean status = false; boolean status = false;
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot create directory " + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
status = mkdirsInternal(pc, src, permissions, createParent); status = mkdirsInternal(pc, src, permissions, createParent);
if (status) { if (status) {
resultingStat = dir.getFileInfo(src, false); resultingStat = dir.getFileInfo(src, false);
@ -3081,6 +3017,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
PermissionStatus permissions, boolean createParent) PermissionStatus permissions, boolean createParent)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
assert hasWriteLock(); assert hasWriteLock();
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot create directory " + src, safeMode);
}
if (isPermissionEnabled) { if (isPermissionEnabled) {
checkTraverse(pc, src); checkTraverse(pc, src);
} }
@ -3111,11 +3051,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FileNotFoundException, UnresolvedLinkException, StandbyException { FileNotFoundException, UnresolvedLinkException, StandbyException {
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
src = FSDirectory.resolvePath(src, pathComponents, dir);
if (isPermissionEnabled) { if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE); checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
} }
@ -3129,8 +3067,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Set the namespace quota and diskspace quota for a directory. * Set the namespace quota and diskspace quota for a directory.
* See {@link ClientProtocol#setQuota(String, long, long)} for the * See {@link ClientProtocol#setQuota(String, long, long)} for the
* contract. * contract.
*
* Note: This does not support ".inodes" relative path.
*/ */
void setQuota(String path, long nsQuota, long dsQuota) void setQuota(String path, long nsQuota, long dsQuota)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
@ -3160,14 +3096,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName); NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException("Cannot fsync file " + src, safeMode); throw new SafeModeException("Cannot fsync file " + src, safeMode);
} }
src = FSDirectory.resolvePath(src, pathComponents, dir);
INodeFileUnderConstruction pendingFile = checkLease(src, clientName); INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
if (lastBlockLength > 0) { if (lastBlockLength > 0) {
pendingFile.updateLengthOfLastBlock(lastBlockLength); pendingFile.updateLengthOfLastBlock(lastBlockLength);
@ -3512,11 +3446,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DirectoryListing dl; DirectoryListing dl;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
src = FSDirectory.resolvePath(src, pathComponents, dir);
if (isPermissionEnabled) { if (isPermissionEnabled) {
if (dir.isDir(src)) { if (dir.isDir(src)) {

View File

@ -17,9 +17,11 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -31,7 +33,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.util.LightWeightGSet.LinkedElement;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -43,7 +44,7 @@ import com.google.common.primitives.SignedBytes;
* directory inodes. * directory inodes.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
abstract class INode implements Comparable<byte[]>, LinkedElement { abstract class INode implements Comparable<byte[]> {
static final List<INode> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<INode>()); static final List<INode> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<INode>());
/** Wrapper of two counters for namespace consumed and diskspace consumed. */ /** Wrapper of two counters for namespace consumed and diskspace consumed. */
@ -124,7 +125,6 @@ abstract class INode implements Comparable<byte[]>, LinkedElement {
protected INodeDirectory parent = null; protected INodeDirectory parent = null;
protected long modificationTime = 0L; protected long modificationTime = 0L;
protected long accessTime = 0L; protected long accessTime = 0L;
protected LinkedElement next = null;
private INode(long id, byte[] name, long permission, INodeDirectory parent, private INode(long id, byte[] name, long permission, INodeDirectory parent,
long modificationTime, long accessTime) { long modificationTime, long accessTime) {
@ -464,12 +464,12 @@ abstract class INode implements Comparable<byte[]>, LinkedElement {
if (that == null || !(that instanceof INode)) { if (that == null || !(that instanceof INode)) {
return false; return false;
} }
return id == ((INode) that).id; return Arrays.equals(this.name, ((INode)that).name);
} }
@Override @Override
public final int hashCode() { public final int hashCode() {
return (int)(id^(id>>>32)); return Arrays.hashCode(this.name);
} }
/** /**
@ -581,14 +581,4 @@ abstract class INode implements Comparable<byte[]>, LinkedElement {
toDeleteList.clear(); toDeleteList.clear();
} }
} }
@Override
public void setNext(LinkedElement next) {
this.next = next;
}
@Override
public LinkedElement getNext() {
return next;
}
} }

View File

@ -401,6 +401,7 @@ class INodeDirectory extends INode {
total += child.collectSubtreeBlocksAndClear(info); total += child.collectSubtreeBlocksAndClear(info);
} }
parent = null; parent = null;
children = null;
return total; return total;
} }
@ -473,11 +474,4 @@ class INodeDirectory extends INode {
} }
prefix.setLength(prefix.length() - 2); prefix.setLength(prefix.length() - 2);
} }
void clearChildren() {
if (children != null) {
this.children.clear();
this.children = null;
}
}
} }

View File

@ -31,11 +31,9 @@ import org.apache.hadoop.util.SequentialNumber;
@InterfaceAudience.Private @InterfaceAudience.Private
public class INodeId extends SequentialNumber { public class INodeId extends SequentialNumber {
/** /**
* The last reserved inode id. InodeIDs are allocated from LAST_RESERVED_ID + * The last reserved inode id.
* 1.
*/ */
public static final long LAST_RESERVED_ID = 2 << 14 - 1; public static final long LAST_RESERVED_ID = 1000L;
public static final long ROOT_INODE_ID = LAST_RESERVED_ID + 1;
/** /**
* The inode id validation of lease check will be skipped when the request * The inode id validation of lease check will be skipped when the request
@ -57,6 +55,6 @@ public class INodeId extends SequentialNumber {
} }
INodeId() { INodeId() {
super(ROOT_INODE_ID); super(LAST_RESERVED_ID);
} }
} }

View File

@ -20,45 +20,31 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotDirectoryException; import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.EnumSetWritable;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
public class TestINodeFile { public class TestINodeFile {
public static final Log LOG = LogFactory.getLog(TestINodeFile.class);
static final short BLOCKBITS = 48; static final short BLOCKBITS = 48;
static final long BLKSIZE_MAXVALUE = ~(0xffffL << BLOCKBITS); static final long BLKSIZE_MAXVALUE = ~(0xffffL << BLOCKBITS);
@ -336,7 +322,6 @@ public class TestINodeFile {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
fail(); fail();
} catch(PathIsNotDirectoryException e) { } catch(PathIsNotDirectoryException e) {
// Expected
} }
} }
@ -358,8 +343,7 @@ public class TestINodeFile {
try { try {
INodeDirectory.valueOf(from, path); INodeDirectory.valueOf(from, path);
fail(); fail();
} catch(PathIsNotDirectoryException expected) { } catch(PathIsNotDirectoryException e) {
// expected
} }
} }
@ -390,10 +374,13 @@ public class TestINodeFile {
} }
/** /**
* This test verifies inode ID counter and inode map functionality. * Verify root always has inode id 1001 and new formated fsimage has last
* allocated inode id 1000. Validate correct lastInodeId is persisted.
* @throws IOException
*/ */
@Test @Test
public void testInodeId() throws IOException { public void testInodeId() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT); DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
@ -403,83 +390,55 @@ public class TestINodeFile {
cluster.waitActive(); cluster.waitActive();
FSNamesystem fsn = cluster.getNamesystem(); FSNamesystem fsn = cluster.getNamesystem();
long lastId = fsn.getLastInodeId(); assertTrue(fsn.getLastInodeId() == 1001);
// Ensure root has the correct inode ID // Create one directory and the last inode id should increase to 1002
// Last inode ID should be root inode ID and inode map size should be 1
int inodeCount = 1;
long expectedLastInodeId = INodeId.ROOT_INODE_ID;
assertEquals(fsn.dir.rootDir.getId(), INodeId.ROOT_INODE_ID);
assertEquals(expectedLastInodeId, lastId);
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Create a directory
// Last inode ID and inode map size should increase by 1
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
Path path = new Path("/test1"); Path path = new Path("/test1");
assertTrue(fs.mkdirs(path)); assertTrue(fs.mkdirs(path));
assertEquals(++expectedLastInodeId, fsn.getLastInodeId()); assertTrue(fsn.getLastInodeId() == 1002);
assertEquals(++inodeCount, fsn.dir.getInodeMapSize());
// Create a file int fileLen = 1024;
// Last inode ID and inode map size should increase by 1 Path filePath = new Path("/test1/file");
NamenodeProtocols nnrpc = cluster.getNameNodeRpc(); DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, 0);
DFSTestUtil.createFile(fs, new Path("/test1/file"), 1024, (short) 1, 0); assertTrue(fsn.getLastInodeId() == 1003);
assertEquals(++expectedLastInodeId, fsn.getLastInodeId());
assertEquals(++inodeCount, fsn.dir.getInodeMapSize());
// Ensure right inode ID is returned in file status
HdfsFileStatus fileStatus = nnrpc.getFileInfo("/test1/file");
assertEquals(expectedLastInodeId, fileStatus.getFileId());
// Rename a directory // Rename doesn't increase inode id
// Last inode ID and inode map size should not change
Path renamedPath = new Path("/test2"); Path renamedPath = new Path("/test2");
assertTrue(fs.rename(path, renamedPath)); fs.rename(path, renamedPath);
assertEquals(expectedLastInodeId, fsn.getLastInodeId()); assertTrue(fsn.getLastInodeId() == 1003);
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Delete test2/file and test2 and ensure inode map size decreases
assertTrue(fs.delete(renamedPath, true));
inodeCount -= 2;
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
cluster.restartNameNode();
cluster.waitActive();
// Make sure empty editlog can be handled // Make sure empty editlog can be handled
cluster.restartNameNode(); cluster.restartNameNode();
cluster.waitActive(); cluster.waitActive();
fsn = cluster.getNamesystem(); fsn = cluster.getNamesystem();
assertEquals(expectedLastInodeId, fsn.getLastInodeId()); assertTrue(fsn.getLastInodeId() == 1003);
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Create two inodes test2 and test2/file2 DFSTestUtil.createFile(fs, new Path("/test2/file2"), fileLen, (short) 1,
DFSTestUtil.createFile(fs, new Path("/test2/file2"), 1024, (short) 1, 0); 0);
expectedLastInodeId += 2; long id = fsn.getLastInodeId();
inodeCount += 2; assertTrue(id == 1004);
assertEquals(expectedLastInodeId, fsn.getLastInodeId()); fs.delete(new Path("/test2"), true);
assertEquals(inodeCount, fsn.dir.getInodeMapSize()); // create a file under construction
// create /test3, and /test3/file.
// /test3/file is a file under construction
FSDataOutputStream outStream = fs.create(new Path("/test3/file")); FSDataOutputStream outStream = fs.create(new Path("/test3/file"));
assertTrue(outStream != null); assertTrue(outStream != null);
expectedLastInodeId += 2; assertTrue(fsn.getLastInodeId() == 1006);
inodeCount += 2;
assertEquals(expectedLastInodeId, fsn.getLastInodeId());
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
// Apply editlogs to fsimage, ensure inodeUnderConstruction is handled // Apply editlogs to fsimage, test fsimage with inodeUnderConstruction can
// be handled
fsn.enterSafeMode(false); fsn.enterSafeMode(false);
fsn.saveNamespace(); fsn.saveNamespace();
fsn.leaveSafeMode(); fsn.leaveSafeMode();
outStream.close(); outStream.close();
// The lastInodeId in fsimage should remain the same after reboot // The lastInodeId in fsimage should remain 1006 after reboot
cluster.restartNameNode(); cluster.restartNameNode();
cluster.waitActive(); cluster.waitActive();
fsn = cluster.getNamesystem(); fsn = cluster.getNamesystem();
assertEquals(expectedLastInodeId, fsn.getLastInodeId()); assertTrue(fsn.getLastInodeId() == 1006);
assertEquals(inodeCount, fsn.dir.getInodeMapSize());
} finally { } finally {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
@ -489,6 +448,7 @@ public class TestINodeFile {
@Test @Test
public void testWriteToRenamedFile() throws IOException { public void testWriteToRenamedFile() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build(); .build();
@ -519,368 +479,10 @@ public class TestINodeFile {
fail("Write should fail after rename"); fail("Write should fail after rename");
} catch (Exception e) { } catch (Exception e) {
/* Ignore */ /* Ignore */
} finally {
cluster.shutdown();
}
}
private Path getInodePath(long inodeId, String remainingPath) {
StringBuilder b = new StringBuilder();
b.append(Path.SEPARATOR).append(FSDirectory.DOT_RESERVED_STRING)
.append(Path.SEPARATOR).append(FSDirectory.DOT_INODES_STRING)
.append(Path.SEPARATOR).append(inodeId).append(Path.SEPARATOR)
.append(remainingPath);
Path p = new Path(b.toString());
LOG.info("Inode path is " + p);
return p;
}
/**
* Tests for addressing files using /.reserved/.inodes/<inodeID> in file system
* operations.
*/
@Test
public void testInodeIdBasedPaths() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
NamenodeProtocols nnRpc = cluster.getNameNodeRpc();
// FileSystem#mkdirs "/testInodeIdBasedPaths"
Path baseDir = getInodePath(INodeId.ROOT_INODE_ID, "testInodeIdBasedPaths");
Path baseDirRegPath = new Path("/testInodeIdBasedPaths");
fs.mkdirs(baseDir);
fs.exists(baseDir);
long baseDirFileId = nnRpc.getFileInfo(baseDir.toString()).getFileId();
// FileSystem#create file and FileSystem#close
Path testFileInodePath = getInodePath(baseDirFileId, "test1");
Path testFileRegularPath = new Path(baseDir, "test1");
final int testFileBlockSize = 1024;
FileSystemTestHelper.createFile(fs, testFileInodePath, 1, testFileBlockSize);
assertTrue(fs.exists(testFileInodePath));
// FileSystem#setPermission
FsPermission perm = new FsPermission((short)0666);
fs.setPermission(testFileInodePath, perm);
// FileSystem#getFileStatus and FileSystem#getPermission
FileStatus fileStatus = fs.getFileStatus(testFileInodePath);
assertEquals(perm, fileStatus.getPermission());
// FileSystem#setOwner
fs.setOwner(testFileInodePath, fileStatus.getOwner(), fileStatus.getGroup());
// FileSystem#setTimes
fs.setTimes(testFileInodePath, 0, 0);
fileStatus = fs.getFileStatus(testFileInodePath);
assertEquals(0, fileStatus.getModificationTime());
assertEquals(0, fileStatus.getAccessTime());
// FileSystem#setReplication
fs.setReplication(testFileInodePath, (short)3);
fileStatus = fs.getFileStatus(testFileInodePath);
assertEquals(3, fileStatus.getReplication());
fs.setReplication(testFileInodePath, (short)1);
// ClientProtocol#getPreferredBlockSize
assertEquals(testFileBlockSize,
nnRpc.getPreferredBlockSize(testFileInodePath.toString()));
// symbolic link related tests
// Reserved path is not allowed as a target
String invalidTarget = new Path(baseDir, "invalidTarget").toString();
String link = new Path(baseDir, "link").toString();
testInvalidSymlinkTarget(nnRpc, invalidTarget, link);
// Test creating a link using reserved inode path
String validTarget = "/validtarget";
testValidSymlinkTarget(nnRpc, validTarget, link);
// FileSystem#append
fs.append(testFileInodePath);
// DistributedFileSystem#recoverLease
fs.recoverLease(testFileInodePath);
// Namenode#getBlockLocations
LocatedBlocks l1 = nnRpc.getBlockLocations(testFileInodePath.toString(),
0, Long.MAX_VALUE);
LocatedBlocks l2 = nnRpc.getBlockLocations(testFileRegularPath.toString(),
0, Long.MAX_VALUE);
checkEquals(l1, l2);
// FileSystem#rename - both the variants
Path renameDst = getInodePath(baseDirFileId, "test2");
fileStatus = fs.getFileStatus(testFileInodePath);
// Rename variant 1: rename and rename bacck
fs.rename(testFileInodePath, renameDst);
fs.rename(renameDst, testFileInodePath);
assertEquals(fileStatus, fs.getFileStatus(testFileInodePath));
// Rename variant 2: rename and rename bacck
fs.rename(testFileInodePath, renameDst, Rename.OVERWRITE);
fs.rename(renameDst, testFileInodePath, Rename.OVERWRITE);
assertEquals(fileStatus, fs.getFileStatus(testFileInodePath));
// FileSystem#getContentSummary
assertEquals(fs.getContentSummary(testFileRegularPath).toString(),
fs.getContentSummary(testFileInodePath).toString());
// FileSystem#listFiles
checkEquals(fs.listFiles(baseDirRegPath, false),
fs.listFiles(baseDir, false));
// FileSystem#delete
fs.delete(testFileInodePath, true);
assertFalse(fs.exists(testFileInodePath));
} finally { } finally {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
} }
} }
private void testInvalidSymlinkTarget(NamenodeProtocols nnRpc,
String invalidTarget, String link) throws IOException {
try {
FsPermission perm = FsPermission.createImmutable((short)0755);
nnRpc.createSymlink(invalidTarget, link, perm, false);
fail("Symbolic link creation of target " + invalidTarget + " should fail");
} catch (InvalidPathException expected) {
// Expected
}
}
private void testValidSymlinkTarget(NamenodeProtocols nnRpc, String target,
String link) throws IOException {
FsPermission perm = FsPermission.createImmutable((short)0755);
nnRpc.createSymlink(target, link, perm, false);
assertEquals(target, nnRpc.getLinkTarget(link));
}
private static void checkEquals(LocatedBlocks l1, LocatedBlocks l2) {
List<LocatedBlock> list1 = l1.getLocatedBlocks();
List<LocatedBlock> list2 = l2.getLocatedBlocks();
assertEquals(list1.size(), list2.size());
for (int i = 0; i < list1.size(); i++) {
LocatedBlock b1 = list1.get(i);
LocatedBlock b2 = list2.get(i);
assertEquals(b1.getBlock(), b2.getBlock());
assertEquals(b1.getBlockSize(), b2.getBlockSize());
}
}
private static void checkEquals(RemoteIterator<LocatedFileStatus> i1,
RemoteIterator<LocatedFileStatus> i2) throws IOException {
while (i1.hasNext()) {
assertTrue(i2.hasNext());
// Compare all the fields but the path name, which is relative
// to the original path from listFiles.
LocatedFileStatus l1 = i1.next();
LocatedFileStatus l2 = i2.next();
assertEquals(l1.getAccessTime(), l2.getAccessTime());
assertEquals(l1.getBlockSize(), l2.getBlockSize());
assertEquals(l1.getGroup(), l2.getGroup());
assertEquals(l1.getLen(), l2.getLen());
assertEquals(l1.getModificationTime(), l2.getModificationTime());
assertEquals(l1.getOwner(), l2.getOwner());
assertEquals(l1.getPermission(), l2.getPermission());
assertEquals(l1.getReplication(), l2.getReplication());
}
assertFalse(i2.hasNext());
}
/**
* Check /.reserved path is reserved and cannot be created.
*/
@Test
public void testReservedFileNames() throws IOException {
Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
try {
// First start a cluster with reserved file names check turned off
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// Creation of directory or file with reserved path names is disallowed
ensureReservedFileNamesCannotBeCreated(fs, "/.reserved", false);
ensureReservedFileNamesCannotBeCreated(fs, "/.reserved", false);
Path reservedPath = new Path("/.reserved");
// Loading of fsimage or editlog with /.reserved directory should fail
// Mkdir "/.reserved reserved path with reserved path check turned off
FSDirectory.CHECK_RESERVED_FILE_NAMES = false;
fs.mkdirs(reservedPath);
assertTrue(fs.isDirectory(reservedPath));
ensureReservedFileNamesCannotBeLoaded(cluster);
// Loading of fsimage or editlog with /.reserved file should fail
// Create file "/.reserved reserved path with reserved path check turned off
FSDirectory.CHECK_RESERVED_FILE_NAMES = false;
ensureClusterRestartSucceeds(cluster);
fs.delete(reservedPath, true);
DFSTestUtil.createFile(fs, reservedPath, 10, (short)1, 0L);
assertTrue(!fs.isDirectory(reservedPath));
ensureReservedFileNamesCannotBeLoaded(cluster);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private void ensureReservedFileNamesCannotBeCreated(FileSystem fs, String name,
boolean isDir) {
// Creation of directory or file with reserved path names is disallowed
Path reservedPath = new Path(name);
try {
if (isDir) {
fs.mkdirs(reservedPath);
} else {
DFSTestUtil.createFile(fs, reservedPath, 10, (short) 1, 0L);
}
fail((isDir ? "mkdir" : "create file") + " should be disallowed");
} catch (Exception expected) {
// ignored
}
}
private void ensureReservedFileNamesCannotBeLoaded(MiniDFSCluster cluster)
throws IOException {
// Turn on reserved file name checking. Loading of edits should fail
FSDirectory.CHECK_RESERVED_FILE_NAMES = true;
ensureClusterRestartFails(cluster);
// Turn off reserved file name checking and successfully load edits
FSDirectory.CHECK_RESERVED_FILE_NAMES = false;
ensureClusterRestartSucceeds(cluster);
// Turn on reserved file name checking. Loading of fsimage should fail
FSDirectory.CHECK_RESERVED_FILE_NAMES = true;
ensureClusterRestartFails(cluster);
}
private void ensureClusterRestartFails(MiniDFSCluster cluster) {
try {
cluster.restartNameNode();
fail("Cluster should not have successfully started");
} catch (Exception expected) {
LOG.info("Expected exception thrown " + expected);
}
assertFalse(cluster.isClusterUp());
}
private void ensureClusterRestartSucceeds(MiniDFSCluster cluster)
throws IOException {
cluster.restartNameNode();
cluster.waitActive();
assertTrue(cluster.isClusterUp());
}
/**
* For a given path, build a tree of INodes and return the leaf node.
*/
private INode createTreeOfInodes(String path) {
byte[][] components = INode.getPathComponents(path);
FsPermission perm = FsPermission.createImmutable((short)0755);
PermissionStatus permstatus = PermissionStatus.createImmutable("", "", perm);
long id = 0;
INodeDirectory prev = new INodeDirectory(++id, "", permstatus);
INodeDirectory dir = null;
for (byte[] component : components) {
if (component.length == 0) {
continue;
}
System.out.println("Adding component " + DFSUtil.bytes2String(component));
dir = new INodeDirectory(++id, component, permstatus, 0);
prev.addChild(dir, false);
prev = dir;
}
return dir; // Last Inode in the chain
}
private static void checkEquals(byte[][] expected, byte[][] actual) {
assertEquals(expected.length, actual.length);
int i = 0;
for (byte[] e : expected) {
assertTrue(Arrays.equals(e, actual[i++]));
}
}
/**
* Test for {@link FSDirectory#getPathComponents(INode)}
*/
@Test
public void testGetPathFromInode() {
String path = "/a/b/c";
INode inode = createTreeOfInodes(path);
byte[][] expected = INode.getPathComponents(path);
byte[][] actual = FSDirectory.getPathComponents(inode);
checkEquals(expected, actual);
}
/**
* Tests for {@link FSDirectory#resolvePath(String, byte[][], FSDirectory)}
*/
@Test
public void testInodePath() throws FileNotFoundException {
// For a non .inodes path the regular components are returned
String path = "/a/b/c";
INode inode = createTreeOfInodes(path);
// For an any inode look up return inode corresponding to "c" from /a/b/c
FSDirectory fsd = Mockito.mock(FSDirectory.class);
Mockito.doReturn(inode).when(fsd).getInode(Mockito.anyLong());
// Null components
assertEquals("/test", FSDirectory.resolvePath("/test", null, fsd));
// Tests for FSDirectory#resolvePath()
// Non inode regular path
byte[][] components = INode.getPathComponents(path);
String resolvedPath = FSDirectory.resolvePath(path, components, fsd);
assertEquals(path, resolvedPath);
// Inode path with no trailing separator
components = INode.getPathComponents("/.reserved/.inodes/1");
resolvedPath = FSDirectory.resolvePath(path, components, fsd);
assertEquals(path, resolvedPath);
// Inode path with trailing separator
components = INode.getPathComponents("/.reserved/.inodes/1/");
assertEquals(path, resolvedPath);
// Inode relative path
components = INode.getPathComponents("/.reserved/.inodes/1/d/e/f");
resolvedPath = FSDirectory.resolvePath(path, components, fsd);
assertEquals("/a/b/c/d/e/f", resolvedPath);
// A path with just .inodes returns the path as is
String testPath = "/.reserved/.inodes";
components = INode.getPathComponents(testPath);
resolvedPath = FSDirectory.resolvePath(testPath, components, fsd);
assertEquals(testPath, resolvedPath);
// Root inode path
testPath = "/.reserved/.inodes/" + INodeId.ROOT_INODE_ID;
components = INode.getPathComponents(testPath);
resolvedPath = FSDirectory.resolvePath(testPath, components, fsd);
assertEquals("/", resolvedPath);
// An invalid inode path should remain unresolved
testPath = "/.invalid/.inodes/1";
components = INode.getPathComponents(testPath);
resolvedPath = FSDirectory.resolvePath(testPath, components, fsd);
assertEquals(testPath, resolvedPath);
}
} }