diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index baee979dfe7..ec61258f527 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -103,6 +103,7 @@ import java.io.IOException; import java.net.ConnectException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -771,13 +772,14 @@ public class RouterClientProtocol implements ClientProtocol { List> listings = getListingInt(src, startAfter, needLocation); - Map nnListing = new TreeMap<>(); + TreeMap nnListing = new TreeMap<>(); int totalRemainingEntries = 0; int remainingEntries = 0; boolean namenodeListingExists = false; + // Check the subcluster listing with the smallest name to make sure + // no file is skipped across subclusters + String lastName = null; if (listings != null) { - // Check the subcluster listing with the smallest name - String lastName = null; for (RemoteResult result : listings) { if (result.hasException()) { IOException ioe = result.getException(); @@ -824,6 +826,10 @@ public class RouterClientProtocol implements ClientProtocol { // Add mount points at this level in the tree final List children = subclusterResolver.getMountPoints(src); + // Sort the list as the entries from subcluster are also sorted + if (children != null) { + Collections.sort(children); + } if (children != null) { // Get the dates for each mount point Map dates = getMountPointDates(src); @@ -838,9 +844,27 @@ public class RouterClientProtocol implements ClientProtocol { HdfsFileStatus dirStatus = getMountPointStatus(childPath.toString(), 0, date); - // This may overwrite existing listing entries with the mount point - // TODO don't add if already there? - nnListing.put(child, dirStatus); + // if there is no subcluster path, always add mount point + if (lastName == null) { + nnListing.put(child, dirStatus); + } else { + if (shouldAddMountPoint(child, + lastName, startAfter, remainingEntries)) { + // This may overwrite existing listing entries with the mount point + // TODO don't add if already there? + nnListing.put(child, dirStatus); + } + } + } + // Update the remaining count to include left mount points + if (nnListing.size() > 0) { + String lastListing = nnListing.lastKey(); + for (int i = 0; i < children.size(); i++) { + if (children.get(i).compareTo(lastListing) > 0) { + remainingEntries += (children.size() - i); + break; + } + } } } @@ -2108,6 +2132,36 @@ public class RouterClientProtocol implements ClientProtocol { } } + /** + * Check if we should add the mount point into the total listing. + * This should be done under either of the two cases: + * 1) current mount point is between startAfter and cutoff lastEntry. + * 2) there are no remaining entries from subclusters and this mount + * point is bigger than all files from subclusters + * This is to make sure that the following batch of + * getListing call will use the correct startAfter, which is lastEntry from + * subcluster. + * + * @param mountPoint to be added mount point inside router + * @param lastEntry biggest listing from subcluster + * @param startAfter starting listing from client, used to define listing + * start boundary + * @param remainingEntries how many entries left from subcluster + * @return + */ + private static boolean shouldAddMountPoint( + String mountPoint, String lastEntry, byte[] startAfter, + int remainingEntries) { + if (mountPoint.compareTo(DFSUtil.bytes2String(startAfter)) > 0 && + mountPoint.compareTo(lastEntry) <= 0) { + return true; + } + if (remainingEntries == 0 && mountPoint.compareTo(lastEntry) >= 0) { + return true; + } + return false; + } + /** * Checks if the path is a directory and is supposed to be present in all * subclusters. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 131dd74b8e0..39334250bc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -327,16 +327,30 @@ public class MockResolver @Override public List getMountPoints(String path) throws IOException { - // Mounts only supported under root level - if (!path.equals("/")) { - return null; - } List mounts = new ArrayList<>(); - for (String mount : this.locations.keySet()) { - if (mount.length() > 1) { - // Remove leading slash, this is the behavior of the mount tree, - // return only names. - mounts.add(mount.replace("/", "")); + // for root path search, returning all downstream root level mapping + if (path.equals("/")) { + // Mounts only supported under root level + for (String mount : this.locations.keySet()) { + if (mount.length() > 1) { + // Remove leading slash, this is the behavior of the mount tree, + // return only names. + mounts.add(mount.replace("/", "")); + } + } + } else { + // a simplified version of MountTableResolver implementation + for (String key : this.locations.keySet()) { + if (key.startsWith(path)) { + String child = key.substring(path.length()); + if (child.length() > 0) { + // only take children so remove parent path and / + mounts.add(key.substring(path.length()+1)); + } + } + } + if (mounts.size() == 0) { + mounts = null; } } return mounts; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index bfc712f057a..082094e5592 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -203,6 +203,9 @@ public class TestRouterRpc { cluster.addNamenodeOverrides(namenodeConf); cluster.setIndependentDNs(); + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5); + cluster.addNamenodeOverrides(conf); // Start NNs and DNs and wait until ready cluster.startCluster(); @@ -436,6 +439,62 @@ public class TestRouterRpc { new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false}); } + @Test + public void testProxyListFilesLargeDir() throws IOException { + // Call listStatus against a dir with many files + // Create a parent point as well as a subfolder mount + // /parent + // ns0 -> /parent + // /parent/file-7 + // ns0 -> /parent/file-7 + // /parent/file-0 + // ns0 -> /parent/file-0 + for (RouterContext rc : cluster.getRouters()) { + MockResolver resolver = + (MockResolver) rc.getRouter().getSubclusterResolver(); + resolver.addLocation("/parent", ns, "/parent"); + // file-0 is only in mount table + resolver.addLocation("/parent/file-0", ns, "/parent/file-0"); + // file-7 is both in mount table and in file system + resolver.addLocation("/parent/file-7", ns, "/parent/file-7"); + } + + // Test the case when there is no subcluster path and only mount point + FileStatus[] result = routerFS.listStatus(new Path("/parent")); + assertEquals(2, result.length); + // this makes sure file[0-8] is added in order + assertEquals("file-0", result[0].getPath().getName()); + assertEquals("file-7", result[1].getPath().getName()); + + // Create files and test full listing in order + NamenodeContext nn = cluster.getNamenode(ns, null); + FileSystem nnFileSystem = nn.getFileSystem(); + for (int i = 1; i < 9; i++) { + createFile(nnFileSystem, "/parent/file-"+i, 32); + } + + result = routerFS.listStatus(new Path("/parent")); + assertEquals(9, result.length); + // this makes sure file[0-8] is added in order + for (int i = 0; i < 9; i++) { + assertEquals("file-"+i, result[i].getPath().getName()); + } + + // Add file-9 and now this listing will be added from mount point + for (RouterContext rc : cluster.getRouters()) { + MockResolver resolver = + (MockResolver) rc.getRouter().getSubclusterResolver(); + resolver.addLocation("/parent/file-9", ns, "/parent/file-9"); + } + assertFalse(verifyFileExists(nnFileSystem, "/parent/file-9")); + result = routerFS.listStatus(new Path("/parent")); + // file-9 will be added by mount point + assertEquals(10, result.length); + for (int i = 0; i < 10; i++) { + assertEquals("file-"+i, result[i].getPath().getName()); + } + } + @Test public void testProxyListFilesWithConflict() throws IOException, InterruptedException {