HDFS-14215. RBF: Remove dependency on availability of default namespace. Contributed by Ayush Saxena.

This commit is contained in:
Inigo Goiri 2019-01-28 10:04:24 -08:00 committed by Brahma Reddy Battula
parent 0c47bac33d
commit 4c4e8df68c
6 changed files with 139 additions and 35 deletions

View File

@ -201,8 +201,7 @@ public class RouterClientProtocol implements ClientProtocol {
rpcServer.checkOperation(NameNode.OperationCategory.READ); rpcServer.checkOperation(NameNode.OperationCategory.READ);
RemoteMethod method = new RemoteMethod("getServerDefaults"); RemoteMethod method = new RemoteMethod("getServerDefaults");
String ns = subclusterResolver.getDefaultNamespace(); return rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class);
return (FsServerDefaults) rpcClient.invokeSingle(ns, method);
} }
@Override @Override

View File

@ -24,7 +24,6 @@ import java.util.Map.Entry;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@ -45,14 +44,11 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
private final RouterRpcServer rpcServer; private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */ /** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient; private final RouterRpcClient rpcClient;
/** Interface to map global name space to HDFS subcluster name spaces. */
private final FileSubclusterResolver subclusterResolver;
public RouterNamenodeProtocol(RouterRpcServer server) { public RouterNamenodeProtocol(RouterRpcServer server) {
this.rpcServer = server; this.rpcServer = server;
this.rpcClient = this.rpcServer.getRPCClient(); this.rpcClient = this.rpcServer.getRPCClient();
this.subclusterResolver = this.rpcServer.getSubclusterResolver();
} }
@Override @Override
@ -94,33 +90,27 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
public ExportedBlockKeys getBlockKeys() throws IOException { public ExportedBlockKeys getBlockKeys() throws IOException {
rpcServer.checkOperation(OperationCategory.READ); rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method = RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getBlockKeys"); new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
return rpcClient.invokeSingle(defaultNsId, method, ExportedBlockKeys.class); return rpcServer.invokeAtAvailableNs(method, ExportedBlockKeys.class);
} }
@Override @Override
public long getTransactionID() throws IOException { public long getTransactionID() throws IOException {
rpcServer.checkOperation(OperationCategory.READ); rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method = RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getTransactionID"); new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
return rpcClient.invokeSingle(defaultNsId, method, long.class); return rpcServer.invokeAtAvailableNs(method, long.class);
} }
@Override @Override
public long getMostRecentCheckpointTxId() throws IOException { public long getMostRecentCheckpointTxId() throws IOException {
rpcServer.checkOperation(OperationCategory.READ); rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method = RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId"); new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
return rpcClient.invokeSingle(defaultNsId, method, long.class); return rpcServer.invokeAtAvailableNs(method, long.class);
} }
@Override @Override
@ -133,11 +123,9 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
public NamespaceInfo versionRequest() throws IOException { public NamespaceInfo versionRequest() throws IOException {
rpcServer.checkOperation(OperationCategory.READ); rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method = RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "versionRequest"); new RemoteMethod(NamenodeProtocol.class, "versionRequest");
return rpcClient.invokeSingle(defaultNsId, method, NamespaceInfo.class); return rpcServer.invokeAtAvailableNs(method, NamespaceInfo.class);
} }
@Override @Override

View File

@ -479,6 +479,29 @@ public class RouterRpcServer extends AbstractService
return methodName; return methodName;
} }
/**
* Invokes the method at default namespace, if default namespace is not
* available then at the first available namespace.
* @param <T> expected return type.
* @param method the remote method.
* @return the response received after invoking method.
* @throws IOException
*/
<T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
if (!nsId.isEmpty()) {
return rpcClient.invokeSingle(nsId, method, clazz);
}
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
if (nss.isEmpty()) {
throw new IOException("No namespace availaible.");
}
nsId = nss.iterator().next().getNameserviceId();
return rpcClient.invokeSingle(nsId, method, clazz);
}
@Override // ClientProtocol @Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException { throws IOException {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.federation.router; package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -36,13 +35,10 @@ public class RouterStoragePolicy {
private final RouterRpcServer rpcServer; private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */ /** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient; private final RouterRpcClient rpcClient;
/** Interface to map global name space to HDFS subcluster name spaces. */
private final FileSubclusterResolver subclusterResolver;
public RouterStoragePolicy(RouterRpcServer server) { public RouterStoragePolicy(RouterRpcServer server) {
this.rpcServer = server; this.rpcServer = server;
this.rpcClient = this.rpcServer.getRPCClient(); this.rpcClient = this.rpcServer.getRPCClient();
this.subclusterResolver = this.rpcServer.getSubclusterResolver();
} }
public void setStoragePolicy(String src, String policyName) public void setStoragePolicy(String src, String policyName)
@ -61,8 +57,7 @@ public class RouterStoragePolicy {
rpcServer.checkOperation(NameNode.OperationCategory.READ); rpcServer.checkOperation(NameNode.OperationCategory.READ);
RemoteMethod method = new RemoteMethod("getStoragePolicies"); RemoteMethod method = new RemoteMethod("getStoragePolicies");
String ns = subclusterResolver.getDefaultNamespace(); return rpcServer.invokeAtAvailableNs(method, BlockStoragePolicy[].class);
return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
} }
public void unsetStoragePolicy(String src) throws IOException { public void unsetStoragePolicy(String src) throws IOException {

View File

@ -57,6 +57,7 @@ public class MockResolver
private Map<String, List<RemoteLocation>> locations = new HashMap<>(); private Map<String, List<RemoteLocation>> locations = new HashMap<>();
private Set<FederationNamespaceInfo> namespaces = new HashSet<>(); private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
private String defaultNamespace = null; private String defaultNamespace = null;
private boolean disableDefaultNamespace = false;
public MockResolver() { public MockResolver() {
this.cleanRegistrations(); this.cleanRegistrations();
@ -322,8 +323,19 @@ public class MockResolver
public void setRouterId(String router) { public void setRouterId(String router) {
} }
/**
* Mocks the availability of default namespace.
* @param b if true default namespace is unset.
*/
public void setDisableNamespace(boolean b) {
this.disableDefaultNamespace = b;
}
@Override @Override
public String getDefaultNamespace() { public String getDefaultNamespace() {
if (disableDefaultNamespace) {
return "";
}
return defaultNamespace; return defaultNamespace;
} }
} }

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -828,6 +829,40 @@ public class TestRouterRpc {
assertEquals(nnPolicy.getId(), policy.getId()); assertEquals(nnPolicy.getId(), policy.getId());
} }
@Test
public void testListStoragePolicies() throws IOException, URISyntaxException {
MockResolver resolver =
(MockResolver) router.getRouter().getSubclusterResolver();
try {
// Check with default namespace specified.
BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies();
assertArrayEquals(policies, routerProtocol.getStoragePolicies());
// Check with default namespace unspecified.
resolver.setDisableNamespace(true);
assertArrayEquals(policies, routerProtocol.getStoragePolicies());
} finally {
resolver.setDisableNamespace(false);
}
}
@Test
public void testGetServerDefaults() throws IOException, URISyntaxException {
MockResolver resolver =
(MockResolver) router.getRouter().getSubclusterResolver();
try {
// Check with default namespace specified.
FsServerDefaults defaults = namenode.getClient().getServerDefaults();
assertEquals(defaults.getBlockSize(),
routerProtocol.getServerDefaults().getBlockSize());
// Check with default namespace unspecified.
resolver.setDisableNamespace(true);
assertEquals(defaults.getBlockSize(),
routerProtocol.getServerDefaults().getBlockSize());
} finally {
resolver.setDisableNamespace(false);
}
}
@Test @Test
public void testProxyGetPreferedBlockSize() throws Exception { public void testProxyGetPreferedBlockSize() throws Exception {
@ -1012,8 +1047,23 @@ public class TestRouterRpc {
@Test @Test
public void testProxyVersionRequest() throws Exception { public void testProxyVersionRequest() throws Exception {
NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest(); MockResolver resolver =
NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest(); (MockResolver) router.getRouter().getSubclusterResolver();
try {
// Check with default namespace specified.
NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest();
NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest();
compareVersion(rVersion, nnVersion);
// Check with default namespace unspecified.
resolver.setDisableNamespace(true);
rVersion = routerNamenodeProtocol.versionRequest();
compareVersion(rVersion, nnVersion);
} finally {
resolver.setDisableNamespace(false);
}
}
private void compareVersion(NamespaceInfo rVersion, NamespaceInfo nnVersion) {
assertEquals(nnVersion.getBlockPoolID(), rVersion.getBlockPoolID()); assertEquals(nnVersion.getBlockPoolID(), rVersion.getBlockPoolID());
assertEquals(nnVersion.getNamespaceID(), rVersion.getNamespaceID()); assertEquals(nnVersion.getNamespaceID(), rVersion.getNamespaceID());
assertEquals(nnVersion.getClusterID(), rVersion.getClusterID()); assertEquals(nnVersion.getClusterID(), rVersion.getClusterID());
@ -1023,8 +1073,24 @@ public class TestRouterRpc {
@Test @Test
public void testProxyGetBlockKeys() throws Exception { public void testProxyGetBlockKeys() throws Exception {
ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys(); MockResolver resolver =
ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys(); (MockResolver) router.getRouter().getSubclusterResolver();
try {
// Check with default namespace specified.
ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys();
ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys();
compareBlockKeys(rKeys, nnKeys);
// Check with default namespace unspecified.
resolver.setDisableNamespace(true);
rKeys = routerNamenodeProtocol.getBlockKeys();
compareBlockKeys(rKeys, nnKeys);
} finally {
resolver.setDisableNamespace(false);
}
}
private void compareBlockKeys(ExportedBlockKeys rKeys,
ExportedBlockKeys nnKeys) {
assertEquals(nnKeys.getCurrentKey(), rKeys.getCurrentKey()); assertEquals(nnKeys.getCurrentKey(), rKeys.getCurrentKey());
assertEquals(nnKeys.getKeyUpdateInterval(), rKeys.getKeyUpdateInterval()); assertEquals(nnKeys.getKeyUpdateInterval(), rKeys.getKeyUpdateInterval());
assertEquals(nnKeys.getTokenLifetime(), rKeys.getTokenLifetime()); assertEquals(nnKeys.getTokenLifetime(), rKeys.getTokenLifetime());
@ -1054,17 +1120,38 @@ public class TestRouterRpc {
@Test @Test
public void testProxyGetTransactionID() throws IOException { public void testProxyGetTransactionID() throws IOException {
long routerTransactionID = routerNamenodeProtocol.getTransactionID(); MockResolver resolver =
long nnTransactionID = nnNamenodeProtocol.getTransactionID(); (MockResolver) router.getRouter().getSubclusterResolver();
assertEquals(nnTransactionID, routerTransactionID); try {
// Check with default namespace specified.
long routerTransactionID = routerNamenodeProtocol.getTransactionID();
long nnTransactionID = nnNamenodeProtocol.getTransactionID();
assertEquals(nnTransactionID, routerTransactionID);
// Check with default namespace unspecified.
resolver.setDisableNamespace(true);
routerTransactionID = routerNamenodeProtocol.getTransactionID();
assertEquals(nnTransactionID, routerTransactionID);
} finally {
resolver.setDisableNamespace(false);
}
} }
@Test @Test
public void testProxyGetMostRecentCheckpointTxId() throws IOException { public void testProxyGetMostRecentCheckpointTxId() throws IOException {
long routerCheckPointId = MockResolver resolver =
routerNamenodeProtocol.getMostRecentCheckpointTxId(); (MockResolver) router.getRouter().getSubclusterResolver();
long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId(); try {
assertEquals(nnCheckPointId, routerCheckPointId); // Check with default namespace specified.
long routerCheckPointId =
routerNamenodeProtocol.getMostRecentCheckpointTxId();
long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId();
assertEquals(nnCheckPointId, routerCheckPointId);
// Check with default namespace unspecified.
resolver.setDisableNamespace(true);
routerCheckPointId = routerNamenodeProtocol.getMostRecentCheckpointTxId();
} finally {
resolver.setDisableNamespace(false);
}
} }
@Test @Test