From c6d09d8c0c1801d41f3d8450f9836115aca7d1a3 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Wed, 25 Sep 2013 16:03:20 +0000
Subject: [PATCH] HDFS-5222. Move block schedule information from
 DatanodeDescriptor to DatanodeStorageInfo.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1526215 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES_HDFS-2832.txt         |  3 +
 .../server/blockmanagement/BlockManager.java  |  9 +--
 .../BlockPlacementPolicyDefault.java          | 13 +---
 .../blockmanagement/DatanodeDescriptor.java   | 47 ++------
 .../blockmanagement/DatanodeStorageInfo.java  | 62 +++++++++++++++++--
 .../hdfs/server/namenode/FSNamesystem.java    |  4 +-
 6 files changed, 72 insertions(+), 66 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
index 348be08ab5c..b99955c4e2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
@@ -30,3 +30,6 @@ IMPROVEMENTS:
     HDFS-5232. Protocol changes to transmit StorageUuid. (Arpit Agarwal)
 
     HDFS-5233. Use Datanode UUID to identify Datanodes. (Arpit Agarwal)
+
+    HDFS-5222. Move block schedule information from DatanodeDescriptor to
+    DatanodeStorageInfo. (szetszwo)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5684f02da38..5d5d02dce25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1328,10 +1328,7 @@ public class BlockManager {
           // Add block to the to be replicated list
           rw.srcNode.addBlockToBeReplicated(block, targets);
           scheduledWork++;
-
-          for (DatanodeStorageInfo storage : targets) {
-            storage.getDatanodeDescriptor().incBlocksScheduled();
-          }
+          DatanodeStorageInfo.incrementBlocksScheduled(targets);
 
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
@@ -2621,10 +2618,10 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
   @VisibleForTesting
   void addBlock(DatanodeDescriptor node, String storageID, Block block,
       String delHint) throws IOException {
-    // decrement number of blocks scheduled to this datanode.
+    // Decrement number of blocks scheduled to this storage.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
     // RECEIVED_BLOCK), we currently also decrease the approximate number.
-    node.decBlocksScheduled();
+    node.getStorageInfo(storageID).decrementBlocksScheduled();
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index af786e716dc..493e6f87c26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -620,19 +620,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
-    if (requiredSize > storage.getRemaining()) {
+    final long scheduledSize = blockSize * storage.getBlocksScheduled();
+    if (requiredSize > storage.getRemaining() - scheduledSize) {
       logNodeIsNotChosen(storage, "the storage does not have enough space ");
       return false;
     }
-    //TODO: move getBlocksScheduled() to DatanodeStorageInfo.
-    long remaining = node.getRemaining() -
-        (node.getBlocksScheduled() * blockSize);
-    // check the remaining capacity of the target machine
-    if (requiredSize > remaining) {
-      logNodeIsNotChosen(storage, "the node does not have enough space ");
-      return false;
-    }
-
+
     // check the communication traffic of the target machine
     if (considerLoad) {
       double avgLoad = 0;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 4cdac91faf6..d2343bcc400 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -125,15 +125,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** A set of blocks to be invalidated by this datanode */
   private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
-  /* Variables for maintaining number of blocks scheduled to be written to
-   * this datanode. This count is approximate and might be slightly bigger
-   * in case of errors (e.g. datanode does not report if an error occurs
-   * while writing the block).
-   */
-  private int currApproxBlocksScheduled = 0;
-  private int prevApproxBlocksScheduled = 0;
-  private long lastBlocksScheduledRollTime = 0;
-  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
 
   /**
@@ -313,9 +304,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     setLastUpdate(Time.now());
     this.volumeFailures = volFailures;
     for(DatanodeStorageInfo storage : getStorageInfos()) {
-      storage.receivedHeartbeat();
+      storage.receivedHeartbeat(getLastUpdate());
     }
-    rollBlocksScheduled(getLastUpdate());
   }
 
   private static class BlockIterator implements Iterator<BlockInfo> {
@@ -437,38 +427,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * to this datanode.
    */
   public int getBlocksScheduled() {
-    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
-  }
-
-  /**
-   * Increments counter for number of blocks scheduled.
-   */
-  public void incBlocksScheduled() {
-    currApproxBlocksScheduled++;
-  }
-
-  /**
-   * Decrements counter for number of blocks scheduled.
-   */
-  void decBlocksScheduled() {
-    if (prevApproxBlocksScheduled > 0) {
-      prevApproxBlocksScheduled--;
-    } else if (currApproxBlocksScheduled > 0) {
-      currApproxBlocksScheduled--;
-    }
-    // its ok if both counters are zero.
-  }
-
-  /**
-   * Adjusts curr and prev number of blocks scheduled every few minutes.
-   */
-  private void rollBlocksScheduled(long now) {
-    if ((now - lastBlocksScheduledRollTime) >
-        BLOCKS_SCHEDULED_ROLL_INTERVAL) {
-      prevApproxBlocksScheduled = currApproxBlocksScheduled;
-      currApproxBlocksScheduled = 0;
-      lastBlocksScheduledRollTime = now;
+    int n = 0;
+    for(DatanodeStorageInfo storage : getStorageInfos()) {
+      n += storage.getBlocksScheduled();
     }
+    return n;
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 6695e072446..af07e9c2c17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -92,9 +92,11 @@ public class DatanodeStorageInfo {
   private final String storageID;
   private final StorageType storageType;
   private State state;
+
   private long capacity;
   private long dfsUsed;
   private long remaining;
+
   private volatile BlockInfo blockList = null;
   private int numBlocks = 0;
 
@@ -117,6 +119,16 @@ public class DatanodeStorageInfo {
    */
   private boolean blockContentsStale = true;
 
+  /* Variables for maintaining number of blocks scheduled to be written to
+   * this storage. This count is approximate and might be slightly bigger
+   * in case of errors (e.g. datanode does not report if an error occurs
+   * while writing the block).
+   */
+  private int currApproxBlocksScheduled = 0;
+  private int prevApproxBlocksScheduled = 0;
+  private long lastBlocksScheduledRollTime = 0;
+  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
+
   public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
     this.dn = dn;
     this.storageID = s.getStorageID();
@@ -132,27 +144,28 @@ public class DatanodeStorageInfo {
     this.blockReportCount = blockReportCount;
   }
 
-  public boolean areBlockContentsStale() {
+  boolean areBlockContentsStale() {
     return blockContentsStale;
   }
 
-  public void markStaleAfterFailover() {
+  void markStaleAfterFailover() {
     heartbeatedSinceFailover = false;
     blockContentsStale = true;
  }
 
-  public void receivedHeartbeat() {
+  void receivedHeartbeat(final long lastUpdate) {
    heartbeatedSinceFailover = true;
+    rollBlocksScheduled(lastUpdate);
   }
 
-  public void receivedBlockReport() {
+  void receivedBlockReport() {
     if (heartbeatedSinceFailover) {
       blockContentsStale = false;
     }
     blockReportCount++;
   }
 
-  public void setUtilization(long capacity, long dfsUsed, long remaining) {
+  void setUtilization(long capacity, long dfsUsed, long remaining) {
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;
     this.remaining = remaining;
@@ -224,7 +237,46 @@ public class DatanodeStorageInfo {
   public DatanodeDescriptor getDatanodeDescriptor() {
     return dn;
   }
+
+  /**
+   * @return Approximate number of blocks currently scheduled to be written
+   * to this storage.
+   */
+  int getBlocksScheduled() {
+    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
+  }
+
+  /** Increment the number of blocks scheduled for each given storage */
+  public static void incrementBlocksScheduled(DatanodeStorageInfo... storages) {
+    for (DatanodeStorageInfo s : storages) {
+      s.incrementBlocksScheduled();
+    }
+  }
+
+  /** Increment the number of blocks scheduled. */
+  private void incrementBlocksScheduled() {
+    currApproxBlocksScheduled++;
+  }
+  /** Decrement the number of blocks scheduled. */
+  void decrementBlocksScheduled() {
+    if (prevApproxBlocksScheduled > 0) {
+      prevApproxBlocksScheduled--;
+    } else if (currApproxBlocksScheduled > 0) {
+      currApproxBlocksScheduled--;
+    }
+    // its ok if both counters are zero.
+  }
+
+  /** Adjusts curr and prev number of blocks scheduled every few minutes. */
+  private void rollBlocksScheduled(long now) {
+    if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
+      prevApproxBlocksScheduled = currApproxBlocksScheduled;
+      currApproxBlocksScheduled = 0;
+      lastBlocksScheduledRollTime = now;
+    }
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ebf69dff20e..38aef7ca7c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2868,9 +2868,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets);
     NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
         + getBlockPoolId() + " " + b);
-    for (DatanodeStorageInfo storage : targets) {
-      storage.getDatanodeDescriptor().incBlocksScheduled();
-    }
+    DatanodeStorageInfo.incrementBlocksScheduled(targets);
     return b;
   }
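
Background on the counters this patch relocates: the namenode keeps an approximate count of blocks it has scheduled for writing to each target but has not yet seen reported back. Because a datanode that fails mid-write may never report the block, the count is held in two buckets (current and previous), and the previous bucket is discarded roughly every ten minutes so that stale scheduling charges age out. The standalone sketch below is illustrative only and is not part of the patch; the class name, the main() driver, and the synchronized keyword are assumptions added to make it self-contained (the real code relies on namenode-level locking). It mirrors the curr/prev rolling logic now held per DatanodeStorageInfo and the placement check that charges scheduled-but-unreported blocks against a storage's remaining space.

/**
 * Standalone sketch (illustrative, not part of the patch) of the rolling
 * "blocks scheduled" approximation that HDFS-5222 moves from
 * DatanodeDescriptor into DatanodeStorageInfo.
 */
public class ScheduledBlockCounterSketch {
  // Counts are approximate: a datanode that fails a write may never report
  // the block, so the previous bucket is simply dropped after the roll interval.
  private int currApproxBlocksScheduled = 0;
  private int prevApproxBlocksScheduled = 0;
  private long lastRollTime = 0;
  private static final long ROLL_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes

  /** Called when the namenode schedules a block write to this storage. */
  synchronized void increment() {
    currApproxBlocksScheduled++;
  }

  /** Called when a scheduled block is reported as received (or deleted on retry). */
  synchronized void decrement() {
    if (prevApproxBlocksScheduled > 0) {
      prevApproxBlocksScheduled--;
    } else if (currApproxBlocksScheduled > 0) {
      currApproxBlocksScheduled--;
    }
    // Both buckets being zero is fine; the count is only an approximation.
  }

  /** Called from the heartbeat path; ages out writes that were never reported. */
  synchronized void roll(long now) {
    if (now - lastRollTime > ROLL_INTERVAL_MS) {
      prevApproxBlocksScheduled = currApproxBlocksScheduled;
      currApproxBlocksScheduled = 0;
      lastRollTime = now;
    }
  }

  /** Approximate number of blocks still expected to land on this storage. */
  synchronized int get() {
    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
  }

  public static void main(String[] args) {
    ScheduledBlockCounterSketch counter = new ScheduledBlockCounterSketch();
    long blockSize = 128L * 1024 * 1024;          // 128 MB blocks
    long remaining = 10L * 1024 * 1024 * 1024;    // 10 GB free on this storage

    counter.increment();   // two blocks scheduled to this storage
    counter.increment();
    counter.decrement();   // one of them was reported as received

    // The placement check charges unreported scheduled blocks against free
    // space, mirroring requiredSize > storage.getRemaining() - scheduledSize.
    long scheduledSize = blockSize * counter.get();
    long requiredSize = blockSize * 5;            // stand-in for MIN_BLOCKS_FOR_WRITE
    boolean enoughSpace = requiredSize <= remaining - scheduledSize;
    System.out.println("scheduled=" + counter.get() + ", enoughSpace=" + enoughSpace);
  }
}

Keeping the buckets per DatanodeStorageInfo rather than per DatanodeDescriptor lets the placement policy charge pending writes against the specific storage that was chosen, instead of against the datanode as a whole.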