HDFS-13364. RBF: Support NamenodeProtocol in the Router. Contributed by Inigo Goiri.

This commit is contained in:
Inigo Goiri 2018-04-03 09:29:20 -07:00
parent 117a8d6573
commit 254ec1e4d3
12 changed files with 701 additions and 72 deletions

View File

@ -31,8 +31,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.management.NotCompliantMBeanException; import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName; import javax.management.ObjectName;

View File

@ -17,8 +17,9 @@
*/ */
package org.apache.hadoop.hdfs.server.federation.router; package org.apache.hadoop.hdfs.server.federation.router;
import java.net.InetSocketAddress;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
/** /**
@ -26,18 +27,24 @@ import org.apache.hadoop.ipc.RPC;
* a connection, it increments a counter to mark it as active. Once the client * a connection, it increments a counter to mark it as active. Once the client
* is done with the connection, it decreases the counter. It also takes care of * is done with the connection, it decreases the counter. It also takes care of
* closing the connection once is not active. * closing the connection once is not active.
*
* The protocols currently used are:
* <ul>
* <li>{@link org.apache.hadoop.hdfs.protocol.ClientProtocol}
* <li>{@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol}
* </ul>
*/ */
public class ConnectionContext { public class ConnectionContext {
/** Client for the connection. */ /** Client for the connection. */
private final ProxyAndInfo<ClientProtocol> client; private final ProxyAndInfo<?> client;
/** How many threads are using this connection. */ /** How many threads are using this connection. */
private int numThreads = 0; private int numThreads = 0;
/** If the connection is closed. */ /** If the connection is closed. */
private boolean closed = false; private boolean closed = false;
public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) { public ConnectionContext(ProxyAndInfo<?> connection) {
this.client = connection; this.client = connection;
} }
@ -74,7 +81,7 @@ public class ConnectionContext {
* *
* @return Connection client. * @return Connection client.
*/ */
public synchronized ProxyAndInfo<ClientProtocol> getClient() { public synchronized ProxyAndInfo<?> getClient() {
this.numThreads++; this.numThreads++;
return this.client; return this.client;
} }
@ -96,9 +103,27 @@ public class ConnectionContext {
public synchronized void close() { public synchronized void close() {
this.closed = true; this.closed = true;
if (this.numThreads == 0) { if (this.numThreads == 0) {
ClientProtocol proxy = this.client.getProxy(); Object proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away // Nobody should be using this anymore so it should close right away
RPC.stopProxy(proxy); RPC.stopProxy(proxy);
} }
} }
@Override
public String toString() {
InetSocketAddress addr = this.client.getAddress();
Object proxy = this.client.getProxy();
Class<?> clazz = proxy.getClass();
StringBuilder sb = new StringBuilder();
sb.append(clazz.getSimpleName());
sb.append("@");
sb.append(addr);
sb.append("x");
sb.append(numThreads);
if (closed) {
sb.append("[CLOSED]");
}
return sb.toString();
}
} }

View File

@ -166,11 +166,12 @@ public class ConnectionManager {
* *
* @param ugi User group information. * @param ugi User group information.
* @param nnAddress Namenode address for the connection. * @param nnAddress Namenode address for the connection.
* @param protocol Protocol for the connection.
* @return Proxy client to connect to nnId as UGI. * @return Proxy client to connect to nnId as UGI.
* @throws IOException If the connection cannot be obtained. * @throws IOException If the connection cannot be obtained.
*/ */
public ConnectionContext getConnection( public ConnectionContext getConnection(UserGroupInformation ugi,
UserGroupInformation ugi, String nnAddress) throws IOException { String nnAddress, Class<?> protocol) throws IOException {
// Check if the manager is shutdown // Check if the manager is shutdown
if (!this.running) { if (!this.running) {
@ -181,7 +182,8 @@ public class ConnectionManager {
} }
// Try to get the pool if created // Try to get the pool if created
ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress); ConnectionPoolId connectionId =
new ConnectionPoolId(ugi, nnAddress, protocol);
ConnectionPool pool = null; ConnectionPool pool = null;
readLock.lock(); readLock.lock();
try { try {
@ -197,7 +199,7 @@ public class ConnectionManager {
pool = this.pools.get(connectionId); pool = this.pools.get(connectionId);
if (pool == null) { if (pool == null) {
pool = new ConnectionPool( pool = new ConnectionPool(
this.conf, nnAddress, ugi, this.minSize, this.maxSize); this.conf, nnAddress, ugi, this.minSize, this.maxSize, protocol);
this.pools.put(connectionId, pool); this.pools.put(connectionId, pool);
} }
} finally { } finally {

View File

@ -38,6 +38,9 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils; import org.apache.hadoop.io.retry.RetryUtils;
@ -75,6 +78,8 @@ public class ConnectionPool {
private final String namenodeAddress; private final String namenodeAddress;
/** User for this connections. */ /** User for this connections. */
private final UserGroupInformation ugi; private final UserGroupInformation ugi;
/** Class of the protocol. */
private final Class<?> protocol;
/** Pool of connections. We mimic a COW array. */ /** Pool of connections. We mimic a COW array. */
private volatile List<ConnectionContext> connections = new ArrayList<>(); private volatile List<ConnectionContext> connections = new ArrayList<>();
@ -91,16 +96,17 @@ public class ConnectionPool {
protected ConnectionPool(Configuration config, String address, protected ConnectionPool(Configuration config, String address,
UserGroupInformation user, int minPoolSize, int maxPoolSize) UserGroupInformation user, int minPoolSize, int maxPoolSize,
throws IOException { Class<?> proto) throws IOException {
this.conf = config; this.conf = config;
// Connection pool target // Connection pool target
this.ugi = user; this.ugi = user;
this.namenodeAddress = address; this.namenodeAddress = address;
this.protocol = proto;
this.connectionPoolId = this.connectionPoolId =
new ConnectionPoolId(this.ugi, this.namenodeAddress); new ConnectionPoolId(this.ugi, this.namenodeAddress, this.protocol);
// Set configuration parameters for the pool // Set configuration parameters for the pool
this.minSize = minPoolSize; this.minSize = minPoolSize;
@ -287,7 +293,8 @@ public class ConnectionPool {
* @throws IOException * @throws IOException
*/ */
public ConnectionContext newConnection() throws IOException { public ConnectionContext newConnection() throws IOException {
return newConnection(this.conf, this.namenodeAddress, this.ugi); return newConnection(
this.conf, this.namenodeAddress, this.ugi, this.protocol);
} }
/** /**
@ -299,12 +306,46 @@ public class ConnectionPool {
* @param conf Configuration for the connection. * @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol. * @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context. * @param ugi User context.
* @return Proxy for the target ClientProtocol that contains the user's * @param proto Interface of the protocol.
* @return proto for the target ClientProtocol that contains the user's
* security context. * security context.
* @throws IOException If it cannot be created. * @throws IOException If it cannot be created.
*/ */
protected static ConnectionContext newConnection(Configuration conf, protected static ConnectionContext newConnection(Configuration conf,
String nnAddress, UserGroupInformation ugi) String nnAddress, UserGroupInformation ugi, Class<?> proto)
throws IOException {
ConnectionContext ret;
if (proto == ClientProtocol.class) {
ret = newClientConnection(conf, nnAddress, ugi);
} else if (proto == NamenodeProtocol.class) {
ret = newNamenodeConnection(conf, nnAddress, ugi);
} else {
String msg = "Unsupported protocol for connection to NameNode: " +
((proto != null) ? proto.getClass().getName() : "null");
LOG.error(msg);
throw new IllegalStateException(msg);
}
return ret;
}
/**
* Creates a proxy wrapper for a client NN connection. Each proxy contains
* context for a single user/security context. To maximize throughput it is
* recommended to use multiple connection per user+server, allowing multiple
* writes and reads to be dispatched in parallel.
*
* Mostly based on NameNodeProxies#createNonHAProxy() but it needs the
* connection identifier.
*
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context.
* @return Proxy for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
private static ConnectionContext newClientConnection(
Configuration conf, String nnAddress, UserGroupInformation ugi)
throws IOException { throws IOException {
RPC.setProtocolEngine( RPC.setProtocolEngine(
conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
@ -334,4 +375,49 @@ public class ConnectionPool {
ConnectionContext connection = new ConnectionContext(clientProxy); ConnectionContext connection = new ConnectionContext(clientProxy);
return connection; return connection;
} }
/**
* Creates a proxy wrapper for a NN connection. Each proxy contains context
* for a single user/security context. To maximize throughput it is
* recommended to use multiple connection per user+server, allowing multiple
* writes and reads to be dispatched in parallel.
*
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context.
* @return Proxy for the target NamenodeProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
private static ConnectionContext newNamenodeConnection(
Configuration conf, String nnAddress, UserGroupInformation ugi)
throws IOException {
RPC.setProtocolEngine(
conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
conf,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
SocketFactory factory = SocketFactory.getDefault();
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
final long version = RPC.getProtocolVersion(NamenodeProtocolPB.class);
NamenodeProtocolPB proxy = RPC.getProtocolProxy(NamenodeProtocolPB.class,
version, socket, ugi, conf,
factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
NamenodeProtocol client = new NamenodeProtocolTranslatorPB(proxy);
Text dtService = SecurityUtil.buildTokenService(socket);
ProxyAndInfo<NamenodeProtocol> clientProxy =
new ProxyAndInfo<NamenodeProtocol>(client, dtService, socket);
ConnectionContext connection = new ConnectionContext(clientProxy);
return connection;
}
} }

View File

@ -42,16 +42,21 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
private final String nnId; private final String nnId;
/** Information about the user. */ /** Information about the user. */
private final UserGroupInformation ugi; private final UserGroupInformation ugi;
/** Protocol for the connection. */
private final Class<?> protocol;
/** /**
* New connection pool identifier. * New connection pool identifier.
* *
* @param ugi Information of the user issuing the request. * @param ugi Information of the user issuing the request.
* @param nnId Namenode address with port. * @param nnId Namenode address with port.
* @param proto Protocol of the connection.
*/ */
public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) { public ConnectionPoolId(final UserGroupInformation ugi, final String nnId,
final Class<?> proto) {
this.nnId = nnId; this.nnId = nnId;
this.ugi = ugi; this.ugi = ugi;
this.protocol = proto;
} }
@Override @Override
@ -60,6 +65,7 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
.append(this.nnId) .append(this.nnId)
.append(this.ugi.toString()) .append(this.ugi.toString())
.append(this.getTokenIds()) .append(this.getTokenIds())
.append(this.protocol)
.toHashCode(); .toHashCode();
return hash; return hash;
} }
@ -76,14 +82,18 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
} }
String thisTokens = this.getTokenIds().toString(); String thisTokens = this.getTokenIds().toString();
String otherTokens = other.getTokenIds().toString(); String otherTokens = other.getTokenIds().toString();
return thisTokens.equals(otherTokens); if (!thisTokens.equals(otherTokens)) {
return false;
}
return this.protocol.equals(other.protocol);
} }
return false; return false;
} }
@Override @Override
public String toString() { public String toString() {
return this.ugi + " " + this.getTokenIds() + "->" + this.nnId; return this.ugi + " " + this.getTokenIds() + "->" + this.nnId + " [" +
this.protocol.getSimpleName() + "]";
} }
@Override @Override
@ -97,6 +107,9 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
String otherTokens = other.getTokenIds().toString(); String otherTokens = other.getTokenIds().toString();
ret = thisTokens.compareTo(otherTokens); ret = thisTokens.compareTo(otherTokens);
} }
if (ret == 0) {
ret = this.protocol.toString().compareTo(other.protocol.toString());
}
return ret; return ret;
} }

View File

@ -38,22 +38,35 @@ public class RemoteMethod {
private final Object[] params; private final Object[] params;
/** List of method parameters types, matches parameters. */ /** List of method parameters types, matches parameters. */
private final Class<?>[] types; private final Class<?>[] types;
/** Class of the protocol for the method. */
private final Class<?> protocol;
/** String name of the ClientProtocol method. */ /** String name of the ClientProtocol method. */
private final String methodName; private final String methodName;
/** /**
* Create a method with no parameters. * Create a remote method generator for the ClientProtocol with no parameters.
* *
* @param method The string name of the ClientProtocol method. * @param method The string name of the protocol method.
*/ */
public RemoteMethod(String method) { public RemoteMethod(String method) {
this.params = null; this(ClientProtocol.class, method);
this.types = null;
this.methodName = method;
} }
/** /**
* Creates a remote method generator. * Create a method with no parameters.
*
* @param proto Protocol of the method.
* @param method The string name of the ClientProtocol method.
*/
public RemoteMethod(Class<?> proto, String method) {
this.params = null;
this.types = null;
this.methodName = method;
this.protocol = proto;
}
/**
* Create a remote method generator for the ClientProtocol.
* *
* @param method The string name of the ClientProtocol method. * @param method The string name of the ClientProtocol method.
* @param pTypes A list of types to use to locate the specific method. * @param pTypes A list of types to use to locate the specific method.
@ -70,16 +83,49 @@ public class RemoteMethod {
*/ */
public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams) public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams)
throws IOException { throws IOException {
this(ClientProtocol.class, method, pTypes, pParams);
}
/**
* Creates a remote method generator.
*
* @param proto Protocol of the method.
* @param method The string name of the ClientProtocol method.
* @param pTypes A list of types to use to locate the specific method.
* @param pParams A list of parameters for the method. The order of the
* parameter list must match the order and number of the types.
* Parameters are grouped into 2 categories:
* <ul>
* <li>Static parameters that are immutable across locations.
* <li>Dynamic parameters that are determined for each location by a
* RemoteParam object. To specify a dynamic parameter, pass an
* instance of RemoteParam in place of the parameter value.
* </ul>
* @throws IOException If the types and parameter lists are not valid.
*/
public RemoteMethod(Class<?> proto, String method, Class<?>[] pTypes,
Object... pParams) throws IOException {
if (pParams.length != pTypes.length) { if (pParams.length != pTypes.length) {
throw new IOException("Invalid parameters for method " + method); throw new IOException("Invalid parameters for method " + method);
} }
this.protocol = proto;
this.params = pParams; this.params = pParams;
this.types = Arrays.copyOf(pTypes, pTypes.length); this.types = Arrays.copyOf(pTypes, pTypes.length);
this.methodName = method; this.methodName = method;
} }
/**
* Get the interface/protocol for this method. For example, ClientProtocol or
* NamenodeProtocol.
*
* @return Protocol for this method.
*/
public Class<?> getProtocol() {
return this.protocol;
}
/** /**
* Get the represented java method. * Get the represented java method.
* *
@ -89,18 +135,18 @@ public class RemoteMethod {
public Method getMethod() throws IOException { public Method getMethod() throws IOException {
try { try {
if (types != null) { if (types != null) {
return ClientProtocol.class.getDeclaredMethod(methodName, types); return protocol.getDeclaredMethod(methodName, types);
} else { } else {
return ClientProtocol.class.getDeclaredMethod(methodName); return protocol.getDeclaredMethod(methodName);
} }
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
// Re-throw as an IOException // Re-throw as an IOException
LOG.error("Cannot get method {} with types {}", LOG.error("Cannot get method {} with types {} from {}",
methodName, Arrays.toString(types), e); methodName, Arrays.toString(types), protocol.getSimpleName(), e);
throw new IOException(e); throw new IOException(e);
} catch (SecurityException e) { } catch (SecurityException e) {
LOG.error("Cannot access method {} with types {}", LOG.error("Cannot access method {} with types {} from {}",
methodName, Arrays.toString(types), e); methodName, Arrays.toString(types), protocol.getSimpleName(), e);
throw new IOException(e); throw new IOException(e);
} }
} }
@ -161,4 +207,10 @@ public class RemoteMethod {
} }
return objList; return objList;
} }
@Override
public String toString() {
return this.protocol.getSimpleName() + "#" + this.methodName + " " +
Arrays.toString(this.params);
}
} }

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
/**
* Module that implements all the RPC calls in {@link NamenodeProtocol} in the
* {@link RouterRpcServer}.
*/
public class RouterNamenodeProtocol implements NamenodeProtocol {
/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient;
/** Interface to map global name space to HDFS subcluster name spaces. */
private final FileSubclusterResolver subclusterResolver;
public RouterNamenodeProtocol(RouterRpcServer server) {
this.rpcServer = server;
this.rpcClient = this.rpcServer.getRPCClient();
this.subclusterResolver = this.rpcServer.getSubclusterResolver();
}
@Override
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// Get the namespace where the datanode is located
Map<String, DatanodeStorageReport[]> map =
rpcServer.getDatanodeStorageReportMap(DatanodeReportType.ALL);
String nsId = null;
for (Entry<String, DatanodeStorageReport[]> entry : map.entrySet()) {
DatanodeStorageReport[] dns = entry.getValue();
for (DatanodeStorageReport dn : dns) {
DatanodeInfo dnInfo = dn.getDatanodeInfo();
if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) {
nsId = entry.getKey();
break;
}
}
// Break the loop if already found
if (nsId != null) {
break;
}
}
// Forward to the proper namenode
if (nsId != null) {
RemoteMethod method = new RemoteMethod(
NamenodeProtocol.class, "getBlocks",
new Class<?>[] {DatanodeInfo.class, long.class},
datanode, size);
return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
}
return null;
}
@Override
public ExportedBlockKeys getBlockKeys() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
return rpcClient.invokeSingle(defaultNsId, method, ExportedBlockKeys.class);
}
@Override
public long getTransactionID() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
return rpcClient.invokeSingle(defaultNsId, method, long.class);
}
@Override
public long getMostRecentCheckpointTxId() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
return rpcClient.invokeSingle(defaultNsId, method, long.class);
}
@Override
public CheckpointSignature rollEditLog() throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
return null;
}
@Override
public NamespaceInfo versionRequest() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "versionRequest");
return rpcClient.invokeSingle(defaultNsId, method, NamespaceInfo.class);
}
@Override
public void errorReport(NamenodeRegistration registration, int errorCode,
String msg) throws IOException {
rpcServer.checkOperation(OperationCategory.UNCHECKED, false);
}
@Override
public NamenodeRegistration registerSubordinateNamenode(
NamenodeRegistration registration) throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
return null;
}
@Override
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
return null;
}
@Override
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
}
@Override
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException {
rpcServer.checkOperation(OperationCategory.READ, false);
return null;
}
@Override
public boolean isUpgradeFinalized() throws IOException {
rpcServer.checkOperation(OperationCategory.READ, false);
return false;
}
@Override
public boolean isRollingUpgrade() throws IOException {
rpcServer.checkOperation(OperationCategory.READ, false);
return false;
}
}

View File

@ -48,7 +48,6 @@ import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
@ -217,14 +216,14 @@ public class RouterRpcClient {
* *
* @param ugi User group information. * @param ugi User group information.
* @param nsId Nameservice identifier. * @param nsId Nameservice identifier.
* @param rpcAddress ClientProtocol RPC server address of the NN. * @param rpcAddress RPC server address of the NN.
* @param proto Protocol of the connection.
* @return ConnectionContext containing a ClientProtocol proxy client for the * @return ConnectionContext containing a ClientProtocol proxy client for the
* NN + current user. * NN + current user.
* @throws IOException If we cannot get a connection to the NameNode. * @throws IOException If we cannot get a connection to the NameNode.
*/ */
private ConnectionContext getConnection( private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
UserGroupInformation ugi, String nsId, String rpcAddress) String rpcAddress, Class<?> proto) throws IOException {
throws IOException {
ConnectionContext connection = null; ConnectionContext connection = null;
try { try {
// Each proxy holds the UGI info for the current user when it is created. // Each proxy holds the UGI info for the current user when it is created.
@ -234,7 +233,7 @@ public class RouterRpcClient {
// for each individual request. // for each individual request.
// TODO Add tokens from the federated UGI // TODO Add tokens from the federated UGI
connection = this.connectionManager.getConnection(ugi, rpcAddress); connection = this.connectionManager.getConnection(ugi, rpcAddress, proto);
LOG.debug("User {} NN {} is using connection {}", LOG.debug("User {} NN {} is using connection {}",
ugi.getUserName(), rpcAddress, connection); ugi.getUserName(), rpcAddress, connection);
} catch (Exception ex) { } catch (Exception ex) {
@ -318,7 +317,8 @@ public class RouterRpcClient {
private Object invokeMethod( private Object invokeMethod(
final UserGroupInformation ugi, final UserGroupInformation ugi,
final List<? extends FederationNamenodeContext> namenodes, final List<? extends FederationNamenodeContext> namenodes,
final Method method, final Object... params) throws IOException { final Class<?> protocol, final Method method, final Object... params)
throws IOException {
if (namenodes == null || namenodes.isEmpty()) { if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("No namenodes to invoke " + method.getName() + throw new IOException("No namenodes to invoke " + method.getName() +
@ -336,9 +336,10 @@ public class RouterRpcClient {
try { try {
String nsId = namenode.getNameserviceId(); String nsId = namenode.getNameserviceId();
String rpcAddress = namenode.getRpcAddress(); String rpcAddress = namenode.getRpcAddress();
connection = this.getConnection(ugi, nsId, rpcAddress); connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
ProxyAndInfo<ClientProtocol> client = connection.getClient(); ProxyAndInfo<?> client = connection.getClient();
ClientProtocol proxy = client.getProxy(); final Object proxy = client.getProxy();
ret = invoke(nsId, 0, method, proxy, params); ret = invoke(nsId, 0, method, proxy, params);
if (failover) { if (failover) {
// Success on alternate server, update // Success on alternate server, update
@ -603,7 +604,29 @@ public class RouterRpcClient {
List<? extends FederationNamenodeContext> nns = List<? extends FederationNamenodeContext> nns =
getNamenodesForNameservice(nsId); getNamenodesForNameservice(nsId);
RemoteLocationContext loc = new RemoteLocation(nsId, "/"); RemoteLocationContext loc = new RemoteLocation(nsId, "/");
return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc)); Class<?> proto = method.getProtocol();
Method m = method.getMethod();
Object[] params = method.getParams(loc);
return invokeMethod(ugi, nns, proto, m, params);
}
/**
* Invokes a remote method against the specified namespace.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param nsId Target namespace for the method.
* @param method The remote method and parameters to invoke.
* @param clazz Class for the return type.
* @return The result of invoking the method.
* @throws IOException If the invoke generated an error.
*/
public <T> T invokeSingle(final String nsId, RemoteMethod method,
Class<T> clazz) throws IOException {
@SuppressWarnings("unchecked")
T ret = (T)invokeSingle(nsId, method);
return ret;
} }
/** /**
@ -681,8 +704,9 @@ public class RouterRpcClient {
List<? extends FederationNamenodeContext> namenodes = List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns); getNamenodesForNameservice(ns);
try { try {
Class<?> proto = remoteMethod.getProtocol();
Object[] params = remoteMethod.getParams(loc); Object[] params = remoteMethod.getParams(loc);
Object result = invokeMethod(ugi, namenodes, m, params); Object result = invokeMethod(ugi, namenodes, proto, m, params);
// Check if the result is what we expected // Check if the result is what we expected
if (isExpectedClass(expectedResultClass, result) && if (isExpectedClass(expectedResultClass, result) &&
isExpectedValue(expectedResultValue, result)) { isExpectedValue(expectedResultValue, result)) {
@ -906,8 +930,9 @@ public class RouterRpcClient {
String ns = location.getNameserviceId(); String ns = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes = final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns); getNamenodesForNameservice(ns);
Class<?> proto = method.getProtocol();
Object[] paramList = method.getParams(location); Object[] paramList = method.getParams(location);
Object result = invokeMethod(ugi, namenodes, m, paramList); Object result = invokeMethod(ugi, namenodes, proto, m, paramList);
return Collections.singletonMap(location, clazz.cast(result)); return Collections.singletonMap(location, clazz.cast(result));
} }
@ -917,6 +942,7 @@ public class RouterRpcClient {
String nsId = location.getNameserviceId(); String nsId = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes = final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(nsId); getNamenodesForNameservice(nsId);
final Class<?> proto = method.getProtocol();
final Object[] paramList = method.getParams(location); final Object[] paramList = method.getParams(location);
if (standby) { if (standby) {
// Call the objectGetter to all NNs (including standby) // Call the objectGetter to all NNs (including standby)
@ -931,7 +957,7 @@ public class RouterRpcClient {
orderedLocations.add(nnLocation); orderedLocations.add(nnLocation);
callables.add(new Callable<Object>() { callables.add(new Callable<Object>() {
public Object call() throws Exception { public Object call() throws Exception {
return invokeMethod(ugi, nnList, m, paramList); return invokeMethod(ugi, nnList, proto, m, paramList);
} }
}); });
} }
@ -940,7 +966,7 @@ public class RouterRpcClient {
orderedLocations.add(location); orderedLocations.add(location);
callables.add(new Callable<Object>() { callables.add(new Callable<Object>() {
public Object call() throws Exception { public Object call() throws Exception {
return invokeMethod(ugi, namenodes, m, paramList); return invokeMethod(ugi, namenodes, proto, m, paramList);
} }
}); });
} }

View File

@ -90,9 +90,13 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@ -102,11 +106,18 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -134,7 +145,8 @@ import com.google.protobuf.BlockingService;
* the requests to the active * the requests to the active
* {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}. * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}.
*/ */
public class RouterRpcServer extends AbstractService implements ClientProtocol { public class RouterRpcServer extends AbstractService
implements ClientProtocol, NamenodeProtocol {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(RouterRpcServer.class); LoggerFactory.getLogger(RouterRpcServer.class);
@ -177,6 +189,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
/** Router Quota calls. */ /** Router Quota calls. */
private final Quota quotaCall; private final Quota quotaCall;
/** NamenodeProtocol calls. */
private final RouterNamenodeProtocol nnProto;
/** /**
* Construct a router RPC server. * Construct a router RPC server.
@ -228,6 +242,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
BlockingService clientNNPbService = ClientNamenodeProtocol BlockingService clientNNPbService = ClientNamenodeProtocol
.newReflectiveBlockingService(clientProtocolServerTranslator); .newReflectiveBlockingService(clientProtocolServerTranslator);
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this);
BlockingService nnPbService = NamenodeProtocolService
.newReflectiveBlockingService(namenodeProtocolXlator);
InetSocketAddress confRpcAddress = conf.getSocketAddr( InetSocketAddress confRpcAddress = conf.getSocketAddr(
RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY,
RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY,
@ -246,6 +265,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
.setQueueSizePerHandler(handlerQueueSize) .setQueueSizePerHandler(handlerQueueSize)
.setVerbose(false) .setVerbose(false)
.build(); .build();
// Add all the RPC protocols that the Router implements
DFSUtil.addPBProtocol(
conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer);
// We don't want the server to log the full stack trace for some exceptions // We don't want the server to log the full stack trace for some exceptions
this.rpcServer.addTerseExceptions( this.rpcServer.addTerseExceptions(
RemoteException.class, RemoteException.class,
@ -276,6 +300,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
// Initialize modules // Initialize modules
this.quotaCall = new Quota(this.router, this); this.quotaCall = new Quota(this.router, this);
this.nnProto = new RouterNamenodeProtocol(this);
} }
@Override @Override
@ -320,6 +345,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
return rpcClient; return rpcClient;
} }
/**
* Get the subcluster resolver.
*
* @return Subcluster resolver.
*/
public FileSubclusterResolver getSubclusterResolver() {
return subclusterResolver;
}
/** /**
* Get the RPC monitor and metrics. * Get the RPC monitor and metrics.
* *
@ -1319,7 +1353,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
action, isChecked); action, isChecked);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Boolean> results = Map<FederationNamespaceInfo, Boolean> results =
rpcClient.invokeConcurrent(nss, method, true, true, boolean.class); rpcClient.invokeConcurrent(nss, method, true, true, Boolean.class);
// We only report true if all the name space are in safe mode // We only report true if all the name space are in safe mode
int numSafemode = 0; int numSafemode = 0;
@ -1339,7 +1373,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
new Class<?>[] {String.class}, arg); new Class<?>[] {String.class}, arg);
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Boolean> ret = Map<FederationNamespaceInfo, Boolean> ret =
rpcClient.invokeConcurrent(nss, method, true, false, boolean.class); rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
boolean success = true; boolean success = true;
for (boolean s : ret.values()) { for (boolean s : ret.values()) {
@ -1938,6 +1972,77 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
return null; return null;
} }
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
return nnProto.getBlocks(datanode, size);
}
@Override // NamenodeProtocol
public ExportedBlockKeys getBlockKeys() throws IOException {
return nnProto.getBlockKeys();
}
@Override // NamenodeProtocol
public long getTransactionID() throws IOException {
return nnProto.getTransactionID();
}
@Override // NamenodeProtocol
public long getMostRecentCheckpointTxId() throws IOException {
return nnProto.getMostRecentCheckpointTxId();
}
@Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException {
return nnProto.rollEditLog();
}
@Override // NamenodeProtocol
public NamespaceInfo versionRequest() throws IOException {
return nnProto.versionRequest();
}
@Override // NamenodeProtocol
public void errorReport(NamenodeRegistration registration, int errorCode,
String msg) throws IOException {
nnProto.errorReport(registration, errorCode, msg);
}
@Override // NamenodeProtocol
public NamenodeRegistration registerSubordinateNamenode(
NamenodeRegistration registration) throws IOException {
return nnProto.registerSubordinateNamenode(registration);
}
@Override // NamenodeProtocol
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
throws IOException {
return nnProto.startCheckpoint(registration);
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
nnProto.endCheckpoint(registration, sig);
}
@Override // NamenodeProtocol
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException {
return nnProto.getEditLogManifest(sinceTxId);
}
@Override // NamenodeProtocol
public boolean isUpgradeFinalized() throws IOException {
return nnProto.isUpgradeFinalized();
}
@Override // NamenodeProtocol
public boolean isRollingUpgrade() throws IOException {
return nnProto.isRollingUpgrade();
}
/** /**
* Locate the location with the matching block pool id. * Locate the location with the matching block pool id.
* *

View File

@ -239,6 +239,10 @@ public class MiniRouterDFSCluster {
} }
return client; return client;
} }
public Configuration getConf() {
return conf;
}
} }
/** /**
@ -351,6 +355,10 @@ public class MiniRouterDFSCluster {
} }
return suffix; return suffix;
} }
public Configuration getConf() {
return conf;
}
} }
public MiniRouterDFSCluster( public MiniRouterDFSCluster(

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.federation.router; package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After; import org.junit.After;
@ -68,14 +70,18 @@ public class TestConnectionManager {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools(); Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
ConnectionPool pool1 = new ConnectionPool( ConnectionPool pool1 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10); conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool1, 9, 4); addConnectionsToPool(pool1, 9, 4);
poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1); poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool1);
ConnectionPool pool2 = new ConnectionPool( ConnectionPool pool2 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10); conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool2, 10, 10); addConnectionsToPool(pool2, 10, 10);
poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2); poolMap.put(
new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
pool2);
checkPoolConnections(TEST_USER1, 9, 4); checkPoolConnections(TEST_USER1, 9, 4);
checkPoolConnections(TEST_USER2, 10, 10); checkPoolConnections(TEST_USER2, 10, 10);
@ -94,9 +100,11 @@ public class TestConnectionManager {
// Make sure the number of connections doesn't go below minSize // Make sure the number of connections doesn't go below minSize
ConnectionPool pool3 = new ConnectionPool( ConnectionPool pool3 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10); conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, ClientProtocol.class);
addConnectionsToPool(pool3, 8, 0); addConnectionsToPool(pool3, 8, 0);
poolMap.put(new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS), pool3); poolMap.put(
new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
pool3);
checkPoolConnections(TEST_USER3, 10, 0); checkPoolConnections(TEST_USER3, 10, 0);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
connManager.cleanup(pool3); connManager.cleanup(pool3);
@ -119,9 +127,41 @@ public class TestConnectionManager {
int activeConns = 5; int activeConns = 5;
ConnectionPool pool = new ConnectionPool( ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10); conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns); addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool); poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool);
// All remaining connections should be usable
final int remainingSlots = totalConns - activeConns;
for (int i = 0; i < remainingSlots; i++) {
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isUsable());
cc.getClient();
activeConns++;
}
checkPoolConnections(TEST_USER1, totalConns, activeConns);
// Ask for more and this returns an active connection
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isActive());
}
@Test
public void getGetConnectionNamenodeProtocol() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
final int totalConns = 10;
int activeConns = 5;
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, NamenodeProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(
new ConnectionPoolId(
TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class),
pool);
// All remaining connections should be usable // All remaining connections should be usable
final int remainingSlots = totalConns - activeConns; final int remainingSlots = totalConns - activeConns;

View File

@ -54,22 +54,29 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
@ -83,7 +90,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
/** /**
* The the RPC interface of the {@link Router} implemented by * The the RPC interface of the {@link Router} implemented by
@ -110,6 +116,11 @@ public class TestRouterRpc {
/** Client interface to the Namenode. */ /** Client interface to the Namenode. */
private ClientProtocol nnProtocol; private ClientProtocol nnProtocol;
/** NameNodeProtocol interface to the Router. */
private NamenodeProtocol routerNamenodeProtocol;
/** NameNodeProtocol interface to the Namenode. */
private NamenodeProtocol nnNamenodeProtocol;
/** Filesystem interface to the Router. */ /** Filesystem interface to the Router. */
private FileSystem routerFS; private FileSystem routerFS;
/** Filesystem interface to the Namenode. */ /** Filesystem interface to the Namenode. */
@ -166,22 +177,18 @@ public class TestRouterRpc {
// Wait to ensure NN has fully created its test directories // Wait to ensure NN has fully created its test directories
Thread.sleep(100); Thread.sleep(100);
// Default namenode and random router for this test // Random router for this test
this.router = cluster.getRandomRouter(); RouterContext rndRouter = cluster.getRandomRouter();
this.ns = cluster.getNameservices().get(0); this.setRouter(rndRouter);
this.namenode = cluster.getNamenode(ns, null);
// Handles to the ClientProtocol interface // Pick a namenode for this test
this.routerProtocol = router.getClient().getNamenode(); String ns0 = cluster.getNameservices().get(0);
this.nnProtocol = namenode.getClient().getNamenode(); this.setNs(ns0);
this.setNamenode(cluster.getNamenode(ns0, null));
// Handles to the filesystem client
this.nnFS = namenode.getFileSystem();
this.routerFS = router.getFileSystem();
// Create a test file on the NN // Create a test file on the NN
Random r = new Random(); Random rnd = new Random();
String randomFile = "testfile-" + r.nextInt(); String randomFile = "testfile-" + rnd.nextInt();
this.nnFile = this.nnFile =
cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile; cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
this.routerFile = this.routerFile =
@ -222,6 +229,8 @@ public class TestRouterRpc {
this.router = r; this.router = r;
this.routerProtocol = r.getClient().getNamenode(); this.routerProtocol = r.getClient().getNamenode();
this.routerFS = r.getFileSystem(); this.routerFS = r.getFileSystem();
this.routerNamenodeProtocol = NameNodeProxies.createProxy(router.getConf(),
router.getFileSystem().getUri(), NamenodeProtocol.class).getProxy();
} }
protected FileSystem getRouterFileSystem() { protected FileSystem getRouterFileSystem() {
@ -265,6 +274,12 @@ public class TestRouterRpc {
this.namenode = nn; this.namenode = nn;
this.nnProtocol = nn.getClient().getNamenode(); this.nnProtocol = nn.getClient().getNamenode();
this.nnFS = nn.getFileSystem(); this.nnFS = nn.getFileSystem();
// Namenode from the default namespace
String ns0 = cluster.getNameservices().get(0);
NamenodeContext nn0 = cluster.getNamenode(ns0, null);
this.nnNamenodeProtocol = NameNodeProxies.createProxy(nn0.getConf(),
nn0.getFileSystem().getUri(), NamenodeProtocol.class).getProxy();
} }
protected String getNs() { protected String getNs() {
@ -934,4 +949,76 @@ public class TestRouterRpc {
// The cache should be updated now // The cache should be updated now
assertNotEquals(jsonString0, metrics.getLiveNodes()); assertNotEquals(jsonString0, metrics.getLiveNodes());
} }
@Test
public void testProxyVersionRequest() throws Exception {
NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest();
NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest();
assertEquals(nnVersion.getBlockPoolID(), rVersion.getBlockPoolID());
assertEquals(nnVersion.getNamespaceID(), rVersion.getNamespaceID());
assertEquals(nnVersion.getClusterID(), rVersion.getClusterID());
assertEquals(nnVersion.getLayoutVersion(), rVersion.getLayoutVersion());
assertEquals(nnVersion.getCTime(), rVersion.getCTime());
}
@Test
public void testProxyGetBlockKeys() throws Exception {
ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys();
ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys();
assertEquals(nnKeys.getCurrentKey(), rKeys.getCurrentKey());
assertEquals(nnKeys.getKeyUpdateInterval(), rKeys.getKeyUpdateInterval());
assertEquals(nnKeys.getTokenLifetime(), rKeys.getTokenLifetime());
}
@Test
public void testProxyGetBlocks() throws Exception {
// Get datanodes
DatanodeInfo[] dns = routerProtocol
.getDatanodeReport(DatanodeReportType.ALL);
DatanodeInfo dn0 = dns[0];
// Verify that checking that datanode works
BlocksWithLocations routerBlockLocations =
routerNamenodeProtocol.getBlocks(dn0, 1024);
BlocksWithLocations nnBlockLocations =
nnNamenodeProtocol.getBlocks(dn0, 1024);
BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
assertEquals(nnBlocks.length, routerBlocks.length);
for (int i = 0; i < routerBlocks.length; i++) {
assertEquals(nnBlocks[i].getBlock().getBlockId(),
routerBlocks[i].getBlock().getBlockId());
}
}
@Test
public void testProxyGetTransactionID() throws IOException {
long routerTransactionID = routerNamenodeProtocol.getTransactionID();
long nnTransactionID = nnNamenodeProtocol.getTransactionID();
assertEquals(nnTransactionID, routerTransactionID);
}
@Test
public void testProxyGetMostRecentCheckpointTxId() throws IOException {
long routerCheckPointId =
routerNamenodeProtocol.getMostRecentCheckpointTxId();
long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId();
assertEquals(nnCheckPointId, routerCheckPointId);
}
@Test
public void testProxySetSafemode() throws Exception {
boolean routerSafemode =
routerProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
boolean nnSafemode =
nnProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
assertEquals(nnSafemode, routerSafemode);
}
@Test
public void testProxyRestoreFailedStorage() throws Exception {
boolean routerSuccess = routerProtocol.restoreFailedStorage("check");
boolean nnSuccess = nnProtocol.restoreFailedStorage("check");
assertEquals(nnSuccess, routerSuccess);
}
} }