HDFS-14215. RBF: Remove dependency on availability of default namespace. Contributed by Ayush Saxena.
This commit is contained in:
parent
acdf911c01
commit
9eed3a49df
|
@ -201,8 +201,7 @@ public class RouterClientProtocol implements ClientProtocol {
|
|||
rpcServer.checkOperation(NameNode.OperationCategory.READ);
|
||||
|
||||
RemoteMethod method = new RemoteMethod("getServerDefaults");
|
||||
String ns = subclusterResolver.getDefaultNamespace();
|
||||
return (FsServerDefaults) rpcClient.invokeSingle(ns, method);
|
||||
return rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.util.Map.Entry;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
|
@ -45,14 +44,11 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
|
|||
private final RouterRpcServer rpcServer;
|
||||
/** RPC clients to connect to the Namenodes. */
|
||||
private final RouterRpcClient rpcClient;
|
||||
/** Interface to map global name space to HDFS subcluster name spaces. */
|
||||
private final FileSubclusterResolver subclusterResolver;
|
||||
|
||||
|
||||
public RouterNamenodeProtocol(RouterRpcServer server) {
|
||||
this.rpcServer = server;
|
||||
this.rpcClient = this.rpcServer.getRPCClient();
|
||||
this.subclusterResolver = this.rpcServer.getSubclusterResolver();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,33 +90,27 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
|
|||
public ExportedBlockKeys getBlockKeys() throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.READ);
|
||||
|
||||
// We return the information from the default name space
|
||||
String defaultNsId = subclusterResolver.getDefaultNamespace();
|
||||
RemoteMethod method =
|
||||
new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
|
||||
return rpcClient.invokeSingle(defaultNsId, method, ExportedBlockKeys.class);
|
||||
return rpcServer.invokeAtAvailableNs(method, ExportedBlockKeys.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTransactionID() throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.READ);
|
||||
|
||||
// We return the information from the default name space
|
||||
String defaultNsId = subclusterResolver.getDefaultNamespace();
|
||||
RemoteMethod method =
|
||||
new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
|
||||
return rpcClient.invokeSingle(defaultNsId, method, long.class);
|
||||
return rpcServer.invokeAtAvailableNs(method, long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMostRecentCheckpointTxId() throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.READ);
|
||||
|
||||
// We return the information from the default name space
|
||||
String defaultNsId = subclusterResolver.getDefaultNamespace();
|
||||
RemoteMethod method =
|
||||
new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
|
||||
return rpcClient.invokeSingle(defaultNsId, method, long.class);
|
||||
return rpcServer.invokeAtAvailableNs(method, long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -133,11 +123,9 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
|
|||
public NamespaceInfo versionRequest() throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.READ);
|
||||
|
||||
// We return the information from the default name space
|
||||
String defaultNsId = subclusterResolver.getDefaultNamespace();
|
||||
RemoteMethod method =
|
||||
new RemoteMethod(NamenodeProtocol.class, "versionRequest");
|
||||
return rpcClient.invokeSingle(defaultNsId, method, NamespaceInfo.class);
|
||||
return rpcServer.invokeAtAvailableNs(method, NamespaceInfo.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -479,6 +479,29 @@ public class RouterRpcServer extends AbstractService
|
|||
return methodName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the method at default namespace, if default namespace is not
|
||||
* available then at the first available namespace.
|
||||
* @param <T> expected return type.
|
||||
* @param method the remote method.
|
||||
* @return the response received after invoking method.
|
||||
* @throws IOException
|
||||
*/
|
||||
<T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
|
||||
throws IOException {
|
||||
String nsId = subclusterResolver.getDefaultNamespace();
|
||||
if (!nsId.isEmpty()) {
|
||||
return rpcClient.invokeSingle(nsId, method, clazz);
|
||||
}
|
||||
// If default Ns is not present return result from first namespace.
|
||||
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
||||
if (nss.isEmpty()) {
|
||||
throw new IOException("No namespace availaible.");
|
||||
}
|
||||
nsId = nss.iterator().next().getNameserviceId();
|
||||
return rpcClient.invokeSingle(nsId, method, clazz);
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
|
||||
|
@ -36,13 +35,10 @@ public class RouterStoragePolicy {
|
|||
private final RouterRpcServer rpcServer;
|
||||
/** RPC clients to connect to the Namenodes. */
|
||||
private final RouterRpcClient rpcClient;
|
||||
/** Interface to map global name space to HDFS subcluster name spaces. */
|
||||
private final FileSubclusterResolver subclusterResolver;
|
||||
|
||||
public RouterStoragePolicy(RouterRpcServer server) {
|
||||
this.rpcServer = server;
|
||||
this.rpcClient = this.rpcServer.getRPCClient();
|
||||
this.subclusterResolver = this.rpcServer.getSubclusterResolver();
|
||||
}
|
||||
|
||||
public void setStoragePolicy(String src, String policyName)
|
||||
|
@ -61,8 +57,7 @@ public class RouterStoragePolicy {
|
|||
rpcServer.checkOperation(NameNode.OperationCategory.READ);
|
||||
|
||||
RemoteMethod method = new RemoteMethod("getStoragePolicies");
|
||||
String ns = subclusterResolver.getDefaultNamespace();
|
||||
return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
|
||||
return rpcServer.invokeAtAvailableNs(method, BlockStoragePolicy[].class);
|
||||
}
|
||||
|
||||
public void unsetStoragePolicy(String src) throws IOException {
|
||||
|
|
|
@ -57,6 +57,7 @@ public class MockResolver
|
|||
private Map<String, List<RemoteLocation>> locations = new HashMap<>();
|
||||
private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
|
||||
private String defaultNamespace = null;
|
||||
private boolean disableDefaultNamespace = false;
|
||||
|
||||
public MockResolver() {
|
||||
this.cleanRegistrations();
|
||||
|
@ -322,8 +323,19 @@ public class MockResolver
|
|||
public void setRouterId(String router) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Mocks the availability of default namespace.
|
||||
* @param b if true default namespace is unset.
|
||||
*/
|
||||
public void setDisableNamespace(boolean b) {
|
||||
this.disableDefaultNamespace = b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDefaultNamespace() {
|
||||
if (disableDefaultNamespace) {
|
||||
return "";
|
||||
}
|
||||
return defaultNamespace;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CreateFlag;
|
|||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -828,6 +829,40 @@ public class TestRouterRpc {
|
|||
assertEquals(nnPolicy.getId(), policy.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListStoragePolicies() throws IOException, URISyntaxException {
|
||||
MockResolver resolver =
|
||||
(MockResolver) router.getRouter().getSubclusterResolver();
|
||||
try {
|
||||
// Check with default namespace specified.
|
||||
BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies();
|
||||
assertArrayEquals(policies, routerProtocol.getStoragePolicies());
|
||||
// Check with default namespace unspecified.
|
||||
resolver.setDisableNamespace(true);
|
||||
assertArrayEquals(policies, routerProtocol.getStoragePolicies());
|
||||
} finally {
|
||||
resolver.setDisableNamespace(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetServerDefaults() throws IOException, URISyntaxException {
|
||||
MockResolver resolver =
|
||||
(MockResolver) router.getRouter().getSubclusterResolver();
|
||||
try {
|
||||
// Check with default namespace specified.
|
||||
FsServerDefaults defaults = namenode.getClient().getServerDefaults();
|
||||
assertEquals(defaults.getBlockSize(),
|
||||
routerProtocol.getServerDefaults().getBlockSize());
|
||||
// Check with default namespace unspecified.
|
||||
resolver.setDisableNamespace(true);
|
||||
assertEquals(defaults.getBlockSize(),
|
||||
routerProtocol.getServerDefaults().getBlockSize());
|
||||
} finally {
|
||||
resolver.setDisableNamespace(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProxyGetPreferedBlockSize() throws Exception {
|
||||
|
||||
|
@ -1012,8 +1047,23 @@ public class TestRouterRpc {
|
|||
|
||||
@Test
|
||||
public void testProxyVersionRequest() throws Exception {
|
||||
NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest();
|
||||
NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest();
|
||||
MockResolver resolver =
|
||||
(MockResolver) router.getRouter().getSubclusterResolver();
|
||||
try {
|
||||
// Check with default namespace specified.
|
||||
NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest();
|
||||
NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest();
|
||||
compareVersion(rVersion, nnVersion);
|
||||
// Check with default namespace unspecified.
|
||||
resolver.setDisableNamespace(true);
|
||||
rVersion = routerNamenodeProtocol.versionRequest();
|
||||
compareVersion(rVersion, nnVersion);
|
||||
} finally {
|
||||
resolver.setDisableNamespace(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void compareVersion(NamespaceInfo rVersion, NamespaceInfo nnVersion) {
|
||||
assertEquals(nnVersion.getBlockPoolID(), rVersion.getBlockPoolID());
|
||||
assertEquals(nnVersion.getNamespaceID(), rVersion.getNamespaceID());
|
||||
assertEquals(nnVersion.getClusterID(), rVersion.getClusterID());
|
||||
|
@ -1023,8 +1073,24 @@ public class TestRouterRpc {
|
|||
|
||||
@Test
|
||||
public void testProxyGetBlockKeys() throws Exception {
|
||||
ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys();
|
||||
ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys();
|
||||
MockResolver resolver =
|
||||
(MockResolver) router.getRouter().getSubclusterResolver();
|
||||
try {
|
||||
// Check with default namespace specified.
|
||||
ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys();
|
||||
ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys();
|
||||
compareBlockKeys(rKeys, nnKeys);
|
||||
// Check with default namespace unspecified.
|
||||
resolver.setDisableNamespace(true);
|
||||
rKeys = routerNamenodeProtocol.getBlockKeys();
|
||||
compareBlockKeys(rKeys, nnKeys);
|
||||
} finally {
|
||||
resolver.setDisableNamespace(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void compareBlockKeys(ExportedBlockKeys rKeys,
|
||||
ExportedBlockKeys nnKeys) {
|
||||
assertEquals(nnKeys.getCurrentKey(), rKeys.getCurrentKey());
|
||||
assertEquals(nnKeys.getKeyUpdateInterval(), rKeys.getKeyUpdateInterval());
|
||||
assertEquals(nnKeys.getTokenLifetime(), rKeys.getTokenLifetime());
|
||||
|
@ -1054,17 +1120,38 @@ public class TestRouterRpc {
|
|||
|
||||
@Test
|
||||
public void testProxyGetTransactionID() throws IOException {
|
||||
long routerTransactionID = routerNamenodeProtocol.getTransactionID();
|
||||
long nnTransactionID = nnNamenodeProtocol.getTransactionID();
|
||||
assertEquals(nnTransactionID, routerTransactionID);
|
||||
MockResolver resolver =
|
||||
(MockResolver) router.getRouter().getSubclusterResolver();
|
||||
try {
|
||||
// Check with default namespace specified.
|
||||
long routerTransactionID = routerNamenodeProtocol.getTransactionID();
|
||||
long nnTransactionID = nnNamenodeProtocol.getTransactionID();
|
||||
assertEquals(nnTransactionID, routerTransactionID);
|
||||
// Check with default namespace unspecified.
|
||||
resolver.setDisableNamespace(true);
|
||||
routerTransactionID = routerNamenodeProtocol.getTransactionID();
|
||||
assertEquals(nnTransactionID, routerTransactionID);
|
||||
} finally {
|
||||
resolver.setDisableNamespace(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProxyGetMostRecentCheckpointTxId() throws IOException {
|
||||
long routerCheckPointId =
|
||||
routerNamenodeProtocol.getMostRecentCheckpointTxId();
|
||||
long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId();
|
||||
assertEquals(nnCheckPointId, routerCheckPointId);
|
||||
MockResolver resolver =
|
||||
(MockResolver) router.getRouter().getSubclusterResolver();
|
||||
try {
|
||||
// Check with default namespace specified.
|
||||
long routerCheckPointId =
|
||||
routerNamenodeProtocol.getMostRecentCheckpointTxId();
|
||||
long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId();
|
||||
assertEquals(nnCheckPointId, routerCheckPointId);
|
||||
// Check with default namespace unspecified.
|
||||
resolver.setDisableNamespace(true);
|
||||
routerCheckPointId = routerNamenodeProtocol.getMostRecentCheckpointTxId();
|
||||
} finally {
|
||||
resolver.setDisableNamespace(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue