diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java index 356ae3ff566..4fe0861921c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; import org.apache.hadoop.ipc.Client; @@ -37,6 +38,9 @@ import com.google.common.util.concurrent.AbstractFuture; * This instance of this class is the way end-user code interacts * with a Hadoop DistributedFileSystem in an asynchronous manner. * + * This class is unstable, so no guarantee is provided as to reliability, + * stability or compatibility across any level of release granularity. + * *****************************************************************/ @Unstable public class AsyncDistributedFileSystem { @@ -111,4 +115,59 @@ public class AsyncDistributedFileSystem { Client.setAsynchronousMode(isAsync); } } + + /** + * Set permission of a path. + * + * @param p + * the path the permission is set to + * @param permission + * the permission that is set to a path. + * @return an instance of Future, #get of which is invoked to wait for + * asynchronous call being finished. + */ + public Future setPermission(Path p, final FsPermission permission) + throws IOException { + dfs.getFsStatistics().incrementWriteOps(1); + final Path absPath = dfs.fixRelativePart(p); + final boolean isAsync = Client.isAsynchronousMode(); + Client.setAsynchronousMode(true); + try { + dfs.getClient().setPermission(dfs.getPathName(absPath), permission); + return getReturnValue(); + } finally { + Client.setAsynchronousMode(isAsync); + } + } + + /** + * Set owner of a path (i.e. a file or a directory). The parameters username + * and groupname cannot both be null. + * + * @param p + * The path + * @param username + * If it is null, the original username remains unchanged. + * @param groupname + * If it is null, the original groupname remains unchanged. + * @return an instance of Future, #get of which is invoked to wait for + * asynchronous call being finished. + */ + public Future setOwner(Path p, String username, String groupname) + throws IOException { + if (username == null && groupname == null) { + throw new IOException("username == null && groupname == null"); + } + + dfs.getFsStatistics().incrementWriteOps(1); + final Path absPath = dfs.fixRelativePart(p); + final boolean isAsync = Client.isAsynchronousMode(); + Client.setAsynchronousMode(true); + try { + dfs.getClient().setOwner(dfs.getPathName(absPath), username, groupname); + return getReturnValue(); + } finally { + Client.setAsynchronousMode(isAsync); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 75fba213409..94c6c0ff741 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -358,12 +358,30 @@ public class ClientNamenodeProtocolTranslatorPB implements .setPermission(PBHelperClient.convert(permission)) .build(); try { - rpcProxy.setPermission(null, req); + if (Client.isAsynchronousMode()) { + rpcProxy.setPermission(null, req); + setReturnValueCallback(); + } else { + rpcProxy.setPermission(null, req); + } } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } } + private void setReturnValueCallback() { + final Callable returnMessageCallback = ProtobufRpcEngine + .getReturnMessageCallback(); + Callable callBack = new Callable() { + @Override + public Void call() throws Exception { + returnMessageCallback.call(); + return null; + } + }; + RETURN_VALUE_CALLBACK.set(callBack); + } + @Override public void setOwner(String src, String username, String groupname) throws IOException { @@ -374,7 +392,12 @@ public class ClientNamenodeProtocolTranslatorPB implements if (groupname != null) req.setGroupname(groupname); try { - rpcProxy.setOwner(null, req.build()); + if (Client.isAsynchronousMode()) { + rpcProxy.setOwner(null, req.build()); + setReturnValueCallback(); + } else { + rpcProxy.setOwner(null, req.build()); + } } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -503,17 +526,7 @@ public class ClientNamenodeProtocolTranslatorPB implements try { if (Client.isAsynchronousMode()) { rpcProxy.rename2(null, req); - - final Callable returnMessageCallback = ProtobufRpcEngine - .getReturnMessageCallback(); - Callable callBack = new Callable() { - @Override - public Void call() throws Exception { - returnMessageCallback.call(); - return null; - } - }; - RETURN_VALUE_CALLBACK.set(callBack); + setReturnValueCallback(); } else { rpcProxy.rename2(null, req); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java index d129299bf59..7539fbdc82c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java @@ -22,8 +22,11 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -31,18 +34,30 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.ipc.AsyncCallLimitExceededException; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; import org.junit.Test; public class TestAsyncDFSRename { public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class); + private final long seed = Time.now(); + private final Random r = new Random(seed); + private final PermissionGenerator permGenerator = new PermissionGenerator(r); + private final short replFactor = 2; + private final long blockSize = 512; + private long fileLen = blockSize * 3; /** * Check the blocks of dst file are cleaned after rename with overwrite @@ -50,8 +65,6 @@ public class TestAsyncDFSRename { */ @Test(timeout = 60000) public void testAsyncRenameWithOverwrite() throws Exception { - final short replFactor = 2; - final long blockSize = 512; Configuration conf = new Configuration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes( replFactor).build(); @@ -60,8 +73,6 @@ public class TestAsyncDFSRename { AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); try { - - long fileLen = blockSize * 3; String src = "/foo/src"; String dst = "/foo/dst"; String src2 = "/foo/src2"; @@ -115,8 +126,6 @@ public class TestAsyncDFSRename { @Test(timeout = 60000) public void testCallGetReturnValueMultipleTimes() throws Exception { - final short replFactor = 2; - final long blockSize = 512; final Path renameDir = new Path( "/test/testCallGetReturnValueMultipleTimes/"); final Configuration conf = new HdfsConfiguration(); @@ -127,7 +136,6 @@ public class TestAsyncDFSRename { final DistributedFileSystem dfs = cluster.getFileSystem(); final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); final int count = 100; - long fileLen = blockSize * 3; final Map> returnFutures = new HashMap>(); assertTrue(dfs.mkdirs(renameDir)); @@ -178,15 +186,15 @@ public class TestAsyncDFSRename { } } - @Test(timeout = 120000) - public void testAggressiveConcurrentAsyncRenameWithOverwrite() + @Test + public void testConservativeConcurrentAsyncRenameWithOverwrite() throws Exception { internalTestConcurrentAsyncRenameWithOverwrite(100, "testAggressiveConcurrentAsyncRenameWithOverwrite"); } @Test(timeout = 60000) - public void testConservativeConcurrentAsyncRenameWithOverwrite() + public void testAggressiveConcurrentAsyncRenameWithOverwrite() throws Exception { internalTestConcurrentAsyncRenameWithOverwrite(10000, "testConservativeConcurrentAsyncRenameWithOverwrite"); @@ -194,8 +202,6 @@ public class TestAsyncDFSRename { private void internalTestConcurrentAsyncRenameWithOverwrite( final int asyncCallLimit, final String basePath) throws Exception { - final short replFactor = 2; - final long blockSize = 512; final Path renameDir = new Path(String.format("/test/%s/", basePath)); Configuration conf = new HdfsConfiguration(); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, @@ -206,7 +212,6 @@ public class TestAsyncDFSRename { DistributedFileSystem dfs = cluster.getFileSystem(); AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); int count = 1000; - long fileLen = blockSize * 3; int start = 0, end = 0; Map> returnFutures = new HashMap>(); @@ -274,8 +279,206 @@ public class TestAsyncDFSRename { } } + @Test + public void testConservativeConcurrentAsyncAPI() throws Exception { + internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI"); + } + @Test(timeout = 60000) - public void testAsyncRenameWithException() throws Exception { + public void testAggressiveConcurrentAsyncAPI() throws Exception { + internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI"); + } + + private void internalTestConcurrentAsyncAPI(final int asyncCallLimit, + final String basePath) throws Exception { + Configuration conf = new HdfsConfiguration(); + String group1 = "group1"; + String group2 = "group2"; + String user1 = "user1"; + int count = 500; + + // explicitly turn on permission checking + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + // set the limit of max async calls + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, + asyncCallLimit); + + // create fake mapping for the groups + Map u2gMap = new HashMap(1); + u2gMap.put(user1, new String[] {group1, group2}); + DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); + + // start mini cluster + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(3).build(); + cluster.waitActive(); + AsyncDistributedFileSystem adfs = cluster.getFileSystem() + .getAsyncDistributedFileSystem(); + + // prepare for test + FileSystem rootFs = FileSystem.get(conf); + final Path parent = new Path(String.format("/test/%s/", basePath)); + final Path[] srcs = new Path[count]; + final Path[] dsts = new Path[count]; + short[] permissions = new short[count]; + for (int i = 0; i < count; i++) { + srcs[i] = new Path(parent, "src" + i); + dsts[i] = new Path(parent, "dst" + i); + DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1); + DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1); + assertTrue(rootFs.exists(srcs[i])); + assertTrue(rootFs.getFileStatus(srcs[i]).isFile()); + assertTrue(rootFs.exists(dsts[i])); + assertTrue(rootFs.getFileStatus(dsts[i]).isFile()); + permissions[i] = permGenerator.next(); + } + + Map> renameRetFutures = + new HashMap>(); + Map> permRetFutures = + new HashMap>(); + Map> ownerRetFutures = + new HashMap>(); + int start = 0, end = 0; + // test rename + for (int i = 0; i < count; i++) { + for (;;) { + try { + Future returnFuture = adfs.rename(srcs[i], dsts[i], + Rename.OVERWRITE); + renameRetFutures.put(i, returnFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForReturnValues(renameRetFutures, start, end); + } + } + } + + // wait for completing the calls + for (int i = start; i < count; i++) { + renameRetFutures.get(i).get(); + } + + // Restart NN and check the rename successfully + cluster.restartNameNodes(); + + // very the src should not exist, dst should + for (int i = 0; i < count; i++) { + assertFalse(rootFs.exists(srcs[i])); + assertTrue(rootFs.exists(dsts[i])); + } + + // test permissions + try { + for (int i = 0; i < count; i++) { + for (;;) { + try { + Future retFuture = adfs.setPermission(dsts[i], + new FsPermission(permissions[i])); + permRetFutures.put(i, retFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForReturnValues(permRetFutures, start, end); + } + } + } + // wait for completing the calls + for (int i = start; i < count; i++) { + permRetFutures.get(i).get(); + } + + // Restart NN and check permission then + cluster.restartNameNodes(); + + // verify the permission + for (int i = 0; i < count; i++) { + assertTrue(rootFs.exists(dsts[i])); + FsPermission fsPerm = new FsPermission(permissions[i]); + checkAccessPermissions(rootFs.getFileStatus(dsts[i]), + fsPerm.getUserAction()); + } + + // test setOwner + start = 0; + end = 0; + for (int i = 0; i < count; i++) { + for (;;) { + try { + Future retFuture = adfs.setOwner(dsts[i], "user1", + "group2"); + ownerRetFutures.put(i, retFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForReturnValues(ownerRetFutures, start, end); + } + } + } + // wait for completing the calls + for (int i = start; i < count; i++) { + ownerRetFutures.get(i).get(); + } + + // Restart NN and check owner then + cluster.restartNameNodes(); + + // verify the owner + for (int i = 0; i < count; i++) { + assertTrue(rootFs.exists(dsts[i])); + assertTrue( + "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner())); + assertTrue( + "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup())); + } + } catch (AccessControlException ace) { + throw ace; + } finally { + if (rootFs != null) { + rootFs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + } + + static void checkAccessPermissions(FileStatus stat, FsAction mode) + throws IOException { + checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode); + } + + static void checkAccessPermissions(final UserGroupInformation ugi, + FileStatus stat, FsAction mode) throws IOException { + FsPermission perm = stat.getPermission(); + String user = ugi.getShortUserName(); + List groups = Arrays.asList(ugi.getGroupNames()); + + if (user.equals(stat.getOwner())) { + if (perm.getUserAction().implies(mode)) { + return; + } + } else if (groups.contains(stat.getGroup())) { + if (perm.getGroupAction().implies(mode)) { + return; + } + } else { + if (perm.getOtherAction().implies(mode)) { + return; + } + } + throw new AccessControlException(String.format( + "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat + .getPath(), stat.getOwner(), stat.getGroup(), + stat.isDirectory() ? "d" : "-", perm)); + } + + @Test(timeout = 60000) + public void testAsyncAPIWithException() throws Exception { Configuration conf = new HdfsConfiguration(); String group1 = "group1"; String group2 = "group2"; @@ -286,9 +489,9 @@ public class TestAsyncDFSRename { conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); // create fake mapping for the groups - Map u2g_map = new HashMap(1); - u2g_map.put(user1, new String[] { group1, group2 }); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); + Map u2gMap = new HashMap(1); + u2gMap.put(user1, new String[] {group1, group2}); + DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); // Initiate all four users ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] { @@ -299,7 +502,7 @@ public class TestAsyncDFSRename { cluster.waitActive(); FileSystem rootFs = FileSystem.get(conf); - final Path renameDir = new Path("/test/async_rename_exception/"); + final Path renameDir = new Path("/test/async_api_exception/"); final Path src = new Path(renameDir, "src"); final Path dst = new Path(renameDir, "dst"); rootFs.mkdirs(src); @@ -312,11 +515,33 @@ public class TestAsyncDFSRename { } }); + Future retFuture; try { - Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); - returnFuture.get(); + retFuture = adfs.rename(src, dst, Rename.OVERWRITE); + retFuture.get(); } catch (ExecutionException e) { checkPermissionDenied(e, src, user1); + assertTrue("Permission denied messages must carry the path parent", e + .getMessage().contains(src.getParent().toUri().getPath())); + } + + FsPermission fsPerm = new FsPermission(permGenerator.next()); + try { + retFuture = adfs.setPermission(src, fsPerm); + retFuture.get(); + } catch (ExecutionException e) { + checkPermissionDenied(e, src, user1); + assertTrue("Permission denied messages must carry the name of the path", + e.getMessage().contains(src.getName())); + } + + try { + retFuture = adfs.setOwner(src, "user1", "group2"); + retFuture.get(); + } catch (ExecutionException e) { + checkPermissionDenied(e, src, user1); + assertTrue("Permission denied messages must carry the name of the path", + e.getMessage().contains(src.getName())); } finally { if (rootFs != null) { rootFs.close(); @@ -334,7 +559,5 @@ public class TestAsyncDFSRename { e.getMessage().contains("AccessControlException")); assertTrue("Permission denied messages must carry the username", e .getMessage().contains(user)); - assertTrue("Permission denied messages must carry the path parent", e - .getMessage().contains(dir.getParent().toUri().getPath())); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java index 8f2f0a04e87..8885ce75f42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java @@ -196,22 +196,35 @@ public class TestDFSPermission { return fs.getFileStatus(path).getPermission().toShort(); } - /* create a file/directory with the default umask and permission */ private void create(OpType op, Path name) throws IOException { - create(op, name, DEFAULT_UMASK, new FsPermission(DEFAULT_PERMISSION)); + create(fs, conf, op, name); + } + + /* create a file/directory with the default umask and permission */ + static void create(final FileSystem fs, final Configuration fsConf, + OpType op, Path name) throws IOException { + create(fs, fsConf, op, name, DEFAULT_UMASK, new FsPermission( + DEFAULT_PERMISSION)); + } + + private void create(OpType op, Path name, short umask, + FsPermission permission) + throws IOException { + create(fs, conf, op, name, umask, permission); } /* create a file/directory with the given umask and permission */ - private void create(OpType op, Path name, short umask, - FsPermission permission) throws IOException { + static void create(final FileSystem fs, final Configuration fsConf, + OpType op, Path name, short umask, FsPermission permission) + throws IOException { // set umask in configuration, converting to padded octal - conf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask)); + fsConf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask)); // create the file/directory switch (op) { case CREATE: FSDataOutputStream out = fs.create(name, permission, true, - conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), + fsConf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), fs.getDefaultReplication(name), fs.getDefaultBlockSize(name), null); out.close(); break; @@ -359,7 +372,7 @@ public class TestDFSPermission { final static private String DIR_NAME = "dir"; final static private String FILE_DIR_NAME = "filedir"; - private enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION, + enum OpType {CREATE, MKDIRS, OPEN, SET_REPLICATION, GET_FILEINFO, IS_DIR, EXISTS, GET_CONTENT_LENGTH, LIST, RENAME, DELETE }; @@ -602,7 +615,7 @@ public class TestDFSPermission { /* A random permission generator that guarantees that each permission * value is generated only once. */ - static private class PermissionGenerator { + static class PermissionGenerator { private final Random r; private final short[] permissions = new short[MAX_PERMISSION + 1]; private int numLeft = MAX_PERMISSION + 1;