diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java deleted file mode 100644 index 824336a2922..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.Future; - -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; -import org.apache.hadoop.io.retry.AsyncCallHandler; -import org.apache.hadoop.util.concurrent.AsyncGetFuture; -import org.apache.hadoop.ipc.Client; - -/**************************************************************** - * Implementation of the asynchronous distributed file system. - * This instance of this class is the way end-user code interacts - * with a Hadoop DistributedFileSystem in an asynchronous manner. - * - * This class is unstable, so no guarantee is provided as to reliability, - * stability or compatibility across any level of release granularity. - * - *****************************************************************/ -@Unstable -public class AsyncDistributedFileSystem { - - private final DistributedFileSystem dfs; - - AsyncDistributedFileSystem(final DistributedFileSystem dfs) { - this.dfs = dfs; - } - - private static Future getReturnValue() { - return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn()); - } - - /** - * Renames Path src to Path dst - * - *

- * If OVERWRITE option is not passed as an argument, rename fails if the dst - * already exists. - *

- * If OVERWRITE option is passed as an argument, rename overwrites the dst if - * it is a file or an empty directory. Rename fails if dst is a non-empty - * directory. - *

- * Note that atomicity of rename is dependent on the file system - * implementation. Please refer to the file system documentation for details. - * This default implementation is non atomic. - * - * @param src - * path to be renamed - * @param dst - * new path after rename - * @throws IOException - * on failure - * @return an instance of Future, #get of which is invoked to wait for - * asynchronous call being finished. - */ - public Future rename(Path src, Path dst, - final Options.Rename... options) throws IOException { - dfs.getFsStatistics().incrementWriteOps(1); - dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.RENAME); - - final Path absSrc = dfs.fixRelativePart(src); - final Path absDst = dfs.fixRelativePart(dst); - - final boolean isAsync = Client.isAsynchronousMode(); - Client.setAsynchronousMode(true); - try { - dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst), - options); - return getReturnValue(); - } finally { - Client.setAsynchronousMode(isAsync); - } - } - - /** - * Set permission of a path. - * - * @param p - * the path the permission is set to - * @param permission - * the permission that is set to a path. - * @return an instance of Future, #get of which is invoked to wait for - * asynchronous call being finished. - */ - public Future setPermission(Path p, final FsPermission permission) - throws IOException { - dfs.getFsStatistics().incrementWriteOps(1); - dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_PERMISSION); - final Path absPath = dfs.fixRelativePart(p); - final boolean isAsync = Client.isAsynchronousMode(); - Client.setAsynchronousMode(true); - try { - dfs.getClient().setPermission(dfs.getPathName(absPath), permission); - return getReturnValue(); - } finally { - Client.setAsynchronousMode(isAsync); - } - } - - /** - * Set owner of a path (i.e. a file or a directory). The parameters username - * and groupname cannot both be null. - * - * @param p - * The path - * @param username - * If it is null, the original username remains unchanged. - * @param groupname - * If it is null, the original groupname remains unchanged. - * @return an instance of Future, #get of which is invoked to wait for - * asynchronous call being finished. - */ - public Future setOwner(Path p, String username, String groupname) - throws IOException { - if (username == null && groupname == null) { - throw new IOException("username == null && groupname == null"); - } - - dfs.getFsStatistics().incrementWriteOps(1); - dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_OWNER); - final Path absPath = dfs.fixRelativePart(p); - final boolean isAsync = Client.isAsynchronousMode(); - Client.setAsynchronousMode(true); - try { - dfs.getClient().setOwner(dfs.getPathName(absPath), username, groupname); - return getReturnValue(); - } finally { - Client.setAsynchronousMode(isAsync); - } - } - - /** - * Fully replaces ACL of files and directories, discarding all existing - * entries. - * - * @param p - * Path to modify - * @param aclSpec - * List describing modifications, must include entries for - * user, group, and others for compatibility with permission bits. - * @throws IOException - * if an ACL could not be modified - * @return an instance of Future, #get of which is invoked to wait for - * asynchronous call being finished. - */ - public Future setAcl(Path p, final List aclSpec) - throws IOException { - dfs.getFsStatistics().incrementWriteOps(1); - dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_ACL); - final Path absPath = dfs.fixRelativePart(p); - final boolean isAsync = Client.isAsynchronousMode(); - Client.setAsynchronousMode(true); - try { - dfs.getClient().setAcl(dfs.getPathName(absPath), aclSpec); - return getReturnValue(); - } finally { - Client.setAsynchronousMode(isAsync); - } - } - - /** - * Gets the ACL of a file or directory. - * - * @param p - * Path to get - * @return AclStatus describing the ACL of the file or directory - * @throws IOException - * if an ACL could not be read - * @return an instance of Future, #get of which is invoked to wait for - * asynchronous call being finished. - */ - public Future getAclStatus(Path p) throws IOException { - final Path absPath = dfs.fixRelativePart(p); - final boolean isAsync = Client.isAsynchronousMode(); - Client.setAsynchronousMode(true); - try { - dfs.getClient().getAclStatus(dfs.getPathName(absPath)); - return getReturnValue(); - } finally { - Client.setAsynchronousMode(isAsync); - } - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 66ee42fea7f..24ffb40b270 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -31,7 +31,6 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStoragePolicySpi; @@ -2511,15 +2510,6 @@ public class DistributedFileSystem extends FileSystem { return ret; } - private final AsyncDistributedFileSystem adfs = - new AsyncDistributedFileSystem(this); - - /** @return an {@link AsyncDistributedFileSystem} object. */ - @Unstable - public AsyncDistributedFileSystem getAsyncDistributedFileSystem() { - return adfs; - } - @Override protected Path fixRelativePart(Path p) { return super.fixRelativePart(p); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java deleted file mode 100644 index 6a602900b2d..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java +++ /dev/null @@ -1,454 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS; -import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; -import static org.apache.hadoop.fs.permission.AclEntryType.GROUP; -import static org.apache.hadoop.fs.permission.AclEntryType.MASK; -import static org.apache.hadoop.fs.permission.AclEntryType.OTHER; -import static org.apache.hadoop.fs.permission.AclEntryType.USER; -import static org.apache.hadoop.fs.permission.FsAction.ALL; -import static org.apache.hadoop.fs.permission.FsAction.NONE; -import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE; -import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; -import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers; -import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest; -import org.apache.hadoop.ipc.AsyncCallLimitExceededException; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Lists; - -/** - * Unit tests for asynchronous distributed filesystem. - * */ -public class TestAsyncDFS { - public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class); - private final short replFactor = 1; - private final long blockSize = 512; - private long fileLen = 0; - private final long seed = Time.now(); - private final Random r = new Random(seed); - private final PermissionGenerator permGenerator = new PermissionGenerator(r); - private static final int NUM_TESTS = 50; - private static final int NUM_NN_HANDLER = 10; - private static final int ASYNC_CALL_LIMIT = 1000; - - private Configuration conf; - private MiniDFSCluster cluster; - private DistributedFileSystem fs; - private AsyncDistributedFileSystem adfs; - - @Before - public void setup() throws IOException { - conf = new HdfsConfiguration(); - // explicitly turn on acl - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); - // explicitly turn on permission checking - conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); - // set the limit of max async calls - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, - ASYNC_CALL_LIMIT); - // set server handlers - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - adfs = fs.getAsyncDistributedFileSystem(); - } - - @After - public void tearDown() throws IOException { - if (fs != null) { - fs.close(); - fs = null; - } - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - @Test(timeout=60000) - public void testBatchAsyncAcl() throws Exception { - final String basePath = "testBatchAsyncAcl"; - final Path parent = new Path(String.format("/test/%s/", basePath)); - - // prepare test - final Path[] paths = new Path[NUM_TESTS]; - for (int i = 0; i < NUM_TESTS; i++) { - paths[i] = new Path(parent, "acl" + i); - FileSystem.mkdirs(fs, paths[i], - FsPermission.createImmutable((short) 0750)); - assertTrue(fs.exists(paths[i])); - assertTrue(fs.getFileStatus(paths[i]).isDirectory()); - } - - final List aclSpec = getAclSpec(); - final AclEntry[] expectedAclSpec = getExpectedAclSpec(); - Map> setAclRetFutures = - new HashMap>(); - Map> getAclRetFutures = - new HashMap>(); - int start = 0, end = 0; - try { - // test setAcl - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - Future retFuture = adfs.setAcl(paths[i], aclSpec); - setAclRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForAclReturnValues(setAclRetFutures, start, end); - } - } - } - waitForAclReturnValues(setAclRetFutures, end, NUM_TESTS); - - // test getAclStatus - start = 0; - end = 0; - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - Future retFuture = adfs.getAclStatus(paths[i]); - getAclRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForAclReturnValues(getAclRetFutures, start, end, paths, - expectedAclSpec); - } - } - } - waitForAclReturnValues(getAclRetFutures, end, NUM_TESTS, paths, - expectedAclSpec); - } catch (Exception e) { - throw e; - } - } - - static void waitForReturnValues(final Map> retFutures, - final int start, final int end) - throws InterruptedException, ExecutionException { - LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end)); - for (int i = start; i < end; i++) { - LOG.info("calling Future#get #" + i); - retFutures.get(i).get(); - } - } - - private void waitForAclReturnValues( - final Map> aclRetFutures, final int start, - final int end) throws InterruptedException, ExecutionException { - for (int i = start; i < end; i++) { - aclRetFutures.get(i).get(); - } - } - - private void waitForAclReturnValues( - final Map> aclRetFutures, final int start, - final int end, final Path[] paths, final AclEntry[] expectedAclSpec) - throws InterruptedException, ExecutionException, IOException { - for (int i = start; i < end; i++) { - AclStatus aclStatus = aclRetFutures.get(i).get(); - verifyGetAcl(aclStatus, expectedAclSpec, paths[i]); - } - } - - private void verifyGetAcl(final AclStatus aclStatus, - final AclEntry[] expectedAclSpec, final Path path) throws IOException { - if (aclStatus == null) { - return; - } - - // verify permission and acl - AclEntry[] returned = aclStatus.getEntries().toArray(new AclEntry[0]); - assertArrayEquals(expectedAclSpec, returned); - assertPermission(path, (short) 010770); - FSAclBaseTest.assertAclFeature(cluster, path, true); - } - - private List getAclSpec() { - return Lists.newArrayList( - aclEntry(ACCESS, USER, ALL), - aclEntry(ACCESS, USER, "foo", ALL), - aclEntry(ACCESS, GROUP, READ_EXECUTE), - aclEntry(ACCESS, OTHER, NONE), - aclEntry(DEFAULT, USER, "foo", ALL)); - } - - private AclEntry[] getExpectedAclSpec() { - return new AclEntry[] { - aclEntry(ACCESS, USER, "foo", ALL), - aclEntry(ACCESS, GROUP, READ_EXECUTE), - aclEntry(DEFAULT, USER, ALL), - aclEntry(DEFAULT, USER, "foo", ALL), - aclEntry(DEFAULT, GROUP, READ_EXECUTE), - aclEntry(DEFAULT, MASK, ALL), - aclEntry(DEFAULT, OTHER, NONE) }; - } - - private void assertPermission(final Path pathToCheck, final short perm) - throws IOException { - AclTestHelpers.assertPermission(fs, pathToCheck, perm); - } - - @Test(timeout=60000) - public void testAsyncAPIWithException() throws Exception { - String group1 = "group1"; - String group2 = "group2"; - String user1 = "user1"; - UserGroupInformation ugi1; - - // create fake mapping for the groups - Map u2gMap = new HashMap(1); - u2gMap.put(user1, new String[] {group1, group2}); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); - - // Initiate all four users - ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] { - group1, group2 }); - - final Path parent = new Path("/test/async_api_exception/"); - final Path aclDir = new Path(parent, "aclDir"); - final Path src = new Path(parent, "src"); - final Path dst = new Path(parent, "dst"); - fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0700)); - fs.mkdirs(src); - - AsyncDistributedFileSystem adfs1 = ugi1 - .doAs(new PrivilegedExceptionAction() { - @Override - public AsyncDistributedFileSystem run() throws Exception { - return cluster.getFileSystem().getAsyncDistributedFileSystem(); - } - }); - - Future retFuture; - // test rename - try { - retFuture = adfs1.rename(src, dst, Rename.OVERWRITE); - retFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - assertTrue("Permission denied messages must carry the path parent", e - .getMessage().contains(src.getParent().toUri().getPath())); - } - - // test setPermission - FsPermission fsPerm = new FsPermission(permGenerator.next()); - try { - retFuture = adfs1.setPermission(src, fsPerm); - retFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - } - - // test setOwner - try { - retFuture = adfs1.setOwner(src, "user1", "group2"); - retFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - } - - // test setAcl - try { - retFuture = adfs1.setAcl(aclDir, - Lists.newArrayList(aclEntry(ACCESS, USER, ALL))); - retFuture.get(); - fail("setAcl should fail with permission denied"); - } catch (ExecutionException e) { - checkPermissionDenied(e, aclDir, user1); - } - - // test getAclStatus - try { - Future aclRetFuture = adfs1.getAclStatus(aclDir); - aclRetFuture.get(); - fail("getAclStatus should fail with permission denied"); - } catch (ExecutionException e) { - checkPermissionDenied(e, aclDir, user1); - } - } - - public static void checkPermissionDenied(final Exception e, final Path dir, - final String user) { - assertTrue(e.getCause() instanceof RemoteException); - assertTrue("Permission denied messages must carry AccessControlException", - e.getMessage().contains("AccessControlException")); - assertTrue("Permission denied messages must carry the username", e - .getMessage().contains(user)); - assertTrue("Permission denied messages must carry the name of the path", - e.getMessage().contains(dir.getName())); - } - - - @Test(timeout = 120000) - public void testConcurrentAsyncAPI() throws Exception { - String group1 = "group1"; - String group2 = "group2"; - String user1 = "user1"; - - // create fake mapping for the groups - Map u2gMap = new HashMap(1); - u2gMap.put(user1, new String[] {group1, group2}); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); - - // prepare for test - final Path parent = new Path( - String.format("/test/%s/", "testConcurrentAsyncAPI")); - final Path[] srcs = new Path[NUM_TESTS]; - final Path[] dsts = new Path[NUM_TESTS]; - short[] permissions = new short[NUM_TESTS]; - for (int i = 0; i < NUM_TESTS; i++) { - srcs[i] = new Path(parent, "src" + i); - dsts[i] = new Path(parent, "dst" + i); - DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); - DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); - assertTrue(fs.exists(srcs[i])); - assertTrue(fs.getFileStatus(srcs[i]).isFile()); - assertTrue(fs.exists(dsts[i])); - assertTrue(fs.getFileStatus(dsts[i]).isFile()); - permissions[i] = permGenerator.next(); - } - - Map> renameRetFutures = - new HashMap>(); - Map> permRetFutures = - new HashMap>(); - Map> ownerRetFutures = - new HashMap>(); - int start = 0, end = 0; - // test rename - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - Future returnFuture = adfs.rename(srcs[i], dsts[i], - Rename.OVERWRITE); - renameRetFutures.put(i, returnFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(renameRetFutures, start, end); - } - } - } - - // wait for completing the calls - waitForAclReturnValues(renameRetFutures, end, NUM_TESTS); - - // verify the src should not exist, dst should - for (int i = 0; i < NUM_TESTS; i++) { - assertFalse(fs.exists(srcs[i])); - assertTrue(fs.exists(dsts[i])); - } - - // test permissions - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - Future retFuture = adfs.setPermission(dsts[i], - new FsPermission(permissions[i])); - permRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(permRetFutures, start, end); - } - } - } - // wait for completing the calls - waitForAclReturnValues(permRetFutures, end, NUM_TESTS); - - // verify the permission - for (int i = 0; i < NUM_TESTS; i++) { - assertTrue(fs.exists(dsts[i])); - FsPermission fsPerm = new FsPermission(permissions[i]); - fs.access(dsts[i], fsPerm.getUserAction()); - } - - // test setOwner - start = 0; - end = 0; - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - Future retFuture = adfs.setOwner(dsts[i], "user1", "group2"); - ownerRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(ownerRetFutures, start, end); - } - } - } - // wait for completing the calls - waitForAclReturnValues(ownerRetFutures, end, NUM_TESTS); - - // verify the owner - for (int i = 0; i < NUM_TESTS; i++) { - assertTrue(fs.exists(dsts[i])); - assertTrue("user1".equals(fs.getFileStatus(dsts[i]).getOwner())); - assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup())); - } - } - - @Test - public void testAsyncWithoutRetry() throws Exception { - TestAsyncHDFSWithHA.runTestAsyncWithoutRetry(conf, cluster, fs); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java deleted file mode 100644 index 8d3e509d9dc..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.ipc.AsyncCallLimitExceededException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestAsyncDFSRename { - public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class); - private final short replFactor = 1; - private final long blockSize = 512; - private long fileLen = blockSize * 3; - private static final int NUM_TESTS = 50; - private static final int NUM_NN_HANDLER = 10; - private static final int ASYNC_CALL_LIMIT = 1000; - - private Configuration conf; - private MiniDFSCluster cluster; - private FileSystem fs; - private AsyncDistributedFileSystem adfs; - - @Before - public void setup() throws IOException { - conf = new HdfsConfiguration(); - // set the limit of max async calls - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, - ASYNC_CALL_LIMIT); - // set server handlers - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = FileSystem.get(conf); - adfs = cluster.getFileSystem().getAsyncDistributedFileSystem(); - } - - @After - public void tearDown() throws IOException { - if (fs != null) { - fs.close(); - fs = null; - } - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - @Test(timeout = 60000) - public void testCallGetReturnValueMultipleTimes() throws Exception { - final Path parent = new Path("/test/testCallGetReturnValueMultipleTimes/"); - assertTrue(fs.mkdirs(parent)); - - // prepare test - final Path[] srcs = new Path[NUM_TESTS]; - final Path[] dsts = new Path[NUM_TESTS]; - for (int i = 0; i < NUM_TESTS; i++) { - srcs[i] = new Path(parent, "src" + i); - dsts[i] = new Path(parent, "dst" + i); - DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); - DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); - } - - // concurrently invoking many rename - final Map> reFutures = - new HashMap>(); - for (int i = 0; i < NUM_TESTS; i++) { - Future retFuture = adfs.rename(srcs[i], dsts[i], - Rename.OVERWRITE); - reFutures.put(i, retFuture); - } - - assertEquals(NUM_TESTS, reFutures.size()); - - for (int i = 0; i < 5; i++) { - verifyCallGetReturnValueMultipleTimes(reFutures, srcs, dsts); - } - } - - private void verifyCallGetReturnValueMultipleTimes( - final Map> reFutures, final Path[] srcs, - final Path[] dsts) - throws InterruptedException, ExecutionException, IOException { - - // wait for completing the calls - waitForReturnValues(reFutures, 0, NUM_TESTS); - - // verify the src dir should not exist, dst should - verifyRenames(srcs, dsts); - } - - @Test(timeout = 60000) - public void testConcurrentAsyncRename() throws Exception { - final Path parent = new Path( - String.format("/test/%s/", "testConcurrentAsyncRename")); - assertTrue(fs.mkdirs(parent)); - - // prepare test - final Path[] srcs = new Path[NUM_TESTS]; - final Path[] dsts = new Path[NUM_TESTS]; - for (int i = 0; i < NUM_TESTS; i++) { - srcs[i] = new Path(parent, "src" + i); - dsts[i] = new Path(parent, "dst" + i); - DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); - DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); - } - - // concurrently invoking many rename - int start = 0, end = 0; - Map> retFutures = - new HashMap>(); - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - LOG.info("rename #" + i); - Future retFuture = adfs.rename(srcs[i], dsts[i], - Rename.OVERWRITE); - retFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - /** - * reached limit of async calls, fetch results of finished async calls - * to let follow-on calls go - */ - LOG.error(e); - start = end; - end = i; - LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i)); - waitForReturnValues(retFutures, start, end); - } - } - } - - // wait for completing the calls - waitForReturnValues(retFutures, end, NUM_TESTS); - - // verify the src dir should not exist, dst should - verifyRenames(srcs, dsts); - } - - private void verifyRenames(final Path[] srcs, final Path[] dsts) - throws IOException { - for (int i = 0; i < NUM_TESTS; i++) { - assertFalse(fs.exists(srcs[i])); - assertTrue(fs.exists(dsts[i])); - } - } - - void waitForReturnValues(final Map> retFutures, - final int start, final int end) - throws InterruptedException, ExecutionException { - TestAsyncDFS.waitForReturnValues(retFutures, start, end); - } -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java deleted file mode 100644 index 9ade8ec31f6..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncHDFSWithHA.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; -import org.apache.hadoop.io.retry.AsyncCallHandler; -import org.apache.hadoop.io.retry.RetryInvocationHandler; -import org.apache.hadoop.ipc.Client; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.concurrent.AsyncGetFuture; -import org.apache.log4j.Level; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -/** Test async methods with HA setup. */ -public class TestAsyncHDFSWithHA { - static final Logger LOG = LoggerFactory.getLogger(TestAsyncHDFSWithHA.class); - static { - GenericTestUtils.setLogLevel(RetryInvocationHandler.LOG, Level.ALL); - } - - private static Future getReturnValue() { - return new AsyncGetFuture<>(AsyncCallHandler.getAsyncReturn()); - } - - static void mkdirs(DistributedFileSystem dfs, String dir, Path[] srcs, - Path[] dsts) throws IOException { - for (int i = 0; i < srcs.length; i++) { - srcs[i] = new Path(dir, "src" + i); - dsts[i] = new Path(dir, "dst" + i); - dfs.mkdirs(srcs[i]); - } - } - - static void runTestAsyncWithoutRetry(Configuration conf, - MiniDFSCluster cluster, DistributedFileSystem dfs) throws Exception { - final int num = 5; - - final String renameDir = "/testAsyncWithoutRetry/"; - final Path[] srcs = new Path[num + 1]; - final Path[] dsts = new Path[num + 1]; - mkdirs(dfs, renameDir, srcs, dsts); - - // create a proxy without retry. - final NameNodeProxiesClient.ProxyAndInfo proxyInfo - = NameNodeProxies.createNonHAProxy(conf, - cluster.getNameNode(0).getNameNodeAddress(), - ClientProtocol.class, UserGroupInformation.getCurrentUser(), - false); - final ClientProtocol cp = proxyInfo.getProxy(); - - // submit async calls - Client.setAsynchronousMode(true); - final List> results = new ArrayList<>(); - for (int i = 0; i < num; i++) { - final String src = srcs[i].toString(); - final String dst = dsts[i].toString(); - LOG.info(i + ") rename " + src + " -> " + dst); - cp.rename2(src, dst); - results.add(getReturnValue()); - } - Client.setAsynchronousMode(false); - - // wait for the async calls - for (Future f : results) { - f.get(); - } - - //check results - for (int i = 0; i < num; i++) { - Assert.assertEquals(false, dfs.exists(srcs[i])); - Assert.assertEquals(true, dfs.exists(dsts[i])); - } - } - - /** Testing HDFS async methods with HA setup. */ - @Test(timeout = 120000) - public void testAsyncWithHAFailover() throws Exception { - final int num = 10; - - final Configuration conf = new HdfsConfiguration(); - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(0).build(); - - try { - cluster.waitActive(); - cluster.transitionToActive(0); - - final DistributedFileSystem dfs = HATestUtil.configureFailoverFs( - cluster, conf); - runTestAsyncWithoutRetry(conf, cluster, dfs); - - final String renameDir = "/testAsyncWithHAFailover/"; - final Path[] srcs = new Path[num + 1]; - final Path[] dsts = new Path[num + 1]; - mkdirs(dfs, renameDir, srcs, dsts); - - // submit async calls and trigger failover in the middle. - final AsyncDistributedFileSystem adfs - = dfs.getAsyncDistributedFileSystem(); - final ExecutorService executor = Executors.newFixedThreadPool(num + 1); - - final List> results = new ArrayList<>(); - final List exceptions = new ArrayList<>(); - final List> futures = new ArrayList<>(); - final int half = num/2; - for(int i = 0; i <= num; i++) { - final int id = i; - futures.add(executor.submit(new Runnable() { - @Override - public void run() { - try { - if (id == half) { - // failover - cluster.shutdownNameNode(0); - cluster.transitionToActive(1); - } else { - // rename - results.add(adfs.rename(srcs[id], dsts[id])); - } - } catch (IOException e) { - exceptions.add(e); - } - } - })); - } - - // wait for the tasks - Assert.assertEquals(num + 1, futures.size()); - for(int i = 0; i <= num; i++) { - futures.get(i).get(); - } - // wait for the async calls - Assert.assertEquals(num, results.size()); - Assert.assertTrue(exceptions.isEmpty()); - for(Future r : results) { - r.get(); - } - - // check results - for(int i = 0; i <= num; i++) { - final boolean renamed = i != half; - Assert.assertEquals(!renamed, dfs.exists(srcs[i])); - Assert.assertEquals(renamed, dfs.exists(dsts[i])); - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } -} \ No newline at end of file