HDFS-3139. svn merge -c 1306549 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1306553 13f79535-47bb-0310-9956-ffa450edef68
Author: Eli Collins
Date:   2012-03-28 19:36:10 +00:00
parent  5384914aa8
commit  ee241b9987

7 changed files with 49 additions and 48 deletions

File: CHANGES.txt

@@ -785,6 +785,8 @@ Release 0.23.1 - 2012-02-17
     HDFS-208. name node should warn if only one dir is listed in dfs.name.dir.
     (Uma Maheswara Rao G via eli)
 
+    HDFS-3139. Minor Datanode logging improvement. (eli)
+
     OPTIMIZATIONS
 
     HDFS-2130. Switch default checksum to CRC32C. (todd)

File: DatanodeID.java

@@ -28,20 +28,20 @@
 import org.apache.hadoop.io.WritableComparable;
 
 /**
- * DatanodeID is composed of the data node
- * name (hostname:portNumber) and the data storage ID,
- * which it currently represents.
- *
+ * This class represents the primary identifier for a Datanode.
+ * Datanodes are identified by how they can be contacted (hostname
+ * and ports) and their storage ID, a unique number that associates
+ * the Datanodes blocks with a particular Datanode.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeID implements WritableComparable<DatanodeID> {
   public static final DatanodeID[] EMPTY_ARRAY = {};
 
-  public String name;      /// hostname:portNumber
-  public String storageID; /// unique per cluster storageID
-  protected int infoPort;  /// the port where the infoserver is running
-  public int ipcPort;      /// the port where the ipc server is running
+  public String name;      // hostname:port (data transfer port)
+  public String storageID; // unique per cluster storageID
+  protected int infoPort;  // info server port
+  public int ipcPort;      // ipc server port
 
   /** Equivalent to DatanodeID(""). */
   public DatanodeID() {this("");}
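For a concrete sense of the identifier the new Javadoc describes, here is a minimal sketch using only what this hunk itself shows (the one-argument constructor and the public fields); the host, ports, and storage ID are made-up example values, not taken from any real cluster:

    import org.apache.hadoop.hdfs.protocol.DatanodeID;

    public class DatanodeIDSketch {
      public static void main(String[] args) {
        // Hypothetical values: a Datanode contacted at dn1.example.com:50010
        // for data transfer, with its cluster-unique storage ID.
        DatanodeID id = new DatanodeID("dn1.example.com:50010");
        id.storageID = "DS-123456789";  // associates the Datanode's blocks with it
        id.ipcPort = 50020;             // ipc server port
        System.out.println(id.name + " / " + id.storageID);
      }
    }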

File: DatanodeInfo.java

@@ -37,9 +37,9 @@
 import org.apache.hadoop.util.StringUtils;
 
 /**
- * DatanodeInfo represents the status of a DataNode.
- * This object is used for communication in the
- * Datanode Protocol and the Client Protocol.
+ * This class extends the primary identifier of a Datanode with ephemeral
+ * state, eg usage information, current administrative state, and the
+ * network location that is communicated to clients.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -52,12 +52,10 @@ public class DatanodeInfo extends DatanodeID implements Node {
   protected int xceiverCount;
   protected String location = NetworkTopology.DEFAULT_RACK;
 
-  /** HostName as supplied by the datanode during registration as its
-   * name. Namenode uses datanode IP address as the name.
-   */
+  // The FQDN of the IP associated with the Datanode's hostname
   protected String hostName = null;
 
-  // administrative states of a datanode
+  // Datanode administrative states
   public enum AdminStates {
     NORMAL("In Service"),
     DECOMMISSION_INPROGRESS("Decommission In Progress"),
@@ -241,12 +239,14 @@ public String getDatanodeReport() {
     long nonDFSUsed = getNonDfsUsed();
     float usedPercent = getDfsUsedPercent();
     float remainingPercent = getRemainingPercent();
-    String hostName = NetUtils.getHostNameOfIP(name);
+    String lookupName = NetUtils.getHostNameOfIP(name);
 
     buffer.append("Name: "+ name);
-    if(hostName != null)
-      buffer.append(" (" + hostName + ")");
+    if (lookupName != null) {
+      buffer.append(" (" + lookupName + ")");
+    }
     buffer.append("\n");
+    buffer.append("Hostname: " + getHostName() + "\n");
 
     if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
       buffer.append("Rack: "+location+"\n");

File: DatanodeDescriptor.java

@@ -38,16 +38,13 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 
-/**************************************************
- * DatanodeDescriptor tracks stats on a given DataNode, such as
- * available storage capacity, last update time, etc., and maintains a
- * set of blocks stored on the datanode.
- *
- * This data structure is internal to the namenode. It is *not* sent
- * over-the-wire to the Client or the Datanodes. Neither is it stored
- * persistently in the fsImage.
- **************************************************/
+/**
+ * This class extends the DatanodeInfo class with ephemeral information (eg
+ * health, capacity, what blocks are associated with the Datanode) that is
+ * private to the Namenode, ie this class is not exposed to clients.
+ */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public class DatanodeDescriptor extends DatanodeInfo {
 
   // Stores status of decommissioning.
@@ -590,14 +587,14 @@ public void updateRegInfo(DatanodeID nodeReg) {
   }
 
   /**
-   * @return Blanacer bandwidth in bytes per second for this datanode.
+   * @return balancer bandwidth in bytes per second for this datanode
    */
   public long getBalancerBandwidth() {
     return this.bandwidth;
   }
 
   /**
-   * @param bandwidth Blanacer bandwidth in bytes per second for this datanode.
+   * @param bandwidth balancer bandwidth in bytes per second for this datanode
    */
   public void setBalancerBandwidth(long bandwidth) {
     this.bandwidth = bandwidth;
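For reference, this per-datanode value is the one the dfsadmin balancer-bandwidth command is meant to update at runtime (value in bytes per second; the figure below, 10 MB/s, is only an example):

    hdfs dfsadmin -setBalancerBandwidth 10485760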

File: DataNode.java

@@ -335,9 +335,7 @@ conf, new AccessControlList(conf.get(DFS_ADMIN, " ")))
         : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
             conf, new AccessControlList(conf.get(DFS_ADMIN, " ")),
             secureResources.getListener());
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
-    }
+    LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort);
     if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
       boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
                                                DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
@@ -403,6 +401,7 @@ private void initIpcServer(Configuration conf) throws IOException {
         .newReflectiveBlockingService(interDatanodeProtocolXlator);
     DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
         ipcServer);
+    LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());
 
     // set service-level authorization security policy
     if (conf.getBoolean(
@@ -491,14 +490,14 @@ private synchronized void shutdownDirectoryScanner() {
   }
 
   private void initDataXceiver(Configuration conf) throws IOException {
-    InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
+    InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf);
 
     // find free port or use privileged port provided
     ServerSocket ss;
     if(secureResources == null) {
       ss = (dnConf.socketWriteTimeout > 0) ?
           ServerSocketChannel.open().socket() : new ServerSocket();
-      Server.bind(ss, socAddr, 0);
+      Server.bind(ss, streamingAddr, 0);
     } else {
       ss = secureResources.getStreamingSocket();
     }
@@ -507,8 +506,7 @@ private void initDataXceiver(Configuration conf) throws IOException {
     int tmpPort = ss.getLocalPort();
     selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
                                      tmpPort);
-    LOG.info("Opened info server at " + tmpPort);
-
+    LOG.info("Opened streaming server at " + selfAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
     this.dataXceiverServer = new Daemon(threadGroup,
         new DataXceiverServer(ss, conf, this));
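The ternary preserved in the hunk above is worth calling out: when a socket write timeout is configured, a channel-backed ServerSocket is opened (timed writes go through NIO channels), otherwise a plain ServerSocket is used. A self-contained sketch of the same pattern; the class name, timeout value, and bind address are illustrative, not DataNode's own:

    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.channels.ServerSocketChannel;

    public class StreamingBindSketch {
      public static void main(String[] args) throws Exception {
        long socketWriteTimeout = 480000; // hypothetical timeout, in milliseconds
        ServerSocket ss = (socketWriteTimeout > 0)
            ? ServerSocketChannel.open().socket()  // channel-backed: supports timed writes
            : new ServerSocket();
        ss.bind(new InetSocketAddress("127.0.0.1", 0), 0); // port 0: pick any free port
        System.out.println("Opened streaming server at " + ss.getLocalSocketAddress());
        ss.close();
      }
    }

In the real code the timeout comes from dnConf.socketWriteTimeout and the bind target is the configured streaming address.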

File: SecureDataNodeStarter.java

@@ -69,18 +69,19 @@ public void init(DaemonContext context) throws Exception {
     args = context.getArguments();
 
     // Obtain secure port for data streaming to datanode
-    InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
+    InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf);
     int socketWriteTimeout = conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
         HdfsServerConstants.WRITE_TIMEOUT);
 
     ServerSocket ss = (socketWriteTimeout > 0) ?
         ServerSocketChannel.open().socket() : new ServerSocket();
-    ss.bind(socAddr, 0);
+    ss.bind(streamingAddr, 0);
 
     // Check that we got the port we need
-    if(ss.getLocalPort() != socAddr.getPort())
+    if (ss.getLocalPort() != streamingAddr.getPort()) {
       throw new RuntimeException("Unable to bind on specified streaming port in secure " +
-          "context. Needed " + socAddr.getPort() + ", got " + ss.getLocalPort());
+          "context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
+    }
 
     // Obtain secure listener for web server
     SelectChannelConnector listener =
@@ -90,15 +91,18 @@ public void init(DaemonContext context) throws Exception {
     listener.setPort(infoSocAddr.getPort());
     // Open listener here in order to bind to port as root
     listener.open();
-    if(listener.getPort() != infoSocAddr.getPort())
+    if (listener.getPort() != infoSocAddr.getPort()) {
       throw new RuntimeException("Unable to bind on specified info port in secure " +
-          "context. Needed " + socAddr.getPort() + ", got " + ss.getLocalPort());
+          "context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
+    }
     System.err.println("Successfully obtained privileged resources (streaming port = "
         + ss + " ) (http listener port = " + listener.getConnection() +")");
 
-    if(ss.getLocalPort() >= 1023 || listener.getPort() >= 1023)
+    if (ss.getLocalPort() >= 1023 || listener.getPort() >= 1023) {
       throw new RuntimeException("Cannot start secure datanode with unprivileged ports");
+    }
+    System.err.println("Opened streaming server at " + streamingAddr);
+    System.err.println("Opened info server at " + infoSocAddr);
 
     resources = new SecureResources(ss, listener);
   }

File: TestDFSAddressConfig.java

@@ -52,7 +52,7 @@ public void testDFSAddressConfig() throws IOException {
     String selfSocketAddr = dn.getSelfAddr().toString();
     System.out.println("DN Self Socket Addr == " + selfSocketAddr);
-    assertTrue(selfSocketAddr.startsWith("/127.0.0.1:"));
+    assertTrue(selfSocketAddr.contains("/127.0.0.1:"));
 
     /*-------------------------------------------------------------------------
      * Shut down the datanodes, reconfigure, and bring them back up.
@@ -78,7 +78,7 @@ public void testDFSAddressConfig() throws IOException {
     selfSocketAddr = dn.getSelfAddr().toString();
     System.out.println("DN Self Socket Addr == " + selfSocketAddr);
     // assert that default self socket address is 127.0.0.1
-    assertTrue(selfSocketAddr.startsWith("/127.0.0.1:"));
+    assertTrue(selfSocketAddr.contains("/127.0.0.1:"));
 
     /*-------------------------------------------------------------------------
      * Shut down the datanodes, reconfigure, and bring them back up.
@@ -103,7 +103,7 @@ public void testDFSAddressConfig() throws IOException {
     selfSocketAddr = dn.getSelfAddr().toString();
     System.out.println("DN Self Socket Addr == " + selfSocketAddr);
     // assert that default self socket address is 0.0.0.0
-    assertTrue(selfSocketAddr.startsWith("/0.0.0.0:"));
+    assertTrue(selfSocketAddr.contains("/0.0.0.0:"));
 
     cluster.shutdown();
   }
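The startsWith-to-contains change tracks the selfAddr change in DataNode: InetSocketAddress.toString() prints a hostname before the slash when one is known, so "/ip:port" is no longer guaranteed to be a prefix of the string. A small demonstration (the port is arbitrary):

    import java.net.InetSocketAddress;

    public class AddrToStringSketch {
      public static void main(String[] args) {
        // Built from a literal IP, no hostname attached: toString() starts with the slash.
        System.out.println(new InetSocketAddress("127.0.0.1", 50010)); // /127.0.0.1:50010
        // Hostname known: it precedes the slash, so only contains() still matches.
        System.out.println(new InetSocketAddress("localhost", 50010)); // localhost/127.0.0.1:50010
      }
    }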