diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java index 1f60df2e386..b507fa52a08 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java @@ -19,12 +19,16 @@ package org.apache.hadoop.hdfs; import java.io.IOException; +import java.util.List; import java.util.concurrent.Future; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; import org.apache.hadoop.util.concurrent.AsyncGet; import org.apache.hadoop.util.concurrent.AsyncGetFuture; @@ -85,6 +89,7 @@ public class AsyncDistributedFileSystem { public Future rename(Path src, Path dst, final Options.Rename... options) throws IOException { dfs.getFsStatistics().incrementWriteOps(1); + dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.RENAME); final Path absSrc = dfs.fixRelativePart(src); final Path absDst = dfs.fixRelativePart(dst); @@ -113,6 +118,7 @@ public class AsyncDistributedFileSystem { public Future setPermission(Path p, final FsPermission permission) throws IOException { dfs.getFsStatistics().incrementWriteOps(1); + dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_PERMISSION); final Path absPath = dfs.fixRelativePart(p); final boolean isAsync = Client.isAsynchronousMode(); Client.setAsynchronousMode(true); @@ -144,6 +150,7 @@ public class AsyncDistributedFileSystem { } dfs.getFsStatistics().incrementWriteOps(1); + dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_OWNER); final Path absPath = dfs.fixRelativePart(p); final boolean isAsync = Client.isAsynchronousMode(); Client.setAsynchronousMode(true); @@ -154,4 +161,56 @@ public class AsyncDistributedFileSystem { Client.setAsynchronousMode(isAsync); } } + + /** + * Fully replaces ACL of files and directories, discarding all existing + * entries. + * + * @param p + * Path to modify + * @param aclSpec + * List describing modifications, must include entries for + * user, group, and others for compatibility with permission bits. + * @throws IOException + * if an ACL could not be modified + * @return an instance of Future, #get of which is invoked to wait for + * asynchronous call being finished. + */ + public Future setAcl(Path p, final List aclSpec) + throws IOException { + dfs.getFsStatistics().incrementWriteOps(1); + dfs.getDFSOpsCountStatistics().incrementOpCounter(OpType.SET_ACL); + final Path absPath = dfs.fixRelativePart(p); + final boolean isAsync = Client.isAsynchronousMode(); + Client.setAsynchronousMode(true); + try { + dfs.getClient().setAcl(dfs.getPathName(absPath), aclSpec); + return getReturnValue(); + } finally { + Client.setAsynchronousMode(isAsync); + } + } + + /** + * Gets the ACL of a file or directory. + * + * @param p + * Path to get + * @return AclStatus describing the ACL of the file or directory + * @throws IOException + * if an ACL could not be read + * @return an instance of Future, #get of which is invoked to wait for + * asynchronous call being finished. + */ + public Future getAclStatus(Path p) throws IOException { + final Path absPath = dfs.fixRelativePart(p); + final boolean isAsync = Client.isAsynchronousMode(); + Client.setAsynchronousMode(true); + try { + dfs.getClient().getAclStatus(dfs.getPathName(absPath)); + return getReturnValue(); + } finally { + Client.setAsynchronousMode(isAsync); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 2ffe11a206b..d81d8d504ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -2499,4 +2499,7 @@ public class DistributedFileSystem extends FileSystem { return statistics; } + DFSOpsCountStatistics getDFSOpsCountStatistics() { + return storageStatistics; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 849f06d0aeb..b9dcee55585 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclEntriesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.RemoveAclRequestProto; @@ -161,7 +162,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTim import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos; +import org.apache.hadoop.hdfs.protocol.proto.*; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; @@ -1334,7 +1335,12 @@ public class ClientNamenodeProtocolTranslatorPB implements .addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec)) .build(); try { - rpcProxy.setAcl(null, req); + if (Client.isAsynchronousMode()) { + rpcProxy.setAcl(null, req); + setAsyncReturnValue(); + } else { + rpcProxy.setAcl(null, req); + } } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -1345,7 +1351,25 @@ public class ClientNamenodeProtocolTranslatorPB implements GetAclStatusRequestProto req = GetAclStatusRequestProto.newBuilder() .setSrc(src).build(); try { - return PBHelperClient.convert(rpcProxy.getAclStatus(null, req)); + if (Client.isAsynchronousMode()) { + rpcProxy.getAclStatus(null, req); + final AsyncGet asyncReturnMessage + = ProtobufRpcEngine.getAsyncReturnMessage(); + final AsyncGet asyncGet = + new AsyncGet() { + @Override + public AclStatus get(long timeout, TimeUnit unit) + throws Exception { + return PBHelperClient + .convert((GetAclStatusResponseProto) asyncReturnMessage + .get(timeout, unit)); + } + }; + ASYNC_RETURN_VALUE.set(asyncGet); + return null; + } else { + return PBHelperClient.convert(rpcProxy.getAclStatus(null, req)); + } } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java new file mode 100644 index 00000000000..67262dd4668 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS; +import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; +import static org.apache.hadoop.fs.permission.AclEntryType.GROUP; +import static org.apache.hadoop.fs.permission.AclEntryType.MASK; +import static org.apache.hadoop.fs.permission.AclEntryType.OTHER; +import static org.apache.hadoop.fs.permission.AclEntryType.USER; +import static org.apache.hadoop.fs.permission.FsAction.ALL; +import static org.apache.hadoop.fs.permission.FsAction.NONE; +import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE; +import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers; +import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest; +import org.apache.hadoop.ipc.AsyncCallLimitExceededException; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * Unit tests for asynchronous distributed filesystem. + * */ +public class TestAsyncDFS { + public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class); + private static final int NUM_TESTS = 1000; + private static final int NUM_NN_HANDLER = 10; + private static final int ASYNC_CALL_LIMIT = 100; + + private Configuration conf; + private MiniDFSCluster cluster; + private FileSystem fs; + + @Before + public void setup() throws IOException { + conf = new HdfsConfiguration(); + // explicitly turn on acl + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + // explicitly turn on ACL + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + // set the limit of max async calls + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, + ASYNC_CALL_LIMIT); + // set server handlers + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + fs = FileSystem.get(conf); + } + + @After + public void tearDown() throws IOException { + if (fs != null) { + fs.close(); + fs = null; + } + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + static class AclQueueEntry { + private final Object future; + private final Path path; + private final Boolean isSetAcl; + + AclQueueEntry(final Object future, final Path path, + final Boolean isSetAcl) { + this.future = future; + this.path = path; + this.isSetAcl = isSetAcl; + } + + public final Object getFuture() { + return future; + } + + public final Path getPath() { + return path; + } + + public final Boolean isSetAcl() { + return this.isSetAcl; + } + } + + @Test(timeout=60000) + public void testBatchAsyncAcl() throws Exception { + final String basePath = "testBatchAsyncAcl"; + final Path parent = new Path(String.format("/test/%s/", basePath)); + + AsyncDistributedFileSystem adfs = cluster.getFileSystem() + .getAsyncDistributedFileSystem(); + + // prepare test + int count = NUM_TESTS; + final Path[] paths = new Path[count]; + for (int i = 0; i < count; i++) { + paths[i] = new Path(parent, "acl" + i); + FileSystem.mkdirs(fs, paths[i], + FsPermission.createImmutable((short) 0750)); + assertTrue(fs.exists(paths[i])); + assertTrue(fs.getFileStatus(paths[i]).isDirectory()); + } + + final List aclSpec = getAclSpec(); + final AclEntry[] expectedAclSpec = getExpectedAclSpec(); + Map> setAclRetFutures = + new HashMap>(); + Map> getAclRetFutures = + new HashMap>(); + int start = 0, end = 0; + try { + // test setAcl + for (int i = 0; i < count; i++) { + for (;;) { + try { + Future retFuture = adfs.setAcl(paths[i], aclSpec); + setAclRetFutures.put(i, retFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForAclReturnValues(setAclRetFutures, start, end); + } + } + } + waitForAclReturnValues(setAclRetFutures, end, count); + + // test getAclStatus + start = 0; + end = 0; + for (int i = 0; i < count; i++) { + for (;;) { + try { + Future retFuture = adfs.getAclStatus(paths[i]); + getAclRetFutures.put(i, retFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForAclReturnValues(getAclRetFutures, start, end, paths, + expectedAclSpec); + } + } + } + waitForAclReturnValues(getAclRetFutures, end, count, paths, + expectedAclSpec); + } catch (Exception e) { + throw e; + } + } + + private void waitForAclReturnValues( + final Map> aclRetFutures, final int start, + final int end) throws InterruptedException, ExecutionException { + for (int i = start; i < end; i++) { + aclRetFutures.get(i).get(); + } + } + + private void waitForAclReturnValues( + final Map> aclRetFutures, final int start, + final int end, final Path[] paths, final AclEntry[] expectedAclSpec) + throws InterruptedException, ExecutionException, IOException { + for (int i = start; i < end; i++) { + AclStatus aclStatus = aclRetFutures.get(i).get(); + verifyGetAcl(aclStatus, expectedAclSpec, paths[i]); + } + } + + private void verifyGetAcl(final AclStatus aclStatus, + final AclEntry[] expectedAclSpec, final Path path) throws IOException { + if (aclStatus == null) { + return; + } + + // verify permission and acl + AclEntry[] returned = aclStatus.getEntries().toArray(new AclEntry[0]); + assertArrayEquals(expectedAclSpec, returned); + assertPermission(path, (short) 010770); + FSAclBaseTest.assertAclFeature(cluster, path, true); + } + + private List getAclSpec() { + return Lists.newArrayList( + aclEntry(ACCESS, USER, ALL), + aclEntry(ACCESS, USER, "foo", ALL), + aclEntry(ACCESS, GROUP, READ_EXECUTE), + aclEntry(ACCESS, OTHER, NONE), + aclEntry(DEFAULT, USER, "foo", ALL)); + } + + private AclEntry[] getExpectedAclSpec() { + return new AclEntry[] { + aclEntry(ACCESS, USER, "foo", ALL), + aclEntry(ACCESS, GROUP, READ_EXECUTE), + aclEntry(DEFAULT, USER, ALL), + aclEntry(DEFAULT, USER, "foo", ALL), + aclEntry(DEFAULT, GROUP, READ_EXECUTE), + aclEntry(DEFAULT, MASK, ALL), + aclEntry(DEFAULT, OTHER, NONE) }; + } + + private void assertPermission(final Path pathToCheck, final short perm) + throws IOException { + AclTestHelpers.assertPermission(fs, pathToCheck, perm); + } + + @Test(timeout=60000) + public void testAsyncAPIWithException() throws Exception { + String group1 = "group1"; + String group2 = "group2"; + String user1 = "user1"; + UserGroupInformation ugi1; + + // create fake mapping for the groups + Map u2gMap = new HashMap(1); + u2gMap.put(user1, new String[] {group1, group2}); + DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); + + // Initiate all four users + ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] { + group1, group2 }); + + final Path parent = new Path("/test/async_api_exception/"); + final Path aclDir = new Path(parent, "aclDir"); + fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770)); + + AsyncDistributedFileSystem adfs = ugi1 + .doAs(new PrivilegedExceptionAction() { + @Override + public AsyncDistributedFileSystem run() throws Exception { + return cluster.getFileSystem().getAsyncDistributedFileSystem(); + } + }); + + Future retFuture; + // test setAcl + try { + retFuture = adfs.setAcl(aclDir, + Lists.newArrayList(aclEntry(ACCESS, USER, ALL))); + retFuture.get(); + fail("setAcl should fail with permission denied"); + } catch (ExecutionException e) { + checkPermissionDenied(e, aclDir, user1); + } + + // test getAclStatus + try { + Future aclRetFuture = adfs.getAclStatus(aclDir); + aclRetFuture.get(); + fail("getAclStatus should fail with permission denied"); + } catch (ExecutionException e) { + checkPermissionDenied(e, aclDir, user1); + } + } + + public static void checkPermissionDenied(final Exception e, final Path dir, + final String user) { + assertTrue(e.getCause() instanceof ExecutionException); + assertTrue("Permission denied messages must carry AccessControlException", + e.getMessage().contains("AccessControlException")); + assertTrue("Permission denied messages must carry the username", e + .getMessage().contains(user)); + assertTrue("Permission denied messages must carry the name of the path", + e.getMessage().contains(dir.getName())); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java index 7539fbdc82c..03c8151a457 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java @@ -520,7 +520,7 @@ public class TestAsyncDFSRename { retFuture = adfs.rename(src, dst, Rename.OVERWRITE); retFuture.get(); } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); + TestAsyncDFS.checkPermissionDenied(e, src, user1); assertTrue("Permission denied messages must carry the path parent", e .getMessage().contains(src.getParent().toUri().getPath())); } @@ -530,7 +530,7 @@ public class TestAsyncDFSRename { retFuture = adfs.setPermission(src, fsPerm); retFuture.get(); } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); + TestAsyncDFS.checkPermissionDenied(e, src, user1); assertTrue("Permission denied messages must carry the name of the path", e.getMessage().contains(src.getName())); } @@ -539,7 +539,7 @@ public class TestAsyncDFSRename { retFuture = adfs.setOwner(src, "user1", "group2"); retFuture.get(); } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); + TestAsyncDFS.checkPermissionDenied(e, src, user1); assertTrue("Permission denied messages must carry the name of the path", e.getMessage().contains(src.getName())); } finally { @@ -551,13 +551,4 @@ public class TestAsyncDFSRename { } } } - - private void checkPermissionDenied(final Exception e, final Path dir, - final String user) { - assertTrue(e.getCause() instanceof ExecutionException); - assertTrue("Permission denied messages must carry AccessControlException", - e.getMessage().contains("AccessControlException")); - assertTrue("Permission denied messages must carry the username", e - .getMessage().contains(user)); - } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java index f481bc1c647..52e638ecbf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSAclBaseTest.java @@ -1637,17 +1637,23 @@ public abstract class FSAclBaseTest { assertAclFeature(path, expectAclFeature); } + private static void assertAclFeature(Path pathToCheck, + boolean expectAclFeature) throws IOException { + assertAclFeature(cluster, pathToCheck, expectAclFeature); + } + /** * Asserts whether or not the inode for a specific path has an AclFeature. * + * @param miniCluster the cluster into which the path resides * @param pathToCheck Path inode to check * @param expectAclFeature boolean true if an AclFeature must be present, * false if an AclFeature must not be present * @throws IOException thrown if there is an I/O error */ - private static void assertAclFeature(Path pathToCheck, - boolean expectAclFeature) throws IOException { - AclFeature aclFeature = getAclFeature(pathToCheck, cluster); + public static void assertAclFeature(final MiniDFSCluster miniCluster, + Path pathToCheck, boolean expectAclFeature) throws IOException { + AclFeature aclFeature = getAclFeature(pathToCheck, miniCluster); if (expectAclFeature) { assertNotNull(aclFeature); // Intentionally capturing a reference to the entries, not using nested