HDFS-2109. Store uMask as member variable to DFSClient.Conf. Contributed by Bharath Mundlapudi

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1141767 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-07-01 00:21:51 +00:00
parent 7bd41f031f
commit 09c8adfb18
3 changed files with 45 additions and 21 deletions

View File

@ -537,6 +537,9 @@ Trunk (unreleased changes)
HDFS-2107. Move block management code from o.a.h.h.s.namenode to a new
package o.a.h.h.s.blockmanagement. (szetszwo)
HDFS-2109. Store uMask as member variable to DFSClient.Conf. (Bharath
Mundlapudi via szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -156,6 +156,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
final long prefetchSize;
final short defaultReplication;
final String taskId;
final FsPermission uMask;
Conf(Configuration conf) {
maxBlockAcquireFailures = conf.getInt(
@ -201,6 +202,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
DFSConfigKeys
.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
uMask = FsPermission.getUMask(conf);
}
}
@ -789,7 +791,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
if (permission == null) {
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
@ -856,7 +858,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
throws IOException {
try {
FsPermission dirPerm =
FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf));
FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
namenode.createSymlink(target, link, dirPerm, createParent);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
@ -1424,7 +1426,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
if (permission == null) {
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
@ -1451,7 +1453,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
checkOpen();
if (absPermission == null) {
absPermission =
FsPermission.getDefault().applyUMask(FsPermission.getUMask(conf));
FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
}
if(LOG.isDebugEnabled()) {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@ -31,7 +30,6 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@ -68,7 +66,6 @@ public class TestDFSPermission extends TestCase {
final static private int NUM_TEST_PERMISSIONS =
conf.getInt("test.dfs.permission.num", 10) * (MAX_PERMISSION + 1) / 100;
final private static String PATH_NAME = "xx";
final private static Path FILE_DIR_PATH = new Path("/", PATH_NAME);
final private static Path NON_EXISTENT_PATH = new Path("/parent", PATH_NAME);
@ -115,44 +112,66 @@ public class TestDFSPermission extends TestCase {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
try {
cluster.waitActive();
fs = FileSystem.get(conf);
LOG.info("ROOT=" + fs.getFileStatus(new Path("/")));
testPermissionSetting(OpType.CREATE); // test file creation
testPermissionSetting(OpType.MKDIRS); // test directory creation
} finally {
fs.close();
cluster.shutdown();
}
}
private void initFileSystem(short umask) throws Exception {
// set umask in configuration, converting to padded octal
conf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
fs = FileSystem.get(conf);
}
private void closeFileSystem() throws Exception {
fs.close();
}
/* check permission setting works correctly for file or directory */
private void testPermissionSetting(OpType op) throws Exception {
short uMask = DEFAULT_UMASK;
// case 1: use default permission but all possible umasks
PermissionGenerator generator = new PermissionGenerator(r);
FsPermission permission = new FsPermission(DEFAULT_PERMISSION);
for (short i = 0; i < NUM_TEST_PERMISSIONS; i++) {
createAndCheckPermission(op, FILE_DIR_PATH, generator.next(),
new FsPermission(DEFAULT_PERMISSION), true);
uMask = generator.next();
initFileSystem(uMask);
createAndCheckPermission(op, FILE_DIR_PATH, uMask, permission, true);
closeFileSystem();
}
// case 2: use permission 0643 and the default umask
createAndCheckPermission(op, FILE_DIR_PATH, DEFAULT_UMASK,
new FsPermission((short) 0643), true);
uMask = DEFAULT_UMASK;
initFileSystem(uMask);
createAndCheckPermission(op, FILE_DIR_PATH, uMask, new FsPermission(
(short) 0643), true);
closeFileSystem();
// case 3: use permission 0643 and umask 0222
createAndCheckPermission(op, FILE_DIR_PATH, (short) 0222,
new FsPermission((short) 0643), false);
uMask = (short) 0222;
initFileSystem(uMask);
createAndCheckPermission(op, FILE_DIR_PATH, uMask, new FsPermission(
(short) 0643), false);
closeFileSystem();
// case 4: set permission
fs.setPermission(FILE_DIR_PATH, new FsPermission((short) 0111));
uMask = (short) 0111;
initFileSystem(uMask);
fs.setPermission(FILE_DIR_PATH, new FsPermission(uMask));
short expectedPermission = (short) ((op == OpType.CREATE) ? 0 : 0111);
checkPermission(FILE_DIR_PATH, expectedPermission, true);
closeFileSystem();
// case 5: test non-existent parent directory
assertFalse(fs.exists(NON_EXISTENT_PATH));
createAndCheckPermission(op, NON_EXISTENT_PATH, DEFAULT_UMASK,
new FsPermission(DEFAULT_PERMISSION), false);
uMask = DEFAULT_UMASK;
initFileSystem(uMask);
assertFalse("File shouldn't exists", fs.exists(NON_EXISTENT_PATH));
createAndCheckPermission(op, NON_EXISTENT_PATH, uMask, new FsPermission(
DEFAULT_PERMISSION), false);
Path parent = NON_EXISTENT_PATH.getParent();
checkPermission(parent, getPermission(parent.getParent()), true);
closeFileSystem();
}
/* get the permission of a file/directory */