HDFS-3912. Detect and avoid stale datanodes for writes. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1397211 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-10-11 18:08:27 +00:00
parent f174adfc15
commit 2887bbb33c
11 changed files with 635 additions and 163 deletions

View File

@ -13,10 +13,6 @@ Trunk (Unreleased)
HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
with 4-layer network topology. (Junping Du via szetszwo)
HDFS-3703. Datanodes are marked stale if heartbeat is not received in
configured timeout and are selected as the last location to read from.
(Jing Zhao via suresh)
IMPROVEMENTS
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
@ -238,6 +234,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
(Jaimin D Jetly and Jing Zhao via szetszwo)
HDFS-3912. Detect and avoid stale datanodes for writes.
(Jing Zhao via suresh)
IMPROVEMENTS
HDFS-3925. Prettify PipelineAck#toString() for printing to a log
@ -349,6 +348,11 @@ Release 2.0.2-alpha - 2012-09-07
HDFS-2793. Add an admin command to trigger an edit log roll. (todd)
HDFS-3703. Datanodes are marked stale if heartbeat is not received in
configured timeout and are selected as the last location to read from.
(Jing Zhao via suresh)
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG

View File

@ -180,9 +180,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Whether to enable datanode's stale state detection and usage
public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
// Whether to enable datanode's stale state detection and usage
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
// The default value of the time interval for marking datanodes as stale
public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s
public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
// The stale interval cannot be too small since otherwise this may cause too frequent churn on stale states.
// This value uses the times of heartbeat interval to define the minimum value for stale interval.
public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
// When the number stale datanodes marked as stale reached this certian ratio,
// stop avoiding writing to stale nodes so as to prevent causing hotspots.
public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
// Replication monitoring related keys
public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =

View File

@ -62,6 +62,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
protected NetworkTopology clusterMap;
private FSClusterStats stats;
protected long heartbeatInterval; // interval for DataNode heartbeats
private long staleInterval; // interval used to identify stale DataNodes
/**
* A miss of that many heartbeats is tolerated for replica deletion policy.
*/
@ -78,7 +80,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
@Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.considerLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.stats = stats;
this.clusterMap = clusterMap;
this.heartbeatInterval = conf.getLong(
@ -87,6 +90,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
this.tolerateHeartbeatMultiplier = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
this.staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
}
protected ThreadLocal<StringBuilder> threadLocalBuilder =
@ -155,9 +161,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
writer=null;
}
DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
excludedNodes, blocksize,
maxNodesPerRack, results);
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (!returnChosenNodes) {
results.removeAll(chosenNodes);
}
@ -173,8 +180,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results) {
List<DatanodeDescriptor> results,
final boolean avoidStaleNodes) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
@ -185,18 +192,21 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (writer == null && !newBlock) {
writer = results.get(0);
}
// Keep a copy of original excludedNodes
final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
new HashMap<Node, Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
writer = chooseLocalNode(writer, excludedNodes,
blocksize, maxNodesPerRack, results);
writer = chooseLocalNode(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 1) {
chooseRemoteRack(1, results.get(0), excludedNodes,
blocksize, maxNodesPerRack, results);
chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
@ -204,24 +214,36 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (numOfResults <= 2) {
if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, results.get(0), excludedNodes,
blocksize, maxNodesPerRack, results);
blocksize, maxNodesPerRack,
results, avoidStaleNodes);
} else if (newBlock){
chooseLocalRack(results.get(1), excludedNodes, blocksize,
maxNodesPerRack, results);
maxNodesPerRack, results, avoidStaleNodes);
} else {
chooseLocalRack(writer, excludedNodes, blocksize,
maxNodesPerRack, results);
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
LOG.warn("Not able to place enough replicas, still in need of "
+ numOfReplicas + " to reach " + totalReplicasExpected + "\n"
+ e.getMessage());
if (avoidStaleNodes) {
// ecxludedNodes now has - initial excludedNodes, any nodes that were
// chosen and nodes that were tried but were not chosen because they
// were stale, decommissioned or for any other reason a node is not
// chosen for write. Retry again now not avoiding stale node
for (Node node : results) {
oldExcludedNodes.put(node, node);
}
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false);
}
}
return writer;
}
@ -236,26 +258,27 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
if (preferLocalNode) {
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
maxNodesPerRack, false, results)) {
if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
results, avoidStaleNodes)) {
results.add(localMachine);
return localMachine;
}
}
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseLocalRack(localMachine, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
/* choose one node from the rack that <i>localMachine</i> is on.
@ -270,19 +293,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local rack
try {
return chooseRandom(
localMachine.getNetworkLocation(),
excludedNodes, blocksize, maxNodesPerRack, results);
return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@ -296,18 +319,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
if (newLocal != null) {
try {
return chooseRandom(
newLocal.getNetworkLocation(),
excludedNodes, blocksize, maxNodesPerRack, results);
return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
}
}
@ -323,17 +345,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
excludedNodes, blocksize, maxReplicasPerRack, results);
chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
excludedNodes, blocksize, maxReplicasPerRack, results,
avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
maxReplicasPerRack, results);
maxReplicasPerRack, results, avoidStaleNodes);
}
}
@ -345,7 +369,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@ -363,7 +388,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
if (oldNode == null) { // chosenNode was not in the excluded list
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
if (isGoodTarget(chosenNode, blocksize,
maxNodesPerRack, results, avoidStaleNodes)) {
results.add(chosenNode);
adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
@ -390,7 +416,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
@ -409,7 +436,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (oldNode == null) {
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
if (isGoodTarget(chosenNode, blocksize,
maxNodesPerRack, results, avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
adjustExcludedNodes(excludedNodes, chosenNode);
@ -451,9 +479,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
*/
private boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
List<DatanodeDescriptor> results) {
return isGoodTarget(node, blockSize, maxTargetPerRack,
this.considerLoad, results);
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
results, avoidStaleNodes);
}
/**
@ -466,7 +495,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* the cluster and total number of replicas for a block
* @param considerLoad whether or not to consider load of the target node
* @param results A list containing currently chosen nodes. Used to check if
* too many nodes has been chosen in the target rack.
* too many nodes has been chosen in the target rack.
* @param avoidStaleNodes Whether or not to avoid choosing stale nodes
* @return Return true if <i>node</i> has enough space,
* does not have too much load,
* and the rack does not have too many nodes.
@ -474,7 +504,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
protected boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
List<DatanodeDescriptor> results) {
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
// check if the node is (being) decommissed
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
if(LOG.isDebugEnabled()) {
@ -485,6 +516,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return false;
}
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
if (LOG.isDebugEnabled()) {
threadLocalBuilder.get().append(node.toString()).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append(" is not chosen because the node is staled ");
}
return false;
}
}
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine

View File

@ -64,23 +64,20 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* @return the chosen node
*/
@Override
protected DatanodeDescriptor chooseLocalNode(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
blocksize, maxNodesPerRack, results, avoidStaleNodes);
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
maxNodesPerRack, false, results)) {
maxNodesPerRack, false, results, avoidStaleNodes)) {
results.add(localMachine);
// Nodes under same nodegroup should be excluded.
addNodeGroupToExcludedNodes(excludedNodes,
@ -92,13 +89,13 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
// try a node on local node group
DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
blocksize, maxNodesPerRack, results);
blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (chosenNode != null) {
return chosenNode;
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results);
blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
@Override
@ -119,17 +116,15 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
}
@Override
protected DatanodeDescriptor chooseLocalRack(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
throws NotEnoughReplicasException {
protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
blocksize, maxNodesPerRack, results,
avoidStaleNodes);
}
// choose one from the local rack, but off-nodegroup
@ -137,7 +132,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
return chooseRandom(NetworkTopology.getFirstHalf(
localMachine.getNetworkLocation()),
excludedNodes, blocksize,
maxNodesPerRack, results);
maxNodesPerRack, results,
avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@ -151,39 +147,39 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
}
if (newLocal != null) {
try {
return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results);
return chooseRandom(
clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
}
}
@Override
protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
List<DatanodeDescriptor> results)
throws NotEnoughReplicasException {
DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
boolean avoidStaleNodes) throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
localMachine.getNetworkLocation()),
excludedNodes, blocksize, maxReplicasPerRack, results);
chooseRandom(
numOfReplicas,
"~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
excludedNodes, blocksize, maxReplicasPerRack, results,
avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
maxReplicasPerRack, results);
chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
maxReplicasPerRack, results, avoidStaleNodes);
}
}
@ -193,19 +189,22 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize,
int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
private DatanodeDescriptor chooseLocalNodeGroup(
NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local node group
try {
return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results);
return chooseRandom(
clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@ -219,17 +218,19 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
}
if (newLocal != null) {
try {
return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results);
return chooseRandom(
clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results,
avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
}
}

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
@ -88,8 +89,8 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
private final HeartbeatManager heartbeatManager;
private Daemon decommissionthread = null;
/**
* Stores the datanode -> block map.
@ -127,28 +128,33 @@ public class DatanodeManager {
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
/** Whether or not to check stale DataNodes for read/write */
private final boolean checkForStaleDataNodes;
/** The interval for judging stale DataNodes for read/write */
private final long staleInterval;
/** Whether or not to avoid using stale DataNodes for writing */
private volatile boolean avoidStaleDataNodesForWrite;
/** The number of stale DataNodes */
private volatile int numStaleNodes;
/**
* Whether or not this cluster has ever consisted of more than 1 rack,
* according to the NetworkTopology.
*/
private boolean hasClusterEverBeenMultiRack = false;
/** Whether or not to check the stale datanodes */
private volatile boolean checkForStaleNodes;
/** The time interval for detecting stale datanodes */
private volatile long staleInterval;
DatanodeManager(final BlockManager blockManager,
final Namesystem namesystem, final Configuration conf
) throws IOException {
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;
Class<? extends NetworkTopology> networkTopologyClass =
conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
NetworkTopology.class, NetworkTopology.class);
networktopology = (NetworkTopology) ReflectionUtils.newInstance(
networkTopologyClass, conf);
networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
@ -181,25 +187,69 @@ public class DatanodeManager {
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
// set the value of stale interval based on configuration
this.checkForStaleNodes = conf.getBoolean(
checkForStaleDataNodes = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
if (this.checkForStaleNodes) {
this.staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
LOG.warn("The given interval for marking stale datanode = "
+ this.staleInterval + ", which is smaller than the default value "
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
+ ".");
}
}
staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
checkForStaleDataNodes);
}
private Daemon decommissionthread = null;
private static long getStaleIntervalFromConf(Configuration conf,
long heartbeatExpireInterval) {
long staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
Preconditions.checkArgument(staleInterval > 0,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
" = '" + staleInterval + "' is invalid. " +
"It should be a positive non-zero value.");
final long heartbeatIntervalSeconds = conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
// The stale interval value cannot be smaller than
// 3 times of heartbeat interval
final long minStaleInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
* heartbeatIntervalSeconds * 1000;
if (staleInterval < minStaleInterval) {
LOG.warn("The given interval for marking stale datanode = "
+ staleInterval + ", which is less than "
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+ " heartbeat intervals. This may cause too frequent changes of "
+ "stale states of DataNodes since a heartbeat msg may be missing "
+ "due to temporary short-term failures. Reset stale interval to "
+ minStaleInterval + ".");
staleInterval = minStaleInterval;
}
if (staleInterval > heartbeatExpireInterval) {
LOG.warn("The given interval for marking stale datanode = "
+ staleInterval + ", which is larger than heartbeat expire interval "
+ heartbeatExpireInterval + ".");
}
return staleInterval;
}
static boolean getAvoidStaleForWriteFromConf(Configuration conf,
boolean checkForStale) {
boolean avoid = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
if (!checkForStale && avoid) {
LOG.warn("Cannot set "
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+ " as false while setting "
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+ " as true.");
}
return avoidStaleDataNodesForWrite;
}
void activate(final Configuration conf) {
final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
this.decommissionthread = new Daemon(dm.new Monitor(
@ -253,9 +303,10 @@ public class DatanodeManager {
client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
}
Comparator<DatanodeInfo> comparator = checkForStaleNodes ?
new DFSUtil.DecomStaleComparator(staleInterval) :
DFSUtil.DECOM_COMPARATOR;
Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ?
new DFSUtil.DecomStaleComparator(staleInterval) :
DFSUtil.DECOM_COMPARATOR;
for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations());
// Move decommissioned/stale datanodes to the bottom
@ -723,7 +774,7 @@ public class DatanodeManager {
* 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission.
*/
private void refreshDatanodes() throws IOException {
private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node)) {
@ -782,7 +833,61 @@ public class DatanodeManager {
namesystem.readUnlock();
}
}
/* Getter and Setter for stale DataNodes related attributes */
/**
* @return whether or not to avoid writing to stale datanodes
*/
public boolean isAvoidingStaleDataNodesForWrite() {
return avoidStaleDataNodesForWrite;
}
/**
* Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}.
* The HeartbeatManager disable avoidStaleDataNodesForWrite when more than
* half of the DataNodes are marked as stale.
*
* @param avoidStaleDataNodesForWrite
* The value to set to
* {@link DatanodeManager#avoidStaleDataNodesForWrite}
*/
void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
}
/**
* @return Whether or not to check stale DataNodes for R/W
*/
boolean isCheckingForStaleDataNodes() {
return checkForStaleDataNodes;
}
/**
* @return The time interval used to mark DataNodes as stale.
*/
long getStaleInterval() {
return staleInterval;
}
/**
* Set the number of current stale DataNodes. The HeartbeatManager got this
* number based on DataNodes' heartbeats.
*
* @param numStaleNodes
* The number of stale DataNodes to be set.
*/
void setNumStaleNodes(int numStaleNodes) {
this.numStaleNodes = numStaleNodes;
}
/**
* @return Return the current number of stale DataNodes (detected by
* HeartbeatManager).
*/
int getNumStaleNodes() {
return this.numStaleNodes;
}
/** Fetch live and dead datanodes. */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
@ -961,7 +1066,7 @@ public class DatanodeManager {
return nodes;
}
private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
private void setDatanodeDead(DatanodeDescriptor node) {
node.setLastUpdate(0);
}

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
/**
* Manage the heartbeats received from datanodes.
* The datanode list and statistics are synchronized
@ -54,18 +56,48 @@ class HeartbeatManager implements DatanodeStatistics {
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
/**
* The initial setting of end user which indicates whether or not to avoid
* writing to stale datanodes.
*/
private final boolean initialAvoidWriteStaleNodes;
/**
* When the ratio of stale datanodes reaches this number, stop avoiding
* writing to stale datanodes, i.e., continue using stale nodes for writing.
*/
private final float ratioUseStaleDataNodesForWrite;
final Namesystem namesystem;
final BlockManager blockManager;
HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
final Configuration conf) {
this.heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
HeartbeatManager(final Namesystem namesystem,
final BlockManager blockManager, final Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
boolean checkStaleNodes = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
long recheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
long staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
this.initialAvoidWriteStaleNodes = DatanodeManager
.getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
this.ratioUseStaleDataNodesForWrite = conf.getFloat(
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
Preconditions.checkArgument(
(ratioUseStaleDataNodesForWrite > 0 &&
ratioUseStaleDataNodesForWrite <= 1.0f),
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
"It should be a positive non-zero float value, not greater than 1.0f.");
this.heartbeatRecheckInterval = (checkStaleNodes
&& initialAvoidWriteStaleNodes
&& staleInterval < recheckInterval) ? staleInterval : recheckInterval;
}
void activate(Configuration conf) {
@ -210,16 +242,39 @@ class HeartbeatManager implements DatanodeStatistics {
if (namesystem.isInSafeMode()) {
return;
}
boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
// check the number of stale nodes
int numOfStaleNodes = 0;
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
if (dm.isDatanodeDead(d)) {
if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
break;
if (!checkStaleNodes) {
break;
}
}
if (checkStaleNodes &&
d.isStale(dm.getStaleInterval())) {
numOfStaleNodes++;
}
}
// Change whether to avoid using stale datanodes for writing
// based on proportion of stale datanodes
if (checkStaleNodes) {
dm.setNumStaleNodes(numOfStaleNodes);
if (numOfStaleNodes >
datanodes.size() * ratioUseStaleDataNodesForWrite) {
dm.setAvoidStaleDataNodesForWrite(false);
} else {
if (this.initialAvoidWriteStaleNodes) {
dm.setAvoidStaleDataNodesForWrite(true);
}
}
}
}

View File

@ -251,7 +251,7 @@ public class DataNode extends Configured
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
private DNConf dnConf;
private boolean heartbeatsDisabledForTests = false;
private volatile boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
DataNodeMetrics metrics;

View File

@ -32,8 +32,16 @@ public interface FSClusterStats {
* @return a count of the total number of block transfers and block
* writes that are currently occuring on the cluster.
*/
public int getTotalLoad() ;
public int getTotalLoad();
/**
* Indicate whether or not the cluster is now avoiding
* to use stale DataNodes for writing.
*
* @return True if the cluster is currently avoiding using stale DataNodes
* for writing targets, and false otherwise.
*/
public boolean isAvoidingStaleDataNodesForWrite();
}

View File

@ -5539,4 +5539,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
this.nnResourceChecker = nnResourceChecker;
}
@Override
public boolean isAvoidingStaleDataNodesForWrite() {
return this.blockManager.getDatanodeManager()
.isAvoidingStaleDataNodesForWrite();
}
}

View File

@ -988,12 +988,28 @@
<name>dfs.namenode.check.stale.datanode</name>
<value>false</value>
<description>
Indicate whether or not to check "stale" datanodes whose
heartbeat messages have not been received by the namenode
for more than a specified time interval. If this configuration
parameter is set as true, the stale datanodes will be moved to
the end of the target node list for reading. The writing will
also try to avoid stale nodes.
Indicate whether or not to check "stale" datanodes whose
heartbeat messages have not been received by the namenode
for more than a specified time interval. If this configuration
parameter is set as true, the system will keep track
of the number of stale datanodes. The stale datanodes will be
moved to the end of the node list returned for reading. See
dfs.namenode.avoid.write.stale.datanode for details on how this
affects writes.
</description>
</property>
<property>
<name>dfs.namenode.avoid.write.stale.datanode</name>
<value>false</value>
<description>
Indicate whether or not to avoid writing to "stale" datanodes whose
heartbeat messages have not been received by the namenode
for more than a specified time interval. If this configuration
parameter and dfs.namenode.check.stale.datanode are both set as true,
the writing will avoid using stale datanodes unless a high number
of datanodes are marked as stale. See
dfs.namenode.write.stale.datanode.ratio for details.
</description>
</property>
@ -1001,10 +1017,24 @@
<name>dfs.namenode.stale.datanode.interval</name>
<value>30000</value>
<description>
Default time interval for marking a datanode as "stale", i.e., if
the namenode has not received heartbeat msg from a datanode for
more than this time interval, the datanode will be marked and treated
as "stale" by default.
Default time interval for marking a datanode as "stale", i.e., if
the namenode has not received heartbeat msg from a datanode for
more than this time interval, the datanode will be marked and treated
as "stale" by default. The stale interval cannot be too small since
otherwise this may cause too frequent change of stale states.
We thus set a minimum stale interval value (the default value is 3 times
of heartbeat interval) and guarantee that the stale interval cannot be less
than the minimum value.
</description>
</property>
<property>
<name>dfs.namenode.write.stale.datanode.ratio</name>
<value>0.5f</value>
<description>
When the ratio of number stale datanodes to total datanodes marked
is greater than this ratio, stop avoiding writing to stale nodes so
as to prevent causing hotspots.
</description>
</property>

View File

@ -38,9 +38,12 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Time;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@ -55,6 +58,9 @@ public class TestReplicationPolicy {
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private static DatanodeDescriptor dataNodes[];
// The interval for marking a datanode as stale,
private static long staleInterval =
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@Rule
public ExpectedException exception = ExpectedException.none();
@ -77,6 +83,8 @@ public class TestReplicationPolicy {
"test.build.data", "build/test/data"), "dfs/");
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
new File(baseDir, "name").getPath());
// Enable the checking for stale datanodes in the beginning
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
DFSTestUtil.formatNameNode(conf);
namenode = new NameNode(conf);
@ -229,7 +237,7 @@ public class TestReplicationPolicy {
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
@ -369,6 +377,202 @@ public class TestReplicationPolicy {
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
}
private boolean containsWithinRange(DatanodeDescriptor target,
DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;
assert endIndex >= startIndex && endIndex < nodes.length;
for (int i = startIndex; i <= endIndex; i++) {
if (nodes[i].equals(target)) {
return true;
}
}
return false;
}
@Test
public void testChooseTargetWithStaleNodes() throws Exception {
// Enable avoidng writing to stale datanodes
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setAvoidStaleDataNodesForWrite(true);
// Set dataNodes[0] as stale
dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor[] targets;
// We set the datanode[0] as stale, thus should choose datanode[1] since
// datanode[1] is on the same rack with datanode[0] (writer)
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
excludedNodes.put(dataNodes[1], dataNodes[1]);
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes,
BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
// reset
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setAvoidStaleDataNodesForWrite(false);
dataNodes[0].setLastUpdate(Time.now());
}
/**
* In this testcase, we set 3 nodes (dataNodes[0] ~ dataNodes[2]) as stale,
* and when the number of replicas is less or equal to 3, all the healthy
* datanodes should be returned by the chooseTarget method. When the number
* of replicas is 4, a stale node should be included.
*
* @throws Exception
*/
@Test
public void testChooseTargetWithHalfStaleNodes() throws Exception {
// Enable stale datanodes checking
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setAvoidStaleDataNodesForWrite(true);
// Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
for (int i = 0; i < 3; i++) {
dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1);
}
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 0);
// We set the datanode[0] as stale, thus should choose datanode[1]
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
targets = replicator.chooseTarget(filename, 2, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));
targets = replicator.chooseTarget(filename, 3, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));
targets = replicator.chooseTarget(filename, 4, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 4);
assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
// reset
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setAvoidStaleDataNodesForWrite(false);
for (int i = 0; i < dataNodes.length; i++) {
dataNodes[i].setLastUpdate(Time.now());
}
}
@Test
public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
String[] hosts = new String[]{"host1", "host2", "host3",
"host4", "host5", "host6"};
String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2",
"/d1/r2", "/d2/r3", "/d2/r3"};
MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf).racks(racks)
.hosts(hosts).numDataNodes(hosts.length).build();
miniCluster.waitActive();
try {
// Step 1. Make two datanodes as stale, check whether the
// avoidStaleDataNodesForWrite calculation is correct.
// First stop the heartbeat of host1 and host2
for (int i = 0; i < 2; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
}
// Instead of waiting, explicitly call heartbeatCheck to
// let heartbeat manager to detect stale nodes
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
int numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 2);
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
// Call chooseTarget
DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getDatanodeManager()
.getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
BlockPlacementPolicy replicator = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
// Step 2. Set more than half of the datanodes as stale
for (int i = 0; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 4);
// According to our strategy, stale datanodes will be included for writing
// to avoid hotspots
assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
// Step 3. Set 2 stale datanodes back to healthy nodes,
// still have 2 stale nodes
for (int i = 2; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now());
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 2);
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
} finally {
miniCluster.shutdown();
}
}
/**
* This testcase tests re-replication, when dataNodes[0] is already chosen.
@ -490,8 +694,8 @@ public class TestReplicationPolicy {
.format(true).build();
try {
cluster.waitActive();
final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
.getNameNode().getNamesystem().getBlockManager().neededReplications;
final UnderReplicatedBlocks neededReplications = cluster.getNameNode()
.getNamesystem().getBlockManager().neededReplications;
for (int i = 0; i < 100; i++) {
// Adding the blocks directly to normal priority
neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
@ -529,10 +733,10 @@ public class TestReplicationPolicy {
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
@ -618,6 +822,11 @@ public class TestReplicationPolicy {
dataNodes[5].setRemaining(1*1024*1024);
replicaNodeList.add(dataNodes[5]);
// Refresh the last update time for all the datanodes
for (int i = 0; i < dataNodes.length; i++) {
dataNodes[i].setLastUpdate(Time.now());
}
List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
replicator.splitNodesWithRack(