diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java index 480b232ca42..f4584b1afaf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java @@ -157,7 +157,11 @@ public void setErasureCodingPolicy(String src, String ecPolicyName) RemoteMethod remoteMethod = new RemoteMethod("setErasureCodingPolicy", new Class[] {String.class, String.class}, new RemoteParam(), ecPolicyName); - rpcClient.invokeSequential(locations, remoteMethod, null, null); + if (rpcServer.isInvokeConcurrent(src)) { + rpcClient.invokeConcurrent(locations, remoteMethod); + } else { + rpcClient.invokeSequential(locations, remoteMethod); + } } public void unsetErasureCodingPolicy(String src) throws IOException { @@ -167,7 +171,11 @@ public void unsetErasureCodingPolicy(String src) throws IOException { rpcServer.getLocationsForPath(src, true); RemoteMethod remoteMethod = new RemoteMethod("unsetErasureCodingPolicy", new Class[] {String.class}, new RemoteParam()); - rpcClient.invokeSequential(locations, remoteMethod, null, null); + if (rpcServer.isInvokeConcurrent(src)) { + rpcClient.invokeConcurrent(locations, remoteMethod); + } else { + rpcClient.invokeSequential(locations, remoteMethod); + } } public ECBlockGroupStats getECBlockGroupStats() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index abab51111c1..757e0969606 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -219,7 +219,7 @@ public HdfsFileStatus create(String src, FsPermission masked, throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - if (createParent && isPathAll(src)) { + if (createParent && rpcServer.isPathAll(src)) { int index = src.lastIndexOf(Path.SEPARATOR); String parent = src.substring(0, index); LOG.debug("Creating {} requires creating parent {}", src, parent); @@ -279,9 +279,13 @@ public boolean setReplication(String src, short replication) RemoteMethod method = new RemoteMethod("setReplication", new Class[] {String.class, short.class}, new RemoteParam(), replication); - Object result = rpcClient.invokeSequential( - locations, method, Boolean.class, Boolean.TRUE); - return (boolean) result; + if (rpcServer.isInvokeConcurrent(src)) { + return !rpcClient.invokeConcurrent(locations, method, Boolean.class) + .containsValue(false); + } else { + return rpcClient.invokeSequential(locations, method, Boolean.class, + Boolean.TRUE); + } } @Override @@ -305,7 +309,7 @@ public void setPermission(String src, FsPermission permissions) RemoteMethod method = new RemoteMethod("setPermission", new Class[] {String.class, FsPermission.class}, new RemoteParam(), permissions); - if (isPathAll(src)) { + if (rpcServer.isInvokeConcurrent(src)) { rpcClient.invokeConcurrent(locations, method); } else { rpcClient.invokeSequential(locations, method); @@ -322,7 +326,7 @@ public void setOwner(String src, String username, String groupname) RemoteMethod method = new RemoteMethod("setOwner", new Class[] {String.class, String.class, String.class}, new RemoteParam(), username, groupname); - if (isPathAll(src)) { + if (rpcServer.isInvokeConcurrent(src)) { rpcClient.invokeConcurrent(locations, method); } else { rpcClient.invokeSequential(locations, method); @@ -555,7 +559,7 @@ public boolean delete(String src, boolean recursive) throws IOException { RemoteMethod method = new RemoteMethod("delete", new Class[] {String.class, boolean.class}, new RemoteParam(), recursive); - if (isPathAll(src)) { + if (rpcServer.isPathAll(src)) { return rpcClient.invokeAll(locations, method); } else { return rpcClient.invokeSequential(locations, method, @@ -575,7 +579,7 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) new RemoteParam(), masked, createParent); // Create in all locations - if (isPathAll(src)) { + if (rpcServer.isPathAll(src)) { return rpcClient.invokeAll(locations, method); } @@ -713,7 +717,7 @@ public HdfsFileStatus getFileInfo(String src) throws IOException { HdfsFileStatus ret = null; // If it's a directory, we check in all locations - if (isPathAll(src)) { + if (rpcServer.isPathAll(src)) { ret = getFileInfoAll(locations, method); } else { // Check for file information sequentially @@ -1315,7 +1319,11 @@ public void setXAttr(String src, XAttr xAttr, EnumSet flag) RemoteMethod method = new RemoteMethod("setXAttr", new Class[] {String.class, XAttr.class, EnumSet.class}, new RemoteParam(), xAttr, flag); - rpcClient.invokeSequential(locations, method); + if (rpcServer.isInvokeConcurrent(src)) { + rpcClient.invokeConcurrent(locations, method); + } else { + rpcClient.invokeSequential(locations, method); + } } @SuppressWarnings("unchecked") @@ -1356,7 +1364,11 @@ public void removeXAttr(String src, XAttr xAttr) throws IOException { rpcServer.getLocationsForPath(src, true); RemoteMethod method = new RemoteMethod("removeXAttr", new Class[] {String.class, XAttr.class}, new RemoteParam(), xAttr); - rpcClient.invokeSequential(locations, method); + if (rpcServer.isInvokeConcurrent(src)) { + rpcClient.invokeConcurrent(locations, method); + } else { + rpcClient.invokeSequential(locations, method); + } } @Override @@ -1718,27 +1730,6 @@ private static FsPermission getParentPermission(final FsPermission mask) { return ret; } - /** - * Check if a path should be in all subclusters. - * - * @param path Path to check. - * @return If a path should be in all subclusters. - */ - private boolean isPathAll(final String path) { - if (subclusterResolver instanceof MountTableResolver) { - try { - MountTableResolver mountTable = (MountTableResolver)subclusterResolver; - MountTable entry = mountTable.getMountPoint(path); - if (entry != null) { - return entry.isAll(); - } - } catch (IOException e) { - LOG.error("Cannot get mount point", e); - } - } - return false; - } - /** * Create a new file status for a mount point. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index a312d4b3a6b..e4ea58b5071 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1541,4 +1541,48 @@ public RouterClientProtocol getClientProtocolModule() { public FederationRPCMetrics getRPCMetrics() { return this.rpcMonitor.getRPCMetrics(); } -} + + /** + * Check if a path should be in all subclusters. + * + * @param path Path to check. + * @return If a path should be in all subclusters. + */ + boolean isPathAll(final String path) { + if (subclusterResolver instanceof MountTableResolver) { + try { + MountTableResolver mountTable = (MountTableResolver) subclusterResolver; + MountTable entry = mountTable.getMountPoint(path); + if (entry != null) { + return entry.isAll(); + } + } catch (IOException e) { + LOG.error("Cannot get mount point", e); + } + } + return false; + } + + /** + * Check if call needs to be invoked to all the locations. The call is + * supposed to be invoked in all the locations in case the order of the mount + * entry is amongst HASH_ALL, RANDOM or SPACE or if the source is itself a + * mount entry. + * @param path The path on which the operation need to be invoked. + * @return true if the call is supposed to invoked on all locations. + * @throws IOException + */ + boolean isInvokeConcurrent(final String path) throws IOException { + if (subclusterResolver instanceof MountTableResolver) { + MountTableResolver mountTableResolver = + (MountTableResolver) subclusterResolver; + List mountPoints = mountTableResolver.getMountPoints(path); + // If this is a mount point, we need to invoke everywhere. + if (mountPoints != null) { + return true; + } + return isPathAll(path); + } + return false; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java index 8a55b9a6fd4..a4538b0e6bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java @@ -50,7 +50,11 @@ public void setStoragePolicy(String src, String policyName) new Class[] {String.class, String.class}, new RemoteParam(), policyName); - rpcClient.invokeSequential(locations, method, null, null); + if (rpcServer.isInvokeConcurrent(src)) { + rpcClient.invokeConcurrent(locations, method); + } else { + rpcClient.invokeSequential(locations, method); + } } public BlockStoragePolicy[] getStoragePolicies() throws IOException { @@ -67,7 +71,11 @@ public void unsetStoragePolicy(String src) throws IOException { RemoteMethod method = new RemoteMethod("unsetStoragePolicy", new Class[] {String.class}, new RemoteParam()); - rpcClient.invokeSequential(locations, method); + if (rpcServer.isInvokeConcurrent(src)) { + rpcClient.invokeConcurrent(locations, method); + } else { + rpcClient.invokeSequential(locations, method); + } } public BlockStoragePolicy getStoragePolicy(String path) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java new file mode 100644 index 00000000000..8c1515140ae --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java @@ -0,0 +1,394 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests router rpc with multiple destination mount table resolver. + */ +public class TestRouterRPCMultipleDestinationMountTableResolver { + private static StateStoreDFSCluster cluster; + private static RouterContext routerContext; + private static MountTableResolver resolver; + private static DistributedFileSystem nnFs0; + private static DistributedFileSystem nnFs1; + private static DistributedFileSystem routerFs; + private static RouterRpcServer rpcServer; + + @BeforeClass + public static void setUp() throws Exception { + + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 2, + MultipleDestinationMountTableResolver.class); + Configuration routerConf = + new RouterConfigBuilder().stateStore().admin().quota().rpc().build(); + + Configuration hdfsConf = new Configuration(false); + + cluster.addRouterOverrides(routerConf); + cluster.addNamenodeOverrides(hdfsConf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + routerContext = cluster.getRandomRouter(); + resolver = + (MountTableResolver) routerContext.getRouter().getSubclusterResolver(); + nnFs0 = (DistributedFileSystem) cluster + .getNamenode(cluster.getNameservices().get(0), null).getFileSystem(); + nnFs1 = (DistributedFileSystem) cluster + .getNamenode(cluster.getNameservices().get(1), null).getFileSystem(); + routerFs = (DistributedFileSystem) routerContext.getFileSystem(); + rpcServer =routerContext.getRouter().getRpcServer(); + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + /** + * SetUp the mount entry , directories and file to verify invocation. + * @param order The order that the mount entry needs to follow. + * @throws Exception On account of any exception encountered during setting up + * the environment. + */ + public void setupOrderMountPath(DestinationOrder order) throws Exception { + Map destMap = new HashMap<>(); + destMap.put("ns0", "/tmp"); + destMap.put("ns1", "/tmp"); + nnFs0.mkdirs(new Path("/tmp")); + nnFs1.mkdirs(new Path("/tmp")); + MountTable addEntry = MountTable.newInstance("/mount", destMap); + addEntry.setDestOrder(order); + assertTrue(addMountTable(addEntry)); + routerFs.mkdirs(new Path("/mount/dir/dir")); + DFSTestUtil.createFile(routerFs, new Path("/mount/dir/file"), 100L, (short) 1, + 1024L); + DFSTestUtil.createFile(routerFs, new Path("/mount/file"), 100L, (short) 1, + 1024L); + } + + @After + public void resetTestEnvironment() throws IOException { + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + RemoveMountTableEntryRequest req2 = + RemoveMountTableEntryRequest.newInstance("/mount"); + mountTableManager.removeMountTableEntry(req2); + nnFs0.delete(new Path("/tmp"), true); + nnFs1.delete(new Path("/tmp"), true); + + } + + @Test + public void testInvocationSpaceOrder() throws Exception { + setupOrderMountPath(DestinationOrder.SPACE); + boolean isDirAll = rpcServer.isPathAll("/mount/dir"); + assertTrue(isDirAll); + testInvocation(isDirAll); + } + + @Test + public void testInvocationHashAllOrder() throws Exception { + setupOrderMountPath(DestinationOrder.HASH_ALL); + boolean isDirAll = rpcServer.isPathAll("/mount/dir"); + assertTrue(isDirAll); + testInvocation(isDirAll); + } + + @Test + public void testInvocationRandomOrder() throws Exception { + setupOrderMountPath(DestinationOrder.RANDOM); + boolean isDirAll = rpcServer.isPathAll("/mount/dir"); + assertTrue(isDirAll); + testInvocation(isDirAll); + } + + @Test + public void testInvocationHashOrder() throws Exception { + setupOrderMountPath(DestinationOrder.HASH); + boolean isDirAll = rpcServer.isPathAll("/mount/dir"); + assertFalse(isDirAll); + testInvocation(isDirAll); + } + + @Test + public void testInvocationLocalOrder() throws Exception { + setupOrderMountPath(DestinationOrder.LOCAL); + boolean isDirAll = rpcServer.isPathAll("/mount/dir"); + assertFalse(isDirAll); + testInvocation(isDirAll); + } + + /** + * Verifies the invocation of API's at directory level , file level and at + * mount level. + * @param dirAll if true assumes that the mount entry creates directory on all + * locations. + * @throws IOException + */ + private void testInvocation(boolean dirAll) throws IOException { + // Verify invocation on nested directory and file. + Path mountDir = new Path("/mount/dir/dir"); + Path nameSpaceFile = new Path("/tmp/dir/file"); + Path mountFile = new Path("/mount/dir/file"); + Path mountEntry = new Path("/mount"); + Path mountDest = new Path("/tmp"); + Path nameSpaceDir = new Path("/tmp/dir/dir"); + final String name = "user.a1"; + final byte[] value = {0x31, 0x32, 0x33}; + testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile, + mountFile, nameSpaceDir, name, value); + + // Verify invocation on non nested directory and file. + mountDir = new Path("/mount/dir"); + nameSpaceFile = new Path("/tmp/file"); + mountFile = new Path("/mount/file"); + nameSpaceDir = new Path("/tmp/dir"); + testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile, + mountFile, nameSpaceDir, name, value); + + // Check invocation directly for a mount point. + // Verify owner and permissions. + routerFs.setOwner(mountEntry, "testuser", "testgroup"); + routerFs.setPermission(mountEntry, + FsPermission.createImmutable((short) 777)); + assertEquals("testuser", routerFs.getFileStatus(mountEntry).getOwner()); + assertEquals("testuser", nnFs0.getFileStatus(mountDest).getOwner()); + assertEquals("testuser", nnFs1.getFileStatus(mountDest).getOwner()); + assertEquals((short) 777, + routerFs.getFileStatus(mountEntry).getPermission().toShort()); + assertEquals((short) 777, + nnFs0.getFileStatus(mountDest).getPermission().toShort()); + assertEquals((short) 777, + nnFs1.getFileStatus(mountDest).getPermission().toShort()); + + //Verify storage policy. + routerFs.setStoragePolicy(mountEntry, "COLD"); + assertEquals("COLD", routerFs.getStoragePolicy(mountEntry).getName()); + assertEquals("COLD", nnFs0.getStoragePolicy(mountDest).getName()); + assertEquals("COLD", nnFs1.getStoragePolicy(mountDest).getName()); + routerFs.unsetStoragePolicy(mountEntry); + assertEquals("HOT", routerFs.getStoragePolicy(mountDest).getName()); + assertEquals("HOT", nnFs0.getStoragePolicy(mountDest).getName()); + assertEquals("HOT", nnFs1.getStoragePolicy(mountDest).getName()); + + //Verify erasure coding policy. + routerFs.setErasureCodingPolicy(mountEntry, "RS-6-3-1024k"); + assertEquals("RS-6-3-1024k", + routerFs.getErasureCodingPolicy(mountEntry).getName()); + assertEquals("RS-6-3-1024k", + nnFs0.getErasureCodingPolicy(mountDest).getName()); + assertEquals("RS-6-3-1024k", + nnFs1.getErasureCodingPolicy(mountDest).getName()); + routerFs.unsetErasureCodingPolicy(mountEntry); + assertNull(routerFs.getErasureCodingPolicy(mountDest)); + assertNull(nnFs0.getErasureCodingPolicy(mountDest)); + assertNull(nnFs1.getErasureCodingPolicy(mountDest)); + + //Verify xAttr. + routerFs.setXAttr(mountEntry, name, value); + assertArrayEquals(value, routerFs.getXAttr(mountEntry, name)); + assertArrayEquals(value, nnFs0.getXAttr(mountDest, name)); + assertArrayEquals(value, nnFs1.getXAttr(mountDest, name)); + routerFs.removeXAttr(mountEntry, name); + assertEquals(0, routerFs.getXAttrs(mountEntry).size()); + assertEquals(0, nnFs0.getXAttrs(mountDest).size()); + assertEquals(0, nnFs1.getXAttrs(mountDest).size()); + } + + /** + * SetUp to verify invocations on directories and file. + */ + private void testDirectoryAndFileLevelInvocation(boolean dirAll, + Path mountDir, Path nameSpaceFile, Path mountFile, Path nameSpaceDir, + final String name, final byte[] value) throws IOException { + // Check invocation for a directory. + routerFs.setOwner(mountDir, "testuser", "testgroup"); + routerFs.setPermission(mountDir, FsPermission.createImmutable((short) 777)); + routerFs.setStoragePolicy(mountDir, "COLD"); + routerFs.setErasureCodingPolicy(mountDir, "RS-6-3-1024k"); + routerFs.setXAttr(mountDir, name, value); + + // Verify the directory level invocations were checked in case of mounts not + // creating directories in all subclusters. + boolean checkedDir1 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir, + nnFs0, name, value); + boolean checkedDir2 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir, + nnFs1, name, value); + assertTrue("The file didn't existed in either of the subclusters.", + checkedDir1 || checkedDir2); + routerFs.unsetStoragePolicy(mountDir); + routerFs.removeXAttr(mountDir, name); + routerFs.unsetErasureCodingPolicy(mountDir); + + checkedDir1 = + verifyDirectoryLevelUnsetInvocations(dirAll, nnFs0, nameSpaceDir); + checkedDir2 = + verifyDirectoryLevelUnsetInvocations(dirAll, nnFs1, nameSpaceDir); + assertTrue("The file didn't existed in either of the subclusters.", + checkedDir1 || checkedDir2); + + // Check invocation for a file. + routerFs.setOwner(mountFile, "testuser", "testgroup"); + routerFs.setPermission(mountFile, + FsPermission.createImmutable((short) 777)); + routerFs.setStoragePolicy(mountFile, "COLD"); + routerFs.setReplication(mountFile, (short) 2); + routerFs.setXAttr(mountFile, name, value); + verifyFileLevelInvocations(nameSpaceFile, nnFs0, mountFile, name, value); + verifyFileLevelInvocations(nameSpaceFile, nnFs1, mountFile, name, value); + } + + /** + * Verify invocations of API's unseting values at the directory level. + * @param dirAll true if the mount entry order creates directory in all + * locations. + * @param nameSpaceDir path of the directory in the namespace. + * @param nnFs file system where the directory level invocation needs to be + * tested. + * @throws IOException + */ + private boolean verifyDirectoryLevelUnsetInvocations(boolean dirAll, + DistributedFileSystem nnFs, Path nameSpaceDir) throws IOException { + boolean checked = false; + if (dirAll || nnFs.exists(nameSpaceDir)) { + checked = true; + assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceDir).getName()); + assertNull(nnFs.getErasureCodingPolicy(nameSpaceDir)); + assertEquals(0, nnFs.getXAttrs(nameSpaceDir).size()); + } + return checked; + } + + /** + * Verify file level invocations. + * @param nameSpaceFile path of the file in the namespace. + * @param nnFs the file system where the file invocation needs to checked. + * @param mountFile path of the file w.r.t. mount table. + * @param name name of Xattr. + * @param value value of Xattr. + * @throws IOException + */ + private void verifyFileLevelInvocations(Path nameSpaceFile, + DistributedFileSystem nnFs, Path mountFile, final String name, + final byte[] value) throws IOException { + if (nnFs.exists(nameSpaceFile)) { + assertEquals("testuser", nnFs.getFileStatus(nameSpaceFile).getOwner()); + assertEquals((short) 777, + nnFs.getFileStatus(nameSpaceFile).getPermission().toShort()); + assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceFile).getName()); + assertEquals((short) 2, + nnFs.getFileStatus(nameSpaceFile).getReplication()); + assertArrayEquals(value, nnFs.getXAttr(nameSpaceFile, name)); + + routerFs.unsetStoragePolicy(mountFile); + routerFs.removeXAttr(mountFile, name); + assertEquals(0, nnFs.getXAttrs(nameSpaceFile).size()); + + assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceFile).getName()); + + } + } + + /** + * Verify invocations at the directory level. + * @param dirAll true if the mount entry order creates directory in all + * locations. + * @param nameSpaceDir path of the directory in the namespace. + * @param nnFs file system where the directory level invocation needs to be + * tested. + * @param name name for the Xattr. + * @param value value for the Xattr. + * @return true, if directory existed and successful verification of + * invocations. + * @throws IOException + */ + private boolean verifyDirectoryLevelInvocations(boolean dirAll, + Path nameSpaceDir, DistributedFileSystem nnFs, final String name, + final byte[] value) throws IOException { + boolean checked = false; + if (dirAll || nnFs.exists(nameSpaceDir)) { + checked = true; + assertEquals("testuser", nnFs.getFileStatus(nameSpaceDir).getOwner()); + assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceDir).getName()); + assertEquals("RS-6-3-1024k", + nnFs.getErasureCodingPolicy(nameSpaceDir).getName()); + assertArrayEquals(value, nnFs.getXAttr(nameSpaceDir, name)); + assertEquals((short) 777, + nnFs.getFileStatus(nameSpaceDir).getPermission().toShort()); + } + return checked; + } + + /** + * Add a mount table entry to the mount table through the admin API. + * @param entry Mount table entry to add. + * @return If it was successfully added. + * @throws IOException + * Problems adding entries. + */ + private boolean addMountTable(final MountTable entry) throws IOException { + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(entry); + AddMountTableEntryResponse addResponse = + mountTableManager.addMountTableEntry(addRequest); + + // Reload the Router cache + resolver.loadCache(true); + + return addResponse.getStatus(); + } +} \ No newline at end of file