From 2c15726999c45a53bf8ae1ce6b9d6fdcc5adfc67 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Fri, 4 Jan 2013 08:09:43 +0000
Subject: [PATCH] HDFS-4270. Introduce soft and hard limits for max replication
 so that replications of the highest priority are allowed to choose a source
 datanode that has reached its soft limit but not the hard limit.
 Contributed by Derek Dagit

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1428739 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  5 ++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  2 +
 .../server/blockmanagement/BlockManager.java  | 66 +++++++++++++++----
 .../blockmanagement/TestBlockManager.java     | 56 ++++++++++++++++
 4 files changed, 116 insertions(+), 13 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8069d65f4f9..7b4ae98ff80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -443,6 +443,11 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4326. bump up Tomcat version for HttpFS to 6.0.36. (tucu via acmurthy)
 
+    HDFS-4270. Introduce soft and hard limits for max replication so that
+    replications of the highest priority are allowed to choose a source datanode
+    that has reached its soft limit but not the hard limit. (Derek Dagit via
+    szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 994390cf7ac..c62e9f7fc26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -143,6 +143,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
   public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
   public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
+  public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
+  public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
   public static final String DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
   public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
   public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 74dbfbc295f..81bb67c50ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -189,10 +189,16 @@ public class BlockManager {
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
-  /** The maximum number of outgoing replication streams
-    * a given node should have at one time
-    */
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time considering all but the highest priority replications needed.
+   */
   int maxReplicationStreams;
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time.
+   */
+  int replicationStreamsHardLimit;
   /** Minimum copies needed or else write is disallowed */
   public final short minReplication;
   /** Default number of replicas */
@@ -263,9 +269,16 @@ public class BlockManager {
     this.minReplication = (short)minR;
     this.maxReplication = (short)maxR;
 
-    this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
-                                             DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
+    this.maxReplicationStreams =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+    this.replicationStreamsHardLimit =
+        conf.getInt(
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
+    this.shouldCheckForEnoughRacks =
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+            ? false : true;
 
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@@ -435,7 +448,8 @@ public class BlockManager {
       NumberReplicas numReplicas = new NumberReplicas();
       // source node returned is not used
       chooseSourceDatanode(block, containingNodes,
-          containingLiveReplicasNodes, numReplicas);
+          containingLiveReplicasNodes, numReplicas,
+          UnderReplicatedBlocks.LEVEL);
       assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
       int usableReplicas = numReplicas.liveReplicas() +
                            numReplicas.decommissionedReplicas();
@@ -1145,11 +1159,12 @@ public class BlockManager {
           liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
           NumberReplicas numReplicas = new NumberReplicas();
           srcNode = chooseSourceDatanode(
-              block, containingNodes, liveReplicaNodes, numReplicas);
+              block, containingNodes, liveReplicaNodes, numReplicas,
+              priority);
           if(srcNode == null) { // block can not be replicated from any node
             LOG.debug("Block " + block + " cannot be repl from any node");
             continue;
-          }
+          }
 
           assert liveReplicaNodes.size() == numReplicas.liveReplicas();
           // do not schedule more if enough replicas is already pending
@@ -1339,16 +1354,34 @@ public class BlockManager {
    * since the former do not have write traffic and hence are less busy.
    * We do not use already decommissioned nodes as a source.
    * Otherwise we choose a random node among those that did not reach their
-   * replication limit.
+   * replication limits. However, if the replication is of the highest priority
+   * and all nodes have reached their replication limits, we will choose a
+   * random node despite the replication limit.
    *
    * In addition form a list of all nodes containing the block
    * and calculate its replication numbers.
+   *
+   * @param block Block for which a replication source is needed
+   * @param containingNodes List to be populated with nodes found to contain the
+   *                        given block
+   * @param nodesContainingLiveReplicas List to be populated with nodes found to
+   *                                    contain live replicas of the given block
+   * @param numReplicas NumberReplicas instance to be initialized with the
+   *                      counts of live, corrupt, excess, and
+   *                      decommissioned replicas of the given
+   *                      block.
+   * @param priority integer representing replication priority of the given
+   *                 block
+   * @return the DatanodeDescriptor of the chosen node from which to replicate
+   *         the given block
    */
-  private DatanodeDescriptor chooseSourceDatanode(
+  @VisibleForTesting
+  DatanodeDescriptor chooseSourceDatanode(
       Block block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeDescriptor> nodesContainingLiveReplicas,
-      NumberReplicas numReplicas) {
+      NumberReplicas numReplicas,
+      int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
@@ -1377,8 +1410,15 @@ public class BlockManager {
       // If so, do not select the node as src node
       if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
-      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      {
         continue; // already reached replication limit
+      }
+      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
+      {
+        continue;
+      }
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 22bf9b146be..b9811e7e5e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -429,4 +432,57 @@ public class TestBlockManager {
     }
     return repls;
   }
+
+  /**
+   * Test that a source node for a highest-priority replication is chosen even if all available
+   * source nodes have reached their replication limits.
+   */
+  @Test
+  public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
+    bm.maxReplicationStreams = 0;
+    bm.replicationStreamsHardLimit = 1;
+
+    long blockId = 42; // arbitrary
+    Block aBlock = new Block(blockId, 0, 0);
+
+    List<DatanodeDescriptor> origNodes = getNodes(0, 1);
+    // Add the block to the first node.
+    addBlockOnNodes(blockId,origNodes.subList(0,1));
+
+    List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+
+    assertNotNull("Chooses source node for a highest-priority replication"
+        + " even if all available source nodes have reached their replication"
+        + " limits below the hard limit.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+
+    assertNull("Does not choose a source node for a less-than-highest-priority"
+        + " replication since all available source nodes have reached"
+        + " their replication limits.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+
+    // Increase the replication count to test replication count > hard limit
+    DatanodeDescriptor targets[] = { origNodes.get(1) };
+    origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
+
+    assertNull("Does not choose a source node for a highest-priority"
+        + " replication when all available nodes exceed the hard limit.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+  }
 }
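
The sketch below is illustrative only and is not part of the patch. It restates, in isolation, the
source-selection rule that the BlockManager change above introduces: a datanode that has reached the
soft limit (dfs.namenode.replication.max-streams, default 2) may still be chosen as a source for a
highest-priority replication, but no datanode that has reached the hard limit
(dfs.namenode.replication.max-streams-hard-limit, default 4) is ever chosen. The class and method
names in the sketch are invented for illustration; only the two property names and their defaults
come from the patch.

    // Illustrative sketch only; not part of HDFS-4270.
    public class ReplicationLimitSketch {
      // Defaults taken from the DFSConfigKeys additions above.
      static final int SOFT_LIMIT = 2; // dfs.namenode.replication.max-streams
      static final int HARD_LIMIT = 4; // dfs.namenode.replication.max-streams-hard-limit

      /**
       * @param streamsInProgress outgoing replication streams already scheduled on the node
       * @param highestPriority   whether the block sits in the highest-priority queue
       * @return whether the node may be chosen as a replication source
       */
      static boolean usableAsSource(int streamsInProgress, boolean highestPriority) {
        if (!highestPriority && streamsInProgress >= SOFT_LIMIT) {
          return false; // ordinary replications stop at the soft limit
        }
        return streamsInProgress < HARD_LIMIT; // nothing may pass the hard limit
      }

      public static void main(String[] args) {
        System.out.println(usableAsSource(2, false)); // false: soft limit reached
        System.out.println(usableAsSource(2, true));  // true: highest priority may pass the soft limit
        System.out.println(usableAsSource(4, true));  // false: hard limit stops everyone
      }
    }

With the shipped defaults the hard limit is twice the soft limit, so highest-priority replications
get extra headroom on busy nodes while the amount of replication work any single datanode can be
asked to source remains bounded.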