From 0db4b587c604300ffb44677f7cf81ae3faa430d3 Mon Sep 17 00:00:00 2001 From: Yiqun Lin Date: Wed, 21 Mar 2018 11:32:05 +0800 Subject: [PATCH] HDFS-13250. RBF: Router to manage requests across multiple subclusters. Contributed by Inigo Goiri. (cherry picked from commit 2caba999bbb9d6e3ec56024a0a9d3d56a229edcf) --- .../federation/router/RouterRpcClient.java | 60 +++ .../federation/router/RouterRpcServer.java | 136 +++++- .../federation/store/records/MountTable.java | 13 +- .../router/TestRouterAllResolver.java | 402 ++++++++++++++++++ 4 files changed, 598 insertions(+), 13 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 0d298ac6c4b..c973aa68779 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -768,6 +768,66 @@ private static boolean isExpectedValue(Object expectedValue, Object value) { } } + /** + * Invoke method in all locations and return success if any succeeds. + * + * @param locations List of remote locations to call concurrently. + * @param method The remote method and parameters to invoke. + * @return If the call succeeds in any location. + * @throws IOException If any of the calls return an exception. + */ + public boolean invokeAll( + final Collection locations, final RemoteMethod method) + throws IOException { + boolean anyResult = false; + Map results = + invokeConcurrent(locations, method, false, false, Boolean.class); + for (Boolean value : results.values()) { + boolean result = value.booleanValue(); + if (result) { + anyResult = true; + } + } + return anyResult; + } + + /** + * Invoke multiple concurrent proxy calls to different clients. Returns an + * array of results. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param The type of the remote location. + * @param locations List of remote locations to call concurrently. + * @param method The remote method and parameters to invoke. + * @throws IOException If all the calls throw an exception. + */ + public void invokeConcurrent( + final Collection locations, final RemoteMethod method) + throws IOException { + invokeConcurrent(locations, method, void.class); + } + + /** + * Invoke multiple concurrent proxy calls to different clients. Returns an + * array of results. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param The type of the remote location. + * @param locations List of remote locations to call concurrently. + * @param method The remote method and parameters to invoke. + * @return Result of invoking the method per subcluster: nsId -> result. + * @throws IOException If all the calls throw an exception. + */ + public Map invokeConcurrent( + final Collection locations, final RemoteMethod method, Class clazz) + throws IOException { + return invokeConcurrent(locations, method, false, false, clazz); + } + /** * Invoke multiple concurrent proxy calls to different clients. Returns an * array of results. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index e3a18344f7b..661022c36ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -510,6 +510,18 @@ public HdfsFileStatus create(String src, FsPermission masked, throws IOException { checkOperation(OperationCategory.WRITE); + if (createParent && isPathAll(src)) { + int index = src.lastIndexOf(Path.SEPARATOR); + String parent = src.substring(0, index); + LOG.debug("Creating {} requires creating parent {}", src, parent); + FsPermission parentPermissions = getParentPermission(masked); + boolean success = mkdirs(parent, parentPermissions, createParent); + if (!success) { + // This shouldn't happen as mkdirs returns true or exception + LOG.error("Couldn't create parents for {}", src); + } + } + RemoteLocation createLocation = getCreateLocation(src); RemoteMethod method = new RemoteMethod("create", new Class[] {String.class, FsPermission.class, String.class, @@ -521,6 +533,32 @@ public HdfsFileStatus create(String src, FsPermission masked, return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); } + /** + * Get the permissions for the parent of a child with given permissions. If + * the child has r--, we will set it to r-x. + * @param mask The permission mask of the child. + * @return The permission mask of the parent. + */ + private static FsPermission getParentPermission(final FsPermission mask) { + FsPermission ret = new FsPermission( + applyExecute(mask.getUserAction()), + applyExecute(mask.getGroupAction()), + applyExecute(mask.getOtherAction())); + return ret; + } + + /** + * Apply the execute permissions if it can be read. + * @param action Input permission. + * @return Output permission. + */ + private static FsAction applyExecute(final FsAction action) { + if (action.and(FsAction.READ) == FsAction.READ) { + return action.or(FsAction.EXECUTE); + } + return action; + } + /** * Get the location to create a file. It checks if the file already existed * in one of the locations. @@ -580,7 +618,7 @@ public LastBlockWithStatus append(String src, final String clientName, RemoteMethod method = new RemoteMethod("append", new Class[] {String.class, String.class, EnumSetWritable.class}, new RemoteParam(), clientName, flag); - return (LastBlockWithStatus) rpcClient.invokeSequential( + return rpcClient.invokeSequential( locations, method, LastBlockWithStatus.class, null); } @@ -643,7 +681,11 @@ public void setPermission(String src, FsPermission permissions) RemoteMethod method = new RemoteMethod("setPermission", new Class[] {String.class, FsPermission.class}, new RemoteParam(), permissions); - rpcClient.invokeSequential(locations, method); + if (isPathAll(src)) { + rpcClient.invokeConcurrent(locations, method); + } else { + rpcClient.invokeSequential(locations, method); + } } @Override // ClientProtocol @@ -655,7 +697,11 @@ public void setOwner(String src, String username, String groupname) RemoteMethod method = new RemoteMethod("setOwner", new Class[] {String.class, String.class, String.class}, new RemoteParam(), username, groupname); - rpcClient.invokeSequential(locations, method); + if (isPathAll(src)) { + rpcClient.invokeConcurrent(locations, method); + } else { + rpcClient.invokeSequential(locations, method); + } } /** @@ -933,8 +979,12 @@ public boolean delete(String src, boolean recursive) throws IOException { RemoteMethod method = new RemoteMethod("delete", new Class[] {String.class, boolean.class}, new RemoteParam(), recursive); - return ((Boolean) rpcClient.invokeSequential(locations, method, - Boolean.class, Boolean.TRUE)).booleanValue(); + if (isPathAll(src)) { + return rpcClient.invokeAll(locations, method); + } else { + return rpcClient.invokeSequential(locations, method, + Boolean.class, Boolean.TRUE).booleanValue(); + } } @Override // ClientProtocol @@ -943,6 +993,15 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) checkOperation(OperationCategory.WRITE); final List locations = getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("mkdirs", + new Class[] {String.class, FsPermission.class, boolean.class}, + new RemoteParam(), masked, createParent); + + // Create in all locations + if (isPathAll(src)) { + return rpcClient.invokeAll(locations, method); + } + if (locations.size() > 1) { // Check if this directory already exists try { @@ -959,9 +1018,6 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) } RemoteLocation firstLocation = locations.get(0); - RemoteMethod method = new RemoteMethod("mkdirs", - new Class[] {String.class, FsPermission.class, boolean.class}, - new RemoteParam(), masked, createParent); return ((Boolean) rpcClient.invokeSingle(firstLocation, method)) .booleanValue(); } @@ -1077,8 +1133,16 @@ public HdfsFileStatus getFileInfo(String src) throws IOException { final List locations = getLocationsForPath(src, false); RemoteMethod method = new RemoteMethod("getFileInfo", new Class[] {String.class}, new RemoteParam()); - HdfsFileStatus ret = (HdfsFileStatus) rpcClient.invokeSequential( - locations, method, HdfsFileStatus.class, null); + + HdfsFileStatus ret = null; + // If it's a directory, we check in all locations + if (isPathAll(src)) { + ret = getFileInfoAll(locations, method); + } else { + // Check for file information sequentially + ret = (HdfsFileStatus) rpcClient.invokeSequential( + locations, method, HdfsFileStatus.class, null); + } // If there is no real path, check mount points if (ret == null) { @@ -1096,6 +1160,37 @@ public HdfsFileStatus getFileInfo(String src) throws IOException { return ret; } + /** + * Get the file info from all the locations. + * + * @param locations Locations to check. + * @param method The file information method to run. + * @return The first file info if it's a file, the directory if it's + * everywhere. + * @throws IOException If all the locations throw an exception. + */ + private HdfsFileStatus getFileInfoAll(final List locations, + final RemoteMethod method) throws IOException { + + // Get the file info from everybody + Map results = + rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class); + + // We return the first file + HdfsFileStatus dirStatus = null; + for (RemoteLocation loc : locations) { + HdfsFileStatus fileStatus = results.get(loc); + if (fileStatus != null) { + if (!fileStatus.isDirectory()) { + return fileStatus; + } else if (dirStatus == null) { + dirStatus = fileStatus; + } + } + } + return dirStatus; + } + @Override // ClientProtocol public boolean isFileClosed(String src) throws IOException { checkOperation(OperationCategory.READ); @@ -2071,6 +2166,27 @@ protected List getLocationsForPath( } } + /** + * Check if a path should be in all subclusters. + * + * @param path Path to check. + * @return If a path should be in all subclusters. + */ + private boolean isPathAll(final String path) { + if (subclusterResolver instanceof MountTableResolver) { + try { + MountTableResolver mountTable = (MountTableResolver)subclusterResolver; + MountTable entry = mountTable.getMountPoint(path); + if (entry != null) { + return entry.isAll(); + } + } catch (IOException e) { + LOG.error("Cannot get mount point: {}", e.getMessage()); + } + } + return false; + } + /** * Check if a path is in a read only mount point. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java index 0eab0439940..f8fec87c606 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java @@ -37,8 +37,6 @@ import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Data schema for @@ -50,7 +48,6 @@ */ public abstract class MountTable extends BaseRecord { - private static final Logger LOG = LoggerFactory.getLogger(MountTable.class); public static final String ERROR_MSG_NO_SOURCE_PATH = "Invalid entry, no source path specified "; public static final String ERROR_MSG_MUST_START_WITH_BACK_SLASH = @@ -417,6 +414,16 @@ public boolean equals(Object obj) { return false; } + /** + * Check if a mount table spans all locations. + * @return If the mount table spreads across all locations. + */ + public boolean isAll() { + DestinationOrder order = getDestOrder(); + return order == DestinationOrder.HASH_ALL || + order == DestinationOrder.RANDOM; + } + /** * Normalize a path for that filesystem. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java new file mode 100644 index 00000000000..90abd019594 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAllResolver.java @@ -0,0 +1,402 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests the use of the resolvers that write in all subclusters from the + * Router. It supports: + *
  • HashResolver + *
  • RandomResolver. + */ +public class TestRouterAllResolver { + + /** Directory that will be in a HASH_ALL mount point. */ + private static final String TEST_DIR_HASH_ALL = "/hashall"; + /** Directory that will be in a HASH_ALL mount point. */ + private static final String TEST_DIR_RANDOM = "/random"; + + /** Number of namespaces. */ + private static final int NUM_NAMESPACES = 2; + + + /** Mini HDFS clusters with Routers and State Store. */ + private static StateStoreDFSCluster cluster; + /** Router for testing. */ + private static RouterContext routerContext; + /** Router/federated filesystem. */ + private static FileSystem routerFs; + /** Filesystem for each namespace. */ + private static List nsFss = new LinkedList<>(); + + + @Before + public void setup() throws Exception { + // 2 nameservices with 1 namenode each (no HA needed for this test) + cluster = new StateStoreDFSCluster( + false, NUM_NAMESPACES, MultipleDestinationMountTableResolver.class); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Build and start a Router with: State Store + Admin + RPC + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + cluster.addRouterOverrides(routerConf); + cluster.startRouters(); + routerContext = cluster.getRandomRouter(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + + // Setup the test mount point + createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL); + createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM); + + // Get filesystems for federated and each namespace + routerFs = routerContext.getFileSystem(); + for (String nsId : cluster.getNameservices()) { + List nns = cluster.getNamenodes(nsId); + for (NamenodeContext nn : nns) { + FileSystem nnFs = nn.getFileSystem(); + nsFss.add(nnFs); + } + } + assertEquals(NUM_NAMESPACES, nsFss.size()); + } + + @After + public void cleanup() { + cluster.shutdown(); + cluster = null; + routerContext = null; + routerFs = null; + nsFss.clear(); + } + + @Test + public void testHashAll() throws Exception { + testAll(TEST_DIR_HASH_ALL); + } + + @Test + public void testRandomAll() throws Exception { + testAll(TEST_DIR_RANDOM); + } + + /** + * Tests that the resolver spreads files across subclusters in the whole + * tree. + * @throws Exception If the resolver is not working. + */ + private void testAll(final String path) throws Exception { + + // Create directories in different levels + routerFs.mkdirs(new Path(path + "/dir0")); + routerFs.mkdirs(new Path(path + "/dir1")); + routerFs.mkdirs(new Path(path + "/dir2/dir20")); + routerFs.mkdirs(new Path(path + "/dir2/dir21")); + routerFs.mkdirs(new Path(path + "/dir2/dir22")); + routerFs.mkdirs(new Path(path + "/dir2/dir22/dir220")); + routerFs.mkdirs(new Path(path + "/dir2/dir22/dir221")); + routerFs.mkdirs(new Path(path + "/dir2/dir22/dir222")); + assertDirsEverywhere(path, 9); + + // Create 14 files at different levels of the tree + createTestFile(routerFs, path + "/dir0/file1.txt"); + createTestFile(routerFs, path + "/dir0/file2.txt"); + createTestFile(routerFs, path + "/dir1/file2.txt"); + createTestFile(routerFs, path + "/dir1/file3.txt"); + createTestFile(routerFs, path + "/dir2/dir20/file4.txt"); + createTestFile(routerFs, path + "/dir2/dir20/file5.txt"); + createTestFile(routerFs, path + "/dir2/dir21/file6.txt"); + createTestFile(routerFs, path + "/dir2/dir21/file7.txt"); + createTestFile(routerFs, path + "/dir2/dir22/file8.txt"); + createTestFile(routerFs, path + "/dir2/dir22/file9.txt"); + createTestFile(routerFs, path + "/dir2/dir22/dir220/file10.txt"); + createTestFile(routerFs, path + "/dir2/dir22/dir220/file11.txt"); + createTestFile(routerFs, path + "/dir2/dir22/dir220/file12.txt"); + createTestFile(routerFs, path + "/dir2/dir22/dir220/file13.txt"); + assertDirsEverywhere(path, 9); + assertFilesDistributed(path, 14); + + // Test append + String testFile = path + "/dir2/dir22/dir220/file-append.txt"; + createTestFile(routerFs, testFile); + Path testFilePath = new Path(testFile); + assertTrue("Created file is too small", + routerFs.getFileStatus(testFilePath).getLen() > 50); + appendTestFile(routerFs, testFile); + assertTrue("Append file is too small", + routerFs.getFileStatus(testFilePath).getLen() > 110); + assertDirsEverywhere(path, 9); + assertFilesDistributed(path, 15); + + // Removing a directory should remove it from every subcluster + routerFs.delete(new Path(path + "/dir2/dir22/dir220"), true); + assertDirsEverywhere(path, 8); + assertFilesDistributed(path, 10); + + // Removing all sub directories + routerFs.delete(new Path(path + "/dir0"), true); + routerFs.delete(new Path(path + "/dir1"), true); + routerFs.delete(new Path(path + "/dir2"), true); + assertDirsEverywhere(path, 0); + assertFilesDistributed(path, 0); + } + + /** + * Directories in HASH_ALL mount points must be in every namespace. + * @param path Path to check under. + * @param expectedNumDirs Expected number of directories. + * @throws IOException If it cannot check the directories. + */ + private void assertDirsEverywhere(String path, int expectedNumDirs) + throws IOException { + + // Check for the directories in each filesystem + List files = listRecursive(routerFs, path); + int numDirs = 0; + for (FileStatus file : files) { + if (file.isDirectory()) { + numDirs++; + + Path dirPath = file.getPath(); + Path checkPath = getRelativePath(dirPath); + for (FileSystem nsFs : nsFss) { + FileStatus fileStatus1 = nsFs.getFileStatus(checkPath); + assertTrue(file + " should be a directory", + fileStatus1.isDirectory()); + } + } + } + assertEquals(expectedNumDirs, numDirs); + } + + /** + * Check that the files are somewhat spread across namespaces. + * @param path Path to check under. + * @param expectedNumFiles Number of files expected. + * @throws IOException If the files cannot be checked. + */ + private void assertFilesDistributed(String path, int expectedNumFiles) + throws IOException { + + // Check where the files went + List routerFiles = listRecursive(routerFs, path); + List> nssFiles = new LinkedList<>(); + for (FileSystem nsFs : nsFss) { + List nsFiles = listRecursive(nsFs, path); + nssFiles.add(nsFiles); + } + + // We should see all the files in the federated view + int numRouterFiles = getNumTxtFiles(routerFiles); + assertEquals(numRouterFiles, expectedNumFiles); + + // All the files should be spread somewhat evenly across subclusters + List numNsFiles = new LinkedList<>(); + int sumNsFiles = 0; + for (int i = 0; i < NUM_NAMESPACES; i++) { + List nsFiles = nssFiles.get(i); + int numFiles = getNumTxtFiles(nsFiles); + numNsFiles.add(numFiles); + sumNsFiles += numFiles; + } + assertEquals(numRouterFiles, sumNsFiles); + if (expectedNumFiles > 0) { + for (int numFiles : numNsFiles) { + assertTrue("Files not distributed: " + numNsFiles, numFiles > 0); + } + } + } + + /** + * Create a test file in the filesystem and check if it was written. + * @param fs Filesystem. + * @param filename Name of the file to create. + * @throws IOException If it cannot create the file. + */ + private static void createTestFile( + final FileSystem fs, final String filename)throws IOException { + + final Path path = new Path(filename); + + // Write the data + FSDataOutputStream os = fs.create(path); + os.writeUTF("Test data " + filename); + os.close(); + + // Read the data and check + FSDataInputStream is = fs.open(path); + String read = is.readUTF(); + assertEquals("Test data " + filename, read); + is.close(); + } + + /** + * Append to a test file in the filesystem and check if we appended. + * @param fs Filesystem. + * @param filename Name of the file to append to. + * @throws IOException + */ + private static void appendTestFile( + final FileSystem fs, final String filename) throws IOException { + final Path path = new Path(filename); + + // Write the data + FSDataOutputStream os = fs.append(path); + os.writeUTF("Test append data " + filename); + os.close(); + + // Read the data previous data + FSDataInputStream is = fs.open(path); + String read = is.readUTF(); + assertEquals(read, "Test data " + filename); + // Read the new data and check + read = is.readUTF(); + assertEquals(read, "Test append data " + filename); + is.close(); + } + + /** + * Count the number of text files in a list. + * @param files File list. + * @return Number of .txt files. + */ + private static int getNumTxtFiles(final List files) { + int numFiles = 0; + for (FileStatus file : files) { + if (file.getPath().getName().endsWith(".txt")) { + numFiles++; + } + } + return numFiles; + } + + /** + * Get the relative path within a filesystem (removes the filesystem prefix). + * @param path Path to check. + * @return File within the filesystem. + */ + private static Path getRelativePath(final Path path) { + URI uri = path.toUri(); + String uriPath = uri.getPath(); + return new Path(uriPath); + } + + /** + * Get the list the files/dirs under a path. + * @param fs Filesystem to check in. + * @param path Path to check for. + * @return List of files. + * @throws IOException If it cannot list the files. + */ + private List listRecursive( + final FileSystem fs, final String path) throws IOException { + List ret = new LinkedList<>(); + List temp = new LinkedList<>(); + temp.add(new Path(path)); + while (!temp.isEmpty()) { + Path p = temp.remove(0); + for (FileStatus fileStatus : fs.listStatus(p)) { + ret.add(fileStatus); + if (fileStatus.isDirectory()) { + temp.add(fileStatus.getPath()); + } + } + } + return ret; + } + + /** + * Add a mount table entry in all nameservices and wait until it is + * available in all routers. + * @param mountPoint Name of the mount point. + * @param order Order of the mount table entry. + * @throws Exception If the entry could not be created. + */ + private void createMountTableEntry( + final String mountPoint, final DestinationOrder order) throws Exception { + + RouterClient admin = routerContext.getAdminClient(); + MountTableManager mountTable = admin.getMountTableManager(); + Map destMap = new HashMap<>(); + for (String nsId : cluster.getNameservices()) { + destMap.put(nsId, mountPoint); + } + MountTable newEntry = MountTable.newInstance(mountPoint, destMap); + newEntry.setDestOrder(order); + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTable.addMountTableEntry(addRequest); + boolean created = addResponse.getStatus(); + assertTrue(created); + + // Refresh the caches to get the mount table + Router router = routerContext.getRouter(); + StateStoreService stateStore = router.getStateStore(); + stateStore.refreshCaches(true); + + // Check for the path + GetMountTableEntriesRequest getRequest = + GetMountTableEntriesRequest.newInstance(mountPoint); + GetMountTableEntriesResponse getResponse = + mountTable.getMountTableEntries(getRequest); + List entries = getResponse.getEntries(); + assertEquals(1, entries.size()); + assertEquals(mountPoint, entries.get(0).getSourcePath()); + } +} \ No newline at end of file