svn merge -c 1428739 from trunk for HDFS-4270. Introduce soft and hard limits for max replication so that replications of the highest priority are allowed to choose a source datanode that has reached its soft limit but not the hard limit.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1428743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b4b26fc0a1
commit
3e68d15c1e
|
@ -153,6 +153,11 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
HDFS-4326. bump up Tomcat version for HttpFS to 6.0.36. (tucu via acmurthy)
|
||||
|
||||
HDFS-4270. Introduce soft and hard limits for max replication so that
|
||||
replications of the highest priority are allowed to choose a source datanode
|
||||
that has reached its soft limit but not the hard limit. (Derek Dagit via
|
||||
szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -143,6 +143,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
|
||||
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
|
||||
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
|
||||
public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
|
||||
public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
|
||||
public static final String DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
|
||||
public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
|
||||
public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
|
||||
|
|
|
@ -190,10 +190,16 @@ public class BlockManager {
|
|||
|
||||
/** The maximum number of replicas allowed for a block */
|
||||
public final short maxReplication;
|
||||
/** The maximum number of outgoing replication streams
|
||||
* a given node should have at one time
|
||||
/**
|
||||
* The maximum number of outgoing replication streams a given node should have
|
||||
* at one time considering all but the highest priority replications needed.
|
||||
*/
|
||||
int maxReplicationStreams;
|
||||
/**
|
||||
* The maximum number of outgoing replication streams a given node should have
|
||||
* at one time.
|
||||
*/
|
||||
int replicationStreamsHardLimit;
|
||||
/** Minimum copies needed or else write is disallowed */
|
||||
public final short minReplication;
|
||||
/** Default number of replicas */
|
||||
|
@ -264,9 +270,16 @@ public class BlockManager {
|
|||
this.minReplication = (short)minR;
|
||||
this.maxReplication = (short)maxR;
|
||||
|
||||
this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
|
||||
this.maxReplicationStreams =
|
||||
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
|
||||
this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
|
||||
this.replicationStreamsHardLimit =
|
||||
conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
|
||||
this.shouldCheckForEnoughRacks =
|
||||
conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
|
||||
? false : true;
|
||||
|
||||
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
|
||||
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
|
||||
|
@ -436,7 +449,8 @@ public class BlockManager {
|
|||
NumberReplicas numReplicas = new NumberReplicas();
|
||||
// source node returned is not used
|
||||
chooseSourceDatanode(block, containingNodes,
|
||||
containingLiveReplicasNodes, numReplicas);
|
||||
containingLiveReplicasNodes, numReplicas,
|
||||
UnderReplicatedBlocks.LEVEL);
|
||||
assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
|
||||
int usableReplicas = numReplicas.liveReplicas() +
|
||||
numReplicas.decommissionedReplicas();
|
||||
|
@ -1146,7 +1160,8 @@ public class BlockManager {
|
|||
liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
|
||||
NumberReplicas numReplicas = new NumberReplicas();
|
||||
srcNode = chooseSourceDatanode(
|
||||
block, containingNodes, liveReplicaNodes, numReplicas);
|
||||
block, containingNodes, liveReplicaNodes, numReplicas,
|
||||
priority);
|
||||
if(srcNode == null) { // block can not be replicated from any node
|
||||
LOG.debug("Block " + block + " cannot be repl from any node");
|
||||
continue;
|
||||
|
@ -1340,16 +1355,34 @@ public class BlockManager {
|
|||
* since the former do not have write traffic and hence are less busy.
|
||||
* We do not use already decommissioned nodes as a source.
|
||||
* Otherwise we choose a random node among those that did not reach their
|
||||
* replication limit.
|
||||
* replication limits. However, if the replication is of the highest priority
|
||||
* and all nodes have reached their replication limits, we will choose a
|
||||
* random node despite the replication limit.
|
||||
*
|
||||
* In addition form a list of all nodes containing the block
|
||||
* and calculate its replication numbers.
|
||||
*
|
||||
* @param block Block for which a replication source is needed
|
||||
* @param containingNodes List to be populated with nodes found to contain the
|
||||
* given block
|
||||
* @param nodesContainingLiveReplicas List to be populated with nodes found to
|
||||
* contain live replicas of the given block
|
||||
* @param numReplicas NumberReplicas instance to be initialized with the
|
||||
* counts of live, corrupt, excess, and
|
||||
* decommissioned replicas of the given
|
||||
* block.
|
||||
* @param priority integer representing replication priority of the given
|
||||
* block
|
||||
* @return the DatanodeDescriptor of the chosen node from which to replicate
|
||||
* the given block
|
||||
*/
|
||||
private DatanodeDescriptor chooseSourceDatanode(
|
||||
@VisibleForTesting
|
||||
DatanodeDescriptor chooseSourceDatanode(
|
||||
Block block,
|
||||
List<DatanodeDescriptor> containingNodes,
|
||||
List<DatanodeDescriptor> nodesContainingLiveReplicas,
|
||||
NumberReplicas numReplicas) {
|
||||
NumberReplicas numReplicas,
|
||||
int priority) {
|
||||
containingNodes.clear();
|
||||
nodesContainingLiveReplicas.clear();
|
||||
DatanodeDescriptor srcNode = null;
|
||||
|
@ -1378,8 +1411,15 @@ public class BlockManager {
|
|||
// If so, do not select the node as src node
|
||||
if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
|
||||
continue;
|
||||
if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
|
||||
if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
|
||||
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
|
||||
{
|
||||
continue; // already reached replication limit
|
||||
}
|
||||
if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
// the block must not be scheduled for removal on srcNode
|
||||
if(excessBlocks != null && excessBlocks.contains(block))
|
||||
continue;
|
||||
|
|
|
@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
|
@ -429,4 +432,57 @@ public class TestBlockManager {
|
|||
}
|
||||
return repls;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a source node for a highest-priority replication is chosen even if all available
|
||||
* source nodes have reached their replication limits.
|
||||
*/
|
||||
@Test
|
||||
public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
|
||||
bm.maxReplicationStreams = 0;
|
||||
bm.replicationStreamsHardLimit = 1;
|
||||
|
||||
long blockId = 42; // arbitrary
|
||||
Block aBlock = new Block(blockId, 0, 0);
|
||||
|
||||
List<DatanodeDescriptor> origNodes = getNodes(0, 1);
|
||||
// Add the block to the first node.
|
||||
addBlockOnNodes(blockId,origNodes.subList(0,1));
|
||||
|
||||
List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
|
||||
List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
|
||||
|
||||
assertNotNull("Chooses source node for a highest-priority replication"
|
||||
+ " even if all available source nodes have reached their replication"
|
||||
+ " limits below the hard limit.",
|
||||
bm.chooseSourceDatanode(
|
||||
aBlock,
|
||||
cntNodes,
|
||||
liveNodes,
|
||||
new NumberReplicas(),
|
||||
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
|
||||
|
||||
assertNull("Does not choose a source node for a less-than-highest-priority"
|
||||
+ " replication since all available source nodes have reached"
|
||||
+ " their replication limits.",
|
||||
bm.chooseSourceDatanode(
|
||||
aBlock,
|
||||
cntNodes,
|
||||
liveNodes,
|
||||
new NumberReplicas(),
|
||||
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
|
||||
|
||||
// Increase the replication count to test replication count > hard limit
|
||||
DatanodeDescriptor targets[] = { origNodes.get(1) };
|
||||
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
|
||||
|
||||
assertNull("Does not choose a source node for a highest-priority"
|
||||
+ " replication when all available nodes exceed the hard limit.",
|
||||
bm.chooseSourceDatanode(
|
||||
aBlock,
|
||||
cntNodes,
|
||||
liveNodes,
|
||||
new NumberReplicas(),
|
||||
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue