HDFS-14082. RBF: Add option to fail operations when a subcluster is unavailable. Contributed by Inigo Goiri.
This commit is contained in:
parent
fa55eacd35
commit
f4bd1114ff
|
@ -125,6 +125,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
|||
public static final String DFS_ROUTER_CLIENT_REJECT_OVERLOAD =
|
||||
FEDERATION_ROUTER_PREFIX + "client.reject.overload";
|
||||
public static final boolean DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT = false;
|
||||
public static final String DFS_ROUTER_ALLOW_PARTIAL_LIST =
|
||||
FEDERATION_ROUTER_PREFIX + "client.allow-partial-listing";
|
||||
public static final boolean DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT = true;
|
||||
|
||||
|
||||
// HDFS Router State Store connection
|
||||
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
|
||||
|
|
|
@ -112,6 +112,9 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||
private final FileSubclusterResolver subclusterResolver;
|
||||
private final ActiveNamenodeResolver namenodeResolver;
|
||||
|
||||
/** If it requires response from all subclusters. */
|
||||
private final boolean allowPartialList;
|
||||
|
||||
/** Identifier for the super user. */
|
||||
private String superUser;
|
||||
/** Identifier for the super group. */
|
||||
|
@ -125,6 +128,10 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||
this.subclusterResolver = rpcServer.getSubclusterResolver();
|
||||
this.namenodeResolver = rpcServer.getNamenodeResolver();
|
||||
|
||||
this.allowPartialList = conf.getBoolean(
|
||||
RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST,
|
||||
RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT);
|
||||
|
||||
// User and group for reporting
|
||||
try {
|
||||
this.superUser = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
|
@ -614,8 +621,8 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||
new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
|
||||
new RemoteParam(), startAfter, needLocation);
|
||||
Map<RemoteLocation, DirectoryListing> listings =
|
||||
rpcClient.invokeConcurrent(
|
||||
locations, method, false, false, DirectoryListing.class);
|
||||
rpcClient.invokeConcurrent(locations, method,
|
||||
!this.allowPartialList, false, DirectoryListing.class);
|
||||
|
||||
Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
|
||||
int totalRemainingEntries = 0;
|
||||
|
@ -1004,8 +1011,8 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||
RemoteMethod method = new RemoteMethod("getContentSummary",
|
||||
new Class<?>[] {String.class}, new RemoteParam());
|
||||
Map<RemoteLocation, ContentSummary> results =
|
||||
rpcClient.invokeConcurrent(
|
||||
locations, method, false, false, ContentSummary.class);
|
||||
rpcClient.invokeConcurrent(locations, method,
|
||||
!this.allowPartialList, false, ContentSummary.class);
|
||||
summaries.addAll(results.values());
|
||||
} catch (FileNotFoundException e) {
|
||||
notFoundException = e;
|
||||
|
|
|
@ -1483,6 +1483,15 @@ public class RouterRpcServer extends AbstractService
|
|||
return this.quotaCall;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get ClientProtocol module implementation.
|
||||
* @return ClientProtocol implementation
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public RouterClientProtocol getClientProtocolModule() {
|
||||
return this.clientProto;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get RPC metrics info.
|
||||
* @return The instance of FederationRPCMetrics.
|
||||
|
|
|
@ -482,6 +482,16 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.client.allow-partial-listing</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
If the Router can return a partial list of files in a multi-destination mount point when one of the subclusters is unavailable.
|
||||
True may return a partial list of files if a subcluster is down.
|
||||
False will fail the request if one is unavailable.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.keytab.file</name>
|
||||
<value></value>
|
||||
|
|
|
@ -20,6 +20,13 @@ package org.apache.hadoop.hdfs.server.federation.router;
|
|||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
|
||||
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.internal.util.reflection.Whitebox.getInternalState;
|
||||
import static org.mockito.internal.util.reflection.Whitebox.setInternalState;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
|
@ -44,6 +51,13 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterConte
|
|||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* The the RPC interface of the {@link getRouter()} implemented by
|
||||
|
@ -214,4 +228,49 @@ public class TestRouterRpcMultiDestination extends TestRouterRpc {
|
|||
testRename(getRouterContext(), filename1, renamedFile, false);
|
||||
testRename2(getRouterContext(), filename1, renamedFile, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubclusterDown() throws Exception {
|
||||
final int totalFiles = 6;
|
||||
|
||||
List<RouterContext> routers = getCluster().getRouters();
|
||||
|
||||
// Test the behavior when everything is fine
|
||||
FileSystem fs = getRouterFileSystem();
|
||||
FileStatus[] files = fs.listStatus(new Path("/"));
|
||||
assertEquals(totalFiles, files.length);
|
||||
|
||||
// Simulate one of the subclusters is in standby
|
||||
NameNode nn0 = getCluster().getNamenode("ns0", null).getNamenode();
|
||||
FSNamesystem ns0 = nn0.getNamesystem();
|
||||
HAContext nn0haCtx = (HAContext)getInternalState(ns0, "haContext");
|
||||
HAContext mockCtx = mock(HAContext.class);
|
||||
doThrow(new StandbyException("Mock")).when(mockCtx).checkOperation(any());
|
||||
setInternalState(ns0, "haContext", mockCtx);
|
||||
|
||||
// router0 should throw an exception
|
||||
RouterContext router0 = routers.get(0);
|
||||
RouterRpcServer router0RPCServer = router0.getRouter().getRpcServer();
|
||||
RouterClientProtocol router0ClientProtocol =
|
||||
router0RPCServer.getClientProtocolModule();
|
||||
setInternalState(router0ClientProtocol, "allowPartialList", false);
|
||||
try {
|
||||
router0.getFileSystem().listStatus(new Path("/"));
|
||||
fail("I should throw an exception");
|
||||
} catch (RemoteException re) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"No namenode available to invoke getListing", re);
|
||||
}
|
||||
|
||||
// router1 should report partial results
|
||||
RouterContext router1 = routers.get(1);
|
||||
files = router1.getFileSystem().listStatus(new Path("/"));
|
||||
assertTrue("Found " + files.length + " items, we should have less",
|
||||
files.length < totalFiles);
|
||||
|
||||
|
||||
// Restore the HA context and the Router
|
||||
setInternalState(ns0, "haContext", nn0haCtx);
|
||||
setInternalState(router0ClientProtocol, "allowPartialList", true);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue