HDFS-6099. Merging change r1579122 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1579128 13f79535-47bb-0310-9956-ffa450edef68
Author: Chris Nauroth
Date:   2014-03-19 03:55:53 +00:00
Parent: e39ccf3b24
Commit: 76f4a32aec

4 changed files with 142 additions and 47 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -403,6 +403,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-6117. Print file path information in FileNotFoundException on INode
     ID mismatch. (suresh)
 
+    HDFS-6099. HDFS file system limits not enforced on renames. (cnauroth)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
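For context, a hedged sketch of the client-visible behavior this entry fixes (the paths, limit value, and cluster configuration are illustrative assumptions; before this patch the fs-limits checks ran on create and mkdir but a rename could move an inode past them):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameLimitSketch {
  public static void main(String[] args) throws IOException {
    // Assumes a cluster whose NameNode was started with
    // dfs.namenode.fs-limits.max-component-length = 2.
    FileSystem fs = FileSystem.get(new Configuration());
    fs.mkdirs(new Path("/a"));
    try {
      // The destination component "aaa" exceeds the configured limit. With
      // this patch the NameNode rejects the rename with a
      // PathComponentTooLongException (surfaced as an IOException, possibly
      // wrapped in a RemoteException on the client side).
      fs.rename(new Path("/a"), new Path("/aaa"));
    } catch (IOException e) {
      System.err.println("rename rejected: " + e);
    }
  }
}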

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -643,6 +643,7 @@ public class FSDirectory implements Closeable {
     }
 
     // Ensure dst has quota to accommodate rename
+    verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
 
     boolean added = false;
@@ -894,6 +895,7 @@ public class FSDirectory implements Closeable {
     }
 
     // Ensure dst has quota to accommodate rename
+    verifyFsLimitsForRename(srcIIP, dstIIP);
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
 
     INode srcChild = srcIIP.getLastINode();
@@ -2134,6 +2136,27 @@ public class FSDirectory implements Closeable {
           delta.get(Quota.DISKSPACE), src[i - 1]);
     }
   }
 
+  /**
+   * Checks file system limits (max component length and max directory items)
+   * during a rename operation.
+   *
+   * @param srcIIP INodesInPath containing every inode in the rename source
+   * @param dstIIP INodesInPath containing every inode in the rename destination
+   * @throws PathComponentTooLongException child's name is too long.
+   * @throws MaxDirectoryItemsExceededException too many children.
+   */
+  private void verifyFsLimitsForRename(INodesInPath srcIIP, INodesInPath dstIIP)
+      throws PathComponentTooLongException, MaxDirectoryItemsExceededException {
+    byte[] dstChildName = dstIIP.getLastLocalName();
+    INode[] dstInodes = dstIIP.getINodes();
+    int pos = dstInodes.length - 1;
+    verifyMaxComponentLength(dstChildName, dstInodes, pos);
+    // Do not enforce max directory items if renaming within same directory.
+    if (srcIIP.getINode(-2) != dstIIP.getINode(-2)) {
+      verifyMaxDirItems(dstInodes, pos);
+    }
+  }
+
   /** Verify if the snapshot name is legal. */
   void verifySnapshotName(String snapshotName, String path)
       throws PathComponentTooLongException {
@@ -2159,10 +2182,14 @@ public class FSDirectory implements Closeable {
 
   /**
    * Verify child's name for fs limit.
+   *
+   * @param childName byte[] containing new child name
+   * @param parentPath Object either INode[] or String containing parent path
+   * @param pos int position of new child in path
    * @throws PathComponentTooLongException child's name is too long.
    */
-  void verifyMaxComponentLength(byte[] childName, Object parentPath, int pos)
-      throws PathComponentTooLongException {
+  private void verifyMaxComponentLength(byte[] childName, Object parentPath,
+      int pos) throws PathComponentTooLongException {
     if (maxComponentLength == 0) {
       return;
     }
@@ -2184,9 +2211,12 @@ public class FSDirectory implements Closeable {
 
   /**
    * Verify children size for fs limit.
+   *
+   * @param pathComponents INode[] containing full path of inodes to new child
+   * @param pos int position of new child in pathComponents
    * @throws MaxDirectoryItemsExceededException too many children.
    */
-  void verifyMaxDirItems(INode[] pathComponents, int pos)
+  private void verifyMaxDirItems(INode[] pathComponents, int pos)
       throws MaxDirectoryItemsExceededException {
     final INodeDirectory parent = pathComponents[pos-1].asDirectory();
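A minimal standalone sketch of the rule the new verifyFsLimitsForRename encodes (hypothetical LimitChecker class with simplified types; not HDFS code): the destination's last path component is always length-checked, while the directory-item check is skipped for a same-parent rename, because such a rename removes one child and adds one, leaving the count unchanged.

// Hypothetical, simplified model of the rename checks above; not HDFS code.
final class LimitChecker {
  private final int maxComponentLength; // 0 disables the length check
  private final int maxDirItems;        // 0 disables the item-count check

  LimitChecker(int maxComponentLength, int maxDirItems) {
    this.maxComponentLength = maxComponentLength;
    this.maxDirItems = maxDirItems;
  }

  // dstName: last component of the rename destination.
  // dstParentChildCount: children currently in the destination parent.
  // sameParent: true when source and destination share a parent directory.
  void checkRename(String dstName, int dstParentChildCount, boolean sameParent) {
    if (maxComponentLength != 0 && dstName.length() > maxComponentLength) {
      throw new IllegalArgumentException("component too long: " + dstName);
    }
    // A same-directory rename removes one child and adds one, so the item
    // count cannot grow; only cross-directory moves need the re-check.
    if (!sameParent && maxDirItems != 0
        && dstParentChildCount + 1 > maxDirItems) {
      throw new IllegalStateException("too many items in destination directory");
    }
  }
}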

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSLimitException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -353,7 +354,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
         LeaseExpiredException.class,
         NSQuotaExceededException.class,
         DSQuotaExceededException.class,
-        AclException.class);
+        AclException.class,
+        FSLimitException.PathComponentTooLongException.class,
+        FSLimitException.MaxDirectoryItemsExceededException.class);
   }
 
   /** Allow access to the client RPC server for testing */
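Registering the two FSLimitException subclasses as terse exceptions tells the IPC server to log these expected client errors as one-line messages rather than full stack traces. A hedged client-side sketch of detecting such a rejection (illustrative helper; whether the server-side type arrives unwrapped or inside a RemoteException depends on the client's unwrap list):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;

final class RenameHelper {
  // Hypothetical helper: returns false when the NameNode rejects the rename.
  static boolean tryRename(FileSystem fs, Path src, Path dst)
      throws IOException {
    try {
      return fs.rename(src, dst);
    } catch (RemoteException re) {
      // getClassName() names the server-side exception type, e.g.
      // ...FSLimitException$PathComponentTooLongException.
      System.err.println("NameNode rejected rename: " + re.getClassName());
      return false;
    }
  }
}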

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
+import static org.apache.hadoop.util.Time.now;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
@@ -29,14 +30,15 @@ import java.io.IOException;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,7 +52,12 @@ public class TestFsLimits {
   static PermissionStatus perms
     = new PermissionStatus("admin", "admin", FsPermission.getDefault());
 
-  static INodeDirectory rootInode;
+  static private FSImage getMockFSImage() {
+    FSEditLog editLog = mock(FSEditLog.class);
+    FSImage fsImage = mock(FSImage.class);
+    when(fsImage.getEditLog()).thenReturn(editLog);
+    return fsImage;
+  }
 
   static private FSNamesystem getMockNamesystem() {
     FSNamesystem fsn = mock(FSNamesystem.class);
@@ -64,8 +71,9 @@ public class TestFsLimits {
 
   private static class MockFSDirectory extends FSDirectory {
     public MockFSDirectory() throws IOException {
-      super(new FSImage(conf), getMockNamesystem(), conf);
+      super(getMockFSImage(), getMockNamesystem(), conf);
       setReady(fsIsReady);
+      NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
     }
   }
@@ -76,21 +84,18 @@ public class TestFsLimits {
         fileAsURI(new File(MiniDFSCluster.getBaseDirectory(),
         "namenode")).toString());
-
-    rootInode = new INodeDirectory(getMockNamesystem().allocateNewInodeId(),
-        INodeDirectory.ROOT_NAME, perms, 0L);
-    inodes = new INode[]{ rootInode, null };
     fs = null;
     fsIsReady = true;
   }
 
   @Test
   public void testNoLimits() throws Exception {
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", null);
-    addChildWithName("4444", null);
-    addChildWithName("55555", null);
-    addChildWithName(HdfsConstants.DOT_SNAPSHOT_DIR,
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", null);
+    mkdirs("/4444", null);
+    mkdirs("/55555", null);
+    mkdirs("/1/" + HdfsConstants.DOT_SNAPSHOT_DIR,
         HadoopIllegalArgumentException.class);
   }
@@ -98,33 +103,65 @@
   public void testMaxComponentLength() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 2);
 
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", PathComponentTooLongException.class);
-    addChildWithName("4444", PathComponentTooLongException.class);
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", PathComponentTooLongException.class);
+    mkdirs("/4444", PathComponentTooLongException.class);
   }
 
   @Test
+  public void testMaxComponentLengthRename() throws Exception {
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 2);
+
+    mkdirs("/5", null);
+    rename("/5", "/555", PathComponentTooLongException.class);
+    rename("/5", "/55", null);
+
+    mkdirs("/6", null);
+    deprecatedRename("/6", "/666", PathComponentTooLongException.class);
+    deprecatedRename("/6", "/66", null);
+  }
+
+  @Test
   public void testMaxDirItems() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
 
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", MaxDirectoryItemsExceededException.class);
-    addChildWithName("4444", MaxDirectoryItemsExceededException.class);
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", MaxDirectoryItemsExceededException.class);
+    mkdirs("/4444", MaxDirectoryItemsExceededException.class);
   }
 
   @Test
+  public void testMaxDirItemsRename() throws Exception {
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
+
+    mkdirs("/1", null);
+    mkdirs("/2", null);
+
+    mkdirs("/2/A", null);
+    rename("/2/A", "/A", MaxDirectoryItemsExceededException.class);
+    rename("/2/A", "/1/A", null);
+
+    mkdirs("/2/B", null);
+    deprecatedRename("/2/B", "/B", MaxDirectoryItemsExceededException.class);
+    deprecatedRename("/2/B", "/1/B", null);
+
+    rename("/1", "/3", null);
+    deprecatedRename("/2", "/4", null);
+  }
+
+  @Test
   public void testMaxDirItemsLimits() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 0);
     try {
-      addChildWithName("1", null);
+      mkdirs("1", null);
     } catch (IllegalArgumentException e) {
       GenericTestUtils.assertExceptionContains("Cannot set dfs", e);
     }
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 64*100*1024);
     try {
-      addChildWithName("1", null);
+      mkdirs("1", null);
     } catch (IllegalArgumentException e) {
       GenericTestUtils.assertExceptionContains("Cannot set dfs", e);
     }
@@ -135,10 +172,10 @@
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 3);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
 
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", MaxDirectoryItemsExceededException.class);
-    addChildWithName("4444", PathComponentTooLongException.class);
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", MaxDirectoryItemsExceededException.class);
+    mkdirs("/4444", PathComponentTooLongException.class);
   }
 
   @Test
@@ -147,32 +184,55 @@ public class TestFsLimits {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 2);
     fsIsReady = false;
 
-    addChildWithName(HdfsConstants.DOT_SNAPSHOT_DIR,
+    mkdirs("/1", null);
+    mkdirs("/22", null);
+    mkdirs("/333", null);
+    mkdirs("/4444", null);
+    mkdirs("/1/" + HdfsConstants.DOT_SNAPSHOT_DIR,
         HadoopIllegalArgumentException.class);
-    addChildWithName("1", null);
-    addChildWithName("22", null);
-    addChildWithName("333", null);
-    addChildWithName("4444", null);
   }
 
-  private void addChildWithName(String name, Class<?> expected)
+  private void mkdirs(String name, Class<?> expected)
   throws Exception {
-    // have to create after the caller has had a chance to set conf values
-    if (fs == null) fs = new MockFSDirectory();
-
-    INode child = new INodeDirectory(getMockNamesystem().allocateNewInodeId(),
-        DFSUtil.string2Bytes(name), perms, 0L);
-
+    lazyInitFSDirectory();
     Class<?> generated = null;
     try {
-      fs.verifyMaxComponentLength(child.getLocalNameBytes(), inodes, 1);
-      fs.verifyMaxDirItems(inodes, 1);
-      fs.verifyINodeName(child.getLocalNameBytes());
-
-      rootInode.addChild(child);
+      fs.mkdirs(name, perms, false, now());
     } catch (Throwable e) {
       generated = e.getClass();
     }
     assertEquals(expected, generated);
   }
+
+  private void rename(String src, String dst, Class<?> expected)
+      throws Exception {
+    lazyInitFSDirectory();
+    Class<?> generated = null;
+    try {
+      fs.renameTo(src, dst, false, new Rename[] { });
+    } catch (Throwable e) {
+      generated = e.getClass();
+    }
+    assertEquals(expected, generated);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void deprecatedRename(String src, String dst, Class<?> expected)
+      throws Exception {
+    lazyInitFSDirectory();
+    Class<?> generated = null;
+    try {
+      fs.renameTo(src, dst, false);
+    } catch (Throwable e) {
+      generated = e.getClass();
+    }
+    assertEquals(expected, generated);
+  }
+
+  private static void lazyInitFSDirectory() throws IOException {
+    // have to create after the caller has had a chance to set conf values
+    if (fs == null) {
+      fs = new MockFSDirectory();
+    }
+  }
 }
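For reference, the two limits these tests exercise are ordinary NameNode configuration knobs; a hedged sketch of setting them programmatically (the values are illustrative, and the NameNode validates the directory-items setting, rejecting out-of-range values with the "Cannot set dfs..." message asserted above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

final class FsLimitsConfigSketch {
  static Configuration withFsLimits() {
    Configuration conf = new Configuration();
    // Longest allowed single path component, in bytes; 0 disables the check.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 255);
    // Most children allowed in one directory; must stay within the
    // range the NameNode accepts at startup.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY, 1024 * 1024);
    return conf;
  }
}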