HDFS-5104 Support dotdot name in NFS LOOKUP operation. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1515042 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-08-17 21:16:50 +00:00
parent 1ad3fe4633
commit 214d4377fc
4 changed files with 87 additions and 29 deletions

View File

@ -291,6 +291,8 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5076 Add MXBean methods to query NN's transaction information and
JournalNode's journal status. (jing9)
HDFS-5104 Support dotdot name in NFS LOOKUP operation (brandonli)
IMPROVEMENTS
HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@ -204,13 +205,20 @@ public class DFSUtil {
String[] components = StringUtils.split(src, '/');
for (int i = 0; i < components.length; i++) {
String element = components[i];
if (element.equals("..") ||
element.equals(".") ||
if (element.equals(".") ||
(element.indexOf(":") >= 0) ||
(element.indexOf("/") >= 0)) {
return false;
}
// ".." is allowed in path starting with /.reserved/.inodes
if (element.equals("..")) {
if (components.length > 4
&& components[1].equals(FSDirectory.DOT_RESERVED_STRING)
&& components[2].equals(FSDirectory.DOT_INODES_STRING)) {
continue;
}
return false;
}
// The string may start or end with a /, but not have
// "//" in the middle.
if (element.isEmpty() && i != components.length - 1 &&

View File

@ -2730,6 +2730,19 @@ public class FSDirectory implements Closeable {
throw new FileNotFoundException(
"File for given inode path does not exist: " + src);
}
// Handle single ".." for NFS lookup support.
if ((pathComponents.length > 4)
&& DFSUtil.bytes2String(pathComponents[4]).equals("..")) {
INode parent = inode.getParent();
if (parent == null || parent.getId() == INodeId.ROOT_INODE_ID) {
// inode is root, or its parent is root.
return Path.SEPARATOR;
} else {
return parent.getFullPathName();
}
}
StringBuilder path = id == INodeId.ROOT_INODE_ID ? new StringBuilder()
: new StringBuilder(inode.getFullPathName());
for (int i = 4; i < pathComponents.length; i++) {

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
@ -901,31 +902,65 @@ public class TestINodeFile {
@Test
public void testInodeReplacement() throws Exception {
final Configuration conf = new Configuration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
numDataNodes(1).build();
cluster.waitActive();
final DistributedFileSystem hdfs = cluster.getFileSystem();
final FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
final Path dir = new Path("/dir");
hdfs.mkdirs(dir);
INode dirNode = fsdir.getINode(dir.toString());
INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
// set quota to dir, which leads to node replacement
hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
dirNode = fsdir.getINode(dir.toString());
assertTrue(dirNode instanceof INodeDirectoryWithQuota);
// the inode in inodeMap should also be replaced
dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
hdfs.setQuota(dir, -1, -1);
dirNode = fsdir.getINode(dir.toString());
assertTrue(dirNode instanceof INodeDirectory);
// the inode in inodeMap should also be replaced
dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
final DistributedFileSystem hdfs = cluster.getFileSystem();
final FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
final Path dir = new Path("/dir");
hdfs.mkdirs(dir);
INode dirNode = fsdir.getINode(dir.toString());
INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
// set quota to dir, which leads to node replacement
hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
dirNode = fsdir.getINode(dir.toString());
assertTrue(dirNode instanceof INodeDirectoryWithQuota);
// the inode in inodeMap should also be replaced
dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
hdfs.setQuota(dir, -1, -1);
dirNode = fsdir.getINode(dir.toString());
assertTrue(dirNode instanceof INodeDirectory);
// the inode in inodeMap should also be replaced
dirNodeFromNode = fsdir.getInode(dirNode.getId());
assertSame(dirNode, dirNodeFromNode);
} finally {
cluster.shutdown();
}
}
@Test
public void testDotdotInodePath() throws Exception {
final Configuration conf = new Configuration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
final DistributedFileSystem hdfs = cluster.getFileSystem();
final FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
final Path dir = new Path("/dir");
hdfs.mkdirs(dir);
long dirId = fsdir.getINode(dir.toString()).getId();
long parentId = fsdir.getINode("/").getId();
String testPath = "/.reserved/.inodes/" + dirId + "/..";
DFSClient client = new DFSClient(NameNode.getAddress(conf), conf);
HdfsFileStatus status = client.getFileInfo(testPath);
assertTrue(parentId == status.getFileId());
// Test root's parent is still root
testPath = "/.reserved/.inodes/" + parentId + "/..";
status = client.getFileInfo(testPath);
assertTrue(parentId == status.getFileId());
} finally {
cluster.shutdown();
}
}
}