diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ae1410a5279..4c502fae9a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -714,6 +714,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6995. Block should be placed in the client's 'rack-local' node if
     'client-local' node is not available (vinayakumarb)
 
+    HDFS-7128. Decommission slows way down when it gets towards the end.
+    (Ming Ma via cnauroth)
+
   BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE
 
     HDFS-6677. Change INodeFile and FSImage to support storage policy ID.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 06c61c4da2a..12afa4fdb37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3598,6 +3598,7 @@ public class BlockManager {
       this.block = block;
       this.bc = bc;
       this.srcNode = srcNode;
+      this.srcNode.incrementPendingReplicationWithoutTargets();
       this.containingNodes = containingNodes;
       this.liveReplicaStorages = liveReplicaStorages;
       this.additionalReplRequired = additionalReplRequired;
@@ -3608,10 +3609,14 @@ public class BlockManager {
 
     private void chooseTargets(BlockPlacementPolicy blockplacement,
         BlockStoragePolicySuite storagePolicySuite,
        Set<Node> excludedNodes) {
-      targets = blockplacement.chooseTarget(bc.getName(),
-          additionalReplRequired, srcNode, liveReplicaStorages, false,
-          excludedNodes, block.getNumBytes(),
-          storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+      try {
+        targets = blockplacement.chooseTarget(bc.getName(),
+            additionalReplRequired, srcNode, liveReplicaStorages, false,
+            excludedNodes, block.getNumBytes(),
+            storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+      } finally {
+        srcNode.decrementPendingReplicationWithoutTargets();
+      }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index be1ff144689..eaea77daeca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -221,6 +221,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   private boolean disallowed = false;
 
+  // The number of replication work items pending before targets are determined
+  private int PendingReplicationWithoutTargets = 0;
+
   /**
    * DatanodeDescriptor constructor
    * @param nodeID id of the data node
@@ -410,6 +413,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return new BlockIterator(getStorageInfo(storageID));
   }
 
+  void incrementPendingReplicationWithoutTargets() {
+    PendingReplicationWithoutTargets++;
+  }
+
+  void decrementPendingReplicationWithoutTargets() {
+    PendingReplicationWithoutTargets--;
+  }
+
   /**
    * Store block replication work.
   */
@@ -441,12 +452,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
       }
     }
   }
-  
+
   /**
    * The number of work items that are pending to be replicated
    */
   int getNumberOfBlocksToBeReplicated() {
-    return replicateBlocks.size();
+    return PendingReplicationWithoutTargets + replicateBlocks.size();
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
index 23bb0986dbf..334ece117be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
@@ -18,17 +18,22 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.junit.Test;
 
+import java.util.Iterator;
+
+
 public class TestUnderReplicatedBlocks {
   @Test(timeout=60000) // 1 min timeout
   public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
@@ -63,4 +68,76 @@ public class TestUnderReplicatedBlocks {
 
   }
 
+  /**
+   * The test verifies that the number of outstanding replication requests for
+   * a given DN doesn't exceed the limit set by the configuration property
+   * dfs.namenode.replication.max-streams-hard-limit.
+   * The test does the following:
+   * 1. Create a mini cluster with 2 DNs. Set a large heartbeat interval so
+   *    that replication requests won't be picked up by any DN right away.
+   * 2. Create a file with 10 blocks and replication factor 2. Thus each
+   *    of the 2 DNs has one replica of each block.
+   * 3. Add a DN to the cluster for later replication.
+   * 4. Remove a DN that has data.
+   * 5. Ask BlockManager to compute the replication work. This will assign
+   *    replication requests to the only DN that has data.
+   * 6. Make sure the number of pending replication requests of that DN doesn't
+   *    exceed the limit.
+   * @throws Exception
+   */
+  @Test(timeout=60000) // 1 min timeout
+  public void testNumberOfBlocksToBeReplicated() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+
+    // Large value to make sure pending replication requests can stay in
+    // DatanodeDescriptor.replicateBlocks before the test times out.
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+
+    // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
+    // chooseUnderReplicatedBlocks at once.
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
+
+    int NUM_OF_BLOCKS = 10;
+    final short REP_FACTOR = 2;
+    final String FILE_NAME = "/testFile";
+    final Path FILE_PATH = new Path(FILE_NAME);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        REP_FACTOR).build();
+    try {
+      // create a file with 10 blocks with a replication factor of 2
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, FILE_PATH, NUM_OF_BLOCKS, REP_FACTOR, 1L);
+      DFSTestUtil.waitReplication(fs, FILE_PATH, REP_FACTOR);
+
+      cluster.startDataNodes(conf, 1, true, null, null, null, null);
+
+      final BlockManager bm = cluster.getNamesystem().getBlockManager();
+      ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
+      Iterator<DatanodeStorageInfo> storageInfos =
+          bm.blocksMap.getStorages(b.getLocalBlock())
+          .iterator();
+      DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
+      DatanodeDescriptor secondDn = storageInfos.next().getDatanodeDescriptor();
+
+      bm.getDatanodeManager().removeDatanode(firstDn);
+
+      assertEquals(NUM_OF_BLOCKS, bm.getUnderReplicatedNotMissingBlocks());
+      bm.computeDatanodeWork();
+
+
+      assertTrue("The number of blocks to be replicated should be less than "
+          + "or equal to " + bm.replicationStreamsHardLimit,
+          secondDn.getNumberOfBlocksToBeReplicated()
+          <= bm.replicationStreamsHardLimit);
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+
 }