HDFS-7411. Change decommission logic to throttle by blocks rather
than nodes in each interval. Contributed by Andrew Wang
Chris Douglas 2015-03-08 18:31:04 -07:00
parent 7ce3c76353
commit 6ee0d32b98
13 changed files with 1002 additions and 380 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@ -719,6 +719,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7855. Separate class Packet from DFSOutputStream. (Li Bo via jing9)
HDFS-7411. Change decommission logic to throttle by blocks rather than
nodes in each interval. (Andrew Wang via cdouglas)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@ -453,8 +453,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 30000L;
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
public static final int DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30;
public static final String DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval";
public static final int DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT = 5;
public static final String DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY = "dfs.namenode.decommission.blocks.per.interval";
public static final int DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT = 500000;
public static final String DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES = "dfs.namenode.decommission.max.concurrent.tracked.nodes";
public static final int DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
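The two new keys above replace the deprecated per-node throttle with a per-tick block budget plus a cap on concurrently tracked nodes. A minimal sketch of setting them programmatically, using only the constants defined in this patch; the class name is hypothetical and the values are illustrative, not recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DecomThrottleConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Halve the per-tick block scan budget from the 500000 default.
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
        250000);
    // Track at most 50 decommission-in-progress nodes at a time.
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
        50);
    System.out.println(conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY, -1));
  }
}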

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java

@ -139,7 +139,7 @@ public class HdfsConfiguration extends Configuration {
new DeprecationDelta("dfs.federation.nameservice.id",
DFSConfigKeys.DFS_NAMESERVICE_ID),
new DeprecationDelta("dfs.client.file-block-storage-locations.timeout",
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS)
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS),
});
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@ -3189,28 +3189,6 @@ public class BlockManager {
return live;
}
private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
NumberReplicas num) {
int curReplicas = num.liveReplicas();
int curExpectedReplicas = getReplication(block);
BlockCollection bc = blocksMap.getBlockCollection(block);
StringBuilder nodeList = new StringBuilder();
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
nodeList.append(node);
nodeList.append(" ");
}
LOG.info("Block: " + block + ", Expected Replicas: "
+ curExpectedReplicas + ", live replicas: " + curReplicas
+ ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissionedReplicas()
+ ", excess replicas: " + num.excessReplicas()
+ ", Is Open File: " + bc.isUnderConstruction()
+ ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ srcNode + ", Is current datanode decommissioning: "
+ srcNode.isDecommissionInProgress());
}
/**
* On stopping decommission, check if the node has excess replicas.
* If there are any excess replicas, call processOverReplicatedBlock().
@ -3240,89 +3218,30 @@ public class BlockManager {
}
/**
* Return true if there are any blocks on this node that have not
* yet reached their replication factor. Otherwise returns false.
* Returns whether a node can be safely decommissioned based on its
* liveness. Dead nodes cannot always be safely decommissioned.
*/
boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
boolean status = false;
boolean firstReplicationLog = true;
int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0;
int underReplicatedInOpenFiles = 0;
final Iterator<? extends Block> it = srcNode.getBlockIterator();
while(it.hasNext()) {
final Block block = it.next();
BlockCollection bc = blocksMap.getBlockCollection(block);
if (bc != null) {
NumberReplicas num = countNodes(block);
int curReplicas = num.liveReplicas();
int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
if (curExpectedReplicas > curReplicas) {
if (bc.isUnderConstruction()) {
if (block.equals(bc.getLastBlock()) && curReplicas > minReplication) {
continue;
}
underReplicatedInOpenFiles++;
boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
if (node.isAlive) {
return true;
}
// Log info about one block for this node which needs replication
if (!status) {
status = true;
if (firstReplicationLog) {
logBlockReplicationInfo(block, srcNode, num);
}
// Allowing decommission as long as default replication is met
if (curReplicas >= defaultReplication) {
status = false;
firstReplicationLog = false;
}
}
underReplicatedBlocks++;
if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
decommissionOnlyReplicas++;
}
}
if (!neededReplications.contains(block) &&
pendingReplications.getNumReplicas(block) == 0 &&
namesystem.isPopulatingReplQueues()) {
//
// These blocks have been reported from the datanode
// after the startDecommission method has been executed. These
// blocks were in flight when the decommissioning was started.
// Process these blocks only when active NN is out of safe mode.
//
neededReplications.add(block,
curReplicas,
num.decommissionedReplicas(),
curExpectedReplicas);
}
}
}
}
if (!status && !srcNode.isAlive) {
updateState();
if (pendingReplicationBlocksCount == 0 &&
underReplicatedBlocksCount == 0) {
LOG.info("srcNode {} is dead and there are no under-replicated" +
" blocks or blocks pending replication. Marking as " +
"decommissioned.");
} else {
LOG.warn("srcNode " + srcNode + " is dead " +
"while decommission is in progress. Continuing to mark " +
"it as decommission in progress so when it rejoins the " +
"cluster it can continue the decommission process.");
status = true;
}
LOG.info("Node {} is dead and there are no under-replicated" +
" blocks or blocks pending replication. Safe to decommission.",
node);
return true;
}
srcNode.decommissioningStatus.set(underReplicatedBlocks,
decommissionOnlyReplicas,
underReplicatedInOpenFiles);
return status;
LOG.warn("Node {} is dead " +
"while decommission is in progress. Cannot be safely " +
"decommissioned since there is risk of reduced " +
"data durability or data loss. Either restart the failed node or" +
" force decommissioning by removing, calling refreshNodes, " +
"then re-adding to the excludes files.", node);
return false;
}
public int getActiveBlockCount() {
@ -3493,7 +3412,7 @@ public class BlockManager {
* A block needs replication if the number of replicas is less than expected
* or if it does not have enough racks.
*/
private boolean isNeededReplication(Block b, int expected, int current) {
boolean isNeededReplication(Block b, int expected, int current) {
return current < expected || !blockHasEnoughRacks(b);
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@ -53,8 +52,6 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import static org.apache.hadoop.util.Time.now;
/**
* Manage datanodes, including decommission and other activities.
*/
@ -65,9 +62,9 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
private final DecommissionManager decomManager;
private final HeartbeatManager heartbeatManager;
private final FSClusterStats fsClusterStats;
private Daemon decommissionthread = null;
/**
* Stores the datanode -> block map.
@ -110,7 +107,7 @@ public class DatanodeManager {
private final HostFileManager hostFileManager = new HostFileManager();
/** The period to wait for datanode heartbeat.*/
private final long heartbeatExpireInterval;
private long heartbeatExpireInterval;
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
@ -184,6 +181,8 @@ public class DatanodeManager {
networktopology = NetworkTopology.getInstance(conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
this.decomManager = new DecommissionManager(namesystem, blockManager,
heartbeatManager);
this.fsClusterStats = newFSClusterStats();
this.defaultXferPort = NetUtils.createSocketAddr(
@ -307,25 +306,12 @@ public class DatanodeManager {
}
void activate(final Configuration conf) {
final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
this.decommissionthread = new Daemon(dm.new Monitor(
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
decommissionthread.start();
decomManager.activate(conf);
heartbeatManager.activate(conf);
}
void close() {
if (decommissionthread != null) {
decommissionthread.interrupt();
try {
decommissionthread.join(3000);
} catch (InterruptedException e) {
}
}
decomManager.close();
heartbeatManager.close();
}
@ -339,6 +325,20 @@ public class DatanodeManager {
return heartbeatManager;
}
@VisibleForTesting
public DecommissionManager getDecomManager() {
return decomManager;
}
HostFileManager getHostFileManager() {
return hostFileManager;
}
@VisibleForTesting
public void setHeartbeatExpireInterval(long expiryMs) {
this.heartbeatExpireInterval = expiryMs;
}
@VisibleForTesting
public FSClusterStats getFSClusterStats() {
return fsClusterStats;
@ -826,63 +826,14 @@ public class DatanodeManager {
}
/**
* Decommission the node if it is in exclude list.
* Decommission the node if it is in the host exclude list.
*
* @param nodeReg datanode
*/
private void checkDecommissioning(DatanodeDescriptor nodeReg) {
void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
// If the registered node is in exclude list, then decommission it
if (hostFileManager.isExcluded(nodeReg)) {
startDecommission(nodeReg);
}
}
/**
* Change, if appropriate, the admin state of a datanode to
* decommission completed. Return true if decommission is complete.
*/
boolean checkDecommissionState(DatanodeDescriptor node) {
// Check to see if all blocks in this decommissioned
// node has reached their target replication factor.
if (node.isDecommissionInProgress() && node.checkBlockReportReceived()) {
if (!blockManager.isReplicationInProgress(node)) {
node.setDecommissioned();
LOG.info("Decommission complete for " + node);
}
}
return node.isDecommissioned();
}
/** Start decommissioning the specified datanode. */
@InterfaceAudience.Private
@VisibleForTesting
public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress()) {
if (!node.isAlive) {
LOG.info("Dead node " + node + " is decommissioned immediately.");
node.setDecommissioned();
} else if (!node.isDecommissioned()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Start Decommissioning " + node + " " + storage
+ " with " + storage.numBlocks() + " blocks");
}
heartbeatManager.startDecommission(node);
node.decommissioningStatus.setStartTime(now());
// all the blocks that reside on this node have to be replicated.
checkDecommissionState(node);
}
}
}
/** Stop decommissioning the specified datanodes. */
void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.info("Stop Decommissioning " + node);
heartbeatManager.stopDecommission(node);
// Over-replicated blocks will be detected and processed when
// the dead node comes back and sends in its full block report.
if (node.isAlive) {
blockManager.processOverReplicatedBlocksOnReCommission(node);
}
if (getHostFileManager().isExcluded(nodeReg)) {
decomManager.startDecommission(nodeReg);
}
}
@ -993,7 +944,7 @@ public class DatanodeManager {
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion());
checkDecommissioning(nodeS);
startDecommissioningIfExcluded(nodeS);
success = true;
} finally {
if (!success) {
@ -1029,7 +980,7 @@ public class DatanodeManager {
// because it is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
incrementVersionCount(nodeReg.getSoftwareVersion());
checkDecommissioning(nodeDescr);
startDecommissioningIfExcluded(nodeDescr);
success = true;
} finally {
if (!success) {
@ -1092,9 +1043,9 @@ public class DatanodeManager {
node.setDisallowed(true); // case 2.
} else {
if (hostFileManager.isExcluded(node)) {
startDecommission(node); // case 3.
decomManager.startDecommission(node); // case 3.
} else {
stopDecommission(node); // case 4.
decomManager.stopDecommission(node); // case 4.
}
}
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java

@ -17,88 +17,605 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.AbstractList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.util.ChunkedArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.util.Time.now;
/**
* Manage node decommissioning.
* Manages datanode decommissioning. A background monitor thread
* periodically checks the status of datanodes that are in-progress of
* decommissioning.
* <p/>
* A datanode can be decommissioned in a few situations:
* <ul>
* <li>If a DN is dead, it is decommissioned immediately.</li>
* <li>If a DN is alive, it is decommissioned after all of its blocks
* are sufficiently replicated. Merely under-replicated blocks do not
* block decommissioning as long as they are above a replication
* threshold.</li>
* </ul>
* In the second case, the datanode transitions to a
* decommission-in-progress state and is tracked by the monitor thread. The
* monitor periodically scans through the list of insufficiently replicated
* blocks on these datanodes to
* determine if they can be decommissioned. The monitor also prunes this list
* as blocks become replicated, so monitor scans will become more efficient
* over time.
* <p/>
* Decommission-in-progress nodes that become dead do not progress to
* decommissioned until they become live again. This prevents potential
* durability loss for singly-replicated blocks (see HDFS-6791).
* <p/>
* This class depends on the FSNamesystem lock for synchronization.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class);
public class DecommissionManager {
private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager
.class);
private final Namesystem namesystem;
private final BlockManager blockmanager;
private final BlockManager blockManager;
private final HeartbeatManager hbManager;
private final ScheduledExecutorService executor;
/**
* Map containing the decommission-in-progress datanodes that are being
* tracked so they can be marked as decommissioned.
* <p/>
* This holds a set of references to the under-replicated blocks on the DN at
* the time the DN is added to the map, i.e. the blocks that are preventing
* the node from being marked as decommissioned. During a monitor tick, this
* list is pruned as blocks become replicated.
* <p/>
* Note also that the reference to the list of under-replicated blocks
* will be null on initial add.
* <p/>
* However, this map can become out-of-date since it is not updated by block
* reports or other events. Before a node is finally marked as decommissioned,
* another check is done with the actual block map.
*/
private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
decomNodeBlocks;
/**
* Tracking a node in decomNodeBlocks consumes additional memory. To limit
* the impact on NN memory consumption, we limit the number of nodes in
* decomNodeBlocks. Additional nodes wait in pendingNodes.
*/
private final Queue<DatanodeDescriptor> pendingNodes;
private Monitor monitor = null;
DecommissionManager(final Namesystem namesystem,
final BlockManager blockmanager) {
final BlockManager blockManager, final HeartbeatManager hbManager) {
this.namesystem = namesystem;
this.blockmanager = blockmanager;
}
this.blockManager = blockManager;
this.hbManager = hbManager;
/** Periodically check decommission status. */
class Monitor implements Runnable {
/** recheckInterval is how often namenode checks
* if a node has finished decommission
*/
private final long recheckInterval;
/** The number of decommission nodes to check for each interval */
private final int numNodesPerCheck;
/** firstkey can be initialized to anything. */
private String firstkey = "";
Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
this.recheckInterval = recheckIntervalInSecond * 1000L;
this.numNodesPerCheck = numNodesPerCheck;
executor = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
.setDaemon(true).build());
decomNodeBlocks = new TreeMap<>();
pendingNodes = new LinkedList<>();
}
/**
* Check decommission status of numNodesPerCheck nodes
* for every recheckInterval milliseconds.
* Start the decommission monitor thread.
* @param conf
*/
void activate(Configuration conf) {
final int intervalSecs =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT);
checkArgument(intervalSecs >= 0, "Cannot set a negative " +
"value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
// By default, the new configuration key overrides the deprecated one.
// No # node limit is set.
int blocksPerInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
int nodesPerInterval = Integer.MAX_VALUE;
// If the expected key isn't present and the deprecated one is,
// use the deprecated value for the new key. This overrides the
// default.
//
// Also print a deprecation warning.
final String deprecatedKey =
"dfs.namenode.decommission.nodes.per.interval";
final String strNodes = conf.get(deprecatedKey);
if (strNodes != null) {
nodesPerInterval = Integer.parseInt(strNodes);
blocksPerInterval = Integer.MAX_VALUE;
LOG.warn("Using deprecated configuration key {} value of {}.",
deprecatedKey, nodesPerInterval);
LOG.warn("Please update your configuration to use {} instead.",
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
}
checkArgument(blocksPerInterval > 0,
"Must set a positive value for "
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
final int maxConcurrentTrackedNodes = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
"value for "
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
monitor = new Monitor(blocksPerInterval,
nodesPerInterval, maxConcurrentTrackedNodes);
executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
TimeUnit.SECONDS);
LOG.debug("Activating DecommissionManager with interval {} seconds, " +
"{} max blocks per interval, {} max nodes per interval, " +
"{} max concurrently tracked nodes.", intervalSecs,
blocksPerInterval, nodesPerInterval, maxConcurrentTrackedNodes);
}
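The precedence rule in activate() is subtle: merely setting the deprecated per-node key disables the per-block throttle. A detached, runnable sketch of just that resolution logic, assuming it mirrors the code above; the class name is hypothetical.

import org.apache.hadoop.conf.Configuration;

public class DeprecatedKeyPrecedenceSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("dfs.namenode.decommission.nodes.per.interval", 4);
    int nodesPerInterval = Integer.MAX_VALUE;
    int blocksPerInterval = 500000; // default of the new per-block key
    String strNodes = conf.get("dfs.namenode.decommission.nodes.per.interval");
    if (strNodes != null) {
      // The deprecated key wins and per-block throttling is disabled.
      nodesPerInterval = Integer.parseInt(strNodes);
      blocksPerInterval = Integer.MAX_VALUE;
    }
    System.out.println(nodesPerInterval + " nodes and " + blocksPerInterval
        + " blocks per tick");
  }
}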
/**
* Stop the decommission monitor thread, waiting briefly for it to terminate.
*/
void close() {
executor.shutdownNow();
try {
executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {}
}
/**
* Start decommissioning the specified datanode.
* @param node
*/
@VisibleForTesting
public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress()) {
if (!node.isAlive) {
LOG.info("Dead node {} is decommissioned immediately.", node);
node.setDecommissioned();
} else if (!node.isDecommissioned()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Starting decommission of {} {} with {} blocks",
node, storage, storage.numBlocks());
}
// Update DN stats maintained by HeartbeatManager
hbManager.startDecommission(node);
node.decommissioningStatus.setStartTime(now());
pendingNodes.add(node);
}
} else {
LOG.trace("startDecommission: Node {} is already decommission in "
+ "progress, nothing to do.", node);
}
}
/**
* Stop decommissioning the specified datanode.
* @param node
*/
void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.info("Stopping decommissioning of node {}", node);
// Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node);
// Over-replicated blocks will be detected and processed when
// the dead node comes back and sends in its full block report.
if (node.isAlive) {
blockManager.processOverReplicatedBlocksOnReCommission(node);
}
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
decomNodeBlocks.remove(node);
} else {
LOG.trace("stopDecommission: Node {} is not decommission in progress " +
"or decommissioned, nothing to do.", node);
}
}
private void setDecommissioned(DatanodeDescriptor dn) {
dn.setDecommissioned();
LOG.info("Decommissioning complete for node {}", dn);
}
/**
* Checks whether a block is sufficiently replicated for decommissioning.
* Full-strength replication is not always necessary, hence "sufficient".
* @return true if sufficient, else false.
*/
private boolean isSufficientlyReplicated(BlockInfoContiguous block,
BlockCollection bc,
NumberReplicas numberReplicas) {
final int numExpected = bc.getBlockReplication();
final int numLive = numberReplicas.liveReplicas();
if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
// Block doesn't need replication. Skip.
LOG.trace("Block {} does not need replication.", block);
return true;
}
// Block is under-replicated
LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
numLive);
if (numExpected > numLive) {
if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
// Can decom a UC block as long as there will still be minReplicas
if (numLive >= blockManager.minReplication) {
LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
+ ">= minR ({})", block, numLive, blockManager.minReplication);
return true;
} else {
LOG.trace("UC block {} insufficiently-replicated since numLive "
+ "({}) < minR ({})", block, numLive,
blockManager.minReplication);
}
} else {
// Can decom a non-UC as long as the default replication is met
if (numLive >= blockManager.defaultReplication) {
return true;
}
}
}
return false;
}
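The thresholds in isSufficientlyReplicated are easy to misread, so here is a detached restatement with a worked example. It deliberately omits the rack-count check that isNeededReplication also performs; all names are hypothetical, and the sample values mirror the common dfs.replication=3, dfs.namenode.replication.min=1 defaults.

public class SufficiencyRuleSketch {
  // Under-construction (UC) tail blocks only need minReplication live
  // replicas; any other under-replicated block needs defaultReplication
  // before its node can finish decommissioning.
  static boolean sufficient(boolean ucTailBlock, int numExpected, int numLive,
      int minReplication, int defaultReplication) {
    if (numLive >= numExpected) {
      return true; // not under-replicated at all (rack check omitted)
    }
    return ucTailBlock ? numLive >= minReplication
        : numLive >= defaultReplication;
  }

  public static void main(String[] args) {
    System.out.println(sufficient(false, 3, 2, 1, 3)); // false: blocks decom
    System.out.println(sufficient(true, 3, 1, 1, 3));  // true: UC tail block
  }
}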
private static void logBlockReplicationInfo(Block block, BlockCollection bc,
DatanodeDescriptor srcNode, NumberReplicas num,
Iterable<DatanodeStorageInfo> storages) {
int curReplicas = num.liveReplicas();
int curExpectedReplicas = bc.getBlockReplication();
StringBuilder nodeList = new StringBuilder();
for (DatanodeStorageInfo storage : storages) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
nodeList.append(node);
nodeList.append(" ");
}
LOG.info("Block: " + block + ", Expected Replicas: "
+ curExpectedReplicas + ", live replicas: " + curReplicas
+ ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissionedReplicas()
+ ", excess replicas: " + num.excessReplicas()
+ ", Is Open File: " + bc.isUnderConstruction()
+ ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ srcNode + ", Is current datanode decommissioning: "
+ srcNode.isDecommissionInProgress());
}
@VisibleForTesting
public int getNumPendingNodes() {
return pendingNodes.size();
}
@VisibleForTesting
public int getNumTrackedNodes() {
return decomNodeBlocks.size();
}
@VisibleForTesting
public int getNumNodesChecked() {
return monitor.numNodesChecked;
}
/**
* Checks to see if DNs have finished decommissioning.
* <p/>
* Since this is done while holding the namesystem lock,
* the amount of work per monitor tick is limited.
*/
private class Monitor implements Runnable {
/**
* The maximum number of blocks to check per tick.
*/
private final int numBlocksPerCheck;
/**
* The maximum number of nodes to check per tick.
*/
private final int numNodesPerCheck;
/**
* The maximum number of nodes to track in decomNodeBlocks. A value of 0
* means no limit.
*/
private final int maxConcurrentTrackedNodes;
/**
* The number of blocks that have been checked on this tick.
*/
private int numBlocksChecked = 0;
/**
* The number of nodes that have been checked on this tick. Used for
* testing.
*/
private int numNodesChecked = 0;
/**
* The last datanode in decomNodeBlocks that we've processed
*/
private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
DatanodeID("", "", "", 0, 0, 0, 0));
Monitor(int numBlocksPerCheck, int numNodesPerCheck, int
maxConcurrentTrackedNodes) {
this.numBlocksPerCheck = numBlocksPerCheck;
this.numNodesPerCheck = numNodesPerCheck;
this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
}
private boolean exceededNumBlocksPerCheck() {
LOG.trace("Processed {} blocks so far this tick", numBlocksChecked);
return numBlocksChecked >= numBlocksPerCheck;
}
@Deprecated
private boolean exceededNumNodesPerCheck() {
LOG.trace("Processed {} nodes so far this tick", numNodesChecked);
return numNodesChecked >= numNodesPerCheck;
}
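These two budget checks are the heart of the change: a tick now ends after a fixed number of blocks (or, when the deprecated key is set, nodes), and the next tick resumes from iterkey. A detached, runnable sketch of such a resumable, budgeted scan; all names are hypothetical.

import java.util.Arrays;
import java.util.List;

public class BudgetedScanSketch {
  // Spend at most `budget` units per tick and report where to resume,
  // in the spirit of Monitor.check() with iterkey and CyclicIteration.
  static int scanOneTick(List<String> nodes, int resumeIdx, int budget) {
    int i = resumeIdx;
    for (int spent = 0; spent < budget; spent++, i++) {
      String node = nodes.get(i % nodes.size());
      // ... examine this node's insufficiently replicated blocks ...
    }
    return i % nodes.size(); // next tick starts here
  }

  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("dn1", "dn2", "dn3");
    System.out.println("resume at index " + scanOneTick(nodes, 0, 2)); // 2
  }
}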
@Override
public void run() {
for(; namesystem.isRunning(); ) {
if (!namesystem.isRunning()) {
LOG.info("Namesystem is not running, skipping decommissioning checks"
+ ".");
return;
}
// Reset the checked count at beginning of each iteration
numBlocksChecked = 0;
numNodesChecked = 0;
// Check decom progress
namesystem.writeLock();
try {
processPendingNodes();
check();
} finally {
namesystem.writeUnlock();
}
try {
Thread.sleep(recheckInterval);
} catch (InterruptedException ie) {
LOG.warn(this.getClass().getSimpleName() + " interrupted: " + ie);
if (numBlocksChecked + numNodesChecked > 0) {
LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
numNodesChecked);
}
}
/**
* Pop datanodes off the pending list and into decomNodeBlocks,
* subject to the maxConcurrentTrackedNodes limit.
*/
private void processPendingNodes() {
while (!pendingNodes.isEmpty() &&
(maxConcurrentTrackedNodes == 0 ||
decomNodeBlocks.size() < maxConcurrentTrackedNodes)) {
decomNodeBlocks.put(pendingNodes.poll(), null);
}
}
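processPendingNodes implements a two-tier admission scheme: nodes queue in pendingNodes until a tracked slot frees up, and a cap of 0 means unlimited. A generic, runnable sketch of the same pattern, detached from the namesystem lock; the names are hypothetical.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class BoundedTrackingSketch {
  // Admit waiting items into the tracked map only while under the cap;
  // a cap of 0 means no limit, mirroring maxConcurrentTrackedNodes.
  static <T> void admit(Queue<T> pending, Map<T, Object> tracked, int cap) {
    while (!pending.isEmpty() && (cap == 0 || tracked.size() < cap)) {
      tracked.put(pending.poll(), null); // null: full scan still needed
    }
  }

  public static void main(String[] args) {
    Queue<String> pending =
        new ArrayDeque<>(Arrays.asList("dn1", "dn2", "dn3"));
    Map<String, Object> tracked = new HashMap<>();
    admit(pending, tracked, 2);
    System.out.println(tracked.size() + " tracked, " + pending.size()
        + " pending"); // prints: 2 tracked, 1 pending
  }
}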
private void check() {
final DatanodeManager dm = blockmanager.getDatanodeManager();
int count = 0;
for(Map.Entry<String, DatanodeDescriptor> entry
: dm.getDatanodeCyclicIteration(firstkey)) {
final DatanodeDescriptor d = entry.getValue();
firstkey = entry.getKey();
final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>>
it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
if (d.isDecommissionInProgress()) {
try {
dm.checkDecommissionState(d);
} catch(Exception e) {
LOG.warn("entry=" + entry, e);
while (it.hasNext()
&& !exceededNumBlocksPerCheck()
&& !exceededNumNodesPerCheck()) {
numNodesChecked++;
final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
entry = it.next();
final DatanodeDescriptor dn = entry.getKey();
AbstractList<BlockInfoContiguous> blocks = entry.getValue();
boolean fullScan = false;
if (blocks == null) {
// This is a newly added datanode, run through its list to schedule
// under-replicated blocks for replication and collect the blocks
// that are insufficiently replicated for further tracking
LOG.debug("Newly-added node {}, doing full scan to find " +
"insufficiently-replicated blocks.", dn);
blocks = handleInsufficientlyReplicated(dn);
decomNodeBlocks.put(dn, blocks);
fullScan = true;
} else {
// This is a known datanode, check if its # of insufficiently
// replicated blocks has dropped to zero and if it can be decommed
LOG.debug("Processing decommission-in-progress node {}", dn);
pruneSufficientlyReplicated(dn, blocks);
}
if (++count == numNodesPerCheck) {
return;
if (blocks.size() == 0) {
if (!fullScan) {
// If we didn't just do a full scan, need to re-check with the
// full block map.
//
// We've replicated all the known insufficiently replicated
// blocks. Re-check with the full block map before finally
// marking the datanode as decommissioned
LOG.debug("Node {} has finished replicating current set of "
+ "blocks, checking with the full block map.", dn);
blocks = handleInsufficientlyReplicated(dn);
decomNodeBlocks.put(dn, blocks);
}
// If the full scan is clean AND the node liveness is okay,
// we can finally mark as decommissioned.
final boolean isHealthy =
blockManager.isNodeHealthyForDecommission(dn);
if (blocks.size() == 0 && isHealthy) {
setDecommissioned(dn);
toRemove.add(dn);
LOG.debug("Node {} is sufficiently replicated and healthy, "
+ "marked as decommissioned.", dn);
} else {
if (LOG.isDebugEnabled()) {
StringBuilder b = new StringBuilder("Node {} ");
if (isHealthy) {
b.append("is ");
} else {
b.append("isn't ");
}
b.append("healthy and still needs to replicate {} more blocks," +
" decommissioning is still in progress.");
LOG.debug(b.toString(), dn, blocks.size());
}
}
} else {
LOG.debug("Node {} still has {} blocks to replicate "
+ "before it is a candidate to finish decommissioning.",
dn, blocks.size());
}
iterkey = dn;
}
// Remove the datanodes that are decommissioned
for (DatanodeDescriptor dn : toRemove) {
Preconditions.checkState(dn.isDecommissioned(),
"Removing a node that is not yet decommissioned!");
decomNodeBlocks.remove(dn);
}
}
/**
* Removes sufficiently replicated blocks from the block list of a
* datanode.
*/
private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
AbstractList<BlockInfoContiguous> blocks) {
processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
}
/**
* Returns a list of blocks on a datanode that are insufficiently
* replicated, i.e. are under-replicated enough to prevent decommission.
* <p/>
* As part of this, it also schedules replication work for
* any under-replicated blocks.
*
* @param datanode
* @return List of insufficiently replicated blocks
*/
private AbstractList<BlockInfoContiguous> handleInsufficientlyReplicated(
final DatanodeDescriptor datanode) {
AbstractList<BlockInfoContiguous> insufficient = new ChunkedArrayList<>();
processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
insufficient, false);
return insufficient;
}
/**
* Used while checking if decommission-in-progress datanodes can be marked
* as decommissioned. Combines shared logic of
* pruneSufficientlyReplicated and handleInsufficientlyReplicated.
*
* @param datanode Datanode
* @param it Iterator over the blocks on the
* datanode
* @param insufficientlyReplicated Return parameter. If it's not null,
* will contain the insufficiently
* replicated blocks from the list.
* @param pruneSufficientlyReplicated whether to remove sufficiently
* replicated blocks from the iterator
*/
private void processBlocksForDecomInternal(
final DatanodeDescriptor datanode,
final Iterator<BlockInfoContiguous> it,
final List<BlockInfoContiguous> insufficientlyReplicated,
boolean pruneSufficientlyReplicated) {
boolean firstReplicationLog = true;
int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0;
int underReplicatedInOpenFiles = 0;
while (it.hasNext()) {
numBlocksChecked++;
final BlockInfoContiguous block = it.next();
// Remove the block from the list if it's no longer in the block map,
// e.g. the containing file has been deleted
if (blockManager.blocksMap.getStoredBlock(block) == null) {
LOG.trace("Removing unknown block {}", block);
it.remove();
continue;
}
BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
if (bc == null) {
// Orphan block, will be invalidated eventually. Skip.
continue;
}
final NumberReplicas num = blockManager.countNodes(block);
final int liveReplicas = num.liveReplicas();
final int curReplicas = liveReplicas;
// Schedule under-replicated blocks for replication if not already
// pending
if (blockManager.isNeededReplication(block, bc.getBlockReplication(),
liveReplicas)) {
if (!blockManager.neededReplications.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 &&
namesystem.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
curReplicas,
num.decommissionedReplicas(),
bc.getBlockReplication());
}
}
// Even if the block is under-replicated,
// it doesn't block decommission if it's sufficiently replicated
if (isSufficientlyReplicated(block, bc, num)) {
if (pruneSufficientlyReplicated) {
it.remove();
}
continue;
}
// We've found an insufficiently replicated block.
if (insufficientlyReplicated != null) {
insufficientlyReplicated.add(block);
}
// Log if this is our first time through
if (firstReplicationLog) {
logBlockReplicationInfo(block, bc, datanode, num,
blockManager.blocksMap.getStorages(block));
firstReplicationLog = false;
}
// Update various counts
underReplicatedBlocks++;
if (bc.isUnderConstruction()) {
underReplicatedInOpenFiles++;
}
if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
decommissionOnlyReplicas++;
}
}
datanode.decommissioningStatus.set(underReplicatedBlocks,
decommissionOnlyReplicas,
underReplicatedInOpenFiles);
}
}
@VisibleForTesting
void runMonitor() throws ExecutionException, InterruptedException {
Future f = executor.submit(monitor);
f.get();
}
}
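The @VisibleForTesting hooks above let a test drive one monitor tick synchronously instead of waiting on the scheduled executor. A hypothetical sketch of that usage: it lives in the blockmanagement package so it can call the package-private runMonitor(), and it needs the HDFS test artifact for MiniDFSCluster.

package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class MonitorDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DecommissionManager decom = cluster.getNamesystem().getBlockManager()
          .getDatanodeManager().getDecomManager();
      decom.runMonitor(); // one synchronous tick, as the tests below do
      System.out.println(decom.getNumTrackedNodes() + " tracked, "
          + decom.getNumPendingNodes() + " pending");
    } finally {
      cluster.shutdown();
    }
  }
}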

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@ -736,10 +736,25 @@
</property>
<property>
<name>dfs.namenode.decommission.nodes.per.interval</name>
<value>5</value>
<description>The number of nodes namenode checks if decommission is complete
in each dfs.namenode.decommission.interval.</description>
<name>dfs.namenode.decommission.blocks.per.interval</name>
<value>500000</value>
<description>The approximate number of blocks to process per
decommission interval, as defined in dfs.namenode.decommission.interval.
</description>
</property>
<property>
<name>dfs.namenode.decommission.max.concurrent.tracked.nodes</name>
<value>100</value>
<description>
The maximum number of decommission-in-progress datanodes that will be
tracked at one time by the namenode. Tracking a decommission-in-progress
datanode consumes additional NN memory proportional to the number of blocks
on the datanode. Having a conservative limit reduces the potential impact
of decommissioning a large number of nodes at once.
A value of 0 means no limit will be enforced.
</description>
</property>
<property>

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -26,39 +27,56 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class tests the decommissioning of nodes.
*/
public class TestDecommission {
public static final Log LOG = LogFactory.getLog(TestDecommission.class);
public static final Logger LOG = LoggerFactory.getLogger(TestDecommission
.class);
static final long seed = 0xDEADBEEFL;
static final int blockSize = 8192;
static final int fileSize = 16384;
@ -90,6 +108,7 @@ public class TestDecommission {
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
@ -106,7 +125,7 @@ public class TestDecommission {
}
}
private void writeConfigFile(Path name, ArrayList<String> nodes)
private void writeConfigFile(Path name, List<String> nodes)
throws IOException {
// delete if it already exists
if (localFileSys.exists(name)) {
@ -150,7 +169,7 @@ public class TestDecommission {
* @param downnode - if null, there is no decommissioned node for this file.
* @return - null if no failure found, else an error message string.
*/
private String checkFile(FileSystem fileSys, Path name, int repl,
private static String checkFile(FileSystem fileSys, Path name, int repl,
String downnode, int numDatanodes) throws IOException {
boolean isNodeDown = (downnode != null);
// need a raw stream
@ -262,7 +281,7 @@ public class TestDecommission {
/* Ask a specific NN to stop decommission of the datanode and wait for each
* to reach the NORMAL state.
*/
private void recomissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
private void recommissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
LOG.info("Recommissioning node: " + decommissionedNode);
writeConfigFile(excludeFile, null);
refreshNodes(cluster.getNamesystem(nnIndex), conf);
@ -280,7 +299,7 @@ public class TestDecommission {
LOG.info("Waiting for node " + node + " to change state to "
+ state + " current state: " + node.getAdminState());
try {
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
Thread.sleep(HEARTBEAT_INTERVAL * 500);
} catch (InterruptedException e) {
// nothing
}
@ -322,28 +341,27 @@ public class TestDecommission {
}
private void verifyStats(NameNode namenode, FSNamesystem fsn,
DatanodeInfo node, boolean decommissioning)
DatanodeInfo info, DataNode node, boolean decommissioning)
throws InterruptedException, IOException {
// Do the stats check over 10 iterations
// Do the stats check over 10 heartbeats
for (int i = 0; i < 10; i++) {
long[] newStats = namenode.getRpcServer().getStats();
// For decommissioning nodes, ensure capacity of the DN is no longer
// counted. Only used space of the DN is counted in cluster capacity
assertEquals(newStats[0], decommissioning ? node.getDfsUsed() :
node.getCapacity());
assertEquals(newStats[0],
decommissioning ? info.getDfsUsed() : info.getCapacity());
// Ensure cluster used capacity is counted for both normal and
// decommissioning nodes
assertEquals(newStats[1], node.getDfsUsed());
assertEquals(newStats[1], info.getDfsUsed());
// For decommissioning nodes, remaining space from the DN is not counted
assertEquals(newStats[2], decommissioning ? 0 : node.getRemaining());
assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining());
// Ensure transceiver count is same as that DN
assertEquals(fsn.getTotalLoad(), node.getXceiverCount());
Thread.sleep(HEARTBEAT_INTERVAL * 1000); // Sleep heart beat interval
assertEquals(fsn.getTotalLoad(), info.getXceiverCount());
DataNodeTestUtils.triggerHeartbeat(node);
}
}
@ -407,14 +425,6 @@ public class TestDecommission {
cluster.shutdown();
}
/**
* Tests recommission for non federated cluster
*/
@Test(timeout=360000)
public void testRecommission() throws IOException {
testRecommission(1, 6);
}
/**
* Test decommission for federated cluster
*/
@ -501,12 +511,12 @@ public class TestDecommission {
// 1. the last DN would have been chosen as excess replica, given its
// heartbeat is considered old.
// Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
// 2. After recomissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
// 2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
// and one excess replica ( 3 )
// After the fix,
// After recomissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
// After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
Thread.sleep(slowHeartbeatDNwaitTime);
recomissionNode(1, decomNodeFromSBN);
recommissionNode(1, decomNodeFromSBN);
// Step 3.b, ask ANN to recommission the first DN.
// To verify the fix, the test makes sure the excess replica picked by ANN
@ -525,7 +535,7 @@ public class TestDecommission {
cluster.restartDataNode(nextToLastDNprop);
cluster.waitActive();
Thread.sleep(slowHeartbeatDNwaitTime);
recomissionNode(0, decommissionedNodeFromANN);
recommissionNode(0, decommissionedNodeFromANN);
// Step 3.c, make sure the DN has deleted the block and report to NNs
cluster.triggerHeartbeats();
@ -607,70 +617,89 @@ public class TestDecommission {
cluster.shutdown();
}
private void testRecommission(int numNamenodes, int numDatanodes)
throws IOException {
/**
* Test that over-replicated blocks are deleted on recommission.
*/
@Test(timeout=120000)
public void testRecommission() throws Exception {
final int numDatanodes = 6;
try {
LOG.info("Starting test testRecommission");
startCluster(numNamenodes, numDatanodes, conf);
startCluster(1, numDatanodes, conf);
ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
for(int i = 0; i < numNamenodes; i++) {
namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
}
Path file1 = new Path("testDecommission.dat");
int replicas = numDatanodes - 1;
final Path file1 = new Path("testDecommission.dat");
final int replicas = numDatanodes - 1;
for (int i = 0; i < numNamenodes; i++) {
ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
FileSystem fileSys = cluster.getFileSystem(i);
ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
final FileSystem fileSys = cluster.getFileSystem();
// Write a file to n-1 datanodes
writeFile(fileSys, file1, replicas);
// Decommission one node. Verify that node is decommissioned.
DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
// Decommission one of the datanodes with a replica
BlockLocation loc = fileSys.getFileBlockLocations(file1, 0, 1)[0];
assertEquals("Unexpected number of replicas from getFileBlockLocations",
replicas, loc.getHosts().length);
final String toDecomHost = loc.getNames()[0];
String toDecomUuid = null;
for (DataNode d : cluster.getDataNodes()) {
if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) {
toDecomUuid = d.getDatanodeId().getDatanodeUuid();
break;
}
}
assertNotNull("Could not find a dn with the block!", toDecomUuid);
final DatanodeInfo decomNode =
decommissionNode(0, toDecomUuid, decommissionedNodes,
AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode);
final BlockManager blockManager =
cluster.getNamesystem().getBlockManager();
final DatanodeManager datanodeManager =
blockManager.getDatanodeManager();
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
// Ensure decommissioned datanode is not automatically shutdown
DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
DFSClient client = getDfsClient(cluster.getNameNode(), conf);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
int tries =0;
// wait for the block to be replicated
while (tries++ < 20) {
try {
Thread.sleep(1000);
if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
numDatanodes) == null) {
break;
}
} catch (InterruptedException ie) {
}
}
assertTrue("Checked if block was replicated after decommission, tried "
+ tries + " times.", tries < 20);
// stop decommission and check if the new replicas are removed
recomissionNode(0, decomNode);
// wait for the block to be deleted
tries = 0;
while (tries++ < 20) {
try {
Thread.sleep(1000);
if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
break;
}
} catch (InterruptedException ie) {
// wait for the block to be replicated
final ExtendedBlock b = DFSTestUtil.getFirstBlock(fileSys, file1);
final String uuid = toDecomUuid;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
BlockInfoContiguous info =
blockManager.getStoredBlock(b.getLocalBlock());
int count = 0;
StringBuilder sb = new StringBuilder("Replica locations: ");
for (int i = 0; i < info.numNodes(); i++) {
DatanodeDescriptor dn = info.getDatanode(i);
sb.append(dn + ", ");
if (!dn.getDatanodeUuid().equals(uuid)) {
count++;
}
}
LOG.info(sb.toString());
LOG.info("Count: " + count);
return count == replicas;
}
}, 500, 30000);
// redecommission and wait for over-replication to be fixed
recommissionNode(0, decomNode);
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
DFSTestUtil.waitForReplication(cluster, b, 1, replicas, 0);
cleanupFile(fileSys, file1);
assertTrue("Checked if node was recommissioned " + tries + " times.",
tries < 20);
LOG.info("tried: " + tries + " times before recommissioned");
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Tests cluster storage statistics during decommissioning for non
@ -703,20 +732,35 @@ public class TestDecommission {
FSNamesystem fsn = cluster.getNamesystem(i);
NameNode namenode = cluster.getNameNode(i);
DatanodeInfo downnode = decommissionNode(i, null, null,
DatanodeInfo decomInfo = decommissionNode(i, null, null,
AdminStates.DECOMMISSION_INPROGRESS);
DataNode decomNode = getDataNode(decomInfo);
// Check namenode stats for multiple datanode heartbeats
verifyStats(namenode, fsn, downnode, true);
verifyStats(namenode, fsn, decomInfo, decomNode, true);
// Stop decommissioning and verify stats
writeConfigFile(excludeFile, null);
refreshNodes(fsn, conf);
DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode);
waitNodeState(ret, AdminStates.NORMAL);
verifyStats(namenode, fsn, ret, false);
DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo);
DataNode retNode = getDataNode(decomInfo);
waitNodeState(retInfo, AdminStates.NORMAL);
verifyStats(namenode, fsn, retInfo, retNode, false);
}
}
private DataNode getDataNode(DatanodeInfo decomInfo) {
DataNode decomNode = null;
for (DataNode dn: cluster.getDataNodes()) {
if (decomInfo.equals(dn.getDatanodeId())) {
decomNode = dn;
break;
}
}
assertNotNull("Could not find decomNode in cluster!", decomNode);
return decomNode;
}
/**
* Test host/include file functionality. Only datanodes
* in the include file are allowed to connect to the namenode in a non
@ -902,9 +946,9 @@ public class TestDecommission {
* It is not recommended to use a registration name which is not also a
* valid DNS hostname for the DataNode. See HDFS-5237 for background.
*/
@Ignore
@Test(timeout=360000)
public void testIncludeByRegistrationName() throws IOException,
InterruptedException {
public void testIncludeByRegistrationName() throws Exception {
Configuration hdfsConf = new Configuration(conf);
// Any IPv4 address starting with 127 functions as a "loopback" address
// which is connected to the current host. So by choosing 127.0.0.100
@ -927,15 +971,22 @@ public class TestDecommission {
refreshNodes(cluster.getNamesystem(0), hdfsConf);
// Wait for the DN to be marked dead.
DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
while (true) {
LOG.info("Waiting for DN to be marked as dead.");
final DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
BlockManagerTestUtil
.checkHeartbeat(cluster.getNamesystem().getBlockManager());
try {
DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
if (info.length == 1) {
break;
return info.length == 1;
} catch (IOException e) {
LOG.warn("Failed to check dead DNs", e);
return false;
}
LOG.info("Waiting for datanode to be marked dead");
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
}
}, 500, 5000);
// Use a non-empty include file with our registration name.
// It should work.
@ -945,18 +996,169 @@ public class TestDecommission {
writeConfigFile(hostsFile, nodes);
refreshNodes(cluster.getNamesystem(0), hdfsConf);
cluster.restartDataNode(0);
cluster.triggerHeartbeats();
// Wait for the DN to come back.
while (true) {
LOG.info("Waiting for DN to come back.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
BlockManagerTestUtil
.checkHeartbeat(cluster.getNamesystem().getBlockManager());
try {
DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
if (info.length == 1) {
Assert.assertFalse(info[0].isDecommissioned());
Assert.assertFalse(info[0].isDecommissionInProgress());
assertEquals(registrationName, info[0].getHostName());
break;
return true;
}
LOG.info("Waiting for datanode to come back");
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
} catch (IOException e) {
LOG.warn("Failed to check dead DNs", e);
}
return false;
}
}, 500, 5000);
}
@Test(timeout=120000)
public void testBlocksPerInterval() throws Exception {
Configuration newConf = new Configuration(conf);
org.apache.log4j.Logger.getLogger(DecommissionManager.class)
.setLevel(Level.TRACE);
// Turn the blocks per interval way down
newConf.setInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
3);
// Disable the normal monitor runs
newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
Integer.MAX_VALUE);
startCluster(1, 3, newConf);
final FileSystem fs = cluster.getFileSystem();
final DatanodeManager datanodeManager =
cluster.getNamesystem().getBlockManager().getDatanodeManager();
final DecommissionManager decomManager = datanodeManager.getDecomManager();
// Write a 3 block file, so each node has one block. Should scan 3 nodes.
DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
doDecomCheck(datanodeManager, decomManager, 3);
// Write another file, should only scan two
DFSTestUtil.createFile(fs, new Path("/file2"), 64, (short)3, 0xBAD1DEA);
doDecomCheck(datanodeManager, decomManager, 2);
// One more file, should only scan 1
DFSTestUtil.createFile(fs, new Path("/file3"), 64, (short)3, 0xBAD1DEA);
doDecomCheck(datanodeManager, decomManager, 1);
// blocks on each DN now exceeds limit, still scan at least one node
DFSTestUtil.createFile(fs, new Path("/file4"), 64, (short)3, 0xBAD1DEA);
doDecomCheck(datanodeManager, decomManager, 1);
}
@Deprecated
@Test(timeout=120000)
public void testNodesPerInterval() throws Exception {
Configuration newConf = new Configuration(conf);
org.apache.log4j.Logger.getLogger(DecommissionManager.class)
.setLevel(Level.TRACE);
// Set the deprecated configuration key which limits the # of nodes per
// interval
newConf.setInt("dfs.namenode.decommission.nodes.per.interval", 1);
// Disable the normal monitor runs
newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
Integer.MAX_VALUE);
startCluster(1, 3, newConf);
final FileSystem fs = cluster.getFileSystem();
final DatanodeManager datanodeManager =
cluster.getNamesystem().getBlockManager().getDatanodeManager();
final DecommissionManager decomManager = datanodeManager.getDecomManager();
// Write a 3 block file, so each node has one block. Should scan 1 node
// each time.
DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
for (int i=0; i<3; i++) {
doDecomCheck(datanodeManager, decomManager, 1);
}
}
private void doDecomCheck(DatanodeManager datanodeManager,
DecommissionManager decomManager, int expectedNumCheckedNodes)
throws IOException, ExecutionException, InterruptedException {
// Decom all nodes
ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
for (DataNode d: cluster.getDataNodes()) {
DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
decommissionedNodes,
AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(dn);
}
// Run decom scan and check
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals("Unexpected # of nodes checked", expectedNumCheckedNodes,
decomManager.getNumNodesChecked());
// Recommission all nodes
for (DatanodeInfo dn : decommissionedNodes) {
recommissionNode(0, dn);
}
}
@Test(timeout=120000)
public void testPendingNodes() throws Exception {
Configuration newConf = new Configuration(conf);
org.apache.log4j.Logger.getLogger(DecommissionManager.class)
.setLevel(Level.TRACE);
// Only allow one node to be decom'd at a time
newConf.setInt(
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
1);
// Disable the normal monitor runs
newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
Integer.MAX_VALUE);
startCluster(1, 3, newConf);
final FileSystem fs = cluster.getFileSystem();
final DatanodeManager datanodeManager =
cluster.getNamesystem().getBlockManager().getDatanodeManager();
final DecommissionManager decomManager = datanodeManager.getDecomManager();
// Keep a file open to prevent decom from progressing
HdfsDataOutputStream open1 =
(HdfsDataOutputStream) fs.create(new Path("/openFile1"), (short)3);
// Flush and trigger block reports so the block definitely shows up on NN
open1.write(123);
open1.hflush();
for (DataNode d: cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(d);
}
// Decom two nodes, so one is still alive
ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
for (int i=0; i<2; i++) {
final DataNode d = cluster.getDataNodes().get(i);
DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
decommissionedNodes,
AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(dn);
}
for (int i=2; i>=0; i--) {
assertTrackedAndPending(decomManager, 0, i);
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
}
// Close file, try to decom the last node, should get stuck in tracked
open1.close();
final DataNode d = cluster.getDataNodes().get(2);
DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
decommissionedNodes,
AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(dn);
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertTrackedAndPending(decomManager, 1, 0);
}
private void assertTrackedAndPending(DecommissionManager decomManager,
int tracked, int pending) {
assertEquals("Unexpected number of tracked nodes", tracked,
decomManager.getNumTrackedNodes());
assertEquals("Unexpected number of pending nodes", pending,
decomManager.getNumPendingNodes());
}
}
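For reference, a minimal sketch of the two-level throttle these tests exercise. All names below (blocksPerInterval, maxTrackedNodes, the two queues) are illustrative assumptions, not the actual DecommissionManager fields:

import java.util.ArrayDeque;
import java.util.Queue;

// Sketch only: illustrative stand-in for the DecommissionManager monitor.
class MonitorSketch {
  private final int blocksPerInterval;  // block budget per monitor run
  private final int maxTrackedNodes;    // cap on concurrently tracked nodes
  private final Queue<String> pendingNodes = new ArrayDeque<String>();
  private final Queue<String> trackedNodes = new ArrayDeque<String>();

  MonitorSketch(int blocksPerInterval, int maxTrackedNodes) {
    this.blocksPerInterval = blocksPerInterval;
    this.maxTrackedNodes = maxTrackedNodes;
  }

  void runOnce() {
    // Admit pending nodes up to the tracked cap.
    while (trackedNodes.size() < maxTrackedNodes && !pendingNodes.isEmpty()) {
      trackedNodes.add(pendingNodes.remove());
    }
    // Scan tracked nodes until the per-run block budget is spent.
    int blocksChecked = 0;
    for (String node : trackedNodes) {
      if (blocksChecked >= blocksPerInterval) {
        break; // remaining nodes are picked up on the next run
      }
      blocksChecked += checkBlocksOn(node);
    }
  }

  private int checkBlocksOn(String node) {
    return 0; // stand-in for walking the node's block list
  }
}

Under such a scheme the assertions in testPendingNodes follow directly: with the tracked cap at 1, at most one node can leave the pending queue per run.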

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -300,9 +301,8 @@ public class BlockManagerTestUtil {
* Have DatanodeManager check decommission state.
* @param dm the DatanodeManager to manipulate
*/
public static void checkDecommissionState(DatanodeManager dm,
DatanodeDescriptor node) {
dm.checkDecommissionState(node);
public static void recheckDecommissionState(DatanodeManager dm)
throws ExecutionException, InterruptedException {
dm.getDecomManager().runMonitor();
}
}
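The throws clause on this helper suggests runMonitor submits the Monitor to an internal executor and blocks on the resulting Future; a sketch under that assumption (not verbatim source):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: assumed shape of DecommissionManager#runMonitor.
class RunMonitorSketch {
  private final ExecutorService executor =
      Executors.newSingleThreadExecutor();
  private final Runnable monitor = new Runnable() {
    @Override
    public void run() {
      // one full decommission check pass
    }
  };

  // Blocks until one monitor pass completes; a failure inside the pass
  // surfaces as an ExecutionException, matching the helper's signature.
  public void runMonitor() throws ExecutionException, InterruptedException {
    Future<?> f = executor.submit(monitor);
    f.get();
  }
}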

View File

@ -137,7 +137,7 @@ public class TestReplicationPolicyConsiderLoad {
// returns false
for (int i = 0; i < 3; i++) {
DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
dnManager.startDecommission(d);
dnManager.getDecomManager().startDecommission(d);
d.setDecommissioned();
}
assertEquals((double)load/3, dnManager.getFSClusterStats()

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -29,7 +28,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
@ -53,7 +51,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -90,7 +93,8 @@ public class TestDecommissioningStatus {
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
Path includeFile = new Path(dir, "include");
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
4);
@ -104,6 +108,9 @@ public class TestDecommissioningStatus {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).build();
cluster.waitActive();
fileSys = cluster.getFileSystem();
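// Shorten the heartbeat expiry so a stopped DN is marked dead quickly
// in the dead-node decommission tests below.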
cluster.getNamesystem().getBlockManager().getDatanodeManager()
.setHeartbeatExpireInterval(3000);
Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
}
@AfterClass
@ -186,13 +193,16 @@ public class TestDecommissioningStatus {
private void checkDecommissionStatus(DatanodeDescriptor decommNode,
int expectedUnderRep, int expectedDecommissionOnly,
int expectedUnderRepInOpenFiles) {
assertEquals(decommNode.decommissioningStatus.getUnderReplicatedBlocks(),
    expectedUnderRep);
assertEquals(
    decommNode.decommissioningStatus.getDecommissionOnlyReplicas(),
    expectedDecommissionOnly);
assertEquals(decommNode.decommissioningStatus
    .getUnderReplicatedInOpenFiles(), expectedUnderRepInOpenFiles);
assertEquals("Unexpected num under-replicated blocks",
    expectedUnderRep,
    decommNode.decommissioningStatus.getUnderReplicatedBlocks());
assertEquals("Unexpected number of decom-only replicas",
    expectedDecommissionOnly,
    decommNode.decommissioningStatus.getDecommissionOnlyReplicas());
assertEquals(
    "Unexpected number of replicas in under-replicated open files",
    expectedUnderRepInOpenFiles,
    decommNode.decommissioningStatus.getUnderReplicatedInOpenFiles());
}
private void checkDFSAdminDecommissionStatus(
@ -244,7 +254,7 @@ public class TestDecommissioningStatus {
* Tests Decommissioning Status in DFS.
*/
@Test
public void testDecommissionStatus() throws IOException, InterruptedException {
public void testDecommissionStatus() throws Exception {
InetSocketAddress addr = new InetSocketAddress("localhost", cluster
.getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
@ -253,7 +263,7 @@ public class TestDecommissioningStatus {
DistributedFileSystem fileSys = cluster.getFileSystem();
DFSAdmin admin = new DFSAdmin(cluster.getConfiguration(0));
short replicas = 2;
short replicas = numDatanodes;
//
// Decommission one node. Verify the decommission status
//
@ -263,7 +273,9 @@ public class TestDecommissioningStatus {
Path file2 = new Path("decommission1.dat");
FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas);
Thread.sleep(5000);
for (DataNode d: cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(d);
}
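// Explicit block reports guarantee the NN has seen the blocks before
// decommissioning starts.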
FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
@ -271,19 +283,22 @@ public class TestDecommissioningStatus {
String downnode = decommissionNode(fsn, client, localFileSys, iteration);
dm.refreshNodes(conf);
decommissionedNodes.add(downnode);
Thread.sleep(5000);
BlockManagerTestUtil.recheckDecommissionState(dm);
final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
if (iteration == 0) {
assertEquals(decommissioningNodes.size(), 1);
DatanodeDescriptor decommNode = decommissioningNodes.get(0);
checkDecommissionStatus(decommNode, 4, 0, 2);
checkDecommissionStatus(decommNode, 3, 0, 1);
checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
fileSys, admin);
} else {
assertEquals(decommissioningNodes.size(), 2);
DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
checkDecommissionStatus(decommNode1, 4, 4, 2);
// This one is still 3,3,1 since it passed over the UC block
// earlier, before node 2 was decommed
checkDecommissionStatus(decommNode1, 3, 3, 1);
// This one is 4,4,2 since it has the full state
checkDecommissionStatus(decommNode2, 4, 4, 2);
checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 2),
fileSys, admin);
@ -305,8 +320,7 @@ public class TestDecommissioningStatus {
* the replication process after it rejoins the cluster.
*/
@Test(timeout=120000)
public void testDecommissionStatusAfterDNRestart()
throws IOException, InterruptedException {
public void testDecommissionStatusAfterDNRestart() throws Exception {
DistributedFileSystem fileSys =
(DistributedFileSystem)cluster.getFileSystem();
@ -345,7 +359,7 @@ public class TestDecommissioningStatus {
BlockManagerTestUtil.checkHeartbeat(fsn.getBlockManager());
// Force DatanodeManager to check decommission state.
BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
BlockManagerTestUtil.recheckDecommissionState(dm);
// Verify that the DN remains in DECOMMISSION_INPROGRESS state.
assertTrue("the node should be DECOMMISSION_IN_PROGRESSS",
@ -359,7 +373,7 @@ public class TestDecommissioningStatus {
// Delete the under-replicated file, which should let the
// DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED
cleanupFile(fileSys, f);
BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrue("the node should be decommissioned",
dead.get(0).isDecommissioned());
@ -380,8 +394,9 @@ public class TestDecommissioningStatus {
* DECOMMISSIONED
*/
@Test(timeout=120000)
public void testDecommissionDeadDN()
throws IOException, InterruptedException, TimeoutException {
public void testDecommissionDeadDN() throws Exception {
Logger log = Logger.getLogger(DecommissionManager.class);
log.setLevel(Level.DEBUG);
DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
String dnName = dnID.getXferAddr();
DataNodeProperties stoppedDN = cluster.stopDataNode(0);
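// The DN is stopped before decommissioning so the dead-node path is
// exercised.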
@ -392,7 +407,7 @@ public class TestDecommissioningStatus {
DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
decommissionNode(fsn, localFileSys, dnName);
dm.refreshNodes(conf);
BlockManagerTestUtil.checkDecommissionState(dm, dnDescriptor);
BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrue(dnDescriptor.isDecommissioned());
// Add the node back

View File

@ -1305,7 +1305,7 @@ public class TestFsck {
.getBlockManager().getBlockCollection(eb.getLocalBlock())
.getBlocks()[0].getDatanode(0);
cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().startDecommission(dn);
.getDatanodeManager().getDecomManager().startDecommission(dn);
String dnName = dn.getXferAddr();
//wait for decommission start

View File

@ -30,8 +30,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
@ -240,7 +238,7 @@ public class TestNamenodeCapacityReport {
DatanodeDescriptor dnd =
dnm.getDatanode(datanodes.get(i).getDatanodeId());
expectedInServiceLoad -= dnd.getXceiverCount();
dnm.startDecommission(dnd);
dnm.getDecomManager().startDecommission(dnd);
DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
Thread.sleep(100);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
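The bookkeeping above assumes a decommissioning node drops out of the in-service statistics as soon as startDecommission returns; a minimal sketch of that accounting, with assumed names (Node is a stand-in type, not the HDFS API):

import java.util.List;

// Sketch only: averaging load over live, non-decommission(ing|ed) nodes.
class InServiceLoadSketch {
  interface Node {
    boolean isInService();  // false once decommissioning or decommissioned
    int getXceiverCount();  // active transceivers on the datanode
  }

  static double avgInServiceLoad(List<Node> nodes) {
    int load = 0;
    int inService = 0;
    for (Node n : nodes) {
      if (n.isInService()) {
        load += n.getXceiverCount();
        inService++;
      }
    }
    return inService == 0 ? 0.0 : (double) load / inService;
  }
}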