From 9b8a35aff6d4bd7bb066ce01fa63a88fa49245ee Mon Sep 17 00:00:00 2001
From: cnauroth
Date: Tue, 7 Oct 2014 14:58:54 -0700
Subject: [PATCH] HDFS-7128. Decommission slows way down when it gets towards
 the end. Contributed by Ming Ma.

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt    |  3 +
 .../server/blockmanagement/BlockManager.java   | 13 +++-
 .../blockmanagement/DatanodeDescriptor.java    | 15 +++-
 .../TestUnderReplicatedBlocks.java             | 77 +++++++++++++++++++
 4 files changed, 102 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fb2e419d9b8..b5699f40bb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1066,6 +1066,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6995. Block should be placed in the client's 'rack-local' node if
     'client-local' node is not available (vinayakumarb)
 
+    HDFS-7128. Decommission slows way down when it gets towards the end.
+    (Ming Ma via cnauroth)
+
   BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE
 
     HDFS-6677. Change INodeFile and FSImage to support storage policy ID.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0981c33d4d5..c0451439e46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3599,6 +3599,7 @@ public class BlockManager {
       this.block = block;
       this.bc = bc;
       this.srcNode = srcNode;
+      this.srcNode.incrementPendingReplicationWithoutTargets();
       this.containingNodes = containingNodes;
       this.liveReplicaStorages = liveReplicaStorages;
       this.additionalReplRequired = additionalReplRequired;
@@ -3609,10 +3610,14 @@ public class BlockManager {
 
     private void chooseTargets(BlockPlacementPolicy blockplacement,
         BlockStoragePolicySuite storagePolicySuite,
         Set<Node> excludedNodes) {
-      targets = blockplacement.chooseTarget(bc.getName(),
-          additionalReplRequired, srcNode, liveReplicaStorages, false,
-          excludedNodes, block.getNumBytes(),
-          storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+      try {
+        targets = blockplacement.chooseTarget(bc.getName(),
+            additionalReplRequired, srcNode, liveReplicaStorages, false,
+            excludedNodes, block.getNumBytes(),
+            storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+      } finally {
+        srcNode.decrementPendingReplicationWithoutTargets();
+      }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 55599f7d3ae..806a37cf040 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -219,6 +219,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   private boolean disallowed = false;
 
+  // The number of replication work pending before targets are determined
+  private int PendingReplicationWithoutTargets = 0;
+
   /**
    * DatanodeDescriptor constructor
    * @param nodeID id of the data node
@@ -408,6 +411,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return new BlockIterator(getStorageInfo(storageID));
   }
 
+  void incrementPendingReplicationWithoutTargets() {
+    PendingReplicationWithoutTargets++;
+  }
+
+  void decrementPendingReplicationWithoutTargets() {
+    PendingReplicationWithoutTargets--;
+  }
+
   /**
    * Store block replication work.
   */
@@ -439,12 +450,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
       }
     }
   }
-  
+
   /**
    * The number of work items that are pending to be replicated
    */
   int getNumberOfBlocksToBeReplicated() {
-    return replicateBlocks.size();
+    return PendingReplicationWithoutTargets + replicateBlocks.size();
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
index 210f0e90421..27b35f0b56a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -30,6 +32,9 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.Test;
 
+import java.util.Iterator;
+
+
 public class TestUnderReplicatedBlocks {
   @Test(timeout=60000) // 1 min timeout
   public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
@@ -69,4 +74,76 @@ public class TestUnderReplicatedBlocks {
 
   }
 
+  /**
+   * The test verifies the number of outstanding replication requests for a
+   * given DN shouldn't exceed the limit set by configuration property
+   * dfs.namenode.replication.max-streams-hard-limit.
+   * The test does the followings:
+   * 1. Create a mini cluster with 2 DNs. Set large heartbeat interval so that
+   *    replication requests won't be picked by any DN right away.
+   * 2. Create a file with 10 blocks and replication factor 2. Thus each
+   *    of the 2 DNs have one replica of each block.
+   * 3. Add a DN to the cluster for later replication.
+   * 4. Remove a DN that has data.
+   * 5. Ask BlockManager to compute the replication work. This will assign
+   *    replication requests to the only DN that has data.
+   * 6. Make sure the number of pending replication requests of that DN don't
+   *    exceed the limit.
+   * @throws Exception
+   */
+  @Test(timeout=60000) // 1 min timeout
+  public void testNumberOfBlocksToBeReplicated() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+
+    // Large value to make sure the pending replication request can stay in
+    // DatanodeDescriptor.replicateBlocks before test timeout.
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+
+    // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
+    // chooseUnderReplicatedBlocks at once.
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
+
+    int NUM_OF_BLOCKS = 10;
+    final short REP_FACTOR = 2;
+    final String FILE_NAME = "/testFile";
+    final Path FILE_PATH = new Path(FILE_NAME);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        REP_FACTOR).build();
+    try {
+      // create a file with 10 blocks with a replication factor of 2
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, FILE_PATH, NUM_OF_BLOCKS, REP_FACTOR, 1L);
+      DFSTestUtil.waitReplication(fs, FILE_PATH, REP_FACTOR);
+
+      cluster.startDataNodes(conf, 1, true, null, null, null, null);
+
+      final BlockManager bm = cluster.getNamesystem().getBlockManager();
+      ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
+      Iterator<DatanodeStorageInfo> storageInfos =
+          bm.blocksMap.getStorages(b.getLocalBlock())
+          .iterator();
+      DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
+      DatanodeDescriptor secondDn = storageInfos.next().getDatanodeDescriptor();
+
+      bm.getDatanodeManager().removeDatanode(firstDn);
+
+      assertEquals(NUM_OF_BLOCKS, bm.getUnderReplicatedNotMissingBlocks());
+      bm.computeDatanodeWork();
+
+
+      assertTrue("The number of blocks to be replicated should be less than "
+          + "or equal to " + bm.replicationStreamsHardLimit,
+          secondDn.getNumberOfBlocksToBeReplicated()
+          <= bm.replicationStreamsHardLimit);
+    } finally {
+      cluster.shutdown();
+    }
+
+  }
+
 }
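Note on the mechanism (illustration only, not part of the applied patch): before this change, getNumberOfBlocksToBeReplicated() counted only the work already queued in DatanodeDescriptor.replicateBlocks, so blocks that had been scheduled with a node as replication source but whose targets were still being chosen did not count toward dfs.namenode.replication.max-streams-hard-limit, and a single replication pass could assign one datanode more outstanding requests than the limit allows. The sketch below restates the accounting the patch adds in a simplified, self-contained form; the class name ReplicationSourceCounter, the String block type, and the underHardLimit() helper are hypothetical stand-ins, while the increment/decrement method names and the summed getNumberOfBlocksToBeReplicated() mirror the patch.

// Simplified sketch of the per-source-node accounting (hypothetical class,
// not the real org.apache.hadoop.hdfs DatanodeDescriptor).
import java.util.ArrayDeque;
import java.util.Queue;

class ReplicationSourceCounter {
  // Work items that name this node as replication source but whose targets
  // have not been chosen yet (the counter the patch introduces).
  private int pendingReplicationWithoutTargets = 0;

  // Work items with targets already chosen, waiting to be handed to the
  // datanode on a heartbeat (stands in for replicateBlocks).
  private final Queue<String> replicateBlocks = new ArrayDeque<>();

  void incrementPendingReplicationWithoutTargets() {
    pendingReplicationWithoutTargets++;
  }

  void decrementPendingReplicationWithoutTargets() {
    pendingReplicationWithoutTargets--;
  }

  void addBlockToBeReplicated(String block) {
    replicateBlocks.add(block);
  }

  // Both kinds of outstanding work now count toward the per-node total that
  // is checked against the max-streams hard limit.
  int getNumberOfBlocksToBeReplicated() {
    return pendingReplicationWithoutTargets + replicateBlocks.size();
  }

  // Illustrative throttle check: once the total reaches the hard limit, this
  // node should not be picked as a replication source again.
  boolean underHardLimit(int replicationStreamsHardLimit) {
    return getNumberOfBlocksToBeReplicated() < replicationStreamsHardLimit;
  }
}

In the patch itself the increment happens in the ReplicationWork constructor and the decrement in a finally block around chooseTarget(), so the counter is released even when target selection fails or throws and a failed pass cannot leave a node looking permanently saturated.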