diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
index e097037027d..8ebd95b4866 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
@@ -31,8 +31,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
index 1d27b51a167..7e779b5b1ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
@@ -17,8 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
+import java.net.InetSocketAddress;
+
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.RPC;
/**
@@ -26,18 +27,24 @@ import org.apache.hadoop.ipc.RPC;
* a connection, it increments a counter to mark it as active. Once the client
* is done with the connection, it decreases the counter. It also takes care of
* closing the connection once is not active.
+ *
+ * The protocols currently used are:
+ *
*/
public class ConnectionContext {
/** Client for the connection. */
- private final ProxyAndInfo client;
+ private final ProxyAndInfo> client;
/** How many threads are using this connection. */
private int numThreads = 0;
/** If the connection is closed. */
private boolean closed = false;
- public ConnectionContext(ProxyAndInfo connection) {
+ public ConnectionContext(ProxyAndInfo> connection) {
this.client = connection;
}
@@ -74,7 +81,7 @@ public class ConnectionContext {
*
* @return Connection client.
*/
- public synchronized ProxyAndInfo getClient() {
+ public synchronized ProxyAndInfo> getClient() {
this.numThreads++;
return this.client;
}
@@ -96,9 +103,27 @@ public class ConnectionContext {
public synchronized void close() {
this.closed = true;
if (this.numThreads == 0) {
- ClientProtocol proxy = this.client.getProxy();
+ Object proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away
RPC.stopProxy(proxy);
}
}
+
+ @Override
+ public String toString() {
+ InetSocketAddress addr = this.client.getAddress();
+ Object proxy = this.client.getProxy();
+ Class> clazz = proxy.getClass();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(clazz.getSimpleName());
+ sb.append("@");
+ sb.append(addr);
+ sb.append("x");
+ sb.append(numThreads);
+ if (closed) {
+ sb.append("[CLOSED]");
+ }
+ return sb.toString();
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index c7ab269dfdc..f62af1f7d6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -166,11 +166,12 @@ public class ConnectionManager {
*
* @param ugi User group information.
* @param nnAddress Namenode address for the connection.
+ * @param protocol Protocol for the connection.
* @return Proxy client to connect to nnId as UGI.
* @throws IOException If the connection cannot be obtained.
*/
- public ConnectionContext getConnection(
- UserGroupInformation ugi, String nnAddress) throws IOException {
+ public ConnectionContext getConnection(UserGroupInformation ugi,
+ String nnAddress, Class> protocol) throws IOException {
// Check if the manager is shutdown
if (!this.running) {
@@ -181,7 +182,8 @@ public class ConnectionManager {
}
// Try to get the pool if created
- ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress);
+ ConnectionPoolId connectionId =
+ new ConnectionPoolId(ugi, nnAddress, protocol);
ConnectionPool pool = null;
readLock.lock();
try {
@@ -197,7 +199,7 @@ public class ConnectionManager {
pool = this.pools.get(connectionId);
if (pool == null) {
pool = new ConnectionPool(
- this.conf, nnAddress, ugi, this.minSize, this.maxSize);
+ this.conf, nnAddress, ugi, this.minSize, this.maxSize, protocol);
this.pools.put(connectionId, pool);
}
} finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index 06bed9c846e..bd7cac30c8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -38,6 +38,9 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
@@ -75,6 +78,8 @@ public class ConnectionPool {
private final String namenodeAddress;
/** User for this connections. */
private final UserGroupInformation ugi;
+ /** Class of the protocol. */
+ private final Class> protocol;
/** Pool of connections. We mimic a COW array. */
private volatile List connections = new ArrayList<>();
@@ -91,16 +96,17 @@ public class ConnectionPool {
protected ConnectionPool(Configuration config, String address,
- UserGroupInformation user, int minPoolSize, int maxPoolSize)
- throws IOException {
+ UserGroupInformation user, int minPoolSize, int maxPoolSize,
+ Class> proto) throws IOException {
this.conf = config;
// Connection pool target
this.ugi = user;
this.namenodeAddress = address;
+ this.protocol = proto;
this.connectionPoolId =
- new ConnectionPoolId(this.ugi, this.namenodeAddress);
+ new ConnectionPoolId(this.ugi, this.namenodeAddress, this.protocol);
// Set configuration parameters for the pool
this.minSize = minPoolSize;
@@ -287,7 +293,8 @@ public class ConnectionPool {
* @throws IOException
*/
public ConnectionContext newConnection() throws IOException {
- return newConnection(this.conf, this.namenodeAddress, this.ugi);
+ return newConnection(
+ this.conf, this.namenodeAddress, this.ugi, this.protocol);
}
/**
@@ -299,12 +306,46 @@ public class ConnectionPool {
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context.
- * @return Proxy for the target ClientProtocol that contains the user's
+ * @param proto Interface of the protocol.
+ * @return proto for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
protected static ConnectionContext newConnection(Configuration conf,
- String nnAddress, UserGroupInformation ugi)
+ String nnAddress, UserGroupInformation ugi, Class> proto)
+ throws IOException {
+ ConnectionContext ret;
+ if (proto == ClientProtocol.class) {
+ ret = newClientConnection(conf, nnAddress, ugi);
+ } else if (proto == NamenodeProtocol.class) {
+ ret = newNamenodeConnection(conf, nnAddress, ugi);
+ } else {
+ String msg = "Unsupported protocol for connection to NameNode: " +
+ ((proto != null) ? proto.getClass().getName() : "null");
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ return ret;
+ }
+
+ /**
+ * Creates a proxy wrapper for a client NN connection. Each proxy contains
+ * context for a single user/security context. To maximize throughput it is
+ * recommended to use multiple connection per user+server, allowing multiple
+ * writes and reads to be dispatched in parallel.
+ *
+ * Mostly based on NameNodeProxies#createNonHAProxy() but it needs the
+ * connection identifier.
+ *
+ * @param conf Configuration for the connection.
+ * @param nnAddress Address of server supporting the ClientProtocol.
+ * @param ugi User context.
+ * @return Proxy for the target ClientProtocol that contains the user's
+ * security context.
+ * @throws IOException If it cannot be created.
+ */
+ private static ConnectionContext newClientConnection(
+ Configuration conf, String nnAddress, UserGroupInformation ugi)
throws IOException {
RPC.setProtocolEngine(
conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
@@ -334,4 +375,49 @@ public class ConnectionPool {
ConnectionContext connection = new ConnectionContext(clientProxy);
return connection;
}
+
+ /**
+ * Creates a proxy wrapper for a NN connection. Each proxy contains context
+ * for a single user/security context. To maximize throughput it is
+ * recommended to use multiple connection per user+server, allowing multiple
+ * writes and reads to be dispatched in parallel.
+ *
+ * @param conf Configuration for the connection.
+ * @param nnAddress Address of server supporting the ClientProtocol.
+ * @param ugi User context.
+ * @return Proxy for the target NamenodeProtocol that contains the user's
+ * security context.
+ * @throws IOException If it cannot be created.
+ */
+ private static ConnectionContext newNamenodeConnection(
+ Configuration conf, String nnAddress, UserGroupInformation ugi)
+ throws IOException {
+ RPC.setProtocolEngine(
+ conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class);
+
+ final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
+ conf,
+ HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
+ HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
+ HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
+ HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
+ HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
+
+ SocketFactory factory = SocketFactory.getDefault();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ SaslRpcServer.init(conf);
+ }
+ InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
+ final long version = RPC.getProtocolVersion(NamenodeProtocolPB.class);
+ NamenodeProtocolPB proxy = RPC.getProtocolProxy(NamenodeProtocolPB.class,
+ version, socket, ugi, conf,
+ factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+ NamenodeProtocol client = new NamenodeProtocolTranslatorPB(proxy);
+ Text dtService = SecurityUtil.buildTokenService(socket);
+
+ ProxyAndInfo clientProxy =
+ new ProxyAndInfo(client, dtService, socket);
+ ConnectionContext connection = new ConnectionContext(clientProxy);
+ return connection;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
index 6e1ee9a5c4d..458fec203f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
@@ -42,16 +42,21 @@ public class ConnectionPoolId implements Comparable {
private final String nnId;
/** Information about the user. */
private final UserGroupInformation ugi;
+ /** Protocol for the connection. */
+ private final Class> protocol;
/**
* New connection pool identifier.
*
* @param ugi Information of the user issuing the request.
* @param nnId Namenode address with port.
+ * @param proto Protocol of the connection.
*/
- public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) {
+ public ConnectionPoolId(final UserGroupInformation ugi, final String nnId,
+ final Class> proto) {
this.nnId = nnId;
this.ugi = ugi;
+ this.protocol = proto;
}
@Override
@@ -60,6 +65,7 @@ public class ConnectionPoolId implements Comparable {
.append(this.nnId)
.append(this.ugi.toString())
.append(this.getTokenIds())
+ .append(this.protocol)
.toHashCode();
return hash;
}
@@ -76,14 +82,18 @@ public class ConnectionPoolId implements Comparable {
}
String thisTokens = this.getTokenIds().toString();
String otherTokens = other.getTokenIds().toString();
- return thisTokens.equals(otherTokens);
+ if (!thisTokens.equals(otherTokens)) {
+ return false;
+ }
+ return this.protocol.equals(other.protocol);
}
return false;
}
@Override
public String toString() {
- return this.ugi + " " + this.getTokenIds() + "->" + this.nnId;
+ return this.ugi + " " + this.getTokenIds() + "->" + this.nnId + " [" +
+ this.protocol.getSimpleName() + "]";
}
@Override
@@ -97,6 +107,9 @@ public class ConnectionPoolId implements Comparable {
String otherTokens = other.getTokenIds().toString();
ret = thisTokens.compareTo(otherTokens);
}
+ if (ret == 0) {
+ ret = this.protocol.toString().compareTo(other.protocol.toString());
+ }
return ret;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
index 7978105584e..6ff2b01b0b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
@@ -38,22 +38,35 @@ public class RemoteMethod {
private final Object[] params;
/** List of method parameters types, matches parameters. */
private final Class>[] types;
+ /** Class of the protocol for the method. */
+ private final Class> protocol;
/** String name of the ClientProtocol method. */
private final String methodName;
/**
- * Create a method with no parameters.
+ * Create a remote method generator for the ClientProtocol with no parameters.
*
- * @param method The string name of the ClientProtocol method.
+ * @param method The string name of the protocol method.
*/
public RemoteMethod(String method) {
- this.params = null;
- this.types = null;
- this.methodName = method;
+ this(ClientProtocol.class, method);
}
/**
- * Creates a remote method generator.
+ * Create a method with no parameters.
+ *
+ * @param proto Protocol of the method.
+ * @param method The string name of the ClientProtocol method.
+ */
+ public RemoteMethod(Class> proto, String method) {
+ this.params = null;
+ this.types = null;
+ this.methodName = method;
+ this.protocol = proto;
+ }
+
+ /**
+ * Create a remote method generator for the ClientProtocol.
*
* @param method The string name of the ClientProtocol method.
* @param pTypes A list of types to use to locate the specific method.
@@ -70,16 +83,49 @@ public class RemoteMethod {
*/
public RemoteMethod(String method, Class>[] pTypes, Object... pParams)
throws IOException {
+ this(ClientProtocol.class, method, pTypes, pParams);
+ }
+
+ /**
+ * Creates a remote method generator.
+ *
+ * @param proto Protocol of the method.
+ * @param method The string name of the ClientProtocol method.
+ * @param pTypes A list of types to use to locate the specific method.
+ * @param pParams A list of parameters for the method. The order of the
+ * parameter list must match the order and number of the types.
+ * Parameters are grouped into 2 categories:
+ *
+ *
Static parameters that are immutable across locations.
+ *
Dynamic parameters that are determined for each location by a
+ * RemoteParam object. To specify a dynamic parameter, pass an
+ * instance of RemoteParam in place of the parameter value.
+ *
+ * @throws IOException If the types and parameter lists are not valid.
+ */
+ public RemoteMethod(Class> proto, String method, Class>[] pTypes,
+ Object... pParams) throws IOException {
if (pParams.length != pTypes.length) {
throw new IOException("Invalid parameters for method " + method);
}
+ this.protocol = proto;
this.params = pParams;
this.types = Arrays.copyOf(pTypes, pTypes.length);
this.methodName = method;
}
+ /**
+ * Get the interface/protocol for this method. For example, ClientProtocol or
+ * NamenodeProtocol.
+ *
+ * @return Protocol for this method.
+ */
+ public Class> getProtocol() {
+ return this.protocol;
+ }
+
/**
* Get the represented java method.
*
@@ -89,18 +135,18 @@ public class RemoteMethod {
public Method getMethod() throws IOException {
try {
if (types != null) {
- return ClientProtocol.class.getDeclaredMethod(methodName, types);
+ return protocol.getDeclaredMethod(methodName, types);
} else {
- return ClientProtocol.class.getDeclaredMethod(methodName);
+ return protocol.getDeclaredMethod(methodName);
}
} catch (NoSuchMethodException e) {
// Re-throw as an IOException
- LOG.error("Cannot get method {} with types {}",
- methodName, Arrays.toString(types), e);
+ LOG.error("Cannot get method {} with types {} from {}",
+ methodName, Arrays.toString(types), protocol.getSimpleName(), e);
throw new IOException(e);
} catch (SecurityException e) {
- LOG.error("Cannot access method {} with types {}",
- methodName, Arrays.toString(types), e);
+ LOG.error("Cannot access method {} with types {} from {}",
+ methodName, Arrays.toString(types), protocol.getSimpleName(), e);
throw new IOException(e);
}
}
@@ -161,4 +207,10 @@ public class RemoteMethod {
}
return objList;
}
+
+ @Override
+ public String toString() {
+ return this.protocol.getSimpleName() + "#" + this.methodName + " " +
+ Arrays.toString(this.params);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
new file mode 100644
index 00000000000..85e92d3caf3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+/**
+ * Module that implements all the RPC calls in {@link NamenodeProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterNamenodeProtocol implements NamenodeProtocol {
+
+ /** RPC server to receive client calls. */
+ private final RouterRpcServer rpcServer;
+ /** RPC clients to connect to the Namenodes. */
+ private final RouterRpcClient rpcClient;
+ /** Interface to map global name space to HDFS subcluster name spaces. */
+ private final FileSubclusterResolver subclusterResolver;
+
+
+ public RouterNamenodeProtocol(RouterRpcServer server) {
+ this.rpcServer = server;
+ this.rpcClient = this.rpcServer.getRPCClient();
+ this.subclusterResolver = this.rpcServer.getSubclusterResolver();
+ }
+
+ @Override
+ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+ throws IOException {
+ rpcServer.checkOperation(OperationCategory.READ);
+
+ // Get the namespace where the datanode is located
+ Map map =
+ rpcServer.getDatanodeStorageReportMap(DatanodeReportType.ALL);
+ String nsId = null;
+ for (Entry entry : map.entrySet()) {
+ DatanodeStorageReport[] dns = entry.getValue();
+ for (DatanodeStorageReport dn : dns) {
+ DatanodeInfo dnInfo = dn.getDatanodeInfo();
+ if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) {
+ nsId = entry.getKey();
+ break;
+ }
+ }
+ // Break the loop if already found
+ if (nsId != null) {
+ break;
+ }
+ }
+
+ // Forward to the proper namenode
+ if (nsId != null) {
+ RemoteMethod method = new RemoteMethod(
+ NamenodeProtocol.class, "getBlocks",
+ new Class>[] {DatanodeInfo.class, long.class},
+ datanode, size);
+ return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
+ }
+ return null;
+ }
+
+ @Override
+ public ExportedBlockKeys getBlockKeys() throws IOException {
+ rpcServer.checkOperation(OperationCategory.READ);
+
+ // We return the information from the default name space
+ String defaultNsId = subclusterResolver.getDefaultNamespace();
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
+ return rpcClient.invokeSingle(defaultNsId, method, ExportedBlockKeys.class);
+ }
+
+ @Override
+ public long getTransactionID() throws IOException {
+ rpcServer.checkOperation(OperationCategory.READ);
+
+ // We return the information from the default name space
+ String defaultNsId = subclusterResolver.getDefaultNamespace();
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
+ return rpcClient.invokeSingle(defaultNsId, method, long.class);
+ }
+
+ @Override
+ public long getMostRecentCheckpointTxId() throws IOException {
+ rpcServer.checkOperation(OperationCategory.READ);
+
+ // We return the information from the default name space
+ String defaultNsId = subclusterResolver.getDefaultNamespace();
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
+ return rpcClient.invokeSingle(defaultNsId, method, long.class);
+ }
+
+ @Override
+ public CheckpointSignature rollEditLog() throws IOException {
+ rpcServer.checkOperation(OperationCategory.WRITE, false);
+ return null;
+ }
+
+ @Override
+ public NamespaceInfo versionRequest() throws IOException {
+ rpcServer.checkOperation(OperationCategory.READ);
+
+ // We return the information from the default name space
+ String defaultNsId = subclusterResolver.getDefaultNamespace();
+ RemoteMethod method =
+ new RemoteMethod(NamenodeProtocol.class, "versionRequest");
+ return rpcClient.invokeSingle(defaultNsId, method, NamespaceInfo.class);
+ }
+
+ @Override
+ public void errorReport(NamenodeRegistration registration, int errorCode,
+ String msg) throws IOException {
+ rpcServer.checkOperation(OperationCategory.UNCHECKED, false);
+ }
+
+ @Override
+ public NamenodeRegistration registerSubordinateNamenode(
+ NamenodeRegistration registration) throws IOException {
+ rpcServer.checkOperation(OperationCategory.WRITE, false);
+ return null;
+ }
+
+ @Override
+ public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
+ throws IOException {
+ rpcServer.checkOperation(OperationCategory.WRITE, false);
+ return null;
+ }
+
+ @Override
+ public void endCheckpoint(NamenodeRegistration registration,
+ CheckpointSignature sig) throws IOException {
+ rpcServer.checkOperation(OperationCategory.WRITE, false);
+ }
+
+ @Override
+ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+ throws IOException {
+ rpcServer.checkOperation(OperationCategory.READ, false);
+ return null;
+ }
+
+ @Override
+ public boolean isUpgradeFinalized() throws IOException {
+ rpcServer.checkOperation(OperationCategory.READ, false);
+ return false;
+ }
+
+ @Override
+ public boolean isRollingUpgrade() throws IOException {
+ rpcServer.checkOperation(OperationCategory.READ, false);
+ return false;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index b23a931d8a3..ed2e4b83063 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -48,7 +48,6 @@ import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
@@ -217,14 +216,14 @@ public class RouterRpcClient {
*
* @param ugi User group information.
* @param nsId Nameservice identifier.
- * @param rpcAddress ClientProtocol RPC server address of the NN.
+ * @param rpcAddress RPC server address of the NN.
+ * @param proto Protocol of the connection.
* @return ConnectionContext containing a ClientProtocol proxy client for the
* NN + current user.
* @throws IOException If we cannot get a connection to the NameNode.
*/
- private ConnectionContext getConnection(
- UserGroupInformation ugi, String nsId, String rpcAddress)
- throws IOException {
+ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
+ String rpcAddress, Class> proto) throws IOException {
ConnectionContext connection = null;
try {
// Each proxy holds the UGI info for the current user when it is created.
@@ -234,7 +233,7 @@ public class RouterRpcClient {
// for each individual request.
// TODO Add tokens from the federated UGI
- connection = this.connectionManager.getConnection(ugi, rpcAddress);
+ connection = this.connectionManager.getConnection(ugi, rpcAddress, proto);
LOG.debug("User {} NN {} is using connection {}",
ugi.getUserName(), rpcAddress, connection);
} catch (Exception ex) {
@@ -318,7 +317,8 @@ public class RouterRpcClient {
private Object invokeMethod(
final UserGroupInformation ugi,
final List extends FederationNamenodeContext> namenodes,
- final Method method, final Object... params) throws IOException {
+ final Class> protocol, final Method method, final Object... params)
+ throws IOException {
if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("No namenodes to invoke " + method.getName() +
@@ -336,9 +336,10 @@ public class RouterRpcClient {
try {
String nsId = namenode.getNameserviceId();
String rpcAddress = namenode.getRpcAddress();
- connection = this.getConnection(ugi, nsId, rpcAddress);
- ProxyAndInfo client = connection.getClient();
- ClientProtocol proxy = client.getProxy();
+ connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
+ ProxyAndInfo> client = connection.getClient();
+ final Object proxy = client.getProxy();
+
ret = invoke(nsId, 0, method, proxy, params);
if (failover) {
// Success on alternate server, update
@@ -603,7 +604,29 @@ public class RouterRpcClient {
List extends FederationNamenodeContext> nns =
getNamenodesForNameservice(nsId);
RemoteLocationContext loc = new RemoteLocation(nsId, "/");
- return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc));
+ Class> proto = method.getProtocol();
+ Method m = method.getMethod();
+ Object[] params = method.getParams(loc);
+ return invokeMethod(ugi, nns, proto, m, params);
+ }
+
+ /**
+ * Invokes a remote method against the specified namespace.
+ *
+ * Re-throws exceptions generated by the remote RPC call as either
+ * RemoteException or IOException.
+ *
+ * @param nsId Target namespace for the method.
+ * @param method The remote method and parameters to invoke.
+ * @param clazz Class for the return type.
+ * @return The result of invoking the method.
+ * @throws IOException If the invoke generated an error.
+ */
+ public T invokeSingle(final String nsId, RemoteMethod method,
+ Class clazz) throws IOException {
+ @SuppressWarnings("unchecked")
+ T ret = (T)invokeSingle(nsId, method);
+ return ret;
}
/**
@@ -681,8 +704,9 @@ public class RouterRpcClient {
List extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
+ Class> proto = remoteMethod.getProtocol();
Object[] params = remoteMethod.getParams(loc);
- Object result = invokeMethod(ugi, namenodes, m, params);
+ Object result = invokeMethod(ugi, namenodes, proto, m, params);
// Check if the result is what we expected
if (isExpectedClass(expectedResultClass, result) &&
isExpectedValue(expectedResultValue, result)) {
@@ -906,8 +930,9 @@ public class RouterRpcClient {
String ns = location.getNameserviceId();
final List extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
+ Class> proto = method.getProtocol();
Object[] paramList = method.getParams(location);
- Object result = invokeMethod(ugi, namenodes, m, paramList);
+ Object result = invokeMethod(ugi, namenodes, proto, m, paramList);
return Collections.singletonMap(location, clazz.cast(result));
}
@@ -917,6 +942,7 @@ public class RouterRpcClient {
String nsId = location.getNameserviceId();
final List extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(nsId);
+ final Class> proto = method.getProtocol();
final Object[] paramList = method.getParams(location);
if (standby) {
// Call the objectGetter to all NNs (including standby)
@@ -931,7 +957,7 @@ public class RouterRpcClient {
orderedLocations.add(nnLocation);
callables.add(new Callable