diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java index 4fe0861921c..356ae3ff566 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java @@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; import org.apache.hadoop.ipc.Client; @@ -38,9 +37,6 @@ * This instance of this class is the way end-user code interacts * with a Hadoop DistributedFileSystem in an asynchronous manner. * - * This class is unstable, so no guarantee is provided as to reliability, - * stability or compatibility across any level of release granularity. - * *****************************************************************/ @Unstable public class AsyncDistributedFileSystem { @@ -115,59 +111,4 @@ public Future rename(Path src, Path dst, Client.setAsynchronousMode(isAsync); } } - - /** - * Set permission of a path. - * - * @param p - * the path the permission is set to - * @param permission - * the permission that is set to a path. - * @return an instance of Future, #get of which is invoked to wait for - * asynchronous call being finished. - */ - public Future setPermission(Path p, final FsPermission permission) - throws IOException { - dfs.getFsStatistics().incrementWriteOps(1); - final Path absPath = dfs.fixRelativePart(p); - final boolean isAsync = Client.isAsynchronousMode(); - Client.setAsynchronousMode(true); - try { - dfs.getClient().setPermission(dfs.getPathName(absPath), permission); - return getReturnValue(); - } finally { - Client.setAsynchronousMode(isAsync); - } - } - - /** - * Set owner of a path (i.e. a file or a directory). The parameters username - * and groupname cannot both be null. - * - * @param p - * The path - * @param username - * If it is null, the original username remains unchanged. - * @param groupname - * If it is null, the original groupname remains unchanged. - * @return an instance of Future, #get of which is invoked to wait for - * asynchronous call being finished. - */ - public Future setOwner(Path p, String username, String groupname) - throws IOException { - if (username == null && groupname == null) { - throw new IOException("username == null && groupname == null"); - } - - dfs.getFsStatistics().incrementWriteOps(1); - final Path absPath = dfs.fixRelativePart(p); - final boolean isAsync = Client.isAsynchronousMode(); - Client.setAsynchronousMode(true); - try { - dfs.getClient().setOwner(dfs.getPathName(absPath), username, groupname); - return getReturnValue(); - } finally { - Client.setAsynchronousMode(isAsync); - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 94c6c0ff741..75fba213409 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -358,30 +358,12 @@ public void setPermission(String src, FsPermission permission) .setPermission(PBHelperClient.convert(permission)) .build(); try { - if (Client.isAsynchronousMode()) { - rpcProxy.setPermission(null, req); - setReturnValueCallback(); - } else { - rpcProxy.setPermission(null, req); - } + rpcProxy.setPermission(null, req); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } } - private void setReturnValueCallback() { - final Callable returnMessageCallback = ProtobufRpcEngine - .getReturnMessageCallback(); - Callable callBack = new Callable() { - @Override - public Void call() throws Exception { - returnMessageCallback.call(); - return null; - } - }; - RETURN_VALUE_CALLBACK.set(callBack); - } - @Override public void setOwner(String src, String username, String groupname) throws IOException { @@ -392,12 +374,7 @@ public void setOwner(String src, String username, String groupname) if (groupname != null) req.setGroupname(groupname); try { - if (Client.isAsynchronousMode()) { - rpcProxy.setOwner(null, req.build()); - setReturnValueCallback(); - } else { - rpcProxy.setOwner(null, req.build()); - } + rpcProxy.setOwner(null, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -526,7 +503,17 @@ public void rename2(String src, String dst, Rename... options) try { if (Client.isAsynchronousMode()) { rpcProxy.rename2(null, req); - setReturnValueCallback(); + + final Callable returnMessageCallback = ProtobufRpcEngine + .getReturnMessageCallback(); + Callable callBack = new Callable() { + @Override + public Void call() throws Exception { + returnMessageCallback.call(); + return null; + } + }; + RETURN_VALUE_CALLBACK.set(callBack); } else { rpcProxy.rename2(null, req); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java index 7539fbdc82c..d129299bf59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java @@ -22,11 +22,8 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -34,30 +31,18 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.ipc.AsyncCallLimitExceededException; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; import org.junit.Test; public class TestAsyncDFSRename { public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class); - private final long seed = Time.now(); - private final Random r = new Random(seed); - private final PermissionGenerator permGenerator = new PermissionGenerator(r); - private final short replFactor = 2; - private final long blockSize = 512; - private long fileLen = blockSize * 3; /** * Check the blocks of dst file are cleaned after rename with overwrite @@ -65,6 +50,8 @@ public class TestAsyncDFSRename { */ @Test(timeout = 60000) public void testAsyncRenameWithOverwrite() throws Exception { + final short replFactor = 2; + final long blockSize = 512; Configuration conf = new Configuration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes( replFactor).build(); @@ -73,6 +60,8 @@ public void testAsyncRenameWithOverwrite() throws Exception { AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); try { + + long fileLen = blockSize * 3; String src = "/foo/src"; String dst = "/foo/dst"; String src2 = "/foo/src2"; @@ -126,6 +115,8 @@ public void testAsyncRenameWithOverwrite() throws Exception { @Test(timeout = 60000) public void testCallGetReturnValueMultipleTimes() throws Exception { + final short replFactor = 2; + final long blockSize = 512; final Path renameDir = new Path( "/test/testCallGetReturnValueMultipleTimes/"); final Configuration conf = new HdfsConfiguration(); @@ -136,6 +127,7 @@ public void testCallGetReturnValueMultipleTimes() throws Exception { final DistributedFileSystem dfs = cluster.getFileSystem(); final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); final int count = 100; + long fileLen = blockSize * 3; final Map> returnFutures = new HashMap>(); assertTrue(dfs.mkdirs(renameDir)); @@ -186,15 +178,15 @@ private void verifyCallGetReturnValueMultipleTimes( } } - @Test - public void testConservativeConcurrentAsyncRenameWithOverwrite() + @Test(timeout = 120000) + public void testAggressiveConcurrentAsyncRenameWithOverwrite() throws Exception { internalTestConcurrentAsyncRenameWithOverwrite(100, "testAggressiveConcurrentAsyncRenameWithOverwrite"); } @Test(timeout = 60000) - public void testAggressiveConcurrentAsyncRenameWithOverwrite() + public void testConservativeConcurrentAsyncRenameWithOverwrite() throws Exception { internalTestConcurrentAsyncRenameWithOverwrite(10000, "testConservativeConcurrentAsyncRenameWithOverwrite"); @@ -202,6 +194,8 @@ public void testAggressiveConcurrentAsyncRenameWithOverwrite() private void internalTestConcurrentAsyncRenameWithOverwrite( final int asyncCallLimit, final String basePath) throws Exception { + final short replFactor = 2; + final long blockSize = 512; final Path renameDir = new Path(String.format("/test/%s/", basePath)); Configuration conf = new HdfsConfiguration(); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, @@ -212,6 +206,7 @@ private void internalTestConcurrentAsyncRenameWithOverwrite( DistributedFileSystem dfs = cluster.getFileSystem(); AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); int count = 1000; + long fileLen = blockSize * 3; int start = 0, end = 0; Map> returnFutures = new HashMap>(); @@ -279,206 +274,8 @@ private void waitForReturnValues( } } - @Test - public void testConservativeConcurrentAsyncAPI() throws Exception { - internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI"); - } - @Test(timeout = 60000) - public void testAggressiveConcurrentAsyncAPI() throws Exception { - internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI"); - } - - private void internalTestConcurrentAsyncAPI(final int asyncCallLimit, - final String basePath) throws Exception { - Configuration conf = new HdfsConfiguration(); - String group1 = "group1"; - String group2 = "group2"; - String user1 = "user1"; - int count = 500; - - // explicitly turn on permission checking - conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); - // set the limit of max async calls - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, - asyncCallLimit); - - // create fake mapping for the groups - Map u2gMap = new HashMap(1); - u2gMap.put(user1, new String[] {group1, group2}); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); - - // start mini cluster - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(3).build(); - cluster.waitActive(); - AsyncDistributedFileSystem adfs = cluster.getFileSystem() - .getAsyncDistributedFileSystem(); - - // prepare for test - FileSystem rootFs = FileSystem.get(conf); - final Path parent = new Path(String.format("/test/%s/", basePath)); - final Path[] srcs = new Path[count]; - final Path[] dsts = new Path[count]; - short[] permissions = new short[count]; - for (int i = 0; i < count; i++) { - srcs[i] = new Path(parent, "src" + i); - dsts[i] = new Path(parent, "dst" + i); - DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1); - DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1); - assertTrue(rootFs.exists(srcs[i])); - assertTrue(rootFs.getFileStatus(srcs[i]).isFile()); - assertTrue(rootFs.exists(dsts[i])); - assertTrue(rootFs.getFileStatus(dsts[i]).isFile()); - permissions[i] = permGenerator.next(); - } - - Map> renameRetFutures = - new HashMap>(); - Map> permRetFutures = - new HashMap>(); - Map> ownerRetFutures = - new HashMap>(); - int start = 0, end = 0; - // test rename - for (int i = 0; i < count; i++) { - for (;;) { - try { - Future returnFuture = adfs.rename(srcs[i], dsts[i], - Rename.OVERWRITE); - renameRetFutures.put(i, returnFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(renameRetFutures, start, end); - } - } - } - - // wait for completing the calls - for (int i = start; i < count; i++) { - renameRetFutures.get(i).get(); - } - - // Restart NN and check the rename successfully - cluster.restartNameNodes(); - - // very the src should not exist, dst should - for (int i = 0; i < count; i++) { - assertFalse(rootFs.exists(srcs[i])); - assertTrue(rootFs.exists(dsts[i])); - } - - // test permissions - try { - for (int i = 0; i < count; i++) { - for (;;) { - try { - Future retFuture = adfs.setPermission(dsts[i], - new FsPermission(permissions[i])); - permRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(permRetFutures, start, end); - } - } - } - // wait for completing the calls - for (int i = start; i < count; i++) { - permRetFutures.get(i).get(); - } - - // Restart NN and check permission then - cluster.restartNameNodes(); - - // verify the permission - for (int i = 0; i < count; i++) { - assertTrue(rootFs.exists(dsts[i])); - FsPermission fsPerm = new FsPermission(permissions[i]); - checkAccessPermissions(rootFs.getFileStatus(dsts[i]), - fsPerm.getUserAction()); - } - - // test setOwner - start = 0; - end = 0; - for (int i = 0; i < count; i++) { - for (;;) { - try { - Future retFuture = adfs.setOwner(dsts[i], "user1", - "group2"); - ownerRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(ownerRetFutures, start, end); - } - } - } - // wait for completing the calls - for (int i = start; i < count; i++) { - ownerRetFutures.get(i).get(); - } - - // Restart NN and check owner then - cluster.restartNameNodes(); - - // verify the owner - for (int i = 0; i < count; i++) { - assertTrue(rootFs.exists(dsts[i])); - assertTrue( - "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner())); - assertTrue( - "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup())); - } - } catch (AccessControlException ace) { - throw ace; - } finally { - if (rootFs != null) { - rootFs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } - } - } - - static void checkAccessPermissions(FileStatus stat, FsAction mode) - throws IOException { - checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode); - } - - static void checkAccessPermissions(final UserGroupInformation ugi, - FileStatus stat, FsAction mode) throws IOException { - FsPermission perm = stat.getPermission(); - String user = ugi.getShortUserName(); - List groups = Arrays.asList(ugi.getGroupNames()); - - if (user.equals(stat.getOwner())) { - if (perm.getUserAction().implies(mode)) { - return; - } - } else if (groups.contains(stat.getGroup())) { - if (perm.getGroupAction().implies(mode)) { - return; - } - } else { - if (perm.getOtherAction().implies(mode)) { - return; - } - } - throw new AccessControlException(String.format( - "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat - .getPath(), stat.getOwner(), stat.getGroup(), - stat.isDirectory() ? "d" : "-", perm)); - } - - @Test(timeout = 60000) - public void testAsyncAPIWithException() throws Exception { + public void testAsyncRenameWithException() throws Exception { Configuration conf = new HdfsConfiguration(); String group1 = "group1"; String group2 = "group2"; @@ -489,9 +286,9 @@ public void testAsyncAPIWithException() throws Exception { conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); // create fake mapping for the groups - Map u2gMap = new HashMap(1); - u2gMap.put(user1, new String[] {group1, group2}); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); + Map u2g_map = new HashMap(1); + u2g_map.put(user1, new String[] { group1, group2 }); + DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); // Initiate all four users ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] { @@ -502,7 +299,7 @@ public void testAsyncAPIWithException() throws Exception { cluster.waitActive(); FileSystem rootFs = FileSystem.get(conf); - final Path renameDir = new Path("/test/async_api_exception/"); + final Path renameDir = new Path("/test/async_rename_exception/"); final Path src = new Path(renameDir, "src"); final Path dst = new Path(renameDir, "dst"); rootFs.mkdirs(src); @@ -515,33 +312,11 @@ public AsyncDistributedFileSystem run() throws Exception { } }); - Future retFuture; try { - retFuture = adfs.rename(src, dst, Rename.OVERWRITE); - retFuture.get(); + Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); + returnFuture.get(); } catch (ExecutionException e) { checkPermissionDenied(e, src, user1); - assertTrue("Permission denied messages must carry the path parent", e - .getMessage().contains(src.getParent().toUri().getPath())); - } - - FsPermission fsPerm = new FsPermission(permGenerator.next()); - try { - retFuture = adfs.setPermission(src, fsPerm); - retFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - assertTrue("Permission denied messages must carry the name of the path", - e.getMessage().contains(src.getName())); - } - - try { - retFuture = adfs.setOwner(src, "user1", "group2"); - retFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - assertTrue("Permission denied messages must carry the name of the path", - e.getMessage().contains(src.getName())); } finally { if (rootFs != null) { rootFs.close(); @@ -559,5 +334,7 @@ private void checkPermissionDenied(final Exception e, final Path dir, e.getMessage().contains("AccessControlException")); assertTrue("Permission denied messages must carry the username", e .getMessage().contains(user)); + assertTrue("Permission denied messages must carry the path parent", e + .getMessage().contains(dir.getParent().toUri().getPath())); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java index 66a0380f92f..aa204cdc648 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java @@ -196,35 +196,22 @@ private short getPermission(Path path) throws IOException { return fs.getFileStatus(path).getPermission().toShort(); } - private void create(OpType op, Path name) throws IOException { - create(fs, conf, op, name); - } - /* create a file/directory with the default umask and permission */ - static void create(final FileSystem fs, final Configuration fsConf, - OpType op, Path name) throws IOException { - create(fs, fsConf, op, name, DEFAULT_UMASK, new FsPermission( - DEFAULT_PERMISSION)); - } - - private void create(OpType op, Path name, short umask, - FsPermission permission) - throws IOException { - create(fs, conf, op, name, umask, permission); + private void create(OpType op, Path name) throws IOException { + create(op, name, DEFAULT_UMASK, new FsPermission(DEFAULT_PERMISSION)); } /* create a file/directory with the given umask and permission */ - static void create(final FileSystem fs, final Configuration fsConf, - OpType op, Path name, short umask, FsPermission permission) - throws IOException { + private void create(OpType op, Path name, short umask, + FsPermission permission) throws IOException { // set umask in configuration, converting to padded octal - fsConf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask)); + conf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask)); // create the file/directory switch (op) { case CREATE: FSDataOutputStream out = fs.create(name, permission, true, - fsConf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), + conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), fs.getDefaultReplication(name), fs.getDefaultBlockSize(name), null); out.close(); break; @@ -372,7 +359,7 @@ private void checkOwnership(Path name, String expectedOwner, final static private String DIR_NAME = "dir"; final static private String FILE_DIR_NAME = "filedir"; - enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION, + private enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION, GET_FILEINFO, IS_DIR, EXISTS, GET_CONTENT_LENGTH, LIST, RENAME, DELETE }; @@ -628,7 +615,7 @@ private void testPermissionCheckingPerUser(UserGroupInformation ugi, /* A random permission generator that guarantees that each permission * value is generated only once. */ - static class PermissionGenerator { + static private class PermissionGenerator { private final Random r; private final short[] permissions = new short[MAX_PERMISSION + 1]; private int numLeft = MAX_PERMISSION + 1;