From d55a7f893584acee0c3bfd89e89f8002310dcc3f Mon Sep 17 00:00:00 2001 From: Ming Ma Date: Mon, 17 Oct 2016 17:46:29 -0700 Subject: [PATCH] HDFS-9390. Block management for maintenance states. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../java/org/apache/hadoop/hdfs/DFSUtil.java | 76 +- .../hdfs/server/balancer/Dispatcher.java | 11 +- .../server/blockmanagement/BlockManager.java | 260 ++++-- .../BlockPlacementPolicyDefault.java | 4 +- .../CacheReplicationMonitor.java | 2 +- .../blockmanagement/DatanodeDescriptor.java | 35 +- .../blockmanagement/DatanodeManager.java | 47 +- .../blockmanagement/DecommissionManager.java | 145 +++- .../blockmanagement/HeartbeatManager.java | 23 +- .../blockmanagement/NumberReplicas.java | 39 +- .../blockmanagement/StorageTypeStats.java | 8 +- .../hdfs/server/namenode/FSNamesystem.java | 9 +- .../src/main/resources/hdfs-default.xml | 7 + .../hadoop/hdfs/AdminStatesBaseTest.java | 20 +- .../apache/hadoop/hdfs/TestDecommission.java | 2 +- .../hadoop/hdfs/TestMaintenanceState.java | 783 ++++++++++++++++-- .../blockmanagement/TestBlockManager.java | 4 +- .../namenode/TestDecommissioningStatus.java | 48 +- .../namenode/TestNamenodeCapacityReport.java | 78 +- .../hadoop/hdfs/util/HostsFileWriter.java | 1 + 21 files changed, 1234 insertions(+), 372 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index ca2fb3e8630..6b6a4e09914 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -213,6 +213,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY; public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1; + public static final String DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY = + "dfs.namenode.maintenance.replication.min"; + public static final int DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT + = 1; public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY; public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index a2d3d5d08e3..e0a4e18f22e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -124,41 +124,12 @@ public static T[] shuffle(final T[] array) { return array; } + /** - * Compartor for sorting DataNodeInfo[] based on decommissioned states. - * Decommissioned nodes are moved to the end of the array on sorting with - * this compartor. + * Comparator for sorting DataNodeInfo[] based on + * decommissioned and entering_maintenance states. */ - public static final Comparator DECOM_COMPARATOR = - new Comparator() { - @Override - public int compare(DatanodeInfo a, DatanodeInfo b) { - return a.isDecommissioned() == b.isDecommissioned() ? 0 : - a.isDecommissioned() ? 
1 : -1; - } - }; - - - /** - * Comparator for sorting DataNodeInfo[] based on decommissioned/stale states. - * Decommissioned/stale nodes are moved to the end of the array on sorting - * with this comparator. - */ - @InterfaceAudience.Private - public static class DecomStaleComparator implements Comparator { - private final long staleInterval; - - /** - * Constructor of DecomStaleComparator - * - * @param interval - * The time interval for marking datanodes as stale is passed from - * outside, since the interval may be changed dynamically - */ - public DecomStaleComparator(long interval) { - this.staleInterval = interval; - } - + public static class ServiceComparator implements Comparator { @Override public int compare(DatanodeInfo a, DatanodeInfo b) { // Decommissioned nodes will still be moved to the end of the list @@ -167,6 +138,45 @@ public int compare(DatanodeInfo a, DatanodeInfo b) { } else if (b.isDecommissioned()) { return -1; } + + // ENTERING_MAINTENANCE nodes should be after live nodes. + if (a.isEnteringMaintenance()) { + return b.isEnteringMaintenance() ? 0 : 1; + } else if (b.isEnteringMaintenance()) { + return -1; + } else { + return 0; + } + } + } + + /** + * Comparator for sorting DataNodeInfo[] based on + * stale, decommissioned and entering_maintenance states. + * Order: live -> stale -> entering_maintenance -> decommissioned + */ + @InterfaceAudience.Private + public static class ServiceAndStaleComparator extends ServiceComparator { + private final long staleInterval; + + /** + * Constructor of ServiceAndStaleComparator + * + * @param interval + * The time interval for marking datanodes as stale is passed from + * outside, since the interval may be changed dynamically + */ + public ServiceAndStaleComparator(long interval) { + this.staleInterval = interval; + } + + @Override + public int compare(DatanodeInfo a, DatanodeInfo b) { + int ret = super.compare(a, b); + if (ret != 0) { + return ret; + } + // Stale nodes will be moved behind the normal nodes boolean aStale = a.isStale(staleInterval); boolean bStale = b.isStale(staleInterval); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 574e1855e3a..1dbe85bba26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -929,20 +929,17 @@ void add(Source source, StorageGroup target) { } private boolean shouldIgnore(DatanodeInfo dn) { - // ignore decommissioned nodes - final boolean decommissioned = dn.isDecommissioned(); - // ignore decommissioning nodes - final boolean decommissioning = dn.isDecommissionInProgress(); + // ignore out-of-service nodes + final boolean outOfService = !dn.isInService(); // ignore nodes in exclude list final boolean excluded = Util.isExcluded(excludedNodes, dn); // ignore nodes not in the include list (if include list is not empty) final boolean notIncluded = !Util.isIncluded(includedNodes, dn); - if (decommissioned || decommissioning || excluded || notIncluded) { + if (outOfService || excluded || notIncluded) { if (LOG.isTraceEnabled()) { LOG.trace("Excluding datanode " + dn - + ": decommissioned=" + decommissioned - + ", decommissioning=" + decommissioning + + ": outOfService=" + outOfService + ", excluded=" + excluded + ", notIncluded=" + notIncluded); } diff 
--git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index c2825055dc9..0b0021e49f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -111,6 +111,29 @@ /** * Keeps information related to the blocks stored in the Hadoop cluster. + * For block state management, it tries to maintain the safety + * property of "# of live replicas == # of expected redundancy" under + * any events such as decommission, namenode failover, datanode failure. + * + * The motivation of maintenance mode is to allow admins quickly repair nodes + * without paying the cost of decommission. Thus with maintenance mode, + * # of live replicas doesn't have to be equal to # of expected redundancy. + * If any of the replica is in maintenance mode, the safety property + * is extended as follows. These property still apply for the case of zero + * maintenance replicas, thus we can use these safe property for all scenarios. + * a. # of live replicas >= # of min replication for maintenance. + * b. # of live replicas <= # of expected redundancy. + * c. # of live replicas and maintenance replicas >= # of expected redundancy. + * + * For regular replication, # of min live replicas for maintenance is determined + * by DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY. This number has to <= + * DFS_NAMENODE_REPLICATION_MIN_KEY. + * For erasure encoding, # of min live replicas for maintenance is + * BlockInfoStriped#getRealDataBlockNum. + * + * Another safety property is to satisfy the block placement policy. While the + * policy is configurable, the replicas the policy is applied to are the live + * replicas + maintenance replicas. */ @InterfaceAudience.Private public class BlockManager implements BlockStatsMXBean { @@ -310,6 +333,11 @@ public long getNumTimedOutPendingReplications() { private final BlockIdManager blockIdManager; + /** Minimum live replicas needed for the datanode to be transitioned + * from ENTERING_MAINTENANCE to IN_MAINTENANCE. 
+ */ + private final short minReplicationToBeInMaintenance; + public BlockManager(final Namesystem namesystem, boolean haEnabled, final Configuration conf) throws IOException { @@ -342,13 +370,13 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, this.maxCorruptFilesReturned = conf.getInt( DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY, DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED); - this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, - DFSConfigKeys.DFS_REPLICATION_DEFAULT); + this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); - final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, - DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT); + final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, + DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT); final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, - DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); if (minR <= 0) throw new IOException("Unexpected configuration parameters: " + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY @@ -378,12 +406,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, this.replicationRecheckInterval = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, - DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; - + DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L; + this.encryptDataTransfer = conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); - + this.maxNumBlocksToLog = conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); @@ -393,6 +421,25 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, this.getBlocksMinBlockSize = conf.getLongBytes( DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); + + final int minMaintenanceR = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY, + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT); + + if (minMaintenanceR < 0) { + throw new IOException("Unexpected configuration parameters: " + + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY + + " = " + minMaintenanceR + " < 0"); + } + if (minMaintenanceR > minR) { + throw new IOException("Unexpected configuration parameters: " + + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY + + " = " + minMaintenanceR + " > " + + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY + + " = " + minR); + } + this.minReplicationToBeInMaintenance = (short)minMaintenanceR; + this.blockReportLeaseManager = new BlockReportLeaseManager(conf); bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf); @@ -620,7 +667,7 @@ public void metaSave(PrintWriter out) { // Dump all datanodes getDatanodeManager().datanodeDump(out); } - + /** * Dump the metadata for the given block in a human-readable * form. @@ -649,12 +696,12 @@ private void dumpBlockMeta(Block block, PrintWriter out) { out.print(fileName + ": "); } // l: == live:, d: == decommissioned c: == corrupt e: == excess - out.print(block + ((usableReplicas > 0)? "" : " MISSING") + + out.print(block + ((usableReplicas > 0)? 
"" : " MISSING") + " (replicas:" + " l: " + numReplicas.liveReplicas() + " d: " + numReplicas.decommissionedAndDecommissioning() + " c: " + numReplicas.corruptReplicas() + - " e: " + numReplicas.excessReplicas() + ") "); + " e: " + numReplicas.excessReplicas() + ") "); Collection corruptNodes = corruptReplicas.getNodes(block); @@ -693,6 +740,10 @@ public short getMinReplication() { return minReplication; } + public short getMinReplicationToBeInMaintenance() { + return minReplicationToBeInMaintenance; + } + /** * Commit a block of a file * @@ -867,7 +918,7 @@ public LocatedBlock convertLastBlockToUnderConstruction( NumberReplicas replicas = countNodes(lastBlock); neededReplications.remove(lastBlock, replicas.liveReplicas(), replicas.readOnlyReplicas(), - replicas.decommissionedAndDecommissioning(), getReplication(lastBlock)); + replicas.outOfServiceReplicas(), getExpectedReplicaNum(lastBlock)); pendingReplications.remove(lastBlock); // remove this block from the list of pending blocks to be deleted. @@ -972,7 +1023,8 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos } // get block locations - final int numCorruptNodes = countNodes(blk).corruptReplicas(); + NumberReplicas numberReplicas = countNodes(blk); + final int numCorruptNodes = numberReplicas.corruptReplicas(); final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk); if (numCorruptNodes != numCorruptReplicas) { LOG.warn("Inconsistent number of corrupt replicas for " @@ -982,17 +1034,23 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos final int numNodes = blocksMap.numNodes(blk); final boolean isCorrupt = numCorruptReplicas == numNodes; - final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas; + int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas; + numMachines -= numberReplicas.maintenanceNotForReadReplicas(); DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines]; int j = 0; if (numMachines > 0) { final boolean noCorrupt = (numCorruptReplicas == 0); for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { if (storage.getState() != State.FAILED) { + final DatanodeDescriptor d = storage.getDatanodeDescriptor(); + // Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states. + if (d.isInMaintenance() + || (d.isEnteringMaintenance() && !d.isAlive())) { + continue; + } if (noCorrupt) { machines[j++] = storage; } else { - final DatanodeDescriptor d = storage.getDatanodeDescriptor(); final boolean replicaCorrupt = isReplicaCorrupt(blk, d); if (isCorrupt || !replicaCorrupt) { machines[j++] = storage; @@ -1007,7 +1065,7 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos } assert j == machines.length : - "isCorrupt: " + isCorrupt + + "isCorrupt: " + isCorrupt + " numMachines: " + numMachines + " numNodes: " + numNodes + " numCorrupt: " + numCorruptNodes + @@ -1542,8 +1600,11 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { return scheduledWork; } + // Check if the number of live + pending replicas satisfies + // the expected redundancy. 
boolean hasEnoughEffectiveReplicas(BlockInfo block, - NumberReplicas numReplicas, int pendingReplicaNum, int required) { + NumberReplicas numReplicas, int pendingReplicaNum) { + int required = getExpectedLiveRedundancyNum(block, numReplicas); int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum; return (numEffectiveReplicas >= required) && (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block)); @@ -1557,14 +1618,14 @@ private ReplicationWork scheduleReplication(BlockInfo block, int priority) { return null; } - short requiredReplication = getExpectedReplicaNum(block); - // get a source data-node List containingNodes = new ArrayList<>(); List liveReplicaNodes = new ArrayList<>(); NumberReplicas numReplicas = new NumberReplicas(); DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes, liveReplicaNodes, numReplicas, priority); + short requiredReplication = getExpectedLiveRedundancyNum(block, + numReplicas); if (srcNode == null) { // block can not be replicated from any node LOG.debug("Block " + block + " cannot be repl from any node"); return null; @@ -1575,8 +1636,7 @@ private ReplicationWork scheduleReplication(BlockInfo block, int priority) { assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); int pendingNum = pendingReplications.getNumReplicas(block); - if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, - requiredReplication)) { + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { neededReplications.remove(block, priority); blockLog.debug("BLOCK* Removing {} from neededReplications as" + " it has enough replicas", block); @@ -1607,11 +1667,11 @@ private boolean validateReplicationWork(ReplicationWork rw) { } // do not schedule more if enough replicas is already pending - final short requiredReplication = getExpectedReplicaNum(block); NumberReplicas numReplicas = countNodes(block); + final short requiredReplication = + getExpectedLiveRedundancyNum(block, numReplicas); final int pendingNum = pendingReplications.getNumReplicas(block); - if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, - requiredReplication)) { + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { neededReplications.remove(block, priority); rw.resetTargets(); blockLog.debug("BLOCK* Removing {} from neededReplications as" + @@ -1677,7 +1737,7 @@ public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src, * @throws IOException * if the number of targets < minimum replication. 
* @see BlockPlacementPolicy#chooseTarget(String, int, Node, - * Set, long, List, BlockStoragePolicy) + * Set, long, List, BlockStoragePolicy, EnumSet) */ public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src, final int numOfReplicas, final Node client, @@ -1766,7 +1826,9 @@ DatanodeDescriptor chooseSourceDatanode(Block block, int decommissioning = 0; int corrupt = 0; int excess = 0; - + int maintenanceNotForRead = 0; + int maintenanceForRead = 0; + Collection nodesCorrupt = corruptReplicas.getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); @@ -1779,6 +1841,12 @@ else if (node.isDecommissionInProgress()) { decommissioning += countableReplica; } else if (node.isDecommissioned()) { decommissioned += countableReplica; + } else if (node.isMaintenance()) { + if (node.isInMaintenance() || !node.isAlive()) { + maintenanceNotForRead++; + } else { + maintenanceForRead++; + } } else if (excessBlocks != null && excessBlocks.contains(block)) { excess += countableReplica; } else { @@ -1793,10 +1861,9 @@ else if (node.isDecommissionInProgress()) { // If so, do not select the node as src node if ((nodesCorrupt != null) && nodesCorrupt.contains(node)) continue; - if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY - && !node.isDecommissionInProgress() - && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) - { + if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY + && !node.isDecommissionInProgress() && !node.isEnteringMaintenance() + && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { continue; // already reached replication limit } if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) @@ -1809,6 +1876,11 @@ else if (node.isDecommissionInProgress()) { // never use already decommissioned nodes if(node.isDecommissioned()) continue; + // Don't use dead ENTERING_MAINTENANCE or IN_MAINTENANCE nodes. 
+ if((!node.isAlive() && node.isEnteringMaintenance()) || + node.isInMaintenance()) { + continue; + } // We got this far, current node is a reasonable choice if (srcNode == null) { @@ -1823,7 +1895,7 @@ else if (node.isDecommissionInProgress()) { } if(numReplicas != null) numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt, - excess, 0); + excess, 0, maintenanceNotForRead, maintenanceForRead); return srcNode; } @@ -1846,9 +1918,10 @@ private void processPendingReplications() { continue; } NumberReplicas num = countNodes(timedOutItems[i]); - if (isNeededReplication(bi, num.liveReplicas())) { - neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(), - num.decommissionedAndDecommissioning(), getReplication(bi)); + if (isNeededReplication(bi, num)) { + neededReplications.add(bi, num.liveReplicas(), + num.readOnlyReplicas(), num.outOfServiceReplicas(), + getExpectedReplicaNum(bi)); } } } finally { @@ -2756,8 +2829,8 @@ private Block addStoredBlock(final BlockInfo block, // Now check for completion of blocks and safe block count NumberReplicas num = countNodes(storedBlock); int numLiveReplicas = num.liveReplicas(); - int numCurrentReplica = numLiveReplicas - + pendingReplications.getNumReplicas(storedBlock); + int pendingNum = pendingReplications.getNumReplicas(storedBlock); + int numCurrentReplica = numLiveReplicas + pendingNum; if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && numLiveReplicas >= minReplication) { @@ -2784,10 +2857,9 @@ private Block addStoredBlock(final BlockInfo block, // handle underReplication/overReplication short fileReplication = getExpectedReplicaNum(storedBlock); - if (!isNeededReplication(storedBlock, numCurrentReplica)) { + if (!isNeededReplication(storedBlock, num, pendingNum)) { neededReplications.remove(storedBlock, numCurrentReplica, - num.readOnlyReplicas(), - num.decommissionedAndDecommissioning(), fileReplication); + num.readOnlyReplicas(), num.outOfServiceReplicas(), fileReplication); } else { updateNeededReplications(storedBlock, curReplicaDelta, 0); } @@ -3003,9 +3075,10 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) { NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); // add to under-replicated queue if need to be - if (isNeededReplication(block, numCurrentReplica)) { - if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(), - num.decommissionedAndDecommissioning(), expectedReplication)) { + if (isNeededReplication(block, num)) { + if (neededReplications.add(block, numCurrentReplica, + num.readOnlyReplicas(), num.outOfServiceReplicas(), + expectedReplication)) { return MisReplicationResult.UNDER_REPLICATED; } } @@ -3037,9 +3110,10 @@ public void setReplication( // update needReplication priority queues b.setReplication(newRepl); + NumberReplicas num = countNodes(b); updateNeededReplications(b, 0, newRepl - oldRepl); - if (oldRepl > newRepl) { + if (num.liveReplicas() > newRepl) { processOverReplicatedBlock(b, newRepl, null, null); } } @@ -3074,7 +3148,7 @@ private void processOverReplicatedBlock(final BlockInfo block, LightWeightHashSet excessBlocks = excessReplicateMap.get( cur.getDatanodeUuid()); if (excessBlocks == null || !excessBlocks.contains(block)) { - if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { + if (cur.isInService()) { // exclude corrupt replicas if (corruptNodes == null || !corruptNodes.contains(cur)) { nonExcess.add(storage); @@ -3393,6 +3467,8 @@ public NumberReplicas countNodes(Block b) { int 
corrupt = 0; int excess = 0; int stale = 0; + int maintenanceNotForRead = 0; + int maintenanceForRead = 0; Collection nodesCorrupt = corruptReplicas.getNodes(b); for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { if (storage.getState() == State.FAILED) { @@ -3408,6 +3484,12 @@ public NumberReplicas countNodes(Block b) { decommissioning++; } else if (node.isDecommissioned()) { decommissioned++; + } else if (node.isMaintenance()) { + if (node.isInMaintenance() || !node.isAlive()) { + maintenanceNotForRead++; + } else { + maintenanceForRead++; + } } else { LightWeightHashSet blocksExcess = excessReplicateMap.get( node.getDatanodeUuid()); @@ -3422,7 +3504,7 @@ public NumberReplicas countNodes(Block b) { } } return new NumberReplicas(live, readonly, decommissioned, decommissioning, - corrupt, excess, stale); + corrupt, excess, stale, maintenanceNotForRead, maintenanceForRead); } /** @@ -3454,11 +3536,11 @@ int countLiveNodes(BlockInfo b) { } /** - * On stopping decommission, check if the node has excess replicas. + * On putting the node in service, check if the node has excess replicas. * If there are any excess replicas, call processOverReplicatedBlock(). * Process over replicated blocks only when active NN is out of safe mode. */ - void processOverReplicatedBlocksOnReCommission( + void processExtraRedundancyBlocksOnInService( final DatanodeDescriptor srcNode) { if (!isPopulatingReplQueues()) { return; @@ -3467,7 +3549,7 @@ void processOverReplicatedBlocksOnReCommission( int numOverReplicated = 0; while(it.hasNext()) { final BlockInfo block = it.next(); - short expectedReplication = block.getReplication(); + short expectedReplication = getExpectedReplicaNum(block); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); if (numCurrentReplica > expectedReplication) { @@ -3481,10 +3563,11 @@ void processOverReplicatedBlocksOnReCommission( } /** - * Returns whether a node can be safely decommissioned based on its - * liveness. Dead nodes cannot always be safely decommissioned. + * Returns whether a node can be safely decommissioned or in maintenance + * based on its liveness. Dead nodes cannot always be safely decommissioned + * or in maintenance. */ - boolean isNodeHealthyForDecommission(DatanodeDescriptor node) { + boolean isNodeHealthyForDecommissionOrMaintenance(DatanodeDescriptor node) { if (!node.checkBlockReportReceived()) { LOG.info("Node {} hasn't sent its first block report.", node); return false; @@ -3498,17 +3581,18 @@ boolean isNodeHealthyForDecommission(DatanodeDescriptor node) { if (pendingReplicationBlocksCount == 0 && underReplicatedBlocksCount == 0) { LOG.info("Node {} is dead and there are no under-replicated" + - " blocks or blocks pending replication. Safe to decommission.", - node); + " blocks or blocks pending replication. Safe to decommission or" + + " put in maintenance.", node); return true; } LOG.warn("Node {} is dead " + - "while decommission is in progress. Cannot be safely " + - "decommissioned since there is risk of reduced " + - "data durability or data loss. Either restart the failed node or" + - " force decommissioning by removing, calling refreshNodes, " + - "then re-adding to the excludes files.", node); + "while in {}. Cannot be safely " + + "decommissioned or be in maintenance since there is risk of reduced " + + "data durability or data loss. 
Either restart the failed node or " + + "force decommissioning or maintenance by removing, calling " + + "refreshNodes, then re-adding to the excludes or host config files.", + node, node.getAdminState()); return false; } @@ -3559,17 +3643,16 @@ private void updateNeededReplications(final BlockInfo block, } NumberReplicas repl = countNodes(block); int pendingNum = pendingReplications.getNumReplicas(block); - int curExpectedReplicas = getReplication(block); - if (!hasEnoughEffectiveReplicas(block, repl, pendingNum, - curExpectedReplicas)) { + int curExpectedReplicas = getExpectedReplicaNum(block); + if (!hasEnoughEffectiveReplicas(block, repl, pendingNum)) { neededReplications.update(block, repl.liveReplicas() + pendingNum, - repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(), + repl.readOnlyReplicas(), repl.outOfServiceReplicas(), curExpectedReplicas, curReplicasDelta, expectedReplicasDelta); } else { int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(), - repl.decommissionedAndDecommissioning(), oldExpectedReplicas); + repl.outOfServiceReplicas(), oldExpectedReplicas); } } finally { namesystem.writeUnlock(); @@ -3584,27 +3667,18 @@ private void updateNeededReplications(final BlockInfo block, */ public void checkReplication(BlockCollection bc) { for (BlockInfo block : bc.getBlocks()) { - final short expected = block.getReplication(); + final short expected = getExpectedReplicaNum(block); final NumberReplicas n = countNodes(block); final int pending = pendingReplications.getNumReplicas(block); - if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) { + if (!hasEnoughEffectiveReplicas(block, n, pending)) { neededReplications.add(block, n.liveReplicas() + pending, - n.readOnlyReplicas(), - n.decommissionedAndDecommissioning(), expected); + n.readOnlyReplicas(), n.outOfServiceReplicas(), expected); } else if (n.liveReplicas() > expected) { processOverReplicatedBlock(block, expected, null, null); } } } - /** - * @return 0 if the block is not found; - * otherwise, return the replication factor of the block. - */ - private int getReplication(BlockInfo block) { - return getExpectedReplicaNum(block); - } - /** * Get blocks to invalidate for nodeId * in {@link #invalidateBlocks}. @@ -3651,6 +3725,8 @@ boolean isPlacementPolicySatisfied(BlockInfo storedBlock) { .getNodes(storedBlock); for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); + // Nodes under maintenance should be counted as valid replicas from + // rack policy point of view. 
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned() && ((corruptNodes == null) || !corruptNodes.contains(cur))) { liveNodes.add(cur); @@ -3661,14 +3737,36 @@ boolean isPlacementPolicySatisfied(BlockInfo storedBlock) { storedBlock.getReplication()).isPlacementPolicySatisfied(); } + boolean isNeededReplicationForMaintenance(BlockInfo storedBlock, + NumberReplicas numberReplicas) { + return storedBlock.isComplete() && (numberReplicas.liveReplicas() < + getMinReplicationToBeInMaintenance() || + !isPlacementPolicySatisfied(storedBlock)); + } + + boolean isNeededReplication(BlockInfo storedBlock, + NumberReplicas numberReplicas) { + return isNeededReplication(storedBlock, numberReplicas, 0); + } + /** - * A block needs replication if the number of replicas is less than expected - * or if it does not have enough racks. + * A block needs reconstruction if the number of redundancies is less than + * expected or if it does not have enough racks. */ - boolean isNeededReplication(BlockInfo storedBlock, int current) { - int expected = storedBlock.getReplication(); - return storedBlock.isComplete() - && (current < expected || !isPlacementPolicySatisfied(storedBlock)); + boolean isNeededReplication(BlockInfo storedBlock, + NumberReplicas numberReplicas, int pending) { + return storedBlock.isComplete() && + !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending); + } + + // Exclude maintenance, but make sure it has minimal live replicas + // to satisfy the maintenance requirement. + public short getExpectedLiveRedundancyNum(BlockInfo block, + NumberReplicas numberReplicas) { + final short expectedRedundancy = getExpectedReplicaNum(block); + return (short)Math.max(expectedRedundancy - + numberReplicas.maintenanceReplicas(), + getMinReplicationToBeInMaintenance()); } public short getExpectedReplicaNum(BlockInfo block) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 4bc479a6538..ba45b0573da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -832,8 +832,8 @@ boolean isGoodDatanode(DatanodeDescriptor node, List results, boolean avoidStaleNodes) { // check if the node is (being) decommissioned - if (node.isDecommissionInProgress() || node.isDecommissioned()) { - logNodeIsNotChosen(node, "the node is (being) decommissioned "); + if (!node.isInService()) { + logNodeIsNotChosen(node, "the node isn't in service."); return false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java index ca8d72ac65e..8563cf31a1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java @@ -682,7 +682,7 @@ private void addNewPendingCached(final int neededCached, if (datanode == null) { continue; } - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { + 
if (!datanode.isInService()) { continue; } if (corrupt != null && corrupt.contains(datanode)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 2a9c2241020..0abba1d1106 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -143,8 +143,8 @@ public Type getType() { // Stores status of decommissioning. // If node is not decommissioning, do not use this object for anything. - public final DecommissioningStatus decommissioningStatus = - new DecommissioningStatus(); + private final LeavingServiceStatus leavingServiceStatus = + new LeavingServiceStatus(); private final Map storageMap = new HashMap<>(); @@ -270,6 +270,10 @@ public synchronized void setNeedKeyUpdate(boolean needKeyUpdate) { this.needKeyUpdate = needKeyUpdate; } + public LeavingServiceStatus getLeavingServiceStatus() { + return leavingServiceStatus; + } + @VisibleForTesting public DatanodeStorageInfo getStorageInfo(String storageID) { synchronized (storageMap) { @@ -688,51 +692,54 @@ public boolean equals(Object obj) { return (this == obj) || super.equals(obj); } - /** Decommissioning status */ - public class DecommissioningStatus { + /** Leaving service status. */ + public class LeavingServiceStatus { private int underReplicatedBlocks; - private int decommissionOnlyReplicas; + private int outOfServiceOnlyReplicas; private int underReplicatedInOpenFiles; private long startTime; synchronized void set(int underRep, int onlyRep, int underConstruction) { - if (!isDecommissionInProgress()) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return; } underReplicatedBlocks = underRep; - decommissionOnlyReplicas = onlyRep; + outOfServiceOnlyReplicas = onlyRep; underReplicatedInOpenFiles = underConstruction; } /** @return the number of under-replicated blocks */ public synchronized int getUnderReplicatedBlocks() { - if (!isDecommissionInProgress()) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } return underReplicatedBlocks; } - /** @return the number of decommission-only replicas */ - public synchronized int getDecommissionOnlyReplicas() { - if (!isDecommissionInProgress()) { + /** @return the number of blocks with out-of-service-only replicas */ + public synchronized int getOutOfServiceOnlyReplicas() { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } - return decommissionOnlyReplicas; + return outOfServiceOnlyReplicas; } /** @return the number of under-replicated blocks in open files */ public synchronized int getUnderReplicatedInOpenFiles() { - if (!isDecommissionInProgress()) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } return underReplicatedInOpenFiles; } /** Set start time */ public synchronized void setStartTime(long time) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { + return; + } startTime = time; } /** @return start time */ public synchronized long getStartTime() { - if (!isDecommissionInProgress()) { + if (!isDecommissionInProgress() && !isEnteringMaintenance()) { return 0; } return startTime; diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 67a8e485f5f..2675a04c744 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -389,8 +389,8 @@ public void sortLocatedBlocks(final String targethost, } Comparator comparator = avoidStaleDataNodesForRead ? - new DFSUtil.DecomStaleComparator(staleInterval) : - DFSUtil.DECOM_COMPARATOR; + new DFSUtil.ServiceAndStaleComparator(staleInterval) : + new DFSUtil.ServiceComparator(); for (LocatedBlock b : locatedblocks) { DatanodeInfo[] di = b.getLocations(); @@ -558,9 +558,20 @@ void datanodeDump(final PrintWriter out) { * @param nodeInfo datanode descriptor. */ private void removeDatanode(DatanodeDescriptor nodeInfo) { + removeDatanode(nodeInfo, true); + } + + /** + * Remove a datanode descriptor. + * @param nodeInfo datanode descriptor. + */ + private void removeDatanode(DatanodeDescriptor nodeInfo, + boolean removeBlocksFromBlocksMap) { assert namesystem.hasWriteLock(); heartbeatManager.removeDatanode(nodeInfo); - blockManager.removeBlocksAssociatedTo(nodeInfo); + if (removeBlocksFromBlocksMap) { + blockManager.removeBlocksAssociatedTo(nodeInfo); + } networktopology.remove(nodeInfo); decrementVersionCount(nodeInfo.getSoftwareVersion()); blockManager.getBlockReportLeaseManager().unregister(nodeInfo); @@ -581,7 +592,7 @@ public void removeDatanode(final DatanodeID node) try { final DatanodeDescriptor descriptor = getDatanode(node); if (descriptor != null) { - removeDatanode(descriptor); + removeDatanode(descriptor, true); } else { NameNode.stateChangeLog.warn("BLOCK* removeDatanode: " + node + " does not exist"); @@ -592,7 +603,8 @@ public void removeDatanode(final DatanodeID node) } /** Remove a dead datanode. */ - void removeDeadDatanode(final DatanodeID nodeID) { + void removeDeadDatanode(final DatanodeID nodeID, + boolean removeBlocksFromBlockMap) { DatanodeDescriptor d; try { d = getDatanode(nodeID); @@ -601,8 +613,9 @@ void removeDeadDatanode(final DatanodeID nodeID) { } if (d != null && isDatanodeDead(d)) { NameNode.stateChangeLog.info( - "BLOCK* removeDeadDatanode: lost heartbeat from " + d); - removeDatanode(d); + "BLOCK* removeDeadDatanode: lost heartbeat from " + d + + ", removeBlocksFromBlockMap " + removeBlocksFromBlockMap); + removeDatanode(d, removeBlocksFromBlockMap); } } @@ -1038,10 +1051,16 @@ private void refreshHostsReader(Configuration conf) throws IOException { } /** - * 1. Added to hosts --> no further work needed here. - * 2. Removed from hosts --> mark AdminState as decommissioned. - * 3. Added to exclude --> start decommission. - * 4. Removed from exclude --> stop decommission. + * Reload datanode membership and the desired admin operations from + * host files. If a node isn't allowed, hostConfigManager.isIncluded returns + * false and the node can't be used. + * If a node is allowed and the desired admin operation is defined, + * it will transition to the desired admin state. + * If a node is allowed and upgrade domain is defined, + * the upgrade domain will be set on the node. + * To use maintenance mode or upgrade domain, set + * DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY to + * CombinedHostFileManager.class. 
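The host-file driven flow described above can be exercised with configuration along these lines. This is only a sketch: it assumes the JSON-backed include file read by CombinedHostFileManager, the field names follow DatanodeAdminProperties, and the expiration timestamp is an arbitrary example value.

    <!-- hdfs-site.xml (sketch): let the NameNode read admin states from the
         combined host file and keep at least one live replica per block while
         a node is in maintenance. -->
    <property>
      <name>dfs.namenode.hosts.provider.classname</name>
      <value>org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager</value>
    </property>
    <property>
      <name>dfs.namenode.maintenance.replication.min</name>
      <value>1</value>
    </property>

    [dfs.hosts JSON entry, illustrative field names]
    {
      "hostName": "dn1.example.com",
      "adminState": "IN_MAINTENANCE",
      "maintenanceExpireTimeInMS": 1479000000000
    }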
*/ private void refreshDatanodes() { final Map copy; @@ -1051,17 +1070,17 @@ private void refreshDatanodes() { for (DatanodeDescriptor node : copy.values()) { // Check if not include. if (!hostConfigManager.isIncluded(node)) { - node.setDisallowed(true); // case 2. + node.setDisallowed(true); } else { long maintenanceExpireTimeInMS = hostConfigManager.getMaintenanceExpirationTimeInMS(node); if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) { decomManager.startMaintenance(node, maintenanceExpireTimeInMS); } else if (hostConfigManager.isExcluded(node)) { - decomManager.startDecommission(node); // case 3. + decomManager.startDecommission(node); } else { decomManager.stopMaintenance(node); - decomManager.stopDecommission(node); // case 4. + decomManager.stopDecommission(node); } } node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index 10e4c961076..7e404c4c3e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -210,7 +210,7 @@ public void startDecommission(DatanodeDescriptor node) { LOG.info("Starting decommission of {} {} with {} blocks", node, storage, storage.numBlocks()); } - node.decommissioningStatus.setStartTime(monotonicNow()); + node.getLeavingServiceStatus().setStartTime(monotonicNow()); pendingNodes.add(node); } } else { @@ -231,7 +231,7 @@ public void stopDecommission(DatanodeDescriptor node) { // Over-replicated blocks will be detected and processed when // the dead node comes back and send in its full block report. if (node.isAlive()) { - blockManager.processOverReplicatedBlocksOnReCommission(node); + blockManager.processExtraRedundancyBlocksOnInService(node); } // Remove from tracking in DecommissionManager pendingNodes.remove(node); @@ -255,6 +255,16 @@ public void startMaintenance(DatanodeDescriptor node, if (!node.isMaintenance()) { // Update DN stats maintained by HeartbeatManager hbManager.startMaintenance(node); + // hbManager.startMaintenance will set dead node to IN_MAINTENANCE. + if (node.isEnteringMaintenance()) { + for (DatanodeStorageInfo storage : node.getStorageInfos()) { + LOG.info("Starting maintenance of {} {} with {} blocks", + node, storage, storage.numBlocks()); + } + node.getLeavingServiceStatus().setStartTime(monotonicNow()); + } + // Track the node regardless whether it is ENTERING_MAINTENANCE or + // IN_MAINTENANCE to support maintenance expiration. pendingNodes.add(node); } else { LOG.trace("startMaintenance: Node {} in {}, nothing to do." + @@ -273,8 +283,34 @@ public void stopMaintenance(DatanodeDescriptor node) { // Update DN stats maintained by HeartbeatManager hbManager.stopMaintenance(node); - // TODO HDFS-9390 remove replicas from block maps - // or handle over replicated blocks. + // extra redundancy blocks will be detected and processed when + // the dead node comes back and send in its full block report. + if (!node.isAlive()) { + // The node became dead when it was in maintenance, at which point + // the replicas weren't removed from block maps. 
+ // When the node leaves maintenance, the replicas should be removed + // from the block maps to trigger the necessary replication to + // maintain the safety property of "# of live replicas + maintenance + // replicas" >= the expected redundancy. + blockManager.removeBlocksAssociatedTo(node); + } else { + // Even though putting nodes in maintenance node doesn't cause live + // replicas to match expected replication factor, it is still possible + // to have over replicated when the node leaves maintenance node. + // First scenario: + // a. Node became dead when it is at AdminStates.NORMAL, thus + // block is replicated so that 3 replicas exist on other nodes. + // b. Admins put the dead node into maintenance mode and then + // have the node rejoin the cluster. + // c. Take the node out of maintenance mode. + // Second scenario: + // a. With replication factor 3, set one replica to maintenance node, + // thus block has 1 maintenance replica and 2 live replicas. + // b. Change the replication factor to 2. The block will still have + // 1 maintenance replica and 2 live replicas. + // c. Take the node out of maintenance mode. + blockManager.processExtraRedundancyBlocksOnInService(node); + } // Remove from tracking in DecommissionManager pendingNodes.remove(node); @@ -290,27 +326,32 @@ private void setDecommissioned(DatanodeDescriptor dn) { LOG.info("Decommissioning complete for node {}", dn); } + private void setInMaintenance(DatanodeDescriptor dn) { + dn.setInMaintenance(); + LOG.info("Node {} has entered maintenance mode.", dn); + } + /** * Checks whether a block is sufficiently replicated for decommissioning. * Full-strength replication is not always necessary, hence "sufficient". * @return true if sufficient, else false. */ - private boolean isSufficientlyReplicated(BlockInfo block, - BlockCollection bc, - NumberReplicas numberReplicas) { - final int numExpected = block.getReplication(); - final int numLive = numberReplicas.liveReplicas(); - if (numLive >= numExpected - && blockManager.isPlacementPolicySatisfied(block)) { + private boolean isSufficientlyReplicated(BlockInfo block, BlockCollection bc, + NumberReplicas numberReplicas, boolean isDecommission) { + if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) { // Block has enough replica, skip LOG.trace("Block {} does not need replication.", block); return true; } + final int numExpected = blockManager.getExpectedLiveRedundancyNum(block, + numberReplicas); + final int numLive = numberReplicas.liveReplicas(); + // Block is under-replicated - LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, + LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, numLive); - if (numExpected > numLive) { + if (isDecommission && numExpected > numLive) { if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) { // Can decom a UC block as long as there will still be minReplicas if (numLive >= blockManager.minReplication) { @@ -354,11 +395,16 @@ private static void logBlockReplicationInfo(BlockInfo block, + ", corrupt replicas: " + num.corruptReplicas() + ", decommissioned replicas: " + num.decommissioned() + ", decommissioning replicas: " + num.decommissioning() + + ", maintenance replicas: " + num.maintenanceReplicas() + + ", live entering maintenance replicas: " + + num.liveEnteringMaintenanceReplicas() + ", excess replicas: " + num.excessReplicas() + ", Is Open File: " + bc.isUnderConstruction() + ", Datanodes having this block: " + nodeList + ", Current Datanode: " + srcNode + ", Is current 
datanode decommissioning: " - + srcNode.isDecommissionInProgress()); + + srcNode.isDecommissionInProgress() + + ", Is current datanode entering maintenance: " + + srcNode.isEnteringMaintenance()); } @VisibleForTesting @@ -444,7 +490,7 @@ public void run() { numBlocksChecked = 0; numBlocksCheckedPerLock = 0; numNodesChecked = 0; - // Check decom progress + // Check decommission or maintenance progress. namesystem.writeLock(); try { processPendingNodes(); @@ -486,15 +532,14 @@ private void check() { final DatanodeDescriptor dn = entry.getKey(); AbstractList blocks = entry.getValue(); boolean fullScan = false; - if (dn.isMaintenance()) { - // TODO HDFS-9390 make sure blocks are minimally replicated - // before transitioning the node to IN_MAINTENANCE state. - + if (dn.isMaintenance() && dn.maintenanceExpired()) { // If maintenance expires, stop tracking it. - if (dn.maintenanceExpired()) { - stopMaintenance(dn); - toRemove.add(dn); - } + stopMaintenance(dn); + toRemove.add(dn); + continue; + } + if (dn.isInMaintenance()) { + // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet. continue; } if (blocks == null) { @@ -509,7 +554,7 @@ private void check() { } else { // This is a known datanode, check if its # of insufficiently // replicated blocks has dropped to zero and if it can be decommed - LOG.debug("Processing decommission-in-progress node {}", dn); + LOG.debug("Processing {} node {}", dn.getAdminState(), dn); pruneSufficientlyReplicated(dn, blocks); } if (blocks.size() == 0) { @@ -528,22 +573,31 @@ private void check() { // If the full scan is clean AND the node liveness is okay, // we can finally mark as decommissioned. final boolean isHealthy = - blockManager.isNodeHealthyForDecommission(dn); + blockManager.isNodeHealthyForDecommissionOrMaintenance(dn); if (blocks.size() == 0 && isHealthy) { - setDecommissioned(dn); - toRemove.add(dn); + if (dn.isDecommissionInProgress()) { + setDecommissioned(dn); + toRemove.add(dn); + } else if (dn.isEnteringMaintenance()) { + // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to + // to track maintenance expiration. + setInMaintenance(dn); + } else { + Preconditions.checkState(false, + "A node is in an invalid state!"); + } LOG.debug("Node {} is sufficiently replicated and healthy, " - + "marked as decommissioned.", dn); + + "marked as {}.", dn.getAdminState()); } else { LOG.debug("Node {} {} healthy." + " It needs to replicate {} more blocks." - + " Decommissioning is still in progress.", - dn, isHealthy? "is": "isn't", blocks.size()); + + " {} is still in progress.", dn, + isHealthy? 
"is": "isn't", blocks.size(), dn.getAdminState()); } } else { LOG.debug("Node {} still has {} blocks to replicate " - + "before it is a candidate to finish decommissioning.", - dn, blocks.size()); + + "before it is a candidate to finish {}.", + dn, blocks.size(), dn.getAdminState()); } iterkey = dn; } @@ -562,7 +616,7 @@ private void check() { */ private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode, AbstractList blocks) { - processBlocksForDecomInternal(datanode, blocks.iterator(), null, true); + processBlocksInternal(datanode, blocks.iterator(), null, true); } /** @@ -578,7 +632,7 @@ private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode, private AbstractList handleInsufficientlyReplicated( final DatanodeDescriptor datanode) { AbstractList insufficient = new ChunkedArrayList<>(); - processBlocksForDecomInternal(datanode, datanode.getBlockIterator(), + processBlocksInternal(datanode, datanode.getBlockIterator(), insufficient, false); return insufficient; } @@ -599,14 +653,14 @@ private AbstractList handleInsufficientlyReplicated( * @return true if there are under-replicated blocks in the provided block * iterator, else false. */ - private void processBlocksForDecomInternal( + private void processBlocksInternal( final DatanodeDescriptor datanode, final Iterator it, final List insufficientlyReplicated, boolean pruneSufficientlyReplicated) { boolean firstReplicationLog = true; int underReplicatedBlocks = 0; - int decommissionOnlyReplicas = 0; + int outOfServiceOnlyReplicas = 0; int underReplicatedInOpenFiles = 0; while (it.hasNext()) { if (insufficientlyReplicated == null @@ -653,21 +707,25 @@ private void processBlocksForDecomInternal( // Schedule under-replicated blocks for replication if not already // pending - if (blockManager.isNeededReplication(block, liveReplicas)) { + boolean isDecommission = datanode.isDecommissionInProgress(); + boolean neededReplication = isDecommission ? + blockManager.isNeededReplication(block, num) : + blockManager.isNeededReplicationForMaintenance(block, num); + if (neededReplication) { if (!blockManager.neededReplications.contains(block) && blockManager.pendingReplications.getNumReplicas(block) == 0 && blockManager.isPopulatingReplQueues()) { // Process these blocks only when active NN is out of safe mode. 
blockManager.neededReplications.add(block, liveReplicas, num.readOnlyReplicas(), - num.decommissionedAndDecommissioning(), + num.outOfServiceReplicas(), block.getReplication()); } } // Even if the block is under-replicated, // it doesn't block decommission if it's sufficiently replicated - if (isSufficientlyReplicated(block, bc, num)) { + if (isSufficientlyReplicated(block, bc, num, isDecommission)) { if (pruneSufficientlyReplicated) { it.remove(); } @@ -689,14 +747,13 @@ private void processBlocksForDecomInternal( if (bc.isUnderConstruction()) { underReplicatedInOpenFiles++; } - if ((curReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) { - decommissionOnlyReplicas++; + if ((curReplicas == 0) && (num.outOfServiceReplicas() > 0)) { + outOfServiceOnlyReplicas++; } } - datanode.decommissioningStatus.set(underReplicatedBlocks, - decommissionOnlyReplicas, - underReplicatedInOpenFiles); + datanode.getLeavingServiceStatus().set(underReplicatedBlocks, + outOfServiceOnlyReplicas, underReplicatedInOpenFiles); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index d728ee2c9b6..a72ad64497a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -25,10 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.namenode.Namesystem; -import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; -import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.util.Daemon; @@ -269,13 +266,19 @@ synchronized void startMaintenance(final DatanodeDescriptor node) { if (!node.isAlive()) { LOG.info("Dead node {} is put in maintenance state immediately.", node); node.setInMaintenance(); - } else if (node.isDecommissioned()) { - LOG.info("Decommissioned node " + node + " is put in maintenance state" - + " immediately."); - node.setInMaintenance(); } else { stats.subtract(node); - node.startMaintenance(); + if (node.isDecommissioned()) { + LOG.info("Decommissioned node " + node + " is put in maintenance state" + + " immediately."); + node.setInMaintenance(); + } else if (blockManager.getMinReplicationToBeInMaintenance() == 0) { + LOG.info("MinReplicationToBeInMaintenance is set to zero. " + node + + " is put in maintenance state" + " immediately."); + node.setInMaintenance(); + } else { + node.startMaintenance(); + } stats.add(node); } } @@ -352,7 +355,7 @@ void heartbeatCheck() { boolean allAlive = false; while (!allAlive) { // locate the first dead node. - DatanodeID dead = null; + DatanodeDescriptor dead = null; // locate the first failed storage that isn't on a dead node. DatanodeStorageInfo failedStorage = null; @@ -401,7 +404,7 @@ void heartbeatCheck() { // acquire the fsnamesystem lock, and then remove the dead node. 
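Two HeartbeatManager behaviors in this hunk are easy to misread, so here is a compact sketch of the decisions involved (simplified; the real code also adjusts datanode stats and takes the namesystem lock as shown below):

    // Illustrative sketch only; not part of the patch.
    class MaintenanceHeartbeatDecisions {
      // 1) State chosen when maintenance is requested for a node. A dead node,
      //    an already decommissioned node, or a cluster configured with a
      //    minimum maintenance replication of zero goes straight to
      //    IN_MAINTENANCE; everything else starts as ENTERING_MAINTENANCE until
      //    the DecommissionManager confirms every block keeps the minimum
      //    number of live replicas.
      static boolean entersMaintenanceImmediately(boolean alive,
          boolean decommissioned, int minMaintenanceReplication) {
        return !alive || decommissioned || minMaintenanceReplication == 0;
      }

      // 2) When heartbeatCheck removes a dead node, its replicas stay in the
      //    blocks map if the node is in maintenance, so they still count toward
      //    "live + maintenance replicas >= expected redundancy" and planned
      //    downtime does not trigger re-replication.
      static boolean removeBlocksFromBlocksMap(boolean nodeInMaintenance) {
        return !nodeInMaintenance;
      }
    }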
namesystem.writeLock(); try { - dm.removeDeadDatanode(dead); + dm.removeDeadDatanode(dead, !dead.isMaintenance()); } finally { namesystem.writeUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java index 44ae6f6a462..9d79259e931 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java @@ -32,18 +32,29 @@ public class NumberReplicas { private int corruptReplicas; private int excessReplicas; private int replicasOnStaleNodes; + // We need live ENTERING_MAINTENANCE nodes to continue + // to serve read request while it is being transitioned to live + // IN_MAINTENANCE if these are the only replicas left. + // maintenanceNotForRead == maintenanceReplicas - + // Live ENTERING_MAINTENANCE. + private int maintenanceNotForRead; + // Live ENTERING_MAINTENANCE nodes to serve read requests. + private int maintenanceForRead; NumberReplicas() { - this(0, 0, 0, 0, 0, 0, 0); + this(0, 0, 0, 0, 0, 0, 0, 0, 0); } NumberReplicas(int live, int readonly, int decommissioned, - int decommissioning, int corrupt, int excess, int stale) { - set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale); + int decommissioning, int corrupt, int excess, int stale, + int maintenanceNotForRead, int maintenanceForRead) { + set(live, readonly, decommissioned, decommissioning, corrupt, + excess, stale, maintenanceNotForRead, maintenanceForRead); } void set(int live, int readonly, int decommissioned, int decommissioning, - int corrupt, int excess, int stale) { + int corrupt, int excess, int stale, int maintenanceNotForRead, + int maintenanceForRead) { liveReplicas = live; readOnlyReplicas = readonly; this.decommissioning = decommissioning; @@ -51,6 +62,8 @@ void set(int live, int readonly, int decommissioned, int decommissioning, corruptReplicas = corrupt; excessReplicas = excess; replicasOnStaleNodes = stale; + this.maintenanceNotForRead = maintenanceNotForRead; + this.maintenanceForRead = maintenanceForRead; } public int liveReplicas() { @@ -112,4 +125,20 @@ public int excessReplicas() { public int replicasOnStaleNodes() { return replicasOnStaleNodes; } -} + + public int maintenanceNotForReadReplicas() { + return maintenanceNotForRead; + } + + public int maintenanceReplicas() { + return maintenanceNotForRead + maintenanceForRead; + } + + public int outOfServiceReplicas() { + return maintenanceReplicas() + decommissionedAndDecommissioning(); + } + + public int liveEnteringMaintenanceReplicas() { + return maintenanceForRead; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java index 45dcc8d672a..005e6d5b273 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java @@ -81,7 +81,7 @@ void addStorage(final DatanodeStorageInfo info, final DatanodeDescriptor node) { capacityUsed += info.getDfsUsed(); blockPoolUsed += info.getBlockPoolUsed(); - if 
(!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { capacityTotal += info.getCapacity(); capacityRemaining += info.getRemaining(); } else { @@ -90,7 +90,7 @@ void addStorage(final DatanodeStorageInfo info, } void addNode(final DatanodeDescriptor node) { - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { nodesInService++; } } @@ -99,7 +99,7 @@ void subtractStorage(final DatanodeStorageInfo info, final DatanodeDescriptor node) { capacityUsed -= info.getDfsUsed(); blockPoolUsed -= info.getBlockPoolUsed(); - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { capacityTotal -= info.getCapacity(); capacityRemaining -= info.getRemaining(); } else { @@ -108,7 +108,7 @@ void subtractStorage(final DatanodeStorageInfo info, } void subtractNode(final DatanodeDescriptor node) { - if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + if (node.isInService()) { nodesInService--; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 9e357a27cf3..ea5cb3a1900 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -5436,11 +5436,12 @@ public String getDecomNodes() { . builder() .put("xferaddr", node.getXferAddr()) .put("underReplicatedBlocks", - node.decommissioningStatus.getUnderReplicatedBlocks()) + node.getLeavingServiceStatus().getUnderReplicatedBlocks()) + // TODO use another property name for outOfServiceOnlyReplicas. .put("decommissionOnlyReplicas", - node.decommissioningStatus.getDecommissionOnlyReplicas()) + node.getLeavingServiceStatus().getOutOfServiceOnlyReplicas()) .put("underReplicateInOpenFiles", - node.decommissioningStatus.getUnderReplicatedInOpenFiles()) + node.getLeavingServiceStatus().getUnderReplicatedInOpenFiles()) .build(); info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo); } @@ -5502,7 +5503,7 @@ public String getNodeUsage() { blockManager.getDatanodeManager().fetchDatanodes(live, null, true); for (Iterator it = live.iterator(); it.hasNext();) { DatanodeDescriptor node = it.next(); - if (node.isDecommissionInProgress() || node.isDecommissioned()) { + if (!node.isInService()) { it.remove(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d119792a51c..7fca68bf997 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -535,6 +535,13 @@ + + dfs.namenode.maintenance.replication.min + 1 + Minimal live block replication in existence of maintenance mode. 
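+        If set to zero, a DataNode can transition directly to the
+        IN_MAINTENANCE state without waiting for its blocks to be
+        re-replicated to other DataNodes.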
+ + + dfs.namenode.safemode.replication.min diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java index 0698628bbe3..534c5e0c8f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java @@ -102,6 +102,7 @@ public void setup() throws IOException { conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); hostsFileWriter.initialize(conf, "temp/admin"); + } @After @@ -110,17 +111,22 @@ public void teardown() throws IOException { shutdownCluster(); } - protected void writeFile(FileSystem fileSys, Path name, int repl) + static public FSDataOutputStream writeIncompleteFile(FileSystem fileSys, + Path name, short repl, short numOfBlocks) throws IOException { + return writeFile(fileSys, name, repl, numOfBlocks, false); + } + + static protected void writeFile(FileSystem fileSys, Path name, int repl) throws IOException { writeFile(fileSys, name, repl, 2); } - protected void writeFile(FileSystem fileSys, Path name, int repl, + static protected void writeFile(FileSystem fileSys, Path name, int repl, int numOfBlocks) throws IOException { writeFile(fileSys, name, repl, numOfBlocks, true); } - protected FSDataOutputStream writeFile(FileSystem fileSys, Path name, + static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name, int repl, int numOfBlocks, boolean completeFile) throws IOException { // create and write a file that contains two blocks of data @@ -136,6 +142,7 @@ protected FSDataOutputStream writeFile(FileSystem fileSys, Path name, stm.close(); return null; } else { + stm.flush(); // Do not close stream, return it // so that it is not garbage collected return stm; @@ -353,7 +360,7 @@ protected void startSimpleHACluster(int numDatanodes) throws IOException { protected void shutdownCluster() { if (cluster != null) { - cluster.shutdown(); + cluster.shutdown(true); } } @@ -362,12 +369,13 @@ protected void refreshNodes(final int nnIndex) throws IOException { refreshNodes(conf); } - protected DatanodeDescriptor getDatanodeDesriptor( + static private DatanodeDescriptor getDatanodeDesriptor( final FSNamesystem ns, final String datanodeUuid) { return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid); } - protected void cleanupFile(FileSystem fileSys, Path name) throws IOException { + static public void cleanupFile(FileSystem fileSys, Path name) + throws IOException { assertTrue(fileSys.exists(name)); fileSys.delete(name, true); assertTrue(!fileSys.exists(name)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index f8e6da4ba4c..cb815b198f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -485,7 +485,7 @@ public Boolean get() { shutdownCluster(); } } - + /** * Tests cluster storage statistics during decommissioning for non * federated cluster diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java index 
63617ad5ea6..c125f4533c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java @@ -18,13 +18,19 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; @@ -32,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.util.Time; import org.junit.Test; @@ -40,13 +48,23 @@ * This class tests node maintenance. */ public class TestMaintenanceState extends AdminStatesBaseTest { - public static final Log LOG = LogFactory.getLog(TestMaintenanceState.class); - static private final long EXPIRATION_IN_MS = 500; + public static final Logger LOG = + LoggerFactory.getLogger(TestMaintenanceState.class); + static private final long EXPIRATION_IN_MS = 50; + private int minMaintenanceR = + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT; public TestMaintenanceState() { setUseCombinedHostFileManager(); } + void setMinMaintenanceR(int minMaintenanceR) { + this.minMaintenanceR = minMaintenanceR; + getConf().setInt( + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY, + minMaintenanceR); + } + /** * Verify a node can transition from AdminStates.ENTERING_MAINTENANCE to * AdminStates.NORMAL. 
@@ -55,21 +73,25 @@ public TestMaintenanceState() { public void testTakeNodeOutOfEnteringMaintenance() throws Exception { LOG.info("Starting testTakeNodeOutOfEnteringMaintenance"); final int replicas = 1; - final int numNamenodes = 1; - final int numDatanodes = 1; - final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + final Path file = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); - startCluster(numNamenodes, numDatanodes); + startCluster(1, 1); - FileSystem fileSys = getCluster().getFileSystem(0); - writeFile(fileSys, file1, replicas, 1); + final FileSystem fileSys = getCluster().getFileSystem(0); + final FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file, replicas, 1); - DatanodeInfo nodeOutofService = takeNodeOutofService(0, + final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE); + // When node is in ENTERING_MAINTENANCE state, it can still serve read + // requests + assertNull(checkWithRetry(ns, fileSys, file, replicas, null, + nodeOutofService)); + putNodeInService(0, nodeOutofService.getDatanodeUuid()); - cleanupFile(fileSys, file1); + cleanupFile(fileSys, file); } /** @@ -80,23 +102,21 @@ public void testTakeNodeOutOfEnteringMaintenance() throws Exception { public void testEnteringMaintenanceExpiration() throws Exception { LOG.info("Starting testEnteringMaintenanceExpiration"); final int replicas = 1; - final int numNamenodes = 1; - final int numDatanodes = 1; - final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + final Path file = new Path("/testEnteringMaintenanceExpiration.dat"); - startCluster(numNamenodes, numDatanodes); + startCluster(1, 1); - FileSystem fileSys = getCluster().getFileSystem(0); - writeFile(fileSys, file1, replicas, 1); + final FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file, replicas, 1); - // expires in 500 milliseconds - DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, - Time.monotonicNow() + EXPIRATION_IN_MS, null, - AdminStates.ENTERING_MAINTENANCE); + final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, + Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE); - waitNodeState(nodeOutofService, AdminStates.NORMAL); + // Adjust the expiration. + takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), + Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL); - cleanupFile(fileSys, file1); + cleanupFile(fileSys, file); } /** @@ -106,20 +126,18 @@ public void testEnteringMaintenanceExpiration() throws Exception { public void testInvalidExpiration() throws Exception { LOG.info("Starting testInvalidExpiration"); final int replicas = 1; - final int numNamenodes = 1; - final int numDatanodes = 1; - final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + final Path file = new Path("/testInvalidExpiration.dat"); - startCluster(numNamenodes, numDatanodes); + startCluster(1, 1); - FileSystem fileSys = getCluster().getFileSystem(0); - writeFile(fileSys, file1, replicas, 1); + final FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file, replicas, 1); // expiration has to be greater than Time.monotonicNow(). 
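    // An expiration value that is not in the future is invalid, so the node
    // is expected to end up in AdminStates.NORMAL.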
takeNodeOutofService(0, null, Time.monotonicNow(), null, AdminStates.NORMAL); - cleanupFile(fileSys, file1); + cleanupFile(fileSys, file); } /** @@ -129,18 +147,17 @@ public void testInvalidExpiration() throws Exception { @Test(timeout = 360000) public void testPutDeadNodeToMaintenance() throws Exception { LOG.info("Starting testPutDeadNodeToMaintenance"); - final int numNamenodes = 1; - final int numDatanodes = 1; final int replicas = 1; - final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat"); + final Path file = new Path("/testPutDeadNodeToMaintenance.dat"); - startCluster(numNamenodes, numDatanodes); + startCluster(1, 1); - FileSystem fileSys = getCluster().getFileSystem(0); - FSNamesystem ns = getCluster().getNamesystem(0); - writeFile(fileSys, file1, replicas, 1); + final FileSystem fileSys = getCluster().getFileSystem(0); + final FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file, replicas, 1); - MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0); + final MiniDFSCluster.DataNodeProperties dnProp = + getCluster().stopDataNode(0); DFSTestUtil.waitForDatanodeState( getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000); @@ -153,7 +170,7 @@ public void testPutDeadNodeToMaintenance() throws Exception { assertEquals(deadInMaintenance + 1, ns.getNumInMaintenanceDeadDataNodes()); assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes()); - cleanupFile(fileSys, file1); + cleanupFile(fileSys, file); } /** @@ -164,16 +181,14 @@ public void testPutDeadNodeToMaintenance() throws Exception { @Test(timeout = 360000) public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception { LOG.info("Starting testPutDeadNodeToMaintenanceWithExpiration"); - final int numNamenodes = 1; - final int numDatanodes = 1; - final int replicas = 1; - final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat"); + final Path file = + new Path("/testPutDeadNodeToMaintenanceWithExpiration.dat"); - startCluster(numNamenodes, numDatanodes); + startCluster(1, 1); FileSystem fileSys = getCluster().getFileSystem(0); FSNamesystem ns = getCluster().getNamesystem(0); - writeFile(fileSys, file1, replicas, 1); + writeFile(fileSys, file, 1, 1); MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0); DFSTestUtil.waitForDatanodeState( @@ -184,16 +199,17 @@ public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception { DatanodeInfo nodeOutofService = takeNodeOutofService(0, dnProp.datanode.getDatanodeUuid(), - Time.monotonicNow() + EXPIRATION_IN_MS, null, - AdminStates.IN_MAINTENANCE); + Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE); - waitNodeState(nodeOutofService, AdminStates.NORMAL); + // Adjust the expiration. 
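+    // The new expiration is only EXPIRATION_IN_MS (50 ms) away, so the
+    // maintenance state should lapse almost immediately and the node is
+    // expected to return to NORMAL.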
+ takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), + Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL); // no change assertEquals(deadInMaintenance, ns.getNumInMaintenanceDeadDataNodes()); assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes()); - cleanupFile(fileSys, file1); + cleanupFile(fileSys, file); } /** @@ -202,15 +218,12 @@ public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception { @Test(timeout = 360000) public void testTransitionFromDecommissioned() throws IOException { LOG.info("Starting testTransitionFromDecommissioned"); - final int numNamenodes = 1; - final int numDatanodes = 4; - final int replicas = 3; - final Path file1 = new Path("/testTransitionFromDecommissioned.dat"); + final Path file = new Path("/testTransitionFromDecommissioned.dat"); - startCluster(numNamenodes, numDatanodes); + startCluster(1, 4); - FileSystem fileSys = getCluster().getFileSystem(0); - writeFile(fileSys, file1, replicas, 1); + final FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file, 3, 1); DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null, AdminStates.DECOMMISSIONED); @@ -218,7 +231,7 @@ public void testTransitionFromDecommissioned() throws IOException { takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE); - cleanupFile(fileSys, file1); + cleanupFile(fileSys, file); } /** @@ -228,34 +241,33 @@ public void testTransitionFromDecommissioned() throws IOException { @Test(timeout = 360000) public void testTransitionFromDecommissionedAndExpired() throws IOException { LOG.info("Starting testTransitionFromDecommissionedAndExpired"); - final int numNamenodes = 1; - final int numDatanodes = 4; - final int replicas = 3; - final Path file1 = new Path("/testTransitionFromDecommissioned.dat"); + final Path file = + new Path("/testTransitionFromDecommissionedAndExpired.dat"); - startCluster(numNamenodes, numDatanodes); + startCluster(1, 4); - FileSystem fileSys = getCluster().getFileSystem(0); - writeFile(fileSys, file1, replicas, 1); + final FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file, 3, 1); - DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null, - AdminStates.DECOMMISSIONED); + final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, + null, AdminStates.DECOMMISSIONED); takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), - Time.monotonicNow() + EXPIRATION_IN_MS, null, - AdminStates.IN_MAINTENANCE); + Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE); - waitNodeState(nodeOutofService, AdminStates.NORMAL); + // Adjust the expiration. + takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), + Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL); - cleanupFile(fileSys, file1); + cleanupFile(fileSys, file); } /** * When a node is put to maintenance, it first transitions to * AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have minimal * replication before it can be transitioned to AdminStates.IN_MAINTENANCE. - * If node becomes dead when it is in AdminStates.ENTERING_MAINTENANCE, admin - * state should stay in AdminStates.ENTERING_MAINTENANCE state. + * If node becomes dead when it is in AdminStates.ENTERING_MAINTENANCE, it + * should stay in AdminStates.ENTERING_MAINTENANCE state. 
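+   * The number of entering-maintenance nodes reported by the NameNode is
+   * also expected to stay at one while the node is dead and after it is
+   * restarted.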
*/ @Test(timeout = 360000) public void testNodeDeadWhenInEnteringMaintenance() throws Exception { @@ -263,16 +275,16 @@ public void testNodeDeadWhenInEnteringMaintenance() throws Exception { final int numNamenodes = 1; final int numDatanodes = 1; final int replicas = 1; - final Path file1 = new Path("/testNodeDeadWhenInEnteringMaintenance.dat"); + final Path file = new Path("/testNodeDeadWhenInEnteringMaintenance.dat"); startCluster(numNamenodes, numDatanodes); - FileSystem fileSys = getCluster().getFileSystem(0); - FSNamesystem ns = getCluster().getNamesystem(0); - writeFile(fileSys, file1, replicas, 1); + final FileSystem fileSys = getCluster().getFileSystem(0); + final FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file, replicas, 1); DatanodeInfo nodeOutofService = takeNodeOutofService(0, - getFirstBlockFirstReplicaUuid(fileSys, file1), Long.MAX_VALUE, null, + getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE); assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); @@ -281,30 +293,627 @@ public void testNodeDeadWhenInEnteringMaintenance() throws Exception { DFSTestUtil.waitForDatanodeState( getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000); DFSClient client = getDfsClient(0); - assertEquals("maintenance node shouldn't be alive", numDatanodes - 1, + assertEquals("maintenance node shouldn't be live", numDatanodes - 1, client.datanodeReport(DatanodeReportType.LIVE).length); + assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); getCluster().restartDataNode(dnProp, true); getCluster().waitActive(); waitNodeState(nodeOutofService, AdminStates.ENTERING_MAINTENANCE); assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); + assertEquals("maintenance node should be live", numDatanodes, + client.datanodeReport(DatanodeReportType.LIVE).length); - cleanupFile(fileSys, file1); + cleanupFile(fileSys, file); } - static protected String getFirstBlockFirstReplicaUuid(FileSystem fileSys, + /** + * When a node is put to maintenance, it first transitions to + * AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have + * been properly replicated before it can be transitioned to + * AdminStates.IN_MAINTENANCE. The expected replication count takes + * DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY and + * its file's replication factor into account. + */ + @Test(timeout = 360000) + public void testExpectedReplications() throws IOException { + LOG.info("Starting testExpectedReplications"); + testExpectedReplication(1); + testExpectedReplication(2); + testExpectedReplication(3); + testExpectedReplication(4); + } + + private void testExpectedReplication(int replicationFactor) + throws IOException { + testExpectedReplication(replicationFactor, + Math.max(replicationFactor - 1, this.minMaintenanceR)); + } + + private void testExpectedReplication(int replicationFactor, + int expectedReplicasInRead) throws IOException { + startCluster(1, 5); + + final Path file = new Path("/testExpectedReplication.dat"); + + final FileSystem fileSys = getCluster().getFileSystem(0); + final FSNamesystem ns = getCluster().getNamesystem(0); + + writeFile(fileSys, file, replicationFactor, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, + null, AdminStates.IN_MAINTENANCE); + + // The block should be replicated to another datanode to meet + // expected replication count. 
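+    // For example, with a replication factor of 3 and the default
+    // dfs.namenode.maintenance.replication.min of 1, reads are expected to
+    // see max(3 - 1, 1) = 2 live replicas; the IN_MAINTENANCE replica itself
+    // is not returned in LocatedBlock.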
+ assertNull(checkWithRetry(ns, fileSys, file, expectedReplicasInRead, + nodeOutofService)); + + cleanupFile(fileSys, file); + } + + /** + * Verify a node can transition directly to AdminStates.IN_MAINTENANCE when + * DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY is set to zero. + */ + @Test(timeout = 360000) + public void testZeroMinMaintenanceReplication() throws Exception { + LOG.info("Starting testZeroMinMaintenanceReplication"); + setMinMaintenanceR(0); + startCluster(1, 1); + + final Path file = new Path("/testZeroMinMaintenanceReplication.dat"); + final int replicas = 1; + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file, replicas, 1); + + takeNodeOutofService(0, null, Long.MAX_VALUE, null, + AdminStates.IN_MAINTENANCE); + + cleanupFile(fileSys, file); + } + + /** + * Verify a node can transition directly to AdminStates.IN_MAINTENANCE when + * DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY is set to zero. Then later + * transition to NORMAL after maintenance expiration. + */ + @Test(timeout = 360000) + public void testZeroMinMaintenanceReplicationWithExpiration() + throws Exception { + LOG.info("Starting testZeroMinMaintenanceReplicationWithExpiration"); + setMinMaintenanceR(0); + startCluster(1, 1); + + final Path file = + new Path("/testZeroMinMaintenanceReplicationWithExpiration.dat"); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file, 1, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, + Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE); + + // Adjust the expiration. + takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), + Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL); + + cleanupFile(fileSys, file); + } + + /** + * Transition from IN_MAINTENANCE to DECOMMISSIONED. + */ + @Test(timeout = 360000) + public void testTransitionToDecommission() throws IOException { + LOG.info("Starting testTransitionToDecommission"); + final int numNamenodes = 1; + final int numDatanodes = 4; + startCluster(numNamenodes, numDatanodes); + + final Path file = new Path("testTransitionToDecommission.dat"); + final int replicas = 3; + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + + writeFile(fileSys, file, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null, + AdminStates.IN_MAINTENANCE); + + DFSClient client = getDfsClient(0); + assertEquals("All datanodes must be alive", numDatanodes, + client.datanodeReport(DatanodeReportType.LIVE).length); + + // test 1, verify the replica in IN_MAINTENANCE state isn't in LocatedBlock + assertNull(checkWithRetry(ns, fileSys, file, replicas - 1, + nodeOutofService)); + + takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), 0, null, + AdminStates.DECOMMISSIONED); + + // test 2 after decommission has completed, the replication count is + // replicas + 1 which includes the decommissioned node. + assertNull(checkWithRetry(ns, fileSys, file, replicas + 1, null)); + + // test 3, put the node in service, replication count should restore. + putNodeInService(0, nodeOutofService.getDatanodeUuid()); + assertNull(checkWithRetry(ns, fileSys, file, replicas, null)); + + cleanupFile(fileSys, file); + } + + /** + * Transition from decommissioning state to maintenance state. 
+   */
+  @Test(timeout = 360000)
+  public void testTransitionFromDecommissioning() throws IOException {
+    LOG.info("Starting testTransitionFromDecommissioning");
+    startCluster(1, 3);
+
+    final Path file = new Path("/testTransitionFromDecommissioning.dat");
+    final int replicas = 3;
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    writeFile(fileSys, file, replicas);
+
+    final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0,
+        null, AdminStates.DECOMMISSION_INPROGRESS);
+
+    takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE,
+        null, AdminStates.IN_MAINTENANCE);
+
+    assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
+        nodeOutofService));
+
+    cleanupFile(fileSys, file);
+  }
+
+
+  /**
+   * First put a node in maintenance, then put a different node
+   * in decommission. Make sure the decommission process takes
+   * maintenance replicas into account.
+   */
+  @Test(timeout = 360000)
+  public void testDecommissionDifferentNodeAfterMaintenances()
+      throws Exception {
+    testDecommissionDifferentNodeAfterMaintenance(2);
+    testDecommissionDifferentNodeAfterMaintenance(3);
+    testDecommissionDifferentNodeAfterMaintenance(4);
+  }
+
+  private void testDecommissionDifferentNodeAfterMaintenance(int repl)
+      throws Exception {
+    startCluster(1, 5);
+
+    final Path file =
+        new Path("/testDecommissionDifferentNodeAfterMaintenance.dat");
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    writeFile(fileSys, file, repl, 1);
+    final DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys,
+        file);
+    String maintenanceDNUuid = nodes[0].getDatanodeUuid();
+    String decommissionDNUuid = nodes[1].getDatanodeUuid();
+    DatanodeInfo maintenanceDN = takeNodeOutofService(0, maintenanceDNUuid,
+        Long.MAX_VALUE, null, null, AdminStates.IN_MAINTENANCE);
+
+    Map<DatanodeInfo, Long> maintenanceNodes = new HashMap<>();
+    maintenanceNodes.put(nodes[0], Long.MAX_VALUE);
+    takeNodeOutofService(0, decommissionDNUuid, 0, null, maintenanceNodes,
+        AdminStates.DECOMMISSIONED);
+    // Out of the replicas returned, one is the decommissioned node.
+    assertNull(checkWithRetry(ns, fileSys, file, repl, maintenanceDN));
+
+    putNodeInService(0, maintenanceDN);
+    assertNull(checkWithRetry(ns, fileSys, file, repl + 1, null));
+
+    cleanupFile(fileSys, file);
+  }
+
+
+  @Test(timeout = 360000)
+  public void testChangeReplicationFactors() throws IOException {
+    // Prior to any change, there is 1 maintenance node and 2 live nodes.
+
+    // Replication factor is adjusted from 3 to 4.
+    // After the change, given 1 maintenance + 2 live is less than the
+    // newFactor, one live node will be added.
+    testChangeReplicationFactor(3, 4, 3);
+
+    // Replication factor is adjusted from 3 to 2.
+    // After the change, given the number of live nodes (2) equals the
+    // newFactor, no live node will be invalidated.
+    testChangeReplicationFactor(3, 2, 2);
+
+    // Replication factor is adjusted from 3 to 1.
+    // After the change, given the number of live nodes (2) exceeds the
+    // newFactor, one live node will be invalidated.
+    testChangeReplicationFactor(3, 1, 1);
+  }
+
+  /**
+   * After the change of replication factor, # of live replicas <=
+   * the new replication factor.
+ */ + private void testChangeReplicationFactor(int oldFactor, int newFactor, + int expectedLiveReplicas) throws IOException { + LOG.info("Starting testChangeReplicationFactor {} {} {}", + oldFactor, newFactor, expectedLiveReplicas); + startCluster(1, 5); + + final Path file = new Path("/testChangeReplicationFactor.dat"); + + final FileSystem fileSys = getCluster().getFileSystem(0); + final FSNamesystem ns = getCluster().getNamesystem(0); + + writeFile(fileSys, file, oldFactor, 1); + + final DatanodeInfo nodeOutofService = takeNodeOutofService(0, + getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null, + AdminStates.IN_MAINTENANCE); + + // Verify that the nodeOutofService remains in blocksMap and + // # of live replicas For read operation is expected. + assertNull(checkWithRetry(ns, fileSys, file, oldFactor - 1, + nodeOutofService)); + + final DFSClient client = getDfsClient(0); + client.setReplication(file.toString(), (short)newFactor); + + // Verify that the nodeOutofService remains in blocksMap and + // # of live replicas for read operation. + assertNull(checkWithRetry(ns, fileSys, file, expectedLiveReplicas, + nodeOutofService)); + + putNodeInService(0, nodeOutofService.getDatanodeUuid()); + assertNull(checkWithRetry(ns, fileSys, file, newFactor, null)); + + cleanupFile(fileSys, file); + } + + + /** + * Verify the following scenario. + * a. Put a live node to maintenance => 1 maintenance, 2 live. + * b. The maintenance node becomes dead => block map still has 1 maintenance, + * 2 live. + * c. Take the node out of maintenance => NN should schedule the replication + * and end up with 3 live. + */ + @Test(timeout = 360000) + public void testTakeDeadNodeOutOfMaintenance() throws Exception { + LOG.info("Starting testTakeDeadNodeOutOfMaintenance"); + final int numNamenodes = 1; + final int numDatanodes = 4; + startCluster(numNamenodes, numDatanodes); + + final Path file = new Path("/testTakeDeadNodeOutOfMaintenance.dat"); + final int replicas = 3; + + final FileSystem fileSys = getCluster().getFileSystem(0); + final FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file, replicas, 1); + + final DatanodeInfo nodeOutofService = takeNodeOutofService(0, + getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null, + AdminStates.IN_MAINTENANCE); + + assertNull(checkWithRetry(ns, fileSys, file, replicas - 1, + nodeOutofService)); + + final DFSClient client = getDfsClient(0); + assertEquals("All datanodes must be alive", numDatanodes, + client.datanodeReport(DatanodeReportType.LIVE).length); + + getCluster().stopDataNode(nodeOutofService.getXferAddr()); + DFSTestUtil.waitForDatanodeState( + getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000); + assertEquals("maintenance node shouldn't be alive", numDatanodes - 1, + client.datanodeReport(DatanodeReportType.LIVE).length); + + // Dead maintenance node's blocks should remain in block map. + assertNull(checkWithRetry(ns, fileSys, file, replicas - 1, + nodeOutofService)); + + // When dead maintenance mode is transitioned to out of maintenance mode, + // its blocks should be removed from block map. + // This will then trigger replication to restore the live replicas back + // to replication factor. + putNodeInService(0, nodeOutofService.getDatanodeUuid()); + assertNull(checkWithRetry(ns, fileSys, file, replicas, nodeOutofService, + null)); + + cleanupFile(fileSys, file); + } + + + /** + * Verify the following scenario. + * a. Put a live node to maintenance => 1 maintenance, 2 live. + * b. 
The maintenance node becomes dead => block map still has 1 maintenance, + * 2 live. + * c. Restart nn => block map only has 2 live => restore the 3 live. + * d. Restart the maintenance dn => 1 maintenance, 3 live. + * e. Take the node out of maintenance => over replication => 3 live. + */ + @Test(timeout = 360000) + public void testWithNNAndDNRestart() throws Exception { + LOG.info("Starting testWithNNAndDNRestart"); + final int numNamenodes = 1; + final int numDatanodes = 4; + startCluster(numNamenodes, numDatanodes); + + final Path file = new Path("/testWithNNAndDNRestart.dat"); + final int replicas = 3; + + final FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null, + AdminStates.IN_MAINTENANCE); + + assertNull(checkWithRetry(ns, fileSys, file, replicas - 1, + nodeOutofService)); + + DFSClient client = getDfsClient(0); + assertEquals("All datanodes must be alive", numDatanodes, + client.datanodeReport(DatanodeReportType.LIVE).length); + + MiniDFSCluster.DataNodeProperties dnProp = + getCluster().stopDataNode(nodeOutofService.getXferAddr()); + DFSTestUtil.waitForDatanodeState( + getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000); + assertEquals("maintenance node shouldn't be alive", numDatanodes - 1, + client.datanodeReport(DatanodeReportType.LIVE).length); + + // Dead maintenance node's blocks should remain in block map. + assertNull(checkWithRetry(ns, fileSys, file, replicas - 1, + nodeOutofService)); + + // restart nn, nn will restore 3 live replicas given it doesn't + // know the maintenance node has the replica. + getCluster().restartNameNode(0); + ns = getCluster().getNamesystem(0); + assertNull(checkWithRetry(ns, fileSys, file, replicas, null)); + + // restart dn, nn has 1 maintenance replica and 3 live replicas. + getCluster().restartDataNode(dnProp, true); + getCluster().waitActive(); + assertNull(checkWithRetry(ns, fileSys, file, replicas, nodeOutofService)); + + // Put the node in service, a redundant replica should be removed. + putNodeInService(0, nodeOutofService.getDatanodeUuid()); + assertNull(checkWithRetry(ns, fileSys, file, replicas, null)); + + cleanupFile(fileSys, file); + } + + + /** + * Machine under maintenance state won't be chosen for new block allocation. + */ + @Test(timeout = 3600000) + public void testWriteAfterMaintenance() throws IOException { + LOG.info("Starting testWriteAfterMaintenance"); + startCluster(1, 3); + + final Path file = new Path("/testWriteAfterMaintenance.dat"); + int replicas = 3; + + final FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + + final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, + Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE); + + writeFile(fileSys, file, replicas, 2); + + // Verify nodeOutofService wasn't chosen for write operation. + assertNull(checkWithRetry(ns, fileSys, file, replicas - 1, + nodeOutofService, null)); + + // Put the node back to service, live replicas should be restored. + putNodeInService(0, nodeOutofService.getDatanodeUuid()); + assertNull(checkWithRetry(ns, fileSys, file, replicas, null)); + + cleanupFile(fileSys, file); + } + + /** + * A node has blocks under construction when it is put to maintenance. 
+ * Given there are minReplication replicas somewhere else, + * it can be transitioned to AdminStates.IN_MAINTENANCE. + */ + @Test(timeout = 360000) + public void testEnterMaintenanceWhenFileOpen() throws Exception { + LOG.info("Starting testEnterMaintenanceWhenFileOpen"); + startCluster(1, 3); + + final Path file = new Path("/testEnterMaintenanceWhenFileOpen.dat"); + + final FileSystem fileSys = getCluster().getFileSystem(0); + writeIncompleteFile(fileSys, file, (short)3, (short)2); + + takeNodeOutofService(0, null, Long.MAX_VALUE, null, + AdminStates.IN_MAINTENANCE); + + cleanupFile(fileSys, file); + } + + /** + * Machine under maintenance state won't be chosen for invalidation. + */ + @Test(timeout = 360000) + public void testInvalidation() throws IOException { + LOG.info("Starting testInvalidation"); + int numNamenodes = 1; + int numDatanodes = 3; + startCluster(numNamenodes, numDatanodes); + + Path file = new Path("/testInvalidation.dat"); + int replicas = 3; + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + + writeFile(fileSys, file, replicas); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, + Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE); + + DFSClient client = getDfsClient(0); + client.setReplication(file.toString(), (short) 1); + + // Verify the nodeOutofService remains in blocksMap. + assertNull(checkWithRetry(ns, fileSys, file, 1, nodeOutofService)); + + // Restart NN and verify the nodeOutofService remains in blocksMap. + getCluster().restartNameNode(0); + ns = getCluster().getNamesystem(0); + assertNull(checkWithRetry(ns, fileSys, file, 1, nodeOutofService)); + + cleanupFile(fileSys, file); + } + + static String getFirstBlockFirstReplicaUuid(FileSystem fileSys, Path name) throws IOException { + DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys, name); + if (nodes != null && nodes.length != 0) { + return nodes[0].getDatanodeUuid(); + } else { + return null; + } + } + + /* + * Verify that the number of replicas are as expected for each block in + * the given file. + * + * @return - null if no failure found, else an error message string. + */ + static String checkFile(FSNamesystem ns, FileSystem fileSys, + Path name, int repl, DatanodeInfo expectedExcludedNode, + DatanodeInfo expectedMaintenanceNode) throws IOException { + // need a raw stream + assertTrue("Not HDFS:"+fileSys.getUri(), + fileSys instanceof DistributedFileSystem); + HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name); + BlockManager bm = ns.getBlockManager(); + Collection dinfo = dis.getAllBlocks(); + String output; + for (LocatedBlock blk : dinfo) { // for each block + DatanodeInfo[] nodes = blk.getLocations(); + for (int j = 0; j < nodes.length; j++) { // for each replica + if (expectedExcludedNode != null && + nodes[j].equals(expectedExcludedNode)) { + //excluded node must not be in LocatedBlock. + output = "For block " + blk.getBlock() + " replica on " + + nodes[j] + " found in LocatedBlock."; + LOG.info(output); + return output; + } else { + if (nodes[j].isInMaintenance()) { + //IN_MAINTENANCE node must not be in LocatedBlock. 
+ output = "For block " + blk.getBlock() + " replica on " + + nodes[j] + " which is in maintenance state."; + LOG.info(output); + return output; + } + } + } + if (repl != nodes.length) { + output = "Wrong number of replicas for block " + blk.getBlock() + + ": expected " + repl + ", got " + nodes.length + " ,"; + for (int j = 0; j < nodes.length; j++) { // for each replica + output += nodes[j] + ","; + } + output += "pending block # " + ns.getPendingReplicationBlocks() + " ,"; + output += "under replicated # " + ns.getUnderReplicatedBlocks() + " ,"; + if (expectedExcludedNode != null) { + output += "excluded node " + expectedExcludedNode; + } + + LOG.info(output); + return output; + } + + // Verify it has the expected maintenance node + Iterator storageInfoIter = + bm.getStorages(blk.getBlock().getLocalBlock()).iterator(); + List maintenanceNodes = new ArrayList<>(); + while (storageInfoIter.hasNext()) { + DatanodeInfo node = storageInfoIter.next().getDatanodeDescriptor(); + if (node.isMaintenance()) { + maintenanceNodes.add(node); + } + } + + if (expectedMaintenanceNode != null) { + if (!maintenanceNodes.contains(expectedMaintenanceNode)) { + output = "No maintenance replica on " + expectedMaintenanceNode; + LOG.info(output); + return output; + } + } else { + if (maintenanceNodes.size() != 0) { + output = "Has maintenance replica(s)"; + LOG.info(output); + return output; + } + } + } + return null; + } + + static String checkWithRetry(FSNamesystem ns, FileSystem fileSys, + Path name, int repl, DatanodeInfo inMaintenanceNode) + throws IOException { + return checkWithRetry(ns, fileSys, name, repl, inMaintenanceNode, + inMaintenanceNode); + } + + static String checkWithRetry(FSNamesystem ns, FileSystem fileSys, + Path name, int repl, DatanodeInfo excludedNode, + DatanodeInfo underMaintenanceNode) throws IOException { + int tries = 0; + String output = null; + while (tries++ < 200) { + try { + Thread.sleep(100); + output = checkFile(ns, fileSys, name, repl, excludedNode, + underMaintenanceNode); + if (output == null) { + break; + } + } catch (InterruptedException ie) { + } + } + return output; + } + + static private DatanodeInfo[] getFirstBlockReplicasDatanodeInfos( + FileSystem fileSys, Path name) throws IOException { // need a raw stream assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem); HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name); Collection dinfo = dis.getAllBlocks(); - for (LocatedBlock blk : dinfo) { // for each block - DatanodeInfo[] nodes = blk.getLocations(); - if (nodes.length > 0) { - return nodes[0].getDatanodeUuid(); - } + if (dinfo.iterator().hasNext()) { // for the first block + return dinfo.iterator().next().getLocations(); + } else { + return null; } - return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 81fdc63af1e..96b5035a65a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -414,7 +414,7 @@ private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex, throws Exception { assertEquals(0, bm.numOfUnderReplicatedBlocks()); BlockInfo block = addBlockOnNodes(testIndex, origNodes); - 
assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block))); + assertFalse(bm.isNeededReplication(block, bm.countNodes(block))); } @Test(timeout = 60000) @@ -458,7 +458,7 @@ public void testNeededReplicationWhileAppending() throws IOException { namenode.updatePipeline(clientName, oldBlock, newBlock, oldLoactedBlock.getLocations(), oldLoactedBlock.getStorageIDs()); BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock()); - assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi))); + assertFalse(bm.isNeededReplication(bi, bm.countNodes(bi))); } finally { IOUtils.closeStream(out); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index 0d17b71b8e9..0ca7412f6ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.AdminStatesBaseTest; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -123,25 +124,7 @@ private void writeFile(FileSystem fileSys, Path name, short repl) stm.write(buffer); stm.close(); } - - private FSDataOutputStream writeIncompleteFile(FileSystem fileSys, Path name, - short repl) throws IOException { - // create and write a file that contains three blocks of data - FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf() - .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), repl, - blockSize); - byte[] buffer = new byte[fileSize]; - Random rand = new Random(seed); - rand.nextBytes(buffer); - stm.write(buffer); - // need to make sure that we actually write out both file blocks - // (see FSOutputSummer#flush) - stm.flush(); - // Do not close stream, return it - // so that it is not garbage collected - return stm; - } - + static private void cleanupFile(FileSystem fileSys, Path name) throws IOException { assertTrue(fileSys.exists(name)); @@ -152,19 +135,19 @@ static private void cleanupFile(FileSystem fileSys, Path name) /* * Decommissions the node at the given index */ - private String decommissionNode(FSNamesystem namesystem, DFSClient client, + private String decommissionNode(DFSClient client, int nodeIndex) throws IOException { DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); String nodename = info[nodeIndex].getXferAddr(); - decommissionNode(namesystem, nodename); + decommissionNode(nodename); return nodename; } /* * Decommissions the node by name */ - private void decommissionNode(FSNamesystem namesystem, String dnName) + private void decommissionNode(String dnName) throws IOException { System.out.println("Decommissioning node: " + dnName); @@ -179,14 +162,14 @@ private void checkDecommissionStatus(DatanodeDescriptor decommNode, int expectedUnderRepInOpenFiles) { assertEquals("Unexpected num under-replicated blocks", expectedUnderRep, - decommNode.decommissioningStatus.getUnderReplicatedBlocks()); + decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks()); assertEquals("Unexpected number of decom-only replicas", expectedDecommissionOnly, - 
decommNode.decommissioningStatus.getDecommissionOnlyReplicas()); + decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas()); assertEquals( "Unexpected number of replicas in under-replicated open files", expectedUnderRepInOpenFiles, - decommNode.decommissioningStatus.getUnderReplicatedInOpenFiles()); + decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles()); } private void checkDFSAdminDecommissionStatus( @@ -255,7 +238,8 @@ public void testDecommissionStatus() throws Exception { writeFile(fileSys, file1, replicas); Path file2 = new Path("decommission1.dat"); - FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas); + FSDataOutputStream st1 = AdminStatesBaseTest.writeIncompleteFile(fileSys, + file2, replicas, (short)(fileSize / blockSize)); for (DataNode d: cluster.getDataNodes()) { DataNodeTestUtils.triggerBlockReport(d); } @@ -263,7 +247,7 @@ public void testDecommissionStatus() throws Exception { FSNamesystem fsn = cluster.getNamesystem(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); for (int iteration = 0; iteration < numDatanodes; iteration++) { - String downnode = decommissionNode(fsn, client, iteration); + String downnode = decommissionNode(client, iteration); dm.refreshNodes(conf); decommissionedNodes.add(downnode); BlockManagerTestUtil.recheckDecommissionState(dm); @@ -293,8 +277,8 @@ public void testDecommissionStatus() throws Exception { hostsFileWriter.initExcludeHost(""); dm.refreshNodes(conf); st1.close(); - cleanupFile(fileSys, file1); - cleanupFile(fileSys, file2); + AdminStatesBaseTest.cleanupFile(fileSys, file1); + AdminStatesBaseTest.cleanupFile(fileSys, file2); } /** @@ -320,7 +304,7 @@ public void testDecommissionStatusAfterDNRestart() throws Exception { // Decommission the DN. FSNamesystem fsn = cluster.getNamesystem(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); - decommissionNode(fsn, dnName); + decommissionNode(dnName); dm.refreshNodes(conf); // Stop the DN when decommission is in progress. 
@@ -355,7 +339,7 @@ public void testDecommissionStatusAfterDNRestart() throws Exception { // Delete the under-replicated file, which should let the // DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED - cleanupFile(fileSys, f); + AdminStatesBaseTest.cleanupFile(fileSys, f); BlockManagerTestUtil.recheckDecommissionState(dm); assertTrue("the node should be decommissioned", dead.get(0).isDecommissioned()); @@ -388,7 +372,7 @@ public void testDecommissionDeadDN() throws Exception { FSNamesystem fsn = cluster.getNamesystem(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID); - decommissionNode(fsn, dnName); + decommissionNode(dnName); dm.refreshNodes(conf); BlockManagerTestUtil.recheckDecommissionState(dm); assertTrue(dnDescriptor.isDecommissioned()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java index 38ec9f803ae..c9fe2c3d570 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java @@ -195,9 +195,17 @@ public void testVolumeSize() throws Exception { private static final float EPSILON = 0.0001f; @Test public void testXceiverCount() throws Exception { + testXceiverCountInternal(0); + testXceiverCountInternal(1); + } + + public void testXceiverCountInternal(int minMaintenanceR) throws Exception { Configuration conf = new HdfsConfiguration(); // retry one time, if close fails - conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1); + conf.setInt( + HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY, + minMaintenanceR); MiniDFSCluster cluster = null; final int nodes = 8; @@ -220,23 +228,23 @@ public void testXceiverCount() throws Exception { int expectedTotalLoad = nodes; // xceiver server adds 1 to load int expectedInServiceNodes = nodes; int expectedInServiceLoad = nodes; - checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); - - // shutdown half the nodes and force a heartbeat check to ensure - // counts are accurate + checkClusterHealth(nodes, namesystem, expectedTotalLoad, + expectedInServiceNodes, expectedInServiceLoad); + + // Shutdown half the nodes followed by admin operations on those nodes. + // Ensure counts are accurate. for (int i=0; i < nodes/2; i++) { DataNode dn = datanodes.get(i); DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId()); dn.shutdown(); DFSTestUtil.setDatanodeDead(dnd); BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager()); - //Verify decommission of dead node won't impact nodesInService metrics. - dnm.getDecomManager().startDecommission(dnd); + //Admin operations on dead nodes won't impact nodesInService metrics. + startDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0)); expectedInServiceNodes--; assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes()); assertEquals(expectedInServiceNodes, getNumDNInService(namesystem)); - //Verify recommission of dead node won't impact nodesInService metrics. 
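+      // The admin operation alternates by index: even-indexed nodes use
+      // decommission, odd-indexed nodes use maintenance.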
- dnm.getDecomManager().stopDecommission(dnd); + stopDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0)); assertEquals(expectedInServiceNodes, getNumDNInService(namesystem)); } @@ -247,8 +255,9 @@ public void testXceiverCount() throws Exception { datanodes = cluster.getDataNodes(); expectedInServiceNodes = nodes; assertEquals(nodes, datanodes.size()); - checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); - + checkClusterHealth(nodes, namesystem, expectedTotalLoad, + expectedInServiceNodes, expectedInServiceLoad); + // create streams and hsync to force datastreamers to start DFSOutputStream[] streams = new DFSOutputStream[fileCount]; for (int i=0; i < fileCount; i++) { @@ -263,30 +272,32 @@ public void testXceiverCount() throws Exception { } // force nodes to send load update triggerHeartbeats(datanodes); - checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); + checkClusterHealth(nodes, namesystem, expectedTotalLoad, + expectedInServiceNodes, expectedInServiceLoad); - // decomm a few nodes, substract their load from the expected load, - // trigger heartbeat to force load update + // admin operations on a few nodes, substract their load from the + // expected load, trigger heartbeat to force load update. for (int i=0; i < fileRepl; i++) { expectedInServiceNodes--; DatanodeDescriptor dnd = dnm.getDatanode(datanodes.get(i).getDatanodeId()); expectedInServiceLoad -= dnd.getXceiverCount(); - dnm.getDecomManager().startDecommission(dnd); + startDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0)); DataNodeTestUtils.triggerHeartbeat(datanodes.get(i)); Thread.sleep(100); - checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); + checkClusterHealth(nodes, namesystem, expectedTotalLoad, + expectedInServiceNodes, expectedInServiceLoad); } - + // check expected load while closing each stream. recalc expected // load based on whether the nodes in the pipeline are decomm for (int i=0; i < fileCount; i++) { - int decomm = 0; + int adminOps = 0; for (DatanodeInfo dni : streams[i].getPipeline()) { DatanodeDescriptor dnd = dnm.getDatanode(dni); expectedTotalLoad -= 2; - if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) { - decomm++; + if (!dnd.isInService()) { + adminOps++; } else { expectedInServiceLoad -= 2; } @@ -297,16 +308,17 @@ public void testXceiverCount() throws Exception { // nodes will go decommissioned even if there's a UC block whose // other locations are decommissioned too. we'll ignore that // bug for now - if (decomm < fileRepl) { + if (adminOps < fileRepl) { throw ioe; } } triggerHeartbeats(datanodes); // verify node count and loads - checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad); + checkClusterHealth(nodes, namesystem, expectedTotalLoad, + expectedInServiceNodes, expectedInServiceLoad); } - // shutdown each node, verify node counts based on decomm state + // shutdown each node, verify node counts based on admin state for (int i=0; i < nodes; i++) { DataNode dn = datanodes.get(i); dn.shutdown(); @@ -320,13 +332,11 @@ public void testXceiverCount() throws Exception { expectedInServiceNodes--; } assertEquals(expectedInServiceNodes, getNumDNInService(namesystem)); - // live nodes always report load of 1. no nodes is load 0 double expectedXceiverAvg = (i == nodes-1) ? 
0.0 : 1.0; assertEquals((double)expectedXceiverAvg, getInServiceXceiverAverage(namesystem), EPSILON); } - // final sanity check checkClusterHealth(0, namesystem, 0.0, 0, 0.0); } finally { @@ -336,6 +346,24 @@ public void testXceiverCount() throws Exception { } } + private void startDecommissionOrMaintenance(DatanodeManager dnm, + DatanodeDescriptor dnd, boolean decomm) { + if (decomm) { + dnm.getDecomManager().startDecommission(dnd); + } else { + dnm.getDecomManager().startMaintenance(dnd, Long.MAX_VALUE); + } + } + + private void stopDecommissionOrMaintenance(DatanodeManager dnm, + DatanodeDescriptor dnd, boolean decomm) { + if (decomm) { + dnm.getDecomManager().stopDecommission(dnd); + } else { + dnm.getDecomManager().stopMaintenance(dnd); + } + } + private static void checkClusterHealth( int numOfLiveNodes, FSNamesystem namesystem, double expectedTotalLoad, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java index 4c8fcef5a8a..e171e2bbdc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java @@ -54,6 +54,7 @@ public void initialize(Configuration conf, String dir) throws IOException { localFileSys = FileSystem.getLocal(conf); Path workingDir = new Path(MiniDFSCluster.getBaseDirectory()); this.fullDir = new Path(workingDir, dir); + cleanup(); // In case there is some left over from previous run. assertTrue(localFileSys.mkdirs(this.fullDir)); if (conf.getClass(