HDFS-9390. Block management for maintenance states.

Author: Ming Ma, 2016-10-17 17:46:29 -07:00
parent a5a56c3564
commit d55a7f8935
21 changed files with 1234 additions and 372 deletions


@@ -213,6 +213,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY;
   public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
+  public static final String DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
+      "dfs.namenode.maintenance.replication.min";
+  public static final int DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT
+      = 1;
   public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
   public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
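The two new keys above make the maintenance minimum replication tunable. A minimal sketch of setting it programmatically, assuming a standard Hadoop Configuration on the classpath (the surrounding class and main method are illustrative only); as the BlockManager constructor change further down shows, values below 0 or above dfs.namenode.replication.min are rejected:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class MaintenanceMinReplicationExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Require at least one live replica before a node can move from
    // ENTERING_MAINTENANCE to IN_MAINTENANCE (this is also the shipped default).
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY, 1);
    System.out.println(conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT));
  }
}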


@@ -124,41 +124,12 @@ public class DFSUtil {
     return array;
   }
 
-  /**
-   * Compartor for sorting DataNodeInfo[] based on decommissioned states.
-   * Decommissioned nodes are moved to the end of the array on sorting with
-   * this compartor.
-   */
-  public static final Comparator<DatanodeInfo> DECOM_COMPARATOR =
-      new Comparator<DatanodeInfo>() {
-        @Override
-        public int compare(DatanodeInfo a, DatanodeInfo b) {
-          return a.isDecommissioned() == b.isDecommissioned() ? 0 :
-              a.isDecommissioned() ? 1 : -1;
-        }
-      };
-
   /**
-   * Comparator for sorting DataNodeInfo[] based on decommissioned/stale states.
-   * Decommissioned/stale nodes are moved to the end of the array on sorting
-   * with this comparator.
+   * Comparator for sorting DataNodeInfo[] based on
+   * decommissioned and entering_maintenance states.
    */
-  @InterfaceAudience.Private
-  public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
-    private final long staleInterval;
-
-    /**
-     * Constructor of DecomStaleComparator
-     *
-     * @param interval
-     *          The time interval for marking datanodes as stale is passed from
-     *          outside, since the interval may be changed dynamically
-     */
-    public DecomStaleComparator(long interval) {
-      this.staleInterval = interval;
-    }
-
+  public static class ServiceComparator implements Comparator<DatanodeInfo> {
     @Override
     public int compare(DatanodeInfo a, DatanodeInfo b) {
       // Decommissioned nodes will still be moved to the end of the list
@@ -167,6 +138,45 @@ public class DFSUtil {
       } else if (b.isDecommissioned()) {
         return -1;
       }
+
+      // ENTERING_MAINTENANCE nodes should be after live nodes.
+      if (a.isEnteringMaintenance()) {
+        return b.isEnteringMaintenance() ? 0 : 1;
+      } else if (b.isEnteringMaintenance()) {
+        return -1;
+      } else {
+        return 0;
+      }
+    }
+  }
+
+  /**
+   * Comparator for sorting DataNodeInfo[] based on
+   * stale, decommissioned and entering_maintenance states.
+   * Order: live -> stale -> entering_maintenance -> decommissioned
+   */
+  @InterfaceAudience.Private
+  public static class ServiceAndStaleComparator extends ServiceComparator {
+    private final long staleInterval;
+
+    /**
+     * Constructor of ServiceAndStaleComparator
+     *
+     * @param interval
+     *          The time interval for marking datanodes as stale is passed from
+     *          outside, since the interval may be changed dynamically
+     */
+    public ServiceAndStaleComparator(long interval) {
+      this.staleInterval = interval;
+    }
+
+    @Override
+    public int compare(DatanodeInfo a, DatanodeInfo b) {
+      int ret = super.compare(a, b);
+      if (ret != 0) {
+        return ret;
+      }
+
       // Stale nodes will be moved behind the normal nodes
       boolean aStale = a.isStale(staleInterval);
       boolean bStale = b.isStale(staleInterval);
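As a usage sketch of the two comparators above (the helper method here is hypothetical; in the patch the equivalent selection happens in DatanodeManager#sortLocatedBlocks, changed further down), block locations are sorted so that live replicas come first and out-of-service replicas are pushed to the end:

import java.util.Arrays;
import java.util.Comparator;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

class LocationSortSketch {
  // Hypothetical helper mirroring how DatanodeManager picks a comparator.
  static void sortLocations(DatanodeInfo[] locations,
      boolean avoidStaleDataNodesForRead, long staleInterval) {
    Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead
        ? new DFSUtil.ServiceAndStaleComparator(staleInterval)
        : new DFSUtil.ServiceComparator();
    // Resulting order: live -> (stale) -> entering_maintenance -> decommissioned.
    Arrays.sort(locations, comparator);
  }
}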


@@ -929,20 +929,17 @@ public class Dispatcher {
     }
 
     private boolean shouldIgnore(DatanodeInfo dn) {
-      // ignore decommissioned nodes
-      final boolean decommissioned = dn.isDecommissioned();
-      // ignore decommissioning nodes
-      final boolean decommissioning = dn.isDecommissionInProgress();
+      // ignore out-of-service nodes
+      final boolean outOfService = !dn.isInService();
       // ignore nodes in exclude list
       final boolean excluded = Util.isExcluded(excludedNodes, dn);
       // ignore nodes not in the include list (if include list is not empty)
       final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
 
-      if (decommissioned || decommissioning || excluded || notIncluded) {
+      if (outOfService || excluded || notIncluded) {
         if (LOG.isTraceEnabled()) {
           LOG.trace("Excluding datanode " + dn
-              + ": decommissioned=" + decommissioned
-              + ", decommissioning=" + decommissioning
+              + ": outOfService=" + outOfService
               + ", excluded=" + excluded
               + ", notIncluded=" + notIncluded);
         }


@@ -111,6 +111,29 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
+ * For block state management, it tries to maintain the safety
+ * property of "# of live replicas == # of expected redundancy" under
+ * any events such as decommission, namenode failover, datanode failure.
+ *
+ * The motivation of maintenance mode is to allow admins to quickly repair
+ * nodes without paying the cost of decommission. Thus with maintenance mode,
+ * # of live replicas doesn't have to be equal to # of expected redundancy.
+ * If any replica is in maintenance mode, the safety property
+ * is extended as follows. These properties still hold when there are zero
+ * maintenance replicas, so they can be used as the safety properties for all
+ * scenarios.
+ * a. # of live replicas >= # of min replication for maintenance.
+ * b. # of live replicas <= # of expected redundancy.
+ * c. # of live replicas and maintenance replicas >= # of expected redundancy.
+ *
+ * For regular replication, # of min live replicas for maintenance is
+ * determined by DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY. This number
+ * has to be <= DFS_NAMENODE_REPLICATION_MIN_KEY.
+ * For erasure coding, # of min live replicas for maintenance is
+ * BlockInfoStriped#getRealDataBlockNum.
+ *
+ * Another safety property is to satisfy the block placement policy. While the
+ * policy is configurable, the replicas the policy is applied to are the live
+ * replicas + maintenance replicas.
  */
 @InterfaceAudience.Private
 public class BlockManager implements BlockStatsMXBean {
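A minimal standalone sketch of extended safety properties (a) and (c) from the Javadoc above; the class and method names are invented for illustration, while in the patch itself the real checks are spread across countNodes(), hasEnoughEffectiveReplicas() and getExpectedLiveRedundancyNum():

final class MaintenanceSafetySketch {
  /**
   * @param live           live replicas of a block
   * @param maintenance    replicas on ENTERING_MAINTENANCE / IN_MAINTENANCE nodes
   * @param expected       expected redundancy (replication factor)
   * @param minMaintenance value of dfs.namenode.maintenance.replication.min
   */
  static boolean isSafe(int live, int maintenance, int expected, int minMaintenance) {
    boolean enoughLive = live >= minMaintenance;          // property (a)
    boolean noDataLoss = live + maintenance >= expected;  // property (c)
    return enoughLive && noDataLoss;
  }
}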
@@ -310,6 +333,11 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final BlockIdManager blockIdManager;
 
+  /** Minimum live replicas needed for the datanode to be transitioned
+   * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
+   */
+  private final short minReplicationToBeInMaintenance;
+
   public BlockManager(final Namesystem namesystem, boolean haEnabled,
       final Configuration conf)
     throws IOException {
@@ -393,6 +421,25 @@ public class BlockManager implements BlockStatsMXBean {
     this.getBlocksMinBlockSize = conf.getLongBytes(
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+    final int minMaintenanceR = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);
+
+    if (minMaintenanceR < 0) {
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+          + " = " + minMaintenanceR + " < 0");
+    }
+    if (minMaintenanceR > minR) {
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+          + " = " + minMaintenanceR + " > "
+          + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+          + " = " + minR);
+    }
+    this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
 
     this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
     bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -693,6 +740,10 @@ public class BlockManager implements BlockStatsMXBean {
     return minReplication;
   }
 
+  public short getMinReplicationToBeInMaintenance() {
+    return minReplicationToBeInMaintenance;
+  }
+
   /**
    * Commit a block of a file
    *
@@ -867,7 +918,7 @@ public class BlockManager implements BlockStatsMXBean {
       NumberReplicas replicas = countNodes(lastBlock);
       neededReplications.remove(lastBlock, replicas.liveReplicas(),
           replicas.readOnlyReplicas(),
-          replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
+          replicas.outOfServiceReplicas(), getExpectedReplicaNum(lastBlock));
       pendingReplications.remove(lastBlock);
 
       // remove this block from the list of pending blocks to be deleted.
@@ -972,7 +1023,8 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // get block locations
-    final int numCorruptNodes = countNodes(blk).corruptReplicas();
+    NumberReplicas numberReplicas = countNodes(blk);
+    final int numCorruptNodes = numberReplicas.corruptReplicas();
     final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
     if (numCorruptNodes != numCorruptReplicas) {
       LOG.warn("Inconsistent number of corrupt replicas for "
@@ -982,17 +1034,23 @@ public class BlockManager implements BlockStatsMXBean {
 
     final int numNodes = blocksMap.numNodes(blk);
     final boolean isCorrupt = numCorruptReplicas == numNodes;
-    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+    int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+    numMachines -= numberReplicas.maintenanceNotForReadReplicas();
     DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     int j = 0;
     if (numMachines > 0) {
       final boolean noCorrupt = (numCorruptReplicas == 0);
       for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
         if (storage.getState() != State.FAILED) {
+          final DatanodeDescriptor d = storage.getDatanodeDescriptor();
+          // Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states.
+          if (d.isInMaintenance()
+              || (d.isEnteringMaintenance() && !d.isAlive())) {
+            continue;
+          }
           if (noCorrupt) {
             machines[j++] = storage;
           } else {
-            final DatanodeDescriptor d = storage.getDatanodeDescriptor();
             final boolean replicaCorrupt = isReplicaCorrupt(blk, d);
             if (isCorrupt || !replicaCorrupt) {
               machines[j++] = storage;
@@ -1542,8 +1600,11 @@ public class BlockManager implements BlockStatsMXBean {
     return scheduledWork;
   }
 
+  // Check if the number of live + pending replicas satisfies
+  // the expected redundancy.
   boolean hasEnoughEffectiveReplicas(BlockInfo block,
-      NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+      NumberReplicas numReplicas, int pendingReplicaNum) {
+    int required = getExpectedLiveRedundancyNum(block, numReplicas);
     int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
     return (numEffectiveReplicas >= required) &&
         (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
@@ -1557,14 +1618,14 @@ public class BlockManager implements BlockStatsMXBean {
       return null;
     }
 
-    short requiredReplication = getExpectedReplicaNum(block);
-
     // get a source data-node
     List<DatanodeDescriptor> containingNodes = new ArrayList<>();
     List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
     NumberReplicas numReplicas = new NumberReplicas();
     DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
         liveReplicaNodes, numReplicas, priority);
+    short requiredReplication = getExpectedLiveRedundancyNum(block,
+        numReplicas);
     if (srcNode == null) { // block can not be replicated from any node
       LOG.debug("Block " + block + " cannot be repl from any node");
       return null;
@@ -1575,8 +1636,7 @@ public class BlockManager implements BlockStatsMXBean {
     assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
 
     int pendingNum = pendingReplications.getNumReplicas(block);
-    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
-        requiredReplication)) {
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
       neededReplications.remove(block, priority);
       blockLog.debug("BLOCK* Removing {} from neededReplications as" +
           " it has enough replicas", block);
@@ -1607,11 +1667,11 @@ public class BlockManager implements BlockStatsMXBean {
         }
 
         // do not schedule more if enough replicas is already pending
-        final short requiredReplication = getExpectedReplicaNum(block);
         NumberReplicas numReplicas = countNodes(block);
+        final short requiredReplication =
+            getExpectedLiveRedundancyNum(block, numReplicas);
         final int pendingNum = pendingReplications.getNumReplicas(block);
-        if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
-            requiredReplication)) {
+        if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
           neededReplications.remove(block, priority);
           rw.resetTargets();
           blockLog.debug("BLOCK* Removing {} from neededReplications as" +
@@ -1677,7 +1737,7 @@ public class BlockManager implements BlockStatsMXBean {
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      Set, long, List, BlockStoragePolicy)
+   *      Set, long, List, BlockStoragePolicy, EnumSet)
    */
   public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final int numOfReplicas, final Node client,
@@ -1766,6 +1826,8 @@ public class BlockManager implements BlockStatsMXBean {
     int decommissioning = 0;
     int corrupt = 0;
     int excess = 0;
+    int maintenanceNotForRead = 0;
+    int maintenanceForRead = 0;
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
@@ -1779,6 +1841,12 @@ public class BlockManager implements BlockStatsMXBean {
         decommissioning += countableReplica;
       } else if (node.isDecommissioned()) {
         decommissioned += countableReplica;
+      } else if (node.isMaintenance()) {
+        if (node.isInMaintenance() || !node.isAlive()) {
+          maintenanceNotForRead++;
+        } else {
+          maintenanceForRead++;
+        }
       } else if (excessBlocks != null && excessBlocks.contains(block)) {
         excess += countableReplica;
       } else {
@@ -1794,9 +1862,8 @@ public class BlockManager implements BlockStatsMXBean {
       if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
       if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
-          && !node.isDecommissionInProgress()
-          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
-      {
+          && !node.isDecommissionInProgress() && !node.isEnteringMaintenance()
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
         continue; // already reached replication limit
       }
       if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
@@ -1809,6 +1876,11 @@ public class BlockManager implements BlockStatsMXBean {
       // never use already decommissioned nodes
       if(node.isDecommissioned())
         continue;
+      // Don't use dead ENTERING_MAINTENANCE or IN_MAINTENANCE nodes.
+      if((!node.isAlive() && node.isEnteringMaintenance()) ||
+          node.isInMaintenance()) {
+        continue;
+      }
 
       // We got this far, current node is a reasonable choice
       if (srcNode == null) {
@@ -1823,7 +1895,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
     if(numReplicas != null)
       numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
-          excess, 0);
+          excess, 0, maintenanceNotForRead, maintenanceForRead);
     return srcNode;
   }
@@ -1846,9 +1918,10 @@ public class BlockManager implements BlockStatsMXBean {
           continue;
         }
         NumberReplicas num = countNodes(timedOutItems[i]);
-        if (isNeededReplication(bi, num.liveReplicas())) {
-          neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
-              num.decommissionedAndDecommissioning(), getReplication(bi));
+        if (isNeededReplication(bi, num)) {
+          neededReplications.add(bi, num.liveReplicas(),
+              num.readOnlyReplicas(), num.outOfServiceReplicas(),
+              getExpectedReplicaNum(bi));
         }
       }
     } finally {
@@ -2756,8 +2829,8 @@ public class BlockManager implements BlockStatsMXBean {
     // Now check for completion of blocks and safe block count
     NumberReplicas num = countNodes(storedBlock);
     int numLiveReplicas = num.liveReplicas();
-    int numCurrentReplica = numLiveReplicas
-      + pendingReplications.getNumReplicas(storedBlock);
+    int pendingNum = pendingReplications.getNumReplicas(storedBlock);
+    int numCurrentReplica = numLiveReplicas + pendingNum;
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication) {
@@ -2784,10 +2857,9 @@ public class BlockManager implements BlockStatsMXBean {
 
     // handle underReplication/overReplication
     short fileReplication = getExpectedReplicaNum(storedBlock);
-    if (!isNeededReplication(storedBlock, numCurrentReplica)) {
+    if (!isNeededReplication(storedBlock, num, pendingNum)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
-          num.readOnlyReplicas(),
-          num.decommissionedAndDecommissioning(), fileReplication);
+          num.readOnlyReplicas(), num.outOfServiceReplicas(), fileReplication);
     } else {
       updateNeededReplications(storedBlock, curReplicaDelta, 0);
     }
@@ -3003,9 +3075,10 @@ public class BlockManager implements BlockStatsMXBean {
     NumberReplicas num = countNodes(block);
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
-    if (isNeededReplication(block, numCurrentReplica)) {
-      if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
-          num.decommissionedAndDecommissioning(), expectedReplication)) {
+    if (isNeededReplication(block, num)) {
+      if (neededReplications.add(block, numCurrentReplica,
+          num.readOnlyReplicas(), num.outOfServiceReplicas(),
+          expectedReplication)) {
         return MisReplicationResult.UNDER_REPLICATED;
       }
     }
@@ -3037,9 +3110,10 @@ public class BlockManager implements BlockStatsMXBean {
 
     // update needReplication priority queues
     b.setReplication(newRepl);
+    NumberReplicas num = countNodes(b);
     updateNeededReplications(b, 0, newRepl - oldRepl);
 
-    if (oldRepl > newRepl) {
+    if (num.liveReplicas() > newRepl) {
       processOverReplicatedBlock(b, newRepl, null, null);
     }
   }
@@ -3074,7 +3148,7 @@ public class BlockManager implements BlockStatsMXBean {
       LightWeightHashSet<Block> excessBlocks = excessReplicateMap.get(
           cur.getDatanodeUuid());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
-        if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+        if (cur.isInService()) {
           // exclude corrupt replicas
           if (corruptNodes == null || !corruptNodes.contains(cur)) {
             nonExcess.add(storage);
@@ -3393,6 +3467,8 @@ public class BlockManager implements BlockStatsMXBean {
     int corrupt = 0;
     int excess = 0;
     int stale = 0;
+    int maintenanceNotForRead = 0;
+    int maintenanceForRead = 0;
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
       if (storage.getState() == State.FAILED) {
@@ -3408,6 +3484,12 @@ public class BlockManager implements BlockStatsMXBean {
         decommissioning++;
       } else if (node.isDecommissioned()) {
         decommissioned++;
+      } else if (node.isMaintenance()) {
+        if (node.isInMaintenance() || !node.isAlive()) {
+          maintenanceNotForRead++;
+        } else {
+          maintenanceForRead++;
+        }
       } else {
         LightWeightHashSet<Block> blocksExcess = excessReplicateMap.get(
             node.getDatanodeUuid());
@@ -3422,7 +3504,7 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
     return new NumberReplicas(live, readonly, decommissioned, decommissioning,
-        corrupt, excess, stale);
+        corrupt, excess, stale, maintenanceNotForRead, maintenanceForRead);
   }
 
   /**
@@ -3454,11 +3536,11 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * On stopping decommission, check if the node has excess replicas.
+   * On putting the node in service, check if the node has excess replicas.
    * If there are any excess replicas, call processOverReplicatedBlock().
    * Process over replicated blocks only when active NN is out of safe mode.
    */
-  void processOverReplicatedBlocksOnReCommission(
+  void processExtraRedundancyBlocksOnInService(
       final DatanodeDescriptor srcNode) {
     if (!isPopulatingReplQueues()) {
       return;
@@ -3467,7 +3549,7 @@ public class BlockManager implements BlockStatsMXBean {
     int numOverReplicated = 0;
     while(it.hasNext()) {
       final BlockInfo block = it.next();
-      short expectedReplication = block.getReplication();
+      short expectedReplication = getExpectedReplicaNum(block);
       NumberReplicas num = countNodes(block);
       int numCurrentReplica = num.liveReplicas();
       if (numCurrentReplica > expectedReplication) {
@@ -3481,10 +3563,11 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Returns whether a node can be safely decommissioned based on its
-   * liveness. Dead nodes cannot always be safely decommissioned.
+   * Returns whether a node can be safely decommissioned or in maintenance
+   * based on its liveness. Dead nodes cannot always be safely decommissioned
+   * or in maintenance.
    */
-  boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
+  boolean isNodeHealthyForDecommissionOrMaintenance(DatanodeDescriptor node) {
     if (!node.checkBlockReportReceived()) {
       LOG.info("Node {} hasn't sent its first block report.", node);
       return false;
@@ -3498,17 +3581,18 @@ public class BlockManager implements BlockStatsMXBean {
     if (pendingReplicationBlocksCount == 0 &&
         underReplicatedBlocksCount == 0) {
       LOG.info("Node {} is dead and there are no under-replicated" +
-          " blocks or blocks pending replication. Safe to decommission.",
-          node);
+          " blocks or blocks pending replication. Safe to decommission or" +
+          " put in maintenance.", node);
       return true;
     }
 
     LOG.warn("Node {} is dead " +
-        "while decommission is in progress. Cannot be safely " +
-        "decommissioned since there is risk of reduced " +
+        "while in {}. Cannot be safely " +
+        "decommissioned or be in maintenance since there is risk of reduced " +
         "data durability or data loss. Either restart the failed node or " +
-        " force decommissioning by removing, calling refreshNodes, " +
-        "then re-adding to the excludes files.", node);
+        "force decommissioning or maintenance by removing, calling " +
+        "refreshNodes, then re-adding to the excludes or host config files.",
+        node, node.getAdminState());
     return false;
   }
@@ -3559,17 +3643,16 @@ public class BlockManager implements BlockStatsMXBean {
       }
       NumberReplicas repl = countNodes(block);
       int pendingNum = pendingReplications.getNumReplicas(block);
-      int curExpectedReplicas = getReplication(block);
-      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
-          curExpectedReplicas)) {
+      int curExpectedReplicas = getExpectedReplicaNum(block);
+      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum)) {
         neededReplications.update(block, repl.liveReplicas() + pendingNum,
-            repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+            repl.readOnlyReplicas(), repl.outOfServiceReplicas(),
             curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
       } else {
         int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
         neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
-            repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
+            repl.outOfServiceReplicas(), oldExpectedReplicas);
       }
     } finally {
       namesystem.writeUnlock();
@@ -3584,27 +3667,18 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void checkReplication(BlockCollection bc) {
     for (BlockInfo block : bc.getBlocks()) {
-      final short expected = block.getReplication();
+      final short expected = getExpectedReplicaNum(block);
       final NumberReplicas n = countNodes(block);
       final int pending = pendingReplications.getNumReplicas(block);
-      if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
+      if (!hasEnoughEffectiveReplicas(block, n, pending)) {
         neededReplications.add(block, n.liveReplicas() + pending,
-            n.readOnlyReplicas(),
-            n.decommissionedAndDecommissioning(), expected);
+            n.readOnlyReplicas(), n.outOfServiceReplicas(), expected);
       } else if (n.liveReplicas() > expected) {
         processOverReplicatedBlock(block, expected, null, null);
       }
     }
   }
 
-  /**
-   * @return 0 if the block is not found;
-   *         otherwise, return the replication factor of the block.
-   */
-  private int getReplication(BlockInfo block) {
-    return getExpectedReplicaNum(block);
-  }
-
   /**
    * Get blocks to invalidate for <i>nodeId</i>
    * in {@link #invalidateBlocks}.
@@ -3651,6 +3725,8 @@ public class BlockManager implements BlockStatsMXBean {
         .getNodes(storedBlock);
     for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+      // Nodes under maintenance should be counted as valid replicas from
+      // rack policy point of view.
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
           && ((corruptNodes == null) || !corruptNodes.contains(cur))) {
         liveNodes.add(cur);
@@ -3661,14 +3737,36 @@ public class BlockManager implements BlockStatsMXBean {
         storedBlock.getReplication()).isPlacementPolicySatisfied();
   }
 
+  boolean isNeededReplicationForMaintenance(BlockInfo storedBlock,
+      NumberReplicas numberReplicas) {
+    return storedBlock.isComplete() && (numberReplicas.liveReplicas() <
+        getMinReplicationToBeInMaintenance() ||
+        !isPlacementPolicySatisfied(storedBlock));
+  }
+
+  boolean isNeededReplication(BlockInfo storedBlock,
+      NumberReplicas numberReplicas) {
+    return isNeededReplication(storedBlock, numberReplicas, 0);
+  }
+
   /**
-   * A block needs replication if the number of replicas is less than expected
-   * or if it does not have enough racks.
+   * A block needs reconstruction if the number of redundancies is less than
+   * expected or if it does not have enough racks.
    */
-  boolean isNeededReplication(BlockInfo storedBlock, int current) {
-    int expected = storedBlock.getReplication();
-    return storedBlock.isComplete()
-        && (current < expected || !isPlacementPolicySatisfied(storedBlock));
+  boolean isNeededReplication(BlockInfo storedBlock,
+      NumberReplicas numberReplicas, int pending) {
+    return storedBlock.isComplete() &&
+        !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
+  }
+
+  // Exclude maintenance, but make sure it has minimal live replicas
+  // to satisfy the maintenance requirement.
+  public short getExpectedLiveRedundancyNum(BlockInfo block,
+      NumberReplicas numberReplicas) {
+    final short expectedRedundancy = getExpectedReplicaNum(block);
+    return (short)Math.max(expectedRedundancy -
+        numberReplicas.maintenanceReplicas(),
+        getMinReplicationToBeInMaintenance());
   }
 
   public short getExpectedReplicaNum(BlockInfo block) {
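To make the getExpectedLiveRedundancyNum formula above concrete: with a replication factor of 3, one replica on a node entering maintenance, and dfs.namenode.maintenance.replication.min = 1, the required live redundancy is max(3 - 1, 1) = 2, so the block is not queued for replication as long as two live replicas remain; with zero maintenance replicas the expression degenerates to the usual max(3 - 0, 1) = 3.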


@@ -832,8 +832,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       List<DatanodeStorageInfo> results,
       boolean avoidStaleNodes) {
     // check if the node is (being) decommissioned
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      logNodeIsNotChosen(node, "the node is (being) decommissioned ");
+    if (!node.isInService()) {
+      logNodeIsNotChosen(node, "the node isn't in service.");
       return false;
     }


@@ -682,7 +682,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       if (datanode == null) {
         continue;
       }
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+      if (!datanode.isInService()) {
         continue;
       }
       if (corrupt != null && corrupt.contains(datanode)) {


@@ -143,8 +143,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   // Stores status of decommissioning.
   // If node is not decommissioning, do not use this object for anything.
-  public final DecommissioningStatus decommissioningStatus =
-      new DecommissioningStatus();
+  private final LeavingServiceStatus leavingServiceStatus =
+      new LeavingServiceStatus();
 
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
@@ -270,6 +270,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.needKeyUpdate = needKeyUpdate;
   }
 
+  public LeavingServiceStatus getLeavingServiceStatus() {
+    return leavingServiceStatus;
+  }
+
   @VisibleForTesting
   public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {
@@ -688,51 +692,54 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return (this == obj) || super.equals(obj);
   }
 
-  /** Decommissioning status */
-  public class DecommissioningStatus {
+  /** Leaving service status. */
+  public class LeavingServiceStatus {
     private int underReplicatedBlocks;
-    private int decommissionOnlyReplicas;
+    private int outOfServiceOnlyReplicas;
     private int underReplicatedInOpenFiles;
     private long startTime;
 
     synchronized void set(int underRep,
         int onlyRep, int underConstruction) {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return;
       }
       underReplicatedBlocks = underRep;
-      decommissionOnlyReplicas = onlyRep;
+      outOfServiceOnlyReplicas = onlyRep;
       underReplicatedInOpenFiles = underConstruction;
     }
 
     /** @return the number of under-replicated blocks */
     public synchronized int getUnderReplicatedBlocks() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
       return underReplicatedBlocks;
     }
 
-    /** @return the number of decommission-only replicas */
-    public synchronized int getDecommissionOnlyReplicas() {
-      if (!isDecommissionInProgress()) {
+    /** @return the number of blocks with out-of-service-only replicas */
+    public synchronized int getOutOfServiceOnlyReplicas() {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return 0;
       }
-      return decommissionOnlyReplicas;
+      return outOfServiceOnlyReplicas;
     }
 
     /** @return the number of under-replicated blocks in open files */
     public synchronized int getUnderReplicatedInOpenFiles() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return 0;
       }
       return underReplicatedInOpenFiles;
     }
 
     /** Set start time */
     public synchronized void setStartTime(long time) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
+        return;
+      }
       startTime = time;
     }
 
     /** @return start time */
     public synchronized long getStartTime() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return 0;
       }
       return startTime;


@@ -389,8 +389,8 @@ public class DatanodeManager {
     }
 
     Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
-        new DFSUtil.DecomStaleComparator(staleInterval) :
-        DFSUtil.DECOM_COMPARATOR;
+        new DFSUtil.ServiceAndStaleComparator(staleInterval) :
+        new DFSUtil.ServiceComparator();
 
     for (LocatedBlock b : locatedblocks) {
       DatanodeInfo[] di = b.getLocations();
@@ -558,9 +558,20 @@ public class DatanodeManager {
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
+    removeDatanode(nodeInfo, true);
+  }
+
+  /**
+   * Remove a datanode descriptor.
+   * @param nodeInfo datanode descriptor.
+   */
+  private void removeDatanode(DatanodeDescriptor nodeInfo,
+      boolean removeBlocksFromBlocksMap) {
     assert namesystem.hasWriteLock();
     heartbeatManager.removeDatanode(nodeInfo);
+    if (removeBlocksFromBlocksMap) {
       blockManager.removeBlocksAssociatedTo(nodeInfo);
+    }
     networktopology.remove(nodeInfo);
     decrementVersionCount(nodeInfo.getSoftwareVersion());
     blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
@@ -581,7 +592,7 @@ public class DatanodeManager {
     try {
       final DatanodeDescriptor descriptor = getDatanode(node);
       if (descriptor != null) {
-        removeDatanode(descriptor);
+        removeDatanode(descriptor, true);
       } else {
         NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
             + node + " does not exist");
@@ -592,7 +603,8 @@ public class DatanodeManager {
   }
 
   /** Remove a dead datanode. */
-  void removeDeadDatanode(final DatanodeID nodeID) {
+  void removeDeadDatanode(final DatanodeID nodeID,
+      boolean removeBlocksFromBlockMap) {
     DatanodeDescriptor d;
     try {
       d = getDatanode(nodeID);
@@ -601,8 +613,9 @@ public class DatanodeManager {
     }
     if (d != null && isDatanodeDead(d)) {
       NameNode.stateChangeLog.info(
-          "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
-      removeDatanode(d);
+          "BLOCK* removeDeadDatanode: lost heartbeat from " + d
+          + ", removeBlocksFromBlockMap " + removeBlocksFromBlockMap);
+      removeDatanode(d, removeBlocksFromBlockMap);
     }
   }
@@ -1038,10 +1051,16 @@ public class DatanodeManager {
   }
 
   /**
-   * 1. Added to hosts --> no further work needed here.
-   * 2. Removed from hosts --> mark AdminState as decommissioned.
-   * 3. Added to exclude --> start decommission.
-   * 4. Removed from exclude --> stop decommission.
+   * Reload datanode membership and the desired admin operations from
+   * host files. If a node isn't allowed, hostConfigManager.isIncluded returns
+   * false and the node can't be used.
+   * If a node is allowed and the desired admin operation is defined,
+   * it will transition to the desired admin state.
+   * If a node is allowed and upgrade domain is defined,
+   * the upgrade domain will be set on the node.
+   * To use maintenance mode or upgrade domain, set
+   * DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY to
+   * CombinedHostFileManager.class.
    */
   private void refreshDatanodes() {
     final Map<String, DatanodeDescriptor> copy;
@@ -1051,17 +1070,17 @@ public class DatanodeManager {
     for (DatanodeDescriptor node : copy.values()) {
       // Check if not include.
       if (!hostConfigManager.isIncluded(node)) {
-        node.setDisallowed(true); // case 2.
+        node.setDisallowed(true);
       } else {
         long maintenanceExpireTimeInMS =
             hostConfigManager.getMaintenanceExpirationTimeInMS(node);
         if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
           decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
         } else if (hostConfigManager.isExcluded(node)) {
-          decomManager.startDecommission(node); // case 3.
+          decomManager.startDecommission(node);
         } else {
           decomManager.stopMaintenance(node);
-          decomManager.stopDecommission(node); // case 4.
+          decomManager.stopDecommission(node);
         }
       }
       node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
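A hedged sketch of the configuration the refreshDatanodes() Javadoc above refers to; the key and class names exist in Hadoop and are mentioned in the hunk, but the exact wiring shown here is illustrative only. It switches the NameNode to the combined (JSON) host provider so that per-node maintenance state and upgrade domains can be expressed in the host file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;

class HostProviderSetupSketch {
  static Configuration withMaintenanceSupport(Configuration conf) {
    // Use the combined host file manager instead of the legacy
    // include/exclude text files, as required for maintenance mode.
    conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
        CombinedHostFileManager.class, HostConfigManager.class);
    return conf;
  }
}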


@@ -210,7 +210,7 @@ public class DecommissionManager {
           LOG.info("Starting decommission of {} {} with {} blocks",
               node, storage, storage.numBlocks());
         }
-        node.decommissioningStatus.setStartTime(monotonicNow());
+        node.getLeavingServiceStatus().setStartTime(monotonicNow());
         pendingNodes.add(node);
       }
     } else {
@@ -231,7 +231,7 @@ public class DecommissionManager {
       // Over-replicated blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
       if (node.isAlive()) {
-        blockManager.processOverReplicatedBlocksOnReCommission(node);
+        blockManager.processExtraRedundancyBlocksOnInService(node);
       }
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
@@ -255,6 +255,16 @@ public class DecommissionManager {
     if (!node.isMaintenance()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.startMaintenance(node);
+      // hbManager.startMaintenance will set dead node to IN_MAINTENANCE.
+      if (node.isEnteringMaintenance()) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          LOG.info("Starting maintenance of {} {} with {} blocks",
+              node, storage, storage.numBlocks());
+        }
+        node.getLeavingServiceStatus().setStartTime(monotonicNow());
+      }
+      // Track the node regardless whether it is ENTERING_MAINTENANCE or
+      // IN_MAINTENANCE to support maintenance expiration.
       pendingNodes.add(node);
     } else {
       LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
@@ -273,8 +283,34 @@ public class DecommissionManager {
       // Update DN stats maintained by HeartbeatManager
       hbManager.stopMaintenance(node);
 
-      // TODO HDFS-9390 remove replicas from block maps
-      // or handle over replicated blocks.
+      // Extra redundancy blocks will be detected and processed when
+      // the dead node comes back and sends in its full block report.
+      if (!node.isAlive()) {
+        // The node became dead when it was in maintenance, at which point
+        // the replicas weren't removed from block maps.
+        // When the node leaves maintenance, the replicas should be removed
+        // from the block maps to trigger the necessary replication to
+        // maintain the safety property of "# of live replicas + maintenance
+        // replicas" >= the expected redundancy.
+        blockManager.removeBlocksAssociatedTo(node);
+      } else {
+        // Even though putting nodes into maintenance doesn't cause live
+        // replicas to match the expected replication factor, it is still
+        // possible for a block to be over-replicated when the node leaves
+        // maintenance. First scenario:
+        // a. Node became dead when it is at AdminStates.NORMAL, thus
+        //    block is replicated so that 3 replicas exist on other nodes.
+        // b. Admins put the dead node into maintenance mode and then
+        //    have the node rejoin the cluster.
+        // c. Take the node out of maintenance mode.
+        // Second scenario:
+        // a. With replication factor 3, set one replica to maintenance node,
+        //    thus block has 1 maintenance replica and 2 live replicas.
+        // b. Change the replication factor to 2. The block will still have
+        //    1 maintenance replica and 2 live replicas.
+        // c. Take the node out of maintenance mode.
+        blockManager.processExtraRedundancyBlocksOnInService(node);
+      }
 
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
@@ -290,27 +326,32 @@ public class DecommissionManager {
     LOG.info("Decommissioning complete for node {}", dn);
   }
 
+  private void setInMaintenance(DatanodeDescriptor dn) {
+    dn.setInMaintenance();
+    LOG.info("Node {} has entered maintenance mode.", dn);
+  }
+
   /**
    * Checks whether a block is sufficiently replicated for decommissioning.
    * Full-strength replication is not always necessary, hence "sufficient".
    * @return true if sufficient, else false.
    */
-  private boolean isSufficientlyReplicated(BlockInfo block,
-      BlockCollection bc,
-      NumberReplicas numberReplicas) {
-    final int numExpected = block.getReplication();
-    final int numLive = numberReplicas.liveReplicas();
-    if (numLive >= numExpected
-        && blockManager.isPlacementPolicySatisfied(block)) {
+  private boolean isSufficientlyReplicated(BlockInfo block, BlockCollection bc,
+      NumberReplicas numberReplicas, boolean isDecommission) {
+    if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
       // Block has enough replica, skip
       LOG.trace("Block {} does not need replication.", block);
       return true;
     }
 
+    final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
+        numberReplicas);
+    final int numLive = numberReplicas.liveReplicas();
+
     // Block is under-replicated
     LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
         numLive);
-    if (numExpected > numLive) {
+    if (isDecommission && numExpected > numLive) {
       if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
         // Can decom a UC block as long as there will still be minReplicas
         if (numLive >= blockManager.minReplication) {
@@ -354,11 +395,16 @@ public class DecommissionManager {
         + ", corrupt replicas: " + num.corruptReplicas()
         + ", decommissioned replicas: " + num.decommissioned()
         + ", decommissioning replicas: " + num.decommissioning()
+        + ", maintenance replicas: " + num.maintenanceReplicas()
+        + ", live entering maintenance replicas: "
+        + num.liveEnteringMaintenanceReplicas()
         + ", excess replicas: " + num.excessReplicas()
         + ", Is Open File: " + bc.isUnderConstruction()
         + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
         + srcNode + ", Is current datanode decommissioning: "
-        + srcNode.isDecommissionInProgress());
+        + srcNode.isDecommissionInProgress() +
+        ", Is current datanode entering maintenance: "
+        + srcNode.isEnteringMaintenance());
   }
 
   @VisibleForTesting
@@ -444,7 +490,7 @@ public class DecommissionManager {
       numBlocksChecked = 0;
       numBlocksCheckedPerLock = 0;
       numNodesChecked = 0;
-      // Check decom progress
+      // Check decommission or maintenance progress.
       namesystem.writeLock();
       try {
         processPendingNodes();
@@ -486,15 +532,14 @@ public class DecommissionManager {
       final DatanodeDescriptor dn = entry.getKey();
       AbstractList<BlockInfo> blocks = entry.getValue();
       boolean fullScan = false;
-      if (dn.isMaintenance()) {
-        // TODO HDFS-9390 make sure blocks are minimally replicated
-        // before transitioning the node to IN_MAINTENANCE state.
-
+      if (dn.isMaintenance() && dn.maintenanceExpired()) {
         // If maintenance expires, stop tracking it.
-        if (dn.maintenanceExpired()) {
-          stopMaintenance(dn);
-          toRemove.add(dn);
-        }
+        stopMaintenance(dn);
+        toRemove.add(dn);
+        continue;
+      }
+      if (dn.isInMaintenance()) {
+        // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
         continue;
       }
       if (blocks == null) {
@@ -509,7 +554,7 @@ public class DecommissionManager {
       } else {
         // This is a known datanode, check if its # of insufficiently
         // replicated blocks has dropped to zero and if it can be decommed
-        LOG.debug("Processing decommission-in-progress node {}", dn);
+        LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
         pruneSufficientlyReplicated(dn, blocks);
       }
       if (blocks.size() == 0) {
@@ -528,22 +573,31 @@ public class DecommissionManager {
         // If the full scan is clean AND the node liveness is okay,
         // we can finally mark as decommissioned.
         final boolean isHealthy =
-            blockManager.isNodeHealthyForDecommission(dn);
+            blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
         if (blocks.size() == 0 && isHealthy) {
-          setDecommissioned(dn);
-          toRemove.add(dn);
+          if (dn.isDecommissionInProgress()) {
+            setDecommissioned(dn);
+            toRemove.add(dn);
+          } else if (dn.isEnteringMaintenance()) {
+            // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks
+            // to track maintenance expiration.
+            setInMaintenance(dn);
+          } else {
+            Preconditions.checkState(false,
+                "A node is in an invalid state!");
+          }
           LOG.debug("Node {} is sufficiently replicated and healthy, "
-              + "marked as decommissioned.", dn);
+              + "marked as {}.", dn.getAdminState());
         } else {
           LOG.debug("Node {} {} healthy."
               + " It needs to replicate {} more blocks."
-              + " Decommissioning is still in progress.",
-              dn, isHealthy? "is": "isn't", blocks.size());
+              + " {} is still in progress.", dn,
+              isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
         }
       } else {
         LOG.debug("Node {} still has {} blocks to replicate "
-            + "before it is a candidate to finish decommissioning.",
-            dn, blocks.size());
+            + "before it is a candidate to finish {}.",
+            dn, blocks.size(), dn.getAdminState());
       }
       iterkey = dn;
     }
@ -562,7 +616,7 @@ public class DecommissionManager {
*/ */
private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode, private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
AbstractList<BlockInfo> blocks) { AbstractList<BlockInfo> blocks) {
processBlocksForDecomInternal(datanode, blocks.iterator(), null, true); processBlocksInternal(datanode, blocks.iterator(), null, true);
} }
/** /**
@ -578,7 +632,7 @@ public class DecommissionManager {
private AbstractList<BlockInfo> handleInsufficientlyReplicated( private AbstractList<BlockInfo> handleInsufficientlyReplicated(
final DatanodeDescriptor datanode) { final DatanodeDescriptor datanode) {
AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>(); AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
processBlocksForDecomInternal(datanode, datanode.getBlockIterator(), processBlocksInternal(datanode, datanode.getBlockIterator(),
insufficient, false); insufficient, false);
return insufficient; return insufficient;
} }
@ -599,14 +653,14 @@ public class DecommissionManager {
* @return true if there are under-replicated blocks in the provided block * @return true if there are under-replicated blocks in the provided block
* iterator, else false. * iterator, else false.
*/ */
private void processBlocksForDecomInternal( private void processBlocksInternal(
final DatanodeDescriptor datanode, final DatanodeDescriptor datanode,
final Iterator<BlockInfo> it, final Iterator<BlockInfo> it,
final List<BlockInfo> insufficientlyReplicated, final List<BlockInfo> insufficientlyReplicated,
boolean pruneSufficientlyReplicated) { boolean pruneSufficientlyReplicated) {
boolean firstReplicationLog = true; boolean firstReplicationLog = true;
int underReplicatedBlocks = 0; int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0; int outOfServiceOnlyReplicas = 0;
int underReplicatedInOpenFiles = 0; int underReplicatedInOpenFiles = 0;
while (it.hasNext()) { while (it.hasNext()) {
if (insufficientlyReplicated == null if (insufficientlyReplicated == null
@ -653,21 +707,25 @@ public class DecommissionManager {
// Schedule under-replicated blocks for replication if not already // Schedule under-replicated blocks for replication if not already
// pending // pending
if (blockManager.isNeededReplication(block, liveReplicas)) { boolean isDecommission = datanode.isDecommissionInProgress();
boolean neededReplication = isDecommission ?
blockManager.isNeededReplication(block, num) :
blockManager.isNeededReplicationForMaintenance(block, num);
if (neededReplication) {
if (!blockManager.neededReplications.contains(block) && if (!blockManager.neededReplications.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 && blockManager.pendingReplications.getNumReplicas(block) == 0 &&
blockManager.isPopulatingReplQueues()) { blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode. // Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block, blockManager.neededReplications.add(block,
liveReplicas, num.readOnlyReplicas(), liveReplicas, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), num.outOfServiceReplicas(),
block.getReplication()); block.getReplication());
} }
} }
// Even if the block is under-replicated, // Even if the block is under-replicated,
// it doesn't block decommission if it's sufficiently replicated // it doesn't block decommission if it's sufficiently replicated
if (isSufficientlyReplicated(block, bc, num)) { if (isSufficientlyReplicated(block, bc, num, isDecommission)) {
if (pruneSufficientlyReplicated) { if (pruneSufficientlyReplicated) {
it.remove(); it.remove();
} }
@ -689,14 +747,13 @@ public class DecommissionManager {
if (bc.isUnderConstruction()) { if (bc.isUnderConstruction()) {
underReplicatedInOpenFiles++; underReplicatedInOpenFiles++;
} }
if ((curReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) { if ((curReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
decommissionOnlyReplicas++; outOfServiceOnlyReplicas++;
} }
} }
datanode.decommissioningStatus.set(underReplicatedBlocks, datanode.getLeavingServiceStatus().set(underReplicatedBlocks,
decommissionOnlyReplicas, outOfServiceOnlyReplicas, underReplicatedInOpenFiles);
underReplicatedInOpenFiles);
} }
} }
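Editor's sketch, not part of the commit: the scan above accumulates three counters for a node leaving service, summarized below in a self-contained form. The class and method names are hypothetical simplifications, not the Hadoop API.

public class LeavingServiceTallySketch {
  int underReplicatedBlocks;      // blocks still below their required replication
  int outOfServiceOnlyReplicas;   // blocks whose only replicas sit on
                                  // decommissioned or maintenance nodes
  int underReplicatedInOpenFiles; // under-replicated blocks of files that are
                                  // still under construction

  void record(boolean underReplicated, int liveReplicas,
      int outOfServiceReplicas, boolean fileUnderConstruction) {
    if (!underReplicated) {
      return;                     // sufficiently replicated blocks are skipped
    }
    underReplicatedBlocks++;
    if (fileUnderConstruction) {
      underReplicatedInOpenFiles++;
    }
    if (liveReplicas == 0 && outOfServiceReplicas > 0) {
      outOfServiceOnlyReplicas++;
    }
  }

  public static void main(String[] args) {
    LeavingServiceTallySketch tally = new LeavingServiceTallySketch();
    // One under-replicated block whose only replica is on an out-of-service node.
    tally.record(true, 0, 1, false);
    System.out.println(tally.underReplicatedBlocks + " "
        + tally.outOfServiceOnlyReplicas + " "
        + tally.underReplicatedInOpenFiles); // prints: 1 1 0
  }
}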

@ -25,10 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -269,13 +266,19 @@ class HeartbeatManager implements DatanodeStatistics {
if (!node.isAlive()) { if (!node.isAlive()) {
LOG.info("Dead node {} is put in maintenance state immediately.", node); LOG.info("Dead node {} is put in maintenance state immediately.", node);
node.setInMaintenance(); node.setInMaintenance();
} else if (node.isDecommissioned()) { } else {
stats.subtract(node);
if (node.isDecommissioned()) {
LOG.info("Decommissioned node " + node + " is put in maintenance state" LOG.info("Decommissioned node " + node + " is put in maintenance state"
+ " immediately."); + " immediately.");
node.setInMaintenance(); node.setInMaintenance();
} else if (blockManager.getMinReplicationToBeInMaintenance() == 0) {
LOG.info("MinReplicationToBeInMaintenance is set to zero. " + node +
" is put in maintenance state" + " immediately.");
node.setInMaintenance();
} else { } else {
stats.subtract(node);
node.startMaintenance(); node.startMaintenance();
}
stats.add(node); stats.add(node);
} }
} }
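Editor's sketch, not part of the commit: the branch logic above decides whether a node asked to enter maintenance can skip ENTERING_MAINTENANCE. The sketch below collapses that decision into one function and ignores the stats bookkeeping; the class, enum, and method names are hypothetical, not the Hadoop API.

public class MaintenanceTransitionSketch {

  enum AdminState { NORMAL, ENTERING_MAINTENANCE, IN_MAINTENANCE }

  // Dead or already-decommissioned nodes, or a cluster whose maintenance
  // replication minimum is zero, move straight to IN_MAINTENANCE; a live
  // in-service node must first pass through ENTERING_MAINTENANCE so its
  // blocks can reach the required replication.
  static AdminState decideOnStartMaintenance(boolean alive,
      boolean decommissioned, int minReplicationToBeInMaintenance) {
    if (!alive || decommissioned || minReplicationToBeInMaintenance == 0) {
      return AdminState.IN_MAINTENANCE;
    }
    return AdminState.ENTERING_MAINTENANCE;
  }

  public static void main(String[] args) {
    // A dead node enters maintenance immediately.
    System.out.println(decideOnStartMaintenance(false, false, 1));
    // A live, in-service node must wait until its blocks are replicated.
    System.out.println(decideOnStartMaintenance(true, false, 1));
  }
}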
@ -352,7 +355,7 @@ class HeartbeatManager implements DatanodeStatistics {
boolean allAlive = false; boolean allAlive = false;
while (!allAlive) { while (!allAlive) {
// locate the first dead node. // locate the first dead node.
DatanodeID dead = null; DatanodeDescriptor dead = null;
// locate the first failed storage that isn't on a dead node. // locate the first failed storage that isn't on a dead node.
DatanodeStorageInfo failedStorage = null; DatanodeStorageInfo failedStorage = null;
@ -401,7 +404,7 @@ class HeartbeatManager implements DatanodeStatistics {
// acquire the fsnamesystem lock, and then remove the dead node. // acquire the fsnamesystem lock, and then remove the dead node.
namesystem.writeLock(); namesystem.writeLock();
try { try {
dm.removeDeadDatanode(dead); dm.removeDeadDatanode(dead, !dead.isMaintenance());
} finally { } finally {
namesystem.writeUnlock(); namesystem.writeUnlock();
} }

@ -32,18 +32,29 @@ public class NumberReplicas {
private int corruptReplicas; private int corruptReplicas;
private int excessReplicas; private int excessReplicas;
private int replicasOnStaleNodes; private int replicasOnStaleNodes;
// We need live ENTERING_MAINTENANCE nodes to continue
// to serve read requests while they are being transitioned to live
// IN_MAINTENANCE, if these are the only replicas left.
// maintenanceNotForRead == maintenanceReplicas -
// Live ENTERING_MAINTENANCE.
private int maintenanceNotForRead;
// Live ENTERING_MAINTENANCE nodes to serve read requests.
private int maintenanceForRead;
NumberReplicas() { NumberReplicas() {
this(0, 0, 0, 0, 0, 0, 0); this(0, 0, 0, 0, 0, 0, 0, 0, 0);
} }
NumberReplicas(int live, int readonly, int decommissioned, NumberReplicas(int live, int readonly, int decommissioned,
int decommissioning, int corrupt, int excess, int stale) { int decommissioning, int corrupt, int excess, int stale,
set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale); int maintenanceNotForRead, int maintenanceForRead) {
set(live, readonly, decommissioned, decommissioning, corrupt,
excess, stale, maintenanceNotForRead, maintenanceForRead);
} }
void set(int live, int readonly, int decommissioned, int decommissioning, void set(int live, int readonly, int decommissioned, int decommissioning,
int corrupt, int excess, int stale) { int corrupt, int excess, int stale, int maintenanceNotForRead,
int maintenanceForRead) {
liveReplicas = live; liveReplicas = live;
readOnlyReplicas = readonly; readOnlyReplicas = readonly;
this.decommissioning = decommissioning; this.decommissioning = decommissioning;
@ -51,6 +62,8 @@ public class NumberReplicas {
corruptReplicas = corrupt; corruptReplicas = corrupt;
excessReplicas = excess; excessReplicas = excess;
replicasOnStaleNodes = stale; replicasOnStaleNodes = stale;
this.maintenanceNotForRead = maintenanceNotForRead;
this.maintenanceForRead = maintenanceForRead;
} }
public int liveReplicas() { public int liveReplicas() {
@ -112,4 +125,20 @@ public class NumberReplicas {
public int replicasOnStaleNodes() { public int replicasOnStaleNodes() {
return replicasOnStaleNodes; return replicasOnStaleNodes;
} }
public int maintenanceNotForReadReplicas() {
return maintenanceNotForRead;
}
public int maintenanceReplicas() {
return maintenanceNotForRead + maintenanceForRead;
}
public int outOfServiceReplicas() {
return maintenanceReplicas() + decommissionedAndDecommissioning();
}
public int liveEnteringMaintenanceReplicas() {
return maintenanceForRead;
}
} }
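Editor's sketch, not part of the commit: a small self-contained illustration of how the new counters compose. Only the getter names shown in the diff are taken from the source; the fields and example values are illustrative.

public class ReplicaCountsSketch {
  int live;
  int decommissioned;
  int decommissioning;
  int maintenanceForRead;     // live ENTERING_MAINTENANCE replicas, still readable
  int maintenanceNotForRead;  // remaining maintenance replicas

  int maintenanceReplicas() {
    return maintenanceForRead + maintenanceNotForRead;
  }

  int decommissionedAndDecommissioning() {
    return decommissioned + decommissioning;
  }

  // Everything that is not a normal in-service replica.
  int outOfServiceReplicas() {
    return maintenanceReplicas() + decommissionedAndDecommissioning();
  }

  public static void main(String[] args) {
    ReplicaCountsSketch n = new ReplicaCountsSketch();
    n.live = 2;
    n.maintenanceForRead = 1;   // one live ENTERING_MAINTENANCE replica
    n.decommissioning = 1;
    // 1 maintenance + 1 decommissioning = 2 out-of-service replicas.
    System.out.println(n.outOfServiceReplicas());
  }
}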

@ -81,7 +81,7 @@ public class StorageTypeStats {
final DatanodeDescriptor node) { final DatanodeDescriptor node) {
capacityUsed += info.getDfsUsed(); capacityUsed += info.getDfsUsed();
blockPoolUsed += info.getBlockPoolUsed(); blockPoolUsed += info.getBlockPoolUsed();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { if (node.isInService()) {
capacityTotal += info.getCapacity(); capacityTotal += info.getCapacity();
capacityRemaining += info.getRemaining(); capacityRemaining += info.getRemaining();
} else { } else {
@ -90,7 +90,7 @@ public class StorageTypeStats {
} }
void addNode(final DatanodeDescriptor node) { void addNode(final DatanodeDescriptor node) {
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { if (node.isInService()) {
nodesInService++; nodesInService++;
} }
} }
@ -99,7 +99,7 @@ public class StorageTypeStats {
final DatanodeDescriptor node) { final DatanodeDescriptor node) {
capacityUsed -= info.getDfsUsed(); capacityUsed -= info.getDfsUsed();
blockPoolUsed -= info.getBlockPoolUsed(); blockPoolUsed -= info.getBlockPoolUsed();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { if (node.isInService()) {
capacityTotal -= info.getCapacity(); capacityTotal -= info.getCapacity();
capacityRemaining -= info.getRemaining(); capacityRemaining -= info.getRemaining();
} else { } else {
@ -108,7 +108,7 @@ public class StorageTypeStats {
} }
void subtractNode(final DatanodeDescriptor node) { void subtractNode(final DatanodeDescriptor node) {
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { if (node.isInService()) {
nodesInService--; nodesInService--;
} }
} }

@ -5436,11 +5436,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
.<String, Object> builder() .<String, Object> builder()
.put("xferaddr", node.getXferAddr()) .put("xferaddr", node.getXferAddr())
.put("underReplicatedBlocks", .put("underReplicatedBlocks",
node.decommissioningStatus.getUnderReplicatedBlocks()) node.getLeavingServiceStatus().getUnderReplicatedBlocks())
// TODO use another property name for outOfServiceOnlyReplicas.
.put("decommissionOnlyReplicas", .put("decommissionOnlyReplicas",
node.decommissioningStatus.getDecommissionOnlyReplicas()) node.getLeavingServiceStatus().getOutOfServiceOnlyReplicas())
.put("underReplicateInOpenFiles", .put("underReplicateInOpenFiles",
node.decommissioningStatus.getUnderReplicatedInOpenFiles()) node.getLeavingServiceStatus().getUnderReplicatedInOpenFiles())
.build(); .build();
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo); info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
} }
@ -5502,7 +5503,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.getDatanodeManager().fetchDatanodes(live, null, true); blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) { for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next(); DatanodeDescriptor node = it.next();
if (node.isDecommissionInProgress() || node.isDecommissioned()) { if (!node.isInService()) {
it.remove(); it.remove();
} }
} }

@ -535,6 +535,13 @@
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.maintenance.replication.min</name>
<value>1</value>
<description>Minimum number of live block replicas that must be maintained while datanodes are in maintenance mode.
</description>
</property>
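Editor's sketch, not part of the commit: one way a client of this setting might read it through a Hadoop Configuration object, with the default of 1 documented above. The class name and the hard-coded default are this example's assumptions.

import org.apache.hadoop.conf.Configuration;

public class MaintenanceMinReplicationExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to 1, matching the value documented in hdfs-default.xml above.
    int minMaintenanceR = conf.getInt(
        "dfs.namenode.maintenance.replication.min", 1);
    System.out.println("minimum live replication during maintenance: "
        + minMaintenanceR);
  }
}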
<property> <property>
<name>dfs.namenode.safemode.replication.min</name> <name>dfs.namenode.safemode.replication.min</name>
<value></value> <value></value>

@ -102,6 +102,7 @@ public class AdminStatesBaseTest {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
hostsFileWriter.initialize(conf, "temp/admin"); hostsFileWriter.initialize(conf, "temp/admin");
} }
@After @After
@ -110,17 +111,22 @@ public class AdminStatesBaseTest {
shutdownCluster(); shutdownCluster();
} }
protected void writeFile(FileSystem fileSys, Path name, int repl) static public FSDataOutputStream writeIncompleteFile(FileSystem fileSys,
Path name, short repl, short numOfBlocks) throws IOException {
return writeFile(fileSys, name, repl, numOfBlocks, false);
}
static protected void writeFile(FileSystem fileSys, Path name, int repl)
throws IOException { throws IOException {
writeFile(fileSys, name, repl, 2); writeFile(fileSys, name, repl, 2);
} }
protected void writeFile(FileSystem fileSys, Path name, int repl, static protected void writeFile(FileSystem fileSys, Path name, int repl,
int numOfBlocks) throws IOException { int numOfBlocks) throws IOException {
writeFile(fileSys, name, repl, numOfBlocks, true); writeFile(fileSys, name, repl, numOfBlocks, true);
} }
protected FSDataOutputStream writeFile(FileSystem fileSys, Path name, static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
int repl, int numOfBlocks, boolean completeFile) int repl, int numOfBlocks, boolean completeFile)
throws IOException { throws IOException {
// create and write a file that contains two blocks of data // create and write a file that contains two blocks of data
@ -136,6 +142,7 @@ public class AdminStatesBaseTest {
stm.close(); stm.close();
return null; return null;
} else { } else {
stm.flush();
// Do not close stream, return it // Do not close stream, return it
// so that it is not garbage collected // so that it is not garbage collected
return stm; return stm;
@ -353,7 +360,7 @@ public class AdminStatesBaseTest {
protected void shutdownCluster() { protected void shutdownCluster() {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown(true);
} }
} }
@ -362,12 +369,13 @@ public class AdminStatesBaseTest {
refreshNodes(conf); refreshNodes(conf);
} }
protected DatanodeDescriptor getDatanodeDesriptor( static private DatanodeDescriptor getDatanodeDesriptor(
final FSNamesystem ns, final String datanodeUuid) { final FSNamesystem ns, final String datanodeUuid) {
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid); return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
} }
protected void cleanupFile(FileSystem fileSys, Path name) throws IOException { static public void cleanupFile(FileSystem fileSys, Path name)
throws IOException {
assertTrue(fileSys.exists(name)); assertTrue(fileSys.exists(name));
fileSys.delete(name, true); fileSys.delete(name, true);
assertTrue(!fileSys.exists(name)); assertTrue(!fileSys.exists(name));

@ -18,13 +18,19 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.slf4j.Logger;
import org.apache.commons.logging.LogFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@ -32,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.Test; import org.junit.Test;
@ -40,13 +48,23 @@ import org.junit.Test;
* This class tests node maintenance. * This class tests node maintenance.
*/ */
public class TestMaintenanceState extends AdminStatesBaseTest { public class TestMaintenanceState extends AdminStatesBaseTest {
public static final Log LOG = LogFactory.getLog(TestMaintenanceState.class); public static final Logger LOG =
static private final long EXPIRATION_IN_MS = 500; LoggerFactory.getLogger(TestMaintenanceState.class);
static private final long EXPIRATION_IN_MS = 50;
private int minMaintenanceR =
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT;
public TestMaintenanceState() { public TestMaintenanceState() {
setUseCombinedHostFileManager(); setUseCombinedHostFileManager();
} }
void setMinMaintenanceR(int minMaintenanceR) {
this.minMaintenanceR = minMaintenanceR;
getConf().setInt(
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
minMaintenanceR);
}
/** /**
* Verify a node can transition from AdminStates.ENTERING_MAINTENANCE to * Verify a node can transition from AdminStates.ENTERING_MAINTENANCE to
* AdminStates.NORMAL. * AdminStates.NORMAL.
@ -55,21 +73,25 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
public void testTakeNodeOutOfEnteringMaintenance() throws Exception { public void testTakeNodeOutOfEnteringMaintenance() throws Exception {
LOG.info("Starting testTakeNodeOutOfEnteringMaintenance"); LOG.info("Starting testTakeNodeOutOfEnteringMaintenance");
final int replicas = 1; final int replicas = 1;
final int numNamenodes = 1; final Path file = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
final int numDatanodes = 1;
final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
startCluster(numNamenodes, numDatanodes); startCluster(1, 1);
FileSystem fileSys = getCluster().getFileSystem(0); final FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1); final FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, replicas, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0, final DatanodeInfo nodeOutofService = takeNodeOutofService(0,
null, Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE); null, Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE);
// When node is in ENTERING_MAINTENANCE state, it can still serve read
// requests
assertNull(checkWithRetry(ns, fileSys, file, replicas, null,
nodeOutofService));
putNodeInService(0, nodeOutofService.getDatanodeUuid()); putNodeInService(0, nodeOutofService.getDatanodeUuid());
cleanupFile(fileSys, file1); cleanupFile(fileSys, file);
} }
/** /**
@ -80,23 +102,21 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
public void testEnteringMaintenanceExpiration() throws Exception { public void testEnteringMaintenanceExpiration() throws Exception {
LOG.info("Starting testEnteringMaintenanceExpiration"); LOG.info("Starting testEnteringMaintenanceExpiration");
final int replicas = 1; final int replicas = 1;
final int numNamenodes = 1; final Path file = new Path("/testEnteringMaintenanceExpiration.dat");
final int numDatanodes = 1;
final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
startCluster(numNamenodes, numDatanodes); startCluster(1, 1);
FileSystem fileSys = getCluster().getFileSystem(0); final FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1); writeFile(fileSys, file, replicas, 1);
// expires in 500 milliseconds final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE);
Time.monotonicNow() + EXPIRATION_IN_MS, null,
AdminStates.ENTERING_MAINTENANCE);
waitNodeState(nodeOutofService, AdminStates.NORMAL); // Adjust the expiration.
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL);
cleanupFile(fileSys, file1); cleanupFile(fileSys, file);
} }
/** /**
@ -106,20 +126,18 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
public void testInvalidExpiration() throws Exception { public void testInvalidExpiration() throws Exception {
LOG.info("Starting testInvalidExpiration"); LOG.info("Starting testInvalidExpiration");
final int replicas = 1; final int replicas = 1;
final int numNamenodes = 1; final Path file = new Path("/testInvalidExpiration.dat");
final int numDatanodes = 1;
final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
startCluster(numNamenodes, numDatanodes); startCluster(1, 1);
FileSystem fileSys = getCluster().getFileSystem(0); final FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1); writeFile(fileSys, file, replicas, 1);
// expiration has to be greater than Time.monotonicNow(). // expiration has to be greater than Time.monotonicNow().
takeNodeOutofService(0, null, Time.monotonicNow(), null, takeNodeOutofService(0, null, Time.monotonicNow(), null,
AdminStates.NORMAL); AdminStates.NORMAL);
cleanupFile(fileSys, file1); cleanupFile(fileSys, file);
} }
/** /**
@ -129,18 +147,17 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
@Test(timeout = 360000) @Test(timeout = 360000)
public void testPutDeadNodeToMaintenance() throws Exception { public void testPutDeadNodeToMaintenance() throws Exception {
LOG.info("Starting testPutDeadNodeToMaintenance"); LOG.info("Starting testPutDeadNodeToMaintenance");
final int numNamenodes = 1;
final int numDatanodes = 1;
final int replicas = 1; final int replicas = 1;
final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat"); final Path file = new Path("/testPutDeadNodeToMaintenance.dat");
startCluster(numNamenodes, numDatanodes); startCluster(1, 1);
FileSystem fileSys = getCluster().getFileSystem(0); final FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0); final FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file1, replicas, 1); writeFile(fileSys, file, replicas, 1);
MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0); final MiniDFSCluster.DataNodeProperties dnProp =
getCluster().stopDataNode(0);
DFSTestUtil.waitForDatanodeState( DFSTestUtil.waitForDatanodeState(
getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000); getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000);
@ -153,7 +170,7 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
assertEquals(deadInMaintenance + 1, ns.getNumInMaintenanceDeadDataNodes()); assertEquals(deadInMaintenance + 1, ns.getNumInMaintenanceDeadDataNodes());
assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes()); assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes());
cleanupFile(fileSys, file1); cleanupFile(fileSys, file);
} }
/** /**
@ -164,16 +181,14 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
@Test(timeout = 360000) @Test(timeout = 360000)
public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception { public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception {
LOG.info("Starting testPutDeadNodeToMaintenanceWithExpiration"); LOG.info("Starting testPutDeadNodeToMaintenanceWithExpiration");
final int numNamenodes = 1; final Path file =
final int numDatanodes = 1; new Path("/testPutDeadNodeToMaintenanceWithExpiration.dat");
final int replicas = 1;
final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat");
startCluster(numNamenodes, numDatanodes); startCluster(1, 1);
FileSystem fileSys = getCluster().getFileSystem(0); FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0); FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file1, replicas, 1); writeFile(fileSys, file, 1, 1);
MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0); MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0);
DFSTestUtil.waitForDatanodeState( DFSTestUtil.waitForDatanodeState(
@ -184,16 +199,17 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
DatanodeInfo nodeOutofService = takeNodeOutofService(0, DatanodeInfo nodeOutofService = takeNodeOutofService(0,
dnProp.datanode.getDatanodeUuid(), dnProp.datanode.getDatanodeUuid(),
Time.monotonicNow() + EXPIRATION_IN_MS, null, Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
AdminStates.IN_MAINTENANCE);
waitNodeState(nodeOutofService, AdminStates.NORMAL); // Adjust the expiration.
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL);
// no change // no change
assertEquals(deadInMaintenance, ns.getNumInMaintenanceDeadDataNodes()); assertEquals(deadInMaintenance, ns.getNumInMaintenanceDeadDataNodes());
assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes()); assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes());
cleanupFile(fileSys, file1); cleanupFile(fileSys, file);
} }
/** /**
@ -202,15 +218,12 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
@Test(timeout = 360000) @Test(timeout = 360000)
public void testTransitionFromDecommissioned() throws IOException { public void testTransitionFromDecommissioned() throws IOException {
LOG.info("Starting testTransitionFromDecommissioned"); LOG.info("Starting testTransitionFromDecommissioned");
final int numNamenodes = 1; final Path file = new Path("/testTransitionFromDecommissioned.dat");
final int numDatanodes = 4;
final int replicas = 3;
final Path file1 = new Path("/testTransitionFromDecommissioned.dat");
startCluster(numNamenodes, numDatanodes); startCluster(1, 4);
FileSystem fileSys = getCluster().getFileSystem(0); final FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1); writeFile(fileSys, file, 3, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null, DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null,
AdminStates.DECOMMISSIONED); AdminStates.DECOMMISSIONED);
@ -218,7 +231,7 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE, takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE,
null, AdminStates.IN_MAINTENANCE); null, AdminStates.IN_MAINTENANCE);
cleanupFile(fileSys, file1); cleanupFile(fileSys, file);
} }
/** /**
@ -228,34 +241,33 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
@Test(timeout = 360000) @Test(timeout = 360000)
public void testTransitionFromDecommissionedAndExpired() throws IOException { public void testTransitionFromDecommissionedAndExpired() throws IOException {
LOG.info("Starting testTransitionFromDecommissionedAndExpired"); LOG.info("Starting testTransitionFromDecommissionedAndExpired");
final int numNamenodes = 1; final Path file =
final int numDatanodes = 4; new Path("/testTransitionFromDecommissionedAndExpired.dat");
final int replicas = 3;
final Path file1 = new Path("/testTransitionFromDecommissioned.dat");
startCluster(numNamenodes, numDatanodes); startCluster(1, 4);
FileSystem fileSys = getCluster().getFileSystem(0); final FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file1, replicas, 1); writeFile(fileSys, file, 3, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null, final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0,
AdminStates.DECOMMISSIONED); null, AdminStates.DECOMMISSIONED);
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
Time.monotonicNow() + EXPIRATION_IN_MS, null, Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
AdminStates.IN_MAINTENANCE);
waitNodeState(nodeOutofService, AdminStates.NORMAL); // Adjust the expiration.
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL);
cleanupFile(fileSys, file1); cleanupFile(fileSys, file);
} }
/** /**
* When a node is put to maintenance, it first transitions to * When a node is put to maintenance, it first transitions to
* AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have minimal * AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have minimal
* replication before it can be transitioned to AdminStates.IN_MAINTENANCE. * replication before it can be transitioned to AdminStates.IN_MAINTENANCE.
* If node becomes dead when it is in AdminStates.ENTERING_MAINTENANCE, admin * If node becomes dead when it is in AdminStates.ENTERING_MAINTENANCE, it
* state should stay in AdminStates.ENTERING_MAINTENANCE state. * should stay in AdminStates.ENTERING_MAINTENANCE state.
*/ */
@Test(timeout = 360000) @Test(timeout = 360000)
public void testNodeDeadWhenInEnteringMaintenance() throws Exception { public void testNodeDeadWhenInEnteringMaintenance() throws Exception {
@ -263,16 +275,16 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
final int numNamenodes = 1; final int numNamenodes = 1;
final int numDatanodes = 1; final int numDatanodes = 1;
final int replicas = 1; final int replicas = 1;
final Path file1 = new Path("/testNodeDeadWhenInEnteringMaintenance.dat"); final Path file = new Path("/testNodeDeadWhenInEnteringMaintenance.dat");
startCluster(numNamenodes, numDatanodes); startCluster(numNamenodes, numDatanodes);
FileSystem fileSys = getCluster().getFileSystem(0); final FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0); final FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file1, replicas, 1); writeFile(fileSys, file, replicas, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0, DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file1), Long.MAX_VALUE, null, getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
AdminStates.ENTERING_MAINTENANCE); AdminStates.ENTERING_MAINTENANCE);
assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); assertEquals(1, ns.getNumEnteringMaintenanceDataNodes());
@ -281,30 +293,627 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
DFSTestUtil.waitForDatanodeState( DFSTestUtil.waitForDatanodeState(
getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000); getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000);
DFSClient client = getDfsClient(0); DFSClient client = getDfsClient(0);
assertEquals("maintenance node shouldn't be alive", numDatanodes - 1, assertEquals("maintenance node shouldn't be live", numDatanodes - 1,
client.datanodeReport(DatanodeReportType.LIVE).length); client.datanodeReport(DatanodeReportType.LIVE).length);
assertEquals(1, ns.getNumEnteringMaintenanceDataNodes());
getCluster().restartDataNode(dnProp, true); getCluster().restartDataNode(dnProp, true);
getCluster().waitActive(); getCluster().waitActive();
waitNodeState(nodeOutofService, AdminStates.ENTERING_MAINTENANCE); waitNodeState(nodeOutofService, AdminStates.ENTERING_MAINTENANCE);
assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); assertEquals(1, ns.getNumEnteringMaintenanceDataNodes());
assertEquals("maintenance node should be live", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
cleanupFile(fileSys, file1); cleanupFile(fileSys, file);
} }
static protected String getFirstBlockFirstReplicaUuid(FileSystem fileSys, /**
* When a node is put to maintenance, it first transitions to
* AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have
* been properly replicated before it can be transitioned to
* AdminStates.IN_MAINTENANCE. The expected replication count takes
* DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY and
* the file's replication factor into account.
*/
@Test(timeout = 360000)
public void testExpectedReplications() throws IOException {
LOG.info("Starting testExpectedReplications");
testExpectedReplication(1);
testExpectedReplication(2);
testExpectedReplication(3);
testExpectedReplication(4);
}
private void testExpectedReplication(int replicationFactor)
throws IOException {
testExpectedReplication(replicationFactor,
Math.max(replicationFactor - 1, this.minMaintenanceR));
}
private void testExpectedReplication(int replicationFactor,
int expectedReplicasInRead) throws IOException {
startCluster(1, 5);
final Path file = new Path("/testExpectedReplication.dat");
final FileSystem fileSys = getCluster().getFileSystem(0);
final FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, replicationFactor, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE,
null, AdminStates.IN_MAINTENANCE);
// The block should be replicated to another datanode to meet
// expected replication count.
assertNull(checkWithRetry(ns, fileSys, file, expectedReplicasInRead,
nodeOutofService));
cleanupFile(fileSys, file);
}
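Editor's sketch, not part of the commit: one possible reading of the expression Math.max(replicationFactor - 1, minMaintenanceR) used by the helper above. With one replica parked on the maintenance node, the readable live replicas are bounded below by the configured maintenance minimum. The method name is hypothetical.

public class ExpectedReplicationSketch {
  static int expectedReplicasInRead(int replicationFactor, int minMaintenanceR) {
    // One replica is on the maintenance node; the rest must stay live, but
    // never fewer than the configured maintenance minimum.
    return Math.max(replicationFactor - 1, minMaintenanceR);
  }

  public static void main(String[] args) {
    // With the default minimum of 1: a 1-replica file still keeps 1 live
    // replica, while a 3-replica file keeps 2 outside the maintenance node.
    System.out.println(expectedReplicasInRead(1, 1)); // 1
    System.out.println(expectedReplicasInRead(3, 1)); // 2
  }
}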
/**
* Verify a node can transition directly to AdminStates.IN_MAINTENANCE when
* DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY is set to zero.
*/
@Test(timeout = 360000)
public void testZeroMinMaintenanceReplication() throws Exception {
LOG.info("Starting testZeroMinMaintenanceReplication");
setMinMaintenanceR(0);
startCluster(1, 1);
final Path file = new Path("/testZeroMinMaintenanceReplication.dat");
final int replicas = 1;
FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file, replicas, 1);
takeNodeOutofService(0, null, Long.MAX_VALUE, null,
AdminStates.IN_MAINTENANCE);
cleanupFile(fileSys, file);
}
/**
* Verify a node can transition directly to AdminStates.IN_MAINTENANCE when
* DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY is set to zero. Then later
* transition to NORMAL after maintenance expiration.
*/
@Test(timeout = 360000)
public void testZeroMinMaintenanceReplicationWithExpiration()
throws Exception {
LOG.info("Starting testZeroMinMaintenanceReplicationWithExpiration");
setMinMaintenanceR(0);
startCluster(1, 1);
final Path file =
new Path("/testZeroMinMaintenanceReplicationWithExpiration.dat");
FileSystem fileSys = getCluster().getFileSystem(0);
writeFile(fileSys, file, 1, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
// Adjust the expiration.
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL);
cleanupFile(fileSys, file);
}
/**
* Transition from IN_MAINTENANCE to DECOMMISSIONED.
*/
@Test(timeout = 360000)
public void testTransitionToDecommission() throws IOException {
LOG.info("Starting testTransitionToDecommission");
final int numNamenodes = 1;
final int numDatanodes = 4;
startCluster(numNamenodes, numDatanodes);
final Path file = new Path("testTransitionToDecommission.dat");
final int replicas = 3;
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, replicas, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
AdminStates.IN_MAINTENANCE);
DFSClient client = getDfsClient(0);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
// test 1, verify the replica in IN_MAINTENANCE state isn't in LocatedBlock
assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
nodeOutofService));
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), 0, null,
AdminStates.DECOMMISSIONED);
// test 2 after decommission has completed, the replication count is
// replicas + 1 which includes the decommissioned node.
assertNull(checkWithRetry(ns, fileSys, file, replicas + 1, null));
// test 3, put the node in service, replication count should restore.
putNodeInService(0, nodeOutofService.getDatanodeUuid());
assertNull(checkWithRetry(ns, fileSys, file, replicas, null));
cleanupFile(fileSys, file);
}
/**
* Transition from decommissioning state to maintenance state.
*/
@Test(timeout = 360000)
public void testTransitionFromDecommissioning() throws IOException {
LOG.info("Starting testTransitionFromDecommissioning");
startCluster(1, 3);
final Path file = new Path("/testTransitionFromDecommissioning.dat");
final int replicas = 3;
final FileSystem fileSys = getCluster().getFileSystem(0);
final FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, replicas);
final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0,
null, AdminStates.DECOMMISSION_INPROGRESS);
takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE,
null, AdminStates.IN_MAINTENANCE);
assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
nodeOutofService));
cleanupFile(fileSys, file);
}
/**
* First put a node in maintenance, then put a different node
* in decommission. Make sure the decommission process takes the
* maintenance replica into account.
*/
@Test(timeout = 360000)
public void testDecommissionDifferentNodeAfterMaintenances()
throws Exception {
testDecommissionDifferentNodeAfterMaintenance(2);
testDecommissionDifferentNodeAfterMaintenance(3);
testDecommissionDifferentNodeAfterMaintenance(4);
}
private void testDecommissionDifferentNodeAfterMaintenance(int repl)
throws Exception {
startCluster(1, 5);
final Path file =
new Path("/testDecommissionDifferentNodeAfterMaintenance.dat");
final FileSystem fileSys = getCluster().getFileSystem(0);
final FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, repl, 1);
final DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys,
file);
String maintenanceDNUuid = nodes[0].getDatanodeUuid();
String decommissionDNUuid = nodes[1].getDatanodeUuid();
DatanodeInfo maintenanceDN = takeNodeOutofService(0, maintenanceDNUuid,
Long.MAX_VALUE, null, null, AdminStates.IN_MAINTENANCE);
Map<DatanodeInfo, Long> maintenanceNodes = new HashMap<>();
maintenanceNodes.put(nodes[0], Long.MAX_VALUE);
takeNodeOutofService(0, decommissionDNUuid, 0, null, maintenanceNodes,
AdminStates.DECOMMISSIONED);
// Out of the replicas returned, one is the decommissioned node.
assertNull(checkWithRetry(ns, fileSys, file, repl, maintenanceDN));
putNodeInService(0, maintenanceDN);
assertNull(checkWithRetry(ns, fileSys, file, repl + 1, null));
cleanupFile(fileSys, file);
}
@Test(timeout = 360000)
public void testChangeReplicationFactors() throws IOException {
// Prior to any change, there is 1 maintenance node and 2 live nodes.
// Replication factor is adjusted from 3 to 4.
// After the change, given 1 maintenance + 2 live is less than the
// newFactor, one live node will be added.
testChangeReplicationFactor(3, 4, 3);
// Replication factor is adjusted from 3 to 2.
// After the change, since the 2 live nodes match the newFactor,
// no live nodes will be invalidated.
testChangeReplicationFactor(3, 2, 2);
// Replication factor is adjusted from 3 to 1.
// After the change, since the 2 live nodes exceed the newFactor,
// one live node will be invalidated.
testChangeReplicationFactor(3, 1, 1);
}
/**
* After the change of replication factor, # of live replicas <=
* the new replication factor.
*/
private void testChangeReplicationFactor(int oldFactor, int newFactor,
int expectedLiveReplicas) throws IOException {
LOG.info("Starting testChangeReplicationFactor {} {} {}",
oldFactor, newFactor, expectedLiveReplicas);
startCluster(1, 5);
final Path file = new Path("/testChangeReplicationFactor.dat");
final FileSystem fileSys = getCluster().getFileSystem(0);
final FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, oldFactor, 1);
final DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
AdminStates.IN_MAINTENANCE);
// Verify that the nodeOutofService remains in blocksMap and
// # of live replicas for read operations is as expected.
assertNull(checkWithRetry(ns, fileSys, file, oldFactor - 1,
nodeOutofService));
final DFSClient client = getDfsClient(0);
client.setReplication(file.toString(), (short)newFactor);
// Verify that the nodeOutofService remains in blocksMap and
// the # of live replicas for read operations is as expected.
assertNull(checkWithRetry(ns, fileSys, file, expectedLiveReplicas,
nodeOutofService));
putNodeInService(0, nodeOutofService.getDatanodeUuid());
assertNull(checkWithRetry(ns, fileSys, file, newFactor, null));
cleanupFile(fileSys, file);
}
/**
* Verify the following scenario.
* a. Put a live node to maintenance => 1 maintenance, 2 live.
* b. The maintenance node becomes dead => block map still has 1 maintenance,
* 2 live.
* c. Take the node out of maintenance => NN should schedule the replication
* and end up with 3 live.
*/
@Test(timeout = 360000)
public void testTakeDeadNodeOutOfMaintenance() throws Exception {
LOG.info("Starting testTakeDeadNodeOutOfMaintenance");
final int numNamenodes = 1;
final int numDatanodes = 4;
startCluster(numNamenodes, numDatanodes);
final Path file = new Path("/testTakeDeadNodeOutOfMaintenance.dat");
final int replicas = 3;
final FileSystem fileSys = getCluster().getFileSystem(0);
final FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, replicas, 1);
final DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
AdminStates.IN_MAINTENANCE);
assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
nodeOutofService));
final DFSClient client = getDfsClient(0);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
getCluster().stopDataNode(nodeOutofService.getXferAddr());
DFSTestUtil.waitForDatanodeState(
getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000);
assertEquals("maintenance node shouldn't be alive", numDatanodes - 1,
client.datanodeReport(DatanodeReportType.LIVE).length);
// Dead maintenance node's blocks should remain in block map.
assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
nodeOutofService));
// When the dead maintenance node is transitioned out of maintenance,
// its blocks should be removed from block map.
// This will then trigger replication to restore the live replicas back
// to replication factor.
putNodeInService(0, nodeOutofService.getDatanodeUuid());
assertNull(checkWithRetry(ns, fileSys, file, replicas, nodeOutofService,
null));
cleanupFile(fileSys, file);
}
/**
* Verify the following scenario.
* a. Put a live node to maintenance => 1 maintenance, 2 live.
* b. The maintenance node becomes dead => block map still has 1 maintenance,
* 2 live.
* c. Restart nn => block map only has 2 live => restore the 3 live.
* d. Restart the maintenance dn => 1 maintenance, 3 live.
* e. Take the node out of maintenance => over replication => 3 live.
*/
@Test(timeout = 360000)
public void testWithNNAndDNRestart() throws Exception {
LOG.info("Starting testWithNNAndDNRestart");
final int numNamenodes = 1;
final int numDatanodes = 4;
startCluster(numNamenodes, numDatanodes);
final Path file = new Path("/testWithNNAndDNRestart.dat");
final int replicas = 3;
final FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, replicas, 1);
DatanodeInfo nodeOutofService = takeNodeOutofService(0,
getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
AdminStates.IN_MAINTENANCE);
assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
nodeOutofService));
DFSClient client = getDfsClient(0);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
MiniDFSCluster.DataNodeProperties dnProp =
getCluster().stopDataNode(nodeOutofService.getXferAddr());
DFSTestUtil.waitForDatanodeState(
getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000);
assertEquals("maintenance node shouldn't be alive", numDatanodes - 1,
client.datanodeReport(DatanodeReportType.LIVE).length);
// Dead maintenance node's blocks should remain in block map.
assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
nodeOutofService));
// restart nn, nn will restore 3 live replicas given it doesn't
// know the maintenance node has the replica.
getCluster().restartNameNode(0);
ns = getCluster().getNamesystem(0);
assertNull(checkWithRetry(ns, fileSys, file, replicas, null));
// restart dn, nn has 1 maintenance replica and 3 live replicas.
getCluster().restartDataNode(dnProp, true);
getCluster().waitActive();
assertNull(checkWithRetry(ns, fileSys, file, replicas, nodeOutofService));
// Put the node in service, a redundant replica should be removed.
putNodeInService(0, nodeOutofService.getDatanodeUuid());
assertNull(checkWithRetry(ns, fileSys, file, replicas, null));
cleanupFile(fileSys, file);
}
/**
* A machine in maintenance state won't be chosen for new block allocation.
*/
@Test(timeout = 3600000)
public void testWriteAfterMaintenance() throws IOException {
LOG.info("Starting testWriteAfterMaintenance");
startCluster(1, 3);
final Path file = new Path("/testWriteAfterMaintenance.dat");
int replicas = 3;
final FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
writeFile(fileSys, file, replicas, 2);
// Verify nodeOutofService wasn't chosen for write operation.
assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
nodeOutofService, null));
// Put the node back to service, live replicas should be restored.
putNodeInService(0, nodeOutofService.getDatanodeUuid());
assertNull(checkWithRetry(ns, fileSys, file, replicas, null));
cleanupFile(fileSys, file);
}
/**
* A node has blocks under construction when it is put to maintenance.
* Given there are minReplication replicas somewhere else,
* it can be transitioned to AdminStates.IN_MAINTENANCE.
*/
@Test(timeout = 360000)
public void testEnterMaintenanceWhenFileOpen() throws Exception {
LOG.info("Starting testEnterMaintenanceWhenFileOpen");
startCluster(1, 3);
final Path file = new Path("/testEnterMaintenanceWhenFileOpen.dat");
final FileSystem fileSys = getCluster().getFileSystem(0);
writeIncompleteFile(fileSys, file, (short)3, (short)2);
takeNodeOutofService(0, null, Long.MAX_VALUE, null,
AdminStates.IN_MAINTENANCE);
cleanupFile(fileSys, file);
}
/**
* A machine in maintenance state won't be chosen for invalidation.
*/
@Test(timeout = 360000)
public void testInvalidation() throws IOException {
LOG.info("Starting testInvalidation");
int numNamenodes = 1;
int numDatanodes = 3;
startCluster(numNamenodes, numDatanodes);
Path file = new Path("/testInvalidation.dat");
int replicas = 3;
FileSystem fileSys = getCluster().getFileSystem(0);
FSNamesystem ns = getCluster().getNamesystem(0);
writeFile(fileSys, file, replicas);
DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
DFSClient client = getDfsClient(0);
client.setReplication(file.toString(), (short) 1);
// Verify the nodeOutofService remains in blocksMap.
assertNull(checkWithRetry(ns, fileSys, file, 1, nodeOutofService));
// Restart NN and verify the nodeOutofService remains in blocksMap.
getCluster().restartNameNode(0);
ns = getCluster().getNamesystem(0);
assertNull(checkWithRetry(ns, fileSys, file, 1, nodeOutofService));
cleanupFile(fileSys, file);
}
static String getFirstBlockFirstReplicaUuid(FileSystem fileSys,
Path name) throws IOException { Path name) throws IOException {
DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys, name);
if (nodes != null && nodes.length != 0) {
return nodes[0].getDatanodeUuid();
} else {
return null;
}
}
/*
* Verify that the number of replicas is as expected for each block in
* the given file.
*
* @return - null if no failure found, else an error message string.
*/
static String checkFile(FSNamesystem ns, FileSystem fileSys,
Path name, int repl, DatanodeInfo expectedExcludedNode,
DatanodeInfo expectedMaintenanceNode) throws IOException {
// need a raw stream
assertTrue("Not HDFS:"+fileSys.getUri(),
fileSys instanceof DistributedFileSystem);
HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name);
BlockManager bm = ns.getBlockManager();
Collection<LocatedBlock> dinfo = dis.getAllBlocks();
String output;
for (LocatedBlock blk : dinfo) { // for each block
DatanodeInfo[] nodes = blk.getLocations();
for (int j = 0; j < nodes.length; j++) { // for each replica
if (expectedExcludedNode != null &&
nodes[j].equals(expectedExcludedNode)) {
//excluded node must not be in LocatedBlock.
output = "For block " + blk.getBlock() + " replica on " +
nodes[j] + " found in LocatedBlock.";
LOG.info(output);
return output;
} else {
if (nodes[j].isInMaintenance()) {
//IN_MAINTENANCE node must not be in LocatedBlock.
output = "For block " + blk.getBlock() + " replica on " +
nodes[j] + " which is in maintenance state.";
LOG.info(output);
return output;
}
}
}
if (repl != nodes.length) {
output = "Wrong number of replicas for block " + blk.getBlock() +
": expected " + repl + ", got " + nodes.length + " ,";
for (int j = 0; j < nodes.length; j++) { // for each replica
output += nodes[j] + ",";
}
output += "pending block # " + ns.getPendingReplicationBlocks() + " ,";
output += "under replicated # " + ns.getUnderReplicatedBlocks() + " ,";
if (expectedExcludedNode != null) {
output += "excluded node " + expectedExcludedNode;
}
LOG.info(output);
return output;
}
// Verify it has the expected maintenance node
Iterator<DatanodeStorageInfo> storageInfoIter =
bm.getStorages(blk.getBlock().getLocalBlock()).iterator();
List<DatanodeInfo> maintenanceNodes = new ArrayList<>();
while (storageInfoIter.hasNext()) {
DatanodeInfo node = storageInfoIter.next().getDatanodeDescriptor();
if (node.isMaintenance()) {
maintenanceNodes.add(node);
}
}
if (expectedMaintenanceNode != null) {
if (!maintenanceNodes.contains(expectedMaintenanceNode)) {
output = "No maintenance replica on " + expectedMaintenanceNode;
LOG.info(output);
return output;
}
} else {
if (maintenanceNodes.size() != 0) {
output = "Has maintenance replica(s)";
LOG.info(output);
return output;
}
}
}
return null;
}
static String checkWithRetry(FSNamesystem ns, FileSystem fileSys,
Path name, int repl, DatanodeInfo inMaintenanceNode)
throws IOException {
return checkWithRetry(ns, fileSys, name, repl, inMaintenanceNode,
inMaintenanceNode);
}
static String checkWithRetry(FSNamesystem ns, FileSystem fileSys,
Path name, int repl, DatanodeInfo excludedNode,
DatanodeInfo underMaintenanceNode) throws IOException {
int tries = 0;
String output = null;
while (tries++ < 200) {
try {
Thread.sleep(100);
output = checkFile(ns, fileSys, name, repl, excludedNode,
underMaintenanceNode);
if (output == null) {
break;
}
} catch (InterruptedException ie) {
}
}
return output;
}
static private DatanodeInfo[] getFirstBlockReplicasDatanodeInfos(
FileSystem fileSys, Path name) throws IOException {
// need a raw stream // need a raw stream
assertTrue("Not HDFS:"+fileSys.getUri(), assertTrue("Not HDFS:"+fileSys.getUri(),
fileSys instanceof DistributedFileSystem); fileSys instanceof DistributedFileSystem);
HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name); HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name);
Collection<LocatedBlock> dinfo = dis.getAllBlocks(); Collection<LocatedBlock> dinfo = dis.getAllBlocks();
for (LocatedBlock blk : dinfo) { // for each block if (dinfo.iterator().hasNext()) { // for the first block
DatanodeInfo[] nodes = blk.getLocations(); return dinfo.iterator().next().getLocations();
if (nodes.length > 0) { } else {
return nodes[0].getDatanodeUuid();
}
}
return null; return null;
} }
} }
}

@ -414,7 +414,7 @@ public class TestBlockManager {
throws Exception { throws Exception {
assertEquals(0, bm.numOfUnderReplicatedBlocks()); assertEquals(0, bm.numOfUnderReplicatedBlocks());
BlockInfo block = addBlockOnNodes(testIndex, origNodes); BlockInfo block = addBlockOnNodes(testIndex, origNodes);
assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block))); assertFalse(bm.isNeededReplication(block, bm.countNodes(block)));
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -458,7 +458,7 @@ public class TestBlockManager {
namenode.updatePipeline(clientName, oldBlock, newBlock, namenode.updatePipeline(clientName, oldBlock, newBlock,
oldLoactedBlock.getLocations(), oldLoactedBlock.getStorageIDs()); oldLoactedBlock.getLocations(), oldLoactedBlock.getStorageIDs());
BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock()); BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi))); assertFalse(bm.isNeededReplication(bi, bm.countNodes(bi)));
} finally { } finally {
IOUtils.closeStream(out); IOUtils.closeStream(out);
} }

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.AdminStatesBaseTest;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
@ -124,24 +125,6 @@ public class TestDecommissioningStatus {
stm.close(); stm.close();
} }
private FSDataOutputStream writeIncompleteFile(FileSystem fileSys, Path name,
short repl) throws IOException {
// create and write a file that contains three blocks of data
FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), repl,
blockSize);
byte[] buffer = new byte[fileSize];
Random rand = new Random(seed);
rand.nextBytes(buffer);
stm.write(buffer);
// need to make sure that we actually write out both file blocks
// (see FSOutputSummer#flush)
stm.flush();
// Do not close stream, return it
// so that it is not garbage collected
return stm;
}
static private void cleanupFile(FileSystem fileSys, Path name) static private void cleanupFile(FileSystem fileSys, Path name)
throws IOException { throws IOException {
assertTrue(fileSys.exists(name)); assertTrue(fileSys.exists(name));
@ -152,19 +135,19 @@ public class TestDecommissioningStatus {
/* /*
* Decommissions the node at the given index * Decommissions the node at the given index
*/ */
private String decommissionNode(FSNamesystem namesystem, DFSClient client, private String decommissionNode(DFSClient client,
int nodeIndex) throws IOException { int nodeIndex) throws IOException {
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
String nodename = info[nodeIndex].getXferAddr(); String nodename = info[nodeIndex].getXferAddr();
decommissionNode(namesystem, nodename); decommissionNode(nodename);
return nodename; return nodename;
} }
/* /*
* Decommissions the node by name * Decommissions the node by name
*/ */
private void decommissionNode(FSNamesystem namesystem, String dnName) private void decommissionNode(String dnName)
throws IOException { throws IOException {
System.out.println("Decommissioning node: " + dnName); System.out.println("Decommissioning node: " + dnName);
@ -179,14 +162,14 @@ public class TestDecommissioningStatus {
int expectedUnderRepInOpenFiles) { int expectedUnderRepInOpenFiles) {
assertEquals("Unexpected num under-replicated blocks", assertEquals("Unexpected num under-replicated blocks",
expectedUnderRep, expectedUnderRep,
decommNode.decommissioningStatus.getUnderReplicatedBlocks()); decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks());
assertEquals("Unexpected number of decom-only replicas", assertEquals("Unexpected number of decom-only replicas",
expectedDecommissionOnly, expectedDecommissionOnly,
decommNode.decommissioningStatus.getDecommissionOnlyReplicas()); decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas());
assertEquals( assertEquals(
"Unexpected number of replicas in under-replicated open files", "Unexpected number of replicas in under-replicated open files",
expectedUnderRepInOpenFiles, expectedUnderRepInOpenFiles,
decommNode.decommissioningStatus.getUnderReplicatedInOpenFiles()); decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles());
} }
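The renamed accessor also covers nodes entering maintenance, which is why the decommission-only counter becomes an out-of-service counter. A hedged sketch of the same checks against an arbitrary node (accessor names are taken from the diff above; the expected values and the node variable are placeholders supplied by the test):

// Sketch only: node is a DatanodeDescriptor from the DatanodeManager.
assertEquals(expectedUnderRep,
    node.getLeavingServiceStatus().getUnderReplicatedBlocks());
assertEquals(expectedOutOfServiceOnly,
    node.getLeavingServiceStatus().getOutOfServiceOnlyReplicas());
assertEquals(expectedUnderRepInOpenFiles,
    node.getLeavingServiceStatus().getUnderReplicatedInOpenFiles());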
private void checkDFSAdminDecommissionStatus( private void checkDFSAdminDecommissionStatus(
@ -255,7 +238,8 @@ public class TestDecommissioningStatus {
writeFile(fileSys, file1, replicas); writeFile(fileSys, file1, replicas);
Path file2 = new Path("decommission1.dat"); Path file2 = new Path("decommission1.dat");
FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas); FSDataOutputStream st1 = AdminStatesBaseTest.writeIncompleteFile(fileSys,
file2, replicas, (short)(fileSize / blockSize));
for (DataNode d: cluster.getDataNodes()) { for (DataNode d: cluster.getDataNodes()) {
DataNodeTestUtils.triggerBlockReport(d); DataNodeTestUtils.triggerBlockReport(d);
} }
@ -263,7 +247,7 @@ public class TestDecommissioningStatus {
FSNamesystem fsn = cluster.getNamesystem(); FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
for (int iteration = 0; iteration < numDatanodes; iteration++) { for (int iteration = 0; iteration < numDatanodes; iteration++) {
String downnode = decommissionNode(fsn, client, iteration); String downnode = decommissionNode(client, iteration);
dm.refreshNodes(conf); dm.refreshNodes(conf);
decommissionedNodes.add(downnode); decommissionedNodes.add(downnode);
BlockManagerTestUtil.recheckDecommissionState(dm); BlockManagerTestUtil.recheckDecommissionState(dm);
@ -293,8 +277,8 @@ public class TestDecommissioningStatus {
hostsFileWriter.initExcludeHost(""); hostsFileWriter.initExcludeHost("");
dm.refreshNodes(conf); dm.refreshNodes(conf);
st1.close(); st1.close();
cleanupFile(fileSys, file1); AdminStatesBaseTest.cleanupFile(fileSys, file1);
cleanupFile(fileSys, file2); AdminStatesBaseTest.cleanupFile(fileSys, file2);
} }
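The private helpers removed earlier are replaced by the static utilities on AdminStatesBaseTest used above. A minimal hedged sketch of the shared pattern, with the path name as a placeholder and the fourth argument being the block count, as in the call above:

// Sketch only: write a file that stays under construction, run the
// admin-state scenario, then close and clean up via the shared helpers.
Path openFile = new Path("decommission-openfile.dat");
FSDataOutputStream out = AdminStatesBaseTest.writeIncompleteFile(
    fileSys, openFile, replicas, (short) (fileSize / blockSize));
// ... exercise decommission or maintenance here ...
out.close();
AdminStatesBaseTest.cleanupFile(fileSys, openFile);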
/** /**
@ -320,7 +304,7 @@ public class TestDecommissioningStatus {
// Decommission the DN. // Decommission the DN.
FSNamesystem fsn = cluster.getNamesystem(); FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
decommissionNode(fsn, dnName); decommissionNode(dnName);
dm.refreshNodes(conf); dm.refreshNodes(conf);
// Stop the DN when decommission is in progress. // Stop the DN when decommission is in progress.
@ -355,7 +339,7 @@ public class TestDecommissioningStatus {
// Delete the under-replicated file, which should let the // Delete the under-replicated file, which should let the
// DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED // DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED
cleanupFile(fileSys, f); AdminStatesBaseTest.cleanupFile(fileSys, f);
BlockManagerTestUtil.recheckDecommissionState(dm); BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrue("the node should be decommissioned", assertTrue("the node should be decommissioned",
dead.get(0).isDecommissioned()); dead.get(0).isDecommissioned());
@ -388,7 +372,7 @@ public class TestDecommissioningStatus {
FSNamesystem fsn = cluster.getNamesystem(); FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID); DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
decommissionNode(fsn, dnName); decommissionNode(dnName);
dm.refreshNodes(conf); dm.refreshNodes(conf);
BlockManagerTestUtil.recheckDecommissionState(dm); BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrue(dnDescriptor.isDecommissioned()); assertTrue(dnDescriptor.isDecommissioned());
View File
@ -195,9 +195,17 @@ public class TestNamenodeCapacityReport {
private static final float EPSILON = 0.0001f; private static final float EPSILON = 0.0001f;
@Test @Test
public void testXceiverCount() throws Exception { public void testXceiverCount() throws Exception {
testXceiverCountInternal(0);
testXceiverCountInternal(1);
}
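Splitting the test this way lets the same scenario run under two values of the minimal replication required while replicas sit on maintenance nodes. The relevant knob, set a few lines below, boils down to the following sketch; the constant should map to dfs.namenode.maintenance.replication.min, whose default is 1:

// Sketch only: 0 and 1 are the two values exercised by testXceiverCount().
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
    minMaintenanceR);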
public void testXceiverCountInternal(int minMaintenanceR) throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
// retry one time, if close fails // retry one time, if close fails
conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1); conf.setInt(
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
minMaintenanceR);
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
final int nodes = 8; final int nodes = 8;
@ -220,23 +228,23 @@ public class TestNamenodeCapacityReport {
int expectedTotalLoad = nodes; // xceiver server adds 1 to load int expectedTotalLoad = nodes; // xceiver server adds 1 to load
int expectedInServiceNodes = nodes; int expectedInServiceNodes = nodes;
int expectedInServiceLoad = nodes; int expectedInServiceLoad = nodes;
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); checkClusterHealth(nodes, namesystem, expectedTotalLoad,
expectedInServiceNodes, expectedInServiceLoad);
// shutdown half the nodes and force a heartbeat check to ensure // Shutdown half the nodes followed by admin operations on those nodes.
// counts are accurate // Ensure counts are accurate.
for (int i=0; i < nodes/2; i++) { for (int i=0; i < nodes/2; i++) {
DataNode dn = datanodes.get(i); DataNode dn = datanodes.get(i);
DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId()); DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
dn.shutdown(); dn.shutdown();
DFSTestUtil.setDatanodeDead(dnd); DFSTestUtil.setDatanodeDead(dnd);
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager()); BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
//Verify decommission of dead node won't impact nodesInService metrics. //Admin operations on dead nodes won't impact nodesInService metrics.
dnm.getDecomManager().startDecommission(dnd); startDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0));
expectedInServiceNodes--; expectedInServiceNodes--;
assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes()); assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes, getNumDNInService(namesystem)); assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
//Verify recommission of dead node won't impact nodesInService metrics. stopDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0));
dnm.getDecomManager().stopDecommission(dnd);
assertEquals(expectedInServiceNodes, getNumDNInService(namesystem)); assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
} }
@ -247,7 +255,8 @@ public class TestNamenodeCapacityReport {
datanodes = cluster.getDataNodes(); datanodes = cluster.getDataNodes();
expectedInServiceNodes = nodes; expectedInServiceNodes = nodes;
assertEquals(nodes, datanodes.size()); assertEquals(nodes, datanodes.size());
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); checkClusterHealth(nodes, namesystem, expectedTotalLoad,
expectedInServiceNodes, expectedInServiceLoad);
// create streams and hsync to force datastreamers to start // create streams and hsync to force datastreamers to start
DFSOutputStream[] streams = new DFSOutputStream[fileCount]; DFSOutputStream[] streams = new DFSOutputStream[fileCount];
@ -263,30 +272,32 @@ public class TestNamenodeCapacityReport {
} }
// force nodes to send load update // force nodes to send load update
triggerHeartbeats(datanodes); triggerHeartbeats(datanodes);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); checkClusterHealth(nodes, namesystem, expectedTotalLoad,
expectedInServiceNodes, expectedInServiceLoad);
// decomm a few nodes, substract their load from the expected load, // admin operations on a few nodes, substract their load from the
// trigger heartbeat to force load update // expected load, trigger heartbeat to force load update.
for (int i=0; i < fileRepl; i++) { for (int i=0; i < fileRepl; i++) {
expectedInServiceNodes--; expectedInServiceNodes--;
DatanodeDescriptor dnd = DatanodeDescriptor dnd =
dnm.getDatanode(datanodes.get(i).getDatanodeId()); dnm.getDatanode(datanodes.get(i).getDatanodeId());
expectedInServiceLoad -= dnd.getXceiverCount(); expectedInServiceLoad -= dnd.getXceiverCount();
dnm.getDecomManager().startDecommission(dnd); startDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0));
DataNodeTestUtils.triggerHeartbeat(datanodes.get(i)); DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
Thread.sleep(100); Thread.sleep(100);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); checkClusterHealth(nodes, namesystem, expectedTotalLoad,
expectedInServiceNodes, expectedInServiceLoad);
} }
// check expected load while closing each stream. recalc expected // check expected load while closing each stream. recalc expected
// load based on whether the nodes in the pipeline are decomm // load based on whether the nodes in the pipeline are decomm
for (int i=0; i < fileCount; i++) { for (int i=0; i < fileCount; i++) {
int decomm = 0; int adminOps = 0;
for (DatanodeInfo dni : streams[i].getPipeline()) { for (DatanodeInfo dni : streams[i].getPipeline()) {
DatanodeDescriptor dnd = dnm.getDatanode(dni); DatanodeDescriptor dnd = dnm.getDatanode(dni);
expectedTotalLoad -= 2; expectedTotalLoad -= 2;
if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) { if (!dnd.isInService()) {
decomm++; adminOps++;
} else { } else {
expectedInServiceLoad -= 2; expectedInServiceLoad -= 2;
} }
@ -297,16 +308,17 @@ public class TestNamenodeCapacityReport {
// nodes will go decommissioned even if there's a UC block whose // nodes will go decommissioned even if there's a UC block whose
// other locations are decommissioned too. we'll ignore that // other locations are decommissioned too. we'll ignore that
// bug for now // bug for now
if (decomm < fileRepl) { if (adminOps < fileRepl) {
throw ioe; throw ioe;
} }
} }
triggerHeartbeats(datanodes); triggerHeartbeats(datanodes);
// verify node count and loads // verify node count and loads
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); checkClusterHealth(nodes, namesystem, expectedTotalLoad,
expectedInServiceNodes, expectedInServiceLoad);
} }
// shutdown each node, verify node counts based on decomm state // shutdown each node, verify node counts based on admin state
for (int i=0; i < nodes; i++) { for (int i=0; i < nodes; i++) {
DataNode dn = datanodes.get(i); DataNode dn = datanodes.get(i);
dn.shutdown(); dn.shutdown();
@ -320,13 +332,11 @@ public class TestNamenodeCapacityReport {
expectedInServiceNodes--; expectedInServiceNodes--;
} }
assertEquals(expectedInServiceNodes, getNumDNInService(namesystem)); assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
// live nodes always report load of 1. no nodes is load 0 // live nodes always report load of 1. no nodes is load 0
double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0; double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
assertEquals((double)expectedXceiverAvg, assertEquals((double)expectedXceiverAvg,
getInServiceXceiverAverage(namesystem), EPSILON); getInServiceXceiverAverage(namesystem), EPSILON);
} }
// final sanity check // final sanity check
checkClusterHealth(0, namesystem, 0.0, 0, 0.0); checkClusterHealth(0, namesystem, 0.0, 0, 0.0);
} finally { } finally {
@ -336,6 +346,24 @@ public class TestNamenodeCapacityReport {
} }
} }
private void startDecommissionOrMaintenance(DatanodeManager dnm,
DatanodeDescriptor dnd, boolean decomm) {
if (decomm) {
dnm.getDecomManager().startDecommission(dnd);
} else {
dnm.getDecomManager().startMaintenance(dnd, Long.MAX_VALUE);
}
}
private void stopDecommissionOrMaintenance(DatanodeManager dnm,
DatanodeDescriptor dnd, boolean decomm) {
if (decomm) {
dnm.getDecomManager().stopDecommission(dnd);
} else {
dnm.getDecomManager().stopMaintenance(dnd);
}
}
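These two helpers are what the loops above alternate between based on loop parity, so a single pass of the test exercises both decommission and maintenance. A hedged usage sketch, with variables as in the surrounding test:

// Sketch only: even indices are decommissioned, odd indices enter
// maintenance with no expiration time (Long.MAX_VALUE).
for (int i = 0; i < nodes / 2; i++) {
  DatanodeDescriptor dnd = dnm.getDatanode(datanodes.get(i).getDatanodeId());
  startDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0));
  // ... assert getNumLiveDataNodes()/getNumDNInService() as above ...
  stopDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0));
}

Keeping the parity decision in one place means the nodesInService and load checks stay identical for both admin states.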
private static void checkClusterHealth( private static void checkClusterHealth(
int numOfLiveNodes, int numOfLiveNodes,
FSNamesystem namesystem, double expectedTotalLoad, FSNamesystem namesystem, double expectedTotalLoad,
View File
@ -54,6 +54,7 @@ public class HostsFileWriter {
localFileSys = FileSystem.getLocal(conf); localFileSys = FileSystem.getLocal(conf);
Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
this.fullDir = new Path(workingDir, dir); this.fullDir = new Path(workingDir, dir);
cleanup(); // In case there is some left over from previous run.
assertTrue(localFileSys.mkdirs(this.fullDir)); assertTrue(localFileSys.mkdirs(this.fullDir));
if (conf.getClass( if (conf.getClass(