HDFS-3939. NN RPC address cleanup. Contributed by Eli Collins
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1387281 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6ad11b963c
commit
c03ca579de
|
@ -18,6 +18,8 @@ Release 2.0.3-alpha - Unreleased
|
|||
HDFS-3925. Prettify PipelineAck#toString() for printing to a log
|
||||
(Andrew Wang via todd)
|
||||
|
||||
HDFS-3939. NN RPC address cleanup. (eli)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -71,7 +71,7 @@ public class FileChecksumServlets {
|
|||
String tokenString = ugi.getTokens().iterator().next().encodeToUrlString();
|
||||
dtParam = JspHelper.getDelegationTokenUrlParam(tokenString);
|
||||
}
|
||||
String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
|
||||
String addr = nn.getNameNodeAddressHostPortString();
|
||||
String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
|
||||
|
||||
return new URL(scheme, hostname, port,
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ServletUtil;
|
||||
|
||||
|
@ -74,7 +73,7 @@ public class FileDataServlet extends DfsServlet {
|
|||
// Add namenode address to the url params
|
||||
NameNode nn = NameNodeHttpServer.getNameNodeFromContext(
|
||||
getServletContext());
|
||||
String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
|
||||
String addr = nn.getNameNodeAddressHostPortString();
|
||||
String addrParam = JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
|
||||
|
||||
return new URL(scheme, hostname, port,
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
|||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Trash;
|
||||
|
@ -491,9 +490,9 @@ public class NameNode {
|
|||
LOG.warn("ServicePlugin " + p + " could not be started", t);
|
||||
}
|
||||
}
|
||||
LOG.info(getRole() + " up at: " + rpcServer.getRpcAddress());
|
||||
LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
|
||||
if (rpcServer.getServiceRpcAddress() != null) {
|
||||
LOG.info(getRole() + " service server is up at: "
|
||||
LOG.info(getRole() + " service RPC up at: "
|
||||
+ rpcServer.getServiceRpcAddress());
|
||||
}
|
||||
}
|
||||
|
@ -619,7 +618,7 @@ public class NameNode {
|
|||
*/
|
||||
public void join() {
|
||||
try {
|
||||
this.rpcServer.join();
|
||||
rpcServer.join();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Caught interrupted exception ", ie);
|
||||
}
|
||||
|
@ -667,27 +666,31 @@ public class NameNode {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the address on which the NameNodes is listening to.
|
||||
* @return namenode rpc address
|
||||
* @return NameNode RPC address
|
||||
*/
|
||||
public InetSocketAddress getNameNodeAddress() {
|
||||
return rpcServer.getRpcAddress();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns namenode service rpc address, if set. Otherwise returns
|
||||
* namenode rpc address.
|
||||
* @return namenode service rpc address used by datanodes
|
||||
* @return NameNode RPC address in "host:port" string form
|
||||
*/
|
||||
public InetSocketAddress getServiceRpcAddress() {
|
||||
return rpcServer.getServiceRpcAddress() != null ? rpcServer.getServiceRpcAddress() : rpcServer.getRpcAddress();
|
||||
public String getNameNodeAddressHostPortString() {
|
||||
return NetUtils.getHostPortString(rpcServer.getRpcAddress());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the address of the NameNodes http server,
|
||||
* which is used to access the name-node web UI.
|
||||
*
|
||||
* @return the http address.
|
||||
* @return NameNode service RPC address if configured, the
|
||||
* NameNode RPC address otherwise
|
||||
*/
|
||||
public InetSocketAddress getServiceRpcAddress() {
|
||||
final InetSocketAddress serviceAddr = rpcServer.getServiceRpcAddress();
|
||||
return serviceAddr == null ? rpcServer.getRpcAddress() : serviceAddr;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return NameNode HTTP address, used by the Web UI, image transfer,
|
||||
* and HTTP-based file system clients like Hftp and WebHDFS
|
||||
*/
|
||||
public InetSocketAddress getHttpAddress() {
|
||||
return httpServer.getHttpAddress();
|
||||
|
@ -1201,10 +1204,12 @@ public class NameNode {
|
|||
NAMESERVICE_SPECIFIC_KEYS);
|
||||
}
|
||||
|
||||
// If the RPC address is set use it to (re-)configure the default FS
|
||||
if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
|
||||
URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
|
||||
+ conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
|
||||
conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
|
||||
LOG.info("Setting " + FS_DEFAULT_NAME_KEY + " to " + defaultUri.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1226,8 +1231,9 @@ public class NameNode {
|
|||
try {
|
||||
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
|
||||
NameNode namenode = createNameNode(argv, null);
|
||||
if (namenode != null)
|
||||
if (namenode != null) {
|
||||
namenode.join();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.fatal("Exception in namenode join", e);
|
||||
terminate(1, e);
|
||||
|
|
|
@ -49,12 +49,9 @@ public class NameNodeHttpServer {
|
|||
private final Configuration conf;
|
||||
private final NameNode nn;
|
||||
|
||||
private final Log LOG = NameNode.LOG;
|
||||
private InetSocketAddress httpAddress;
|
||||
|
||||
private InetSocketAddress bindAddress;
|
||||
|
||||
|
||||
public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";
|
||||
public static final String FSIMAGE_ATTRIBUTE_KEY = "name.system.image";
|
||||
protected static final String NAMENODE_ATTRIBUTE_KEY = "name.node";
|
||||
|
@ -68,12 +65,6 @@ public class NameNodeHttpServer {
|
|||
this.bindAddress = bindAddress;
|
||||
}
|
||||
|
||||
private String getDefaultServerPrincipal() throws IOException {
|
||||
return SecurityUtil.getServerPrincipal(
|
||||
conf.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
|
||||
nn.getNameNodeAddress().getHostName());
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
final String infoHost = bindAddress.getHostName();
|
||||
int infoPort = bindAddress.getPort();
|
||||
|
|
|
@ -160,10 +160,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
int handlerCount =
|
||||
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
|
||||
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
|
||||
InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
|
||||
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
ClientNamenodeProtocolServerSideTranslatorPB
|
||||
|
||||
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
|
||||
ClientNamenodeProtocolServerSideTranslatorPB
|
||||
clientProtocolServerTranslator =
|
||||
new ClientNamenodeProtocolServerSideTranslatorPB(this);
|
||||
BlockingService clientNNPbService = ClientNamenodeProtocol.
|
||||
|
@ -201,18 +202,19 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
|
||||
WritableRpcEngine.ensureInitialized();
|
||||
|
||||
InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
|
||||
if (dnSocketAddr != null) {
|
||||
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
|
||||
if (serviceRpcAddr != null) {
|
||||
int serviceHandlerCount =
|
||||
conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
|
||||
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
|
||||
// Add all the RPC protocols that the namenode implements
|
||||
this.serviceRpcServer =
|
||||
RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
|
||||
ClientNamenodeProtocolPB.class, clientNNPbService,
|
||||
dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
|
||||
serviceRpcAddr.getHostName(), serviceRpcAddr.getPort(),
|
||||
serviceHandlerCount,
|
||||
false, conf, namesystem.getDelegationTokenSecretManager());
|
||||
|
||||
// Add all the RPC protocols that the namenode implements
|
||||
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
|
||||
serviceRpcServer);
|
||||
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
|
||||
|
@ -226,18 +228,20 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
|
||||
getUserMappingService, serviceRpcServer);
|
||||
|
||||
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
|
||||
serviceRPCAddress = serviceRpcServer.getListenerAddress();
|
||||
nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
|
||||
} else {
|
||||
serviceRpcServer = null;
|
||||
serviceRPCAddress = null;
|
||||
}
|
||||
// Add all the RPC protocols that the namenode implements
|
||||
InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
|
||||
this.clientRpcServer = RPC.getServer(
|
||||
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
|
||||
clientNNPbService, socAddr.getHostName(),
|
||||
socAddr.getPort(), handlerCount, false, conf,
|
||||
clientNNPbService, rpcAddr.getHostName(),
|
||||
rpcAddr.getPort(), handlerCount, false, conf,
|
||||
namesystem.getDelegationTokenSecretManager());
|
||||
|
||||
// Add all the RPC protocols that the namenode implements
|
||||
DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
|
||||
clientRpcServer);
|
||||
DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
|
||||
|
@ -255,41 +259,48 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
if (serviceAuthEnabled =
|
||||
conf.getBoolean(
|
||||
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
|
||||
this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
||||
if (this.serviceRpcServer != null) {
|
||||
this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
||||
clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
||||
if (serviceRpcServer != null) {
|
||||
serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
|
||||
}
|
||||
}
|
||||
|
||||
// The rpc-server port can be ephemeral... ensure we have the correct info
|
||||
this.clientRpcAddress = this.clientRpcServer.getListenerAddress();
|
||||
clientRpcAddress = clientRpcServer.getListenerAddress();
|
||||
nn.setRpcServerAddress(conf, clientRpcAddress);
|
||||
|
||||
this.minimumDataNodeVersion = conf.get(
|
||||
minimumDataNodeVersion = conf.get(
|
||||
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Actually start serving requests.
|
||||
* Start client and service RPC servers.
|
||||
*/
|
||||
void start() {
|
||||
clientRpcServer.start(); //start RPC server
|
||||
clientRpcServer.start();
|
||||
if (serviceRpcServer != null) {
|
||||
serviceRpcServer.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the RPC server has shut down.
|
||||
* Wait until the client RPC server has shutdown.
|
||||
*/
|
||||
void join() throws InterruptedException {
|
||||
this.clientRpcServer.join();
|
||||
clientRpcServer.join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop client and service RPC servers.
|
||||
*/
|
||||
void stop() {
|
||||
if(clientRpcServer != null) clientRpcServer.stop();
|
||||
if(serviceRpcServer != null) serviceRpcServer.stop();
|
||||
if (clientRpcServer != null) {
|
||||
clientRpcServer.stop();
|
||||
}
|
||||
if (serviceRpcServer != null) {
|
||||
serviceRpcServer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
InetSocketAddress getServiceRpcAddress() {
|
||||
|
@ -326,8 +337,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
namesystem.checkOperation(OperationCategory.UNCHECKED);
|
||||
verifyRequest(registration);
|
||||
LOG.info("Error report from " + registration + ": " + msg);
|
||||
if(errorCode == FATAL)
|
||||
if (errorCode == FATAL) {
|
||||
namesystem.releaseBackupNode(registration);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
|
|
|
@ -51,7 +51,6 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
@ -395,7 +394,7 @@ class NamenodeJspHelper {
|
|||
nodeToRedirect = nn.getHttpAddress().getHostName();
|
||||
redirectPort = nn.getHttpAddress().getPort();
|
||||
}
|
||||
String addr = NetUtils.getHostPortString(nn.getNameNodeAddress());
|
||||
String addr = nn.getNameNodeAddressHostPortString();
|
||||
String fqdn = InetAddress.getByName(nodeToRedirect).getCanonicalHostName();
|
||||
redirectLocation = HttpConfig.getSchemePrefix() + fqdn + ":" + redirectPort
|
||||
+ "/browseDirectory.jsp?namenodeInfoPort="
|
||||
|
@ -566,8 +565,9 @@ class NamenodeJspHelper {
|
|||
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
|
||||
dm.fetchDatanodes(live, dead, true);
|
||||
|
||||
InetSocketAddress nnSocketAddress = (InetSocketAddress) context
|
||||
.getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
|
||||
InetSocketAddress nnSocketAddress =
|
||||
(InetSocketAddress)context.getAttribute(
|
||||
NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
|
||||
String nnaddr = nnSocketAddress.getAddress().getHostAddress() + ":"
|
||||
+ nnSocketAddress.getPort();
|
||||
|
||||
|
|
|
@ -34,8 +34,7 @@
|
|||
HAServiceState nnHAState = nn.getServiceState();
|
||||
boolean isActive = (nnHAState == HAServiceState.ACTIVE);
|
||||
String namenodeRole = nn.getRole().toString();
|
||||
String namenodeLabel = nn.getNameNodeAddress().getHostName() + ":"
|
||||
+ nn.getNameNodeAddress().getPort();
|
||||
String namenodeLabel = nn.getNameNodeAddressHostPortString();
|
||||
Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks =
|
||||
fsn.listCorruptFileBlocks("/", null);
|
||||
int corruptFileCount = corruptFileBlocks.size();
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
boolean isActive = (nnHAState == HAServiceState.ACTIVE);
|
||||
String namenodeRole = nn.getRole().toString();
|
||||
String namenodeState = nnHAState.toString();
|
||||
String namenodeLabel = nn.getNameNodeAddress().getHostName() + ":" + nn.getNameNodeAddress().getPort();
|
||||
String namenodeLabel = nn.getNameNodeAddressHostPortString();
|
||||
%>
|
||||
|
||||
<!DOCTYPE html>
|
||||
|
|
|
@ -33,7 +33,7 @@ String namenodeRole = nn.getRole().toString();
|
|||
FSNamesystem fsn = nn.getNamesystem();
|
||||
HAServiceState nnHAState = nn.getServiceState();
|
||||
boolean isActive = (nnHAState == HAServiceState.ACTIVE);
|
||||
String namenodeLabel = nn.getNameNodeAddress().getHostName() + ":" + nn.getNameNodeAddress().getPort();
|
||||
String namenodeLabel = nn.getNameNodeAddressHostPortString();
|
||||
%>
|
||||
|
||||
<!DOCTYPE html>
|
||||
|
|
|
@ -855,8 +855,8 @@ public class MiniDFSCluster {
|
|||
// After the NN has started, set back the bound ports into
|
||||
// the conf
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId, nnId), NetUtils
|
||||
.getHostPortString(nn.getNameNodeAddress()));
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId, nnId),
|
||||
nn.getNameNodeAddressHostPortString());
|
||||
conf.set(DFSUtil.addKeySuffixes(
|
||||
DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId, nnId), NetUtils
|
||||
.getHostPortString(nn.getHttpAddress()));
|
||||
|
@ -878,8 +878,8 @@ public class MiniDFSCluster {
|
|||
* @return URI of the given namenode in MiniDFSCluster
|
||||
*/
|
||||
public URI getURI(int nnIndex) {
|
||||
InetSocketAddress addr = nameNodes[nnIndex].nameNode.getNameNodeAddress();
|
||||
String hostPort = NetUtils.getHostPortString(addr);
|
||||
String hostPort =
|
||||
nameNodes[nnIndex].nameNode.getNameNodeAddressHostPortString();
|
||||
URI uri = null;
|
||||
try {
|
||||
uri = new URI("hdfs://" + hostPort);
|
||||
|
|
Loading…
Reference in New Issue