HDFS-5312. Merge change r1548629 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1551721 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b19b529f38
commit
246e9cb136
|
@ -168,6 +168,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
HDFS-5536. Implement HTTP policy for Namenode and DataNode. (Haohui Mai via
|
HDFS-5536. Implement HTTP policy for Namenode and DataNode. (Haohui Mai via
|
||||||
jing9)
|
jing9)
|
||||||
|
|
||||||
|
HDFS-5312. Generate HTTP / HTTPS URL in DFSUtil#getInfoServer() based on the
|
||||||
|
configured http policy. (Haohui Mai via jing9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
||||||
|
|
|
@ -92,6 +92,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -952,39 +953,71 @@ public class DFSUtil {
|
||||||
* given namenode rpc address.
|
* given namenode rpc address.
|
||||||
* @param conf
|
* @param conf
|
||||||
* @param namenodeAddr - namenode RPC address
|
* @param namenodeAddr - namenode RPC address
|
||||||
* @param httpsAddress -If true, and if security is enabled, returns server
|
* @param scheme - the scheme (http / https)
|
||||||
* https address. If false, returns server http address.
|
|
||||||
* @return server http or https address
|
* @return server http or https address
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static String getInfoServer(InetSocketAddress namenodeAddr,
|
public static URI getInfoServer(InetSocketAddress namenodeAddr,
|
||||||
Configuration conf, boolean httpsAddress) throws IOException {
|
Configuration conf, String scheme) throws IOException {
|
||||||
boolean securityOn = UserGroupInformation.isSecurityEnabled();
|
String[] suffixes = null;
|
||||||
String httpAddressKey = (securityOn && httpsAddress) ?
|
|
||||||
DFS_NAMENODE_HTTPS_ADDRESS_KEY : DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
|
||||||
String httpAddressDefault = (securityOn && httpsAddress) ?
|
|
||||||
DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT : DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
|
|
||||||
|
|
||||||
String suffixes[];
|
|
||||||
if (namenodeAddr != null) {
|
if (namenodeAddr != null) {
|
||||||
// if non-default namenode, try reverse look up
|
// if non-default namenode, try reverse look up
|
||||||
// the nameServiceID if it is available
|
// the nameServiceID if it is available
|
||||||
suffixes = getSuffixIDs(conf, namenodeAddr,
|
suffixes = getSuffixIDs(conf, namenodeAddr,
|
||||||
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
} else {
|
|
||||||
suffixes = new String[2];
|
|
||||||
}
|
}
|
||||||
String configuredInfoAddr = getSuffixedConf(conf, httpAddressKey,
|
|
||||||
httpAddressDefault, suffixes);
|
String authority;
|
||||||
|
if ("http".equals(scheme)) {
|
||||||
|
authority = getSuffixedConf(conf, DFS_NAMENODE_HTTP_ADDRESS_KEY,
|
||||||
|
DFS_NAMENODE_HTTP_ADDRESS_DEFAULT, suffixes);
|
||||||
|
} else if ("https".equals(scheme)) {
|
||||||
|
authority = getSuffixedConf(conf, DFS_NAMENODE_HTTPS_ADDRESS_KEY,
|
||||||
|
DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT, suffixes);
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("Invalid scheme:" + scheme);
|
||||||
|
}
|
||||||
|
|
||||||
if (namenodeAddr != null) {
|
if (namenodeAddr != null) {
|
||||||
return substituteForWildcardAddress(configuredInfoAddr,
|
authority = substituteForWildcardAddress(authority,
|
||||||
namenodeAddr.getHostName());
|
namenodeAddr.getHostName());
|
||||||
} else {
|
|
||||||
return configuredInfoAddr;
|
|
||||||
}
|
}
|
||||||
|
return URI.create(scheme + "://" + authority);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lookup the HTTP / HTTPS address of the namenode, and replace its hostname
|
||||||
|
* with defaultHost when it found out that the address is a wildcard / local
|
||||||
|
* address.
|
||||||
|
*
|
||||||
|
* @param defaultHost
|
||||||
|
* The default host name of the namenode.
|
||||||
|
* @param conf
|
||||||
|
* The configuration
|
||||||
|
* @param scheme
|
||||||
|
* HTTP or HTTPS
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static URI getInfoServerWithDefaultHost(String defaultHost,
|
||||||
|
Configuration conf, final String scheme) throws IOException {
|
||||||
|
URI configuredAddr = getInfoServer(null, conf, scheme);
|
||||||
|
String authority = substituteForWildcardAddress(
|
||||||
|
configuredAddr.getAuthority(), defaultHost);
|
||||||
|
return URI.create(scheme + "://" + authority);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine whether HTTP or HTTPS should be used to connect to the remote
|
||||||
|
* server. Currently the client only connects to the server via HTTPS if the
|
||||||
|
* policy is set to HTTPS_ONLY.
|
||||||
|
*
|
||||||
|
* @return the scheme (HTTP / HTTPS)
|
||||||
|
*/
|
||||||
|
public static String getHttpClientScheme(Configuration conf) {
|
||||||
|
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
|
||||||
|
return policy == HttpConfig.Policy.HTTPS_ONLY ? "https" : "http";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Substitute a default host in the case that an address has been configured
|
* Substitute a default host in the case that an address has been configured
|
||||||
|
@ -998,8 +1031,9 @@ public class DFSUtil {
|
||||||
* @return the substituted address
|
* @return the substituted address
|
||||||
* @throws IOException if it is a wildcard address and security is enabled
|
* @throws IOException if it is a wildcard address and security is enabled
|
||||||
*/
|
*/
|
||||||
public static String substituteForWildcardAddress(String configuredAddress,
|
@VisibleForTesting
|
||||||
String defaultHost) throws IOException {
|
static String substituteForWildcardAddress(String configuredAddress,
|
||||||
|
String defaultHost) throws IOException {
|
||||||
InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
|
InetSocketAddress sockAddr = NetUtils.createSocketAddr(configuredAddress);
|
||||||
InetSocketAddress defaultSockAddr = NetUtils.createSocketAddr(defaultHost
|
InetSocketAddress defaultSockAddr = NetUtils.createSocketAddr(defaultHost
|
||||||
+ ":0");
|
+ ":0");
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -78,7 +79,7 @@ public class BackupNode extends NameNode {
|
||||||
/** Name-node RPC address */
|
/** Name-node RPC address */
|
||||||
String nnRpcAddress;
|
String nnRpcAddress;
|
||||||
/** Name-node HTTP address */
|
/** Name-node HTTP address */
|
||||||
String nnHttpAddress;
|
URL nnHttpAddress;
|
||||||
/** Checkpoint manager */
|
/** Checkpoint manager */
|
||||||
Checkpointer checkpointManager;
|
Checkpointer checkpointManager;
|
||||||
|
|
||||||
|
@ -303,7 +304,8 @@ public class BackupNode extends NameNode {
|
||||||
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(),
|
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(),
|
||||||
true).getProxy();
|
true).getProxy();
|
||||||
this.nnRpcAddress = NetUtils.getHostPortString(nnAddress);
|
this.nnRpcAddress = NetUtils.getHostPortString(nnAddress);
|
||||||
this.nnHttpAddress = NetUtils.getHostPortString(super.getHttpServerAddress(conf));
|
this.nnHttpAddress = DFSUtil.getInfoServer(nnAddress, conf,
|
||||||
|
DFSUtil.getHttpClientScheme(conf)).toURL();
|
||||||
// get version and id info from the name-node
|
// get version and id info from the name-node
|
||||||
NamespaceInfo nsInfo = null;
|
NamespaceInfo nsInfo = null;
|
||||||
while(!isStopRequested()) {
|
while(!isStopRequested()) {
|
||||||
|
|
|
@ -24,11 +24,14 @@ import static org.apache.hadoop.util.Time.now;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.URL;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||||
|
@ -61,6 +64,7 @@ class Checkpointer extends Daemon {
|
||||||
private String infoBindAddress;
|
private String infoBindAddress;
|
||||||
|
|
||||||
private CheckpointConf checkpointConf;
|
private CheckpointConf checkpointConf;
|
||||||
|
private final Configuration conf;
|
||||||
|
|
||||||
private BackupImage getFSImage() {
|
private BackupImage getFSImage() {
|
||||||
return (BackupImage)backupNode.getFSImage();
|
return (BackupImage)backupNode.getFSImage();
|
||||||
|
@ -74,6 +78,7 @@ class Checkpointer extends Daemon {
|
||||||
* Create a connection to the primary namenode.
|
* Create a connection to the primary namenode.
|
||||||
*/
|
*/
|
||||||
Checkpointer(Configuration conf, BackupNode bnNode) throws IOException {
|
Checkpointer(Configuration conf, BackupNode bnNode) throws IOException {
|
||||||
|
this.conf = conf;
|
||||||
this.backupNode = bnNode;
|
this.backupNode = bnNode;
|
||||||
try {
|
try {
|
||||||
initialize(conf);
|
initialize(conf);
|
||||||
|
@ -274,10 +279,15 @@ class Checkpointer extends Daemon {
|
||||||
+ " New Image Size: " + imageSize);
|
+ " New Image Size: " + imageSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
private InetSocketAddress getImageListenAddress() {
|
private URL getImageListenAddress() {
|
||||||
InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
|
InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
|
||||||
int httpPort = httpSocAddr.getPort();
|
int httpPort = httpSocAddr.getPort();
|
||||||
return new InetSocketAddress(infoBindAddress, httpPort);
|
try {
|
||||||
|
return new URL(DFSUtil.getHttpClientScheme(conf) + "://" + infoBindAddress + ":" + httpPort);
|
||||||
|
} catch (MalformedURLException e) {
|
||||||
|
// Unreachable
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rollForwardByApplyingLogs(
|
static void rollForwardByApplyingLogs(
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.BufferedReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URI;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLConnection;
|
import java.net.URLConnection;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -41,7 +42,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
|
import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.codehaus.jackson.JsonNode;
|
import org.codehaus.jackson.JsonNode;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
@ -272,12 +272,13 @@ class ClusterJspHelper {
|
||||||
static class NamenodeMXBeanHelper {
|
static class NamenodeMXBeanHelper {
|
||||||
private static final ObjectMapper mapper = new ObjectMapper();
|
private static final ObjectMapper mapper = new ObjectMapper();
|
||||||
private final String host;
|
private final String host;
|
||||||
private final String httpAddress;
|
private final URI httpAddress;
|
||||||
|
|
||||||
NamenodeMXBeanHelper(InetSocketAddress addr, Configuration conf)
|
NamenodeMXBeanHelper(InetSocketAddress addr, Configuration conf)
|
||||||
throws IOException, MalformedObjectNameException {
|
throws IOException, MalformedObjectNameException {
|
||||||
this.host = addr.getHostName();
|
this.host = addr.getHostName();
|
||||||
this.httpAddress = DFSUtil.getInfoServer(addr, conf, false);
|
this.httpAddress = DFSUtil.getInfoServer(addr, conf,
|
||||||
|
DFSUtil.getHttpClientScheme(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get the map corresponding to the JSON string */
|
/** Get the map corresponding to the JSON string */
|
||||||
|
@ -356,7 +357,7 @@ class ClusterJspHelper {
|
||||||
nn.blocksCount = getProperty(props, "TotalBlocks").getLongValue();
|
nn.blocksCount = getProperty(props, "TotalBlocks").getLongValue();
|
||||||
nn.missingBlocksCount = getProperty(props, "NumberOfMissingBlocks")
|
nn.missingBlocksCount = getProperty(props, "NumberOfMissingBlocks")
|
||||||
.getLongValue();
|
.getLongValue();
|
||||||
nn.httpAddress = httpAddress;
|
nn.httpAddress = httpAddress.toURL();
|
||||||
getLiveNodeCount(getProperty(props, "LiveNodes").getValueAsText(), nn);
|
getLiveNodeCount(getProperty(props, "LiveNodes").getValueAsText(), nn);
|
||||||
getDeadNodeCount(getProperty(props, "DeadNodes").getValueAsText(), nn);
|
getDeadNodeCount(getProperty(props, "DeadNodes").getValueAsText(), nn);
|
||||||
nn.softwareVersion = getProperty(props, "SoftwareVersion").getTextValue();
|
nn.softwareVersion = getProperty(props, "SoftwareVersion").getTextValue();
|
||||||
|
@ -591,12 +592,14 @@ class ClusterJspHelper {
|
||||||
toXmlItemBlock(doc, "Blocks", Long.toString(nn.blocksCount));
|
toXmlItemBlock(doc, "Blocks", Long.toString(nn.blocksCount));
|
||||||
toXmlItemBlock(doc, "Missing Blocks",
|
toXmlItemBlock(doc, "Missing Blocks",
|
||||||
Long.toString(nn.missingBlocksCount));
|
Long.toString(nn.missingBlocksCount));
|
||||||
toXmlItemBlockWithLink(doc, nn.liveDatanodeCount + " (" +
|
toXmlItemBlockWithLink(doc, nn.liveDatanodeCount + " ("
|
||||||
nn.liveDecomCount + ")", nn.httpAddress+"/dfsnodelist.jsp?whatNodes=LIVE",
|
+ nn.liveDecomCount + ")", new URL(nn.httpAddress,
|
||||||
"Live Datanode (Decommissioned)");
|
"/dfsnodelist.jsp?whatNodes=LIVE"),
|
||||||
toXmlItemBlockWithLink(doc, nn.deadDatanodeCount + " (" +
|
"Live Datanode (Decommissioned)");
|
||||||
nn.deadDecomCount + ")", nn.httpAddress+"/dfsnodelist.jsp?whatNodes=DEAD"
|
toXmlItemBlockWithLink(doc, nn.deadDatanodeCount + " ("
|
||||||
, "Dead Datanode (Decommissioned)");
|
+ nn.deadDecomCount + ")", new URL(nn.httpAddress,
|
||||||
|
"/dfsnodelist.jsp?whatNodes=DEAD"),
|
||||||
|
"Dead Datanode (Decommissioned)");
|
||||||
toXmlItemBlock(doc, "Software Version", nn.softwareVersion);
|
toXmlItemBlock(doc, "Software Version", nn.softwareVersion);
|
||||||
doc.endTag(); // node
|
doc.endTag(); // node
|
||||||
}
|
}
|
||||||
|
@ -625,7 +628,7 @@ class ClusterJspHelper {
|
||||||
int liveDecomCount = 0;
|
int liveDecomCount = 0;
|
||||||
int deadDatanodeCount = 0;
|
int deadDatanodeCount = 0;
|
||||||
int deadDecomCount = 0;
|
int deadDecomCount = 0;
|
||||||
String httpAddress = null;
|
URL httpAddress = null;
|
||||||
String softwareVersion = "";
|
String softwareVersion = "";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -763,7 +766,8 @@ class ClusterJspHelper {
|
||||||
.equals(DecommissionStates.UNKNOWN.toString()))) {
|
.equals(DecommissionStates.UNKNOWN.toString()))) {
|
||||||
doc.startTag("node");
|
doc.startTag("node");
|
||||||
// dn
|
// dn
|
||||||
toXmlItemBlockWithLink(doc, dnhost, (dnhost+":"+httpPort),"DataNode");
|
toXmlItemBlockWithLink(doc, dnhost, new URL("http", dnhost, httpPort,
|
||||||
|
""), "DataNode");
|
||||||
|
|
||||||
// overall status first
|
// overall status first
|
||||||
toXmlItemBlock(doc, OVERALL_STATUS, overallStatus);
|
toXmlItemBlock(doc, OVERALL_STATUS, overallStatus);
|
||||||
|
@ -823,11 +827,11 @@ class ClusterJspHelper {
|
||||||
* link="http://hostname:50070" />
|
* link="http://hostname:50070" />
|
||||||
*/
|
*/
|
||||||
private static void toXmlItemBlockWithLink(XMLOutputter doc, String value,
|
private static void toXmlItemBlockWithLink(XMLOutputter doc, String value,
|
||||||
String url, String label) throws IOException {
|
URL url, String label) throws IOException {
|
||||||
doc.startTag("item");
|
doc.startTag("item");
|
||||||
doc.attribute("label", label);
|
doc.attribute("label", label);
|
||||||
doc.attribute("value", value);
|
doc.attribute("value", value);
|
||||||
doc.attribute("link", "///" + url);
|
doc.attribute("link", url.toString());
|
||||||
doc.endTag(); // item
|
doc.endTag(); // item
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -885,7 +889,7 @@ class ClusterJspHelper {
|
||||||
return out.toString();
|
return out.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String queryMbean(String httpAddress, Configuration conf)
|
private static String queryMbean(URI httpAddress, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
/**
|
/**
|
||||||
* Although the other namenode might support HTTPS, it is fundamentally
|
* Although the other namenode might support HTTPS, it is fundamentally
|
||||||
|
@ -896,7 +900,7 @@ class ClusterJspHelper {
|
||||||
*
|
*
|
||||||
* As a result, we just hard code the connection as an HTTP connection.
|
* As a result, we just hard code the connection as an HTTP connection.
|
||||||
*/
|
*/
|
||||||
URL url = new URL("http://" + httpAddress + JMX_QRY);
|
URL url = new URL(httpAddress.toURL(), JMX_QRY);
|
||||||
return readOutput(url);
|
return readOutput(url);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
import javax.servlet.ServletContext;
|
import javax.servlet.ServletContext;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
|
@ -33,10 +34,8 @@ import javax.servlet.http.HttpServletResponse;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
@ -90,8 +89,8 @@ public class GetImageServlet extends HttpServlet {
|
||||||
ServletContext context = getServletContext();
|
ServletContext context = getServletContext();
|
||||||
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
|
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
|
||||||
final GetImageParams parsedParams = new GetImageParams(request, response);
|
final GetImageParams parsedParams = new GetImageParams(request, response);
|
||||||
final Configuration conf =
|
final Configuration conf = (Configuration) context
|
||||||
(Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);
|
.getAttribute(JspHelper.CURRENT_CONF);
|
||||||
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
|
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled() &&
|
if (UserGroupInformation.isSecurityEnabled() &&
|
||||||
|
@ -180,7 +179,7 @@ public class GetImageServlet extends HttpServlet {
|
||||||
// issue a HTTP get request to download the new fsimage
|
// issue a HTTP get request to download the new fsimage
|
||||||
MD5Hash downloadImageDigest =
|
MD5Hash downloadImageDigest =
|
||||||
TransferFsImage.downloadImageToStorage(
|
TransferFsImage.downloadImageToStorage(
|
||||||
parsedParams.getInfoServer(), txid,
|
parsedParams.getInfoServer(conf), txid,
|
||||||
nnImage.getStorage(), true);
|
nnImage.getStorage(), true);
|
||||||
nnImage.saveDigestAndRenameCheckpointImage(txid, downloadImageDigest);
|
nnImage.saveDigestAndRenameCheckpointImage(txid, downloadImageDigest);
|
||||||
|
|
||||||
|
@ -331,7 +330,9 @@ public class GetImageServlet extends HttpServlet {
|
||||||
}
|
}
|
||||||
|
|
||||||
static String getParamStringToPutImage(long txid,
|
static String getParamStringToPutImage(long txid,
|
||||||
InetSocketAddress imageListenAddress, Storage storage) {
|
URL url, Storage storage) {
|
||||||
|
InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url
|
||||||
|
.getAuthority());
|
||||||
String machine = !imageListenAddress.isUnresolved()
|
String machine = !imageListenAddress.isUnresolved()
|
||||||
&& imageListenAddress.getAddress().isAnyLocalAddress() ? null
|
&& imageListenAddress.getAddress().isAnyLocalAddress() ? null
|
||||||
: imageListenAddress.getHostName();
|
: imageListenAddress.getHostName();
|
||||||
|
@ -441,11 +442,11 @@ public class GetImageServlet extends HttpServlet {
|
||||||
return isPutImage;
|
return isPutImage;
|
||||||
}
|
}
|
||||||
|
|
||||||
String getInfoServer() throws IOException{
|
URL getInfoServer(Configuration conf) throws IOException {
|
||||||
if (machineName == null || remoteport == 0) {
|
if (machineName == null || remoteport == 0) {
|
||||||
throw new IOException ("MachineName and port undefined");
|
throw new IOException("MachineName and port undefined");
|
||||||
}
|
}
|
||||||
return machineName + ":" + remoteport;
|
return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean shouldFetchLatest() {
|
boolean shouldFetchLatest() {
|
||||||
|
|
|
@ -29,7 +29,9 @@ import java.io.File;
|
||||||
import java.io.FilenameFilter;
|
import java.io.FilenameFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -111,7 +113,7 @@ public class SecondaryNameNode implements Runnable {
|
||||||
private final long starttime = Time.now();
|
private final long starttime = Time.now();
|
||||||
private volatile long lastCheckpointTime = 0;
|
private volatile long lastCheckpointTime = 0;
|
||||||
|
|
||||||
private String fsName;
|
private URL fsName;
|
||||||
private CheckpointStorage checkpointImage;
|
private CheckpointStorage checkpointImage;
|
||||||
|
|
||||||
private NamenodeProtocol namenode;
|
private NamenodeProtocol namenode;
|
||||||
|
@ -404,7 +406,7 @@ public class SecondaryNameNode implements Runnable {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
static boolean downloadCheckpointFiles(
|
static boolean downloadCheckpointFiles(
|
||||||
final String nnHostPort,
|
final URL nnHostPort,
|
||||||
final FSImage dstImage,
|
final FSImage dstImage,
|
||||||
final CheckpointSignature sig,
|
final CheckpointSignature sig,
|
||||||
final RemoteEditLogManifest manifest
|
final RemoteEditLogManifest manifest
|
||||||
|
@ -467,25 +469,33 @@ public class SecondaryNameNode implements Runnable {
|
||||||
/**
|
/**
|
||||||
* Returns the Jetty server that the Namenode is listening on.
|
* Returns the Jetty server that the Namenode is listening on.
|
||||||
*/
|
*/
|
||||||
private String getInfoServer() throws IOException {
|
private URL getInfoServer() throws IOException {
|
||||||
URI fsName = FileSystem.getDefaultUri(conf);
|
URI fsName = FileSystem.getDefaultUri(conf);
|
||||||
if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) {
|
if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(fsName.getScheme())) {
|
||||||
throw new IOException("This is not a DFS");
|
throw new IOException("This is not a DFS");
|
||||||
}
|
}
|
||||||
|
|
||||||
String configuredAddress = DFSUtil.getInfoServer(null, conf, false);
|
final String scheme = DFSUtil.getHttpClientScheme(conf);
|
||||||
String address = DFSUtil.substituteForWildcardAddress(configuredAddress,
|
URI address = DFSUtil.getInfoServerWithDefaultHost(fsName.getHost(), conf,
|
||||||
fsName.getHost());
|
scheme);
|
||||||
LOG.debug("Will connect to NameNode at HTTP address: " + address);
|
LOG.debug("Will connect to NameNode at " + address);
|
||||||
return address;
|
return address.toURL();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the host:port of where this SecondaryNameNode is listening
|
* Return the host:port of where this SecondaryNameNode is listening
|
||||||
* for image transfers
|
* for image transfers
|
||||||
*/
|
*/
|
||||||
private InetSocketAddress getImageListenAddress() {
|
private URL getImageListenAddress() {
|
||||||
return new InetSocketAddress(infoBindAddress, infoPort);
|
StringBuilder sb = new StringBuilder()
|
||||||
|
.append(DFSUtil.getHttpClientScheme(conf)).append("://")
|
||||||
|
.append(infoBindAddress).append(":").append(infoPort);
|
||||||
|
try {
|
||||||
|
return new URL(sb.toString());
|
||||||
|
} catch (MalformedURLException e) {
|
||||||
|
// Unreachable
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -17,13 +17,18 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.File;
|
||||||
import java.net.*;
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URL;
|
||||||
import java.security.DigestInputStream;
|
import java.security.DigestInputStream;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.lang.Math;
|
|
||||||
|
|
||||||
import javax.servlet.ServletOutputStream;
|
import javax.servlet.ServletOutputStream;
|
||||||
import javax.servlet.ServletResponse;
|
import javax.servlet.ServletResponse;
|
||||||
|
@ -41,14 +46,16 @@ import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||||
import org.apache.hadoop.io.MD5Hash;
|
import org.apache.hadoop.io.MD5Hash;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
@ -76,15 +83,15 @@ public class TransferFsImage {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
|
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
|
||||||
|
|
||||||
public static void downloadMostRecentImageToDirectory(String fsName,
|
public static void downloadMostRecentImageToDirectory(URL infoServer,
|
||||||
File dir) throws IOException {
|
File dir) throws IOException {
|
||||||
String fileId = GetImageServlet.getParamStringForMostRecentImage();
|
String fileId = GetImageServlet.getParamStringForMostRecentImage();
|
||||||
getFileClient(fsName, fileId, Lists.newArrayList(dir),
|
getFileClient(infoServer, fileId, Lists.newArrayList(dir),
|
||||||
null, false);
|
null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MD5Hash downloadImageToStorage(
|
public static MD5Hash downloadImageToStorage(
|
||||||
String fsName, long imageTxId, Storage dstStorage, boolean needDigest)
|
URL fsName, long imageTxId, Storage dstStorage, boolean needDigest)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
String fileid = GetImageServlet.getParamStringForImage(
|
String fileid = GetImageServlet.getParamStringForImage(
|
||||||
imageTxId, dstStorage);
|
imageTxId, dstStorage);
|
||||||
|
@ -102,7 +109,7 @@ public class TransferFsImage {
|
||||||
return hash;
|
return hash;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void downloadEditsToStorage(String fsName, RemoteEditLog log,
|
static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
|
||||||
NNStorage dstStorage) throws IOException {
|
NNStorage dstStorage) throws IOException {
|
||||||
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
|
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
|
||||||
"bad log: " + log;
|
"bad log: " + log;
|
||||||
|
@ -156,17 +163,17 @@ public class TransferFsImage {
|
||||||
* Requests that the NameNode download an image from this node.
|
* Requests that the NameNode download an image from this node.
|
||||||
*
|
*
|
||||||
* @param fsName the http address for the remote NN
|
* @param fsName the http address for the remote NN
|
||||||
* @param imageListenAddress the host/port where the local node is running an
|
* @param myNNAddress the host/port where the local node is running an
|
||||||
* HTTPServer hosting GetImageServlet
|
* HTTPServer hosting GetImageServlet
|
||||||
* @param storage the storage directory to transfer the image from
|
* @param storage the storage directory to transfer the image from
|
||||||
* @param txid the transaction ID of the image to be uploaded
|
* @param txid the transaction ID of the image to be uploaded
|
||||||
*/
|
*/
|
||||||
public static void uploadImageFromStorage(String fsName,
|
public static void uploadImageFromStorage(URL fsName,
|
||||||
InetSocketAddress imageListenAddress,
|
URL myNNAddress,
|
||||||
Storage storage, long txid) throws IOException {
|
Storage storage, long txid) throws IOException {
|
||||||
|
|
||||||
String fileid = GetImageServlet.getParamStringToPutImage(
|
String fileid = GetImageServlet.getParamStringToPutImage(
|
||||||
txid, imageListenAddress, storage);
|
txid, myNNAddress, storage);
|
||||||
// this doesn't directly upload an image, but rather asks the NN
|
// this doesn't directly upload an image, but rather asks the NN
|
||||||
// to connect back to the 2NN to download the specified image.
|
// to connect back to the 2NN to download the specified image.
|
||||||
try {
|
try {
|
||||||
|
@ -244,17 +251,11 @@ public class TransferFsImage {
|
||||||
* this storage object will be notified.
|
* this storage object will be notified.
|
||||||
* @Return a digest of the received file if getChecksum is true
|
* @Return a digest of the received file if getChecksum is true
|
||||||
*/
|
*/
|
||||||
static MD5Hash getFileClient(String nnHostPort,
|
static MD5Hash getFileClient(URL infoServer,
|
||||||
String queryString, List<File> localPaths,
|
String queryString, List<File> localPaths,
|
||||||
Storage dstStorage, boolean getChecksum) throws IOException {
|
Storage dstStorage, boolean getChecksum) throws IOException {
|
||||||
|
URL url = new URL(infoServer, "/getimage?" + queryString);
|
||||||
String str = HttpConfig.getSchemePrefix() + nnHostPort + "/getimage?" +
|
LOG.info("Opening connection to " + url);
|
||||||
queryString;
|
|
||||||
LOG.info("Opening connection to " + str);
|
|
||||||
//
|
|
||||||
// open connection to remote server
|
|
||||||
//
|
|
||||||
URL url = new URL(str);
|
|
||||||
return doGetUrl(url, localPaths, dstStorage, getChecksum);
|
return doGetUrl(url, localPaths, dstStorage, getChecksum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -69,7 +70,7 @@ public class BootstrapStandby implements Tool, Configurable {
|
||||||
private String nnId;
|
private String nnId;
|
||||||
private String otherNNId;
|
private String otherNNId;
|
||||||
|
|
||||||
private String otherHttpAddr;
|
private URL otherHttpAddr;
|
||||||
private InetSocketAddress otherIpcAddr;
|
private InetSocketAddress otherIpcAddr;
|
||||||
private Collection<URI> dirsToFormat;
|
private Collection<URI> dirsToFormat;
|
||||||
private List<URI> editUrisToFormat;
|
private List<URI> editUrisToFormat;
|
||||||
|
@ -179,6 +180,7 @@ public class BootstrapStandby implements Tool, Configurable {
|
||||||
// Check with the user before blowing away data.
|
// Check with the user before blowing away data.
|
||||||
if (!Storage.confirmFormat(storage.dirIterable(null),
|
if (!Storage.confirmFormat(storage.dirIterable(null),
|
||||||
force, interactive)) {
|
force, interactive)) {
|
||||||
|
storage.close();
|
||||||
return ERR_CODE_ALREADY_FORMATTED;
|
return ERR_CODE_ALREADY_FORMATTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,7 +205,7 @@ public class BootstrapStandby implements Tool, Configurable {
|
||||||
|
|
||||||
// Download that checkpoint into our storage directories.
|
// Download that checkpoint into our storage directories.
|
||||||
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
MD5Hash hash = TransferFsImage.downloadImageToStorage(
|
||||||
otherHttpAddr.toString(), imageTxId,
|
otherHttpAddr, imageTxId,
|
||||||
storage, true);
|
storage, true);
|
||||||
image.saveDigestAndRenameCheckpointImage(imageTxId, hash);
|
image.saveDigestAndRenameCheckpointImage(imageTxId, hash);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -276,11 +278,10 @@ public class BootstrapStandby implements Tool, Configurable {
|
||||||
"Could not determine valid IPC address for other NameNode (%s)" +
|
"Could not determine valid IPC address for other NameNode (%s)" +
|
||||||
", got: %s", otherNNId, otherIpcAddr);
|
", got: %s", otherNNId, otherIpcAddr);
|
||||||
|
|
||||||
otherHttpAddr = DFSUtil.getInfoServer(null, otherNode, false);
|
final String scheme = DFSUtil.getHttpClientScheme(conf);
|
||||||
otherHttpAddr = DFSUtil.substituteForWildcardAddress(otherHttpAddr,
|
otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost(
|
||||||
otherIpcAddr.getHostName());
|
otherIpcAddr.getHostName(), otherNode, scheme).toURL();
|
||||||
|
|
||||||
|
|
||||||
dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
|
dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
|
||||||
editUrisToFormat = FSNamesystem.getNamespaceEditsDirs(
|
editUrisToFormat = FSNamesystem.getNamespaceEditsDirs(
|
||||||
conf, false);
|
conf, false);
|
||||||
|
|
|
@ -20,7 +20,8 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
import static org.apache.hadoop.util.Time.now;
|
import static org.apache.hadoop.util.Time.now;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -43,7 +44,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
|
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||||
import org.apache.hadoop.hdfs.util.Canceler;
|
import org.apache.hadoop.hdfs.util.Canceler;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
@ -66,8 +66,8 @@ public class StandbyCheckpointer {
|
||||||
private long lastCheckpointTime;
|
private long lastCheckpointTime;
|
||||||
private final CheckpointerThread thread;
|
private final CheckpointerThread thread;
|
||||||
private final ThreadFactory uploadThreadFactory;
|
private final ThreadFactory uploadThreadFactory;
|
||||||
private String activeNNAddress;
|
private URL activeNNAddress;
|
||||||
private InetSocketAddress myNNAddress;
|
private URL myNNAddress;
|
||||||
|
|
||||||
private Object cancelLock = new Object();
|
private Object cancelLock = new Object();
|
||||||
private Canceler canceler;
|
private Canceler canceler;
|
||||||
|
@ -94,7 +94,7 @@ public class StandbyCheckpointer {
|
||||||
*/
|
*/
|
||||||
private void setNameNodeAddresses(Configuration conf) throws IOException {
|
private void setNameNodeAddresses(Configuration conf) throws IOException {
|
||||||
// Look up our own address.
|
// Look up our own address.
|
||||||
String myAddrString = getHttpAddress(conf);
|
myNNAddress = getHttpAddress(conf);
|
||||||
|
|
||||||
// Look up the active node's address
|
// Look up the active node's address
|
||||||
Configuration confForActive = HAUtil.getConfForOtherNode(conf);
|
Configuration confForActive = HAUtil.getConfForOtherNode(conf);
|
||||||
|
@ -103,32 +103,22 @@ public class StandbyCheckpointer {
|
||||||
// Sanity-check.
|
// Sanity-check.
|
||||||
Preconditions.checkArgument(checkAddress(activeNNAddress),
|
Preconditions.checkArgument(checkAddress(activeNNAddress),
|
||||||
"Bad address for active NN: %s", activeNNAddress);
|
"Bad address for active NN: %s", activeNNAddress);
|
||||||
Preconditions.checkArgument(checkAddress(myAddrString),
|
Preconditions.checkArgument(checkAddress(myNNAddress),
|
||||||
"Bad address for standby NN: %s", myAddrString);
|
"Bad address for standby NN: %s", myNNAddress);
|
||||||
myNNAddress = NetUtils.createSocketAddr(myAddrString);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getHttpAddress(Configuration conf) throws IOException {
|
private URL getHttpAddress(Configuration conf) throws IOException {
|
||||||
String configuredAddr = DFSUtil.getInfoServer(null, conf, false);
|
final String scheme = DFSUtil.getHttpClientScheme(conf);
|
||||||
|
String defaultHost = NameNode.getServiceAddress(conf, true).getHostName();
|
||||||
// Use the hostname from the RPC address as a default, in case
|
URI addr = DFSUtil.getInfoServerWithDefaultHost(defaultHost, conf, scheme);
|
||||||
// the HTTP address is configured to 0.0.0.0.
|
return addr.toURL();
|
||||||
String hostnameFromRpc = NameNode.getServiceAddress(
|
|
||||||
conf, true).getHostName();
|
|
||||||
try {
|
|
||||||
return DFSUtil.substituteForWildcardAddress(
|
|
||||||
configuredAddr, hostnameFromRpc);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new IllegalArgumentException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ensure that the given address is valid and has a port
|
* Ensure that the given address is valid and has a port
|
||||||
* specified.
|
* specified.
|
||||||
*/
|
*/
|
||||||
private boolean checkAddress(String addrStr) {
|
private static boolean checkAddress(URL addr) {
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(addrStr);
|
|
||||||
return addr.getPort() != 0;
|
return addr.getPort() != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,7 +334,7 @@ public class StandbyCheckpointer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
String getActiveNNAddress() {
|
URL getActiveNNAddress() {
|
||||||
return activeNNAddress;
|
return activeNNAddress;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.tools;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URL;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -47,9 +48,9 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
@ -547,8 +548,10 @@ public class DFSAdmin extends FsShell {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public int fetchImage(final String[] argv, final int idx) throws IOException {
|
public int fetchImage(final String[] argv, final int idx) throws IOException {
|
||||||
final String infoServer = DFSUtil.getInfoServer(
|
Configuration conf = getConf();
|
||||||
HAUtil.getAddressOfActive(getDFS()), getConf(), false);
|
final URL infoServer = DFSUtil.getInfoServer(
|
||||||
|
HAUtil.getAddressOfActive(getDFS()), conf,
|
||||||
|
DFSUtil.getHttpClientScheme(conf)).toURL();
|
||||||
SecurityUtil.doAsCurrentUser(new PrivilegedExceptionAction<Void>() {
|
SecurityUtil.doAsCurrentUser(new PrivilegedExceptionAction<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void run() throws Exception {
|
public Void run() throws Exception {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.net.URI;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLConnection;
|
import java.net.URLConnection;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
|
@ -37,7 +38,6 @@ import org.apache.hadoop.hdfs.HAUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
|
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
|
||||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -227,7 +227,7 @@ public class DFSck extends Configured implements Tool {
|
||||||
* @return Returns http address or null if failure.
|
* @return Returns http address or null if failure.
|
||||||
* @throws IOException if we can't determine the active NN address
|
* @throws IOException if we can't determine the active NN address
|
||||||
*/
|
*/
|
||||||
private String getCurrentNamenodeAddress() throws IOException {
|
private URI getCurrentNamenodeAddress() throws IOException {
|
||||||
//String nnAddress = null;
|
//String nnAddress = null;
|
||||||
Configuration conf = getConf();
|
Configuration conf = getConf();
|
||||||
|
|
||||||
|
@ -245,19 +245,21 @@ public class DFSck extends Configured implements Tool {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf, false);
|
return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf,
|
||||||
|
DFSUtil.getHttpClientScheme(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
private int doWork(final String[] args) throws IOException {
|
private int doWork(final String[] args) throws IOException {
|
||||||
final StringBuilder url = new StringBuilder(HttpConfig.getSchemePrefix());
|
final StringBuilder url = new StringBuilder();
|
||||||
|
|
||||||
String namenodeAddress = getCurrentNamenodeAddress();
|
URI namenodeAddress = getCurrentNamenodeAddress();
|
||||||
if (namenodeAddress == null) {
|
if (namenodeAddress == null) {
|
||||||
//Error message already output in {@link #getCurrentNamenodeAddress()}
|
//Error message already output in {@link #getCurrentNamenodeAddress()}
|
||||||
System.err.println("DFSck exiting.");
|
System.err.println("DFSck exiting.");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
url.append(namenodeAddress);
|
|
||||||
|
url.append(namenodeAddress.toString());
|
||||||
System.err.println("Connecting to namenode via " + url.toString());
|
System.err.println("Connecting to namenode via " + url.toString());
|
||||||
|
|
||||||
url.append("/fsck?ugi=").append(ugi.getShortUserName());
|
url.append("/fsck?ugi=").append(ugi.getShortUserName());
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
|
||||||
|
@ -429,20 +428,22 @@ public class TestDFSUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetInfoServer() throws IOException {
|
public void testGetInfoServer() throws IOException, URISyntaxException {
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
|
||||||
UserGroupInformation.setConfiguration(conf);
|
|
||||||
|
|
||||||
String httpsport = DFSUtil.getInfoServer(null, conf, true);
|
URI httpsport = DFSUtil.getInfoServer(null, conf, "https");
|
||||||
assertEquals("0.0.0.0:"+DFS_NAMENODE_HTTPS_PORT_DEFAULT, httpsport);
|
assertEquals(new URI("https", null, "0.0.0.0",
|
||||||
|
DFS_NAMENODE_HTTPS_PORT_DEFAULT, null, null, null), httpsport);
|
||||||
|
|
||||||
String httpport = DFSUtil.getInfoServer(null, conf, false);
|
URI httpport = DFSUtil.getInfoServer(null, conf, "http");
|
||||||
assertEquals("0.0.0.0:"+DFS_NAMENODE_HTTP_PORT_DEFAULT, httpport);
|
assertEquals(new URI("http", null, "0.0.0.0",
|
||||||
|
DFS_NAMENODE_HTTP_PORT_DEFAULT, null, null, null), httpport);
|
||||||
String httpAddress = DFSUtil.getInfoServer(new InetSocketAddress(
|
|
||||||
"localhost", 8020), conf, false);
|
URI httpAddress = DFSUtil.getInfoServer(new InetSocketAddress(
|
||||||
assertEquals("localhost:" + DFS_NAMENODE_HTTP_PORT_DEFAULT, httpAddress);
|
"localhost", 8020), conf, "http");
|
||||||
|
assertEquals(
|
||||||
|
URI.create("http://localhost:" + DFS_NAMENODE_HTTP_PORT_DEFAULT),
|
||||||
|
httpAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -37,6 +37,7 @@ import java.io.RandomAccessFile;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -76,7 +77,6 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
||||||
|
@ -236,6 +236,7 @@ public class TestCheckpoint {
|
||||||
assertTrue("Removed directory wasn't what was expected",
|
assertTrue("Removed directory wasn't what was expected",
|
||||||
listRsd.size() > 0 && listRsd.get(listRsd.size() - 1).getRoot().
|
listRsd.size() > 0 && listRsd.get(listRsd.size() - 1).getRoot().
|
||||||
toString().indexOf("storageDirToCheck") != -1);
|
toString().indexOf("storageDirToCheck") != -1);
|
||||||
|
nnStorage.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1963,8 +1964,9 @@ public class TestCheckpoint {
|
||||||
.format(true).build();
|
.format(true).build();
|
||||||
|
|
||||||
NamenodeProtocols nn = cluster.getNameNodeRpc();
|
NamenodeProtocols nn = cluster.getNameNodeRpc();
|
||||||
String fsName = NetUtils.getHostPortString(
|
URL fsName = DFSUtil.getInfoServer(
|
||||||
cluster.getNameNode().getHttpAddress());
|
cluster.getNameNode().getServiceRpcAddress(), conf,
|
||||||
|
DFSUtil.getHttpClientScheme(conf)).toURL();
|
||||||
|
|
||||||
// Make a finalized log on the server side.
|
// Make a finalized log on the server side.
|
||||||
nn.rollEditLog();
|
nn.rollEditLog();
|
||||||
|
@ -1996,8 +1998,7 @@ public class TestCheckpoint {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
InetSocketAddress fakeAddr = new InetSocketAddress(1);
|
TransferFsImage.uploadImageFromStorage(fsName, new URL("http://localhost:1234"), dstImage, 0);
|
||||||
TransferFsImage.uploadImageFromStorage(fsName, fakeAddr, dstImage, 0);
|
|
||||||
fail("Storage info was not verified");
|
fail("Storage info was not verified");
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
String msg = StringUtils.stringifyException(ioe);
|
String msg = StringUtils.stringifyException(ioe);
|
||||||
|
|
|
@ -34,11 +34,11 @@ import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.http.HttpServer;
|
import org.apache.hadoop.http.HttpServer;
|
||||||
import org.apache.hadoop.http.HttpServerFunctionalTest;
|
import org.apache.hadoop.http.HttpServerFunctionalTest;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
|
||||||
import org.apache.hadoop.test.PathUtils;
|
import org.apache.hadoop.test.PathUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -66,8 +66,9 @@ public class TestTransferFsImage {
|
||||||
new File("/xxxxx-does-not-exist/blah"));
|
new File("/xxxxx-does-not-exist/blah"));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String fsName = NetUtils.getHostPortString(
|
URL fsName = DFSUtil.getInfoServer(
|
||||||
cluster.getNameNode().getHttpAddress());
|
cluster.getNameNode().getServiceRpcAddress(), conf,
|
||||||
|
DFSUtil.getHttpClientScheme(conf)).toURL();
|
||||||
String id = "getimage=1&txid=0";
|
String id = "getimage=1&txid=0";
|
||||||
|
|
||||||
TransferFsImage.getFileClient(fsName, id, localPath, mockStorage, false);
|
TransferFsImage.getFileClient(fsName, id, localPath, mockStorage, false);
|
||||||
|
@ -98,8 +99,10 @@ public class TestTransferFsImage {
|
||||||
);
|
);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String fsName = NetUtils.getHostPortString(
|
URL fsName = DFSUtil.getInfoServer(
|
||||||
cluster.getNameNode().getHttpAddress());
|
cluster.getNameNode().getServiceRpcAddress(), conf,
|
||||||
|
DFSUtil.getHttpClientScheme(conf)).toURL();
|
||||||
|
|
||||||
String id = "getimage=1&txid=0";
|
String id = "getimage=1&txid=0";
|
||||||
|
|
||||||
TransferFsImage.getFileClient(fsName, id, localPaths, mockStorage, false);
|
TransferFsImage.getFileClient(fsName, id, localPaths, mockStorage, false);
|
||||||
|
@ -123,7 +126,7 @@ public class TestTransferFsImage {
|
||||||
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
|
URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
|
||||||
TransferFsImage.timeout = 2000;
|
TransferFsImage.timeout = 2000;
|
||||||
try {
|
try {
|
||||||
TransferFsImage.getFileClient(serverURL.getAuthority(), "txid=1", null,
|
TransferFsImage.getFileClient(serverURL, "txid=1", null,
|
||||||
null, false);
|
null, false);
|
||||||
fail("TransferImage Should fail with timeout");
|
fail("TransferImage Should fail with timeout");
|
||||||
} catch (SocketTimeoutException e) {
|
} catch (SocketTimeoutException e) {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -86,7 +87,8 @@ public class TestHAConfiguration {
|
||||||
// 0.0.0.0, it should substitute the address from the RPC configuration
|
// 0.0.0.0, it should substitute the address from the RPC configuration
|
||||||
// above.
|
// above.
|
||||||
StandbyCheckpointer checkpointer = new StandbyCheckpointer(conf, fsn);
|
StandbyCheckpointer checkpointer = new StandbyCheckpointer(conf, fsn);
|
||||||
assertEquals("1.2.3.2:" + DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
|
assertEquals(new URL("http", "1.2.3.2",
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, ""),
|
||||||
checkpointer.getActiveNNAddress());
|
checkpointer.getActiveNNAddress());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue