HDFS-3912. Merging change 1397211 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1397219 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-10-11 18:30:36 +00:00
parent 267bfc5ff2
commit caf5213a50
10 changed files with 576 additions and 110 deletions

View File

@ -9,8 +9,7 @@ Release 2.0.3-alpha - Unreleased
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
(Jaimin D Jetly and Jing Zhao via szetszwo)
HDFS-3703. Datanodes are marked stale if heartbeat is not received in
configured timeout and are selected as the last location to read from.
HDFS-3912. Detect and avoid stale datanodes for writes.
(Jing Zhao via suresh)
IMPROVEMENTS
@ -121,6 +120,10 @@ Release 2.0.2-alpha - 2012-09-07
HDFS-2793. Add an admin command to trigger an edit log roll. (todd)
HDFS-3703. Datanodes are marked stale if heartbeat is not received in
configured timeout and are selected as the last location to read from.
(Jing Zhao via suresh)
IMPROVEMENTS
HDFS-3040. TestMulitipleNNDataBlockScanner is misspelled. (Madhukara Phatak

View File

@ -180,9 +180,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Whether to enable datanode's stale state detection and usage
public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
// Whether to enable datanode's stale state detection and usage for writes
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
// The default value of the time interval for marking datanodes as stale
public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s
public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
// The stale interval cannot be too small, since that may cause frequent churn of stale states.
// This value times the heartbeat interval defines the minimum allowed stale interval.
public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
// When the ratio of datanodes marked as stale reaches this value,
// stop avoiding writing to stale nodes so as to prevent causing hotspots.
public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
// Replication monitoring related keys
public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =

View File

@ -62,6 +62,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
private NetworkTopology clusterMap;
private FSClusterStats stats;
private long heartbeatInterval; // interval for DataNode heartbeats
private long staleInterval; // interval used to identify stale DataNodes
/**
* A miss of that many heartbeats is tolerated for replica deletion policy.
*/
@ -78,7 +80,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
@Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.considerLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.stats = stats;
this.clusterMap = clusterMap;
this.heartbeatInterval = conf.getLong(
@ -87,6 +90,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
this.tolerateHeartbeatMultiplier = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
this.staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
}
private ThreadLocal<StringBuilder> threadLocalBuilder =
@ -154,9 +160,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
writer=null;
}
DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
excludedNodes, blocksize,
maxNodesPerRack, results);
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (!returnChosenNodes) {
results.removeAll(chosenNodes);
}
@ -172,8 +179,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results) {
List<DatanodeDescriptor> results,
final boolean avoidStaleNodes) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
@ -184,18 +191,21 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (writer == null && !newBlock) {
writer = results.get(0);
}
// Keep a copy of original excludedNodes
final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
new HashMap<Node, Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
writer = chooseLocalNode(writer, excludedNodes,
blocksize, maxNodesPerRack, results);
writer = chooseLocalNode(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 1) {
chooseRemoteRack(1, results.get(0), excludedNodes,
blocksize, maxNodesPerRack, results);
chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
if (--numOfReplicas == 0) {
return writer;
}
@ -203,24 +213,36 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (numOfResults <= 2) {
if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, results.get(0), excludedNodes,
blocksize, maxNodesPerRack, results);
blocksize, maxNodesPerRack,
results, avoidStaleNodes);
} else if (newBlock){
chooseLocalRack(results.get(1), excludedNodes, blocksize,
maxNodesPerRack, results);
maxNodesPerRack, results, avoidStaleNodes);
} else {
chooseLocalRack(writer, excludedNodes, blocksize,
maxNodesPerRack, results);
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
LOG.warn("Not able to place enough replicas, still in need of "
+ numOfReplicas + " to reach " + totalReplicasExpected + "\n"
+ e.getMessage());
if (avoidStaleNodes) {
// excludedNodes now contains the initial excludedNodes, any nodes that
// were chosen, and nodes that were tried but not chosen because they
// were stale, decommissioned, or otherwise unsuitable as write targets.
// Retry, this time without avoiding stale nodes.
for (Node node : results) {
oldExcludedNodes.put(node, node);
}
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false);
}
}
return writer;
}
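To illustrate the fallback above (an illustration, not part of the patch): if avoidStaleNodes is true and every remaining candidate is stale, the first pass throws NotEnoughReplicasException; chooseTarget then retries with the already chosen nodes added back to the original excluded set and avoidStaleNodes set to false, so the write is still placed, possibly on stale datanodes, rather than failing outright.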
@ -235,26 +257,27 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
if (preferLocalNode) {
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
maxNodesPerRack, false, results)) {
if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
results, avoidStaleNodes)) {
results.add(localMachine);
return localMachine;
}
}
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseLocalRack(localMachine, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
/* choose one node from the rack that <i>localMachine</i> is on.
@ -269,19 +292,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local rack
try {
return chooseRandom(
localMachine.getNetworkLocation(),
excludedNodes, blocksize, maxNodesPerRack, results);
return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
@ -295,18 +318,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
if (newLocal != null) {
try {
return chooseRandom(
newLocal.getNetworkLocation(),
excludedNodes, blocksize, maxNodesPerRack, results);
return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
}
}
@ -322,17 +344,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
excludedNodes, blocksize, maxReplicasPerRack, results);
chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
excludedNodes, blocksize, maxReplicasPerRack, results,
avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
maxReplicasPerRack, results);
maxReplicasPerRack, results, avoidStaleNodes);
}
}
@ -344,7 +368,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@ -362,7 +387,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
if (oldNode == null) { // chosenNode was not in the excluded list
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
if (isGoodTarget(chosenNode, blocksize,
maxNodesPerRack, results, avoidStaleNodes)) {
results.add(chosenNode);
return chosenNode;
} else {
@ -388,7 +414,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results)
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
@ -407,7 +434,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (oldNode == null) {
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
if (isGoodTarget(chosenNode, blocksize,
maxNodesPerRack, results, avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
} else {
@ -434,9 +462,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
*/
private boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
List<DatanodeDescriptor> results) {
return isGoodTarget(node, blockSize, maxTargetPerRack,
this.considerLoad, results);
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
results, avoidStaleNodes);
}
/**
@ -449,7 +478,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* the cluster and total number of replicas for a block
* @param considerLoad whether or not to consider load of the target node
* @param results A list containing currently chosen nodes. Used to check if
too many nodes have been chosen in the target rack.
too many nodes have been chosen in the target rack.
* @param avoidStaleNodes Whether or not to avoid choosing stale nodes
* @return Return true if <i>node</i> has enough space,
* does not have too much load,
* and the rack does not have too many nodes.
@ -457,7 +487,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
private boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
List<DatanodeDescriptor> results) {
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
// check if the node is (being) decommissioned
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
if(LOG.isDebugEnabled()) {
@ -468,6 +499,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return false;
}
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
if (LOG.isDebugEnabled()) {
threadLocalBuilder.get().append(node.toString()).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append(" is not chosen because the node is staled ");
}
return false;
}
}
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
@ -85,8 +86,8 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
private final HeartbeatManager heartbeatManager;
private Daemon decommissionthread = null;
/**
* Stores the datanode -> block map.
@ -124,20 +125,26 @@ public class DatanodeManager {
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
/** Whether or not to check stale DataNodes for read/write */
private final boolean checkForStaleDataNodes;
/** The interval for judging stale DataNodes for read/write */
private final long staleInterval;
/** Whether or not to avoid using stale DataNodes for writing */
private volatile boolean avoidStaleDataNodesForWrite;
/** The number of stale DataNodes */
private volatile int numStaleNodes;
/**
* Whether or not this cluster has ever consisted of more than 1 rack,
* according to the NetworkTopology.
*/
private boolean hasClusterEverBeenMultiRack = false;
/** Whether or not to check the stale datanodes */
private volatile boolean checkForStaleNodes;
/** The time interval for detecting stale datanodes */
private volatile long staleInterval;
DatanodeManager(final BlockManager blockManager,
final Namesystem namesystem, final Configuration conf
) throws IOException {
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;
@ -172,25 +179,69 @@ public class DatanodeManager {
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
// set the value of stale interval based on configuration
this.checkForStaleNodes = conf.getBoolean(
checkForStaleDataNodes = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
if (this.checkForStaleNodes) {
this.staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
LOG.warn("The given interval for marking stale datanode = "
+ this.staleInterval + ", which is smaller than the default value "
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
+ ".");
}
}
staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
checkForStaleDataNodes);
}
private Daemon decommissionthread = null;
private static long getStaleIntervalFromConf(Configuration conf,
long heartbeatExpireInterval) {
long staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
Preconditions.checkArgument(staleInterval > 0,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
" = '" + staleInterval + "' is invalid. " +
"It should be a positive non-zero value.");
final long heartbeatIntervalSeconds = conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
// The stale interval value cannot be smaller than a configured
// multiple (3 by default) of the heartbeat interval
final long minStaleInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
* heartbeatIntervalSeconds * 1000;
if (staleInterval < minStaleInterval) {
LOG.warn("The given interval for marking stale datanode = "
+ staleInterval + ", which is less than "
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+ " heartbeat intervals. This may cause too frequent changes of "
+ "stale states of DataNodes since a heartbeat msg may be missing "
+ "due to temporary short-term failures. Reset stale interval to "
+ minStaleInterval + ".");
staleInterval = minStaleInterval;
}
if (staleInterval > heartbeatExpireInterval) {
LOG.warn("The given interval for marking stale datanode = "
+ staleInterval + ", which is larger than heartbeat expire interval "
+ heartbeatExpireInterval + ".");
}
return staleInterval;
}
static boolean getAvoidStaleForWriteFromConf(Configuration conf,
boolean checkForStale) {
boolean avoid = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
if (!checkForStale && avoid) {
LOG.warn("Cannot set "
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+ " as false while setting "
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+ " as true.");
}
return avoidStaleDataNodesForWrite;
}
void activate(final Configuration conf) {
final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
this.decommissionthread = new Daemon(dm.new Monitor(
@ -235,9 +286,10 @@ public class DatanodeManager {
//sort the blocks
final DatanodeDescriptor client = getDatanodeByHost(targethost);
Comparator<DatanodeInfo> comparator = checkForStaleNodes ?
new DFSUtil.DecomStaleComparator(staleInterval) :
DFSUtil.DECOM_COMPARATOR;
Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ?
new DFSUtil.DecomStaleComparator(staleInterval) :
DFSUtil.DECOM_COMPARATOR;
for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations());
// Move decommissioned/stale datanodes to the bottom
@ -705,7 +757,7 @@ public class DatanodeManager {
* 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission.
*/
private void refreshDatanodes() throws IOException {
private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node)) {
@ -764,7 +816,61 @@ public class DatanodeManager {
namesystem.readUnlock();
}
}
/* Getters and setters for stale-DataNode-related attributes */
/**
* @return whether or not to avoid writing to stale datanodes
*/
public boolean isAvoidingStaleDataNodesForWrite() {
return avoidStaleDataNodesForWrite;
}
/**
* Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}.
* The HeartbeatManager disables avoidStaleDataNodesForWrite when more than
* half of the DataNodes are marked as stale.
*
* @param avoidStaleDataNodesForWrite
* The value to set to
* {@link DatanodeManager#avoidStaleDataNodesForWrite}
*/
void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
}
/**
* @return Whether or not to check stale DataNodes for R/W
*/
boolean isCheckingForStaleDataNodes() {
return checkForStaleDataNodes;
}
/**
* @return The time interval used to mark DataNodes as stale.
*/
long getStaleInterval() {
return staleInterval;
}
/**
* Set the current number of stale DataNodes. The HeartbeatManager computes
* this number based on DataNodes' heartbeats.
*
* @param numStaleNodes
* The number of stale DataNodes to be set.
*/
void setNumStaleNodes(int numStaleNodes) {
this.numStaleNodes = numStaleNodes;
}
/**
* @return The current number of stale DataNodes (detected by the
* HeartbeatManager).
*/
int getNumStaleNodes() {
return this.numStaleNodes;
}
/** Fetch live and dead datanodes. */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
@ -943,7 +1049,7 @@ public class DatanodeManager {
return nodes;
}
private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
private void setDatanodeDead(DatanodeDescriptor node) {
node.setLastUpdate(0);
}

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
/**
* Manage the heartbeats received from datanodes.
* The datanode list and statistics are synchronized
@ -54,18 +56,48 @@ class HeartbeatManager implements DatanodeStatistics {
private final long heartbeatRecheckInterval;
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
/**
* The initial user setting that indicates whether or not to avoid
* writing to stale datanodes.
*/
private final boolean initialAvoidWriteStaleNodes;
/**
* When the ratio of stale datanodes reaches this number, stop avoiding
* writing to stale datanodes, i.e., continue using stale nodes for writing.
*/
private final float ratioUseStaleDataNodesForWrite;
final Namesystem namesystem;
final BlockManager blockManager;
HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
final Configuration conf) {
this.heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
HeartbeatManager(final Namesystem namesystem,
final BlockManager blockManager, final Configuration conf) {
this.namesystem = namesystem;
this.blockManager = blockManager;
boolean checkStaleNodes = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
long recheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
long staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
this.initialAvoidWriteStaleNodes = DatanodeManager
.getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
this.ratioUseStaleDataNodesForWrite = conf.getFloat(
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
Preconditions.checkArgument(
(ratioUseStaleDataNodesForWrite > 0 &&
ratioUseStaleDataNodesForWrite <= 1.0f),
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
"It should be a positive non-zero float value, not greater than 1.0f.");
this.heartbeatRecheckInterval = (checkStaleNodes
&& initialAvoidWriteStaleNodes
&& staleInterval < recheckInterval) ? staleInterval : recheckInterval;
}
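A worked illustration of the recheck-interval choice above, using the defaults shown in this diff: the heartbeat recheck interval defaults to 5 minutes and the stale interval to 30 seconds, so when stale checking and write avoidance are both enabled the heartbeat monitor wakes roughly every 30 seconds and keeps the stale-node count fresh; otherwise the 5-minute recheck interval is kept.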
void activate(Configuration conf) {
@ -210,16 +242,39 @@ class HeartbeatManager implements DatanodeStatistics {
if (namesystem.isInSafeMode()) {
return;
}
boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
// check the number of stale nodes
int numOfStaleNodes = 0;
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
if (dm.isDatanodeDead(d)) {
if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
break;
if (!checkStaleNodes) {
break;
}
}
if (checkStaleNodes &&
d.isStale(dm.getStaleInterval())) {
numOfStaleNodes++;
}
}
// Change whether to avoid using stale datanodes for writing
// based on the proportion of stale datanodes
if (checkStaleNodes) {
dm.setNumStaleNodes(numOfStaleNodes);
if (numOfStaleNodes >
datanodes.size() * ratioUseStaleDataNodesForWrite) {
dm.setAvoidStaleDataNodesForWrite(false);
} else {
if (this.initialAvoidWriteStaleNodes) {
dm.setAvoidStaleDataNodesForWrite(true);
}
}
}
}
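A worked example of the ratio toggle above, with a hypothetical cluster size: given the default ratio of 0.5 and six registered datanodes, write avoidance stays enabled while at most three nodes are stale (3 > 6 * 0.5 is false) and is switched off once a fourth becomes stale; when the stale count drops back to three or fewer, avoidance is re-enabled only if it was enabled in the configuration (initialAvoidWriteStaleNodes).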

View File

@ -255,7 +255,7 @@ public class DataNode extends Configured
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
private DNConf dnConf;
private boolean heartbeatsDisabledForTests = false;
private volatile boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
DataNodeMetrics metrics;

View File

@ -32,8 +32,16 @@ public interface FSClusterStats {
* @return a count of the total number of block transfers and block
writes that are currently occurring on the cluster.
*/
public int getTotalLoad() ;
public int getTotalLoad();
/**
* Indicate whether or not the cluster is currently avoiding the use of
* stale DataNodes for writes.
*
* @return True if the cluster is currently avoiding stale DataNodes as
* write targets, and false otherwise.
*/
public boolean isAvoidingStaleDataNodesForWrite();
}

View File

@ -5524,4 +5524,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
this.nnResourceChecker = nnResourceChecker;
}
@Override
public boolean isAvoidingStaleDataNodesForWrite() {
return this.blockManager.getDatanodeManager()
.isAvoidingStaleDataNodesForWrite();
}
}

View File

@ -983,12 +983,28 @@
<name>dfs.namenode.check.stale.datanode</name>
<value>false</value>
<description>
Indicate whether or not to check "stale" datanodes whose
heartbeat messages have not been received by the namenode
for more than a specified time interval. If this configuration
parameter is set as true, the stale datanodes will be moved to
the end of the target node list for reading. The writing will
also try to avoid stale nodes.
Indicate whether or not to check "stale" datanodes whose
heartbeat messages have not been received by the namenode
for more than a specified time interval. If this configuration
parameter is set to true, the system will keep track
of the number of stale datanodes. The stale datanodes will be
moved to the end of the node list returned for reading. See
dfs.namenode.avoid.write.stale.datanode for details on how this
affects writes.
</description>
</property>
<property>
<name>dfs.namenode.avoid.write.stale.datanode</name>
<value>false</value>
<description>
Indicate whether or not to avoid writing to "stale" datanodes whose
heartbeat messages have not been received by the namenode
for more than a specified time interval. If this configuration
parameter and dfs.namenode.check.stale.datanode are both set to true,
writes will avoid using stale datanodes unless a high number
of datanodes are marked as stale. See
dfs.namenode.write.stale.datanode.ratio for details.
</description>
</property>
@ -996,10 +1012,24 @@
<name>dfs.namenode.stale.datanode.interval</name>
<value>30000</value>
<description>
Default time interval for marking a datanode as "stale", i.e., if
the namenode has not received heartbeat msg from a datanode for
more than this time interval, the datanode will be marked and treated
as "stale" by default.
Default time interval for marking a datanode as "stale", i.e., if
the namenode has not received heartbeat msg from a datanode for
more than this time interval, the datanode will be marked and treated
as "stale" by default. The stale interval cannot be too small since
otherwise this may cause too frequent change of stale states.
We thus set a minimum stale interval value (the default value is 3 times
of heartbeat interval) and guarantee that the stale interval cannot be less
than the minimum value.
</description>
</property>
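As a worked example of the minimum-interval rule described above (defaults taken from this patch): with the default heartbeat interval of 3 seconds and the default minimum multiplier of 3, the floor is 9 seconds, so a configured value of, say, 5000 ms would be reset to 9000 ms with a warning, while the 30000 ms default is accepted unchanged.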
<property>
<name>dfs.namenode.write.stale.datanode.ratio</name>
<value>0.5f</value>
<description>
When the ratio of stale datanodes to total datanodes is greater than
this value, stop avoiding writes to stale nodes so as to prevent
causing hotspots.
</description>
</property>

View File

@ -37,9 +37,12 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Time;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@ -54,6 +57,9 @@ public class TestReplicationPolicy {
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private static DatanodeDescriptor dataNodes[];
// The interval for marking a datanode as stale.
private static long staleInterval =
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@Rule
public ExpectedException exception = ExpectedException.none();
@ -76,6 +82,8 @@ public class TestReplicationPolicy {
"test.build.data", "build/test/data"), "dfs/");
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
new File(baseDir, "name").getPath());
// Enable the checking for stale datanodes in the beginning
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
DFSTestUtil.formatNameNode(conf);
namenode = new NameNode(conf);
@ -228,7 +236,7 @@ public class TestReplicationPolicy {
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
@ -368,6 +376,202 @@ public class TestReplicationPolicy {
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
}
private boolean containsWithinRange(DatanodeDescriptor target,
DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;
assert endIndex >= startIndex && endIndex < nodes.length;
for (int i = startIndex; i <= endIndex; i++) {
if (nodes[i].equals(target)) {
return true;
}
}
return false;
}
@Test
public void testChooseTargetWithStaleNodes() throws Exception {
// Enable avoiding writes to stale datanodes
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setAvoidStaleDataNodesForWrite(true);
// Set dataNodes[0] as stale
dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor[] targets;
// dataNodes[0] is stale, so dataNodes[1] should be chosen since it is
// on the same rack as dataNodes[0] (the writer)
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
excludedNodes.put(dataNodes[1], dataNodes[1]);
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes,
BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
// reset
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setAvoidStaleDataNodesForWrite(false);
dataNodes[0].setLastUpdate(Time.now());
}
/**
* In this testcase, we set 3 nodes (dataNodes[0] ~ dataNodes[2]) as stale,
* and when the number of replicas is less than or equal to 3, all the healthy
* datanodes should be returned by the chooseTarget method. When the number
* of replicas is 4, a stale node should be included.
*
* @throws Exception
*/
@Test
public void testChooseTargetWithHalfStaleNodes() throws Exception {
// Enable avoiding writes to stale datanodes
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setAvoidStaleDataNodesForWrite(true);
// Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
for (int i = 0; i < 3; i++) {
dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1);
}
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 0);
// dataNodes[0-2] are stale, so the chosen target should not be among them
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
targets = replicator.chooseTarget(filename, 2, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));
targets = replicator.chooseTarget(filename, 3, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));
targets = replicator.chooseTarget(filename, 4, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 4);
assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
// reset
namenode.getNamesystem().getBlockManager().getDatanodeManager()
.setAvoidStaleDataNodesForWrite(false);
for (int i = 0; i < dataNodes.length; i++) {
dataNodes[i].setLastUpdate(Time.now());
}
}
@Test
public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
conf.setBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
String[] hosts = new String[]{"host1", "host2", "host3",
"host4", "host5", "host6"};
String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2",
"/d1/r2", "/d2/r3", "/d2/r3"};
MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf).racks(racks)
.hosts(hosts).numDataNodes(hosts.length).build();
miniCluster.waitActive();
try {
// Step 1. Mark two datanodes as stale, and check whether the
// avoidStaleDataNodesForWrite calculation is correct.
// First stop the heartbeat of host1 and host2
for (int i = 0; i < 2; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
}
// Instead of waiting, explicitly call heartbeatCheck to
// let the heartbeat manager detect stale nodes
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
int numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 2);
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
// Call chooseTarget
DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getDatanodeManager()
.getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
BlockPlacementPolicy replicator = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
// Step 2. Set more than half of the datanodes as stale
for (int i = 0; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 4);
// According to our strategy, stale datanodes will be included for writing
// to avoid hotspots
assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
// Step 3. Set 2 of the stale datanodes back to healthy nodes,
// leaving 2 stale nodes
for (int i = 2; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now());
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
numStaleNodes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager().getNumStaleNodes();
assertEquals(numStaleNodes, 2);
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
} finally {
miniCluster.shutdown();
}
}
/**
* This testcase tests re-replication, when dataNodes[0] is already chosen.
@ -489,8 +693,8 @@ public class TestReplicationPolicy {
.format(true).build();
try {
cluster.waitActive();
final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
.getNameNode().getNamesystem().getBlockManager().neededReplications;
final UnderReplicatedBlocks neededReplications = cluster.getNameNode()
.getNamesystem().getBlockManager().neededReplications;
for (int i = 0; i < 100; i++) {
// Adding the blocks directly to normal priority
neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
@ -528,10 +732,10 @@ public class TestReplicationPolicy {
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block