From b884af72c563403a7264b1a4b42ab58790af72f3 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Tue, 22 Oct 2013 23:29:40 +0000
Subject: [PATCH] HDFS-5398. NameNode changes to process storage reports per
 storage directory.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1534847 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES_HDFS-2832.txt           |  7 +-
 .../server/blockmanagement/BlockManager.java    |  9 ++-
 .../blockmanagement/DatanodeDescriptor.java     | 76 +++++++++----------
 .../blockmanagement/DatanodeManager.java        |  7 +-
 .../blockmanagement/DatanodeStorageInfo.java    | 15 +++-
 .../blockmanagement/HeartbeatManager.java       |  9 +--
 .../hdfs/server/namenode/FSNamesystem.java      |  8 +-
 .../server/namenode/NameNodeRpcServer.java      |  6 +-
 .../hdfs/server/protocol/StorageReport.java     |  2 +
 .../blockmanagement/BlockManagerTestUtil.java   | 17 +++++
 .../blockmanagement/TestBlockManager.java       |  6 +-
 .../TestOverReplicatedBlocks.java               |  5 +-
 .../TestReplicationPolicy.java                  | 28 ++++---
 .../TestReplicationPolicyWithNodeGroup.java     | 33 +++++---
 .../hdfs/server/common/TestJspHelper.java       |  6 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |  5 +-
 16 files changed, 147 insertions(+), 92 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
index ff318f291fd..edc6bee4fe0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
@@ -4,7 +4,7 @@ IMPROVEMENTS:
 
     HDFS-4985. Add storage type to the protocol and expose it in block report
     and block locations. (Arpit Agarwal)
-    
+
     HDFS-5115. Make StorageID a UUID. (Arpit Agarwal)
 
     HDFS-5000. DataNode configuration should allow specifying storage type.
@@ -38,4 +38,7 @@ IMPROVEMENTS:
     (Arpit Agarwal)
 
     HDFS-5377. Heartbeats from Datandode should include one storage report
-    per storage directory (Arpit Agarwal)
+    per storage directory. (Arpit Agarwal)
+
+    HDFS-5398. NameNode changes to process storage reports per storage
+    directory. (Arpit Agarwal)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 38525d165b3..86bc20cd6bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2628,7 +2628,14 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     // Decrement number of blocks scheduled to this storage.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
     // RECEIVED_BLOCK), we currently also decrease the approximate number.
-    node.getStorageInfo(storageID).decrementBlocksScheduled();
+    DatanodeStorageInfo storageInfo = node.getStorageInfo(storageID);
+    if (storageInfo != null) {
+      storageInfo.decrementBlocksScheduled();
+    } else {
+      throw new IllegalArgumentException(
+          "Unrecognized storageID " + storageID + " in block report " +
+          "from Datanode " + node.toString());
+    }
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index d2343bcc400..af1ef2dec23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -27,12 +27,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.Time;
 
@@ -44,6 +48,7 @@ import org.apache.hadoop.util.Time;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeDescriptor extends DatanodeInfo {
+  public static final Log LOG = LogFactory.getLog(DatanodeDescriptor.class);
   public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
 
   // Stores status of decommissioning.
@@ -138,7 +143,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * @param nodeID id of the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID) {
-    this(nodeID, 0L, 0L, 0L, 0L, 0, 0);
+    super(nodeID);
   }
 
   /**
@@ -148,51 +153,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   public DatanodeDescriptor(DatanodeID nodeID,
                             String networkLocation) {
-    this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0, 0);
-  }
-
-  /**
-   * DatanodeDescriptor constructor
-   * @param nodeID id of the data node
-   * @param capacity capacity of the data node
-   * @param dfsUsed space used by the data node
-   * @param remaining remaining capacity of the data node
-   * @param bpused space used by the block pool corresponding to this namenode
-   * @param xceiverCount # of data transfers at the data node
-   */
-  public DatanodeDescriptor(DatanodeID nodeID,
-                            long capacity,
-                            long dfsUsed,
-                            long remaining,
-                            long bpused,
-                            int xceiverCount,
-                            int failedVolumes) {
-    super(nodeID);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount,
-        failedVolumes);
+    this(nodeID, networkLocation, 0, 0);
   }
 
   /**
    * DatanodeDescriptor constructor
    * @param nodeID id of the data node
    * @param networkLocation location of the data node in network
-   * @param capacity capacity of the data node, including space used by non-dfs
-   * @param dfsUsed the used space by dfs datanode
-   * @param remaining remaining capacity of the data node
-   * @param bpused space used by the block pool corresponding to this namenode
   * @param xceiverCount # of data transfers at the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID,
                             String networkLocation,
-                            long capacity,
-                            long dfsUsed,
-                            long remaining,
-                            long bpused,
                             int xceiverCount,
                             int failedVolumes) {
     super(nodeID, networkLocation);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount,
-        failedVolumes);
+    updateHeartbeat(StorageReport.EMPTY_ARRAY, xceiverCount, failedVolumes);
   }
 
   /**
@@ -294,18 +269,37 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Updates stats from datanode heartbeat.
    */
-  public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
-      long blockPoolUsed, int xceiverCount, int volFailures) {
-    setCapacity(capacity);
-    setRemaining(remaining);
-    setBlockPoolUsed(blockPoolUsed);
-    setDfsUsed(dfsUsed);
+  public void updateHeartbeat(StorageReport[] reports, int xceiverCount,
+      int volFailures) {
+    long totalCapacity = 0;
+    long totalRemaining = 0;
+    long totalBlockPoolUsed = 0;
+    long totalDfsUsed = 0;
+
     setXceiverCount(xceiverCount);
     setLastUpdate(Time.now());
     this.volumeFailures = volFailures;
-    for(DatanodeStorageInfo storage : getStorageInfos()) {
-      storage.receivedHeartbeat(getLastUpdate());
+    for (StorageReport report : reports) {
+      DatanodeStorageInfo storage = storageMap.get(report.getStorageID());
+      if (storage != null) {
+        storage.receivedHeartbeat(report, getLastUpdate());
+        totalCapacity += report.getCapacity();
+        totalRemaining += report.getRemaining();
+        totalBlockPoolUsed += report.getBlockPoolUsed();
+        totalDfsUsed += report.getDfsUsed();
+      } else {
+        // This warning is generally benign during cluster initialization
+        // when the heartbeat is received before the initial block reports
+        // from each storage.
+        LOG.warn("Unrecognized storage ID " + report.getStorageID());
+      }
     }
+
+    // Update total metrics for the node.
+    setCapacity(totalCapacity);
+    setRemaining(totalRemaining);
+    setBlockPoolUsed(totalBlockPoolUsed);
+    setDfsUsed(totalDfsUsed);
   }
 
   private static class BlockIterator implements Iterator {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 2d97941d095..93866e94987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1215,8 +1215,7 @@ public class DatanodeManager {
 
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
-      final String blockPoolId,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      StorageReport[] reports, final String blockPoolId,
       int xceiverCount, int maxTransfers, int failedVolumes
       ) throws IOException {
     synchronized (heartbeatManager) {
@@ -1238,8 +1237,8 @@ public class DatanodeManager {
         return new DatanodeCommand[]{RegisterCommand.REGISTER};
       }
 
-      heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
-          remaining, blockPoolUsed, xceiverCount, failedVolumes);
+      heartbeatManager.updateHeartbeat(nodeinfo, reports,
+          xceiverCount, failedVolumes);
 
       // If we are in safemode, do not send back any recovery / replication
       // requests. Don't even drain the existing queue of work.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index af07e9c2c17..e5f4e8b21d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -96,6 +97,7 @@ public class DatanodeStorageInfo {
   private long capacity;
   private long dfsUsed;
   private long remaining;
+  private long blockPoolUsed;
 
   private volatile BlockInfo blockList = null;
   private int numBlocks = 0;
@@ -153,7 +155,8 @@ public class DatanodeStorageInfo {
     blockContentsStale = true;
   }
 
-  void receivedHeartbeat(final long lastUpdate) {
+  void receivedHeartbeat(StorageReport report, final long lastUpdate) {
+    updateState(report);
     heartbeatedSinceFailover = true;
     rollBlocksScheduled(lastUpdate);
   }
@@ -165,10 +168,13 @@ public class DatanodeStorageInfo {
     blockReportCount++;
   }
 
-  void setUtilization(long capacity, long dfsUsed, long remaining) {
+  @VisibleForTesting
+  public void setUtilization(long capacity, long dfsUsed,
+      long remaining, long blockPoolUsed) {
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;
     this.remaining = remaining;
+    this.blockPoolUsed = blockPoolUsed;
   }
 
   public void setState(State s) {
@@ -201,6 +207,10 @@ public class DatanodeStorageInfo {
     return remaining;
   }
 
+  public long getBlockPoolUsed() {
+    return blockPoolUsed;
+  }
+
   public boolean addBlock(BlockInfo b) {
     if(!b.addStorage(this))
       return false;
@@ -232,6 +242,7 @@ public class DatanodeStorageInfo {
     capacity = r.getCapacity();
     dfsUsed = r.getDfsUsed();
     remaining = r.getRemaining();
+    blockPoolUsed = r.getBlockPoolUsed();
   }
 
   public DatanodeDescriptor getDatanodeDescriptor() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index 0bff1bf52f7..c7aba24c21e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
@@ -170,7 +171,7 @@ class HeartbeatManager implements DatanodeStatistics {
       addDatanode(d);
 
       //update its timestamp
-      d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+      d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0, 0);
     }
   }
 
@@ -192,11 +193,9 @@ class HeartbeatManager implements DatanodeStatistics {
   }
 
   synchronized void updateHeartbeat(final DatanodeDescriptor node,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xceiverCount, int failedVolumes) {
+      StorageReport[] reports, int xceiverCount, int failedVolumes) {
     stats.subtract(node);
-    node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
-        xceiverCount, failedVolumes);
+    node.updateHeartbeat(reports, xceiverCount, failedVolumes);
     stats.add(node);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 756397ec7ac..c13139d37c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -205,6 +205,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -4047,16 +4048,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException
    */
   HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xceiverCount, int xmitsInProgress, int failedVolumes)
+      StorageReport[] reports, int xceiverCount, int xmitsInProgress,
+      int failedVolumes)
         throws IOException {
     readLock();
     try {
       final int maxTransfer = blockManager.getMaxReplicationStreams()
           - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
-          nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
-          xceiverCount, maxTransfer, failedVolumes);
+          nodeReg, reports,
+          blockPoolId, xceiverCount, maxTransfer, failedVolumes);
       return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
     } finally {
       readUnlock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 4c2b5af68b1..9c92ecc8180 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -960,10 +960,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
       StorageReport[] report, int xmitsInProgress, int xceiverCount,
       int failedVolumes) throws IOException {
     verifyRequest(nodeReg);
-    return namesystem.handleHeartbeat(nodeReg, report[0].getCapacity(),
-        report[0].getDfsUsed(), report[0].getRemaining(),
-        report[0].getBlockPoolUsed(), xceiverCount, xmitsInProgress,
-        failedVolumes);
+    return namesystem.handleHeartbeat(nodeReg, report, xceiverCount,
+        xmitsInProgress, failedVolumes);
   }
 
   @Override // DatanodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java
index d018def4e21..c805f1ea455 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReport.java
@@ -27,6 +27,8 @@ public class StorageReport {
   private final long dfsUsed;
   private final long remaining;
   private final long blockPoolUsed;
+
+  public static final StorageReport[] EMPTY_ARRAY = {};
 
   public StorageReport(String sid, boolean failed, long capacity, long dfsUsed,
       long remaining, long bpUsed) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 8740b8471d6..41dbbcf5012 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -18,16 +18,19 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.junit.Assert;
 
@@ -240,4 +243,18 @@ public class BlockManagerTestUtil {
     }
     return dn;
   }
+
+
+  public static StorageReport[] getStorageReportsForDatanode(
+      DatanodeDescriptor dnd) {
+    ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
+    for (DatanodeStorageInfo storage : dnd.getStorageInfos()) {
+      StorageReport report = new StorageReport(
+          storage.getStorageID(), false,
+          storage.getCapacity(),
+          storage.getDfsUsed(), storage.getRemaining(),
+          storage.getBlockPoolUsed());
+      reports.add(report);
+    }
+    return reports.toArray(StorageReport.EMPTY_ARRAY);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 6878eecc3b4..71b09fda02e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -103,9 +103,11 @@ public class TestBlockManager {
     // construct network topology
     for (DatanodeDescriptor dn : nodesToAdd) {
       cluster.add(dn);
+      dn.getStorageInfos()[0].setUtilization(
+          2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
       dn.updateHeartbeat(
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0, 0);
       bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index 412dd611de5..d27b02cfac2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -103,7 +103,10 @@ public class TestOverReplicatedBlocks {
       String corruptMachineName = corruptDataNode.getXferAddr();
       for (DatanodeDescriptor datanode : hm.getDatanodes()) {
         if (!corruptMachineName.equals(datanode.getXferAddr())) {
-          datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0, 0);
+          datanode.getStorageInfos()[0].setUtilization(100L, 100L, 0, 100L);
+          datanode.updateHeartbeat(
+              BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
+              0, 0);
         }
       }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 21dd5f04262..3942f5d0803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -90,6 +90,16 @@ public class TestReplicationPolicy {
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
+  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int volFailures) {
+    dn.getStorageInfos()[0].setUtilization(
+        capacity, dfsUsed, remaining, blockPoolUsed);
+    dn.updateHeartbeat(
+        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+        xceiverCount, volFailures);
+  }
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     Configuration conf = new HdfsConfiguration();
@@ -126,7 +136,7 @@ public class TestReplicationPolicy {
           dataNodes[i]);
     }
     for (int i=0; i < NUM_OF_DATANODES; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
     }
@@ -150,7 +160,7 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testChooseTarget1() throws Exception {
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
 
@@ -180,7 +190,7 @@ public class TestReplicationPolicy {
         isOnSameRack(targets[2], targets[3]));
     assertFalse(isOnSameRack(targets[0], targets[2]));
 
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
   }
@@ -303,7 +313,7 @@ public class TestReplicationPolicy {
   @Test
   public void testChooseTarget3() throws Exception {
     // make data node 0 to be not qualified to choose
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
 
@@ -336,7 +346,7 @@ public class TestReplicationPolicy {
         isOnSameRack(targets[2], targets[3]));
     assertFalse(isOnSameRack(targets[1], targets[3]));
 
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
   }
@@ -353,7 +363,7 @@ public class TestReplicationPolicy {
   public void testChoooseTarget4() throws Exception {
     // make data node 0 & 1 to be not qualified to choose: not enough disk space
     for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
     }
@@ -381,7 +391,7 @@ public class TestReplicationPolicy {
     assertFalse(isOnSameRack(targets[0], targets[2]));
 
     for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
     }
@@ -443,7 +453,7 @@ public class TestReplicationPolicy {
   public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
     // make data node 0 & 1 to be not qualified to choose: not enough disk space
     for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
     }
@@ -468,7 +478,7 @@ public class TestReplicationPolicy {
     assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
 
     for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index f38b4de21d2..15fc4a7e20e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -146,9 +146,20 @@ public class TestReplicationPolicyWithNodeGroup {
     namenode.stop();
   }
 
+  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int volFailures) {
+    dn.getStorageInfos()[0].setUtilization(
+        capacity, dfsUsed, remaining, blockPoolUsed);
+    dn.updateHeartbeat(
+        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+        xceiverCount, volFailures);
+  }
+
+
   private static void setupDataNodeCapacity() {
     for(int i=0; i
 live = new ArrayList();
     live.add(dnDesc1);
     live.add(dnDesc2);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index cf64c335bac..df434f6d405 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
@@ -110,8 +111,8 @@ public class NameNodeAdapter {
 
   public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
-    return namesystem.handleHeartbeat(nodeReg, dd.getCapacity(),
-        dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(), 0, 0, 0);
+    return namesystem.handleHeartbeat(nodeReg,
+        BlockManagerTestUtil.getStorageReportsForDatanode(dd), 0, 0, 0);
   }
 
   public static boolean setReplication(final FSNamesystem ns,