HDFS-9388. Refactor decommission related code to support Maintenance State for datanodes.
This commit is contained in:
parent 12e44e7bda
commit 79df1e750e
@@ -49,37 +49,47 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Manages datanode decommissioning. A background monitor thread
* periodically checks the status of datanodes that are in-progress of
* decommissioning.
* Manages decommissioning and maintenance state for DataNodes. A background
* monitor thread periodically checks the status of DataNodes that are
* decommissioning or entering maintenance state.
* <p/>
* A datanode can be decommissioned in a few situations:
* A DataNode can be decommissioned in a few situations:
* <ul>
* <li>If a DN is dead, it is decommissioned immediately.</li>
* <li>If a DN is alive, it is decommissioned after all of its blocks
* are sufficiently replicated. Merely under-replicated blocks do not
* block decommissioning as long as they are above a replication
* threshold.</li>
* </ul>
* In the second case, the datanode transitions to a
* decommission-in-progress state and is tracked by the monitor thread. The
* monitor periodically scans through the list of insufficiently replicated
* blocks on these datanodes to
* determine if they can be decommissioned. The monitor also prunes this list
* as blocks become replicated, so monitor scans will become more efficient
* In the second case, the DataNode transitions to a DECOMMISSION_INPROGRESS
* state and is tracked by the monitor thread. The monitor periodically scans
* through the list of insufficiently replicated blocks on these DataNodes to
* determine if they can be DECOMMISSIONED. The monitor also prunes this list
* as blocks become replicated, so monitor scans will become more efficient
* over time.
* <p/>
* Decommission-in-progress nodes that become dead do not progress to
* decommissioned until they become live again. This prevents potential
* DECOMMISSION_INPROGRESS nodes that become dead do not progress to
* DECOMMISSIONED until they become live again. This prevents potential
* durability loss for singly-replicated blocks (see HDFS-6791).
* <p/>
* DataNodes can also be put under maintenance state for any short duration
* maintenance operations. Unlike decommissioning, blocks are not always
* re-replicated for the DataNodes to enter maintenance state. When the
* blocks are replicated at least dfs.namenode.maintenance.replication.min,
* DataNodes transition to IN_MAINTENANCE state. Otherwise, just like
* decommissioning, DataNodes transition to ENTERING_MAINTENANCE state and
* wait for the blocks to be sufficiently replicated and then transition to
* IN_MAINTENANCE state. The block replication factor is relaxed for a maximum
* of maintenance expiry time. When DataNodes don't transition or join the
* cluster back by expiry time, blocks are re-replicated just as in
* decommissioning case as to avoid read or write performance degradation.
* <p/>
* This class depends on the FSNamesystem lock for synchronization.
*/
@InterfaceAudience.Private
public class DecommissionManager {
  private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager
      .class);

public class DatanodeAdminManager {
  private static final Logger LOG =
      LoggerFactory.getLogger(DatanodeAdminManager.class);
  private final Namesystem namesystem;
  private final BlockManager blockManager;
  private final HeartbeatManager hbManager;
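To make the state machine described in the new Javadoc above more concrete, here is a minimal, hypothetical sketch of the admin-state decision: a node entering maintenance may move straight to IN_MAINTENANCE when every block already meets the relaxed maintenance minimum, otherwise it waits in ENTERING_MAINTENANCE, while decommissioning always waits for full re-replication. The enum and class below are illustrative stand-ins, not the actual DatanodeAdminManager or DatanodeDescriptor API.

enum AdminState {
  NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED,
  ENTERING_MAINTENANCE, IN_MAINTENANCE
}

final class AdminStateSketch {
  // Assumed to mirror dfs.namenode.maintenance.replication.min.
  private final int maintenanceReplicationMin;

  AdminStateSketch(int maintenanceReplicationMin) {
    this.maintenanceReplicationMin = maintenanceReplicationMin;
  }

  /** Decide the next state for a node that is asked to enter maintenance. */
  AdminState startMaintenance(int[] liveReplicasPerBlock) {
    for (int live : liveReplicasPerBlock) {
      if (live < maintenanceReplicationMin) {
        // Some block is below the relaxed minimum: wait for replication first.
        return AdminState.ENTERING_MAINTENANCE;
      }
    }
    return AdminState.IN_MAINTENANCE;
  }

  /** Decommissioning always tracks the node until its blocks are replicated. */
  AdminState startDecommission() {
    return AdminState.DECOMMISSION_INPROGRESS;
  }
}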
@@ -97,7 +107,7 @@ public class DecommissionManager {
* the node from being marked as decommissioned. During a monitor tick, this
* list is pruned as blocks becomes replicated.
* <p/>
* Note also that the reference to the list of under-replicated blocks
* will be null on initial add
* <p/>
* However, this map can become out-of-date since it is not updated by block
@@ -113,24 +123,23 @@ public class DecommissionManager {
* outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
*/
private final Queue<DatanodeDescriptor> pendingNodes;

private Monitor monitor = null;

DecommissionManager(final Namesystem namesystem,
DatanodeAdminManager(final Namesystem namesystem,
    final BlockManager blockManager, final HeartbeatManager hbManager) {
  this.namesystem = namesystem;
  this.blockManager = blockManager;
  this.hbManager = hbManager;

  executor = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
      new ThreadFactoryBuilder().setNameFormat("DatanodeAdminMonitor-%d")
          .setDaemon(true).build());
  outOfServiceNodeBlocks = new TreeMap<>();
  pendingNodes = new LinkedList<>();
}

/**
* Start the decommission monitor thread.
* Start the DataNode admin monitor thread.
* @param conf
*/
void activate(Configuration conf) {
@@ -151,7 +160,7 @@ public class DecommissionManager {
if (strNodes != null) {
  LOG.warn("Deprecated configuration key {} will be ignored.",
      deprecatedKey);
  LOG.warn("Please update your configuration to use {} instead.",
      DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
}

@@ -161,7 +170,8 @@ public class DecommissionManager {

final int maxConcurrentTrackedNodes = conf.getInt(
    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
    DFSConfigKeys
        .DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
    "value for "
    + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);

@@ -170,14 +180,14 @@ public class DecommissionManager {
executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
    TimeUnit.SECONDS);

LOG.debug("Activating DecommissionManager with interval {} seconds, " +
LOG.debug("Activating DatanodeAdminManager with interval {} seconds, " +
    "{} max blocks per interval, " +
    "{} max concurrently tracked nodes.", intervalSecs,
    blocksPerInterval, maxConcurrentTrackedNodes);
}

/**
* Stop the decommission monitor thread, waiting briefly for it to terminate.
* Stop the admin monitor thread, waiting briefly for it to terminate.
*/
void close() {
  executor.shutdownNow();
@@ -187,7 +197,7 @@ public class DecommissionManager {
}

/**
* Start decommissioning the specified datanode.
* @param node
*/
@VisibleForTesting

@@ -211,7 +221,7 @@ public class DecommissionManager {
}

/**
* Stop decommissioning the specified datanode.
* @param node
*/
@VisibleForTesting

@@ -224,7 +234,7 @@ public class DecommissionManager {
if (node.isAlive()) {
  blockManager.processExtraRedundancyBlocksOnInService(node);
}
// Remove from tracking in DecommissionManager
// Remove from tracking in DatanodeAdminManager
pendingNodes.remove(node);
outOfServiceNodeBlocks.remove(node);
} else {

@@ -303,7 +313,7 @@ public class DecommissionManager {
blockManager.processExtraRedundancyBlocksOnInService(node);
}

// Remove from tracking in DecommissionManager
// Remove from tracking in DatanodeAdminManager
pendingNodes.remove(node);
outOfServiceNodeBlocks.remove(node);
} else {
@@ -324,8 +334,9 @@ public class DecommissionManager {

/**
* Checks whether a block is sufficiently replicated/stored for
* decommissioning. For replicated blocks or striped blocks, full-strength
* replication or storage is not always necessary, hence "sufficient".
* DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE datanodes. For replicated
* blocks or striped blocks, full-strength replication or storage is not
* always necessary, hence "sufficient".
* @return true if sufficient, else false.
*/
private boolean isSufficient(BlockInfo block, BlockCollection bc,
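As a rough illustration of the "sufficient" idea in the Javadoc above, the following is a deliberately simplified, hypothetical sketch; the real isSufficient() consults BlockManager redundancy state, and the parameters and threshold rule here are assumptions used only to show that full-strength replication is not always required.

final class SufficiencySketch {
  static boolean isSufficient(int liveReplicas, int expectedReplicas,
      int decommissionThreshold, int maintenanceReplicationMin,
      boolean forMaintenance) {
    if (forMaintenance) {
      // Entering maintenance only needs the relaxed maintenance minimum.
      return liveReplicas >= maintenanceReplicationMin;
    }
    // Decommission does not always need full strength: a merely
    // under-replicated block stops blocking it once it clears the threshold.
    return liveReplicas >= Math.min(expectedReplicas, decommissionThreshold);
  }
}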
@@ -416,9 +427,10 @@ public class DecommissionManager {
}

/**
* Checks to see if DNs have finished decommissioning.
* Checks to see if datanodes have finished DECOMMISSION_INPROGRESS or
* ENTERING_MAINTENANCE state.
* <p/>
* Since this is done while holding the namesystem lock,
* the amount of work per monitor tick is limited.
*/
private class Monitor implements Runnable {

@@ -440,15 +452,15 @@ public class DecommissionManager {
*/
private int numBlocksCheckedPerLock = 0;
/**
* The number of nodes that have been checked on this tick. Used for
* statistics.
*/
private int numNodesChecked = 0;
/**
* The last datanode in outOfServiceNodeBlocks that we've processed
* The last datanode in outOfServiceNodeBlocks that we've processed.
*/
private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
    DatanodeID("", "", "", 0, 0, 0, 0));
private DatanodeDescriptor iterkey = new DatanodeDescriptor(
    new DatanodeID("", "", "", 0, 0, 0, 0));

Monitor(int numBlocksPerCheck, int maxConcurrentTrackedNodes) {
  this.numBlocksPerCheck = numBlocksPerCheck;
@@ -463,8 +475,8 @@ public class DecommissionManager {
@Override
public void run() {
  if (!namesystem.isRunning()) {
    LOG.info("Namesystem is not running, skipping decommissioning checks"
        + ".");
    LOG.info("Namesystem is not running, skipping " +
        "decommissioning/maintenance checks.");
    return;
  }
  // Reset the checked count at beginning of each iteration

@@ -486,7 +498,7 @@ public class DecommissionManager {
}

/**
* Pop datanodes off the pending list and into decomNodeBlocks,
* subject to the maxConcurrentTrackedNodes limit.
*/
private void processPendingNodes() {

@@ -522,8 +534,8 @@ public class DecommissionManager {
continue;
}
if (blocks == null) {
  // This is a newly added datanode, run through its list to schedule
  // under-replicated blocks for replication and collect the blocks
  // that are insufficiently replicated for further tracking
  LOG.debug("Newly-added node {}, doing full scan to find " +
      "insufficiently-replicated blocks.", dn);
@@ -531,26 +543,27 @@ public class DecommissionManager {
outOfServiceNodeBlocks.put(dn, blocks);
fullScan = true;
} else {
  // This is a known datanode, check if its # of insufficiently
  // replicated blocks has dropped to zero and if it can be decommed
  // This is a known datanode, check if its # of insufficiently
  // replicated blocks has dropped to zero and if it can move
  // to the next state.
  LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
  pruneReliableBlocks(dn, blocks);
}
if (blocks.size() == 0) {
  if (!fullScan) {
    // If we didn't just do a full scan, need to re-check with the
    // full block map.
    //
    // We've replicated all the known insufficiently replicated
    // blocks. Re-check with the full block map before finally
    // marking the datanode as decommissioned
    // We've replicated all the known insufficiently replicated
    // blocks. Re-check with the full block map before finally
    // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE.
    LOG.debug("Node {} has finished replicating current set of "
        + "blocks, checking with the full block map.", dn);
    blocks = handleInsufficientlyStored(dn);
    outOfServiceNodeBlocks.put(dn, blocks);
  }
  // If the full scan is clean AND the node liveness is okay,
  // we can finally mark as decommissioned.
  // If the full scan is clean AND the node liveness is okay,
  // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
  final boolean isHealthy =
      blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
  if (blocks.size() == 0 && isHealthy) {
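The per-node flow described by the monitor comments above (full scan for newly tracked nodes, pruning for known ones, then a re-check against the full block map before finalizing) can be summarized in a self-contained, hypothetical sketch; the Node type and helper methods below are placeholders rather than the real HDFS classes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MonitorTickSketch {
  static final class Node { final String name; Node(String n) { name = n; } }

  private final Map<Node, List<String>> tracked = new HashMap<>();

  void checkNode(Node dn) {
    List<String> blocks = tracked.get(dn);
    boolean fullScan = false;
    if (blocks == null) {
      // Newly tracked node: full scan to collect insufficiently replicated blocks.
      blocks = fullScanInsufficient(dn);
      tracked.put(dn, blocks);
      fullScan = true;
    } else {
      // Known node: prune blocks that have since become sufficiently replicated.
      blocks.removeIf(this::isNowSufficient);
    }
    if (blocks.isEmpty()) {
      if (!fullScan) {
        // Re-check against the full block map before finalizing the state.
        blocks = fullScanInsufficient(dn);
        tracked.put(dn, blocks);
      }
      if (blocks.isEmpty() && isHealthy(dn)) {
        finalizeAdminState(dn);  // mark DECOMMISSIONED or IN_MAINTENANCE
      }
    }
  }

  // Stubs standing in for the real block-map and liveness checks.
  private List<String> fullScanInsufficient(Node dn) { return new ArrayList<>(); }
  private boolean isNowSufficient(String blockId) { return true; }
  private boolean isHealthy(Node dn) { return true; }
  private void finalizeAdminState(Node dn) { }
}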
@@ -580,7 +593,7 @@ public class DecommissionManager {
}
iterkey = dn;
}
// Remove the datanodes that are decommissioned or in service after
// Remove the datanodes that are DECOMMISSIONED or in service after
// maintenance expiration.
for (DatanodeDescriptor dn : toRemove) {
  Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
@@ -598,9 +611,9 @@ public class DecommissionManager {
}

/**
* Returns a list of blocks on a datanode that are insufficiently replicated
* or require recovery, i.e. requiring recovery and should prevent
* decommission.
* Returns a list of blocks on a datanode that are insufficiently
* replicated or require recovery, i.e. requiring recovery and
* should prevent decommission or maintenance.
* <p/>
* As part of this, it also schedules replication/recovery work.
*

@@ -615,9 +628,10 @@ public class DecommissionManager {
}

/**
* Used while checking if decommission-in-progress datanodes can be marked
* as decommissioned. Combines shared logic of
* pruneReliableBlocks and handleInsufficientlyStored.
* Used while checking if DECOMMISSION_INPROGRESS datanodes can be
* marked as DECOMMISSIONED or ENTERING_MAINTENANCE datanodes can be
* marked as IN_MAINTENANCE. Combines shared logic of pruneReliableBlocks
* and handleInsufficientlyStored.
*
* @param datanode Datanode
* @param it Iterator over the blocks on the

@@ -652,7 +666,7 @@ public class DecommissionManager {
// configured per-iteration-limit.
namesystem.writeUnlock();
try {
  LOG.debug("Yielded lock during decommission check");
  LOG.debug("Yielded lock during decommission/maintenance check");
  Thread.sleep(0, 500);
} catch (InterruptedException ignored) {
  return;
@@ -682,8 +696,8 @@ public class DecommissionManager {
final NumberReplicas num = blockManager.countNodes(block);
final int liveReplicas = num.liveReplicas();

// Schedule low redundancy blocks for reconstruction if not already
// pending
// Schedule low redundancy blocks for reconstruction
// if not already pending.
boolean isDecommission = datanode.isDecommissionInProgress();
boolean neededReconstruction = isDecommission ?
    blockManager.isNeededReconstruction(block, num) :

@@ -701,7 +715,8 @@ public class DecommissionManager {
}

// Even if the block is without sufficient redundancy,
// it doesn't block decommission if has sufficient redundancy
// it might not block decommission/maintenance if it
// has sufficient redundancy.
if (isSufficient(block, bc, num, isDecommission)) {
  if (pruneReliableBlocks) {
    it.remove();
@@ -75,7 +75,7 @@ public class DatanodeManager {

private final Namesystem namesystem;
private final BlockManager blockManager;
private final DecommissionManager decomManager;
private final DatanodeAdminManager datanodeAdminManager;
private final HeartbeatManager heartbeatManager;
private final FSClusterStats fsClusterStats;

@@ -223,9 +223,10 @@ public class DatanodeManager {
networktopology = NetworkTopology.getInstance(conf);
}

this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
this.decomManager = new DecommissionManager(namesystem, blockManager,
    heartbeatManager);
this.heartbeatManager = new HeartbeatManager(namesystem,
    blockManager, conf);
this.datanodeAdminManager = new DatanodeAdminManager(namesystem,
    blockManager, heartbeatManager);
this.fsClusterStats = newFSClusterStats();
this.dataNodePeerStatsEnabled = conf.getBoolean(
    DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
@@ -372,12 +373,12 @@ public class DatanodeManager {
}

void activate(final Configuration conf) {
  decomManager.activate(conf);
  datanodeAdminManager.activate(conf);
  heartbeatManager.activate();
}

void close() {
  decomManager.close();
  datanodeAdminManager.close();
  heartbeatManager.close();
}

@@ -392,8 +393,8 @@ public class DatanodeManager {
}

@VisibleForTesting
public DecommissionManager getDecomManager() {
  return decomManager;
public DatanodeAdminManager getDatanodeAdminManager() {
  return datanodeAdminManager;
}

public HostConfigManager getHostConfigManager() {
@@ -991,9 +992,9 @@ public class DatanodeManager {
    hostConfigManager.getMaintenanceExpirationTimeInMS(nodeReg);
// If the registered node is in exclude list, then decommission it
if (getHostConfigManager().isExcluded(nodeReg)) {
  decomManager.startDecommission(nodeReg);
  datanodeAdminManager.startDecommission(nodeReg);
} else if (nodeReg.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
  decomManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
  datanodeAdminManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
}
}

@@ -1219,12 +1220,13 @@ public class DatanodeManager {
long maintenanceExpireTimeInMS =
    hostConfigManager.getMaintenanceExpirationTimeInMS(node);
if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
  decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
  datanodeAdminManager.startMaintenance(
      node, maintenanceExpireTimeInMS);
} else if (hostConfigManager.isExcluded(node)) {
  decomManager.startDecommission(node);
  datanodeAdminManager.startDecommission(node);
} else {
  decomManager.stopMaintenance(node);
  decomManager.stopDecommission(node);
  datanodeAdminManager.stopMaintenance(node);
  datanodeAdminManager.stopDecommission(node);
}
}
node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
@@ -471,7 +471,7 @@ public class BackupNode extends NameNode {
* {@link LeaseManager.Monitor} protected by SafeMode.
* {@link BlockManager.RedundancyMonitor} protected by SafeMode.
* {@link HeartbeatManager.Monitor} protected by SafeMode.
* {@link DecommissionManager.Monitor} need to prohibit refreshNodes().
* {@link DatanodeAdminManager.Monitor} need to prohibit refreshNodes().
* {@link PendingReconstructionBlocks.PendingReconstructionMonitor}
* harmless, because RedundancyMonitor is muted.
*/
@@ -960,17 +960,17 @@
<property>
  <name>dfs.namenode.decommission.interval</name>
  <value>30s</value>
  <description>Namenode periodicity in seconds to check if decommission is
    complete. Support multiple time unit suffix(case insensitive), as described
    in dfs.heartbeat.interval.
  <description>Namenode periodicity in seconds to check if
    decommission or maintenance is complete. Support multiple time unit
    suffix(case insensitive), as described in dfs.heartbeat.interval.
  </description>
</property>

<property>
  <name>dfs.namenode.decommission.blocks.per.interval</name>
  <value>500000</value>
  <description>The approximate number of blocks to process per
    decommission interval, as defined in dfs.namenode.decommission.interval.
  <description>The approximate number of blocks to process per decommission
    or maintenance interval, as defined in dfs.namenode.decommission.interval.
  </description>
</property>

@@ -978,11 +978,12 @@
  <name>dfs.namenode.decommission.max.concurrent.tracked.nodes</name>
  <value>100</value>
  <description>
    The maximum number of decommission-in-progress datanodes nodes that will be
    tracked at one time by the namenode. Tracking a decommission-in-progress
    datanode consumes additional NN memory proportional to the number of blocks
    on the datnode. Having a conservative limit reduces the potential impact
    of decomissioning a large number of nodes at once.
    The maximum number of decommission-in-progress or
    entering-maintenance datanodes nodes that will be tracked at one time by
    the namenode. Tracking these datanode consumes additional NN memory
    proportional to the number of blocks on the datnode. Having a conservative
    limit reduces the potential impact of decommissioning or maintenance of
    a large number of nodes at once.

    A value of 0 means no limit will be enforced.
  </description>
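For reference, the three keys documented above can also be set programmatically before starting a NameNode, for example in tests. The values below are illustrative, not recommendations; the property names are exactly those shown in hdfs-default.xml above, and the same settings would normally live in hdfs-site.xml.

import org.apache.hadoop.conf.Configuration;

public class DecommissionTuningExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // How often the monitor checks whether decommission/maintenance is done.
    conf.set("dfs.namenode.decommission.interval", "30s");
    // Approximate number of blocks to process per monitor interval.
    conf.setInt("dfs.namenode.decommission.blocks.per.interval", 500000);
    // Cap on concurrently tracked decommissioning/entering-maintenance nodes.
    conf.setInt("dfs.namenode.decommission.max.concurrent.tracked.nodes", 100);
    System.out.println(
        conf.get("dfs.namenode.decommission.max.concurrent.tracked.nodes"));
  }
}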
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;

@@ -256,9 +256,10 @@ public class TestDecommission extends AdminStatesBaseTest {

startSimpleHACluster(3);

// Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs.
// The last DN is empty. Also configure the last DN to have slow heartbeat
// so that it will be chosen as excess replica candidate during recommission.
// Step 1, create a cluster with 4 DNs. Blocks are stored on the
// first 3 DNs. The last DN is empty. Also configure the last DN to have
// slow heartbeat so that it will be chosen as excess replica candidate
// during recommission.

// Step 1.a, copy blocks to the first 3 DNs. Given the replica count is the
// same as # of DNs, each DN will have a replica for any block.
@@ -290,9 +291,9 @@ public class TestDecommission extends AdminStatesBaseTest {

// Step 3, recommission the first DN on SBN and ANN to create excess replica
// It recommissions the node on SBN first to create potential
// inconsistent state. In production cluster, such insistent state can happen
// even if recommission command was issued on ANN first given the async nature
// of the system.
// inconsistent state. In production cluster, such insistent state can
// happen even if recommission command was issued on ANN first given the
// async nature of the system.

// Step 3.a, ask SBN to recomm the first DN.
// SBN has been fixed so that it no longer invalidates excess replica during

@@ -301,10 +302,10 @@ public class TestDecommission extends AdminStatesBaseTest {
// 1. the last DN would have been chosen as excess replica, given its
// heartbeat is considered old.
// Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
// 2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
// 2. After recommissionNode finishes, SBN has 3 live replicas (0, 1, 2)
// and one excess replica ( 3 )
// After the fix,
// After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
// After recommissionNode finishes, SBN has 4 live replicas (0, 1, 2, 3)
Thread.sleep(slowHeartbeatDNwaitTime);
putNodeInService(1, decomNodeFromSBN);

@@ -561,7 +562,8 @@ public class TestDecommission extends AdminStatesBaseTest {
* federated cluster.
*/
@Test(timeout=360000)
public void testHostsFileFederation() throws IOException, InterruptedException {
public void testHostsFileFederation()
    throws IOException, InterruptedException {
  // Test for 3 namenode federated cluster
  testHostsFile(3);
}

@@ -598,7 +600,8 @@ public class TestDecommission extends AdminStatesBaseTest {
}

@Test(timeout=120000)
public void testDecommissionWithOpenfile() throws IOException, InterruptedException {
public void testDecommissionWithOpenfile()
    throws IOException, InterruptedException {
  LOG.info("Starting test testDecommissionWithOpenfile");

  //At most 4 nodes will be decommissioned
@@ -742,14 +745,15 @@ public class TestDecommission extends AdminStatesBaseTest {

// make sure the two datanodes remain in decomm in progress state
BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrackedAndPending(dm.getDecomManager(), 2, 0);
assertTrackedAndPending(dm.getDatanodeAdminManager(), 2, 0);
}

/**
* Tests restart of namenode while datanode hosts are added to exclude file
**/
@Test(timeout=360000)
public void testDecommissionWithNamenodeRestart()throws IOException, InterruptedException {
public void testDecommissionWithNamenodeRestart()
    throws IOException, InterruptedException {
  LOG.info("Starting test testDecommissionWithNamenodeRestart");
  int numNamenodes = 1;
  int numDatanodes = 1;
@@ -914,7 +918,7 @@ public class TestDecommission extends AdminStatesBaseTest {

@Test(timeout=120000)
public void testBlocksPerInterval() throws Exception {
  org.apache.log4j.Logger.getLogger(DecommissionManager.class)
  org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
      .setLevel(Level.TRACE);
  // Turn the blocks per interval way down
  getConf().setInt(

@@ -927,7 +931,8 @@ public class TestDecommission extends AdminStatesBaseTest {
final FileSystem fs = getCluster().getFileSystem();
final DatanodeManager datanodeManager =
    getCluster().getNamesystem().getBlockManager().getDatanodeManager();
final DecommissionManager decomManager = datanodeManager.getDecomManager();
final DatanodeAdminManager decomManager =
    datanodeManager.getDatanodeAdminManager();

// Write a 3 block file, so each node has one block. Should scan 3 nodes.
DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
@@ -944,7 +949,7 @@ public class TestDecommission extends AdminStatesBaseTest {
}

private void doDecomCheck(DatanodeManager datanodeManager,
    DecommissionManager decomManager, int expectedNumCheckedNodes)
    DatanodeAdminManager decomManager, int expectedNumCheckedNodes)
    throws IOException, ExecutionException, InterruptedException {
  // Decom all nodes
  ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();

@@ -965,7 +970,7 @@ public class TestDecommission extends AdminStatesBaseTest {

@Test(timeout=120000)
public void testPendingNodes() throws Exception {
  org.apache.log4j.Logger.getLogger(DecommissionManager.class)
  org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
      .setLevel(Level.TRACE);
  // Only allow one node to be decom'd at a time
  getConf().setInt(

@@ -978,7 +983,8 @@ public class TestDecommission extends AdminStatesBaseTest {
final FileSystem fs = getCluster().getFileSystem();
final DatanodeManager datanodeManager =
    getCluster().getNamesystem().getBlockManager().getDatanodeManager();
final DecommissionManager decomManager = datanodeManager.getDecomManager();
final DatanodeAdminManager decomManager =
    datanodeManager.getDatanodeAdminManager();

// Keep a file open to prevent decom from progressing
HdfsDataOutputStream open1 =
@@ -1014,7 +1020,7 @@ public class TestDecommission extends AdminStatesBaseTest {
assertTrackedAndPending(decomManager, 1, 0);
}

private void assertTrackedAndPending(DecommissionManager decomManager,
private void assertTrackedAndPending(DatanodeAdminManager decomManager,
    int tracked, int pending) {
  assertEquals("Unexpected number of tracked nodes", tracked,
      decomManager.getNumTrackedNodes());
@@ -328,7 +328,7 @@ public class BlockManagerTestUtil {
*/
public static void recheckDecommissionState(DatanodeManager dm)
    throws ExecutionException, InterruptedException {
  dm.getDecomManager().runMonitorForTest();
  dm.getDatanodeAdminManager().runMonitorForTest();
}

/**
@@ -330,8 +330,9 @@ public class TestReconstructStripedBlocksWithRackAwareness {
// start decommissioning h9
boolean satisfied = bm.isPlacementPolicySatisfied(blockInfo);
Assert.assertFalse(satisfied);
final DecommissionManager decomManager =
    (DecommissionManager) Whitebox.getInternalState(dm, "decomManager");
final DatanodeAdminManager decomManager =
    (DatanodeAdminManager) Whitebox.getInternalState(
        dm, "datanodeAdminManager");
cluster.getNamesystem().writeLock();
try {
  dn9.stopDecommission();
@@ -100,7 +100,7 @@ public class TestReplicationPolicyConsiderLoad
// returns false
for (int i = 0; i < 3; i++) {
  DatanodeDescriptor d = dataNodes[i];
  dnManager.getDecomManager().startDecommission(d);
  dnManager.getDatanodeAdminManager().startDecommission(d);
  d.setDecommissioned();
}
assertEquals((double)load/3, dnManager.getFSClusterStats()
@@ -50,7 +50,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.tools.DFSAdmin;

@@ -100,7 +100,7 @@ public class TestDecommissioningStatus {
fileSys = cluster.getFileSystem();
cluster.getNamesystem().getBlockManager().getDatanodeManager()
    .setHeartbeatExpireInterval(3000);
Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
Logger.getLogger(DatanodeAdminManager.class).setLevel(Level.DEBUG);
LOG = Logger.getLogger(TestDecommissioningStatus.class);
}

@@ -344,7 +344,7 @@ public class TestDecommissioningStatus {
*/
@Test(timeout=120000)
public void testDecommissionDeadDN() throws Exception {
  Logger log = Logger.getLogger(DecommissionManager.class);
  Logger log = Logger.getLogger(DatanodeAdminManager.class);
  log.setLevel(Level.DEBUG);
  DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
  String dnName = dnID.getXferAddr();
@@ -156,11 +156,11 @@ public class TestDefaultBlockPlacementPolicy {
DatanodeDescriptor dnd3 = dnm.getDatanode(
    cluster.getDataNodes().get(3).getDatanodeId());
assertEquals(dnd3.getNetworkLocation(), clientRack);
dnm.getDecomManager().startDecommission(dnd3);
dnm.getDatanodeAdminManager().startDecommission(dnd3);
try {
  testPlacement(clientMachine, clientRack, false);
} finally {
  dnm.getDecomManager().stopDecommission(dnd3);
  dnm.getDatanodeAdminManager().stopDecommission(dnd3);
}
}

@@ -947,7 +947,8 @@ public class TestFsck {
DatanodeDescriptor dnDesc0 = dnm.getDatanode(
    cluster.getDataNodes().get(0).getDatanodeId());

bm.getDatanodeManager().getDecomManager().startDecommission(dnDesc0);
bm.getDatanodeManager().getDatanodeAdminManager().startDecommission(
    dnDesc0);
final String dn0Name = dnDesc0.getXferAddr();

// check the replica status while decommissioning

@@ -1000,7 +1001,7 @@ public class TestFsck {
    cluster.getDataNodes().get(1).getDatanodeId());
final String dn1Name = dnDesc1.getXferAddr();

bm.getDatanodeManager().getDecomManager().startMaintenance(dnDesc1,
bm.getDatanodeManager().getDatanodeAdminManager().startMaintenance(dnDesc1,
    Long.MAX_VALUE);

// check the replica status while entering maintenance
@@ -1539,7 +1540,7 @@ public class TestFsck {
  fsn.writeUnlock();
}
DatanodeDescriptor dn = bc.getBlocks()[0].getDatanode(0);
bm.getDatanodeManager().getDecomManager().startDecommission(dn);
bm.getDatanodeManager().getDatanodeAdminManager().startDecommission(dn);
String dnName = dn.getXferAddr();

//wait for decommission start

@@ -1619,7 +1620,7 @@ public class TestFsck {
DatanodeManager dnm = bm.getDatanodeManager();
DatanodeDescriptor dn = dnm.getDatanode(cluster.getDataNodes().get(0)
    .getDatanodeId());
bm.getDatanodeManager().getDecomManager().startMaintenance(dn,
bm.getDatanodeManager().getDatanodeAdminManager().startMaintenance(dn,
    Long.MAX_VALUE);
final String dnName = dn.getXferAddr();

@@ -1854,7 +1855,7 @@ public class TestFsck {
}
DatanodeDescriptor dn = bc.getBlocks()[0]
    .getDatanode(0);
bm.getDatanodeManager().getDecomManager().startDecommission(dn);
bm.getDatanodeManager().getDatanodeAdminManager().startDecommission(dn);
String dnName = dn.getXferAddr();

// wait for decommission start

@@ -1933,7 +1934,7 @@ public class TestFsck {
DatanodeManager dnm = bm.getDatanodeManager();
DatanodeDescriptor dn = dnm.getDatanode(cluster.getDataNodes().get(0)
    .getDatanodeId());
bm.getDatanodeManager().getDecomManager().startMaintenance(dn,
bm.getDatanodeManager().getDatanodeAdminManager().startMaintenance(dn,
    Long.MAX_VALUE);
final String dnName = dn.getXferAddr();

@@ -370,7 +370,7 @@ public class TestNameNodeMXBean {
    cluster.getDataNodes().get(0).getDisplayName());
fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);

// Wait for the DecommissionManager to complete refresh nodes
// Wait for the DatanodeAdminManager to complete refresh nodes
GenericTestUtils.waitFor(new Supplier<Boolean>() {
  @Override
  public Boolean get() {

@@ -399,7 +399,7 @@ public class TestNameNodeMXBean {
assertEquals(0, fsn.getNumDecomLiveDataNodes());
assertEquals(0, fsn.getNumDecomDeadDataNodes());

// Wait for the DecommissionManager to complete check
// Wait for the DatanodeAdminManager to complete check
GenericTestUtils.waitFor(new Supplier<Boolean>() {
  @Override
  public Boolean get() {

@@ -501,7 +501,7 @@ public class TestNameNodeMXBean {
assertEquals(0, fsn.getNumInMaintenanceDeadDataNodes());
}

// Wait for the DecommissionManager to complete check
// Wait for the DatanodeAdminManager to complete check
// and perform state transition
while (fsn.getNumInMaintenanceLiveDataNodes() != 1) {
  Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
@@ -349,18 +349,18 @@ public class TestNamenodeCapacityReport {
private void startDecommissionOrMaintenance(DatanodeManager dnm,
    DatanodeDescriptor dnd, boolean decomm) {
  if (decomm) {
    dnm.getDecomManager().startDecommission(dnd);
    dnm.getDatanodeAdminManager().startDecommission(dnd);
  } else {
    dnm.getDecomManager().startMaintenance(dnd, Long.MAX_VALUE);
    dnm.getDatanodeAdminManager().startMaintenance(dnd, Long.MAX_VALUE);
  }
}

private void stopDecommissionOrMaintenance(DatanodeManager dnm,
    DatanodeDescriptor dnd, boolean decomm) {
  if (decomm) {
    dnm.getDecomManager().stopDecommission(dnd);
    dnm.getDatanodeAdminManager().stopDecommission(dnd);
  } else {
    dnm.getDecomManager().stopMaintenance(dnd);
    dnm.getDatanodeAdminManager().stopMaintenance(dnd);
  }
}
