diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ef72cc6e4db..76ee828a67f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -414,6 +414,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7855. Separate class Packet from DFSOutputStream. (Li Bo via jing9)
+ HDFS-7411. Change decommission logic to throttle by blocks rather than
+ nodes in each interval. (Andrew Wang via cdouglas)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6fdf304a419..d1c37df3f00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -455,8 +455,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 30000L;
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
public static final int DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30;
- public static final String DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval";
- public static final int DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT = 5;
+ public static final String DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY = "dfs.namenode.decommission.blocks.per.interval";
+ public static final int DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT = 500000;
+ public static final String DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES = "dfs.namenode.decommission.max.concurrent.tracked.nodes";
+ public static final int DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
index 8f2966ac47b..29a26678cbf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
@@ -139,7 +139,7 @@ public class HdfsConfiguration extends Configuration {
new DeprecationDelta("dfs.federation.nameservice.id",
DFSConfigKeys.DFS_NAMESERVICE_ID),
new DeprecationDelta("dfs.client.file-block-storage-locations.timeout",
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS)
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS),
});
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f0c994b8105..2c7d6e10d5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3191,28 +3191,6 @@ public class BlockManager {
}
return live;
}
-
- private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
- NumberReplicas num) {
- int curReplicas = num.liveReplicas();
- int curExpectedReplicas = getReplication(block);
- BlockCollection bc = blocksMap.getBlockCollection(block);
- StringBuilder nodeList = new StringBuilder();
- for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
- final DatanodeDescriptor node = storage.getDatanodeDescriptor();
- nodeList.append(node);
- nodeList.append(" ");
- }
- LOG.info("Block: " + block + ", Expected Replicas: "
- + curExpectedReplicas + ", live replicas: " + curReplicas
- + ", corrupt replicas: " + num.corruptReplicas()
- + ", decommissioned replicas: " + num.decommissionedReplicas()
- + ", excess replicas: " + num.excessReplicas()
- + ", Is Open File: " + bc.isUnderConstruction()
- + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
- + srcNode + ", Is current datanode decommissioning: "
- + srcNode.isDecommissionInProgress());
- }
/**
* On stopping decommission, check if the node has excess replicas.
@@ -3243,89 +3221,30 @@ public class BlockManager {
}
/**
- * Return true if there are any blocks on this node that have not
- * yet reached their replication factor. Otherwise returns false.
+ * Returns whether a node can be safely decommissioned based on its
+ * liveness. Dead nodes cannot always be safely decommissioned.
*/
- boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
- boolean status = false;
- boolean firstReplicationLog = true;
- int underReplicatedBlocks = 0;
- int decommissionOnlyReplicas = 0;
- int underReplicatedInOpenFiles = 0;
- final Iterator<? extends Block> it = srcNode.getBlockIterator();
- while(it.hasNext()) {
- final Block block = it.next();
- BlockCollection bc = blocksMap.getBlockCollection(block);
-
- if (bc != null) {
- NumberReplicas num = countNodes(block);
- int curReplicas = num.liveReplicas();
- int curExpectedReplicas = getReplication(block);
-
- if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
- if (curExpectedReplicas > curReplicas) {
- if (bc.isUnderConstruction()) {
- if (block.equals(bc.getLastBlock()) && curReplicas > minReplication) {
- continue;
- }
- underReplicatedInOpenFiles++;
- }
-
- // Log info about one block for this node which needs replication
- if (!status) {
- status = true;
- if (firstReplicationLog) {
- logBlockReplicationInfo(block, srcNode, num);
- }
- // Allowing decommission as long as default replication is met
- if (curReplicas >= defaultReplication) {
- status = false;
- firstReplicationLog = false;
- }
- }
- underReplicatedBlocks++;
- if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
- decommissionOnlyReplicas++;
- }
- }
- if (!neededReplications.contains(block) &&
- pendingReplications.getNumReplicas(block) == 0 &&
- namesystem.isPopulatingReplQueues()) {
- //
- // These blocks have been reported from the datanode
- // after the startDecommission method has been executed. These
- // blocks were in flight when the decommissioning was started.
- // Process these blocks only when active NN is out of safe mode.
- //
- neededReplications.add(block,
- curReplicas,
- num.decommissionedReplicas(),
- curExpectedReplicas);
- }
- }
- }
+ boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
+ if (node.isAlive) {
+ return true;
}
- if (!status && !srcNode.isAlive) {
- updateState();
- if (pendingReplicationBlocksCount == 0 &&
- underReplicatedBlocksCount == 0) {
- LOG.info("srcNode {} is dead and there are no under-replicated" +
- " blocks or blocks pending replication. Marking as " +
- "decommissioned.");
- } else {
- LOG.warn("srcNode " + srcNode + " is dead " +
- "while decommission is in progress. Continuing to mark " +
- "it as decommission in progress so when it rejoins the " +
- "cluster it can continue the decommission process.");
- status = true;
- }
+ updateState();
+ if (pendingReplicationBlocksCount == 0 &&
+ underReplicatedBlocksCount == 0) {
+ LOG.info("Node {} is dead and there are no under-replicated" +
+ " blocks or blocks pending replication. Safe to decommission.",
+ node);
+ return true;
}
- srcNode.decommissioningStatus.set(underReplicatedBlocks,
- decommissionOnlyReplicas,
- underReplicatedInOpenFiles);
- return status;
+ LOG.warn("Node {} is dead " +
+ "while decommission is in progress. Cannot be safely " +
+ "decommissioned since there is risk of reduced " +
+ "data durability or data loss. Either restart the failed node or" +
+ " force decommissioning by removing, calling refreshNodes, " +
+ "then re-adding to the excludes files.", node);
+ return false;
}
public int getActiveBlockCount() {
@@ -3496,7 +3415,7 @@ public class BlockManager {
* A block needs replication if the number of replicas is less than expected
* or if it does not have enough racks.
*/
- private boolean isNeededReplication(Block b, int expected, int current) {
+ boolean isNeededReplication(Block b, int expected, int current) {
return current < expected || !blockHasEnoughRacks(b);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index ec55e4ba8f4..1bd8f976893 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
-import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@@ -53,8 +52,6 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
-import static org.apache.hadoop.util.Time.now;
-
/**
* Manage datanodes, include decommission and other activities.
*/
@@ -65,9 +62,9 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
+ private final DecommissionManager decomManager;
private final HeartbeatManager heartbeatManager;
private final FSClusterStats fsClusterStats;
- private Daemon decommissionthread = null;
/**
* Stores the datanode -> block map.
@@ -110,7 +107,7 @@ public class DatanodeManager {
private final HostFileManager hostFileManager = new HostFileManager();
/** The period to wait for datanode heartbeat.*/
- private final long heartbeatExpireInterval;
+ private long heartbeatExpireInterval;
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
@@ -182,6 +179,8 @@ public class DatanodeManager {
this.blockManager = blockManager;
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
+ this.decomManager = new DecommissionManager(namesystem, blockManager,
+ heartbeatManager);
this.fsClusterStats = newFSClusterStats();
networktopology = NetworkTopology.getInstance(conf);
@@ -307,25 +306,12 @@ public class DatanodeManager {
}
void activate(final Configuration conf) {
- final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
- this.decommissionthread = new Daemon(dm.new Monitor(
- conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
- conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
- decommissionthread.start();
-
+ decomManager.activate(conf);
heartbeatManager.activate(conf);
}
void close() {
- if (decommissionthread != null) {
- decommissionthread.interrupt();
- try {
- decommissionthread.join(3000);
- } catch (InterruptedException e) {
- }
- }
+ decomManager.close();
heartbeatManager.close();
}
@@ -339,6 +325,20 @@ public class DatanodeManager {
return heartbeatManager;
}
+ @VisibleForTesting
+ public DecommissionManager getDecomManager() {
+ return decomManager;
+ }
+
+ HostFileManager getHostFileManager() {
+ return hostFileManager;
+ }
+
+ @VisibleForTesting
+ public void setHeartbeatExpireInterval(long expiryMs) {
+ this.heartbeatExpireInterval = expiryMs;
+ }
+
@VisibleForTesting
public FSClusterStats getFSClusterStats() {
return fsClusterStats;
@@ -825,63 +825,14 @@ public class DatanodeManager {
}
/**
- * Decommission the node if it is in exclude list.
+ * Decommission the node if it is in the host exclude list.
+ *
+ * @param nodeReg datanode
*/
- private void checkDecommissioning(DatanodeDescriptor nodeReg) {
+ void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
// If the registered node is in exclude list, then decommission it
- if (hostFileManager.isExcluded(nodeReg)) {
- startDecommission(nodeReg);
- }
- }
-
- /**
- * Change, if appropriate, the admin state of a datanode to
- * decommission completed. Return true if decommission is complete.
- */
- boolean checkDecommissionState(DatanodeDescriptor node) {
- // Check to see if all blocks in this decommissioned
- // node has reached their target replication factor.
- if (node.isDecommissionInProgress() && node.checkBlockReportReceived()) {
- if (!blockManager.isReplicationInProgress(node)) {
- node.setDecommissioned();
- LOG.info("Decommission complete for " + node);
- }
- }
- return node.isDecommissioned();
- }
-
- /** Start decommissioning the specified datanode. */
- @InterfaceAudience.Private
- @VisibleForTesting
- public void startDecommission(DatanodeDescriptor node) {
- if (!node.isDecommissionInProgress()) {
- if (!node.isAlive) {
- LOG.info("Dead node " + node + " is decommissioned immediately.");
- node.setDecommissioned();
- } else if (!node.isDecommissioned()) {
- for (DatanodeStorageInfo storage : node.getStorageInfos()) {
- LOG.info("Start Decommissioning " + node + " " + storage
- + " with " + storage.numBlocks() + " blocks");
- }
- heartbeatManager.startDecommission(node);
- node.decommissioningStatus.setStartTime(now());
-
- // all the blocks that reside on this node have to be replicated.
- checkDecommissionState(node);
- }
- }
- }
-
- /** Stop decommissioning the specified datanodes. */
- void stopDecommission(DatanodeDescriptor node) {
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- LOG.info("Stop Decommissioning " + node);
- heartbeatManager.stopDecommission(node);
- // Over-replicated blocks will be detected and processed when
- // the dead node comes back and send in its full block report.
- if (node.isAlive) {
- blockManager.processOverReplicatedBlocksOnReCommission(node);
- }
+ if (getHostFileManager().isExcluded(nodeReg)) {
+ decomManager.startDecommission(nodeReg);
}
}
@@ -992,7 +943,7 @@ public class DatanodeManager {
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion());
- checkDecommissioning(nodeS);
+ startDecommissioningIfExcluded(nodeS);
success = true;
} finally {
if (!success) {
@@ -1028,7 +979,7 @@ public class DatanodeManager {
// because its is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
incrementVersionCount(nodeReg.getSoftwareVersion());
- checkDecommissioning(nodeDescr);
+ startDecommissioningIfExcluded(nodeDescr);
success = true;
} finally {
if (!success) {
@@ -1091,9 +1042,9 @@ public class DatanodeManager {
node.setDisallowed(true); // case 2.
} else {
if (hostFileManager.isExcluded(node)) {
- startDecommission(node); // case 3.
+ decomManager.startDecommission(node); // case 3.
} else {
- stopDecommission(node); // case 4.
+ decomManager.stopDecommission(node); // case 4.
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index a234cf545fc..71c88f18c4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -17,88 +17,605 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import java.util.AbstractList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.util.ChunkedArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.util.Time.now;
/**
- * Manage node decommissioning.
+ * Manages datanode decommissioning. A background monitor thread
+ * periodically checks the status of datanodes that are in-progress of
+ * decommissioning.
+ *
+ * A datanode can be decommissioned in a few situations:
+ *
+ * - If a DN is dead, it is decommissioned immediately.
+ * - If a DN is alive, it is decommissioned after all of its blocks
+ * are sufficiently replicated. Merely under-replicated blocks do not
+ * block decommissioning as long as they are above a replication
+ * threshold.
+ *
+ * In the second case, the datanode transitions to a
+ * decommission-in-progress state and is tracked by the monitor thread. The
+ * monitor periodically scans through the list of insufficiently replicated
+ * blocks on these datanodes to
+ * determine if they can be decommissioned. The monitor also prunes this list
+ * as blocks become replicated, so monitor scans will become more efficient
+ * over time.
+ *
+ * Decommission-in-progress nodes that become dead do not progress to
+ * decommissioned until they become live again. This prevents potential
+ * durability loss for singly-replicated blocks (see HDFS-6791).
+ *
+ * This class depends on the FSNamesystem lock for synchronization.
*/
@InterfaceAudience.Private
-@InterfaceStability.Evolving
-class DecommissionManager {
- static final Log LOG = LogFactory.getLog(DecommissionManager.class);
+public class DecommissionManager {
+ private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager
+ .class);
private final Namesystem namesystem;
- private final BlockManager blockmanager;
+ private final BlockManager blockManager;
+ private final HeartbeatManager hbManager;
+ private final ScheduledExecutorService executor;
+
+ /**
+ * Map containing the decommission-in-progress datanodes that are being
* tracked so they can be marked as decommissioned.
+ *
+ * This holds a set of references to the under-replicated blocks on the DN at
+ * the time the DN is added to the map, i.e. the blocks that are preventing
+ * the node from being marked as decommissioned. During a monitor tick, this
+ * list is pruned as blocks becomes replicated.
+ *
+ * Note also that the reference to the list of under-replicated blocks
+ * will be null on initial add
+ *
+ * However, this map can become out-of-date since it is not updated by block
* reports or other events. Before finally being marked as decommissioned,
+ * another check is done with the actual block map.
+ */
+ private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
+ decomNodeBlocks;
+
+ /**
+ * Tracking a node in decomNodeBlocks consumes additional memory. To limit
+ * the impact on NN memory consumption, we limit the number of nodes in
+ * decomNodeBlocks. Additional nodes wait in pendingNodes.
+ */
+ private final Queue<DatanodeDescriptor> pendingNodes;
+
+ private Monitor monitor = null;
DecommissionManager(final Namesystem namesystem,
- final BlockManager blockmanager) {
+ final BlockManager blockManager, final HeartbeatManager hbManager) {
this.namesystem = namesystem;
- this.blockmanager = blockmanager;
+ this.blockManager = blockManager;
+ this.hbManager = hbManager;
+
+ executor = Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
+ .setDaemon(true).build());
+ decomNodeBlocks = new TreeMap<>();
+ pendingNodes = new LinkedList<>();
}
- /** Periodically check decommission status. */
- class Monitor implements Runnable {
- /** recheckInterval is how often namenode checks
- * if a node has finished decommission
- */
- private final long recheckInterval;
- /** The number of decommission nodes to check for each interval */
- private final int numNodesPerCheck;
- /** firstkey can be initialized to anything. */
- private String firstkey = "";
+ /**
+ * Start the decommission monitor thread.
+ * @param conf
+ */
+ void activate(Configuration conf) {
+ final int intervalSecs =
+ conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT);
+ checkArgument(intervalSecs >= 0, "Cannot set a negative " +
+ "value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
- Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
- this.recheckInterval = recheckIntervalInSecond * 1000L;
+ // By default, the new configuration key overrides the deprecated one.
+ // No # node limit is set.
+ int blocksPerInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
+ int nodesPerInterval = Integer.MAX_VALUE;
+
+ // If the expected key isn't present and the deprecated one is,
+ // copy the deprecated one's value into the new one. This overrides the
+ // default.
+ //
+ // Also print a deprecation warning.
+ final String deprecatedKey =
+ "dfs.namenode.decommission.nodes.per.interval";
+ final String strNodes = conf.get(deprecatedKey);
+ if (strNodes != null) {
+ nodesPerInterval = Integer.parseInt(strNodes);
+ blocksPerInterval = Integer.MAX_VALUE;
+ LOG.warn("Using deprecated configuration key {} value of {}.",
+ deprecatedKey, nodesPerInterval);
+ LOG.warn("Please update your configuration to use {} instead.",
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+ }
+ checkArgument(blocksPerInterval > 0,
+ "Must set a positive value for "
+ + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+
+ final int maxConcurrentTrackedNodes = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
+ checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
+ "value for "
+ + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
+
+ monitor = new Monitor(blocksPerInterval,
+ nodesPerInterval, maxConcurrentTrackedNodes);
+ executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
+ TimeUnit.SECONDS);
+
+ LOG.debug("Activating DecommissionManager with interval {} seconds, " +
+ "{} max blocks per interval, {} max nodes per interval, " +
+ "{} max concurrently tracked nodes.", intervalSecs,
+ blocksPerInterval, nodesPerInterval, maxConcurrentTrackedNodes);
+ }
+
+ /**
+ * Stop the decommission monitor thread, waiting briefly for it to terminate.
+ */
+ void close() {
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {}
+ }
+
+ /**
+ * Start decommissioning the specified datanode.
+ * @param node
+ */
+ @VisibleForTesting
+ public void startDecommission(DatanodeDescriptor node) {
+ if (!node.isDecommissionInProgress()) {
+ if (!node.isAlive) {
+ LOG.info("Dead node {} is decommissioned immediately.", node);
+ node.setDecommissioned();
+ } else if (!node.isDecommissioned()) {
+ for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+ LOG.info("Starting decommission of {} {} with {} blocks",
+ node, storage, storage.numBlocks());
+ }
+ // Update DN stats maintained by HeartbeatManager
+ hbManager.startDecommission(node);
+ node.decommissioningStatus.setStartTime(now());
+ pendingNodes.add(node);
+ }
+ } else {
+ LOG.trace("startDecommission: Node {} is already decommission in "
+ + "progress, nothing to do.", node);
+ }
+ }
+
+ /**
+ * Stop decommissioning the specified datanode.
+ * @param node
+ */
+ void stopDecommission(DatanodeDescriptor node) {
+ if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+ LOG.info("Stopping decommissioning of node {}", node);
+ // Update DN stats maintained by HeartbeatManager
+ hbManager.stopDecommission(node);
+ // Over-replicated blocks will be detected and processed when
+ // the dead node comes back and send in its full block report.
+ if (node.isAlive) {
+ blockManager.processOverReplicatedBlocksOnReCommission(node);
+ }
+ // Remove from tracking in DecommissionManager
+ pendingNodes.remove(node);
+ decomNodeBlocks.remove(node);
+ } else {
+ LOG.trace("stopDecommission: Node {} is not decommission in progress " +
+ "or decommissioned, nothing to do.", node);
+ }
+ }
+
+ private void setDecommissioned(DatanodeDescriptor dn) {
+ dn.setDecommissioned();
+ LOG.info("Decommissioning complete for node {}", dn);
+ }
+
+ /**
+ * Checks whether a block is sufficiently replicated for decommissioning.
+ * Full-strength replication is not always necessary, hence "sufficient".
+ * @return true if sufficient, else false.
+ */
+ private boolean isSufficientlyReplicated(BlockInfoContiguous block,
+ BlockCollection bc,
+ NumberReplicas numberReplicas) {
+ final int numExpected = bc.getBlockReplication();
+ final int numLive = numberReplicas.liveReplicas();
+ if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
+ // Block doesn't need replication. Skip.
+ LOG.trace("Block {} does not need replication.", block);
+ return true;
+ }
+
+ // Block is under-replicated
+ LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
+ numLive);
+ if (numExpected > numLive) {
+ if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
+ // Can decom a UC block as long as there will still be minReplicas
+ if (numLive >= blockManager.minReplication) {
+ LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
+ + ">= minR ({})", block, numLive, blockManager.minReplication);
+ return true;
+ } else {
+ LOG.trace("UC block {} insufficiently-replicated since numLive "
+ + "({}) < minR ({})", block, numLive,
+ blockManager.minReplication);
+ }
+ } else {
+ // Can decom a non-UC as long as the default replication is met
+ if (numLive >= blockManager.defaultReplication) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private static void logBlockReplicationInfo(Block block, BlockCollection bc,
+ DatanodeDescriptor srcNode, NumberReplicas num,
+ Iterable<DatanodeStorageInfo> storages) {
+ int curReplicas = num.liveReplicas();
+ int curExpectedReplicas = bc.getBlockReplication();
+ StringBuilder nodeList = new StringBuilder();
+ for (DatanodeStorageInfo storage : storages) {
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+ nodeList.append(node);
+ nodeList.append(" ");
+ }
+ LOG.info("Block: " + block + ", Expected Replicas: "
+ + curExpectedReplicas + ", live replicas: " + curReplicas
+ + ", corrupt replicas: " + num.corruptReplicas()
+ + ", decommissioned replicas: " + num.decommissionedReplicas()
+ + ", excess replicas: " + num.excessReplicas()
+ + ", Is Open File: " + bc.isUnderConstruction()
+ + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ + srcNode + ", Is current datanode decommissioning: "
+ + srcNode.isDecommissionInProgress());
+ }
+
+ @VisibleForTesting
+ public int getNumPendingNodes() {
+ return pendingNodes.size();
+ }
+
+ @VisibleForTesting
+ public int getNumTrackedNodes() {
+ return decomNodeBlocks.size();
+ }
+
+ @VisibleForTesting
+ public int getNumNodesChecked() {
+ return monitor.numNodesChecked;
+ }
+
+ /**
+ * Checks to see if DNs have finished decommissioning.
+ *
+ * Since this is done while holding the namesystem lock,
+ * the amount of work per monitor tick is limited.
+ */
+ private class Monitor implements Runnable {
+ /**
+ * The maximum number of blocks to check per tick.
+ */
+ private final int numBlocksPerCheck;
+ /**
+ * The maximum number of nodes to check per tick.
+ */
+ private final int numNodesPerCheck;
+ /**
+ * The maximum number of nodes to track in decomNodeBlocks. A value of 0
+ * means no limit.
+ */
+ private final int maxConcurrentTrackedNodes;
+ /**
+ * The number of blocks that have been checked on this tick.
+ */
+ private int numBlocksChecked = 0;
+ /**
+ * The number of nodes that have been checked on this tick. Used for
+ * testing.
+ */
+ private int numNodesChecked = 0;
+ /**
+ * The last datanode in decomNodeBlocks that we've processed
+ */
+ private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
+ DatanodeID("", "", "", 0, 0, 0, 0));
+
+ Monitor(int numBlocksPerCheck, int numNodesPerCheck, int
+ maxConcurrentTrackedNodes) {
+ this.numBlocksPerCheck = numBlocksPerCheck;
this.numNodesPerCheck = numNodesPerCheck;
+ this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
+ }
+
+ private boolean exceededNumBlocksPerCheck() {
+ LOG.trace("Processed {} blocks so far this tick", numBlocksChecked);
+ return numBlocksChecked >= numBlocksPerCheck;
+ }
+
+ @Deprecated
+ private boolean exceededNumNodesPerCheck() {
+ LOG.trace("Processed {} nodes so far this tick", numNodesChecked);
+ return numNodesChecked >= numNodesPerCheck;
+ }
+
+ @Override
+ public void run() {
+ if (!namesystem.isRunning()) {
+ LOG.info("Namesystem is not running, skipping decommissioning checks"
+ + ".");
+ return;
+ }
+ // Reset the checked count at beginning of each iteration
+ numBlocksChecked = 0;
+ numNodesChecked = 0;
+ // Check decom progress
+ namesystem.writeLock();
+ try {
+ processPendingNodes();
+ check();
+ } finally {
+ namesystem.writeUnlock();
+ }
+ if (numBlocksChecked + numNodesChecked > 0) {
+ LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
+ numNodesChecked);
+ }
}
/**
- * Check decommission status of numNodesPerCheck nodes
- * for every recheckInterval milliseconds.
+ * Pop datanodes off the pending list and into decomNodeBlocks,
+ * subject to the maxConcurrentTrackedNodes limit.
*/
- @Override
- public void run() {
- for(; namesystem.isRunning(); ) {
- namesystem.writeLock();
- try {
- check();
- } finally {
- namesystem.writeUnlock();
- }
-
- try {
- Thread.sleep(recheckInterval);
- } catch (InterruptedException ie) {
- LOG.warn(this.getClass().getSimpleName() + " interrupted: " + ie);
- }
+ private void processPendingNodes() {
+ while (!pendingNodes.isEmpty() &&
+ (maxConcurrentTrackedNodes == 0 ||
+ decomNodeBlocks.size() < maxConcurrentTrackedNodes)) {
+ decomNodeBlocks.put(pendingNodes.poll(), null);
}
}
-
- private void check() {
- final DatanodeManager dm = blockmanager.getDatanodeManager();
- int count = 0;
- for(Map.Entry<String, DatanodeDescriptor> entry
- : dm.getDatanodeCyclicIteration(firstkey)) {
- final DatanodeDescriptor d = entry.getValue();
- firstkey = entry.getKey();
- if (d.isDecommissionInProgress()) {
- try {
- dm.checkDecommissionState(d);
- } catch(Exception e) {
- LOG.warn("entry=" + entry, e);
+ private void check() {
+ final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>>
+ it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
+ final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
+
+ while (it.hasNext()
+ && !exceededNumBlocksPerCheck()
+ && !exceededNumNodesPerCheck()) {
+ numNodesChecked++;
+ final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
+ entry = it.next();
+ final DatanodeDescriptor dn = entry.getKey();
+ AbstractList blocks = entry.getValue();
+ boolean fullScan = false;
+ if (blocks == null) {
+ // This is a newly added datanode, run through its list to schedule
+ // under-replicated blocks for replication and collect the blocks
+ // that are insufficiently replicated for further tracking
+ LOG.debug("Newly-added node {}, doing full scan to find " +
+ "insufficiently-replicated blocks.", dn);
+ blocks = handleInsufficientlyReplicated(dn);
+ decomNodeBlocks.put(dn, blocks);
+ fullScan = true;
+ } else {
+ // This is a known datanode, check if its # of insufficiently
+ // replicated blocks has dropped to zero and if it can be decommed
+ LOG.debug("Processing decommission-in-progress node {}", dn);
+ pruneSufficientlyReplicated(dn, blocks);
+ }
+ if (blocks.size() == 0) {
+ if (!fullScan) {
+ // If we didn't just do a full scan, need to re-check with the
+ // full block map.
+ //
+ // We've replicated all the known insufficiently replicated
+ // blocks. Re-check with the full block map before finally
+ // marking the datanode as decommissioned
+ LOG.debug("Node {} has finished replicating current set of "
+ + "blocks, checking with the full block map.", dn);
+ blocks = handleInsufficientlyReplicated(dn);
+ decomNodeBlocks.put(dn, blocks);
}
- if (++count == numNodesPerCheck) {
- return;
+ // If the full scan is clean AND the node liveness is okay,
+ // we can finally mark as decommissioned.
+ final boolean isHealthy =
+ blockManager.isNodeHealthyForDecommission(dn);
+ if (blocks.size() == 0 && isHealthy) {
+ setDecommissioned(dn);
+ toRemove.add(dn);
+ LOG.debug("Node {} is sufficiently replicated and healthy, "
+ + "marked as decommissioned.", dn);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ StringBuilder b = new StringBuilder("Node {} ");
+ if (isHealthy) {
+ b.append("is ");
+ } else {
+ b.append("isn't ");
+ }
+ b.append("healthy and still needs to replicate {} more blocks," +
+ " decommissioning is still in progress.");
+ LOG.debug(b.toString(), dn, blocks.size());
+ }
+ }
+ } else {
+ LOG.debug("Node {} still has {} blocks to replicate "
+ + "before it is a candidate to finish decommissioning.",
+ dn, blocks.size());
+ }
+ iterkey = dn;
+ }
+ // Remove the datanodes that are decommissioned
+ for (DatanodeDescriptor dn : toRemove) {
+ Preconditions.checkState(dn.isDecommissioned(),
+ "Removing a node that is not yet decommissioned!");
+ decomNodeBlocks.remove(dn);
+ }
+ }
+
+ /**
+ * Removes sufficiently replicated blocks from the block list of a
+ * datanode.
+ */
+ private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
+ AbstractList blocks) {
+ processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
+ }
+
+ /**
+ * Returns a list of blocks on a datanode that are insufficiently
+ * replicated, i.e. are under-replicated enough to prevent decommission.
+ *
+ * As part of this, it also schedules replication work for
+ * any under-replicated blocks.
+ *
+ * @param datanode
+ * @return List of insufficiently replicated blocks
+ */
+ private AbstractList handleInsufficientlyReplicated(
+ final DatanodeDescriptor datanode) {
+ AbstractList insufficient = new ChunkedArrayList<>();
+ processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
+ insufficient, false);
+ return insufficient;
+ }
+
+ /**
+ * Used while checking if decommission-in-progress datanodes can be marked
+ * as decommissioned. Combines shared logic of
+ * pruneSufficientlyReplicated and handleInsufficientlyReplicated.
+ *
+ * @param datanode Datanode
+ * @param it Iterator over the blocks on the
+ * datanode
+ * @param insufficientlyReplicated Return parameter. If it's not null,
+ * will contain the insufficiently
+ * replicated blocks from the list.
+ * @param pruneSufficientlyReplicated whether to remove sufficiently
+ * replicated blocks from the iterator
+ * Side effect: updates datanode.decommissioningStatus with the counts of
+ * under-replicated blocks found during the scan.
+ */
+ private void processBlocksForDecomInternal(
+ final DatanodeDescriptor datanode,
+ final Iterator it,
+ final List insufficientlyReplicated,
+ boolean pruneSufficientlyReplicated) {
+ boolean firstReplicationLog = true;
+ int underReplicatedBlocks = 0;
+ int decommissionOnlyReplicas = 0;
+ int underReplicatedInOpenFiles = 0;
+ while (it.hasNext()) {
+ numBlocksChecked++;
+ final BlockInfoContiguous block = it.next();
+ // Remove the block from the list if it's no longer in the block map,
+ // e.g. the containing file has been deleted
+ if (blockManager.blocksMap.getStoredBlock(block) == null) {
+ LOG.trace("Removing unknown block {}", block);
+ it.remove();
+ continue;
+ }
+ BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+ if (bc == null) {
+ // Orphan block, will be invalidated eventually. Skip.
+ continue;
+ }
+
+ final NumberReplicas num = blockManager.countNodes(block);
+ final int liveReplicas = num.liveReplicas();
+ final int curReplicas = liveReplicas;
+
+ // Schedule under-replicated blocks for replication if not already
+ // pending
+ if (blockManager.isNeededReplication(block, bc.getBlockReplication(),
+ liveReplicas)) {
+ if (!blockManager.neededReplications.contains(block) &&
+ blockManager.pendingReplications.getNumReplicas(block) == 0 &&
+ namesystem.isPopulatingReplQueues()) {
+ // Process these blocks only when active NN is out of safe mode.
+ blockManager.neededReplications.add(block,
+ curReplicas,
+ num.decommissionedReplicas(),
+ bc.getBlockReplication());
}
}
+
+ // Even if the block is under-replicated,
+ // it doesn't block decommission if it's sufficiently replicated
+ if (isSufficientlyReplicated(block, bc, num)) {
+ if (pruneSufficientlyReplicated) {
+ it.remove();
+ }
+ continue;
+ }
+
+ // We've found an insufficiently replicated block.
+ if (insufficientlyReplicated != null) {
+ insufficientlyReplicated.add(block);
+ }
+ // Log if this is our first time through
+ if (firstReplicationLog) {
+ logBlockReplicationInfo(block, bc, datanode, num,
+ blockManager.blocksMap.getStorages(block));
+ firstReplicationLog = false;
+ }
+ // Update various counts
+ underReplicatedBlocks++;
+ if (bc.isUnderConstruction()) {
+ underReplicatedInOpenFiles++;
+ }
+ if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
+ decommissionOnlyReplicas++;
+ }
}
+
+ datanode.decommissioningStatus.set(underReplicatedBlocks,
+ decommissionOnlyReplicas,
+ underReplicatedInOpenFiles);
}
}
+
+ @VisibleForTesting
+ void runMonitor() throws ExecutionException, InterruptedException {
+ Future f = executor.submit(monitor);
+ f.get();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index be09b7cbf5c..a8c2400410f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -736,10 +736,25 @@
- dfs.namenode.decommission.nodes.per.interval
- 5
- The number of nodes namenode checks if decommission is complete
- in each dfs.namenode.decommission.interval.
+ dfs.namenode.decommission.blocks.per.interval
+ 500000
+ The approximate number of blocks to process per
+ decommission interval, as defined in dfs.namenode.decommission.interval.
+
+
+
+
+ dfs.namenode.decommission.max.concurrent.tracked.nodes
+ 100
+
+ The maximum number of decommission-in-progress datanodes that will be
+ tracked at one time by the namenode. Tracking a decommission-in-progress
+ datanode consumes additional NN memory proportional to the number of blocks
+ on the datanode. Having a conservative limit reduces the potential impact
+ of decommissioning a large number of nodes at once.
+
+ A value of 0 means no limit will be enforced.
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index 0940590b6e1..1d475a5380c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -26,39 +27,56 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class tests the decommissioning of nodes.
*/
public class TestDecommission {
- public static final Log LOG = LogFactory.getLog(TestDecommission.class);
+ public static final Logger LOG = LoggerFactory.getLogger(TestDecommission
+ .class);
static final long seed = 0xDEADBEEFL;
static final int blockSize = 8192;
static final int fileSize = 16384;
@@ -89,6 +107,7 @@ public class TestDecommission {
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
@@ -105,7 +124,7 @@ public class TestDecommission {
}
}
- private void writeConfigFile(Path name, ArrayList nodes)
+ private void writeConfigFile(Path name, List nodes)
throws IOException {
// delete if it already exists
if (localFileSys.exists(name)) {
@@ -149,7 +168,7 @@ public class TestDecommission {
* @param downnode - if null, there is no decommissioned node for this file.
* @return - null if no failure found, else an error message string.
*/
- private String checkFile(FileSystem fileSys, Path name, int repl,
+ private static String checkFile(FileSystem fileSys, Path name, int repl,
String downnode, int numDatanodes) throws IOException {
boolean isNodeDown = (downnode != null);
// need a raw stream
@@ -261,7 +280,7 @@ public class TestDecommission {
/* Ask a specific NN to stop decommission of the datanode and wait for each
* to reach the NORMAL state.
*/
- private void recomissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
+ private void recommissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
LOG.info("Recommissioning node: " + decommissionedNode);
writeConfigFile(excludeFile, null);
refreshNodes(cluster.getNamesystem(nnIndex), conf);
@@ -279,7 +298,7 @@ public class TestDecommission {
LOG.info("Waiting for node " + node + " to change state to "
+ state + " current state: " + node.getAdminState());
try {
- Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+ Thread.sleep(HEARTBEAT_INTERVAL * 500);
} catch (InterruptedException e) {
// nothing
}
@@ -321,28 +340,27 @@ public class TestDecommission {
}
private void verifyStats(NameNode namenode, FSNamesystem fsn,
- DatanodeInfo node, boolean decommissioning)
+ DatanodeInfo info, DataNode node, boolean decommissioning)
throws InterruptedException, IOException {
- // Do the stats check over 10 iterations
+ // Do the stats check over 10 heartbeats
for (int i = 0; i < 10; i++) {
long[] newStats = namenode.getRpcServer().getStats();
// For decommissioning nodes, ensure capacity of the DN is no longer
// counted. Only used space of the DN is counted in cluster capacity
- assertEquals(newStats[0], decommissioning ? node.getDfsUsed() :
- node.getCapacity());
+ assertEquals(newStats[0],
+ decommissioning ? info.getDfsUsed() : info.getCapacity());
// Ensure cluster used capacity is counted for both normal and
// decommissioning nodes
- assertEquals(newStats[1], node.getDfsUsed());
+ assertEquals(newStats[1], info.getDfsUsed());
// For decommissioning nodes, remaining space from the DN is not counted
- assertEquals(newStats[2], decommissioning ? 0 : node.getRemaining());
+ assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining());
// Ensure transceiver count is same as that DN
- assertEquals(fsn.getTotalLoad(), node.getXceiverCount());
-
- Thread.sleep(HEARTBEAT_INTERVAL * 1000); // Sleep heart beat interval
+ assertEquals(fsn.getTotalLoad(), info.getXceiverCount());
+ DataNodeTestUtils.triggerHeartbeat(node);
}
}
@@ -406,14 +424,6 @@ public class TestDecommission {
cluster.shutdown();
}
- /**
- * Tests recommission for non federated cluster
- */
- @Test(timeout=360000)
- public void testRecommission() throws IOException {
- testRecommission(1, 6);
- }
-
/**
* Test decommission for federeated cluster
*/
@@ -500,12 +510,12 @@ public class TestDecommission {
// 1. the last DN would have been chosen as excess replica, given its
// heartbeat is considered old.
// Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
- // 2. After recomissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
+ // 2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
// and one excess replica ( 3 )
// After the fix,
- // After recomissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
+ // After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
Thread.sleep(slowHeartbeatDNwaitTime);
- recomissionNode(1, decomNodeFromSBN);
+ recommissionNode(1, decomNodeFromSBN);
// Step 3.b, ask ANN to recommission the first DN.
// To verify the fix, the test makes sure the excess replica picked by ANN
@@ -524,7 +534,7 @@ public class TestDecommission {
cluster.restartDataNode(nextToLastDNprop);
cluster.waitActive();
Thread.sleep(slowHeartbeatDNwaitTime);
- recomissionNode(0, decommissionedNodeFromANN);
+ recommissionNode(0, decommissionedNodeFromANN);
// Step 3.c, make sure the DN has deleted the block and report to NNs
cluster.triggerHeartbeats();
@@ -606,69 +616,88 @@ public class TestDecommission {
cluster.shutdown();
}
+ /**
+ * Test that over-replicated blocks are deleted on recommission.
+ */
+ @Test(timeout=120000)
+ public void testRecommission() throws Exception {
+ final int numDatanodes = 6;
+ try {
+ LOG.info("Starting test testRecommission");
- private void testRecommission(int numNamenodes, int numDatanodes)
- throws IOException {
- LOG.info("Starting test testRecommission");
+ startCluster(1, numDatanodes, conf);
- startCluster(numNamenodes, numDatanodes, conf);
-
- ArrayList> namenodeDecomList =
- new ArrayList>(numNamenodes);
- for(int i = 0; i < numNamenodes; i++) {
- namenodeDecomList.add(i, new ArrayList(numDatanodes));
- }
- Path file1 = new Path("testDecommission.dat");
- int replicas = numDatanodes - 1;
-
- for (int i = 0; i < numNamenodes; i++) {
- ArrayList decommissionedNodes = namenodeDecomList.get(i);
- FileSystem fileSys = cluster.getFileSystem(i);
+ final Path file1 = new Path("testDecommission.dat");
+ final int replicas = numDatanodes - 1;
+
+ ArrayList decommissionedNodes = Lists.newArrayList();
+ final FileSystem fileSys = cluster.getFileSystem();
+
+ // Write a file to n-1 datanodes
writeFile(fileSys, file1, replicas);
-
- // Decommission one node. Verify that node is decommissioned.
- DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
- AdminStates.DECOMMISSIONED);
- decommissionedNodes.add(decomNode);
-
- // Ensure decommissioned datanode is not automatically shutdown
- DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
- assertEquals("All datanodes must be alive", numDatanodes,
- client.datanodeReport(DatanodeReportType.LIVE).length);
- int tries =0;
- // wait for the block to be replicated
- while (tries++ < 20) {
- try {
- Thread.sleep(1000);
- if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
- numDatanodes) == null) {
- break;
- }
- } catch (InterruptedException ie) {
- }
- }
- assertTrue("Checked if block was replicated after decommission, tried "
- + tries + " times.", tries < 20);
- // stop decommission and check if the new replicas are removed
- recomissionNode(0, decomNode);
- // wait for the block to be deleted
- tries = 0;
- while (tries++ < 20) {
- try {
- Thread.sleep(1000);
- if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
- break;
- }
- } catch (InterruptedException ie) {
+ // Decommission one of the datanodes with a replica
+ BlockLocation loc = fileSys.getFileBlockLocations(file1, 0, 1)[0];
+ assertEquals("Unexpected number of replicas from getFileBlockLocations",
+ replicas, loc.getHosts().length);
+ final String toDecomHost = loc.getNames()[0];
+ String toDecomUuid = null;
+ for (DataNode d : cluster.getDataNodes()) {
+ if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) {
+ toDecomUuid = d.getDatanodeId().getDatanodeUuid();
+ break;
}
}
+ assertNotNull("Could not find a dn with the block!", toDecomUuid);
+ final DatanodeInfo decomNode =
+ decommissionNode(0, toDecomUuid, decommissionedNodes,
+ AdminStates.DECOMMISSIONED);
+ decommissionedNodes.add(decomNode);
+ final BlockManager blockManager =
+ cluster.getNamesystem().getBlockManager();
+ final DatanodeManager datanodeManager =
+ blockManager.getDatanodeManager();
+ BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+
+ // Ensure decommissioned datanode is not automatically shutdown
+ DFSClient client = getDfsClient(cluster.getNameNode(), conf);
+ assertEquals("All datanodes must be alive", numDatanodes,
+ client.datanodeReport(DatanodeReportType.LIVE).length);
+
+ // wait for the block to be replicated
+ final ExtendedBlock b = DFSTestUtil.getFirstBlock(fileSys, file1);
+ final String uuid = toDecomUuid;
+ GenericTestUtils.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ BlockInfoContiguous info =
+ blockManager.getStoredBlock(b.getLocalBlock());
+ int count = 0;
+ StringBuilder sb = new StringBuilder("Replica locations: ");
+ for (int i = 0; i < info.numNodes(); i++) {
+ DatanodeDescriptor dn = info.getDatanode(i);
+ sb.append(dn + ", ");
+ if (!dn.getDatanodeUuid().equals(uuid)) {
+ count++;
+ }
+ }
+ LOG.info(sb.toString());
+ LOG.info("Count: " + count);
+ return count == replicas;
+ }
+ }, 500, 30000);
+
+ // redecommission and wait for over-replication to be fixed
+ recommissionNode(0, decomNode);
+ BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+ DFSTestUtil.waitForReplication(cluster, b, 1, replicas, 0);
+
cleanupFile(fileSys, file1);
- assertTrue("Checked if node was recommissioned " + tries + " times.",
- tries < 20);
- LOG.info("tried: " + tries + " times before recommissioned");
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
- cluster.shutdown();
}
/**
@@ -702,20 +731,35 @@ public class TestDecommission {
FSNamesystem fsn = cluster.getNamesystem(i);
NameNode namenode = cluster.getNameNode(i);
- DatanodeInfo downnode = decommissionNode(i, null, null,
+
+ DatanodeInfo decomInfo = decommissionNode(i, null, null,
AdminStates.DECOMMISSION_INPROGRESS);
+ DataNode decomNode = getDataNode(decomInfo);
// Check namenode stats for multiple datanode heartbeats
- verifyStats(namenode, fsn, downnode, true);
+ verifyStats(namenode, fsn, decomInfo, decomNode, true);
// Stop decommissioning and verify stats
writeConfigFile(excludeFile, null);
refreshNodes(fsn, conf);
- DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode);
- waitNodeState(ret, AdminStates.NORMAL);
- verifyStats(namenode, fsn, ret, false);
+ DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo);
+ DataNode retNode = getDataNode(decomInfo);
+ waitNodeState(retInfo, AdminStates.NORMAL);
+ verifyStats(namenode, fsn, retInfo, retNode, false);
}
}
-
+
+ private DataNode getDataNode(DatanodeInfo decomInfo) {
+ DataNode decomNode = null;
+ for (DataNode dn: cluster.getDataNodes()) {
+ if (decomInfo.equals(dn.getDatanodeId())) {
+ decomNode = dn;
+ break;
+ }
+ }
+ assertNotNull("Could not find decomNode in cluster!", decomNode);
+ return decomNode;
+ }
+
/**
* Test host/include file functionality. Only datanodes
* in the include file are allowed to connect to the namenode in a non
@@ -901,9 +945,9 @@ public class TestDecommission {
* It is not recommended to use a registration name which is not also a
* valid DNS hostname for the DataNode. See HDFS-5237 for background.
*/
+ @Ignore
@Test(timeout=360000)
- public void testIncludeByRegistrationName() throws IOException,
- InterruptedException {
+ public void testIncludeByRegistrationName() throws Exception {
Configuration hdfsConf = new Configuration(conf);
// Any IPv4 address starting with 127 functions as a "loopback" address
// which is connected to the current host. So by choosing 127.0.0.100
@@ -926,15 +970,22 @@ public class TestDecommission {
refreshNodes(cluster.getNamesystem(0), hdfsConf);
// Wait for the DN to be marked dead.
- DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
- while (true) {
- DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
- if (info.length == 1) {
- break;
+ LOG.info("Waiting for DN to be marked as dead.");
+ final DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
+ GenericTestUtils.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ BlockManagerTestUtil
+ .checkHeartbeat(cluster.getNamesystem().getBlockManager());
+ try {
+ DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
+ return info.length == 1;
+ } catch (IOException e) {
+ LOG.warn("Failed to check dead DNs", e);
+ return false;
+ }
}
- LOG.info("Waiting for datanode to be marked dead");
- Thread.sleep(HEARTBEAT_INTERVAL * 1000);
- }
+ }, 500, 5000);
// Use a non-empty include file with our registration name.
// It should work.
@@ -944,18 +995,169 @@ public class TestDecommission {
writeConfigFile(hostsFile, nodes);
refreshNodes(cluster.getNamesystem(0), hdfsConf);
cluster.restartDataNode(0);
+ cluster.triggerHeartbeats();
// Wait for the DN to come back.
- while (true) {
- DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
- if (info.length == 1) {
- Assert.assertFalse(info[0].isDecommissioned());
- Assert.assertFalse(info[0].isDecommissionInProgress());
- assertEquals(registrationName, info[0].getHostName());
- break;
+ LOG.info("Waiting for DN to come back.");
+ GenericTestUtils.waitFor(new Supplier() {
+ @Override
+ public Boolean get() {
+ BlockManagerTestUtil
+ .checkHeartbeat(cluster.getNamesystem().getBlockManager());
+ try {
+ DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
+ if (info.length == 1) {
+ Assert.assertFalse(info[0].isDecommissioned());
+ Assert.assertFalse(info[0].isDecommissionInProgress());
+ assertEquals(registrationName, info[0].getHostName());
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to check dead DNs", e);
+ }
+ return false;
}
- LOG.info("Waiting for datanode to come back");
- Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+ }, 500, 5000);
+ }
+
+ @Test(timeout=120000)
+ public void testBlocksPerInterval() throws Exception {
+ Configuration newConf = new Configuration(conf);
+ org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+ .setLevel(Level.TRACE);
+ // Turn the blocks per interval way down
+ newConf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
+ 3);
+ // Disable the normal monitor runs
+ newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+ Integer.MAX_VALUE);
+ startCluster(1, 3, newConf);
+ final FileSystem fs = cluster.getFileSystem();
+ final DatanodeManager datanodeManager =
+ cluster.getNamesystem().getBlockManager().getDatanodeManager();
+ final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+ // Write a 3 block file, so each node has one block. Should scan 3 nodes.
+ DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
+ doDecomCheck(datanodeManager, decomManager, 3);
+ // Write another file, should only scan two
+ DFSTestUtil.createFile(fs, new Path("/file2"), 64, (short)3, 0xBAD1DEA);
+ doDecomCheck(datanodeManager, decomManager, 2);
+ // One more file, should only scan 1
+ DFSTestUtil.createFile(fs, new Path("/file3"), 64, (short)3, 0xBAD1DEA);
+ doDecomCheck(datanodeManager, decomManager, 1);
+ // blocks on each DN now exceed the limit, still scan at least one node
+ DFSTestUtil.createFile(fs, new Path("/file4"), 64, (short)3, 0xBAD1DEA);
+ doDecomCheck(datanodeManager, decomManager, 1);
+ }
+
+ @Deprecated
+ @Test(timeout=120000)
+ public void testNodesPerInterval() throws Exception {
+ Configuration newConf = new Configuration(conf);
+ org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+ .setLevel(Level.TRACE);
+ // Set the deprecated configuration key which limits the # of nodes per
+ // interval
+ newConf.setInt("dfs.namenode.decommission.nodes.per.interval", 1);
+ // Disable the normal monitor runs
+ newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+ Integer.MAX_VALUE);
+ startCluster(1, 3, newConf);
+ final FileSystem fs = cluster.getFileSystem();
+ final DatanodeManager datanodeManager =
+ cluster.getNamesystem().getBlockManager().getDatanodeManager();
+ final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+ // Write a 3 block file, so each node has one block. Should scan 1 node
+ // each time.
+ DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
+ for (int i=0; i<3; i++) {
+ doDecomCheck(datanodeManager, decomManager, 1);
}
}
+
+ private void doDecomCheck(DatanodeManager datanodeManager,
+ DecommissionManager decomManager, int expectedNumCheckedNodes)
+ throws IOException, ExecutionException, InterruptedException {
+ // Decom all nodes
+ ArrayList decommissionedNodes = Lists.newArrayList();
+ for (DataNode d: cluster.getDataNodes()) {
+ DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
+ decommissionedNodes,
+ AdminStates.DECOMMISSION_INPROGRESS);
+ decommissionedNodes.add(dn);
+ }
+ // Run decom scan and check
+ BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+ assertEquals("Unexpected # of nodes checked", expectedNumCheckedNodes,
+ decomManager.getNumNodesChecked());
+ // Recommission all nodes
+ for (DatanodeInfo dn : decommissionedNodes) {
+ recommissionNode(0, dn);
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testPendingNodes() throws Exception {
+ Configuration newConf = new Configuration(conf);
+ org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+ .setLevel(Level.TRACE);
+ // Only allow one node to be decom'd at a time
+ newConf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+ 1);
+ // Disable the normal monitor runs
+ newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+ Integer.MAX_VALUE);
+ startCluster(1, 3, newConf);
+ final FileSystem fs = cluster.getFileSystem();
+ final DatanodeManager datanodeManager =
+ cluster.getNamesystem().getBlockManager().getDatanodeManager();
+ final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+ // Keep a file open to prevent decom from progressing
+ HdfsDataOutputStream open1 =
+ (HdfsDataOutputStream) fs.create(new Path("/openFile1"), (short)3);
+ // Flush and trigger block reports so the block definitely shows up on NN
+ open1.write(123);
+ open1.hflush();
+ for (DataNode d: cluster.getDataNodes()) {
+ DataNodeTestUtils.triggerBlockReport(d);
+ }
+ // Decom two nodes, so one is still alive
+ ArrayList decommissionedNodes = Lists.newArrayList();
+ for (int i=0; i<2; i++) {
+ final DataNode d = cluster.getDataNodes().get(i);
+ DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
+ decommissionedNodes,
+ AdminStates.DECOMMISSION_INPROGRESS);
+ decommissionedNodes.add(dn);
+ }
+
+ for (int i=2; i>=0; i--) {
+ assertTrackedAndPending(decomManager, 0, i);
+ BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+ }
+
+ // Close file, try to decom the last node, should get stuck in tracked
+ open1.close();
+ final DataNode d = cluster.getDataNodes().get(2);
+ DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
+ decommissionedNodes,
+ AdminStates.DECOMMISSION_INPROGRESS);
+ decommissionedNodes.add(dn);
+ BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+
+ assertTrackedAndPending(decomManager, 1, 0);
+ }
+
+ private void assertTrackedAndPending(DecommissionManager decomManager,
+ int tracked, int pending) {
+ assertEquals("Unexpected number of tracked nodes", tracked,
+ decomManager.getNumTrackedNodes());
+ assertEquals("Unexpected number of pending nodes", pending,
+ decomManager.getNumPendingNodes());
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index fccd308e70a..f61176e16eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -300,9 +301,8 @@ public class BlockManagerTestUtil {
* Have DatanodeManager check decommission state.
* @param dm the DatanodeManager to manipulate
*/
- public static void checkDecommissionState(DatanodeManager dm,
- DatanodeDescriptor node) {
- dm.checkDecommissionState(node);
+ public static void recheckDecommissionState(DatanodeManager dm)
+ throws ExecutionException, InterruptedException {
+ dm.getDecomManager().runMonitor();
}
-
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index 1f1241f0a37..9f06694e67d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -137,7 +137,7 @@ public class TestReplicationPolicyConsiderLoad {
// returns false
for (int i = 0; i < 3; i++) {
DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
- dnManager.startDecommission(d);
+ dnManager.getDecomManager().startDecommission(d);
d.setDecommissioned();
}
assertEquals((double)load/3, dnManager.getFSClusterStats()
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index b7ba38c286c..1c5d3693b9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -29,7 +28,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.TimeoutException;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
@@ -53,7 +51,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -90,7 +93,8 @@ public class TestDecommissioningStatus {
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
Path includeFile = new Path(dir, "include");
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ 1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
4);
@@ -104,6 +108,9 @@ public class TestDecommissioningStatus {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).build();
cluster.waitActive();
fileSys = cluster.getFileSystem();
+ cluster.getNamesystem().getBlockManager().getDatanodeManager()
+ .setHeartbeatExpireInterval(3000);
+ Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
}
@AfterClass
@@ -199,13 +206,16 @@ public class TestDecommissioningStatus {
private void checkDecommissionStatus(DatanodeDescriptor decommNode,
int expectedUnderRep, int expectedDecommissionOnly,
int expectedUnderRepInOpenFiles) {
- assertEquals(decommNode.decommissioningStatus.getUnderReplicatedBlocks(),
- expectedUnderRep);
+ assertEquals("Unexpected num under-replicated blocks",
+ expectedUnderRep,
+ decommNode.decommissioningStatus.getUnderReplicatedBlocks());
+ assertEquals("Unexpected number of decom-only replicas",
+ expectedDecommissionOnly,
+ decommNode.decommissioningStatus.getDecommissionOnlyReplicas());
assertEquals(
- decommNode.decommissioningStatus.getDecommissionOnlyReplicas(),
- expectedDecommissionOnly);
- assertEquals(decommNode.decommissioningStatus
- .getUnderReplicatedInOpenFiles(), expectedUnderRepInOpenFiles);
+ "Unexpected number of replicas in under-replicated open files",
+ expectedUnderRepInOpenFiles,
+ decommNode.decommissioningStatus.getUnderReplicatedInOpenFiles());
}
private void checkDFSAdminDecommissionStatus(
@@ -257,7 +267,7 @@ public class TestDecommissioningStatus {
* Tests Decommissioning Status in DFS.
*/
@Test
- public void testDecommissionStatus() throws IOException, InterruptedException {
+ public void testDecommissionStatus() throws Exception {
InetSocketAddress addr = new InetSocketAddress("localhost", cluster
.getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
@@ -266,7 +276,7 @@ public class TestDecommissioningStatus {
DistributedFileSystem fileSys = cluster.getFileSystem();
DFSAdmin admin = new DFSAdmin(cluster.getConfiguration(0));
- short replicas = 2;
+ short replicas = numDatanodes;
//
// Decommission one node. Verify the decommission status
//
@@ -275,7 +285,9 @@ public class TestDecommissioningStatus {
Path file2 = new Path("decommission1.dat");
FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas);
- Thread.sleep(5000);
+ for (DataNode d: cluster.getDataNodes()) {
+ DataNodeTestUtils.triggerBlockReport(d);
+ }
FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
@@ -283,19 +295,22 @@ public class TestDecommissioningStatus {
String downnode = decommissionNode(fsn, client, localFileSys, iteration);
dm.refreshNodes(conf);
decommissionedNodes.add(downnode);
- Thread.sleep(5000);
+ BlockManagerTestUtil.recheckDecommissionState(dm);
final List decommissioningNodes = dm.getDecommissioningNodes();
if (iteration == 0) {
assertEquals(decommissioningNodes.size(), 1);
DatanodeDescriptor decommNode = decommissioningNodes.get(0);
- checkDecommissionStatus(decommNode, 4, 0, 2);
+ checkDecommissionStatus(decommNode, 3, 0, 1);
checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
fileSys, admin);
} else {
assertEquals(decommissioningNodes.size(), 2);
DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
- checkDecommissionStatus(decommNode1, 4, 4, 2);
+ // This one is still 3,3,1 since it passed over the under-construction (UC)
+ // block earlier, before node 2 was decommissioned
+ checkDecommissionStatus(decommNode1, 3, 3, 1);
+ // This one is 4,4,2 since it reflects the full post-decommission state
checkDecommissionStatus(decommNode2, 4, 4, 2);
checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 2),
fileSys, admin);
@@ -317,8 +332,7 @@ public class TestDecommissioningStatus {
* the replication process after it rejoins the cluster.
*/
@Test(timeout=120000)
- public void testDecommissionStatusAfterDNRestart()
- throws IOException, InterruptedException {
+ public void testDecommissionStatusAfterDNRestart() throws Exception {
DistributedFileSystem fileSys =
(DistributedFileSystem)cluster.getFileSystem();
@@ -357,7 +371,7 @@ public class TestDecommissioningStatus {
BlockManagerTestUtil.checkHeartbeat(fsn.getBlockManager());
// Force DatanodeManager to check decommission state.
- BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
+ BlockManagerTestUtil.recheckDecommissionState(dm);
// Verify that the DN remains in DECOMMISSION_INPROGRESS state.
assertTrue("the node should be DECOMMISSION_IN_PROGRESSS",
@@ -371,7 +385,7 @@ public class TestDecommissioningStatus {
// Delete the under-replicated file, which should let the
// DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED
cleanupFile(fileSys, f);
- BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
+ BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrue("the node should be decommissioned",
dead.get(0).isDecommissioned());
@@ -392,8 +406,9 @@ public class TestDecommissioningStatus {
* DECOMMISSIONED
*/
@Test(timeout=120000)
- public void testDecommissionDeadDN()
- throws IOException, InterruptedException, TimeoutException {
+ public void testDecommissionDeadDN() throws Exception {
+ Logger log = Logger.getLogger(DecommissionManager.class);
+ log.setLevel(Level.DEBUG);
DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
String dnName = dnID.getXferAddr();
DataNodeProperties stoppedDN = cluster.stopDataNode(0);
@@ -404,7 +419,7 @@ public class TestDecommissioningStatus {
DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
decommissionNode(fsn, localFileSys, dnName);
dm.refreshNodes(conf);
- BlockManagerTestUtil.checkDecommissionState(dm, dnDescriptor);
+ BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrue(dnDescriptor.isDecommissioned());
// Add the node back
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index b8024a75730..819225afd07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -1305,7 +1305,7 @@ public class TestFsck {
.getBlockManager().getBlockCollection(eb.getLocalBlock())
.getBlocks()[0].getDatanode(0);
cluster.getNameNode().getNamesystem().getBlockManager()
- .getDatanodeManager().startDecommission(dn);
+ .getDatanodeManager().getDecomManager().startDecommission(dn);
String dnName = dn.getXferAddr();
//wait for decommission start
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
index 426563b355e..35a611b37d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
@@ -30,8 +30,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
@@ -240,7 +238,7 @@ public class TestNamenodeCapacityReport {
DatanodeDescriptor dnd =
dnm.getDatanode(datanodes.get(i).getDatanodeId());
expectedInServiceLoad -= dnd.getXceiverCount();
- dnm.startDecommission(dnd);
+ dnm.getDecomManager().startDecommission(dnd);
DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
Thread.sleep(100);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);