diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index dd72e3695ac..10018fe2bd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -125,6 +125,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_CLIENT_REJECT_OVERLOAD =
       FEDERATION_ROUTER_PREFIX + "client.reject.overload";
   public static final boolean DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT = false;
+  public static final String DFS_ROUTER_ALLOW_PARTIAL_LIST =
+      FEDERATION_ROUTER_PREFIX + "client.allow-partial-listing";
+  public static final boolean DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT = true;
+

   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 2e0713fba2e..c8b7cdd1aa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -112,6 +112,9 @@ public class RouterClientProtocol implements ClientProtocol {
   private final FileSubclusterResolver subclusterResolver;
   private final ActiveNamenodeResolver namenodeResolver;

+  /** Whether a partial listing is allowed when a subcluster is unavailable. */
+  private final boolean allowPartialList;
+
   /** Identifier for the super user. */
   private String superUser;
   /** Identifier for the super group. */
@@ -125,6 +128,10 @@ public class RouterClientProtocol implements ClientProtocol {
     this.subclusterResolver = rpcServer.getSubclusterResolver();
     this.namenodeResolver = rpcServer.getNamenodeResolver();

+    this.allowPartialList = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST,
+        RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT);
+
     // User and group for reporting
     try {
       this.superUser = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -614,8 +621,8 @@ public class RouterClientProtocol implements ClientProtocol {
         new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
         new RemoteParam(), startAfter, needLocation);
     Map<RemoteLocation, DirectoryListing> listings =
-        rpcClient.invokeConcurrent(
-            locations, method, false, false, DirectoryListing.class);
+        rpcClient.invokeConcurrent(locations, method,
+            !this.allowPartialList, false, DirectoryListing.class);

     Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
     int totalRemainingEntries = 0;
@@ -1004,8 +1011,8 @@ public class RouterClientProtocol implements ClientProtocol {
       RemoteMethod method = new RemoteMethod("getContentSummary",
           new Class<?>[] {String.class}, new RemoteParam());
       Map<RemoteLocation, ContentSummary> results =
-          rpcClient.invokeConcurrent(
-              locations, method, false, false, ContentSummary.class);
+          rpcClient.invokeConcurrent(locations, method,
+              !this.allowPartialList, false, ContentSummary.class);
       summaries.addAll(results.values());
     } catch (FileNotFoundException e) {
       notFoundException = e;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index fcb35f41271..ad5980b8d36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1483,6 +1483,15 @@ public class RouterRpcServer extends AbstractService
     return this.quotaCall;
   }

+  /**
+   * Get ClientProtocol module implementation.
+   * @return ClientProtocol implementation
+   */
+  @VisibleForTesting
+  public RouterClientProtocol getClientProtocolModule() {
+    return this.clientProto;
+  }
+
   /**
    * Get RPC metrics info.
    * @return The instance of FederationRPCMetrics.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 53bf53aab93..09050bbff6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -482,6 +482,16 @@
     </description>
   </property>

+  <property>
+    <name>dfs.federation.router.client.allow-partial-listing</name>
+    <value>true</value>
+    <description>
+      Whether the Router can return a partial list of files in a multi-destination mount point when one of the subclusters is unavailable.
+      True may return a partial list of files if a subcluster is down.
+      False will fail the request if one is unavailable.
+    </description>
+  </property>
+
   <property>
     <name>dfs.federation.router.keytab.file</name>
     <value></value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
index 94b712f534e..31017480b48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
@@ -20,6 +20,13 @@ package org.apache.hadoop.hdfs.server.federation.router;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.internal.util.reflection.Whitebox.getInternalState;
+import static org.mockito.internal.util.reflection.Whitebox.setInternalState;

 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -44,6 +51,13 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterConte
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;

 /**
  * The the RPC interface of the {@link getRouter()} implemented by
@@ -214,4 +228,49 @@ public class TestRouterRpcMultiDestination extends TestRouterRpc {
     testRename(getRouterContext(), filename1, renamedFile, false);
     testRename2(getRouterContext(), filename1, renamedFile, false);
   }
+
+  @Test
+  public void testSubclusterDown() throws Exception {
+    final int totalFiles = 6;
+
+    List<RouterContext> routers = getCluster().getRouters();
+
+    // Test the behavior when everything is fine
+    FileSystem fs = getRouterFileSystem();
+    FileStatus[] files = fs.listStatus(new Path("/"));
+    assertEquals(totalFiles, files.length);
+
+    // Simulate that one of the subclusters is in standby
+    NameNode nn0 = getCluster().getNamenode("ns0", null).getNamenode();
+    FSNamesystem ns0 = nn0.getNamesystem();
+    HAContext nn0haCtx = (HAContext)getInternalState(ns0, "haContext");
+    HAContext mockCtx = mock(HAContext.class);
+    doThrow(new StandbyException("Mock")).when(mockCtx).checkOperation(any());
+    setInternalState(ns0, "haContext", mockCtx);
+
+    // router0 should throw an exception
+    RouterContext router0 = routers.get(0);
+    RouterRpcServer router0RPCServer = router0.getRouter().getRpcServer();
+    RouterClientProtocol router0ClientProtocol =
+        router0RPCServer.getClientProtocolModule();
+    setInternalState(router0ClientProtocol, "allowPartialList", false);
+    try {
+      router0.getFileSystem().listStatus(new Path("/"));
+      fail("listStatus should have thrown an exception");
+    } catch (RemoteException re) {
+      GenericTestUtils.assertExceptionContains(
+          "No namenode available to invoke getListing", re);
+    }
+
+    // router1 should report partial results
+    RouterContext router1 = routers.get(1);
+    files = router1.getFileSystem().listStatus(new Path("/"));
+    assertTrue("Found " + files.length + " items, we should have fewer",
+        files.length < totalFiles);
+
+
+    // Restore the HA context and the Router
+    setInternalState(ns0, "haContext", nn0haCtx);
+    setInternalState(router0ClientProtocol, "allowPartialList", true);
+  }
 }
\ No newline at end of file
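
Usage note: with the default of true, a listing or content summary over a multi-destination mount point returns whatever the reachable subclusters provide when one of them is down; setting the key to false makes the Router fail the whole call instead. As a minimal illustration (the exact configuration file is deployment-specific, e.g. hdfs-site.xml on the Router host), the fail-fast behavior could be selected with:

  <!-- Illustrative override, not part of the patch above -->
  <property>
    <name>dfs.federation.router.client.allow-partial-listing</name>
    <value>false</value>
  </property>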