HDFS-3144. svn merge -c 1308205 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1308207 13f79535-47bb-0310-9956-ffa450edef68
parent 6e7c70870a
commit 0a73d4cb96
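The pattern applied throughout the patch, condensed into a sketch (hedged: `dn` and `timeout` are placeholder names, not lines from this commit). Call sites that funneled every need through the overloaded DatanodeID#getName() now name the endpoint they actually want:

    // Before: one accessor, overloaded by context
    sock.connect(NetUtils.createSocketAddr(dn.getName()), timeout);
    // After: a per-context accessor makes the intended endpoint explicit
    sock.connect(NetUtils.createSocketAddr(dn.getXferAddr()), timeout);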
@@ -192,6 +192,8 @@ Release 2.0.0 - UNRELEASED

 HDFS-3171. The DatanodeID "name" field is overloaded. (eli)

+HDFS-3144. Refactor DatanodeID#getName by use. (eli)
+
 OPTIMIZATIONS

 HDFS-2477. Optimize computing the diff between a block report and the

@@ -1380,7 +1380,8 @@ public class DFSClient implements java.io.Closeable {
 //connect to a datanode
 sock = socketFactory.createSocket();
 NetUtils.connect(sock,
-NetUtils.createSocketAddr(datanodes[j].getName()), timeout);
+NetUtils.createSocketAddr(datanodes[j].getXferAddr()),
+timeout);
 sock.setSoTimeout(timeout);

 out = new DataOutputStream(

@@ -1389,7 +1390,7 @@ public class DFSClient implements java.io.Closeable {
 in = new DataInputStream(NetUtils.getInputStream(sock));

 if (LOG.isDebugEnabled()) {
-LOG.debug("write to " + datanodes[j].getName() + ": "
+LOG.debug("write to " + datanodes[j] + ": "
 + Op.BLOCK_CHECKSUM + ", block=" + block);
 }
 // get block MD5

@@ -1404,7 +1405,7 @@ public class DFSClient implements java.io.Closeable {
 if (LOG.isDebugEnabled()) {
 LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
 + "for file " + src + " for block " + block
-+ " from datanode " + datanodes[j].getName()
++ " from datanode " + datanodes[j]
 + ". Will retry the block once.");
 }
 lastRetriedIndex = i;

@@ -1414,7 +1415,7 @@ public class DFSClient implements java.io.Closeable {
 break;
 } else {
 throw new IOException("Bad response " + reply + " for block "
-+ block + " from datanode " + datanodes[j].getName());
++ block + " from datanode " + datanodes[j]);
 }
 }

@@ -1449,12 +1450,10 @@ public class DFSClient implements java.io.Closeable {
 LOG.debug("set bytesPerCRC=" + bytesPerCRC
 + ", crcPerBlock=" + crcPerBlock);
 }
-LOG.debug("got reply from " + datanodes[j].getName()
-+ ": md5=" + md5);
+LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
 }
 } catch (IOException ie) {
-LOG.warn("src=" + src + ", datanodes[" + j + "].getName()="
-+ datanodes[j].getName(), ie);
+LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
 } finally {
 IOUtils.closeStream(in);
 IOUtils.closeStream(out);

@@ -489,7 +489,7 @@ public class DFSInputStream extends FSInputStream {
 return blockReader.read(buf, off, len);
 } catch ( ChecksumException ce ) {
 DFSClient.LOG.warn("Found Checksum error for "
-+ getCurrentBlock() + " from " + currentNode.getName()
++ getCurrentBlock() + " from " + currentNode
 + " at " + ce.getPos());
 ioe = ce;
 retryCurrentNode = false;

@@ -601,7 +601,7 @@ public class DFSInputStream extends FSInputStream {
 try {
 DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
 InetSocketAddress targetAddr =
-NetUtils.createSocketAddr(chosenNode.getName());
+NetUtils.createSocketAddr(chosenNode.getXferAddr());
 return new DNAddrPair(chosenNode, targetAddr);
 } catch (IOException ie) {
 String blockInfo = block.getBlock() + " file=" + src;

@@ -676,7 +676,7 @@ public class DFSInputStream extends FSInputStream {
 } catch (ChecksumException e) {
 DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
 src + " at " + block.getBlock() + ":" +
-e.getPos() + " from " + chosenNode.getName());
+e.getPos() + " from " + chosenNode);
 // we want to remember what we have tried
 addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
 } catch (AccessControlException ex) {

@@ -667,7 +667,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
 throw new IOException("Bad response " + reply +
 " for block " + block +
 " from datanode " +
-targets[i].getName());
+targets[i]);
 }
 }

@@ -898,7 +898,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
 if (errorIndex >= 0) {
 StringBuilder pipelineMsg = new StringBuilder();
 for (int j = 0; j < nodes.length; j++) {
-pipelineMsg.append(nodes[j].getName());
+pipelineMsg.append(nodes[j]);
 if (j < nodes.length - 1) {
 pipelineMsg.append(", ");
 }

@@ -911,7 +911,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
 }
 DFSClient.LOG.warn("Error Recovery for block " + block +
 " in pipeline " + pipelineMsg +
-": bad datanode " + nodes[errorIndex].getName());
+": bad datanode " + nodes[errorIndex]);
 failed.add(nodes[errorIndex]);

 DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];

@@ -1005,7 +1005,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
 String firstBadLink = "";
 if (DFSClient.LOG.isDebugEnabled()) {
 for (int i = 0; i < nodes.length; i++) {
-DFSClient.LOG.debug("pipeline = " + nodes[i].getName());
+DFSClient.LOG.debug("pipeline = " + nodes[i]);
 }
 }

@@ -1061,7 +1061,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
 // find the datanode that matches
 if (firstBadLink.length() != 0) {
 for (int i = 0; i < nodes.length; i++) {
-if (nodes[i].getName().equals(firstBadLink)) {
+if (nodes[i].getXferAddr().equals(firstBadLink)) {
 errorIndex = i;
 break;
 }

@@ -1165,9 +1165,10 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
 static Socket createSocketForPipeline(final DatanodeInfo first,
 final int length, final DFSClient client) throws IOException {
 if(DFSClient.LOG.isDebugEnabled()) {
-DFSClient.LOG.debug("Connecting to datanode " + first.getName());
+DFSClient.LOG.debug("Connecting to datanode " + first);
 }
-final InetSocketAddress isa = NetUtils.createSocketAddr(first.getName());
+final InetSocketAddress isa =
+NetUtils.createSocketAddr(first.getXferAddr());
 final Socket sock = client.socketFactory.createSocket();
 final int timeout = client.getDatanodeReadTimeout(length);
 NetUtils.connect(sock, isa, timeout);

@@ -302,16 +302,16 @@ public class DFSUtil {
 assert idx < nrBlocks : "Incorrect index";
 DatanodeInfo[] locations = blk.getLocations();
 String[] hosts = new String[locations.length];
-String[] names = new String[locations.length];
+String[] xferAddrs = new String[locations.length];
 String[] racks = new String[locations.length];
 for (int hCnt = 0; hCnt < locations.length; hCnt++) {
 hosts[hCnt] = locations[hCnt].getHostName();
-names[hCnt] = locations[hCnt].getName();
-NodeBase node = new NodeBase(names[hCnt],
+xferAddrs[hCnt] = locations[hCnt].getXferAddr();
+NodeBase node = new NodeBase(xferAddrs[hCnt],
 locations[hCnt].getNetworkLocation());
 racks[hCnt] = node.toString();
 }
-blkLocations[idx] = new BlockLocation(names, hosts, racks,
+blkLocations[idx] = new BlockLocation(xferAddrs, hosts, racks,
 blk.getStartOffset(),
 blk.getBlockSize(),
 blk.isCorrupt());

@@ -688,7 +688,7 @@ public class DistributedFileSystem extends FileSystem {
 lblocks[0] = new LocatedBlock(dataBlock, dataNode);
 LOG.info("Found checksum error in data stream at block="
 + dataBlock + " on datanode="
-+ dataNode[0].getName());
++ dataNode[0]);

 // Find block in checksum stream
 DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;

@@ -700,8 +700,7 @@ public class DistributedFileSystem extends FileSystem {
 DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
 lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
 LOG.info("Found checksum error in checksum stream at block="
-+ sumsBlock + " on datanode="
-+ sumsNode[0].getName());
++ sumsBlock + " on datanode=" + sumsNode[0]);

 // Ask client to delete blocks.
 dfs.reportChecksumFailure(f.toString(), lblocks);

@@ -32,23 +32,32 @@ import org.apache.hadoop.io.WritableComparable;
 * Datanodes are identified by how they can be contacted (hostname
 * and ports) and their storage ID, a unique number that associates
 * the Datanodes blocks with a particular Datanode.
+*
+* {@link DatanodeInfo#getName()} should be used to get the network
+* location (for topology) of a datanode, instead of using
+* {@link DatanodeID#getXferAddr()} here. Helpers are defined below
+* for each context in which a DatanodeID is used.
 */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeID implements WritableComparable<DatanodeID> {
 public static final DatanodeID[] EMPTY_ARRAY = {};

-protected String name; // IP:port (data transfer port)
+protected String ipAddr; // IP address
 protected String hostName; // hostname
 protected String storageID; // unique per cluster storageID
+protected int xferPort; // data streaming port
 protected int infoPort; // info server port
 protected int ipcPort; // IPC server port

 /** Equivalent to DatanodeID(""). */
 public DatanodeID() {this("");}

-/** Equivalent to DatanodeID(nodeName, "", -1, -1). */
-public DatanodeID(String nodeName) {this(nodeName, "", "", -1, -1);}
+/** Equivalent to DatanodeID(ipAddr, "", -1, -1, -1). */
+public DatanodeID(String ipAddr) {this(ipAddr, "", "", -1, -1, -1);}
+
+/** Equivalent to DatanodeID(ipAddr, "", xferPort, -1, -1). */
+public DatanodeID(String ipAddr, int xferPort) {this(ipAddr, "", "", xferPort, -1, -1);}

 /**
 * DatanodeID copy constructor

@@ -56,38 +65,45 @@ public class DatanodeID implements WritableComparable<DatanodeID> {
 * @param from
 */
 public DatanodeID(DatanodeID from) {
-this(from.getName(),
+this(from.getIpAddr(),
 from.getHostName(),
 from.getStorageID(),
+from.getXferPort(),
 from.getInfoPort(),
 from.getIpcPort());
 }

 /**
 * Create DatanodeID
-* @param node IP:port
+* @param ipAddr IP
 * @param hostName hostname
 * @param storageID data storage ID
+* @param xferPort data transfer port
 * @param infoPort info server port
 * @param ipcPort ipc server port
 */
-public DatanodeID(String name, String hostName,
-String storageID, int infoPort, int ipcPort) {
-this.name = name;
+public DatanodeID(String ipAddr, String hostName, String storageID,
+int xferPort, int infoPort, int ipcPort) {
+this.ipAddr = ipAddr;
 this.hostName = hostName;
 this.storageID = storageID;
+this.xferPort = xferPort;
 this.infoPort = infoPort;
 this.ipcPort = ipcPort;
 }

-public void setName(String name) {
-this.name = name;
+public void setIpAddr(String ipAddr) {
+this.ipAddr = ipAddr;
 }

 public void setHostName(String hostName) {
 this.hostName = hostName;
 }

+public void setXferPort(int xferPort) {
+this.xferPort = xferPort;
+}
+
 public void setInfoPort(int infoPort) {
 this.infoPort = infoPort;
 }

@@ -95,26 +111,65 @@ public class DatanodeID implements WritableComparable<DatanodeID> {
 public void setIpcPort(int ipcPort) {
 this.ipcPort = ipcPort;
 }

+public void setStorageID(String storageID) {
+this.storageID = storageID;
+}
+
 /**
-* @return hostname:portNumber.
+* @return ipAddr;
 */
-public String getName() {
-return name;
+public String getIpAddr() {
+return ipAddr;
 }

 /**
 * @return hostname
 */
 public String getHostName() {
-return (hostName == null || hostName.length() == 0) ? getHost() : hostName;
+return hostName;
 }

+/**
+* @return IP:xferPort string
+*/
+public String getXferAddr() {
+return ipAddr + ":" + xferPort;
+}
+
+/**
+* @return IP:ipcPort string
+*/
+public String getIpcAddr() {
+return ipAddr + ":" + ipcPort;
+}
+
+/**
+* @return IP:infoPort string
+*/
+public String getInfoAddr() {
+return ipAddr + ":" + infoPort;
+}
+
+/**
+* @return hostname:xferPort
+*/
+public String getXferAddrWithHostname() {
+return hostName + ":" + xferPort;
+}
+
 /**
 * @return data storage ID.
 */
 public String getStorageID() {
-return this.storageID;
+return storageID;
 }

+/**
+* @return xferPort (the port for data streaming)
+*/
+public int getXferPort() {
+return xferPort;
+}
+
 /**
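A quick usage sketch of the new per-context accessors (not part of the patch; the field values are hypothetical):

    DatanodeID id = new DatanodeID("10.0.0.1", "dn1.example.com", "storage-1",
        50010, 50075, 50020); // ipAddr, hostName, storageID, xferPort, infoPort, ipcPort
    id.getXferAddr();             // "10.0.0.1:50010", the data streaming endpoint
    id.getIpcAddr();              // "10.0.0.1:50020", the IPC server
    id.getInfoAddr();             // "10.0.0.1:50075", the HTTP info server
    id.getXferAddrWithHostname(); // "dn1.example.com:50010"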
@@ -131,33 +186,6 @@ public class DatanodeID implements WritableComparable<DatanodeID> {
 return ipcPort;
 }

-/**
-* sets the data storage ID.
-*/
-public void setStorageID(String storageID) {
-this.storageID = storageID;
-}
-
-/**
-* @return hostname and no :portNumber.
-*/
-public String getHost() {
-int colon = name.indexOf(":");
-if (colon < 0) {
-return name;
-} else {
-return name.substring(0, colon);
-}
-}
-
-public int getPort() {
-int colon = name.indexOf(":");
-if (colon < 0) {
-return 50010; // default port.
-}
-return Integer.parseInt(name.substring(colon+1));
-}
-
 public boolean equals(Object to) {
 if (this == to) {
 return true;

@@ -165,16 +193,16 @@ public class DatanodeID implements WritableComparable<DatanodeID> {
 if (!(to instanceof DatanodeID)) {
 return false;
 }
-return (name.equals(((DatanodeID)to).getName()) &&
+return (getXferAddr().equals(((DatanodeID)to).getXferAddr()) &&
 storageID.equals(((DatanodeID)to).getStorageID()));
 }

 public int hashCode() {
-return name.hashCode()^ storageID.hashCode();
+return getXferAddr().hashCode()^ storageID.hashCode();
 }

 public String toString() {
-return name;
+return getXferAddr();
 }

 /**

@@ -182,43 +210,44 @@ public class DatanodeID implements WritableComparable<DatanodeID> {
 * Note that this does not update storageID.
 */
 public void updateRegInfo(DatanodeID nodeReg) {
-name = nodeReg.getName();
+ipAddr = nodeReg.getIpAddr();
 hostName = nodeReg.getHostName();
+xferPort = nodeReg.getXferPort();
 infoPort = nodeReg.getInfoPort();
 ipcPort = nodeReg.getIpcPort();
 }

-/** Comparable.
-* Basis of compare is the String name (host:portNumber) only.
+/**
+* Compare based on data transfer address.
+*
 * @param that
-* @return as specified by Comparable.
+* @return as specified by Comparable
 */
 public int compareTo(DatanodeID that) {
-return name.compareTo(that.getName());
+return getXferAddr().compareTo(that.getXferAddr());
 }

 /////////////////////////////////////////////////
 // Writable
 /////////////////////////////////////////////////
 @Override
 public void write(DataOutput out) throws IOException {
-Text.writeString(out, name);
+Text.writeString(out, ipAddr);
 Text.writeString(out, hostName);
 Text.writeString(out, storageID);
+out.writeShort(xferPort);
 out.writeShort(infoPort);
 out.writeShort(ipcPort);
 }

 @Override
 public void readFields(DataInput in) throws IOException {
-name = Text.readString(in);
+ipAddr = Text.readString(in);
 hostName = Text.readString(in);
 storageID = Text.readString(in);
 // The port read could be negative, if the port is a large number (more
 // than 15 bits in storage size (but less than 16 bits).
 // So chop off the first two bytes (and hence the signed bits) before
 // setting the field.
-this.infoPort = in.readShort() & 0x0000ffff;
-this.ipcPort = in.readShort() & 0x0000ffff;
+xferPort = in.readShort() & 0x0000ffff;
+infoPort = in.readShort() & 0x0000ffff;
+ipcPort = in.readShort() & 0x0000ffff;
 }
 }

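A worked example of the sign-bit masking in readFields() above (illustrative only; 50010 is just a representative port that exceeds Short.MAX_VALUE):

    short stored = (short) 50010;   // what out.writeShort(50010) effectively stores
    int unmasked = stored;          // -15526: the high bit of 0xC35A reads back as a sign bit
    int port = stored & 0x0000ffff; // 50010: masking off the widened sign bits restores the unsigned port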
@@ -116,18 +116,18 @@ public class DatanodeInfo extends DatanodeID implements Node {
 final long capacity, final long dfsUsed, final long remaining,
 final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
 final AdminStates adminState) {
-this(nodeID.getName(), nodeID.getHostName(), nodeID.getStorageID(), nodeID.getInfoPort(), nodeID
-.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed, lastUpdate,
-xceiverCount, location, adminState);
+this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(), nodeID.getXferPort(),
+nodeID.getInfoPort(), nodeID.getIpcPort(), capacity, dfsUsed, remaining,
+blockPoolUsed, lastUpdate, xceiverCount, location, adminState);
 }

 /** Constructor */
 public DatanodeInfo(final String name, final String hostName,
-final String storageID, final int infoPort, final int ipcPort,
+final String storageID, final int xferPort, final int infoPort, final int ipcPort,
 final long capacity, final long dfsUsed, final long remaining,
 final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
 final String networkLocation, final AdminStates adminState) {
-super(name, hostName, storageID, infoPort, ipcPort);
+super(name, hostName, storageID, xferPort, infoPort, ipcPort);
 this.capacity = capacity;
 this.dfsUsed = dfsUsed;
 this.remaining = remaining;

@@ -138,6 +138,11 @@ public class DatanodeInfo extends DatanodeID implements Node {
 this.adminState = adminState;
 }

+/** Network location name */
+public String getName() {
+return getXferAddr();
+}
+
 /** The raw capacity. */
 public long getCapacity() { return capacity; }

@@ -224,9 +229,9 @@ public class DatanodeInfo extends DatanodeID implements Node {
 long nonDFSUsed = getNonDfsUsed();
 float usedPercent = getDfsUsedPercent();
 float remainingPercent = getRemainingPercent();
-String lookupName = NetUtils.getHostNameOfIP(name);
+String lookupName = NetUtils.getHostNameOfIP(getName());

-buffer.append("Name: "+ name);
+buffer.append("Name: "+ getName());
 if (lookupName != null) {
 buffer.append(" (" + lookupName + ")");
 }

@@ -260,7 +265,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
 long c = getCapacity();
 long r = getRemaining();
 long u = getDfsUsed();
-buffer.append(name);
+buffer.append(ipAddr);
 if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
 buffer.append(" "+location);
 }

@@ -84,9 +84,10 @@ public abstract class HdfsProtoUtil {
 private static HdfsProtos.DatanodeIDProto toProto(
 DatanodeID dni) {
 return HdfsProtos.DatanodeIDProto.newBuilder()
-.setName(dni.getName())
+.setIpAddr(dni.getIpAddr())
 .setHostName(dni.getHostName())
 .setStorageID(dni.getStorageID())
+.setXferPort(dni.getXferPort())
 .setInfoPort(dni.getInfoPort())
 .setIpcPort(dni.getIpcPort())
 .build();

@@ -94,9 +95,10 @@ public abstract class HdfsProtoUtil {

 private static DatanodeID fromProto(HdfsProtos.DatanodeIDProto idProto) {
 return new DatanodeID(
-idProto.getName(),
+idProto.getIpAddr(),
 idProto.getHostName(),
 idProto.getStorageID(),
+idProto.getXferPort(),
 idProto.getInfoPort(),
 idProto.getIpcPort());
 }

@@ -45,9 +45,8 @@ public class UnregisteredNodeException extends IOException {
 * @param storedNode data-node stored in the system with this storage id
 */
 public UnregisteredNodeException(DatanodeID nodeID, DatanodeInfo storedNode) {
-super("Data node " + nodeID.getName()
-+ " is attempting to report storage ID "
+super("Data node " + nodeID + " is attempting to report storage ID "
 + nodeID.getStorageID() + ". Node "
-+ storedNode.getName() + " is expected to serve this storage.");
++ storedNode + " is expected to serve this storage.");
 }
 }

@@ -97,8 +97,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
 */
 public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
 Configuration conf, int socketTimeout) throws IOException {
-InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost()
-+ ":" + datanodeid.getIpcPort());
+InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
 rpcProxy = createClientDatanodeProtocolProxy(addr,
 UserGroupInformation.getCurrentUser(), conf,
 NetUtils.getDefaultSocketFactory(conf), socketTimeout);

@@ -107,8 +106,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
 static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
 DatanodeID datanodeid, Configuration conf, int socketTimeout,
 LocatedBlock locatedBlock) throws IOException {
-InetSocketAddress addr = NetUtils.createSocketAddr(
-datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
 if (LOG.isDebugEnabled()) {
 LOG.debug("ClientDatanodeProtocol addr=" + addr);
 }

@@ -204,15 +204,18 @@ public class PBHelper {

 // DatanodeId
 public static DatanodeID convert(DatanodeIDProto dn) {
-return new DatanodeID(dn.getName(), dn.getHostName(), dn.getStorageID(), dn.getInfoPort(),
-dn.getIpcPort());
+return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getStorageID(),
+dn.getXferPort(), dn.getInfoPort(), dn.getIpcPort());
 }

 public static DatanodeIDProto convert(DatanodeID dn) {
 return DatanodeIDProto.newBuilder()
-.setName(dn.getName()).setHostName(dn.getHostName())
-.setInfoPort(dn.getInfoPort()).setIpcPort(dn.getIpcPort())
-.setStorageID(dn.getStorageID()).build();
+.setIpAddr(dn.getIpAddr())
+.setHostName(dn.getHostName())
+.setStorageID(dn.getStorageID())
+.setXferPort(dn.getXferPort())
+.setInfoPort(dn.getInfoPort())
+.setIpcPort(dn.getIpcPort()).build();
 }

 // Arrays of DatanodeId

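The two converters above are inverses, so a round trip preserves all six fields. A minimal sketch (dnId stands for any DatanodeID instance):

    DatanodeIDProto proto = PBHelper.convert(dnId); // ipAddr and xferPort now carried separately
    DatanodeID copy = PBHelper.convert(proto);
    assert copy.getXferAddr().equals(dnId.getXferAddr());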
@@ -305,8 +305,9 @@ public class Balancer {
 DataOutputStream out = null;
 DataInputStream in = null;
 try {
-sock.connect(NetUtils.createSocketAddr(
-target.datanode.getName()), HdfsServerConstants.READ_TIMEOUT);
+sock.connect(
+NetUtils.createSocketAddr(target.datanode.getXferAddr()),
+HdfsServerConstants.READ_TIMEOUT);
 sock.setKeepAlive(true);
 out = new DataOutputStream( new BufferedOutputStream(
 sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));

@@ -587,7 +588,7 @@ public class Balancer {
 /** Add a node task */
 private void addNodeTask(NodeTask task) {
 assert (task.datanode != this) :
-"Source and target are the same " + datanode.getName();
+"Source and target are the same " + datanode;
 incScheduledSize(task.getSize());
 nodeTasks.add(task);
 }

@@ -1007,7 +1008,7 @@ public class Balancer {
 targetCandidates.remove();
 }
 LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-+source.datanode.getName() + " to " + target.datanode.getName());
++source.datanode + " to " + target.datanode);
 return true;
 }
 return false;

@@ -1055,7 +1056,7 @@ public class Balancer {
 sourceCandidates.remove();
 }
 LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-+source.datanode.getName() + " to " + target.datanode.getName());
++source.datanode + " to " + target.datanode);
 return true;
 }
 return false;

@@ -810,9 +810,9 @@ public class BlockManager {
 final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
 if (node == null) {
 NameNode.stateChangeLog.warn("BLOCK* getBlocks: "
-+ "Asking for blocks from an unrecorded node " + datanode.getName());
++ "Asking for blocks from an unrecorded node " + datanode);
 throw new HadoopIllegalArgumentException(
-"Datanode " + datanode.getName() + " not found.");
+"Datanode " + datanode + " not found.");
 }

 int numBlocks = node.numBlocks();

@@ -884,7 +884,7 @@ public class BlockManager {
 .hasNext();) {
 DatanodeDescriptor node = it.next();
 invalidateBlocks.add(b, node, false);
-datanodes.append(node.getName()).append(" ");
+datanodes.append(node).append(" ");
 }
 if (datanodes.length() != 0) {
 NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "

@@ -923,7 +923,7 @@ public class BlockManager {
 if (node == null) {
 throw new IOException("Cannot mark block " +
 storedBlock.getBlockName() +
-" as corrupt because datanode " + dn.getName() +
+" as corrupt because datanode " + dn +
 " does not exist. ");
 }

@@ -957,11 +957,11 @@ public class BlockManager {
 private void invalidateBlock(Block blk, DatanodeInfo dn)
 throws IOException {
 NameNode.stateChangeLog.info("BLOCK* invalidateBlock: "
-+ blk + " on " + dn.getName());
++ blk + " on " + dn);
 DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
 if (node == null) {
 throw new IOException("Cannot invalidate block " + blk
-+ " because datanode " + dn.getName() + " does not exist.");
++ " because datanode " + dn + " does not exist.");
 }

 // Check how many copies we have of the block

@@ -979,11 +979,11 @@ public class BlockManager {
 removeStoredBlock(blk, node);
 if(NameNode.stateChangeLog.isDebugEnabled()) {
 NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: "
-+ blk + " on " + dn.getName() + " listed for deletion.");
++ blk + " on " + dn + " listed for deletion.");
 }
 } else {
 NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + blk + " on "
-+ dn.getName() + " is the only copy and was not deleted.");
++ dn + " is the only copy and was not deleted.");
 }
 }

@@ -1226,11 +1226,11 @@ public class BlockManager {
 StringBuilder targetList = new StringBuilder("datanode(s)");
 for (int k = 0; k < targets.length; k++) {
 targetList.append(' ');
-targetList.append(targets[k].getName());
+targetList.append(targets[k]);
 }
 NameNode.stateChangeLog.info(
 "BLOCK* ask "
-+ rw.srcNode.getName() + " to replicate "
++ rw.srcNode + " to replicate "
 + rw.block + " to " + targetList);
 }
 }

@@ -1412,15 +1412,15 @@ public class BlockManager {
 try {
 final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
 if (node == null || !node.isAlive) {
-throw new IOException("ProcessReport from dead or unregistered node: "
-+ nodeID.getName());
+throw new IOException(
+"ProcessReport from dead or unregistered node: " + nodeID);
 }

 // To minimize startup time, we discard any second (or later) block reports
 // that we receive while still in startup phase.
 if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
 NameNode.stateChangeLog.info("BLOCK* processReport: "
-+ "discarded non-initial block report from " + nodeID.getName()
++ "discarded non-initial block report from " + nodeID
 + " because namenode still in startup phase");
 return;
 }

@@ -1453,7 +1453,7 @@ public class BlockManager {
 // Log the block report processing stats from Namenode perspective
 NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
 NameNode.stateChangeLog.info("BLOCK* processReport: from "
-+ nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
++ nodeID + ", blocks: " + newReport.getNumberOfBlocks()
 + ", processing time: " + (endTime - startTime) + " msecs");
 }

@@ -1513,7 +1513,7 @@ public class BlockManager {
 }
 for (Block b : toInvalidate) {
 NameNode.stateChangeLog.info("BLOCK* processReport: block "
-+ b + " on " + node.getName() + " size " + b.getNumBytes()
++ b + " on " + node + " size " + b.getNumBytes()
 + " does not belong to any file.");
 addToInvalidates(b, node);
 }

@@ -1664,7 +1664,7 @@ public class BlockManager {

 if(LOG.isDebugEnabled()) {
 LOG.debug("Reported block " + block
-+ " on " + dn.getName() + " size " + block.getNumBytes()
++ " on " + dn + " size " + block.getNumBytes()
 + " replicaState = " + reportedState);
 }

@@ -1839,7 +1839,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 // closed. So, ignore this report, assuming we will get a
 // FINALIZED replica later. See HDFS-2791
 LOG.info("Received an RBW replica for block " + storedBlock +
-" on " + dn.getName() + ": ignoring it, since the block is " +
+" on " + dn + ": ignoring it, since the block is " +
 "complete with the same generation stamp.");
 return null;
 } else {

@@ -1852,7 +1852,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 default:
 String msg = "Unexpected replica state " + reportedState
 + " for block: " + storedBlock +
-" on " + dn.getName() + " size " + storedBlock.getNumBytes();
+" on " + dn + " size " + storedBlock.getNumBytes();
 // log here at WARN level since this is really a broken HDFS
 // invariant
 LOG.warn(msg);

@@ -1951,7 +1951,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 if (storedBlock == null || storedBlock.getINode() == null) {
 // If this block does not belong to anyfile, then we are done.
 NameNode.stateChangeLog.info("BLOCK* addStoredBlock: " + block + " on "
-+ node.getName() + " size " + block.getNumBytes()
++ node + " size " + block.getNumBytes()
 + " but it does not belong to any file.");
 // we could add this block to invalidate set of this datanode.
 // it will happen in next block report otherwise.

@@ -1974,7 +1974,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 curReplicaDelta = 0;
 NameNode.stateChangeLog.warn("BLOCK* addStoredBlock: "
 + "Redundant addStoredBlock request received for " + storedBlock
-+ " on " + node.getName() + " size " + storedBlock.getNumBytes());
++ " on " + node + " size " + storedBlock.getNumBytes());
 }

 // Now check for completion of blocks and safe block count

@@ -2037,7 +2037,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block

 StringBuilder sb = new StringBuilder(500);
 sb.append("BLOCK* addStoredBlock: blockMap updated: ")
-.append(node.getName())
+.append(node)
 .append(" is added to ");
 storedBlock.appendStringTo(sb);
 sb.append(" size " )

@@ -2071,7 +2071,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 } catch (IOException e) {
 NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
 "error in deleting bad block " + blk +
-" on " + node + e);
+" on " + node, e);
 gotException = true;
 }
 }

@@ -2337,7 +2337,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 //
 addToInvalidates(b, cur);
 NameNode.stateChangeLog.info("BLOCK* chooseExcessReplicates: "
-+"("+cur.getName()+", "+b+") is added to invalidated blocks set.");
++"("+cur+", "+b+") is added to invalidated blocks set.");
 }
 }

@@ -2352,7 +2352,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 excessBlocksCount++;
 if(NameNode.stateChangeLog.isDebugEnabled()) {
 NameNode.stateChangeLog.debug("BLOCK* addToExcessReplicate:"
-+ " (" + dn.getName() + ", " + block
++ " (" + dn + ", " + block
 + ") is added to excessReplicateMap");
 }
 }

@@ -2365,7 +2365,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 public void removeStoredBlock(Block block, DatanodeDescriptor node) {
 if(NameNode.stateChangeLog.isDebugEnabled()) {
 NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
-+ block + " from " + node.getName());
++ block + " from " + node);
 }
 assert (namesystem.hasWriteLock());
 {

@@ -2478,7 +2478,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 }
 for (Block b : toInvalidate) {
 NameNode.stateChangeLog.info("BLOCK* addBlock: block "
-+ b + " on " + node.getName() + " size " + b.getNumBytes()
++ b + " on " + node + " size " + b.getNumBytes()
 + " does not belong to any file.");
 addToInvalidates(b, node);
 }

@@ -2506,7 +2506,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 NameNode.stateChangeLog
 .warn("BLOCK* processIncrementalBlockReport"
 + " is received from dead or unregistered node "
-+ nodeID.getName());
++ nodeID);
 throw new IOException(
 "Got incremental block report from unregistered or dead node");
 }

@@ -2528,7 +2528,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 break;
 default:
 String msg =
-"Unknown block status code reported by " + nodeID.getName() +
+"Unknown block status code reported by " + nodeID +
 ": " + rdbi;
 NameNode.stateChangeLog.warn(msg);
 assert false : msg; // if assertions are enabled, throw.

@@ -2537,14 +2537,14 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 if (NameNode.stateChangeLog.isDebugEnabled()) {
 NameNode.stateChangeLog.debug("BLOCK* block "
 + (rdbi.getStatus()) + ": " + rdbi.getBlock()
-+ " is received from " + nodeID.getName());
++ " is received from " + nodeID);
 }
 }
 } finally {
 namesystem.writeUnlock();
 NameNode.stateChangeLog
 .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
-+ nodeID.getName()
++ nodeID
 + " receiving: " + receiving + ", "
 + " received: " + received + ", "
 + " deleted: " + deleted);

@@ -2620,7 +2620,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 StringBuilder nodeList = new StringBuilder();
 while (nodeIter.hasNext()) {
 DatanodeDescriptor node = nodeIter.next();
-nodeList.append(node.getName());
+nodeList.append(node);
 nodeList.append(" ");
 }
 LOG.info("Block: " + block + ", Expected Replicas: "

@@ -2630,7 +2630,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 + ", excess replicas: " + num.excessReplicas()
 + ", Is Open File: " + fileINode.isUnderConstruction()
 + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
-+ srcNode.getName() + ", Is current datanode decommissioning: "
++ srcNode + ", Is current datanode decommissioning: "
 + srcNode.isDecommissionInProgress());
 }

@@ -65,14 +65,14 @@ public class CorruptReplicasMap{
 nodes.add(dn);
 NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
 blk.getBlockName() +
-" added as corrupt on " + dn.getName() +
+" added as corrupt on " + dn +
 " by " + Server.getRemoteIp() +
 reasonText);
 } else {
 NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
 "duplicate requested for " +
 blk.getBlockName() + " to add as corrupt " +
-"on " + dn.getName() +
+"on " + dn +
 " by " + Server.getRemoteIp() +
 reasonText);
 }

@@ -238,7 +238,7 @@ public class DatanodeManager {
 final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
 if (node == null)
 return null;
-if (!node.getName().equals(nodeID.getName())) {
+if (!node.getXferAddr().equals(nodeID.getXferAddr())) {
 final UnregisteredNodeException e = new UnregisteredNodeException(
 nodeID, node);
 NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "

@@ -270,7 +270,7 @@ public class DatanodeManager {
 networktopology.remove(nodeInfo);

 if (LOG.isDebugEnabled()) {
-LOG.debug("remove datanode " + nodeInfo.getName());
+LOG.debug("remove datanode " + nodeInfo);
 }
 namesystem.checkSafeMode();
 }

@@ -288,7 +288,7 @@ public class DatanodeManager {
 removeDatanode(descriptor);
 } else {
 NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
-+ node.getName() + " does not exist");
++ node + " does not exist");
 }
 } finally {
 namesystem.writeUnlock();

@@ -306,7 +306,7 @@ public class DatanodeManager {
 }
 if (d != null && isDatanodeDead(d)) {
 NameNode.stateChangeLog.info(
-"BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
+"BLOCK* removeDeadDatanode: lost heartbeat from " + d);
 removeDatanode(d);
 }
 }

@@ -332,7 +332,7 @@ public class DatanodeManager {

 if (LOG.isDebugEnabled()) {
 LOG.debug(getClass().getSimpleName() + ".addDatanode: "
-+ "node " + node.getName() + " is added to datanodeMap.");
++ "node " + node + " is added to datanodeMap.");
 }
 }

@@ -344,7 +344,7 @@ public class DatanodeManager {
 }
 if (LOG.isDebugEnabled()) {
 LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
-+ node.getName() + "): storage " + key
++ node + "): storage " + key
 + " is removed from datanodeMap.");
 }
 }

@@ -354,7 +354,7 @@ public class DatanodeManager {
 List<String> names = new ArrayList<String>(1);
 if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
 // get the node's IP address
-names.add(node.getHost());
+names.add(node.getIpAddr());
 } else {
 // get the node's host name
 String hostName = node.getHostName();

@@ -376,12 +376,12 @@ public class DatanodeManager {
 node.setNetworkLocation(networkLocation);
 }

-private boolean inHostsList(DatanodeID node, String ipAddr) {
-return checkInList(node, ipAddr, hostsReader.getHosts(), false);
+private boolean inHostsList(DatanodeID node) {
+return checkInList(node, hostsReader.getHosts(), false);
 }

-private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
-return checkInList(node, ipAddr, hostsReader.getExcludedHosts(), true);
+private boolean inExcludedHostsList(DatanodeID node) {
+return checkInList(node, hostsReader.getExcludedHosts(), true);
 }

 /**

@@ -419,7 +419,7 @@ public class DatanodeManager {

 for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
 DatanodeDescriptor node = it.next();
-if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
+if ((!inHostsList(node)) && (!inExcludedHostsList(node))
 && node.isDecommissioned()) {
 // Include list is not empty, an existing datanode does not appear
 // in both include or exclude lists and it has been decommissioned.
@@ -430,37 +430,23 @@ public class DatanodeManager {
 }

 /**
-* Check if the given node (of DatanodeID or ipAddress) is in the (include or
-* exclude) list. If ipAddress in null, check only based upon the given
-* DatanodeID. If ipAddress is not null, the ipAddress should refers to the
-* same host that given DatanodeID refers to.
+* Check if the given DatanodeID is in the given (include or exclude) list.
 *
-* @param node, the host DatanodeID
-* @param ipAddress, if not null, should refers to the same host
-* that DatanodeID refers to
-* @param hostsList, the list of hosts in the include/exclude file
-* @param isExcludeList, boolean, true if this is the exclude list
-* @return boolean, if in the list
+* @param node the DatanodeID to check
+* @param hostsList the list of hosts in the include/exclude file
+* @param isExcludeList true if this is the exclude list
+* @return true if the node is in the list, false otherwise
 */
 private static boolean checkInList(final DatanodeID node,
-final String ipAddress,
 final Set<String> hostsList,
 final boolean isExcludeList) {
 final InetAddress iaddr;
-if (ipAddress != null) {
-try {
-iaddr = InetAddress.getByName(ipAddress);
-} catch (UnknownHostException e) {
-LOG.warn("Unknown ip address: " + ipAddress, e);
-return isExcludeList;
-}
-} else {
-try {
-iaddr = InetAddress.getByName(node.getHost());
-} catch (UnknownHostException e) {
-LOG.warn("Unknown host: " + node.getHost(), e);
-return isExcludeList;
-}
+try {
+iaddr = InetAddress.getByName(node.getIpAddr());
+} catch (UnknownHostException e) {
+LOG.warn("Unknown IP: " + node.getIpAddr(), e);
+return isExcludeList;
+}

 // if include list is empty, host is in include list
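For illustration (hypothetical entries, not from the patch): with a datanode whose resolved IP is 10.0.0.1, hostname dn1.example.com, and xferPort 50010, any one of the following include-file forms would match in checkInList, per the comparisons in the next hunk:

    10.0.0.1
    10.0.0.1:50010
    dn1.example.com
    dn1.example.com:50010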
@@ -470,10 +456,10 @@ public class DatanodeManager {
 return // compare ipaddress(:port)
 (hostsList.contains(iaddr.getHostAddress().toString()))
 || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
-+ node.getPort()))
++ node.getXferPort()))
 // compare hostname(:port)
 || (hostsList.contains(iaddr.getHostName()))
-|| (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
+|| (hostsList.contains(iaddr.getHostName() + ":" + node.getXferPort()))
 || ((node instanceof DatanodeInfo) && hostsList
 .contains(((DatanodeInfo) node).getHostName()));
 }

@@ -483,7 +469,7 @@ public class DatanodeManager {
 */
 private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) {
 // If the registered node is in exclude list, then decommission it
-if (inExcludedHostsList(nodeReg, ipAddr)) {
+if (inExcludedHostsList(nodeReg)) {
 startDecommission(nodeReg);
 }
 }

@@ -498,7 +484,7 @@ public class DatanodeManager {
 if (node.isDecommissionInProgress()) {
 if (!blockManager.isReplicationInProgress(node)) {
 node.setDecommissioned();
-LOG.info("Decommission complete for node " + node.getName());
+LOG.info("Decommission complete for node " + node);
 }
 }
 return node.isDecommissioned();

@@ -507,7 +493,7 @@ public class DatanodeManager {
 /** Start decommissioning the specified datanode. */
 private void startDecommission(DatanodeDescriptor node) {
 if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-LOG.info("Start Decommissioning node " + node.getName() + " with " +
+LOG.info("Start Decommissioning node " + node + " with " +
 node.numBlocks() + " blocks.");
 heartbeatManager.startDecommission(node);
 node.decommissioningStatus.setStartTime(now());

@@ -520,7 +506,7 @@ public class DatanodeManager {
 /** Stop decommissioning the specified datanodes. */
 void stopDecommission(DatanodeDescriptor node) {
 if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-LOG.info("Stop Decommissioning node " + node.getName());
+LOG.info("Stop Decommissioning node " + node);
 heartbeatManager.stopDecommission(node);
 blockManager.processOverReplicatedBlocksOnReCommission(node);
 }

@@ -558,30 +544,30 @@ public class DatanodeManager {
 if (dnAddress == null) {
 // Mostly called inside an RPC.
 // But if not, use address passed by the data-node.
-dnAddress = nodeReg.getHost();
-}
+dnAddress = nodeReg.getIpAddr();
+}

+// Update the IP to the address of the RPC request that is
+// registering this datanode.
+nodeReg.setIpAddr(dnAddress);
+nodeReg.setExportedKeys(blockManager.getBlockKeys());
+
 // Checks if the node is not on the hosts list. If it is not, then
 // it will be disallowed from registering.
-if (!inHostsList(nodeReg, dnAddress)) {
+if (!inHostsList(nodeReg)) {
 throw new DisallowedDatanodeException(nodeReg);
 }

-// Update "name" with the IP address of the RPC request that
-// is registering this datanode.
-nodeReg.setName(dnAddress + ":" + nodeReg.getPort());
-nodeReg.setExportedKeys(blockManager.getBlockKeys());
-
 NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
-+ "node registration from " + nodeReg.getName()
++ "node registration from " + nodeReg
 + " storage " + nodeReg.getStorageID());

 DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
+DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getXferAddr());

 if (nodeN != null && nodeN != nodeS) {
 NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
-+ "node from name: " + nodeN.getName());
++ "node from name: " + nodeN);
 // nodeN previously served a different data storage,
 // which is not served by anybody anymore.
 removeDatanode(nodeN);

@@ -610,8 +596,8 @@ public class DatanodeManager {
 but this is might not work if VERSION file format has changed
 */
 NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
-+ "node " + nodeS.getName()
-+ " is replaced by " + nodeReg.getName() +
++ "node " + nodeS
++ " is replaced by " + nodeReg +
 " with the same storageID " +
 nodeReg.getStorageID());
 }

@@ -691,10 +677,10 @@ public class DatanodeManager {
 private void refreshDatanodes() throws IOException {
 for(DatanodeDescriptor node : datanodeMap.values()) {
 // Check if not include.
-if (!inHostsList(node, null)) {
+if (!inHostsList(node)) {
 node.setDisallowed(true); // case 2.
 } else {
-if (inExcludedHostsList(node, null)) {
+if (inExcludedHostsList(node)) {
 startDecommission(node); // case 3.
 } else {
 stopDecommission(node); // case 4.

@@ -821,16 +807,16 @@ public class DatanodeManager {
 }
 //Remove any form of the this datanode in include/exclude lists.
 try {
-InetAddress inet = InetAddress.getByName(dn.getHost());
+InetAddress inet = InetAddress.getByName(dn.getIpAddr());
 // compare hostname(:port)
 mustList.remove(inet.getHostName());
-mustList.remove(inet.getHostName()+":"+dn.getPort());
+mustList.remove(inet.getHostName()+":"+dn.getXferPort());
 // compare ipaddress(:port)
 mustList.remove(inet.getHostAddress().toString());
-mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
+mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getXferPort());
 } catch ( UnknownHostException e ) {
-mustList.remove(dn.getName());
-mustList.remove(dn.getHost());
+mustList.remove(dn.getIpAddr());
 LOG.warn(e);
 }
 }

@@ -39,10 +39,10 @@ class Host2NodesMap {
 return false;
 }

-String host = node.getHost();
+String ipAddr = node.getIpAddr();
 hostmapLock.readLock().lock();
 try {
-DatanodeDescriptor[] nodes = map.get(host);
+DatanodeDescriptor[] nodes = map.get(ipAddr);
 if (nodes != null) {
 for(DatanodeDescriptor containedNode:nodes) {
 if (node==containedNode) {

@@ -66,8 +66,8 @@ class Host2NodesMap {
 return false;
 }

-String host = node.getHost();
-DatanodeDescriptor[] nodes = map.get(host);
+String ipAddr = node.getIpAddr();
+DatanodeDescriptor[] nodes = map.get(ipAddr);
 DatanodeDescriptor[] newNodes;
 if (nodes==null) {
 newNodes = new DatanodeDescriptor[1];

@@ -77,7 +77,7 @@ class Host2NodesMap {
 System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
 newNodes[nodes.length] = node;
 }
-map.put(host, newNodes);
+map.put(ipAddr, newNodes);
 return true;
 } finally {
 hostmapLock.writeLock().unlock();

@@ -92,17 +92,17 @@ class Host2NodesMap {
 return false;
 }

-String host = node.getHost();
+String ipAddr = node.getIpAddr();
 hostmapLock.writeLock().lock();
 try {

-DatanodeDescriptor[] nodes = map.get(host);
+DatanodeDescriptor[] nodes = map.get(ipAddr);
 if (nodes==null) {
 return false;
 }
 if (nodes.length==1) {
 if (nodes[0]==node) {
-map.remove(host);
+map.remove(ipAddr);
 return true;
 } else {
 return false;

@@ -122,7 +122,7 @@ class Host2NodesMap {
 newNodes = new DatanodeDescriptor[nodes.length-1];
 System.arraycopy(nodes, 0, newNodes, 0, i);
 System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
-map.put(host, newNodes);
+map.put(ipAddr, newNodes);
 return true;
 }
 } finally {

@@ -130,17 +130,18 @@ class Host2NodesMap {
 }
 }

-/** get a data node by its host.
-* @return DatanodeDescriptor if found; otherwise null.
+/**
+* Get a data node by its IP address.
+* @return DatanodeDescriptor if found, null otherwise
 */
-DatanodeDescriptor getDatanodeByHost(String host) {
-if (host==null) {
+DatanodeDescriptor getDatanodeByHost(String ipAddr) {
+if (ipAddr == null) {
 return null;
 }

 hostmapLock.readLock().lock();
 try {
-DatanodeDescriptor[] nodes = map.get(host);
+DatanodeDescriptor[] nodes = map.get(ipAddr);
 // no entry
 if (nodes== null) {
 return null;

@@ -155,40 +156,4 @@ class Host2NodesMap {
 hostmapLock.readLock().unlock();
 }
 }
-
-/**
-* Find data node by its name.
-*
-* @return DatanodeDescriptor if found or null otherwise
-*/
-public DatanodeDescriptor getDatanodeByName(String name) {
-if (name==null) {
-return null;
-}
-
-int colon = name.indexOf(":");
-String host;
-if (colon < 0) {
-host = name;
-} else {
-host = name.substring(0, colon);
-}
-
-hostmapLock.readLock().lock();
-try {
-DatanodeDescriptor[] nodes = map.get(host);
-// no entry
-if (nodes== null) {
-return null;
-}
-for(DatanodeDescriptor containedNode:nodes) {
-if (name.equals(containedNode.getName())) {
-return containedNode;
-}
-}
-return null;
-} finally {
-hostmapLock.readLock().unlock();
-}
-}
 }

@@ -77,7 +77,7 @@ class InvalidateBlocks {
 numBlocks++;
 if (log) {
 NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
-+ ": add " + block + " to " + datanode.getName());
++ ": add " + block + " to " + datanode);
 }
 }
 }

@@ -113,7 +113,8 @@ class InvalidateBlocks {
 for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
 final LightWeightHashSet<Block> blocks = entry.getValue();
 if (blocks.size() > 0) {
-out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
+out.println(datanodeManager.getDatanode(entry.getKey()));
+out.println(blocks);
 }
 }
 }

@@ -137,7 +138,7 @@ class InvalidateBlocks {

 if (NameNode.stateChangeLog.isInfoEnabled()) {
 NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
-+ ": ask " + dn.getName() + " to delete " + toInvalidate);
++ ": ask " + dn + " to delete " + toInvalidate);
 }
 return toInvalidate.size();
 }

@@ -88,9 +88,6 @@ public class JspHelper {
 private static class NodeRecord extends DatanodeInfo {
 int frequency;

-public NodeRecord() {
-frequency = -1;
-}
 public NodeRecord(DatanodeInfo info, int count) {
 super(info);
 this.frequency = count;

@@ -172,7 +169,7 @@ public class JspHelper {

 //just ping to check whether the node is alive
 InetSocketAddress targetAddr = NetUtils.createSocketAddr(
-chosenNode.getHost() + ":" + chosenNode.getInfoPort());
+chosenNode.getInfoAddr());

 try {
 s = NetUtils.getDefaultSocketFactory(conf).createSocket();

@@ -672,7 +672,9 @@ public class DataNode extends Configured
 * @param nsInfo the namespace info from the first part of the NN handshake
 */
 DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
-DatanodeRegistration bpRegistration = new DatanodeRegistration(getXferAddr());
+final String xferIp = streamingAddr.getAddress().getHostAddress();
+DatanodeRegistration bpRegistration = new DatanodeRegistration(xferIp);
+bpRegistration.setXferPort(getXferPort());
 bpRegistration.setInfoPort(getInfoPort());
 bpRegistration.setIpcPort(getIpcPort());
 bpRegistration.setHostName(hostName);

@@ -707,7 +709,7 @@ public class DataNode extends Configured
 storage.setStorageID(bpRegistration.getStorageID());
 storage.writeAll();
 LOG.info("New storage id " + bpRegistration.getStorageID()
-+ " is assigned to data-node " + bpRegistration.getName());
++ " is assigned to data-node " + bpRegistration);
 } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
 throw new IOException("Inconsistent storage IDs. Name-node returned "
 + bpRegistration.getStorageID()

@@ -877,13 +879,6 @@ public class DataNode extends Configured
 return streamingAddr;
 }

-/**
-* @return the IP:port to report to the NN for data transfer
-*/
-private String getXferAddr() {
-return streamingAddr.getAddress().getHostAddress() + ":" + getXferPort();
-}
-
 /**
 * @return the datanode's IPC port
 */

@@ -926,8 +921,8 @@ public class DataNode extends Configured
 public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
 DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
 throws IOException {
-final InetSocketAddress addr = NetUtils.createSocketAddr(
-datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+final InetSocketAddress addr =
+NetUtils.createSocketAddr(datanodeid.getIpcAddr());
 if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
 InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
 }

@@ -951,7 +946,7 @@ public class DataNode extends Configured

 public static void setNewStorageID(DatanodeID dnId) {
 LOG.info("Datanode is " + dnId);
-dnId.setStorageID(createNewStorageId(dnId.getPort()));
+dnId.setStorageID(createNewStorageId(dnId.getXferPort()));
 }

 static String createNewStorageId(int port) {

@@ -1227,7 +1222,7 @@ public class DataNode extends Configured
 if (LOG.isInfoEnabled()) {
 StringBuilder xfersBuilder = new StringBuilder();
 for (int i = 0; i < numTargets; i++) {
-xfersBuilder.append(xferTargets[i].getName());
+xfersBuilder.append(xferTargets[i]);
 xfersBuilder.append(" ");
 }
 LOG.info(bpReg + " Starting thread to transfer block " +

@@ -1385,7 +1380,7 @@ public class DataNode extends Configured

 try {
 InetSocketAddress curTarget =
-NetUtils.createSocketAddr(targets[0].getName());
+NetUtils.createSocketAddr(targets[0].getXferAddr());
 sock = newSocket();
 NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
 sock.setSoTimeout(targets.length * dnConf.socketTimeout);

@@ -1438,9 +1433,8 @@ public class DataNode extends Configured
 }
 }
 } catch (IOException ie) {
-LOG.warn(
-bpReg + ":Failed to transfer " + b + " to " + targets[0].getName()
-+ " got ", ie);
+LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
+targets[0] + " got ", ie);
 // check if there are any disk problem
 checkDiskError();

@@ -1994,9 +1988,9 @@ public class DataNode extends Configured

 private static void logRecoverBlock(String who,
 ExtendedBlock block, DatanodeID[] targets) {
-StringBuilder msg = new StringBuilder(targets[0].getName());
+StringBuilder msg = new StringBuilder(targets[0].toString());
 for (int i = 1; i < targets.length; i++) {
-msg.append(", " + targets[i].getName());
+msg.append(", " + targets[i]);
 }
 LOG.info(who + " calls recoverBlock(block=" + block
 + ", targets=[" + msg + "])");

@@ -352,7 +352,7 @@ class DataXceiver extends Receiver implements Runnable {
     if (targets.length > 0) {
       InetSocketAddress mirrorTarget = null;
       // Connect to backup machine
-      mirrorNode = targets[0].getName();
+      mirrorNode = targets[0].getXferAddr();
       mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
       mirrorSock = datanode.newSocket();
       try {

@@ -667,8 +667,8 @@ class DataXceiver extends Receiver implements Runnable {

     try {
       // get the output stream to the proxy
-      InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
-          proxySource.getName());
+      InetSocketAddress proxyAddr =
+          NetUtils.createSocketAddr(proxySource.getXferAddr());
       proxySock = datanode.newSocket();
       NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
       proxySock.setSoTimeout(dnConf.socketTimeout);

@@ -820,7 +820,7 @@ class DataXceiver extends Receiver implements Runnable {
       if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
         DatanodeRegistration dnR =
           datanode.getDNRegistrationForBP(blk.getBlockPoolId());
-        resp.setFirstBadLink(dnR.getName());
+        resp.setFirstBadLink(dnR.getXferAddr());
       }
       resp.build().writeDelimitedTo(out);
       out.flush();
@@ -136,10 +136,8 @@ public class DatanodeJspHelper {
       out.print("Empty file");
     } else {
       DatanodeInfo chosenNode = JspHelper.bestNode(firstBlock, conf);
-      String fqdn = canonicalize(chosenNode.getHost());
-      String datanodeAddr = chosenNode.getName();
-      int datanodePort = Integer.parseInt(datanodeAddr.substring(
-          datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
+      String fqdn = canonicalize(chosenNode.getIpAddr());
+      int datanodePort = chosenNode.getXferPort();
       String redirectLocation = "http://" + fqdn + ":"
           + chosenNode.getInfoPort() + "/browseBlock.jsp?blockId="
           + firstBlock.getBlock().getBlockId() + "&blockSize="

@@ -313,7 +311,7 @@ public class DatanodeJspHelper {
       dfs.close();
       return;
     }
-    String fqdn = canonicalize(chosenNode.getHost());
+    String fqdn = canonicalize(chosenNode.getIpAddr());
     String tailUrl = "http://" + fqdn + ":" + chosenNode.getInfoPort()
         + "/tail.jsp?filename=" + URLEncoder.encode(filename, "UTF-8")
         + "&namenodeInfoPort=" + namenodeInfoPort

@@ -360,10 +358,9 @@ public class DatanodeJspHelper {
       out.print("<td>" + blockidstring + ":</td>");
       DatanodeInfo[] locs = cur.getLocations();
       for (int j = 0; j < locs.length; j++) {
-        String datanodeAddr = locs[j].getName();
-        datanodePort = Integer.parseInt(datanodeAddr.substring(datanodeAddr
-            .indexOf(':') + 1, datanodeAddr.length()));
-        fqdn = canonicalize(locs[j].getHost());
+        String datanodeAddr = locs[j].getXferAddr();
+        datanodePort = locs[j].getXferPort();
+        fqdn = canonicalize(locs[j].getIpAddr());
         String blockUrl = "http://" + fqdn + ":" + locs[j].getInfoPort()
             + "/browseBlock.jsp?blockId=" + blockidstring
             + "&blockSize=" + blockSize

@@ -519,10 +516,8 @@ public class DatanodeJspHelper {
           nextStartOffset = 0;
           nextBlockSize = nextBlock.getBlock().getNumBytes();
           DatanodeInfo d = JspHelper.bestNode(nextBlock, conf);
-          String datanodeAddr = d.getName();
-          nextDatanodePort = Integer.parseInt(datanodeAddr.substring(
-              datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
-          nextHost = d.getHost();
+          nextDatanodePort = d.getXferPort();
+          nextHost = d.getIpAddr();
           nextPort = d.getInfoPort();
         }
       }

@@ -573,10 +568,8 @@ public class DatanodeJspHelper {
           prevStartOffset = 0;
           prevBlockSize = prevBlock.getBlock().getNumBytes();
           DatanodeInfo d = JspHelper.bestNode(prevBlock, conf);
-          String datanodeAddr = d.getName();
-          prevDatanodePort = Integer.parseInt(datanodeAddr.substring(
-              datanodeAddr.indexOf(':') + 1, datanodeAddr.length()));
-          prevHost = d.getHost();
+          prevDatanodePort = d.getXferPort();
+          prevHost = d.getIpAddr();
           prevPort = d.getInfoPort();
         }
       }

@@ -693,7 +686,8 @@ public class DatanodeJspHelper {
       dfs.close();
       return;
     }
-    InetSocketAddress addr = NetUtils.createSocketAddr(chosenNode.getName());
+    InetSocketAddress addr =
+        NetUtils.createSocketAddr(chosenNode.getXferAddr());
     // view the last chunkSizeToView bytes while Tailing
     final long startOffset = blockSize >= chunkSizeToView ? blockSize
         - chunkSizeToView : 0;
@@ -59,7 +59,7 @@ public class FileChecksumServlets {
       HttpServletRequest request, NameNode nn)
       throws IOException {
     final String hostname = host instanceof DatanodeInfo
-        ? ((DatanodeInfo)host).getHostName() : host.getHost();
+        ? ((DatanodeInfo)host).getHostName() : host.getIpAddr();
     final String scheme = request.getScheme();
     final int port = "https".equals(scheme)
       ? (Integer)getServletContext().getAttribute("datanode.https.port")

@@ -59,7 +59,7 @@ public class FileDataServlet extends DfsServlet {
     if (host instanceof DatanodeInfo) {
       hostname = ((DatanodeInfo)host).getHostName();
     } else {
-      hostname = host.getHost();
+      hostname = host.getIpAddr();
     }
     final int port = "https".equals(scheme)
       ? (Integer)getServletContext().getAttribute("datanode.https.port")
@@ -855,7 +855,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     BlockListAsLongs blist = new BlockListAsLongs(reports[0].getBlocks());
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
-           + "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
+           + "from " + nodeReg + " " + blist.getNumberOfBlocks()
            + " blocks");
     }

@@ -871,7 +871,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     verifyRequest(nodeReg);
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
-          +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
+          +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
     namesystem.getBlockManager().processIncrementalBlockReport(

@@ -881,7 +881,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // DatanodeProtocol
   public void errorReport(DatanodeRegistration nodeReg,
       int errorCode, String msg) throws IOException {
-    String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName());
+    String dnName =
+        (nodeReg == null) ? "Unknown DataNode" : nodeReg.toString();

     if (errorCode == DatanodeProtocol.NOTIFY) {
       LOG.info("Error report from " + dnName + ": " + msg);

@@ -910,13 +911,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }

   /**
-   * Verify request.
+   * Verifies the given registration.
    *
-   * Verifies correctness of the datanode version, registration ID, and
-   * if the datanode does not need to be shutdown.
-   *
-   * @param nodeReg data node registration
-   * @throws IOException
+   * @param nodeReg node registration
+   * @throws UnregisteredNodeException if the registration is invalid
    */
   void verifyRequest(NodeRegistration nodeReg) throws IOException {
     verifyVersion(nodeReg.getVersion());
@@ -496,7 +496,7 @@ public class NamenodeFsck {

       try {
         chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes);
-        targetAddr = NetUtils.createSocketAddr(chosenNode.getName());
+        targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr());
       } catch (IOException ie) {
         if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) {
           throw new IOException("Could not obtain block " + lblock);
@@ -260,14 +260,14 @@ class NamenodeJspHelper {
       // Find out common suffix. Should this be before or after the sort?
       String port_suffix = null;
       if (live.size() > 0) {
-        String name = live.get(0).getName();
+        String name = live.get(0).getXferAddr();
         int idx = name.indexOf(':');
         if (idx > 0) {
           port_suffix = name.substring(idx);
         }

         for (int i = 1; port_suffix != null && i < live.size(); i++) {
-          if (live.get(i).getName().endsWith(port_suffix) == false) {
+          if (live.get(i).getXferAddr().endsWith(port_suffix) == false) {
             port_suffix = null;
             break;
           }
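The hunk above computes a shared ":port" suffix so the web UI can strip it from every row when all live datanodes use the same transfer port; it now derives that suffix from getXferAddr() instead of the overloaded name. A self-contained sketch of the same computation, assuming a plain list of transfer addresses rather than the live-node list used here:

    import java.util.List;

    class PortSuffix {
      // Returns e.g. ":50010" when every address ends with the same port,
      // otherwise null (addresses are then rendered in full).
      static String commonPortSuffix(List<String> xferAddrs) {
        if (xferAddrs.isEmpty()) {
          return null;
        }
        String first = xferAddrs.get(0);
        int idx = first.indexOf(':');
        if (idx <= 0) {
          return null;
        }
        String suffix = first.substring(idx);
        for (String addr : xferAddrs) {
          if (!addr.endsWith(suffix)) {
            return null; // mixed ports
          }
        }
        return suffix;
      }
    }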
@@ -404,7 +404,7 @@ class NamenodeJspHelper {
       final String nodeToRedirect;
       int redirectPort;
       if (datanode != null) {
-        nodeToRedirect = datanode.getHost();
+        nodeToRedirect = datanode.getIpAddr();
         redirectPort = datanode.getInfoPort();
       } else {
         nodeToRedirect = nn.getHttpAddress().getHostName();

@@ -466,14 +466,14 @@ class NamenodeJspHelper {
           + URLEncoder.encode("/", "UTF-8")
           + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnaddr);

-      String name = d.getHostName() + ":" + d.getPort();
+      String name = d.getXferAddrWithHostname();
       if (!name.matches("\\d+\\.\\d+.\\d+\\.\\d+.*"))
         name = name.replaceAll("\\.[^.:]*", "");
       int idx = (suffix != null && name.endsWith(suffix)) ? name
           .indexOf(suffix) : -1;

-      out.print(rowTxt() + "<td class=\"name\"><a title=\"" + d.getHost() + ":"
-          + d.getPort() + "\" href=\"" + url + "\">"
+      out.print(rowTxt() + "<td class=\"name\"><a title=\"" + d.getXferAddr()
+          + "\" href=\"" + url + "\">"
          + ((idx > 0) ? name.substring(0, idx) : name) + "</a>"
          + ((alive) ? "" : "\n"));
     }

@@ -599,14 +599,14 @@ class NamenodeJspHelper {
       // Find out common suffix. Should this be before or after the sort?
       String port_suffix = null;
       if (live.size() > 0) {
-        String name = live.get(0).getName();
+        String name = live.get(0).getXferAddr();
         int idx = name.indexOf(':');
         if (idx > 0) {
           port_suffix = name.substring(idx);
         }

         for (int i = 1; port_suffix != null && i < live.size(); i++) {
-          if (live.get(i).getName().endsWith(port_suffix) == false) {
+          if (live.get(i).getXferAddr().endsWith(port_suffix) == false) {
             port_suffix = null;
             break;
           }
@@ -62,8 +62,8 @@ implements Writable, NodeRegistration {
   /**
    * Create DatanodeRegistration
    */
-  public DatanodeRegistration(String nodeName) {
-    this(nodeName, new StorageInfo(), new ExportedBlockKeys());
+  public DatanodeRegistration(String ipAddr) {
+    this(ipAddr, new StorageInfo(), new ExportedBlockKeys());
   }

   public DatanodeRegistration(DatanodeID dn, StorageInfo info,

@@ -73,9 +73,9 @@ implements Writable, NodeRegistration {
     this.exportedKeys = keys;
   }

-  public DatanodeRegistration(String nodeName, StorageInfo info,
+  public DatanodeRegistration(String ipAddr, StorageInfo info,
       ExportedBlockKeys keys) {
-    super(nodeName);
+    super(ipAddr);
     this.storageInfo = info;
     this.exportedKeys = keys;
   }

@@ -108,13 +108,13 @@ implements Writable, NodeRegistration {

   @Override // NodeRegistration
   public String getAddress() {
-    return getName();
+    return getXferAddr();
   }

   @Override
   public String toString() {
     return getClass().getSimpleName()
-      + "(" + name
+      + "(" + ipAddr
       + ", storageID=" + storageID
       + ", infoPort=" + infoPort
       + ", ipcPort=" + ipcPort
@@ -38,6 +38,6 @@ public class DisallowedDatanodeException extends IOException {
   private static final long serialVersionUID = 1L;

   public DisallowedDatanodeException(DatanodeID nodeID) {
-    super("Datanode denied communication with namenode: " + nodeID.getName());
+    super("Datanode denied communication with namenode: " + nodeID);
   }
 }

@@ -28,7 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 public interface NodeRegistration {
   /**
    * Get address of the server node.
-   * @return hostname:portNumber
+   * @return ipAddr:portNumber
    */
   public String getAddress();
@@ -280,10 +280,11 @@ public class JsonUtil {
     }

     final Map<String, Object> m = new TreeMap<String, Object>();
-    m.put("name", datanodeinfo.getName());
+    m.put("ipAddr", datanodeinfo.getIpAddr());
+    m.put("hostName", datanodeinfo.getHostName());
     m.put("storageID", datanodeinfo.getStorageID());
+    m.put("xferPort", datanodeinfo.getXferPort());
     m.put("infoPort", datanodeinfo.getInfoPort());

     m.put("ipcPort", datanodeinfo.getIpcPort());

     m.put("capacity", datanodeinfo.getCapacity());

@@ -293,7 +294,6 @@ public class JsonUtil {
     m.put("lastUpdate", datanodeinfo.getLastUpdate());
     m.put("xceiverCount", datanodeinfo.getXceiverCount());
     m.put("networkLocation", datanodeinfo.getNetworkLocation());
-    m.put("hostName", datanodeinfo.getHostName());
     m.put("adminState", datanodeinfo.getAdminState().name());
     return m;
   }

@@ -308,6 +308,7 @@ public class JsonUtil {
         (String)m.get("name"),
         (String)m.get("hostName"),
         (String)m.get("storageID"),
+        (int)(long)(Long)m.get("xferPort"),
         (int)(long)(Long)m.get("infoPort"),
         (int)(long)(Long)m.get("ipcPort"),
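Net effect on the serialized DatanodeInfo: the single name key (IP:port) is split into ipAddr and hostName, hostName moves up from the tail of the map, and xferPort is emitted alongside the other ports. An illustrative shape — the key set mirrors the m.put(...) calls above, but every value here is invented:

    import java.util.Map;
    import java.util.TreeMap;

    class DatanodeInfoJsonShape {
      // Hypothetical values; only the keys are taken from the code above.
      static Map<String, Object> example() {
        final Map<String, Object> m = new TreeMap<String, Object>();
        m.put("ipAddr", "10.0.0.7");
        m.put("hostName", "dn7.example.com");
        m.put("storageID", "DS-1234");
        m.put("xferPort", 50010L);
        m.put("infoPort", 50075L);
        m.put("ipcPort", 50020L);
        return m;
      }
    }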
@@ -48,11 +48,12 @@ message BlockTokenIdentifierProto {
  * Identifies a Datanode
  */
 message DatanodeIDProto {
-  required string name = 1;      // IP:port (data transfer port)
+  required string ipAddr = 1;    // IP address
   required string hostName = 2;  // hostname
   required string storageID = 3; // unique storage id
-  required uint32 infoPort = 4;  // info server port
-  required uint32 ipcPort = 5;   // ipc server port
+  required uint32 xferPort = 4;  // data streaming port
+  required uint32 infoPort = 5;  // info server port
+  required uint32 ipcPort = 6;   // ipc server port
 }

 /**
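Note the renumbering above: tag 1 is reused with a new meaning (name becomes ipAddr) and the ports shift to tags 4-6, so old and new DatanodeIDProto messages are not wire-compatible and both ends of the protocol must pick up this change together. A hedged sketch of the builder-side conversion the new fields imply — the generated setters follow the field names, but whether PBHelper.convert is written exactly this way is an assumption:

    // Assumes the protobuf-generated builder for DatanodeIDProto above.
    static DatanodeIDProto convert(DatanodeID dn) {
      return DatanodeIDProto.newBuilder()
          .setIpAddr(dn.getIpAddr())
          .setHostName(dn.getHostName())
          .setStorageID(dn.getStorageID())
          .setXferPort(dn.getXferPort())
          .setInfoPort(dn.getInfoPort())
          .setIpcPort(dn.getIpcPort())
          .build();
    }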
@@ -139,7 +139,7 @@ public class BlockReaderTestUtil {
     Socket sock = null;
     ExtendedBlock block = testBlock.getBlock();
     DatanodeInfo[] nodes = testBlock.getLocations();
-    targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+    targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
     sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
     sock.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
     sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);

@@ -339,7 +339,7 @@ public class DFSTestUtil {
   }

   /*
-   * Wait up to 20s for the given DN (host:port) to be decommissioned.
+   * Wait up to 20s for the given DN (IP:port) to be decommissioned
    */
   public static void waitForDecommission(FileSystem fs, String name)
       throws IOException, InterruptedException, TimeoutException {

@@ -351,7 +351,7 @@ public class DFSTestUtil {
       Thread.sleep(1000);
       DistributedFileSystem dfs = (DistributedFileSystem)fs;
       for (DatanodeInfo info : dfs.getDataNodeStats()) {
-        if (name.equals(info.getName())) {
+        if (name.equals(info.getXferAddr())) {
           dn = info;
         }
       }
@@ -1439,7 +1439,7 @@ public class MiniDFSCluster {
       DataNode dn = dataNodes.get(i).datanode;
       LOG.info("DN name=" + dnName + " found DN=" + dn +
           " with name=" + dn.getDisplayName());
-      if (dnName.equals(dn.getDatanodeId().getName())) {
+      if (dnName.equals(dn.getDatanodeId().getXferAddr())) {
         break;
       }
     }

@@ -220,7 +220,7 @@ public class TestClientReportBadBlock {
         final DataNode dn = cluster.getDataNode(dninfo.getIpcPort());
         corruptBlock(block, dn);
         LOG.debug("Corrupted block " + block.getBlockName() + " on data node "
-            + dninfo.getName());
+            + dninfo);

       }
     }
@@ -332,7 +332,7 @@ public class TestDFSClientRetries extends TestCase {
     LocatedBlock badLocatedBlock = new LocatedBlock(
       goodLocatedBlock.getBlock(),
       new DatanodeInfo[] {
-        new DatanodeInfo(new DatanodeID("255.255.255.255:234"))
+        new DatanodeInfo(new DatanodeID("255.255.255.255", 234))
       },
       goodLocatedBlock.getStartOffset(),
       false);

@@ -606,7 +606,7 @@ public class TestDFSClientRetries extends TestCase {
           cluster.getNameNodeRpc(), f, 0, Long.MAX_VALUE)
         .getLocatedBlocks();
     final DatanodeInfo first = locatedblocks.get(0).getLocations()[0];
-    cluster.stopDataNode(first.getName());
+    cluster.stopDataNode(first.getXferAddr());

     //get checksum again
     final FileChecksum cs2 = fs.getFileChecksum(p);

@@ -627,7 +627,7 @@ public class TestDFSClientRetries extends TestCase {

     final InetSocketAddress addr = NetUtils.getConnectAddress(server);
     DatanodeID fakeDnId = new DatanodeID(
-        "localhost:" + addr.getPort(), "localhost", "fake-storage", 0, addr.getPort());
+        "localhost", "localhost", "fake-storage", addr.getPort(), 0, addr.getPort());

     ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
     LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]);
@@ -128,8 +128,7 @@ public class TestDataTransferProtocol extends TestCase {

       if (eofExpected) {
         throw new IOException("Did not recieve IOException when an exception " +
-                              "is expected while reading from " +
-                              datanode.getName());
+                              "is expected while reading from " + datanode);
       }

       byte[] needed = recvBuf.toByteArray();

@@ -215,7 +214,7 @@ public class TestDataTransferProtocol extends TestCase {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     datanode = DataNodeTestUtils.getDNRegistrationForBP(
         cluster.getDataNodes().get(0), poolId);
-    dnAddr = NetUtils.createSocketAddr(datanode.getName());
+    dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
     FileSystem fileSys = cluster.getFileSystem();

     /* Test writing to finalized replicas */

@@ -349,7 +348,7 @@ public class TestDataTransferProtocol extends TestCase {
         new InetSocketAddress("localhost", cluster.getNameNodePort()),
         conf);
     datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
-    dnAddr = NetUtils.createSocketAddr(datanode.getName());
+    dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
     FileSystem fileSys = cluster.getFileSystem();

     int fileLen = Math.min(conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
@@ -389,9 +389,8 @@ public class TestDatanodeDeath extends TestCase {
         cluster.stopDataNode(victim);
       } else {
         int victim = datanodeToKill;
-        System.out.println("SimpleTest stopping datanode " +
-            targets[victim].getName());
-        cluster.stopDataNode(targets[victim].getName());
+        System.out.println("SimpleTest stopping datanode " + targets[victim]);
+        cluster.stopDataNode(targets[victim].getXferAddr());
       }
       System.out.println("SimpleTest stopping datanode complete");

@@ -151,27 +151,27 @@ public class TestDecommission {
       int hasdown = 0;
       DatanodeInfo[] nodes = blk.getLocations();
       for (int j = 0; j < nodes.length; j++) { // for each replica
-        if (isNodeDown && nodes[j].getName().equals(downnode)) {
+        if (isNodeDown && nodes[j].getXferAddr().equals(downnode)) {
           hasdown++;
           //Downnode must actually be decommissioned
           if (!nodes[j].isDecommissioned()) {
             return "For block " + blk.getBlock() + " replica on " +
-              nodes[j].getName() + " is given as downnode, " +
+              nodes[j] + " is given as downnode, " +
               "but is not decommissioned";
           }
           //Decommissioned node (if any) should only be last node in list.
           if (j != nodes.length - 1) {
             return "For block " + blk.getBlock() + " decommissioned node "
-              + nodes[j].getName() + " was not last node in list: "
+              + nodes[j] + " was not last node in list: "
               + (j + 1) + " of " + nodes.length;
           }
           LOG.info("Block " + blk.getBlock() + " replica on " +
-            nodes[j].getName() + " is decommissioned.");
+            nodes[j] + " is decommissioned.");
         } else {
           //Non-downnodes must not be decommissioned
           if (nodes[j].isDecommissioned()) {
             return "For block " + blk.getBlock() + " replica on " +
-              nodes[j].getName() + " is unexpectedly decommissioned";
+              nodes[j] + " is unexpectedly decommissioned";
           }
         }
       }

@@ -215,7 +215,7 @@ public class TestDecommission {
         found = true;
       }
     }
-    String nodename = info[index].getName();
+    String nodename = info[index].getXferAddr();
     LOG.info("Decommissioning node: " + nodename);

     // write nodename into the exclude file.

@@ -236,7 +236,7 @@ public class TestDecommission {

   /* stop decommission of the datanode and wait for each to reach the NORMAL state */
   private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException {
-    LOG.info("Recommissioning node: " + decommissionedNode.getName());
+    LOG.info("Recommissioning node: " + decommissionedNode);
     writeConfigFile(excludeFile, null);
     refreshNodes(cluster.getNamesystem(), conf);
     waitNodeState(decommissionedNode, AdminStates.NORMAL);

@@ -373,7 +373,7 @@ public class TestDecommission {
       DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
       assertEquals("All datanodes must be alive", numDatanodes,
           client.datanodeReport(DatanodeReportType.LIVE).length);
-      assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
+      assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), numDatanodes));
       cleanupFile(fileSys, file1);
     }
   }

@@ -414,7 +414,7 @@ public class TestDecommission {
     DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
     assertEquals("All datanodes must be alive", numDatanodes,
         client.datanodeReport(DatanodeReportType.LIVE).length);
-    assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
+    assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), numDatanodes));

     // stop decommission and check if the new replicas are removed
     recomissionNode(decomNode);
@@ -147,7 +147,7 @@ public class TestHftpFileSystem {
     // if we were redirected to the right DN.
     BlockLocation[] locations =
         hdfs.getFileBlockLocations(path, 0, 10);
-    String locationName = locations[0].getNames()[0];
+    String xferAddr = locations[0].getNames()[0];

     // Connect to the NN to get redirected
     URL u = hftpFs.getNamenodeURL(

@@ -164,7 +164,7 @@ public class TestHftpFileSystem {
     for (DataNode node : cluster.getDataNodes()) {
       DatanodeRegistration dnR =
         DataNodeTestUtils.getDNRegistrationForBP(node, blockPoolId);
-      if (dnR.getName().equals(locationName)) {
+      if (dnR.getXferAddr().equals(xferAddr)) {
         checked = true;
         assertEquals(dnR.getInfoPort(), conn.getURL().getPort());
       }
@@ -60,7 +60,7 @@ public class TestIsMethodSupported {
         .numDataNodes(1).build();
     nnAddress = cluster.getNameNode().getNameNodeAddress();
     DataNode dn = cluster.getDataNodes().get(0);
-    dnAddress = new InetSocketAddress(dn.getDatanodeId().getHost(),
+    dnAddress = new InetSocketAddress(dn.getDatanodeId().getIpAddr(),
                                       dn.getIpcPort());
   }

@@ -117,7 +117,7 @@ public class TestReplication extends TestCase {
     isOnSameRack = false;
     isNotOnSameRack = false;
     for (int i = 0; i < datanodes.length-1; i++) {
-      LOG.info("datanode "+ i + ": "+ datanodes[i].getName());
+      LOG.info("datanode "+ i + ": "+ datanodes[i]);
       boolean onRack = false;
       for( int j=i+1; j<datanodes.length; j++) {
         if( datanodes[i].getNetworkLocation().equals(
@@ -130,20 +130,19 @@ public class TestPBHelper {

   @Test
   public void testConvertDatanodeID() {
-    DatanodeID dn = new DatanodeID("node", "node", "sid", 1, 2);
+    DatanodeID dn = new DatanodeID("node", "node", "sid", 1, 2, 3);
     DatanodeIDProto dnProto = PBHelper.convert(dn);
     DatanodeID dn2 = PBHelper.convert(dnProto);
     compare(dn, dn2);
   }

   void compare(DatanodeID dn, DatanodeID dn2) {
-    assertEquals(dn.getName(), dn2.getName());
-    assertEquals(dn.getHostName(), dn2.getHostName());
-    assertEquals(dn.getPort(), dn2.getPort());
-    assertEquals(dn.getStorageID(), dn2.getStorageID());
+    assertEquals(dn.getIpAddr(), dn2.getIpAddr());
+    assertEquals(dn.getHostName(), dn2.getHostName());
+    assertEquals(dn.getStorageID(), dn2.getStorageID());
+    assertEquals(dn.getXferPort(), dn2.getXferPort());
+    assertEquals(dn.getInfoPort(), dn2.getInfoPort());
+    assertEquals(dn.getIpcPort(), dn2.getIpcPort());
   }

   @Test
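From the fields compared above, the new six-argument constructor evidently orders its parameters as IP, hostname, storage ID, then the three ports. This ordering is inferred from this test and the other call sites in the patch, with invented values:

    // Assumed parameter order; values are illustrative only.
    DatanodeID dn = new DatanodeID(
        "10.0.0.7", // ipAddr
        "dn7",      // hostName
        "DS-1234",  // storageID
        50010,      // xferPort
        50075,      // infoPort
        50020);     // ipcPort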
@@ -281,7 +280,7 @@ public class TestPBHelper {
   }

   private DatanodeInfo getDNInfo() {
-    return new DatanodeInfo(new DatanodeID("node", "node", "sid", 1, 2));
+    return new DatanodeInfo(new DatanodeID("node", "node", "sid", 0, 1, 2));
   }

   private void compare(DatanodeInfo dn1, DatanodeInfo dn2) {

@@ -292,7 +291,7 @@ public class TestPBHelper {
     assertEquals(dn1.getDatanodeReport(), dn2.getDatanodeReport());
     assertEquals(dn1.getDfsUsed(), dn1.getDfsUsed());
     assertEquals(dn1.getDfsUsedPercent(), dn1.getDfsUsedPercent());
-    assertEquals(dn1.getHost(), dn2.getHost());
+    assertEquals(dn1.getIpAddr(), dn2.getIpAddr());
     assertEquals(dn1.getHostName(), dn2.getHostName());
     assertEquals(dn1.getInfoPort(), dn2.getInfoPort());
     assertEquals(dn1.getIpcPort(), dn2.getIpcPort());

@@ -401,11 +400,11 @@ public class TestPBHelper {
   @Test
   public void testConvertLocatedBlock() {
     DatanodeInfo [] dnInfos = new DatanodeInfo[3];
-    dnInfos[0] = new DatanodeInfo("host0", "host0", "0", 5000, 5001, 20000, 10001, 9999,
+    dnInfos[0] = new DatanodeInfo("host0", "host0", "0", 5000, 5001, 5002, 20000, 10001, 9999,
         59, 69, 32, "local", AdminStates.DECOMMISSION_INPROGRESS);
-    dnInfos[1] = new DatanodeInfo("host1", "host1", "1", 5000, 5001, 20000, 10001, 9999,
+    dnInfos[1] = new DatanodeInfo("host1", "host1", "1", 5000, 5001, 5002, 20000, 10001, 9999,
        59, 69, 32, "local", AdminStates.DECOMMISSIONED);
-    dnInfos[2] = new DatanodeInfo("host2", "host2", "2", 5000, 5001, 20000, 10001, 9999,
+    dnInfos[2] = new DatanodeInfo("host2", "host2", "2", 5000, 5001, 5002, 20000, 10001, 9999,
        59, 69, 32, "local", AdminStates.NORMAL);
     LocatedBlock lb = new LocatedBlock(
         new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false);

@@ -424,7 +423,7 @@ public class TestPBHelper {

   @Test
   public void testConvertDatanodeRegistration() {
-    DatanodeID dnId = new DatanodeID("host", "host", "xyz", 1, 0);
+    DatanodeID dnId = new DatanodeID("host", "host", "xyz", 0, 1, 0);
     BlockKey[] keys = new BlockKey[] { getBlockKey(2), getBlockKey(3) };
     ExportedBlockKeys expKeys = new ExportedBlockKeys(true, 9, 10,
         getBlockKey(1), keys);
@@ -279,8 +279,8 @@ public class TestBlockToken {
     server.start();

     final InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    DatanodeID fakeDnId = new DatanodeID("localhost:" + addr.getPort(),
-        "localhost", "fake-storage", 0, addr.getPort());
+    DatanodeID fakeDnId = new DatanodeID("localhost",
+        "localhost", "fake-storage", addr.getPort(), 0, addr.getPort());

     ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
     LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]);

@@ -165,7 +165,7 @@ public class BlockManagerTestUtil {
     DatanodeDescriptor[] dnds = hbm.getDatanodes();
     DatanodeDescriptor theDND = null;
     for (DatanodeDescriptor dnd : dnds) {
-      if (dnd.getName().equals(dnName)) {
+      if (dnd.getXferAddr().equals(dnName)) {
        theDND = dnd;
      }
    }
@@ -48,12 +48,12 @@ import com.google.common.collect.Lists;

 public class TestBlockManager {
   private final List<DatanodeDescriptor> nodes = ImmutableList.of(
-      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/rackA"),
-      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/rackA"),
-      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/rackA"),
-      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/rackB"),
-      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/rackB"),
-      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/rackB")
+      new DatanodeDescriptor(new DatanodeID("h1", 5020), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h2", 5020), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h3", 5020), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h4", 5020), "/rackB"),
+      new DatanodeDescriptor(new DatanodeID("h5", 5020), "/rackB"),
+      new DatanodeDescriptor(new DatanodeID("h6", 5020), "/rackB")
     );
   private final List<DatanodeDescriptor> rackA = nodes.subList(0, 3);
   private final List<DatanodeDescriptor> rackB = nodes.subList(3, 6);

@@ -272,7 +272,7 @@ public class TestBlockManager {

     // the block is still under-replicated. Add a new node. This should allow
     // the third off-rack replica.
-    DatanodeDescriptor rackCNode = new DatanodeDescriptor(new DatanodeID("h7:5020"), "/rackC");
+    DatanodeDescriptor rackCNode = new DatanodeDescriptor(new DatanodeID("h7", 100), "/rackC");
     addNodes(ImmutableList.of(rackCNode));
     try {
       DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);

@@ -137,7 +137,7 @@ public class TestBlockTokenWithDFS {
     ExtendedBlock block = lblock.getBlock();
     try {
       DatanodeInfo[] nodes = lblock.getLocations();
-      targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+      targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
       s = NetUtils.getDefaultSocketFactory(conf).createSocket();
       s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
       s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
@@ -28,13 +28,13 @@ import org.junit.Test;
 public class TestHost2NodesMap {
   private Host2NodesMap map = new Host2NodesMap();
   private final DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
-    new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h3:5030"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("ip1", "h1", "", 5020, -1, -1), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("ip2", "h1", "", 5020, -1, -1), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("ip3", "h1", "", 5020, -1, -1), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("ip3", "h1", "", 5030, -1, -1), "/d1/r2"),
   };
   private final DatanodeDescriptor NULL_NODE = null;
-  private final DatanodeDescriptor NODE = new DatanodeDescriptor(new DatanodeID("h3:5040"),
+  private final DatanodeDescriptor NODE = new DatanodeDescriptor(new DatanodeID("h3", 5040),
       "/d1/r4");

   @Before

@@ -56,24 +56,11 @@ public class TestHost2NodesMap {

   @Test
   public void testGetDatanodeByHost() throws Exception {
-    assertTrue(map.getDatanodeByHost("h1")==dataNodes[0]);
-    assertTrue(map.getDatanodeByHost("h2")==dataNodes[1]);
-    DatanodeDescriptor node = map.getDatanodeByHost("h3");
+    assertTrue(map.getDatanodeByHost("ip1")==dataNodes[0]);
+    assertTrue(map.getDatanodeByHost("ip2")==dataNodes[1]);
+    DatanodeDescriptor node = map.getDatanodeByHost("ip3");
     assertTrue(node==dataNodes[2] || node==dataNodes[3]);
-    assertTrue(null==map.getDatanodeByHost("h4"));
-  }
-
-  @Test
-  public void testGetDatanodeByName() throws Exception {
-    assertTrue(map.getDatanodeByName("h1:5020")==dataNodes[0]);
-    assertTrue(map.getDatanodeByName("h1:5030")==null);
-    assertTrue(map.getDatanodeByName("h2:5020")==dataNodes[1]);
-    assertTrue(map.getDatanodeByName("h2:5030")==null);
-    assertTrue(map.getDatanodeByName("h3:5020")==dataNodes[2]);
-    assertTrue(map.getDatanodeByName("h3:5030")==dataNodes[3]);
-    assertTrue(map.getDatanodeByName("h3:5040")==null);
-    assertTrue(map.getDatanodeByName("h4")==null);
-    assertTrue(map.getDatanodeByName(null)==null);
+    assertTrue(null==map.getDatanodeByHost("ip4"));
   }

   @Test

@@ -81,21 +68,21 @@ public class TestHost2NodesMap {
     assertFalse(map.remove(NODE));

     assertTrue(map.remove(dataNodes[0]));
-    assertTrue(map.getDatanodeByHost("h1")==null);
-    assertTrue(map.getDatanodeByHost("h2")==dataNodes[1]);
-    DatanodeDescriptor node = map.getDatanodeByHost("h3");
+    assertTrue(map.getDatanodeByHost("ip1")==null);
+    assertTrue(map.getDatanodeByHost("ip2")==dataNodes[1]);
+    DatanodeDescriptor node = map.getDatanodeByHost("ip3");
     assertTrue(node==dataNodes[2] || node==dataNodes[3]);
-    assertTrue(null==map.getDatanodeByHost("h4"));
+    assertTrue(null==map.getDatanodeByHost("ip4"));

     assertTrue(map.remove(dataNodes[2]));
-    assertTrue(map.getDatanodeByHost("h1")==null);
-    assertTrue(map.getDatanodeByHost("h2")==dataNodes[1]);
-    assertTrue(map.getDatanodeByHost("h3")==dataNodes[3]);
+    assertTrue(map.getDatanodeByHost("ip1")==null);
+    assertTrue(map.getDatanodeByHost("ip2")==dataNodes[1]);
+    assertTrue(map.getDatanodeByHost("ip3")==dataNodes[3]);

     assertTrue(map.remove(dataNodes[3]));
-    assertTrue(map.getDatanodeByHost("h1")==null);
-    assertTrue(map.getDatanodeByHost("h2")==dataNodes[1]);
-    assertTrue(map.getDatanodeByHost("h3")==null);
+    assertTrue(map.getDatanodeByHost("ip1")==null);
+    assertTrue(map.getDatanodeByHost("ip2")==dataNodes[1]);
+    assertTrue(map.getDatanodeByHost("ip3")==null);

     assertFalse(map.remove(NULL_NODE));
     assertTrue(map.remove(dataNodes[1]));
@@ -78,11 +78,11 @@ public class TestNodeCount extends TestCase {

       // bring down first datanode
       DatanodeDescriptor datanode = datanodes[0];
-      DataNodeProperties dnprop = cluster.stopDataNode(datanode.getName());
+      DataNodeProperties dnprop = cluster.stopDataNode(datanode.getXferAddr());

       // make sure that NN detects that the datanode is down
       BlockManagerTestUtil.noticeDeadDatanode(
-          cluster.getNameNode(), datanode.getName());
+          cluster.getNameNode(), datanode.getXferAddr());

       // the block will be replicated
       DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);

@@ -112,10 +112,10 @@ public class TestNodeCount extends TestCase {
       assertTrue(nonExcessDN!=null);

       // bring down non excessive datanode
-      dnprop = cluster.stopDataNode(nonExcessDN.getName());
+      dnprop = cluster.stopDataNode(nonExcessDN.getXferAddr());
       // make sure that NN detects that the datanode is down
       BlockManagerTestUtil.noticeDeadDatanode(
-          cluster.getNameNode(), nonExcessDN.getName());
+          cluster.getNameNode(), nonExcessDN.getXferAddr());

       // The block should be replicated
       initializeTimeout(TIMEOUT);

@@ -91,9 +91,9 @@ public class TestOverReplicatedBlocks extends TestCase {
       synchronized(hm) {
         // set live datanode's remaining space to be 0
         // so they will be chosen to be deleted when over-replication occurs
-        String corruptMachineName = corruptDataNode.getName();
+        String corruptMachineName = corruptDataNode.getXferAddr();
         for (DatanodeDescriptor datanode : hm.getDatanodes()) {
-          if (!corruptMachineName.equals(datanode.getName())) {
+          if (!corruptMachineName.equals(datanode.getXferAddr())) {
             datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0, 0);
           }
         }

@@ -40,7 +40,7 @@ public class TestPendingDataNodeMessages {
   private final Block block2Gs1 = new Block(2, 0, 1);

   private final DatanodeDescriptor fakeDN = new DatanodeDescriptor(
-      new DatanodeID("fake"));
+      new DatanodeID("fake", 100));

   @Test
   public void testQueues() {
@@ -52,16 +52,16 @@ public class TestReplicationPolicy {
   private static final String filename = "/dummyfile.txt";
   private static final DatanodeDescriptor dataNodes[] =
     new DatanodeDescriptor[] {
-      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
-      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
-      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
-      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
-      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d2/r3"),
-      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3")
+      new DatanodeDescriptor(new DatanodeID("h1", 5020), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h2", 5020), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h3", 5020), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h4", 5020), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h5", 5020), "/d2/r3"),
+      new DatanodeDescriptor(new DatanodeID("h6", 5020), "/d2/r3")
     };

   private final static DatanodeDescriptor NODE =
-    new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r4");
+    new DatanodeDescriptor(new DatanodeID("h7", 5020), "/d2/r4");

   static {
     try {
@@ -197,9 +197,9 @@ public class TestBlockRecovery {
         locs, RECOVERY_ID);
     ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
     BlockRecord record1 = new BlockRecord(
-        new DatanodeID("xx", "yy", "zz", 44, 55), dn1, replica1);
+        new DatanodeID("xx", "yy", "zz", 1, 2, 3), dn1, replica1);
     BlockRecord record2 = new BlockRecord(
-        new DatanodeID("aa", "bb", "cc", 11, 22), dn2, replica2);
+        new DatanodeID("aa", "bb", "cc", 1, 2, 3), dn2, replica2);
     syncList.add(record1);
     syncList.add(record2);

@@ -402,7 +402,7 @@ public class TestBlockRecovery {
   private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
     Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
     DatanodeInfo mockOtherDN = new DatanodeInfo(
-        new DatanodeID("127.0.0.1", "localhost", "storage-1234", 0, 0));
+        new DatanodeID("127.0.0.1", "localhost", "storage-1234", 0, 0, 0));
     DatanodeInfo[] locs = new DatanodeInfo[] {
         new DatanodeInfo(dn.getDNRegistrationForBP(block.getBlockPoolId())),
         mockOtherDN };
|
@ -162,16 +162,16 @@ public class TestBlockReplacement extends TestCase {
|
|||
|
||||
// start to replace the block
|
||||
// case 1: proxySource does not contain the block
|
||||
LOG.info("Testcase 1: Proxy " + newNode.getName()
|
||||
LOG.info("Testcase 1: Proxy " + newNode
|
||||
+ " does not contain the block " + b);
|
||||
assertFalse(replaceBlock(b, source, newNode, proxies.get(0)));
|
||||
// case 2: destination already contains the block
|
||||
LOG.info("Testcase 2: Destination " + proxies.get(1).getName()
|
||||
LOG.info("Testcase 2: Destination " + proxies.get(1)
|
||||
+ " contains the block " + b);
|
||||
assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1)));
|
||||
// case 3: correct case
|
||||
LOG.info("Testcase 3: Source=" + source.getName() + " Proxy=" +
|
||||
proxies.get(0).getName() + " Destination=" + newNode.getName() );
|
||||
LOG.info("Testcase 3: Source=" + source + " Proxy=" +
|
||||
proxies.get(0) + " Destination=" + newNode );
|
||||
assertTrue(replaceBlock(b, source, proxies.get(0), newNode));
|
||||
// after cluster has time to resolve the over-replication,
|
||||
// block locations should contain two proxies and newNode
|
||||
|
@ -181,7 +181,7 @@ public class TestBlockReplacement extends TestCase {
|
|||
DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
|
||||
// case 4: proxies.get(0) is not a valid del hint
|
||||
// expect either source or newNode replica to be deleted instead
|
||||
LOG.info("Testcase 4: invalid del hint " + proxies.get(0).getName() );
|
||||
LOG.info("Testcase 4: invalid del hint " + proxies.get(0) );
|
||||
assertTrue(replaceBlock(b, proxies.get(0), proxies.get(1), source));
|
||||
// after cluster has time to resolve the over-replication,
|
||||
// block locations should contain two proxies,
|
||||
|
@ -222,7 +222,7 @@ public class TestBlockReplacement extends TestCase {
|
|||
for (DatanodeInfo node : includeNodes) {
|
||||
if (!nodeLocations.contains(node) ) {
|
||||
notDone=true;
|
||||
LOG.info("Block is not located at " + node.getName() );
|
||||
LOG.info("Block is not located at " + node );
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -231,9 +231,9 @@ public class TestBlockReplacement extends TestCase {
|
|||
String expectedNodesList = "";
|
||||
String currentNodesList = "";
|
||||
for (DatanodeInfo dn : includeNodes)
|
||||
expectedNodesList += dn.getName() + ", ";
|
||||
expectedNodesList += dn + ", ";
|
||||
for (DatanodeInfo dn : nodes)
|
||||
currentNodesList += dn.getName() + ", ";
|
||||
currentNodesList += dn + ", ";
|
||||
LOG.info("Expected replica nodes are: " + expectedNodesList);
|
||||
LOG.info("Current actual replica nodes are: " + currentNodesList);
|
||||
throw new TimeoutException(
|
||||
|
@ -254,7 +254,7 @@ public class TestBlockReplacement extends TestCase {
|
|||
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
|
||||
Socket sock = new Socket();
|
||||
sock.connect(NetUtils.createSocketAddr(
|
||||
destination.getName()), HdfsServerConstants.READ_TIMEOUT);
|
||||
destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT);
|
||||
sock.setKeepAlive(true);
|
||||
// sendRequest
|
||||
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
|
||||
|
|
|
@@ -161,7 +161,7 @@ public class TestDataNodeMultipleRegistrations {
       assertEquals("number of volumes is wrong", 2, volInfos.size());

       for (BPOfferService bpos : dn.getAllBpOs()) {
-        LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.getName() + "; sid="
+        LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration + "; sid="
             + bpos.bpRegistration.getStorageID() + "; nna=" +
             getNNSocketAddress(bpos));
       }

@@ -270,7 +270,7 @@ public class TestDataNodeVolumeFailure {
     Socket s = null;
     ExtendedBlock block = lblock.getBlock();

-    targetAddr = NetUtils.createSocketAddr(datanode.getName());
+    targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr());

     s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);

@@ -183,7 +183,7 @@ public class TestDeleteBlockPool {
       Assert.assertEquals(1, dn1.getAllBpOs().length);

       DFSAdmin admin = new DFSAdmin(nn1Conf);
-      String dn1Address = dn1.getDatanodeId().getHost() + ":" + dn1.getIpcPort();
+      String dn1Address = dn1.getDatanodeId().getIpAddr() + ":" + dn1.getIpcPort();
       String[] args = { "-deleteBlockPool", dn1Address, bpid2 };

       int ret = admin.run(args);

@@ -348,7 +348,7 @@ public class TestInterDatanodeProtocol {

     final InetSocketAddress addr = NetUtils.getConnectAddress(server);
     DatanodeID fakeDnId = new DatanodeID(
-        "localhost:" + addr.getPort(), "localhost", "fake-storage", 0, addr.getPort());
+        "localhost", "localhost", "fake-storage", addr.getPort(), 0, addr.getPort());
     DatanodeInfo dInfo = new DatanodeInfo(fakeDnId);
     InterDatanodeProtocol proxy = null;
@@ -766,28 +766,33 @@ public class NNThroughputBenchmark {
     long[] blockReportList;

     /**
-     * Get data-node in the form
-     * <host name> : <port>
-     * where port is a 6 digit integer.
+     * Return a a 6 digit integer port.
      * This is necessary in order to provide lexocographic ordering.
      * Host names are all the same, the ordering goes by port numbers.
      */
-    private static String getNodeName(int port) throws IOException {
-      String machineName = DNS.getDefaultHost("default", "default");
-      String sPort = String.valueOf(100000 + port);
-      if(sPort.length() > 6)
-        throw new IOException("Too many data-nodes.");
-      return machineName + ":" + sPort;
+    private static int getNodePort(int num) throws IOException {
+      int port = 100000 + num;
+      if (String.valueOf(port).length() > 6) {
+        throw new IOException("Too many data-nodes");
+      }
+      return port;
     }

     TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
-      dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
+      String hostName = DNS.getDefaultHost("default", "default");
+      dnRegistration = new DatanodeRegistration(hostName);
+      dnRegistration.setXferPort(getNodePort(dnIdx));
+      dnRegistration.setHostName(hostName);
       this.blocks = new ArrayList<Block>(blockCapacity);
       this.nrBlocks = 0;
     }

-    String getName() {
-      return dnRegistration.getName();
+    public String toString() {
+      return dnRegistration.toString();
     }

+    String getXferAddr() {
+      return dnRegistration.getXferAddr();
+    }
+
     void register() throws IOException {
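The benchmark keeps its ordering trick: ports start at 100000 so every simulated datanode's port renders with the same width, which makes the transfer addresses sort lexicographically in index order; the Arrays.binarySearch(datanodes, dnInfo.getXferAddr()) lookup further down depends on this. A small illustration with a hypothetical host name:

    // Fixed-width ports keep string order identical to numeric order.
    String a = "host:" + (100000 + 9);   // "host:100009"
    String b = "host:" + (100000 + 10);  // "host:100010"
    assert a.compareTo(b) < 0;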
@@ -850,8 +855,8 @@ public class NNThroughputBenchmark {
       return blockReportList;
     }

-    public int compareTo(String name) {
-      return getName().compareTo(name);
+    public int compareTo(String xferAddr) {
+      return getXferAddr().compareTo(xferAddr);
     }

     /**

@@ -889,10 +894,12 @@ public class NNThroughputBenchmark {
       for(int t = 0; t < blockTargets.length; t++) {
         DatanodeInfo dnInfo = blockTargets[t];
         DatanodeRegistration receivedDNReg;
-        receivedDNReg = new DatanodeRegistration(dnInfo.getName());
+        receivedDNReg = new DatanodeRegistration(dnInfo.getIpAddr());
         receivedDNReg.setStorageInfo(
             new DataStorage(nsInfo, dnInfo.getStorageID()));
+        receivedDNReg.setXferPort(dnInfo.getXferPort());
         receivedDNReg.setInfoPort(dnInfo.getInfoPort());
+        receivedDNReg.setIpcPort(dnInfo.getIpcPort());
         ReceivedDeletedBlockInfo[] rdBlocks = {
           new ReceivedDeletedBlockInfo(
               blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,

@@ -977,10 +984,10 @@ public class NNThroughputBenchmark {
       for(int idx=0; idx < nrDatanodes; idx++) {
         datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
         datanodes[idx].register();
-        assert datanodes[idx].getName().compareTo(prevDNName) > 0
+        assert datanodes[idx].getXferAddr().compareTo(prevDNName) > 0
             : "Data-nodes must be sorted lexicographically.";
         datanodes[idx].sendHeartbeat();
-        prevDNName = datanodes[idx].getName();
+        prevDNName = datanodes[idx].getXferAddr();
       }

       // create files

@@ -1010,7 +1017,7 @@ public class NNThroughputBenchmark {
       LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, prevBlock, null);
       prevBlock = loc.getBlock();
       for(DatanodeInfo dnInfo : loc.getLocations()) {
-        int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
+        int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr());
         datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
         ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo(
               loc.getBlock().getLocalBlock(),

@@ -1165,9 +1172,9 @@ public class NNThroughputBenchmark {
       for(int i=0; i < nodesToDecommission; i++) {
         TinyDatanode dn = blockReportObject.datanodes[nrDatanodes-1-i];
         numDecommissionedBlocks += dn.nrBlocks;
-        excludeFile.write(dn.getName().getBytes());
+        excludeFile.write(dn.getXferAddr().getBytes());
         excludeFile.write('\n');
-        LOG.info("Datanode " + dn.getName() + " is decommissioned.");
+        LOG.info("Datanode " + dn + " is decommissioned.");
       }
       excludeFile.close();
       nameNodeProto.refreshNodes();
@@ -156,7 +156,7 @@ public class TestDecommissioningStatus {
       throws IOException {
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);

-    String nodename = info[nodeIndex].getName();
+    String nodename = info[nodeIndex].getXferAddr();
     System.out.println("Decommissioning node: " + nodename);

     // write nodename into the exclude file.

@@ -167,7 +167,7 @@ public class TestStandbyIsHot {

     // Stop the DN.
     DataNode dn = cluster.getDataNodes().get(0);
-    String dnName = dn.getDatanodeId().getName();
+    String dnName = dn.getDatanodeId().getXferAddr();
     DataNodeProperties dnProps = cluster.stopDataNode(0);

     // Make sure both NNs register it as dead.
@@ -30,16 +30,16 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 public class TestNetworkTopology extends TestCase {
   private final static NetworkTopology cluster = new NetworkTopology();
   private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
-    new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3"),
-    new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r3")
+    new DatanodeDescriptor(new DatanodeID("h1", 5020), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h2", 5020), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h3", 5020), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h4", 5020), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h5", 5020), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h6", 5020), "/d2/r3"),
+    new DatanodeDescriptor(new DatanodeID("h7", 5020), "/d2/r3")
   };
   private final static DatanodeDescriptor NODE =
-    new DatanodeDescriptor(new DatanodeID("h8:5020"), "/d2/r4");
+    new DatanodeDescriptor(new DatanodeID("h8", 5020), "/d2/r4");

   static {
     for(int i=0; i<dataNodes.length; i++) {

@@ -61,9 +61,9 @@ public class TestNetworkTopology extends TestCase {
   public void testCreateInvalidTopology() throws Exception {
     NetworkTopology invalCluster = new NetworkTopology();
     DatanodeDescriptor invalDataNodes[] = new DatanodeDescriptor[] {
-      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
-      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
-      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1")
+      new DatanodeDescriptor(new DatanodeID("h1", 5020), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h2", 5020), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h3", 5020), "/d1")
     };
     invalCluster.add(invalDataNodes[0]);
     invalCluster.add(invalDataNodes[1]);
@@ -15739,6 +15739,10 @@
           <type>RegexpComparator</type>
           <expected-output>Name: [0-9\.:]+ \([-.a-zA-z0-9\.]+\)</expected-output>
         </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>Hostname: [-.a-zA-z0-9\.]+</expected-output>
+        </comparator>
         <comparator>
           <type>RegexpComparator</type>
           <expected-output>Decommission Status : [a-zA-Z]+</expected-output>

@@ -15836,6 +15840,10 @@
           <type>RegexpComparator</type>
           <expected-output>Name: [0-9\.:]+ \([-.a-zA-z0-9\.]+\)</expected-output>
         </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>Hostname: [-.a-zA-z0-9\.]+</expected-output>
+        </comparator>
         <comparator>
           <type>RegexpComparator</type>
           <expected-output>Decommission Status : [a-zA-Z]+</expected-output>