From f4b9bcd87c66a39f0c93983431630e9d1b6e36d3 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Thu, 26 May 2016 12:15:17 -0700 Subject: [PATCH] HDFS-10431 Refactor and speedup TestAsyncDFSRename. Contributed by Xiaobing Zhou --- .../org/apache/hadoop/hdfs/TestAsyncDFS.java | 233 +++++++- .../hadoop/hdfs/TestAsyncDFSRename.java | 563 +++--------------- 2 files changed, 313 insertions(+), 483 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java index 67262dd4668..ddcf492922c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java @@ -29,13 +29,16 @@ import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE; import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -43,15 +46,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers; import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest; import org.apache.hadoop.ipc.AsyncCallLimitExceededException; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -63,21 +72,28 @@ import com.google.common.collect.Lists; * */ public class TestAsyncDFS { public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class); - private static final int NUM_TESTS = 1000; + private final short replFactor = 1; + private final long blockSize = 512; + private long fileLen = blockSize * 3; + private final long seed = Time.now(); + private final Random r = new Random(seed); + private final PermissionGenerator permGenerator = new PermissionGenerator(r); + private static final int NUM_TESTS = 50; private static final int NUM_NN_HANDLER = 10; - private static final int ASYNC_CALL_LIMIT = 100; + private static final int ASYNC_CALL_LIMIT = 1000; private Configuration conf; private MiniDFSCluster cluster; private FileSystem fs; + private AsyncDistributedFileSystem adfs; @Before public void setup() throws IOException { conf = new HdfsConfiguration(); // explicitly turn on acl conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); - // explicitly turn on ACL - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + // explicitly turn on permission checking + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); // set the limit of max async calls conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, ASYNC_CALL_LIMIT); @@ -86,6 +102,7 @@ public class TestAsyncDFS { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); fs = FileSystem.get(conf); + adfs = cluster.getFileSystem().getAsyncDistributedFileSystem(); } @After @@ -130,13 +147,9 @@ public class TestAsyncDFS { final String basePath = "testBatchAsyncAcl"; final Path parent = new Path(String.format("/test/%s/", basePath)); - AsyncDistributedFileSystem adfs = cluster.getFileSystem() - .getAsyncDistributedFileSystem(); - // prepare test - int count = NUM_TESTS; - final Path[] paths = new Path[count]; - for (int i = 0; i < count; i++) { + final Path[] paths = new Path[NUM_TESTS]; + for (int i = 0; i < NUM_TESTS; i++) { paths[i] = new Path(parent, "acl" + i); FileSystem.mkdirs(fs, paths[i], FsPermission.createImmutable((short) 0750)); @@ -153,7 +166,7 @@ public class TestAsyncDFS { int start = 0, end = 0; try { // test setAcl - for (int i = 0; i < count; i++) { + for (int i = 0; i < NUM_TESTS; i++) { for (;;) { try { Future retFuture = adfs.setAcl(paths[i], aclSpec); @@ -166,12 +179,12 @@ public class TestAsyncDFS { } } } - waitForAclReturnValues(setAclRetFutures, end, count); + waitForAclReturnValues(setAclRetFutures, end, NUM_TESTS); // test getAclStatus start = 0; end = 0; - for (int i = 0; i < count; i++) { + for (int i = 0; i < NUM_TESTS; i++) { for (;;) { try { Future retFuture = adfs.getAclStatus(paths[i]); @@ -185,13 +198,23 @@ public class TestAsyncDFS { } } } - waitForAclReturnValues(getAclRetFutures, end, count, paths, + waitForAclReturnValues(getAclRetFutures, end, NUM_TESTS, paths, expectedAclSpec); } catch (Exception e) { throw e; } } + static void waitForReturnValues(final Map> retFutures, + final int start, final int end) + throws InterruptedException, ExecutionException { + LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end)); + for (int i = start; i < end; i++) { + LOG.info("calling Future#get #" + i); + retFutures.get(i).get(); + } + } + private void waitForAclReturnValues( final Map> aclRetFutures, final int start, final int end) throws InterruptedException, ExecutionException { @@ -266,9 +289,12 @@ public class TestAsyncDFS { final Path parent = new Path("/test/async_api_exception/"); final Path aclDir = new Path(parent, "aclDir"); - fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770)); + final Path src = new Path(parent, "src"); + final Path dst = new Path(parent, "dst"); + fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0700)); + fs.mkdirs(src); - AsyncDistributedFileSystem adfs = ugi1 + AsyncDistributedFileSystem adfs1 = ugi1 .doAs(new PrivilegedExceptionAction() { @Override public AsyncDistributedFileSystem run() throws Exception { @@ -277,9 +303,36 @@ public class TestAsyncDFS { }); Future retFuture; + // test rename + try { + retFuture = adfs1.rename(src, dst, Rename.OVERWRITE); + retFuture.get(); + } catch (ExecutionException e) { + checkPermissionDenied(e, src, user1); + assertTrue("Permission denied messages must carry the path parent", e + .getMessage().contains(src.getParent().toUri().getPath())); + } + + // test setPermission + FsPermission fsPerm = new FsPermission(permGenerator.next()); + try { + retFuture = adfs1.setPermission(src, fsPerm); + retFuture.get(); + } catch (ExecutionException e) { + checkPermissionDenied(e, src, user1); + } + + // test setOwner + try { + retFuture = adfs1.setOwner(src, "user1", "group2"); + retFuture.get(); + } catch (ExecutionException e) { + checkPermissionDenied(e, src, user1); + } + // test setAcl try { - retFuture = adfs.setAcl(aclDir, + retFuture = adfs1.setAcl(aclDir, Lists.newArrayList(aclEntry(ACCESS, USER, ALL))); retFuture.get(); fail("setAcl should fail with permission denied"); @@ -289,7 +342,7 @@ public class TestAsyncDFS { // test getAclStatus try { - Future aclRetFuture = adfs.getAclStatus(aclDir); + Future aclRetFuture = adfs1.getAclStatus(aclDir); aclRetFuture.get(); fail("getAclStatus should fail with permission denied"); } catch (ExecutionException e) { @@ -307,4 +360,148 @@ public class TestAsyncDFS { assertTrue("Permission denied messages must carry the name of the path", e.getMessage().contains(dir.getName())); } + + + @Test(timeout = 120000) + public void testConcurrentAsyncAPI() throws Exception { + String group1 = "group1"; + String group2 = "group2"; + String user1 = "user1"; + + // create fake mapping for the groups + Map u2gMap = new HashMap(1); + u2gMap.put(user1, new String[] {group1, group2}); + DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); + + // prepare for test + final Path parent = new Path( + String.format("/test/%s/", "testConcurrentAsyncAPI")); + final Path[] srcs = new Path[NUM_TESTS]; + final Path[] dsts = new Path[NUM_TESTS]; + short[] permissions = new short[NUM_TESTS]; + for (int i = 0; i < NUM_TESTS; i++) { + srcs[i] = new Path(parent, "src" + i); + dsts[i] = new Path(parent, "dst" + i); + DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); + DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); + assertTrue(fs.exists(srcs[i])); + assertTrue(fs.getFileStatus(srcs[i]).isFile()); + assertTrue(fs.exists(dsts[i])); + assertTrue(fs.getFileStatus(dsts[i]).isFile()); + permissions[i] = permGenerator.next(); + } + + Map> renameRetFutures = + new HashMap>(); + Map> permRetFutures = + new HashMap>(); + Map> ownerRetFutures = + new HashMap>(); + int start = 0, end = 0; + // test rename + for (int i = 0; i < NUM_TESTS; i++) { + for (;;) { + try { + Future returnFuture = adfs.rename(srcs[i], dsts[i], + Rename.OVERWRITE); + renameRetFutures.put(i, returnFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForReturnValues(renameRetFutures, start, end); + } + } + } + + // wait for completing the calls + waitForAclReturnValues(renameRetFutures, end, NUM_TESTS); + + // verify the src should not exist, dst should + for (int i = 0; i < NUM_TESTS; i++) { + assertFalse(fs.exists(srcs[i])); + assertTrue(fs.exists(dsts[i])); + } + + // test permissions + for (int i = 0; i < NUM_TESTS; i++) { + for (;;) { + try { + Future retFuture = adfs.setPermission(dsts[i], + new FsPermission(permissions[i])); + permRetFutures.put(i, retFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForReturnValues(permRetFutures, start, end); + } + } + } + // wait for completing the calls + waitForAclReturnValues(permRetFutures, end, NUM_TESTS); + + // verify the permission + for (int i = 0; i < NUM_TESTS; i++) { + assertTrue(fs.exists(dsts[i])); + FsPermission fsPerm = new FsPermission(permissions[i]); + checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction()); + } + + // test setOwner + start = 0; + end = 0; + for (int i = 0; i < NUM_TESTS; i++) { + for (;;) { + try { + Future retFuture = adfs.setOwner(dsts[i], "user1", "group2"); + ownerRetFutures.put(i, retFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForReturnValues(ownerRetFutures, start, end); + } + } + } + // wait for completing the calls + waitForAclReturnValues(ownerRetFutures, end, NUM_TESTS); + + // verify the owner + for (int i = 0; i < NUM_TESTS; i++) { + assertTrue(fs.exists(dsts[i])); + assertTrue("user1".equals(fs.getFileStatus(dsts[i]).getOwner())); + assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup())); + } + } + + static void checkAccessPermissions(FileStatus stat, FsAction mode) + throws IOException { + checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode); + } + + static void checkAccessPermissions(final UserGroupInformation ugi, + FileStatus stat, FsAction mode) throws IOException { + FsPermission perm = stat.getPermission(); + String user = ugi.getShortUserName(); + List groups = Arrays.asList(ugi.getGroupNames()); + + if (user.equals(stat.getOwner())) { + if (perm.getUserAction().implies(mode)) { + return; + } + } else if (groups.contains(stat.getGroup())) { + if (perm.getGroupAction().implies(mode)) { + return; + } + } else { + if (perm.getOtherAction().implies(mode)) { + return; + } + } + throw new AccessControlException(String.format( + "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat + .getPath(), stat.getOwner(), stat.getGroup(), + stat.isDirectory() ? "d" : "-", perm)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java index 03c8151a457..8d3e509d9dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java @@ -19,14 +19,11 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -34,521 +31,157 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.ipc.AsyncCallLimitExceededException; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public class TestAsyncDFSRename { public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class); - private final long seed = Time.now(); - private final Random r = new Random(seed); - private final PermissionGenerator permGenerator = new PermissionGenerator(r); - private final short replFactor = 2; + private final short replFactor = 1; private final long blockSize = 512; private long fileLen = blockSize * 3; + private static final int NUM_TESTS = 50; + private static final int NUM_NN_HANDLER = 10; + private static final int ASYNC_CALL_LIMIT = 1000; - /** - * Check the blocks of dst file are cleaned after rename with overwrite - * Restart NN to check the rename successfully - */ - @Test(timeout = 60000) - public void testAsyncRenameWithOverwrite() throws Exception { - Configuration conf = new Configuration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes( - replFactor).build(); + private Configuration conf; + private MiniDFSCluster cluster; + private FileSystem fs; + private AsyncDistributedFileSystem adfs; + + @Before + public void setup() throws IOException { + conf = new HdfsConfiguration(); + // set the limit of max async calls + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, + ASYNC_CALL_LIMIT); + // set server handlers + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); - DistributedFileSystem dfs = cluster.getFileSystem(); - AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); + fs = FileSystem.get(conf); + adfs = cluster.getFileSystem().getAsyncDistributedFileSystem(); + } - try { - String src = "/foo/src"; - String dst = "/foo/dst"; - String src2 = "/foo/src2"; - String dst2 = "/foo/dst2"; - Path srcPath = new Path(src); - Path dstPath = new Path(dst); - Path srcPath2 = new Path(src2); - Path dstPath2 = new Path(dst2); - - DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1); - - LocatedBlocks lbs = NameNodeAdapter.getBlockLocations( - cluster.getNameNode(), dst, 0, fileLen); - LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations( - cluster.getNameNode(), dst2, 0, fileLen); - BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode()) - .getBlockManager(); - assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock() - .getLocalBlock()) != null); - assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock() - .getLocalBlock()) != null); - - Future retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE); - Future retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE); - retVal1.get(); - retVal2.get(); - - assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock() - .getLocalBlock()) == null); - assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock() - .getLocalBlock()) == null); - - // Restart NN and check the rename successfully - cluster.restartNameNodes(); - assertFalse(dfs.exists(srcPath)); - assertTrue(dfs.exists(dstPath)); - assertFalse(dfs.exists(srcPath2)); - assertTrue(dfs.exists(dstPath2)); - } finally { - if (dfs != null) { - dfs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } + @After + public void tearDown() throws IOException { + if (fs != null) { + fs.close(); + fs = null; + } + if (cluster != null) { + cluster.shutdown(); + cluster = null; } } @Test(timeout = 60000) public void testCallGetReturnValueMultipleTimes() throws Exception { - final Path renameDir = new Path( - "/test/testCallGetReturnValueMultipleTimes/"); - final Configuration conf = new HdfsConfiguration(); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200); - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(2).build(); - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); - final int count = 100; - final Map> returnFutures = new HashMap>(); + final Path parent = new Path("/test/testCallGetReturnValueMultipleTimes/"); + assertTrue(fs.mkdirs(parent)); - assertTrue(dfs.mkdirs(renameDir)); + // prepare test + final Path[] srcs = new Path[NUM_TESTS]; + final Path[] dsts = new Path[NUM_TESTS]; + for (int i = 0; i < NUM_TESTS; i++) { + srcs[i] = new Path(parent, "src" + i); + dsts[i] = new Path(parent, "dst" + i); + DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); + DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); + } - try { - // concurrently invoking many rename - for (int i = 0; i < count; i++) { - Path src = new Path(renameDir, "src" + i); - Path dst = new Path(renameDir, "dst" + i); - DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1); - Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); - returnFutures.put(i, returnFuture); - } + // concurrently invoking many rename + final Map> reFutures = + new HashMap>(); + for (int i = 0; i < NUM_TESTS; i++) { + Future retFuture = adfs.rename(srcs[i], dsts[i], + Rename.OVERWRITE); + reFutures.put(i, retFuture); + } - for (int i = 0; i < 5; i++) { - verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster, - renameDir, dfs); - } - } finally { - if (dfs != null) { - dfs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } + assertEquals(NUM_TESTS, reFutures.size()); + + for (int i = 0; i < 5; i++) { + verifyCallGetReturnValueMultipleTimes(reFutures, srcs, dsts); } } private void verifyCallGetReturnValueMultipleTimes( - Map> returnFutures, int count, - MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs) + final Map> reFutures, final Path[] srcs, + final Path[] dsts) throws InterruptedException, ExecutionException, IOException { + // wait for completing the calls - for (int i = 0; i < count; i++) { - returnFutures.get(i).get(); - } + waitForReturnValues(reFutures, 0, NUM_TESTS); - // Restart NN and check the rename successfully - cluster.restartNameNodes(); - - // very the src dir should not exist, dst should - for (int i = 0; i < count; i++) { - Path src = new Path(renameDir, "src" + i); - Path dst = new Path(renameDir, "dst" + i); - assertFalse(dfs.exists(src)); - assertTrue(dfs.exists(dst)); - } - } - - @Test - public void testConservativeConcurrentAsyncRenameWithOverwrite() - throws Exception { - internalTestConcurrentAsyncRenameWithOverwrite(100, - "testAggressiveConcurrentAsyncRenameWithOverwrite"); + // verify the src dir should not exist, dst should + verifyRenames(srcs, dsts); } @Test(timeout = 60000) - public void testAggressiveConcurrentAsyncRenameWithOverwrite() - throws Exception { - internalTestConcurrentAsyncRenameWithOverwrite(10000, - "testConservativeConcurrentAsyncRenameWithOverwrite"); - } + public void testConcurrentAsyncRename() throws Exception { + final Path parent = new Path( + String.format("/test/%s/", "testConcurrentAsyncRename")); + assertTrue(fs.mkdirs(parent)); - private void internalTestConcurrentAsyncRenameWithOverwrite( - final int asyncCallLimit, final String basePath) throws Exception { - final Path renameDir = new Path(String.format("/test/%s/", basePath)); - Configuration conf = new HdfsConfiguration(); - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, - asyncCallLimit); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) - .build(); - cluster.waitActive(); - DistributedFileSystem dfs = cluster.getFileSystem(); - AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); - int count = 1000; - int start = 0, end = 0; - Map> returnFutures = new HashMap>(); - - assertTrue(dfs.mkdirs(renameDir)); - - try { - // concurrently invoking many rename - for (int i = 0; i < count; i++) { - Path src = new Path(renameDir, "src" + i); - Path dst = new Path(renameDir, "dst" + i); - DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1); - DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1); - for (;;) { - try { - LOG.info("rename #" + i); - Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); - returnFutures.put(i, returnFuture); - break; - } catch (AsyncCallLimitExceededException e) { - /** - * reached limit of async calls, fetch results of finished async - * calls to let follow-on calls go - */ - LOG.error(e); - start = end; - end = i; - LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i)); - waitForReturnValues(returnFutures, start, end); - } - } - } - - // wait for completing the calls - for (int i = start; i < count; i++) { - returnFutures.get(i).get(); - } - - // Restart NN and check the rename successfully - cluster.restartNameNodes(); - - // very the src dir should not exist, dst should - for (int i = 0; i < count; i++) { - Path src = new Path(renameDir, "src" + i); - Path dst = new Path(renameDir, "dst" + i); - assertFalse(dfs.exists(src)); - assertTrue(dfs.exists(dst)); - } - } finally { - if (dfs != null) { - dfs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } - } - } - - private void waitForReturnValues( - final Map> returnFutures, final int start, - final int end) throws InterruptedException, ExecutionException { - LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end)); - for (int i = start; i < end; i++) { - LOG.info("calling Future#get #" + i); - returnFutures.get(i).get(); - } - } - - @Test - public void testConservativeConcurrentAsyncAPI() throws Exception { - internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI"); - } - - @Test(timeout = 60000) - public void testAggressiveConcurrentAsyncAPI() throws Exception { - internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI"); - } - - private void internalTestConcurrentAsyncAPI(final int asyncCallLimit, - final String basePath) throws Exception { - Configuration conf = new HdfsConfiguration(); - String group1 = "group1"; - String group2 = "group2"; - String user1 = "user1"; - int count = 500; - - // explicitly turn on permission checking - conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); - // set the limit of max async calls - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, - asyncCallLimit); - - // create fake mapping for the groups - Map u2gMap = new HashMap(1); - u2gMap.put(user1, new String[] {group1, group2}); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); - - // start mini cluster - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(3).build(); - cluster.waitActive(); - AsyncDistributedFileSystem adfs = cluster.getFileSystem() - .getAsyncDistributedFileSystem(); - - // prepare for test - FileSystem rootFs = FileSystem.get(conf); - final Path parent = new Path(String.format("/test/%s/", basePath)); - final Path[] srcs = new Path[count]; - final Path[] dsts = new Path[count]; - short[] permissions = new short[count]; - for (int i = 0; i < count; i++) { + // prepare test + final Path[] srcs = new Path[NUM_TESTS]; + final Path[] dsts = new Path[NUM_TESTS]; + for (int i = 0; i < NUM_TESTS; i++) { srcs[i] = new Path(parent, "src" + i); dsts[i] = new Path(parent, "dst" + i); - DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1); - DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1); - assertTrue(rootFs.exists(srcs[i])); - assertTrue(rootFs.getFileStatus(srcs[i]).isFile()); - assertTrue(rootFs.exists(dsts[i])); - assertTrue(rootFs.getFileStatus(dsts[i]).isFile()); - permissions[i] = permGenerator.next(); + DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); + DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); } - Map> renameRetFutures = - new HashMap>(); - Map> permRetFutures = - new HashMap>(); - Map> ownerRetFutures = - new HashMap>(); + // concurrently invoking many rename int start = 0, end = 0; - // test rename - for (int i = 0; i < count; i++) { + Map> retFutures = + new HashMap>(); + for (int i = 0; i < NUM_TESTS; i++) { for (;;) { try { - Future returnFuture = adfs.rename(srcs[i], dsts[i], + LOG.info("rename #" + i); + Future retFuture = adfs.rename(srcs[i], dsts[i], Rename.OVERWRITE); - renameRetFutures.put(i, returnFuture); + retFutures.put(i, retFuture); break; } catch (AsyncCallLimitExceededException e) { + /** + * reached limit of async calls, fetch results of finished async calls + * to let follow-on calls go + */ + LOG.error(e); start = end; end = i; - waitForReturnValues(renameRetFutures, start, end); + LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i)); + waitForReturnValues(retFutures, start, end); } } } // wait for completing the calls - for (int i = start; i < count; i++) { - renameRetFutures.get(i).get(); - } + waitForReturnValues(retFutures, end, NUM_TESTS); - // Restart NN and check the rename successfully - cluster.restartNameNodes(); - - // very the src should not exist, dst should - for (int i = 0; i < count; i++) { - assertFalse(rootFs.exists(srcs[i])); - assertTrue(rootFs.exists(dsts[i])); - } - - // test permissions - try { - for (int i = 0; i < count; i++) { - for (;;) { - try { - Future retFuture = adfs.setPermission(dsts[i], - new FsPermission(permissions[i])); - permRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(permRetFutures, start, end); - } - } - } - // wait for completing the calls - for (int i = start; i < count; i++) { - permRetFutures.get(i).get(); - } - - // Restart NN and check permission then - cluster.restartNameNodes(); - - // verify the permission - for (int i = 0; i < count; i++) { - assertTrue(rootFs.exists(dsts[i])); - FsPermission fsPerm = new FsPermission(permissions[i]); - checkAccessPermissions(rootFs.getFileStatus(dsts[i]), - fsPerm.getUserAction()); - } - - // test setOwner - start = 0; - end = 0; - for (int i = 0; i < count; i++) { - for (;;) { - try { - Future retFuture = adfs.setOwner(dsts[i], "user1", - "group2"); - ownerRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(ownerRetFutures, start, end); - } - } - } - // wait for completing the calls - for (int i = start; i < count; i++) { - ownerRetFutures.get(i).get(); - } - - // Restart NN and check owner then - cluster.restartNameNodes(); - - // verify the owner - for (int i = 0; i < count; i++) { - assertTrue(rootFs.exists(dsts[i])); - assertTrue( - "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner())); - assertTrue( - "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup())); - } - } catch (AccessControlException ace) { - throw ace; - } finally { - if (rootFs != null) { - rootFs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } - } + // verify the src dir should not exist, dst should + verifyRenames(srcs, dsts); } - static void checkAccessPermissions(FileStatus stat, FsAction mode) + private void verifyRenames(final Path[] srcs, final Path[] dsts) throws IOException { - checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode); + for (int i = 0; i < NUM_TESTS; i++) { + assertFalse(fs.exists(srcs[i])); + assertTrue(fs.exists(dsts[i])); + } } - static void checkAccessPermissions(final UserGroupInformation ugi, - FileStatus stat, FsAction mode) throws IOException { - FsPermission perm = stat.getPermission(); - String user = ugi.getShortUserName(); - List groups = Arrays.asList(ugi.getGroupNames()); - - if (user.equals(stat.getOwner())) { - if (perm.getUserAction().implies(mode)) { - return; - } - } else if (groups.contains(stat.getGroup())) { - if (perm.getGroupAction().implies(mode)) { - return; - } - } else { - if (perm.getOtherAction().implies(mode)) { - return; - } - } - throw new AccessControlException(String.format( - "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat - .getPath(), stat.getOwner(), stat.getGroup(), - stat.isDirectory() ? "d" : "-", perm)); - } - - @Test(timeout = 60000) - public void testAsyncAPIWithException() throws Exception { - Configuration conf = new HdfsConfiguration(); - String group1 = "group1"; - String group2 = "group2"; - String user1 = "user1"; - UserGroupInformation ugi1; - - // explicitly turn on permission checking - conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); - - // create fake mapping for the groups - Map u2gMap = new HashMap(1); - u2gMap.put(user1, new String[] {group1, group2}); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); - - // Initiate all four users - ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] { - group1, group2 }); - - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(3).build(); - cluster.waitActive(); - - FileSystem rootFs = FileSystem.get(conf); - final Path renameDir = new Path("/test/async_api_exception/"); - final Path src = new Path(renameDir, "src"); - final Path dst = new Path(renameDir, "dst"); - rootFs.mkdirs(src); - - AsyncDistributedFileSystem adfs = ugi1 - .doAs(new PrivilegedExceptionAction() { - @Override - public AsyncDistributedFileSystem run() throws Exception { - return cluster.getFileSystem().getAsyncDistributedFileSystem(); - } - }); - - Future retFuture; - try { - retFuture = adfs.rename(src, dst, Rename.OVERWRITE); - retFuture.get(); - } catch (ExecutionException e) { - TestAsyncDFS.checkPermissionDenied(e, src, user1); - assertTrue("Permission denied messages must carry the path parent", e - .getMessage().contains(src.getParent().toUri().getPath())); - } - - FsPermission fsPerm = new FsPermission(permGenerator.next()); - try { - retFuture = adfs.setPermission(src, fsPerm); - retFuture.get(); - } catch (ExecutionException e) { - TestAsyncDFS.checkPermissionDenied(e, src, user1); - assertTrue("Permission denied messages must carry the name of the path", - e.getMessage().contains(src.getName())); - } - - try { - retFuture = adfs.setOwner(src, "user1", "group2"); - retFuture.get(); - } catch (ExecutionException e) { - TestAsyncDFS.checkPermissionDenied(e, src, user1); - assertTrue("Permission denied messages must carry the name of the path", - e.getMessage().contains(src.getName())); - } finally { - if (rootFs != null) { - rootFs.close(); - } - if (cluster != null) { - cluster.shutdown(); - } - } + void waitForReturnValues(final Map> retFutures, + final int start, final int end) + throws InterruptedException, ExecutionException { + TestAsyncDFS.waitForReturnValues(retFutures, start, end); } } \ No newline at end of file