HDFS-5233. Use Datanode UUID to identify Datanodes.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1525407 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eb9f1b6707
commit
4551da302d
|
@ -28,3 +28,5 @@ IMPROVEMENTS:
|
||||||
datanodes. (szetszwo)
|
datanodes. (szetszwo)
|
||||||
|
|
||||||
HDFS-5232. Protocol changes to transmit StorageUuid. (Arpit Agarwal)
|
HDFS-5232. Protocol changes to transmit StorageUuid. (Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-5233. Use Datanode UUID to identify Datanodes. (Arpit Agarwal)
|
||||||
|
|
|
@ -1290,7 +1290,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
|
||||||
* deadNodes and added currentNode again. Thats ok. */
|
* deadNodes and added currentNode again. Thats ok. */
|
||||||
deadNodes.remove(oldNode);
|
deadNodes.remove(oldNode);
|
||||||
}
|
}
|
||||||
if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
|
if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
|
||||||
currentNode = newNode;
|
currentNode = newNode;
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.hdfs.protocol;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class represents the primary identifier for a Datanode.
|
* This class represents the primary identifier for a Datanode.
|
||||||
* Datanodes are identified by how they can be contacted (hostname
|
* Datanodes are identified by how they can be contacted (hostname
|
||||||
|
@ -40,35 +42,42 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
private String ipAddr; // IP address
|
private String ipAddr; // IP address
|
||||||
private String hostName; // hostname claimed by datanode
|
private String hostName; // hostname claimed by datanode
|
||||||
private String peerHostName; // hostname from the actual connection
|
private String peerHostName; // hostname from the actual connection
|
||||||
private String storageID; // unique per cluster storageID
|
|
||||||
private int xferPort; // data streaming port
|
private int xferPort; // data streaming port
|
||||||
private int infoPort; // info server port
|
private int infoPort; // info server port
|
||||||
private int ipcPort; // IPC server port
|
private int ipcPort; // IPC server port
|
||||||
|
|
||||||
|
// UUID identifying a given datanode. For upgraded Datanodes this is the
|
||||||
|
// same as the StorageID that was previously used by this Datanode. For
|
||||||
|
// newly formatted Datanodes it is a UUID.
|
||||||
|
private String datanodeUuid = null;
|
||||||
|
|
||||||
public DatanodeID(DatanodeID from) {
|
public DatanodeID(DatanodeID from) {
|
||||||
this(from.getIpAddr(),
|
this(from.getIpAddr(),
|
||||||
from.getHostName(),
|
from.getHostName(),
|
||||||
from.getStorageID(),
|
from.getDatanodeUuid(),
|
||||||
from.getXferPort(),
|
from.getXferPort(),
|
||||||
from.getInfoPort(),
|
from.getInfoPort(),
|
||||||
from.getIpcPort());
|
from.getIpcPort());
|
||||||
this.peerHostName = from.getPeerHostName();
|
this.peerHostName = from.getPeerHostName();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a DatanodeID
|
* Create a DatanodeID
|
||||||
* @param ipAddr IP
|
* @param ipAddr IP
|
||||||
* @param hostName hostname
|
* @param hostName hostname
|
||||||
* @param storageID data storage ID
|
* @param datanodeUuid data node ID, UUID for new Datanodes, may be the
|
||||||
|
* storage ID for pre-UUID datanodes. NULL if unknown
|
||||||
|
* e.g. if this is a new datanode. A new UUID will
|
||||||
|
* be assigned by the namenode.
|
||||||
* @param xferPort data transfer port
|
* @param xferPort data transfer port
|
||||||
* @param infoPort info server port
|
* @param infoPort info server port
|
||||||
* @param ipcPort ipc server port
|
* @param ipcPort ipc server port
|
||||||
*/
|
*/
|
||||||
public DatanodeID(String ipAddr, String hostName, String storageID,
|
public DatanodeID(String ipAddr, String hostName, String datanodeUuid,
|
||||||
int xferPort, int infoPort, int ipcPort) {
|
int xferPort, int infoPort, int ipcPort) {
|
||||||
this.ipAddr = ipAddr;
|
this.ipAddr = ipAddr;
|
||||||
this.hostName = hostName;
|
this.hostName = hostName;
|
||||||
this.storageID = storageID;
|
this.datanodeUuid = checkDatanodeUuid(datanodeUuid);
|
||||||
this.xferPort = xferPort;
|
this.xferPort = xferPort;
|
||||||
this.infoPort = infoPort;
|
this.infoPort = infoPort;
|
||||||
this.ipcPort = ipcPort;
|
this.ipcPort = ipcPort;
|
||||||
|
@ -82,8 +91,28 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
this.peerHostName = peerHostName;
|
this.peerHostName = peerHostName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setStorageID(String storageID) {
|
/**
|
||||||
this.storageID = storageID;
|
* @return data node ID.
|
||||||
|
*/
|
||||||
|
public String getDatanodeUuid() {
|
||||||
|
return datanodeUuid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDatanodeUuid(String datanodeUuid) {
|
||||||
|
this.datanodeUuid = datanodeUuid;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String checkDatanodeUuid(String uuid) {
|
||||||
|
if (uuid == null || uuid.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
return uuid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String generateNewDatanodeUuid() {
|
||||||
|
datanodeUuid = UUID.randomUUID().toString();
|
||||||
|
return datanodeUuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -158,13 +187,6 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
return useHostname ? getIpcAddrWithHostname() : getIpcAddr();
|
return useHostname ? getIpcAddrWithHostname() : getIpcAddr();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return data storage ID.
|
|
||||||
*/
|
|
||||||
public String getStorageID() {
|
|
||||||
return storageID;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return xferPort (the port for data streaming)
|
* @return xferPort (the port for data streaming)
|
||||||
*/
|
*/
|
||||||
|
@ -195,12 +217,12 @@ public class DatanodeID implements Comparable<DatanodeID> {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return (getXferAddr().equals(((DatanodeID)to).getXferAddr()) &&
|
return (getXferAddr().equals(((DatanodeID)to).getXferAddr()) &&
|
||||||
storageID.equals(((DatanodeID)to).getStorageID()));
|
datanodeUuid.equals(((DatanodeID)to).getDatanodeUuid()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return getXferAddr().hashCode()^ storageID.hashCode();
|
return getXferAddr().hashCode()^ datanodeUuid.hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -108,18 +108,18 @@ public class DatanodeInfo extends DatanodeID implements Node {
|
||||||
final long capacity, final long dfsUsed, final long remaining,
|
final long capacity, final long dfsUsed, final long remaining,
|
||||||
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
|
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
|
||||||
final AdminStates adminState) {
|
final AdminStates adminState) {
|
||||||
this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(), nodeID.getXferPort(),
|
this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getDatanodeUuid(), nodeID.getXferPort(),
|
||||||
nodeID.getInfoPort(), nodeID.getIpcPort(), capacity, dfsUsed, remaining,
|
nodeID.getInfoPort(), nodeID.getIpcPort(), capacity, dfsUsed, remaining,
|
||||||
blockPoolUsed, lastUpdate, xceiverCount, location, adminState);
|
blockPoolUsed, lastUpdate, xceiverCount, location, adminState);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Constructor */
|
/** Constructor */
|
||||||
public DatanodeInfo(final String ipAddr, final String hostName,
|
public DatanodeInfo(final String ipAddr, final String hostName,
|
||||||
final String storageID, final int xferPort, final int infoPort, final int ipcPort,
|
final String DatanodeUuid, final int xferPort, final int infoPort, final int ipcPort,
|
||||||
final long capacity, final long dfsUsed, final long remaining,
|
final long capacity, final long dfsUsed, final long remaining,
|
||||||
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
|
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
|
||||||
final String networkLocation, final AdminStates adminState) {
|
final String networkLocation, final AdminStates adminState) {
|
||||||
super(ipAddr, hostName, storageID, xferPort, infoPort, ipcPort);
|
super(ipAddr, hostName, DatanodeUuid, xferPort, infoPort, ipcPort);
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
this.dfsUsed = dfsUsed;
|
this.dfsUsed = dfsUsed;
|
||||||
this.remaining = remaining;
|
this.remaining = remaining;
|
||||||
|
|
|
@ -51,7 +51,7 @@ public class UnregisteredNodeException extends IOException {
|
||||||
*/
|
*/
|
||||||
public UnregisteredNodeException(DatanodeID nodeID, DatanodeInfo storedNode) {
|
public UnregisteredNodeException(DatanodeID nodeID, DatanodeInfo storedNode) {
|
||||||
super("Data node " + nodeID + " is attempting to report storage ID "
|
super("Data node " + nodeID + " is attempting to report storage ID "
|
||||||
+ nodeID.getStorageID() + ". Node "
|
+ nodeID.getDatanodeUuid() + ". Node "
|
||||||
+ storedNode + " is expected to serve this storage.");
|
+ storedNode + " is expected to serve this storage.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -234,7 +234,7 @@ public class PBHelper {
|
||||||
return DatanodeIDProto.newBuilder()
|
return DatanodeIDProto.newBuilder()
|
||||||
.setIpAddr(dn.getIpAddr())
|
.setIpAddr(dn.getIpAddr())
|
||||||
.setHostName(dn.getHostName())
|
.setHostName(dn.getHostName())
|
||||||
.setDatanodeUuid(dn.getStorageID())
|
.setDatanodeUuid(dn.getDatanodeUuid())
|
||||||
.setXferPort(dn.getXferPort())
|
.setXferPort(dn.getXferPort())
|
||||||
.setInfoPort(dn.getInfoPort())
|
.setInfoPort(dn.getInfoPort())
|
||||||
.setIpcPort(dn.getIpcPort()).build();
|
.setIpcPort(dn.getIpcPort()).build();
|
||||||
|
|
|
@ -551,7 +551,7 @@ public class Balancer {
|
||||||
|
|
||||||
/* Get the storage id of the datanode */
|
/* Get the storage id of the datanode */
|
||||||
protected String getStorageID() {
|
protected String getStorageID() {
|
||||||
return datanode.getStorageID();
|
return datanode.getDatanodeUuid();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Decide if still need to move more bytes */
|
/** Decide if still need to move more bytes */
|
||||||
|
@ -895,7 +895,7 @@ public class Balancer {
|
||||||
datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
|
datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.datanodes.put(datanode.getStorageID(), datanodeS);
|
this.datanodes.put(datanode.getDatanodeUuid(), datanodeS);
|
||||||
}
|
}
|
||||||
|
|
||||||
//logging
|
//logging
|
||||||
|
|
|
@ -989,7 +989,7 @@ public class BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
node.resetBlocks();
|
node.resetBlocks();
|
||||||
invalidateBlocks.remove(node.getStorageID());
|
invalidateBlocks.remove(node.getDatanodeUuid());
|
||||||
|
|
||||||
// If the DN hasn't block-reported since the most recent
|
// If the DN hasn't block-reported since the most recent
|
||||||
// failover, then we may have been holding up on processing
|
// failover, then we may have been holding up on processing
|
||||||
|
@ -1472,7 +1472,7 @@ public class BlockManager {
|
||||||
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
||||||
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
||||||
LightWeightLinkedSet<Block> excessBlocks =
|
LightWeightLinkedSet<Block> excessBlocks =
|
||||||
excessReplicateMap.get(node.getStorageID());
|
excessReplicateMap.get(node.getDatanodeUuid());
|
||||||
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
|
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
|
||||||
corrupt++;
|
corrupt++;
|
||||||
else if (node.isDecommissionInProgress() || node.isDecommissioned())
|
else if (node.isDecommissionInProgress() || node.isDecommissioned())
|
||||||
|
@ -1901,7 +1901,7 @@ public class BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ignore replicas already scheduled to be removed from the DN
|
// Ignore replicas already scheduled to be removed from the DN
|
||||||
if(invalidateBlocks.contains(dn.getStorageID(), block)) {
|
if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
|
||||||
/* TODO: following assertion is incorrect, see HDFS-2668
|
/* TODO: following assertion is incorrect, see HDFS-2668
|
||||||
assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
+ " in recentInvalidatesSet should not appear in DN " + dn; */
|
+ " in recentInvalidatesSet should not appear in DN " + dn; */
|
||||||
|
@ -2441,7 +2441,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
|
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
|
||||||
.getStorageID());
|
.getDatanodeUuid());
|
||||||
if (excessBlocks == null || !excessBlocks.contains(block)) {
|
if (excessBlocks == null || !excessBlocks.contains(block)) {
|
||||||
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
||||||
// exclude corrupt replicas
|
// exclude corrupt replicas
|
||||||
|
@ -2530,10 +2530,10 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
|
|
||||||
private void addToExcessReplicate(DatanodeInfo dn, Block block) {
|
private void addToExcessReplicate(DatanodeInfo dn, Block block) {
|
||||||
assert namesystem.hasWriteLock();
|
assert namesystem.hasWriteLock();
|
||||||
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
|
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
|
||||||
if (excessBlocks == null) {
|
if (excessBlocks == null) {
|
||||||
excessBlocks = new LightWeightLinkedSet<Block>();
|
excessBlocks = new LightWeightLinkedSet<Block>();
|
||||||
excessReplicateMap.put(dn.getStorageID(), excessBlocks);
|
excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
|
||||||
}
|
}
|
||||||
if (excessBlocks.add(block)) {
|
if (excessBlocks.add(block)) {
|
||||||
excessBlocksCount.incrementAndGet();
|
excessBlocksCount.incrementAndGet();
|
||||||
|
@ -2581,7 +2581,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
// in "excess" there.
|
// in "excess" there.
|
||||||
//
|
//
|
||||||
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
|
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
|
||||||
.getStorageID());
|
.getDatanodeUuid());
|
||||||
if (excessBlocks != null) {
|
if (excessBlocks != null) {
|
||||||
if (excessBlocks.remove(block)) {
|
if (excessBlocks.remove(block)) {
|
||||||
excessBlocksCount.decrementAndGet();
|
excessBlocksCount.decrementAndGet();
|
||||||
|
@ -2590,7 +2590,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
+ block + " is removed from excessBlocks");
|
+ block + " is removed from excessBlocks");
|
||||||
}
|
}
|
||||||
if (excessBlocks.size() == 0) {
|
if (excessBlocks.size() == 0) {
|
||||||
excessReplicateMap.remove(node.getStorageID());
|
excessReplicateMap.remove(node.getDatanodeUuid());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2760,7 +2760,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
||||||
decommissioned++;
|
decommissioned++;
|
||||||
} else {
|
} else {
|
||||||
LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
|
LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
|
||||||
.getStorageID());
|
.getDatanodeUuid());
|
||||||
if (blocksExcess != null && blocksExcess.contains(b)) {
|
if (blocksExcess != null && blocksExcess.contains(b)) {
|
||||||
excess++;
|
excess++;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -418,9 +418,9 @@ public class DatanodeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/** Get a datanode descriptor given corresponding storageID */
|
/** Get a datanode descriptor given corresponding DatanodeUUID */
|
||||||
DatanodeDescriptor getDatanode(final String storageID) {
|
DatanodeDescriptor getDatanode(final String datanodeUuid) {
|
||||||
return datanodeMap.get(storageID);
|
return datanodeMap.get(datanodeUuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -432,7 +432,7 @@ public class DatanodeManager {
|
||||||
*/
|
*/
|
||||||
public DatanodeDescriptor getDatanode(DatanodeID nodeID
|
public DatanodeDescriptor getDatanode(DatanodeID nodeID
|
||||||
) throws UnregisteredNodeException {
|
) throws UnregisteredNodeException {
|
||||||
final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
|
final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
|
||||||
if (node == null)
|
if (node == null)
|
||||||
return null;
|
return null;
|
||||||
if (!node.getXferAddr().equals(nodeID.getXferAddr())) {
|
if (!node.getXferAddr().equals(nodeID.getXferAddr())) {
|
||||||
|
@ -536,7 +536,7 @@ public class DatanodeManager {
|
||||||
// remove from host2DatanodeMap the datanodeDescriptor removed
|
// remove from host2DatanodeMap the datanodeDescriptor removed
|
||||||
// from datanodeMap before adding node to host2DatanodeMap.
|
// from datanodeMap before adding node to host2DatanodeMap.
|
||||||
synchronized(datanodeMap) {
|
synchronized(datanodeMap) {
|
||||||
host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
|
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
|
||||||
}
|
}
|
||||||
|
|
||||||
networktopology.add(node); // may throw InvalidTopologyException
|
networktopology.add(node); // may throw InvalidTopologyException
|
||||||
|
@ -551,7 +551,7 @@ public class DatanodeManager {
|
||||||
|
|
||||||
/** Physically remove node from datanodeMap. */
|
/** Physically remove node from datanodeMap. */
|
||||||
private void wipeDatanode(final DatanodeID node) {
|
private void wipeDatanode(final DatanodeID node) {
|
||||||
final String key = node.getStorageID();
|
final String key = node.getDatanodeUuid();
|
||||||
synchronized (datanodeMap) {
|
synchronized (datanodeMap) {
|
||||||
host2DatanodeMap.remove(datanodeMap.remove(key));
|
host2DatanodeMap.remove(datanodeMap.remove(key));
|
||||||
}
|
}
|
||||||
|
@ -774,9 +774,9 @@ public class DatanodeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
|
NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
|
||||||
+ nodeReg + " storage " + nodeReg.getStorageID());
|
+ nodeReg + " storage " + nodeReg.getDatanodeUuid());
|
||||||
|
|
||||||
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
|
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getDatanodeUuid());
|
||||||
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
|
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
|
||||||
nodeReg.getIpAddr(), nodeReg.getXferPort());
|
nodeReg.getIpAddr(), nodeReg.getXferPort());
|
||||||
|
|
||||||
|
@ -811,7 +811,7 @@ public class DatanodeManager {
|
||||||
*/
|
*/
|
||||||
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
|
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
|
||||||
+ " is replaced by " + nodeReg + " with the same storageID "
|
+ " is replaced by " + nodeReg + " with the same storageID "
|
||||||
+ nodeReg.getStorageID());
|
+ nodeReg.getDatanodeUuid());
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
|
@ -846,14 +846,14 @@ public class DatanodeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a new datanode serving a new data storage
|
// this is a new datanode serving a new data storage
|
||||||
if ("".equals(nodeReg.getStorageID())) {
|
if ("".equals(nodeReg.getDatanodeUuid())) {
|
||||||
// this data storage has never been registered
|
// this data storage has never been registered
|
||||||
// it is either empty or was created by pre-storageID version of DFS
|
// it is either empty or was created by pre-storageID version of DFS
|
||||||
nodeReg.setStorageID(DatanodeStorage.newStorageID());
|
nodeReg.setDatanodeUuid(DatanodeStorage.newStorageID());
|
||||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug(
|
NameNode.stateChangeLog.debug(
|
||||||
"BLOCK* NameSystem.registerDatanode: "
|
"BLOCK* NameSystem.registerDatanode: "
|
||||||
+ "new storageID " + nodeReg.getStorageID() + " assigned.");
|
+ "new Datanode UUID " + nodeReg.getDatanodeUuid() + " assigned.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -78,10 +78,10 @@ class InvalidateBlocks {
|
||||||
*/
|
*/
|
||||||
synchronized void add(final Block block, final DatanodeInfo datanode,
|
synchronized void add(final Block block, final DatanodeInfo datanode,
|
||||||
final boolean log) {
|
final boolean log) {
|
||||||
LightWeightHashSet<Block> set = node2blocks.get(datanode.getStorageID());
|
LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
|
||||||
if (set == null) {
|
if (set == null) {
|
||||||
set = new LightWeightHashSet<Block>();
|
set = new LightWeightHashSet<Block>();
|
||||||
node2blocks.put(datanode.getStorageID(), set);
|
node2blocks.put(datanode.getDatanodeUuid(), set);
|
||||||
}
|
}
|
||||||
if (set.add(block)) {
|
if (set.add(block)) {
|
||||||
numBlocks++;
|
numBlocks++;
|
||||||
|
|
|
@ -159,24 +159,24 @@ class BPOfferService {
|
||||||
synchronized NamespaceInfo getNamespaceInfo() {
|
synchronized NamespaceInfo getNamespaceInfo() {
|
||||||
return bpNSInfo;
|
return bpNSInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
if (bpNSInfo == null) {
|
if (bpNSInfo == null) {
|
||||||
// If we haven't yet connected to our NN, we don't yet know our
|
// If we haven't yet connected to our NN, we don't yet know our
|
||||||
// own block pool ID.
|
// own block pool ID.
|
||||||
// If _none_ of the block pools have connected yet, we don't even
|
// If _none_ of the block pools have connected yet, we don't even
|
||||||
// know the storage ID of this DN.
|
// know the DatanodeID ID of this DN.
|
||||||
String storageId = dn.getStorageId();
|
String datanodeUuid = dn.getDatanodeUuid();
|
||||||
if (storageId == null || "".equals(storageId)) {
|
|
||||||
storageId = "unknown";
|
if (datanodeUuid == null || datanodeUuid.isEmpty()) {
|
||||||
|
datanodeUuid = "unassigned";
|
||||||
}
|
}
|
||||||
return "Block pool <registering> (storage id " + storageId +
|
return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
|
||||||
")";
|
|
||||||
} else {
|
} else {
|
||||||
return "Block pool " + getBlockPoolId() +
|
return "Block pool " + getBlockPoolId() +
|
||||||
" (storage id " + dn.getStorageId() +
|
" (Datanode Uuid " + dn.getDatanodeUuid() +
|
||||||
")";
|
")";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,6 @@ import static org.apache.hadoop.util.Time.now;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URI;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -50,7 +49,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.apache.hadoop.util.VersionUtil;
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
|
@ -279,7 +277,7 @@ class BPServiceActor implements Runnable {
|
||||||
}
|
}
|
||||||
if (receivedAndDeletedBlockArray != null) {
|
if (receivedAndDeletedBlockArray != null) {
|
||||||
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
||||||
bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
|
bpRegistration.getDatanodeUuid(), receivedAndDeletedBlockArray) };
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
|
bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
|
||||||
|
@ -398,7 +396,7 @@ class BPServiceActor implements Runnable {
|
||||||
// Send block report
|
// Send block report
|
||||||
long brSendStartTime = now();
|
long brSendStartTime = now();
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(bpRegistration.getStorageID()),
|
new DatanodeStorage(bpRegistration.getDatanodeUuid()),
|
||||||
bReport.getBlockListAsLongs()) };
|
bReport.getBlockListAsLongs()) };
|
||||||
cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);
|
cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), report);
|
||||||
|
|
||||||
|
@ -436,7 +434,7 @@ class BPServiceActor implements Runnable {
|
||||||
LOG.debug("Sending heartbeat from service actor: " + this);
|
LOG.debug("Sending heartbeat from service actor: " + this);
|
||||||
}
|
}
|
||||||
// reports number of failed volumes
|
// reports number of failed volumes
|
||||||
StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
|
StorageReport[] report = { new StorageReport(bpRegistration.getDatanodeUuid(),
|
||||||
false,
|
false,
|
||||||
dn.getFSDataset().getCapacity(),
|
dn.getFSDataset().getCapacity(),
|
||||||
dn.getFSDataset().getDfsUsed(),
|
dn.getFSDataset().getDfsUsed(),
|
||||||
|
|
|
@ -1079,7 +1079,7 @@ class BlockReceiver implements Closeable {
|
||||||
.getBlockPoolId());
|
.getBlockPoolId());
|
||||||
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
|
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
|
||||||
myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
|
myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
|
||||||
dnR.getStorageID(), block, endTime - startTime));
|
dnR.getDatanodeUuid(), block, endTime - startTime));
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Received " + block + " size " + block.getNumBytes()
|
LOG.info("Received " + block + " size " + block.getNumBytes()
|
||||||
+ " from " + inAddr);
|
+ " from " + inAddr);
|
||||||
|
|
|
@ -750,7 +750,7 @@ public class DataNode extends Configured
|
||||||
}
|
}
|
||||||
DatanodeID dnId = new DatanodeID(
|
DatanodeID dnId = new DatanodeID(
|
||||||
streamingAddr.getAddress().getHostAddress(), hostName,
|
streamingAddr.getAddress().getHostAddress(), hostName,
|
||||||
getStorageId(), getXferPort(), getInfoPort(), getIpcPort());
|
getDatanodeUuid(), getXferPort(), getInfoPort(), getIpcPort());
|
||||||
return new DatanodeRegistration(dnId, storageInfo,
|
return new DatanodeRegistration(dnId, storageInfo,
|
||||||
new ExportedBlockKeys(), VersionInfo.getVersion());
|
new ExportedBlockKeys(), VersionInfo.getVersion());
|
||||||
}
|
}
|
||||||
|
@ -770,13 +770,13 @@ public class DataNode extends Configured
|
||||||
|
|
||||||
if (storage.getStorageID().equals("")) {
|
if (storage.getStorageID().equals("")) {
|
||||||
// This is a fresh datanode, persist the NN-provided storage ID
|
// This is a fresh datanode, persist the NN-provided storage ID
|
||||||
storage.setStorageID(bpRegistration.getStorageID());
|
storage.setStorageID(bpRegistration.getDatanodeUuid());
|
||||||
storage.writeAll();
|
storage.writeAll();
|
||||||
LOG.info("New storage id " + bpRegistration.getStorageID()
|
LOG.info("New storage id " + bpRegistration.getDatanodeUuid()
|
||||||
+ " is assigned to data-node " + bpRegistration);
|
+ " is assigned to data-node " + bpRegistration);
|
||||||
} else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
|
} else if(!storage.getStorageID().equals(bpRegistration.getDatanodeUuid())) {
|
||||||
throw new IOException("Inconsistent storage IDs. Name-node returned "
|
throw new IOException("Inconsistent storage IDs. Name-node returned "
|
||||||
+ bpRegistration.getStorageID()
|
+ bpRegistration.getDatanodeUuid()
|
||||||
+ ". Expecting " + storage.getStorageID());
|
+ ". Expecting " + storage.getStorageID());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -925,7 +925,7 @@ public class DataNode extends Configured
|
||||||
return streamingAddr.getPort();
|
return streamingAddr.getPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
String getStorageId() {
|
String getDatanodeUuid() {
|
||||||
return storage.getStorageID();
|
return storage.getStorageID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -940,7 +940,7 @@ public class DataNode extends Configured
|
||||||
/**
|
/**
|
||||||
* NB: The datanode can perform data transfer on the streaming
|
* NB: The datanode can perform data transfer on the streaming
|
||||||
* address however clients are given the IPC IP address for data
|
* address however clients are given the IPC IP address for data
|
||||||
* transfer, and that may be a different address.
|
* transfer, and that may be a different address.
|
||||||
*
|
*
|
||||||
* @return socket address for data transfer
|
* @return socket address for data transfer
|
||||||
*/
|
*/
|
||||||
|
@ -1016,7 +1016,7 @@ public class DataNode extends Configured
|
||||||
|
|
||||||
public static void setNewStorageID(DatanodeID dnId) {
|
public static void setNewStorageID(DatanodeID dnId) {
|
||||||
LOG.info("Datanode is " + dnId);
|
LOG.info("Datanode is " + dnId);
|
||||||
dnId.setStorageID(DatanodeStorage.newStorageID());
|
dnId.setDatanodeUuid(DatanodeStorage.newStorageID());
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Ensure the authentication method is kerberos */
|
/** Ensure the authentication method is kerberos */
|
||||||
|
@ -1818,7 +1818,7 @@ public class DataNode extends Configured
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "DataNode{data=" + data + ", localName='" + getDisplayName()
|
return "DataNode{data=" + data + ", localName='" + getDisplayName()
|
||||||
+ "', storageID='" + getStorageId() + "', xmitsInProgress="
|
+ "', storageID='" + getDatanodeUuid() + "', xmitsInProgress="
|
||||||
+ xmitsInProgress.get() + "}";
|
+ xmitsInProgress.get() + "}";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -284,7 +284,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
BlockSender.ClientTraceLog.info(String.format(
|
BlockSender.ClientTraceLog.info(String.format(
|
||||||
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
|
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
|
||||||
" blockid: %s, srvID: %s, success: %b",
|
" blockid: %s, srvID: %s, success: %b",
|
||||||
blk.getBlockId(), dnR.getStorageID(), (fis != null)
|
blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
if (fis != null) {
|
if (fis != null) {
|
||||||
|
@ -317,7 +317,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
|
clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
|
||||||
? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
|
? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
|
||||||
"%d", "HDFS_READ", clientName, "%d",
|
"%d", "HDFS_READ", clientName, "%d",
|
||||||
dnR.getStorageID(), block, "%d")
|
dnR.getDatanodeUuid(), block, "%d")
|
||||||
: dnR + " Served block " + block + " to " +
|
: dnR + " Served block " + block + " to " +
|
||||||
remoteAddress;
|
remoteAddress;
|
||||||
|
|
||||||
|
|
|
@ -82,7 +82,7 @@ public class DatanodeRegistration extends DatanodeID
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName()
|
return getClass().getSimpleName()
|
||||||
+ "(" + getIpAddr()
|
+ "(" + getIpAddr()
|
||||||
+ ", storageID=" + getStorageID()
|
+ ", storageID=" + getDatanodeUuid()
|
||||||
+ ", infoPort=" + getInfoPort()
|
+ ", infoPort=" + getInfoPort()
|
||||||
+ ", ipcPort=" + getIpcPort()
|
+ ", ipcPort=" + getIpcPort()
|
||||||
+ ", storageInfo=" + storageInfo
|
+ ", storageInfo=" + storageInfo
|
||||||
|
|
|
@ -292,7 +292,7 @@ public class JsonUtil {
|
||||||
final Map<String, Object> m = new TreeMap<String, Object>();
|
final Map<String, Object> m = new TreeMap<String, Object>();
|
||||||
m.put("ipAddr", datanodeinfo.getIpAddr());
|
m.put("ipAddr", datanodeinfo.getIpAddr());
|
||||||
m.put("hostName", datanodeinfo.getHostName());
|
m.put("hostName", datanodeinfo.getHostName());
|
||||||
m.put("storageID", datanodeinfo.getStorageID());
|
m.put("storageID", datanodeinfo.getDatanodeUuid());
|
||||||
m.put("xferPort", datanodeinfo.getXferPort());
|
m.put("xferPort", datanodeinfo.getXferPort());
|
||||||
m.put("infoPort", datanodeinfo.getInfoPort());
|
m.put("infoPort", datanodeinfo.getInfoPort());
|
||||||
m.put("ipcPort", datanodeinfo.getIpcPort());
|
m.put("ipcPort", datanodeinfo.getIpcPort());
|
||||||
|
|
|
@ -223,7 +223,7 @@ public class TestDatanodeRegistration {
|
||||||
DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
|
DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
|
||||||
doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
|
doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
|
||||||
doReturn(123).when(mockDnReg).getXferPort();
|
doReturn(123).when(mockDnReg).getXferPort();
|
||||||
doReturn("fake-storage-id").when(mockDnReg).getStorageID();
|
doReturn("fake-storage-id").when(mockDnReg).getDatanodeUuid();
|
||||||
doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
|
doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
|
||||||
|
|
||||||
// Should succeed when software versions are the same.
|
// Should succeed when software versions are the same.
|
||||||
|
@ -270,7 +270,7 @@ public class TestDatanodeRegistration {
|
||||||
|
|
||||||
DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
|
DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
|
||||||
doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
|
doReturn(HdfsConstants.LAYOUT_VERSION).when(mockDnReg).getVersion();
|
||||||
doReturn("fake-storage-id").when(mockDnReg).getStorageID();
|
doReturn("fake-storage-id").when(mockDnReg).getDatanodeUuid();
|
||||||
doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
|
doReturn(mockStorageInfo).when(mockDnReg).getStorageInfo();
|
||||||
|
|
||||||
// Should succeed when software versions are the same and CTimes are the
|
// Should succeed when software versions are the same and CTimes are the
|
||||||
|
|
|
@ -145,7 +145,7 @@ public class TestPBHelper {
|
||||||
void compare(DatanodeID dn, DatanodeID dn2) {
|
void compare(DatanodeID dn, DatanodeID dn2) {
|
||||||
assertEquals(dn.getIpAddr(), dn2.getIpAddr());
|
assertEquals(dn.getIpAddr(), dn2.getIpAddr());
|
||||||
assertEquals(dn.getHostName(), dn2.getHostName());
|
assertEquals(dn.getHostName(), dn2.getHostName());
|
||||||
assertEquals(dn.getStorageID(), dn2.getStorageID());
|
assertEquals(dn.getDatanodeUuid(), dn2.getDatanodeUuid());
|
||||||
assertEquals(dn.getXferPort(), dn2.getXferPort());
|
assertEquals(dn.getXferPort(), dn2.getXferPort());
|
||||||
assertEquals(dn.getInfoPort(), dn2.getInfoPort());
|
assertEquals(dn.getInfoPort(), dn2.getInfoPort());
|
||||||
assertEquals(dn.getIpcPort(), dn2.getIpcPort());
|
assertEquals(dn.getIpcPort(), dn2.getIpcPort());
|
||||||
|
|
|
@ -527,7 +527,7 @@ public class TestBlockManager {
|
||||||
public void testSafeModeIBR() throws Exception {
|
public void testSafeModeIBR() throws Exception {
|
||||||
DatanodeDescriptor node = spy(nodes.get(0));
|
DatanodeDescriptor node = spy(nodes.get(0));
|
||||||
DatanodeStorageInfo ds = node.getStorageInfos()[0];
|
DatanodeStorageInfo ds = node.getStorageInfos()[0];
|
||||||
node.setStorageID(ds.getStorageID());
|
node.setDatanodeUuid(ds.getStorageID());
|
||||||
|
|
||||||
node.isAlive = true;
|
node.isAlive = true;
|
||||||
|
|
||||||
|
@ -571,7 +571,7 @@ public class TestBlockManager {
|
||||||
public void testSafeModeIBRAfterIncremental() throws Exception {
|
public void testSafeModeIBRAfterIncremental() throws Exception {
|
||||||
DatanodeDescriptor node = spy(nodes.get(0));
|
DatanodeDescriptor node = spy(nodes.get(0));
|
||||||
DatanodeStorageInfo ds = node.getStorageInfos()[0];
|
DatanodeStorageInfo ds = node.getStorageInfos()[0];
|
||||||
node.setStorageID(ds.getStorageID());
|
node.setDatanodeUuid(ds.getStorageID());
|
||||||
node.isAlive = true;
|
node.isAlive = true;
|
||||||
|
|
||||||
DatanodeRegistration nodeReg =
|
DatanodeRegistration nodeReg =
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class TestDatanodeManager {
|
||||||
it.next();
|
it.next();
|
||||||
}
|
}
|
||||||
DatanodeRegistration toRemove = it.next().getValue();
|
DatanodeRegistration toRemove = it.next().getValue();
|
||||||
Log.info("Removing node " + toRemove.getStorageID() + " ip " +
|
Log.info("Removing node " + toRemove.getDatanodeUuid() + " ip " +
|
||||||
toRemove.getXferAddr() + " version : " + toRemove.getSoftwareVersion());
|
toRemove.getXferAddr() + " version : " + toRemove.getSoftwareVersion());
|
||||||
|
|
||||||
//Remove that random node
|
//Remove that random node
|
||||||
|
@ -90,7 +90,7 @@ public class TestDatanodeManager {
|
||||||
String storageID = "someStorageID" + rng.nextInt(5000);
|
String storageID = "someStorageID" + rng.nextInt(5000);
|
||||||
|
|
||||||
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
|
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
|
||||||
Mockito.when(dr.getStorageID()).thenReturn(storageID);
|
Mockito.when(dr.getDatanodeUuid()).thenReturn(storageID);
|
||||||
|
|
||||||
//If this storageID had already been registered before
|
//If this storageID had already been registered before
|
||||||
if(sIdToDnReg.containsKey(storageID)) {
|
if(sIdToDnReg.containsKey(storageID)) {
|
||||||
|
@ -110,7 +110,7 @@ public class TestDatanodeManager {
|
||||||
Mockito.when(dr.getSoftwareVersion()).thenReturn(
|
Mockito.when(dr.getSoftwareVersion()).thenReturn(
|
||||||
"version" + rng.nextInt(5));
|
"version" + rng.nextInt(5));
|
||||||
|
|
||||||
Log.info("Registering node storageID: " + dr.getStorageID() +
|
Log.info("Registering node storageID: " + dr.getDatanodeUuid() +
|
||||||
", version: " + dr.getSoftwareVersion() + ", IP address: "
|
", version: " + dr.getSoftwareVersion() + ", IP address: "
|
||||||
+ dr.getXferAddr());
|
+ dr.getXferAddr());
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -101,7 +100,7 @@ public class TestNodeCount {
|
||||||
DatanodeDescriptor nonExcessDN = null;
|
DatanodeDescriptor nonExcessDN = null;
|
||||||
for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
|
for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
|
||||||
final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
|
final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
|
||||||
Collection<Block> blocks = bm.excessReplicateMap.get(dn.getStorageID());
|
Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
|
||||||
if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
|
if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
|
||||||
nonExcessDN = dn;
|
nonExcessDN = dn;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -155,7 +155,7 @@ public class TestOverReplicatedBlocks {
|
||||||
DataNode lastDN = cluster.getDataNodes().get(3);
|
DataNode lastDN = cluster.getDataNodes().get(3);
|
||||||
DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
|
DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
|
||||||
lastDN, namesystem.getBlockPoolId());
|
lastDN, namesystem.getBlockPoolId());
|
||||||
String lastDNid = dnReg.getStorageID();
|
String lastDNid = dnReg.getDatanodeUuid();
|
||||||
|
|
||||||
final Path fileName = new Path("/foo2");
|
final Path fileName = new Path("/foo2");
|
||||||
DFSTestUtil.createFile(fs, fileName, SMALL_FILE_LENGTH, (short)4, 0L);
|
DFSTestUtil.createFile(fs, fileName, SMALL_FILE_LENGTH, (short)4, 0L);
|
||||||
|
|
|
@ -198,7 +198,7 @@ public class TestPendingReplication {
|
||||||
DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
|
DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
|
||||||
poolId);
|
poolId);
|
||||||
StorageReceivedDeletedBlocks[] report = {
|
StorageReceivedDeletedBlocks[] report = {
|
||||||
new StorageReceivedDeletedBlocks(dnR.getStorageID(),
|
new StorageReceivedDeletedBlocks(dnR.getDatanodeUuid(),
|
||||||
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
|
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
|
||||||
blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
|
blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
|
||||||
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
|
||||||
|
@ -215,7 +215,7 @@ public class TestPendingReplication {
|
||||||
DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
|
DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
|
||||||
poolId);
|
poolId);
|
||||||
StorageReceivedDeletedBlocks[] report =
|
StorageReceivedDeletedBlocks[] report =
|
||||||
{ new StorageReceivedDeletedBlocks(dnR.getStorageID(),
|
{ new StorageReceivedDeletedBlocks(dnR.getDatanodeUuid(),
|
||||||
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
|
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
|
||||||
blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
|
blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
|
||||||
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -380,16 +379,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
private final Map<String, Map<Block, BInfo>> blockMap
|
private final Map<String, Map<Block, BInfo>> blockMap
|
||||||
= new HashMap<String, Map<Block,BInfo>>();
|
= new HashMap<String, Map<Block,BInfo>>();
|
||||||
private final SimulatedStorage storage;
|
private final SimulatedStorage storage;
|
||||||
private final String storageId;
|
private final String datanodeUuid;
|
||||||
|
|
||||||
public SimulatedFSDataset(DataStorage storage, Configuration conf) {
|
public SimulatedFSDataset(DataStorage storage, Configuration conf) {
|
||||||
if (storage != null) {
|
if (storage != null) {
|
||||||
storage.createStorageID();
|
storage.createStorageID();
|
||||||
this.storageId = storage.getStorageID();
|
this.datanodeUuid = storage.getStorageID();
|
||||||
} else {
|
} else {
|
||||||
this.storageId = "unknownStorageId-" + UUID.randomUUID();
|
this.datanodeUuid = "unknownStorageId-" + UUID.randomUUID();
|
||||||
}
|
}
|
||||||
registerMBean(storageId);
|
registerMBean(datanodeUuid);
|
||||||
this.storage = new SimulatedStorage(
|
this.storage = new SimulatedStorage(
|
||||||
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
|
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
|
||||||
}
|
}
|
||||||
|
@ -884,7 +883,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getStorageInfo() {
|
public String getStorageInfo() {
|
||||||
return "Simulated FSDataset-" + storageId;
|
return "Simulated FSDataset-" + datanodeUuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -911,7 +910,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
||||||
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
|
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
|
||||||
long recoveryId,
|
long recoveryId,
|
||||||
long newlength) {
|
long newlength) {
|
||||||
return storageId;
|
return datanodeUuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
|
|
|
@ -265,7 +265,7 @@ public class TestBlockReplacement {
|
||||||
// sendRequest
|
// sendRequest
|
||||||
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
|
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
|
||||||
new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
|
new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
|
||||||
source.getStorageID(), sourceProxy);
|
source.getDatanodeUuid(), sourceProxy);
|
||||||
out.flush();
|
out.flush();
|
||||||
// receiveResponse
|
// receiveResponse
|
||||||
DataInputStream reply = new DataInputStream(sock.getInputStream());
|
DataInputStream reply = new DataInputStream(sock.getInputStream());
|
||||||
|
|
|
@ -154,7 +154,7 @@ public class TestBlockReport {
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
|
|
||||||
|
@ -236,7 +236,7 @@ public class TestBlockReport {
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
|
|
||||||
|
@ -278,7 +278,7 @@ public class TestBlockReport {
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
DatanodeCommand dnCmd =
|
DatanodeCommand dnCmd =
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
|
@ -332,7 +332,7 @@ public class TestBlockReport {
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
|
@ -368,7 +368,7 @@ public class TestBlockReport {
|
||||||
int randIndex = rand.nextInt(blocks.size());
|
int randIndex = rand.nextInt(blocks.size());
|
||||||
// Get a block and screw its GS
|
// Get a block and screw its GS
|
||||||
Block corruptedBlock = blocks.get(randIndex);
|
Block corruptedBlock = blocks.get(randIndex);
|
||||||
String secondNode = cluster.getDataNodes().get(DN_N1).getStorageId();
|
String secondNode = cluster.getDataNodes().get(DN_N1).getDatanodeUuid();
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Working with " + secondNode);
|
LOG.debug("Working with " + secondNode);
|
||||||
LOG.debug("BlockGS before " + blocks.get(randIndex).getGenerationStamp());
|
LOG.debug("BlockGS before " + blocks.get(randIndex).getGenerationStamp());
|
||||||
|
@ -383,7 +383,7 @@ public class TestBlockReport {
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
|
@ -407,7 +407,7 @@ public class TestBlockReport {
|
||||||
}
|
}
|
||||||
|
|
||||||
report[0] = new StorageBlockReport(
|
report[0] = new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
|
@ -459,7 +459,7 @@ public class TestBlockReport {
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
|
@ -507,7 +507,7 @@ public class TestBlockReport {
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
printStats();
|
printStats();
|
||||||
|
|
|
@ -163,7 +163,7 @@ public class TestDataNodeMultipleRegistrations {
|
||||||
|
|
||||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||||
LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration + "; sid="
|
LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration + "; sid="
|
||||||
+ bpos.bpRegistration.getStorageID() + "; nna=" +
|
+ bpos.bpRegistration.getDatanodeUuid() + "; nna=" +
|
||||||
getNNSocketAddress(bpos));
|
getNNSocketAddress(bpos));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -153,7 +153,7 @@ public class TestDataNodeVolumeFailure {
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
|
||||||
final StorageBlockReport[] report = {
|
final StorageBlockReport[] report = {
|
||||||
new StorageBlockReport(
|
new StorageBlockReport(
|
||||||
new DatanodeStorage(dnR.getStorageID()),
|
new DatanodeStorage(dnR.getDatanodeUuid()),
|
||||||
DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid
|
DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid
|
||||||
).getBlockListAsLongs())
|
).getBlockListAsLongs())
|
||||||
};
|
};
|
||||||
|
|
|
@ -846,7 +846,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
// register datanode
|
// register datanode
|
||||||
dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
|
dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
|
||||||
//first block reports
|
//first block reports
|
||||||
storage = new DatanodeStorage(dnRegistration.getStorageID());
|
storage = new DatanodeStorage(dnRegistration.getDatanodeUuid());
|
||||||
final StorageBlockReport[] reports = {
|
final StorageBlockReport[] reports = {
|
||||||
new StorageBlockReport(storage,
|
new StorageBlockReport(storage,
|
||||||
new BlockListAsLongs(null, null).getBlockListAsLongs())
|
new BlockListAsLongs(null, null).getBlockListAsLongs())
|
||||||
|
@ -862,7 +862,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
void sendHeartbeat() throws IOException {
|
void sendHeartbeat() throws IOException {
|
||||||
// register datanode
|
// register datanode
|
||||||
// TODO:FEDERATION currently a single block pool is supported
|
// TODO:FEDERATION currently a single block pool is supported
|
||||||
StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
|
StorageReport[] rep = { new StorageReport(dnRegistration.getDatanodeUuid(),
|
||||||
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||||
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
|
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
|
||||||
rep, 0, 0, 0).getCommands();
|
rep, 0, 0, 0).getCommands();
|
||||||
|
@ -909,7 +909,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
@SuppressWarnings("unused") // keep it for future blockReceived benchmark
|
@SuppressWarnings("unused") // keep it for future blockReceived benchmark
|
||||||
int replicateBlocks() throws IOException {
|
int replicateBlocks() throws IOException {
|
||||||
// register datanode
|
// register datanode
|
||||||
StorageReport[] rep = { new StorageReport(dnRegistration.getStorageID(),
|
StorageReport[] rep = { new StorageReport(dnRegistration.getDatanodeUuid(),
|
||||||
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
|
||||||
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
|
DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
|
||||||
rep, 0, 0, 0).getCommands();
|
rep, 0, 0, 0).getCommands();
|
||||||
|
@ -939,14 +939,14 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
DatanodeInfo dnInfo = blockTargets[t];
|
DatanodeInfo dnInfo = blockTargets[t];
|
||||||
DatanodeRegistration receivedDNReg;
|
DatanodeRegistration receivedDNReg;
|
||||||
receivedDNReg = new DatanodeRegistration(dnInfo,
|
receivedDNReg = new DatanodeRegistration(dnInfo,
|
||||||
new DataStorage(nsInfo, dnInfo.getStorageID()),
|
new DataStorage(nsInfo, dnInfo.getDatanodeUuid()),
|
||||||
new ExportedBlockKeys(), VersionInfo.getVersion());
|
new ExportedBlockKeys(), VersionInfo.getVersion());
|
||||||
ReceivedDeletedBlockInfo[] rdBlocks = {
|
ReceivedDeletedBlockInfo[] rdBlocks = {
|
||||||
new ReceivedDeletedBlockInfo(
|
new ReceivedDeletedBlockInfo(
|
||||||
blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
|
blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
|
||||||
null) };
|
null) };
|
||||||
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
||||||
receivedDNReg.getStorageID(), rdBlocks) };
|
receivedDNReg.getDatanodeUuid(), rdBlocks) };
|
||||||
nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
|
nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
|
||||||
.getNamesystem().getBlockPoolId(), report);
|
.getNamesystem().getBlockPoolId(), report);
|
||||||
}
|
}
|
||||||
|
@ -1069,7 +1069,7 @@ public class NNThroughputBenchmark implements Tool {
|
||||||
loc.getBlock().getLocalBlock(),
|
loc.getBlock().getLocalBlock(),
|
||||||
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
|
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
|
||||||
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
||||||
datanodes[dnIdx].dnRegistration.getStorageID(), rdBlocks) };
|
datanodes[dnIdx].dnRegistration.getDatanodeUuid(), rdBlocks) };
|
||||||
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
|
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
|
||||||
.getBlock().getBlockPoolId(), report);
|
.getBlock().getBlockPoolId(), report);
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,11 +104,11 @@ public class TestDeadDatanode {
|
||||||
DatanodeRegistration reg =
|
DatanodeRegistration reg =
|
||||||
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
|
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
|
||||||
|
|
||||||
waitForDatanodeState(reg.getStorageID(), true, 20000);
|
waitForDatanodeState(reg.getDatanodeUuid(), true, 20000);
|
||||||
|
|
||||||
// Shutdown and wait for datanode to be marked dead
|
// Shutdown and wait for datanode to be marked dead
|
||||||
dn.shutdown();
|
dn.shutdown();
|
||||||
waitForDatanodeState(reg.getStorageID(), false, 20000);
|
waitForDatanodeState(reg.getDatanodeUuid(), false, 20000);
|
||||||
|
|
||||||
DatanodeProtocol dnp = cluster.getNameNodeRpc();
|
DatanodeProtocol dnp = cluster.getNameNodeRpc();
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ public class TestDeadDatanode {
|
||||||
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
|
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
|
||||||
null) };
|
null) };
|
||||||
StorageReceivedDeletedBlocks[] storageBlocks = {
|
StorageReceivedDeletedBlocks[] storageBlocks = {
|
||||||
new StorageReceivedDeletedBlocks(reg.getStorageID(), blocks) };
|
new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
|
||||||
|
|
||||||
// Ensure blockReceived call from dead datanode is rejected with IOException
|
// Ensure blockReceived call from dead datanode is rejected with IOException
|
||||||
try {
|
try {
|
||||||
|
@ -129,7 +129,7 @@ public class TestDeadDatanode {
|
||||||
|
|
||||||
// Ensure blockReport from dead datanode is rejected with IOException
|
// Ensure blockReport from dead datanode is rejected with IOException
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(
|
StorageBlockReport[] report = { new StorageBlockReport(
|
||||||
new DatanodeStorage(reg.getStorageID()),
|
new DatanodeStorage(reg.getDatanodeUuid()),
|
||||||
new long[] { 0L, 0L, 0L }) };
|
new long[] { 0L, 0L, 0L }) };
|
||||||
try {
|
try {
|
||||||
dnp.blockReport(reg, poolId, report);
|
dnp.blockReport(reg, poolId, report);
|
||||||
|
@ -140,7 +140,7 @@ public class TestDeadDatanode {
|
||||||
|
|
||||||
// Ensure heartbeat from dead datanode is rejected with a command
|
// Ensure heartbeat from dead datanode is rejected with a command
|
||||||
// that asks datanode to register again
|
// that asks datanode to register again
|
||||||
StorageReport[] rep = { new StorageReport(reg.getStorageID(), false, 0, 0,
|
StorageReport[] rep = { new StorageReport(reg.getDatanodeUuid(), false, 0, 0,
|
||||||
0, 0) };
|
0, 0) };
|
||||||
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0).getCommands();
|
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0, 0, 0).getCommands();
|
||||||
assertEquals(1, cmd.length);
|
assertEquals(1, cmd.length);
|
||||||
|
|
Loading…
Reference in New Issue