From 254ec1e4d3547d87c5ec6d35b334fe1af085a016 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Tue, 3 Apr 2018 09:29:20 -0700 Subject: [PATCH] HDFS-13364. RBF: Support NamenodeProtocol in the Router. Contributed by Inigo Goiri. --- .../metrics/NamenodeBeanMetrics.java | 2 - .../federation/router/ConnectionContext.java | 35 +++- .../federation/router/ConnectionManager.java | 10 +- .../federation/router/ConnectionPool.java | 98 ++++++++- .../federation/router/ConnectionPoolId.java | 19 +- .../federation/router/RemoteMethod.java | 76 +++++-- .../router/RouterNamenodeProtocol.java | 187 ++++++++++++++++++ .../federation/router/RouterRpcClient.java | 56 ++++-- .../federation/router/RouterRpcServer.java | 111 ++++++++++- .../federation/MiniRouterDFSCluster.java | 8 + .../router/TestConnectionManager.java | 56 +++++- .../federation/router/TestRouterRpc.java | 115 +++++++++-- 12 files changed, 701 insertions(+), 72 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java index e097037027d..8ebd95b4866 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java @@ -31,8 +31,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java index 1d27b51a167..7e779b5b1ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java @@ -17,8 +17,9 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import java.net.InetSocketAddress; + import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.ipc.RPC; /** @@ -26,18 +27,24 @@ import org.apache.hadoop.ipc.RPC; * a connection, it increments a counter to mark it as active. Once the client * is done with the connection, it decreases the counter. It also takes care of * closing the connection once is not active. + * + * The protocols currently used are: + * */ public class ConnectionContext { /** Client for the connection. */ - private final ProxyAndInfo client; + private final ProxyAndInfo client; /** How many threads are using this connection. */ private int numThreads = 0; /** If the connection is closed. */ private boolean closed = false; - public ConnectionContext(ProxyAndInfo connection) { + public ConnectionContext(ProxyAndInfo connection) { this.client = connection; } @@ -74,7 +81,7 @@ public class ConnectionContext { * * @return Connection client. */ - public synchronized ProxyAndInfo getClient() { + public synchronized ProxyAndInfo getClient() { this.numThreads++; return this.client; } @@ -96,9 +103,27 @@ public class ConnectionContext { public synchronized void close() { this.closed = true; if (this.numThreads == 0) { - ClientProtocol proxy = this.client.getProxy(); + Object proxy = this.client.getProxy(); // Nobody should be using this anymore so it should close right away RPC.stopProxy(proxy); } } + + @Override + public String toString() { + InetSocketAddress addr = this.client.getAddress(); + Object proxy = this.client.getProxy(); + Class clazz = proxy.getClass(); + + StringBuilder sb = new StringBuilder(); + sb.append(clazz.getSimpleName()); + sb.append("@"); + sb.append(addr); + sb.append("x"); + sb.append(numThreads); + if (closed) { + sb.append("[CLOSED]"); + } + return sb.toString(); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index c7ab269dfdc..f62af1f7d6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -166,11 +166,12 @@ public class ConnectionManager { * * @param ugi User group information. * @param nnAddress Namenode address for the connection. + * @param protocol Protocol for the connection. * @return Proxy client to connect to nnId as UGI. * @throws IOException If the connection cannot be obtained. */ - public ConnectionContext getConnection( - UserGroupInformation ugi, String nnAddress) throws IOException { + public ConnectionContext getConnection(UserGroupInformation ugi, + String nnAddress, Class protocol) throws IOException { // Check if the manager is shutdown if (!this.running) { @@ -181,7 +182,8 @@ public class ConnectionManager { } // Try to get the pool if created - ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress); + ConnectionPoolId connectionId = + new ConnectionPoolId(ugi, nnAddress, protocol); ConnectionPool pool = null; readLock.lock(); try { @@ -197,7 +199,7 @@ public class ConnectionManager { pool = this.pools.get(connectionId); if (pool == null) { pool = new ConnectionPool( - this.conf, nnAddress, ugi, this.minSize, this.maxSize); + this.conf, nnAddress, ugi, this.minSize, this.maxSize, protocol); this.pools.put(connectionId, pool); } } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index 06bed9c846e..bd7cac30c8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -38,6 +38,9 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryUtils; @@ -75,6 +78,8 @@ public class ConnectionPool { private final String namenodeAddress; /** User for this connections. */ private final UserGroupInformation ugi; + /** Class of the protocol. */ + private final Class protocol; /** Pool of connections. We mimic a COW array. */ private volatile List connections = new ArrayList<>(); @@ -91,16 +96,17 @@ public class ConnectionPool { protected ConnectionPool(Configuration config, String address, - UserGroupInformation user, int minPoolSize, int maxPoolSize) - throws IOException { + UserGroupInformation user, int minPoolSize, int maxPoolSize, + Class proto) throws IOException { this.conf = config; // Connection pool target this.ugi = user; this.namenodeAddress = address; + this.protocol = proto; this.connectionPoolId = - new ConnectionPoolId(this.ugi, this.namenodeAddress); + new ConnectionPoolId(this.ugi, this.namenodeAddress, this.protocol); // Set configuration parameters for the pool this.minSize = minPoolSize; @@ -287,7 +293,8 @@ public class ConnectionPool { * @throws IOException */ public ConnectionContext newConnection() throws IOException { - return newConnection(this.conf, this.namenodeAddress, this.ugi); + return newConnection( + this.conf, this.namenodeAddress, this.ugi, this.protocol); } /** @@ -299,12 +306,46 @@ public class ConnectionPool { * @param conf Configuration for the connection. * @param nnAddress Address of server supporting the ClientProtocol. * @param ugi User context. - * @return Proxy for the target ClientProtocol that contains the user's + * @param proto Interface of the protocol. + * @return proto for the target ClientProtocol that contains the user's * security context. * @throws IOException If it cannot be created. */ protected static ConnectionContext newConnection(Configuration conf, - String nnAddress, UserGroupInformation ugi) + String nnAddress, UserGroupInformation ugi, Class proto) + throws IOException { + ConnectionContext ret; + if (proto == ClientProtocol.class) { + ret = newClientConnection(conf, nnAddress, ugi); + } else if (proto == NamenodeProtocol.class) { + ret = newNamenodeConnection(conf, nnAddress, ugi); + } else { + String msg = "Unsupported protocol for connection to NameNode: " + + ((proto != null) ? proto.getClass().getName() : "null"); + LOG.error(msg); + throw new IllegalStateException(msg); + } + return ret; + } + + /** + * Creates a proxy wrapper for a client NN connection. Each proxy contains + * context for a single user/security context. To maximize throughput it is + * recommended to use multiple connection per user+server, allowing multiple + * writes and reads to be dispatched in parallel. + * + * Mostly based on NameNodeProxies#createNonHAProxy() but it needs the + * connection identifier. + * + * @param conf Configuration for the connection. + * @param nnAddress Address of server supporting the ClientProtocol. + * @param ugi User context. + * @return Proxy for the target ClientProtocol that contains the user's + * security context. + * @throws IOException If it cannot be created. + */ + private static ConnectionContext newClientConnection( + Configuration conf, String nnAddress, UserGroupInformation ugi) throws IOException { RPC.setProtocolEngine( conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); @@ -334,4 +375,49 @@ public class ConnectionPool { ConnectionContext connection = new ConnectionContext(clientProxy); return connection; } + + /** + * Creates a proxy wrapper for a NN connection. Each proxy contains context + * for a single user/security context. To maximize throughput it is + * recommended to use multiple connection per user+server, allowing multiple + * writes and reads to be dispatched in parallel. + * + * @param conf Configuration for the connection. + * @param nnAddress Address of server supporting the ClientProtocol. + * @param ugi User context. + * @return Proxy for the target NamenodeProtocol that contains the user's + * security context. + * @throws IOException If it cannot be created. + */ + private static ConnectionContext newNamenodeConnection( + Configuration conf, String nnAddress, UserGroupInformation ugi) + throws IOException { + RPC.setProtocolEngine( + conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class); + + final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( + conf, + HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, + HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, + HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, + HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, + HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME); + + SocketFactory factory = SocketFactory.getDefault(); + if (UserGroupInformation.isSecurityEnabled()) { + SaslRpcServer.init(conf); + } + InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); + final long version = RPC.getProtocolVersion(NamenodeProtocolPB.class); + NamenodeProtocolPB proxy = RPC.getProtocolProxy(NamenodeProtocolPB.class, + version, socket, ugi, conf, + factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + NamenodeProtocol client = new NamenodeProtocolTranslatorPB(proxy); + Text dtService = SecurityUtil.buildTokenService(socket); + + ProxyAndInfo clientProxy = + new ProxyAndInfo(client, dtService, socket); + ConnectionContext connection = new ConnectionContext(clientProxy); + return connection; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java index 6e1ee9a5c4d..458fec203f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java @@ -42,16 +42,21 @@ public class ConnectionPoolId implements Comparable { private final String nnId; /** Information about the user. */ private final UserGroupInformation ugi; + /** Protocol for the connection. */ + private final Class protocol; /** * New connection pool identifier. * * @param ugi Information of the user issuing the request. * @param nnId Namenode address with port. + * @param proto Protocol of the connection. */ - public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) { + public ConnectionPoolId(final UserGroupInformation ugi, final String nnId, + final Class proto) { this.nnId = nnId; this.ugi = ugi; + this.protocol = proto; } @Override @@ -60,6 +65,7 @@ public class ConnectionPoolId implements Comparable { .append(this.nnId) .append(this.ugi.toString()) .append(this.getTokenIds()) + .append(this.protocol) .toHashCode(); return hash; } @@ -76,14 +82,18 @@ public class ConnectionPoolId implements Comparable { } String thisTokens = this.getTokenIds().toString(); String otherTokens = other.getTokenIds().toString(); - return thisTokens.equals(otherTokens); + if (!thisTokens.equals(otherTokens)) { + return false; + } + return this.protocol.equals(other.protocol); } return false; } @Override public String toString() { - return this.ugi + " " + this.getTokenIds() + "->" + this.nnId; + return this.ugi + " " + this.getTokenIds() + "->" + this.nnId + " [" + + this.protocol.getSimpleName() + "]"; } @Override @@ -97,6 +107,9 @@ public class ConnectionPoolId implements Comparable { String otherTokens = other.getTokenIds().toString(); ret = thisTokens.compareTo(otherTokens); } + if (ret == 0) { + ret = this.protocol.toString().compareTo(other.protocol.toString()); + } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java index 7978105584e..6ff2b01b0b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java @@ -38,22 +38,35 @@ public class RemoteMethod { private final Object[] params; /** List of method parameters types, matches parameters. */ private final Class[] types; + /** Class of the protocol for the method. */ + private final Class protocol; /** String name of the ClientProtocol method. */ private final String methodName; /** - * Create a method with no parameters. + * Create a remote method generator for the ClientProtocol with no parameters. * - * @param method The string name of the ClientProtocol method. + * @param method The string name of the protocol method. */ public RemoteMethod(String method) { - this.params = null; - this.types = null; - this.methodName = method; + this(ClientProtocol.class, method); } /** - * Creates a remote method generator. + * Create a method with no parameters. + * + * @param proto Protocol of the method. + * @param method The string name of the ClientProtocol method. + */ + public RemoteMethod(Class proto, String method) { + this.params = null; + this.types = null; + this.methodName = method; + this.protocol = proto; + } + + /** + * Create a remote method generator for the ClientProtocol. * * @param method The string name of the ClientProtocol method. * @param pTypes A list of types to use to locate the specific method. @@ -70,16 +83,49 @@ public class RemoteMethod { */ public RemoteMethod(String method, Class[] pTypes, Object... pParams) throws IOException { + this(ClientProtocol.class, method, pTypes, pParams); + } + + /** + * Creates a remote method generator. + * + * @param proto Protocol of the method. + * @param method The string name of the ClientProtocol method. + * @param pTypes A list of types to use to locate the specific method. + * @param pParams A list of parameters for the method. The order of the + * parameter list must match the order and number of the types. + * Parameters are grouped into 2 categories: + *
    + *
  • Static parameters that are immutable across locations. + *
  • Dynamic parameters that are determined for each location by a + * RemoteParam object. To specify a dynamic parameter, pass an + * instance of RemoteParam in place of the parameter value. + *
+ * @throws IOException If the types and parameter lists are not valid. + */ + public RemoteMethod(Class proto, String method, Class[] pTypes, + Object... pParams) throws IOException { if (pParams.length != pTypes.length) { throw new IOException("Invalid parameters for method " + method); } + this.protocol = proto; this.params = pParams; this.types = Arrays.copyOf(pTypes, pTypes.length); this.methodName = method; } + /** + * Get the interface/protocol for this method. For example, ClientProtocol or + * NamenodeProtocol. + * + * @return Protocol for this method. + */ + public Class getProtocol() { + return this.protocol; + } + /** * Get the represented java method. * @@ -89,18 +135,18 @@ public class RemoteMethod { public Method getMethod() throws IOException { try { if (types != null) { - return ClientProtocol.class.getDeclaredMethod(methodName, types); + return protocol.getDeclaredMethod(methodName, types); } else { - return ClientProtocol.class.getDeclaredMethod(methodName); + return protocol.getDeclaredMethod(methodName); } } catch (NoSuchMethodException e) { // Re-throw as an IOException - LOG.error("Cannot get method {} with types {}", - methodName, Arrays.toString(types), e); + LOG.error("Cannot get method {} with types {} from {}", + methodName, Arrays.toString(types), protocol.getSimpleName(), e); throw new IOException(e); } catch (SecurityException e) { - LOG.error("Cannot access method {} with types {}", - methodName, Arrays.toString(types), e); + LOG.error("Cannot access method {} with types {} from {}", + methodName, Arrays.toString(types), protocol.getSimpleName(), e); throw new IOException(e); } } @@ -161,4 +207,10 @@ public class RemoteMethod { } return objList; } + + @Override + public String toString() { + return this.protocol.getSimpleName() + "#" + this.methodName + " " + + Arrays.toString(this.params); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java new file mode 100644 index 00000000000..85e92d3caf3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; + +/** + * Module that implements all the RPC calls in {@link NamenodeProtocol} in the + * {@link RouterRpcServer}. + */ +public class RouterNamenodeProtocol implements NamenodeProtocol { + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Interface to map global name space to HDFS subcluster name spaces. */ + private final FileSubclusterResolver subclusterResolver; + + + public RouterNamenodeProtocol(RouterRpcServer server) { + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.subclusterResolver = this.rpcServer.getSubclusterResolver(); + } + + @Override + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) + throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + // Get the namespace where the datanode is located + Map map = + rpcServer.getDatanodeStorageReportMap(DatanodeReportType.ALL); + String nsId = null; + for (Entry entry : map.entrySet()) { + DatanodeStorageReport[] dns = entry.getValue(); + for (DatanodeStorageReport dn : dns) { + DatanodeInfo dnInfo = dn.getDatanodeInfo(); + if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) { + nsId = entry.getKey(); + break; + } + } + // Break the loop if already found + if (nsId != null) { + break; + } + } + + // Forward to the proper namenode + if (nsId != null) { + RemoteMethod method = new RemoteMethod( + NamenodeProtocol.class, "getBlocks", + new Class[] {DatanodeInfo.class, long.class}, + datanode, size); + return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class); + } + return null; + } + + @Override + public ExportedBlockKeys getBlockKeys() throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + // We return the information from the default name space + String defaultNsId = subclusterResolver.getDefaultNamespace(); + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "getBlockKeys"); + return rpcClient.invokeSingle(defaultNsId, method, ExportedBlockKeys.class); + } + + @Override + public long getTransactionID() throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + // We return the information from the default name space + String defaultNsId = subclusterResolver.getDefaultNamespace(); + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "getTransactionID"); + return rpcClient.invokeSingle(defaultNsId, method, long.class); + } + + @Override + public long getMostRecentCheckpointTxId() throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + // We return the information from the default name space + String defaultNsId = subclusterResolver.getDefaultNamespace(); + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId"); + return rpcClient.invokeSingle(defaultNsId, method, long.class); + } + + @Override + public CheckpointSignature rollEditLog() throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE, false); + return null; + } + + @Override + public NamespaceInfo versionRequest() throws IOException { + rpcServer.checkOperation(OperationCategory.READ); + + // We return the information from the default name space + String defaultNsId = subclusterResolver.getDefaultNamespace(); + RemoteMethod method = + new RemoteMethod(NamenodeProtocol.class, "versionRequest"); + return rpcClient.invokeSingle(defaultNsId, method, NamespaceInfo.class); + } + + @Override + public void errorReport(NamenodeRegistration registration, int errorCode, + String msg) throws IOException { + rpcServer.checkOperation(OperationCategory.UNCHECKED, false); + } + + @Override + public NamenodeRegistration registerSubordinateNamenode( + NamenodeRegistration registration) throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE, false); + return null; + } + + @Override + public NamenodeCommand startCheckpoint(NamenodeRegistration registration) + throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE, false); + return null; + } + + @Override + public void endCheckpoint(NamenodeRegistration registration, + CheckpointSignature sig) throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE, false); + } + + @Override + public RemoteEditLogManifest getEditLogManifest(long sinceTxId) + throws IOException { + rpcServer.checkOperation(OperationCategory.READ, false); + return null; + } + + @Override + public boolean isUpgradeFinalized() throws IOException { + rpcServer.checkOperation(OperationCategory.READ, false); + return false; + } + + @Override + public boolean isRollingUpgrade() throws IOException { + rpcServer.checkOperation(OperationCategory.READ, false); + return false; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index b23a931d8a3..ed2e4b83063 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -48,7 +48,6 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; @@ -217,14 +216,14 @@ public class RouterRpcClient { * * @param ugi User group information. * @param nsId Nameservice identifier. - * @param rpcAddress ClientProtocol RPC server address of the NN. + * @param rpcAddress RPC server address of the NN. + * @param proto Protocol of the connection. * @return ConnectionContext containing a ClientProtocol proxy client for the * NN + current user. * @throws IOException If we cannot get a connection to the NameNode. */ - private ConnectionContext getConnection( - UserGroupInformation ugi, String nsId, String rpcAddress) - throws IOException { + private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, + String rpcAddress, Class proto) throws IOException { ConnectionContext connection = null; try { // Each proxy holds the UGI info for the current user when it is created. @@ -234,7 +233,7 @@ public class RouterRpcClient { // for each individual request. // TODO Add tokens from the federated UGI - connection = this.connectionManager.getConnection(ugi, rpcAddress); + connection = this.connectionManager.getConnection(ugi, rpcAddress, proto); LOG.debug("User {} NN {} is using connection {}", ugi.getUserName(), rpcAddress, connection); } catch (Exception ex) { @@ -318,7 +317,8 @@ public class RouterRpcClient { private Object invokeMethod( final UserGroupInformation ugi, final List namenodes, - final Method method, final Object... params) throws IOException { + final Class protocol, final Method method, final Object... params) + throws IOException { if (namenodes == null || namenodes.isEmpty()) { throw new IOException("No namenodes to invoke " + method.getName() + @@ -336,9 +336,10 @@ public class RouterRpcClient { try { String nsId = namenode.getNameserviceId(); String rpcAddress = namenode.getRpcAddress(); - connection = this.getConnection(ugi, nsId, rpcAddress); - ProxyAndInfo client = connection.getClient(); - ClientProtocol proxy = client.getProxy(); + connection = this.getConnection(ugi, nsId, rpcAddress, protocol); + ProxyAndInfo client = connection.getClient(); + final Object proxy = client.getProxy(); + ret = invoke(nsId, 0, method, proxy, params); if (failover) { // Success on alternate server, update @@ -603,7 +604,29 @@ public class RouterRpcClient { List nns = getNamenodesForNameservice(nsId); RemoteLocationContext loc = new RemoteLocation(nsId, "/"); - return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc)); + Class proto = method.getProtocol(); + Method m = method.getMethod(); + Object[] params = method.getParams(loc); + return invokeMethod(ugi, nns, proto, m, params); + } + + /** + * Invokes a remote method against the specified namespace. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param nsId Target namespace for the method. + * @param method The remote method and parameters to invoke. + * @param clazz Class for the return type. + * @return The result of invoking the method. + * @throws IOException If the invoke generated an error. + */ + public T invokeSingle(final String nsId, RemoteMethod method, + Class clazz) throws IOException { + @SuppressWarnings("unchecked") + T ret = (T)invokeSingle(nsId, method); + return ret; } /** @@ -681,8 +704,9 @@ public class RouterRpcClient { List namenodes = getNamenodesForNameservice(ns); try { + Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); - Object result = invokeMethod(ugi, namenodes, m, params); + Object result = invokeMethod(ugi, namenodes, proto, m, params); // Check if the result is what we expected if (isExpectedClass(expectedResultClass, result) && isExpectedValue(expectedResultValue, result)) { @@ -906,8 +930,9 @@ public class RouterRpcClient { String ns = location.getNameserviceId(); final List namenodes = getNamenodesForNameservice(ns); + Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); - Object result = invokeMethod(ugi, namenodes, m, paramList); + Object result = invokeMethod(ugi, namenodes, proto, m, paramList); return Collections.singletonMap(location, clazz.cast(result)); } @@ -917,6 +942,7 @@ public class RouterRpcClient { String nsId = location.getNameserviceId(); final List namenodes = getNamenodesForNameservice(nsId); + final Class proto = method.getProtocol(); final Object[] paramList = method.getParams(location); if (standby) { // Call the objectGetter to all NNs (including standby) @@ -931,7 +957,7 @@ public class RouterRpcClient { orderedLocations.add(nnLocation); callables.add(new Callable() { public Object call() throws Exception { - return invokeMethod(ugi, nnList, m, paramList); + return invokeMethod(ugi, nnList, proto, m, paramList); } }); } @@ -940,7 +966,7 @@ public class RouterRpcClient { orderedLocations.add(location); callables.add(new Callable() { public Object call() throws Exception { - return invokeMethod(ugi, namenodes, m, paramList); + return invokeMethod(ugi, namenodes, proto, m, paramList); } }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 9691eaa69c1..bbae3ba30c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -90,9 +90,13 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; @@ -102,11 +106,18 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -134,7 +145,8 @@ import com.google.protobuf.BlockingService; * the requests to the active * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}. */ -public class RouterRpcServer extends AbstractService implements ClientProtocol { +public class RouterRpcServer extends AbstractService + implements ClientProtocol, NamenodeProtocol { private static final Logger LOG = LoggerFactory.getLogger(RouterRpcServer.class); @@ -177,6 +189,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { /** Router Quota calls. */ private final Quota quotaCall; + /** NamenodeProtocol calls. */ + private final RouterNamenodeProtocol nnProto; /** * Construct a router RPC server. @@ -228,6 +242,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { BlockingService clientNNPbService = ClientNamenodeProtocol .newReflectiveBlockingService(clientProtocolServerTranslator); + NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = + new NamenodeProtocolServerSideTranslatorPB(this); + BlockingService nnPbService = NamenodeProtocolService + .newReflectiveBlockingService(namenodeProtocolXlator); + InetSocketAddress confRpcAddress = conf.getSocketAddr( RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, @@ -246,6 +265,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { .setQueueSizePerHandler(handlerQueueSize) .setVerbose(false) .build(); + + // Add all the RPC protocols that the Router implements + DFSUtil.addPBProtocol( + conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer); + // We don't want the server to log the full stack trace for some exceptions this.rpcServer.addTerseExceptions( RemoteException.class, @@ -276,6 +300,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { // Initialize modules this.quotaCall = new Quota(this.router, this); + this.nnProto = new RouterNamenodeProtocol(this); } @Override @@ -320,6 +345,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { return rpcClient; } + /** + * Get the subcluster resolver. + * + * @return Subcluster resolver. + */ + public FileSubclusterResolver getSubclusterResolver() { + return subclusterResolver; + } + /** * Get the RPC monitor and metrics. * @@ -1319,7 +1353,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { action, isChecked); Set nss = namenodeResolver.getNamespaces(); Map results = - rpcClient.invokeConcurrent(nss, method, true, true, boolean.class); + rpcClient.invokeConcurrent(nss, method, true, true, Boolean.class); // We only report true if all the name space are in safe mode int numSafemode = 0; @@ -1339,7 +1373,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { new Class[] {String.class}, arg); final Set nss = namenodeResolver.getNamespaces(); Map ret = - rpcClient.invokeConcurrent(nss, method, true, false, boolean.class); + rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class); boolean success = true; for (boolean s : ret.values()) { @@ -1938,6 +1972,77 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { return null; } + @Override // NamenodeProtocol + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) + throws IOException { + return nnProto.getBlocks(datanode, size); + } + + @Override // NamenodeProtocol + public ExportedBlockKeys getBlockKeys() throws IOException { + return nnProto.getBlockKeys(); + } + + @Override // NamenodeProtocol + public long getTransactionID() throws IOException { + return nnProto.getTransactionID(); + } + + @Override // NamenodeProtocol + public long getMostRecentCheckpointTxId() throws IOException { + return nnProto.getMostRecentCheckpointTxId(); + } + + @Override // NamenodeProtocol + public CheckpointSignature rollEditLog() throws IOException { + return nnProto.rollEditLog(); + } + + @Override // NamenodeProtocol + public NamespaceInfo versionRequest() throws IOException { + return nnProto.versionRequest(); + } + + @Override // NamenodeProtocol + public void errorReport(NamenodeRegistration registration, int errorCode, + String msg) throws IOException { + nnProto.errorReport(registration, errorCode, msg); + } + + @Override // NamenodeProtocol + public NamenodeRegistration registerSubordinateNamenode( + NamenodeRegistration registration) throws IOException { + return nnProto.registerSubordinateNamenode(registration); + } + + @Override // NamenodeProtocol + public NamenodeCommand startCheckpoint(NamenodeRegistration registration) + throws IOException { + return nnProto.startCheckpoint(registration); + } + + @Override // NamenodeProtocol + public void endCheckpoint(NamenodeRegistration registration, + CheckpointSignature sig) throws IOException { + nnProto.endCheckpoint(registration, sig); + } + + @Override // NamenodeProtocol + public RemoteEditLogManifest getEditLogManifest(long sinceTxId) + throws IOException { + return nnProto.getEditLogManifest(sinceTxId); + } + + @Override // NamenodeProtocol + public boolean isUpgradeFinalized() throws IOException { + return nnProto.isUpgradeFinalized(); + } + + @Override // NamenodeProtocol + public boolean isRollingUpgrade() throws IOException { + return nnProto.isRollingUpgrade(); + } + /** * Locate the location with the matching block pool id. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index c49f90a497e..0ad8536587b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -239,6 +239,10 @@ public class MiniRouterDFSCluster { } return client; } + + public Configuration getConf() { + return conf; + } } /** @@ -351,6 +355,10 @@ public class MiniRouterDFSCluster { } return suffix; } + + public Configuration getConf() { + return conf; + } } public MiniRouterDFSCluster( diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index 2e4b80de10b..a7316481d7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.federation.router; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; @@ -68,14 +70,18 @@ public class TestConnectionManager { Map poolMap = connManager.getPools(); ConnectionPool pool1 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class); addConnectionsToPool(pool1, 9, 4); - poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1); + poolMap.put( + new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), + pool1); ConnectionPool pool2 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10); + conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, ClientProtocol.class); addConnectionsToPool(pool2, 10, 10); - poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2); + poolMap.put( + new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class), + pool2); checkPoolConnections(TEST_USER1, 9, 4); checkPoolConnections(TEST_USER2, 10, 10); @@ -94,9 +100,11 @@ public class TestConnectionManager { // Make sure the number of connections doesn't go below minSize ConnectionPool pool3 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10); + conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, ClientProtocol.class); addConnectionsToPool(pool3, 8, 0); - poolMap.put(new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS), pool3); + poolMap.put( + new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class), + pool3); checkPoolConnections(TEST_USER3, 10, 0); for (int i = 0; i < 10; i++) { connManager.cleanup(pool3); @@ -119,9 +127,41 @@ public class TestConnectionManager { int activeConns = 5; ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10); + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class); addConnectionsToPool(pool, totalConns, activeConns); - poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool); + poolMap.put( + new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), + pool); + + // All remaining connections should be usable + final int remainingSlots = totalConns - activeConns; + for (int i = 0; i < remainingSlots; i++) { + ConnectionContext cc = pool.getConnection(); + assertTrue(cc.isUsable()); + cc.getClient(); + activeConns++; + } + + checkPoolConnections(TEST_USER1, totalConns, activeConns); + + // Ask for more and this returns an active connection + ConnectionContext cc = pool.getConnection(); + assertTrue(cc.isActive()); + } + + @Test + public void getGetConnectionNamenodeProtocol() throws Exception { + Map poolMap = connManager.getPools(); + final int totalConns = 10; + int activeConns = 5; + + ConnectionPool pool = new ConnectionPool( + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, NamenodeProtocol.class); + addConnectionsToPool(pool, totalConns, activeConns); + poolMap.put( + new ConnectionPoolId( + TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class), + pool); // All remaining connections should be usable final int remainingSlots = totalConns - activeConns; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index a48475134f2..1c9b1b46862 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -54,22 +54,29 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; @@ -83,7 +90,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Supplier; -import com.google.common.collect.Maps; /** * The the RPC interface of the {@link Router} implemented by @@ -110,6 +116,11 @@ public class TestRouterRpc { /** Client interface to the Namenode. */ private ClientProtocol nnProtocol; + /** NameNodeProtocol interface to the Router. */ + private NamenodeProtocol routerNamenodeProtocol; + /** NameNodeProtocol interface to the Namenode. */ + private NamenodeProtocol nnNamenodeProtocol; + /** Filesystem interface to the Router. */ private FileSystem routerFS; /** Filesystem interface to the Namenode. */ @@ -166,22 +177,18 @@ public class TestRouterRpc { // Wait to ensure NN has fully created its test directories Thread.sleep(100); - // Default namenode and random router for this test - this.router = cluster.getRandomRouter(); - this.ns = cluster.getNameservices().get(0); - this.namenode = cluster.getNamenode(ns, null); + // Random router for this test + RouterContext rndRouter = cluster.getRandomRouter(); + this.setRouter(rndRouter); - // Handles to the ClientProtocol interface - this.routerProtocol = router.getClient().getNamenode(); - this.nnProtocol = namenode.getClient().getNamenode(); - - // Handles to the filesystem client - this.nnFS = namenode.getFileSystem(); - this.routerFS = router.getFileSystem(); + // Pick a namenode for this test + String ns0 = cluster.getNameservices().get(0); + this.setNs(ns0); + this.setNamenode(cluster.getNamenode(ns0, null)); // Create a test file on the NN - Random r = new Random(); - String randomFile = "testfile-" + r.nextInt(); + Random rnd = new Random(); + String randomFile = "testfile-" + rnd.nextInt(); this.nnFile = cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile; this.routerFile = @@ -222,6 +229,8 @@ public class TestRouterRpc { this.router = r; this.routerProtocol = r.getClient().getNamenode(); this.routerFS = r.getFileSystem(); + this.routerNamenodeProtocol = NameNodeProxies.createProxy(router.getConf(), + router.getFileSystem().getUri(), NamenodeProtocol.class).getProxy(); } protected FileSystem getRouterFileSystem() { @@ -265,6 +274,12 @@ public class TestRouterRpc { this.namenode = nn; this.nnProtocol = nn.getClient().getNamenode(); this.nnFS = nn.getFileSystem(); + + // Namenode from the default namespace + String ns0 = cluster.getNameservices().get(0); + NamenodeContext nn0 = cluster.getNamenode(ns0, null); + this.nnNamenodeProtocol = NameNodeProxies.createProxy(nn0.getConf(), + nn0.getFileSystem().getUri(), NamenodeProtocol.class).getProxy(); } protected String getNs() { @@ -934,4 +949,76 @@ public class TestRouterRpc { // The cache should be updated now assertNotEquals(jsonString0, metrics.getLiveNodes()); } + + @Test + public void testProxyVersionRequest() throws Exception { + NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest(); + NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest(); + assertEquals(nnVersion.getBlockPoolID(), rVersion.getBlockPoolID()); + assertEquals(nnVersion.getNamespaceID(), rVersion.getNamespaceID()); + assertEquals(nnVersion.getClusterID(), rVersion.getClusterID()); + assertEquals(nnVersion.getLayoutVersion(), rVersion.getLayoutVersion()); + assertEquals(nnVersion.getCTime(), rVersion.getCTime()); + } + + @Test + public void testProxyGetBlockKeys() throws Exception { + ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys(); + ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys(); + assertEquals(nnKeys.getCurrentKey(), rKeys.getCurrentKey()); + assertEquals(nnKeys.getKeyUpdateInterval(), rKeys.getKeyUpdateInterval()); + assertEquals(nnKeys.getTokenLifetime(), rKeys.getTokenLifetime()); + } + + @Test + public void testProxyGetBlocks() throws Exception { + // Get datanodes + DatanodeInfo[] dns = routerProtocol + .getDatanodeReport(DatanodeReportType.ALL); + DatanodeInfo dn0 = dns[0]; + + // Verify that checking that datanode works + BlocksWithLocations routerBlockLocations = + routerNamenodeProtocol.getBlocks(dn0, 1024); + BlocksWithLocations nnBlockLocations = + nnNamenodeProtocol.getBlocks(dn0, 1024); + BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks(); + BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks(); + assertEquals(nnBlocks.length, routerBlocks.length); + for (int i = 0; i < routerBlocks.length; i++) { + assertEquals(nnBlocks[i].getBlock().getBlockId(), + routerBlocks[i].getBlock().getBlockId()); + } + } + + @Test + public void testProxyGetTransactionID() throws IOException { + long routerTransactionID = routerNamenodeProtocol.getTransactionID(); + long nnTransactionID = nnNamenodeProtocol.getTransactionID(); + assertEquals(nnTransactionID, routerTransactionID); + } + + @Test + public void testProxyGetMostRecentCheckpointTxId() throws IOException { + long routerCheckPointId = + routerNamenodeProtocol.getMostRecentCheckpointTxId(); + long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId(); + assertEquals(nnCheckPointId, routerCheckPointId); + } + + @Test + public void testProxySetSafemode() throws Exception { + boolean routerSafemode = + routerProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false); + boolean nnSafemode = + nnProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false); + assertEquals(nnSafemode, routerSafemode); + } + + @Test + public void testProxyRestoreFailedStorage() throws Exception { + boolean routerSuccess = routerProtocol.restoreFailedStorage("check"); + boolean nnSuccess = nnProtocol.restoreFailedStorage("check"); + assertEquals(nnSuccess, routerSuccess); + } } \ No newline at end of file