HDFS-5398. NameNode changes to process storage reports per storage directory.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1534847 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2013-10-22 23:29:40 +00:00
parent ef17685f79
commit b884af72c5
16 changed files with 147 additions and 92 deletions
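
For orientation, a minimal, self-contained sketch of the heartbeat-aggregation pattern this commit introduces (illustration only, not code from the commit; the classes below are simplified stand-ins for Hadoop's StorageReport and DatanodeStorageInfo): reports are matched to known storages by storage ID, unrecognized IDs are merely logged, and the node-level totals are recomputed as the sum over recognized storages, mirroring DatanodeDescriptor#updateHeartbeat in the diff below.

    import java.util.HashMap;
    import java.util.Map;

    // Simplified stand-in for org.apache.hadoop.hdfs.server.protocol.StorageReport.
    class Report {
      final String storageID;
      final long capacity, dfsUsed, remaining, blockPoolUsed;
      Report(String id, long capacity, long dfsUsed, long remaining, long blockPoolUsed) {
        this.storageID = id;
        this.capacity = capacity;
        this.dfsUsed = dfsUsed;
        this.remaining = remaining;
        this.blockPoolUsed = blockPoolUsed;
      }
    }

    class HeartbeatAggregationSketch {
      // Storages already known for this datanode, keyed by storage ID.
      private final Map<String, Report> storageMap = new HashMap<String, Report>();
      private long capacity, dfsUsed, remaining, blockPoolUsed;

      void registerStorage(String storageID) {
        // In HDFS a storage becomes known when it first appears in a report.
        storageMap.put(storageID, null);
      }

      void updateHeartbeat(Report[] reports) {
        long totalCapacity = 0, totalDfsUsed = 0, totalRemaining = 0, totalBlockPoolUsed = 0;
        for (Report r : reports) {
          if (storageMap.containsKey(r.storageID)) {
            // Refresh the per-storage state, then fold it into the node totals.
            storageMap.put(r.storageID, r);
            totalCapacity += r.capacity;
            totalDfsUsed += r.dfsUsed;
            totalRemaining += r.remaining;
            totalBlockPoolUsed += r.blockPoolUsed;
          } else {
            // Benign during startup, before the storage has reported blocks.
            System.err.println("Unrecognized storage ID " + r.storageID);
          }
        }
        // The datanode-level metrics become the sum over recognized storages.
        capacity = totalCapacity;
        dfsUsed = totalDfsUsed;
        remaining = totalRemaining;
        blockPoolUsed = totalBlockPoolUsed;
      }
    }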

View File

@@ -38,4 +38,7 @@ IMPROVEMENTS:
     (Arpit Agarwal)
 
     HDFS-5377. Heartbeats from Datandode should include one storage report
-    per storage directory (Arpit Agarwal)
+    per storage directory. (Arpit Agarwal)
+
+    HDFS-5398. NameNode changes to process storage reports per storage
+    directory. (Arpit Agarwal)

View File

@@ -2628,7 +2628,14 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     // Decrement number of blocks scheduled to this storage.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
     // RECEIVED_BLOCK), we currently also decrease the approximate number.
-    node.getStorageInfo(storageID).decrementBlocksScheduled();
+    DatanodeStorageInfo storageInfo = node.getStorageInfo(storageID);
+    if (storageInfo != null) {
+      storageInfo.decrementBlocksScheduled();
+    } else {
+      throw new IllegalArgumentException(
+          "Unrecognized storageID " + storageID + " in block report " +
+          "from Datanode " + node.toString());
+    }
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;

View File

@@ -27,12 +27,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.Time;
@@ -44,6 +48,7 @@ import org.apache.hadoop.util.Time;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeDescriptor extends DatanodeInfo {
+  public static final Log LOG = LogFactory.getLog(DatanodeDescriptor.class);
   public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
 
   // Stores status of decommissioning.
@@ -138,7 +143,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * @param nodeID id of the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID) {
-    this(nodeID, 0L, 0L, 0L, 0L, 0, 0);
+    super(nodeID);
   }
 
   /**
@@ -148,51 +153,21 @@
    */
   public DatanodeDescriptor(DatanodeID nodeID,
                             String networkLocation) {
-    this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0, 0);
-  }
-
-  /**
-   * DatanodeDescriptor constructor
-   * @param nodeID id of the data node
-   * @param capacity capacity of the data node
-   * @param dfsUsed space used by the data node
-   * @param remaining remaining capacity of the data node
-   * @param bpused space used by the block pool corresponding to this namenode
-   * @param xceiverCount # of data transfers at the data node
-   */
-  public DatanodeDescriptor(DatanodeID nodeID,
-                            long capacity,
-                            long dfsUsed,
-                            long remaining,
-                            long bpused,
-                            int xceiverCount,
-                            int failedVolumes) {
-    super(nodeID);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount,
-        failedVolumes);
+    this(nodeID, networkLocation, 0, 0);
   }
 
   /**
    * DatanodeDescriptor constructor
   * @param nodeID id of the data node
   * @param networkLocation location of the data node in network
-   * @param capacity capacity of the data node, including space used by non-dfs
-   * @param dfsUsed the used space by dfs datanode
-   * @param remaining remaining capacity of the data node
-   * @param bpused space used by the block pool corresponding to this namenode
   * @param xceiverCount # of data transfers at the data node
   */
  public DatanodeDescriptor(DatanodeID nodeID,
                            String networkLocation,
-                            long capacity,
-                            long dfsUsed,
-                            long remaining,
-                            long bpused,
                            int xceiverCount,
                            int failedVolumes) {
    super(nodeID, networkLocation);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount,
-        failedVolumes);
+    updateHeartbeat(StorageReport.EMPTY_ARRAY, xceiverCount, failedVolumes);
  }
 
  /**
@@ -294,18 +269,37 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Updates stats from datanode heartbeat.
    */
-  public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
-      long blockPoolUsed, int xceiverCount, int volFailures) {
-    setCapacity(capacity);
-    setRemaining(remaining);
-    setBlockPoolUsed(blockPoolUsed);
-    setDfsUsed(dfsUsed);
+  public void updateHeartbeat(StorageReport[] reports, int xceiverCount,
+      int volFailures) {
+    long totalCapacity = 0;
+    long totalRemaining = 0;
+    long totalBlockPoolUsed = 0;
+    long totalDfsUsed = 0;
+
     setXceiverCount(xceiverCount);
     setLastUpdate(Time.now());
     this.volumeFailures = volFailures;
-    for(DatanodeStorageInfo storage : getStorageInfos()) {
-      storage.receivedHeartbeat(getLastUpdate());
+    for (StorageReport report : reports) {
+      DatanodeStorageInfo storage = storageMap.get(report.getStorageID());
+      if (storage != null) {
+        storage.receivedHeartbeat(report, getLastUpdate());
+        totalCapacity += report.getCapacity();
+        totalRemaining += report.getRemaining();
+        totalBlockPoolUsed += report.getBlockPoolUsed();
+        totalDfsUsed += report.getDfsUsed();
+      } else {
+        // This warning is generally benign during cluster initialization
+        // when the heartbeat is received before the initial block reports
+        // from each storage.
+        LOG.warn("Unrecognized storage ID " + report.getStorageID());
+      }
     }
+
+    // Update total metrics for the node.
+    setCapacity(totalCapacity);
+    setRemaining(totalRemaining);
+    setBlockPoolUsed(totalBlockPoolUsed);
+    setDfsUsed(totalDfsUsed);
   }
 
   private static class BlockIterator implements Iterator<BlockInfo> {

View File

@@ -1215,8 +1215,7 @@ public class DatanodeManager {
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
-      final String blockPoolId,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      StorageReport[] reports, final String blockPoolId,
       int xceiverCount, int maxTransfers, int failedVolumes
       ) throws IOException {
     synchronized (heartbeatManager) {
@@ -1238,8 +1237,8 @@
         return new DatanodeCommand[]{RegisterCommand.REGISTER};
       }
 
-      heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
-          remaining, blockPoolUsed, xceiverCount, failedVolumes);
+      heartbeatManager.updateHeartbeat(nodeinfo, reports,
+          xceiverCount, failedVolumes);
 
       // If we are in safemode, do not send back any recovery / replication
       // requests. Don't even drain the existing queue of work.

View File

@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -96,6 +97,7 @@ public class DatanodeStorageInfo {
   private long capacity;
   private long dfsUsed;
   private long remaining;
+  private long blockPoolUsed;
 
   private volatile BlockInfo blockList = null;
   private int numBlocks = 0;
@@ -153,7 +155,8 @@
     blockContentsStale = true;
   }
 
-  void receivedHeartbeat(final long lastUpdate) {
+  void receivedHeartbeat(StorageReport report, final long lastUpdate) {
+    updateState(report);
     heartbeatedSinceFailover = true;
     rollBlocksScheduled(lastUpdate);
   }
@@ -165,10 +168,13 @@
     blockReportCount++;
   }
 
-  void setUtilization(long capacity, long dfsUsed, long remaining) {
+  @VisibleForTesting
+  public void setUtilization(long capacity, long dfsUsed,
+      long remaining, long blockPoolUsed) {
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;
     this.remaining = remaining;
+    this.blockPoolUsed = blockPoolUsed;
   }
 
   public void setState(State s) {
@@ -201,6 +207,10 @@
     return remaining;
   }
 
+  public long getBlockPoolUsed() {
+    return blockPoolUsed;
+  }
+
   public boolean addBlock(BlockInfo b) {
     if(!b.addStorage(this))
       return false;
@@ -232,6 +242,7 @@
     capacity = r.getCapacity();
     dfsUsed = r.getDfsUsed();
     remaining = r.getRemaining();
+    blockPoolUsed = r.getBlockPoolUsed();
   }
 
   public DatanodeDescriptor getDatanodeDescriptor() {

View File

@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
@@ -170,7 +171,7 @@ class HeartbeatManager implements DatanodeStatistics {
       addDatanode(d);
       //update its timestamp
-      d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+      d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0, 0);
     }
   }
@@ -192,11 +193,9 @@
   }
 
   synchronized void updateHeartbeat(final DatanodeDescriptor node,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xceiverCount, int failedVolumes) {
+      StorageReport[] reports, int xceiverCount, int failedVolumes) {
     stats.subtract(node);
-    node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
-        xceiverCount, failedVolumes);
+    node.updateHeartbeat(reports, xceiverCount, failedVolumes);
     stats.add(node);
   }

View File

@@ -205,6 +205,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -4047,16 +4048,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   * @throws IOException
   */
  HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xceiverCount, int xmitsInProgress, int failedVolumes)
+      StorageReport[] reports, int xceiverCount, int xmitsInProgress,
+      int failedVolumes)
      throws IOException {
    readLock();
    try {
      final int maxTransfer = blockManager.getMaxReplicationStreams()
          - xmitsInProgress;
      DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
-          nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
-          xceiverCount, maxTransfer, failedVolumes);
+          nodeReg, reports, blockPoolId, xceiverCount, maxTransfer, failedVolumes);
      return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
    } finally {
      readUnlock();

View File

@@ -960,10 +960,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
       StorageReport[] report, int xmitsInProgress, int xceiverCount,
       int failedVolumes) throws IOException {
     verifyRequest(nodeReg);
-    return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(),
-        report[0].getDfsUsed(), report[0].getRemaining(),
-        report[0].getBlockPoolUsed(), xceiverCount, xmitsInProgress,
-        failedVolumes);
+    return namesystem.handleHeartbeat(nodeReg, report, xceiverCount,
+        xmitsInProgress, failedVolumes);
   }
 
   @Override // DatanodeProtocol

View File

@@ -28,6 +28,8 @@ public class StorageReport {
   private final long remaining;
   private final long blockPoolUsed;
 
+  public static final StorageReport[] EMPTY_ARRAY = {};
+
   public StorageReport(String sid, boolean failed, long capacity, long dfsUsed,
       long remaining, long bpUsed) {
     this.storageID = sid;

View File

@@ -18,16 +18,19 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.junit.Assert;
@@ -240,4 +243,18 @@ public class BlockManagerTestUtil {
     }
     return dn;
   }
+
+  public static StorageReport[] getStorageReportsForDatanode(
+      DatanodeDescriptor dnd) {
+    ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
+    for (DatanodeStorageInfo storage : dnd.getStorageInfos()) {
+      StorageReport report = new StorageReport(
+          storage.getStorageID(), false, storage.getCapacity(),
+          storage.getDfsUsed(), storage.getRemaining(),
+          storage.getBlockPoolUsed());
+      reports.add(report);
+    }
+    return reports.toArray(StorageReport.EMPTY_ARRAY);
+  }
 }

View File

@@ -103,9 +103,11 @@ public class TestBlockManager {
     // construct network topology
     for (DatanodeDescriptor dn : nodesToAdd) {
       cluster.add(dn);
+      dn.getStorageInfos()[0].setUtilization(
+          2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
       dn.updateHeartbeat(
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0, 0);
       bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }

View File

@@ -103,7 +103,10 @@ public class TestOverReplicatedBlocks {
       String corruptMachineName = corruptDataNode.getXferAddr();
       for (DatanodeDescriptor datanode : hm.getDatanodes()) {
         if (!corruptMachineName.equals(datanode.getXferAddr())) {
-          datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0, 0);
+          datanode.getStorageInfos()[0].setUtilization(100L, 100L, 0, 100L);
+          datanode.updateHeartbeat(
+              BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
+              0, 0);
         }
       }

View File

@@ -90,6 +90,16 @@ public class TestReplicationPolicy {
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
+  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int volFailures) {
+    dn.getStorageInfos()[0].setUtilization(
+        capacity, dfsUsed, remaining, blockPoolUsed);
+    dn.updateHeartbeat(
+        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+        xceiverCount, volFailures);
+  }
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     Configuration conf = new HdfsConfiguration();
@@ -126,7 +136,7 @@
           dataNodes[i]);
     }
     for (int i=0; i < NUM_OF_DATANODES; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
     }
@@ -150,7 +160,7 @@
   */
  @Test
  public void testChooseTarget1() throws Exception {
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
@@ -180,7 +190,7 @@
        isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
 
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
  }
@@ -303,7 +313,7 @@
  @Test
  public void testChooseTarget3() throws Exception {
    // make data node 0 to be not qualified to choose
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
@@ -336,7 +346,7 @@
        isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[1], targets[3]));
 
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
  }
@@ -353,7 +363,7 @@
  public void testChoooseTarget4() throws Exception {
    // make data node 0 & 1 to be not qualified to choose: not enough disk space
    for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
    }
@@ -381,7 +391,7 @@
    assertFalse(isOnSameRack(targets[0], targets[2]));
 
    for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
    }
@@ -443,7 +453,7 @@
  public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
    // make data node 0 & 1 to be not qualified to choose: not enough disk space
    for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
    }
@@ -468,7 +478,7 @@
    assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
 
    for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
    }

View File

@@ -146,9 +146,20 @@ public class TestReplicationPolicyWithNodeGroup {
     namenode.stop();
   }
 
+  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int volFailures) {
+    dn.getStorageInfos()[0].setUtilization(
+        capacity, dfsUsed, remaining, blockPoolUsed);
+    dn.updateHeartbeat(
+        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+        xceiverCount, volFailures);
+  }
+
   private static void setupDataNodeCapacity() {
     for(int i=0; i<NUM_OF_DATANODES; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
    }
@@ -231,7 +242,7 @@
   */
  @Test
  public void testChooseTarget1() throws Exception {
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
@@ -268,7 +279,7 @@
    // Make sure no more than one replicas are on the same nodegroup
    verifyNoTwoTargetsOnSameNodeGroup(targets);
 
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
  }
@@ -336,7 +347,7 @@
  @Test
  public void testChooseTarget3() throws Exception {
    // make data node 0 to be not qualified to choose
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
@@ -367,7 +378,7 @@
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
        isOnSameRack(targets[2], targets[3]));
 
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
  }
@@ -385,7 +396,7 @@
  public void testChooseTarget4() throws Exception {
    // make data node 0-2 to be not qualified to choose: not enough disk space
    for(int i=0; i<3; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
    }
@@ -613,11 +624,11 @@
      cluster.add(dataNodesInBoundaryCase[i]);
    }
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
-      dataNodes[0].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[0],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
 
-      dataNodesInBoundaryCase[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
    }
@@ -648,7 +659,7 @@
  @Test
  public void testRereplicateOnBoundaryTopology() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
-      dataNodesInBoundaryCase[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
    }
@@ -686,7 +697,7 @@
    }
 
    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
-      dataNodesInMoreTargetsCase[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
    }

View File

@@ -451,10 +451,8 @@ public class TestJspHelper {
         1234, 2345, 3456, 4567);
     DatanodeID dnId2 = new DatanodeID("127.0.0.2", "localhost2", "datanode2",
         1235, 2346, 3457, 4568);
-    DatanodeDescriptor dnDesc1 = new DatanodeDescriptor(dnId1, "rack1", 1024,
-        100, 924, 100, 10, 2);
-    DatanodeDescriptor dnDesc2 = new DatanodeDescriptor(dnId2, "rack2", 2500,
-        200, 1848, 200, 20, 1);
+    DatanodeDescriptor dnDesc1 = new DatanodeDescriptor(dnId1, "rack1", 10, 2);
+    DatanodeDescriptor dnDesc2 = new DatanodeDescriptor(dnId2, "rack2", 20, 1);
     ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     live.add(dnDesc1);
     live.add(dnDesc2);

View File

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
@@ -110,8 +111,8 @@ public class NameNodeAdapter {
   public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
-    return namesystem.handleHeartbeat(nodeReg, dd.getCapacity(),
-        dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(), 0, 0, 0);
+    return namesystem.handleHeartbeat(nodeReg,
+        BlockManagerTestUtil.getStorageReportsForDatanode(dd), 0, 0, 0);
   }
 
   public static boolean setReplication(final FSNamesystem ns,