diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 98cf3068e3e..9c1ea6570cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -20,6 +20,9 @@ Release 2.9.0 - UNRELEASED HDFS-8947. NameNode, DataNode and NFS gateway to support JvmPauseMonitor as a service. (Sunil G via Stevel) + HDFS-9129. Move the safemode block count into BlockManager. (Mingliang Liu + via jing9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 99e0567f91d..a6e769261c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; +import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -116,6 +117,8 @@ public class BlockManager implements BlockStatsMXBean { private final Namesystem namesystem; + private final BlockManagerSafeMode bmSafeMode; + private final DatanodeManager datanodeManager; private final HeartbeatManager heartbeatManager; private final BlockTokenSecretManager blockTokenSecretManager; @@ -364,6 +367,8 @@ public BlockManager(final Namesystem namesystem, final Configuration conf) this.numberOfBytesInFutureBlocks = new AtomicLong(); this.inRollBack = 
isInRollBackMode(NameNode.getStartupOption(conf)); + bmSafeMode = new BlockManagerSafeMode(this, namesystem, conf); + LOG.info("defaultReplication = " + defaultReplication); LOG.info("maxReplication = " + maxReplication); LOG.info("minReplication = " + minReplication); @@ -465,15 +470,17 @@ boolean shouldUpdateBlockKey(final long updateTime) throws IOException { : false; } - public void activate(Configuration conf) { + public void activate(Configuration conf, long blockTotal) { pendingReplications.start(); datanodeManager.activate(conf); this.replicationThread.setName("ReplicationMonitor"); this.replicationThread.start(); mxBeanName = MBeans.register("NameNode", "BlockStats", this); + bmSafeMode.activate(blockTotal); } public void close() { + bmSafeMode.close(); try { replicationThread.interrupt(); replicationThread.join(3000); @@ -709,9 +716,8 @@ private void completeBlock(BlockInfo curBlock, boolean force) // count. (We may not have the minimum replica count yet if this is // a "forced" completion when a file is getting closed by an // OP_CLOSE edit on the standby). - namesystem.adjustSafeModeBlockTotals(0, 1); - namesystem.incrementSafeBlockCount( - Math.min(numNodes, minReplication)); + bmSafeMode.adjustBlockTotals(0, 1); + bmSafeMode.incrementSafeBlockCount(Math.min(numNodes, minReplication)); } /** @@ -768,7 +774,7 @@ public LocatedBlock convertLastBlockToUnderConstruction( // Adjust safe-mode totals, since under-construction blocks don't // count in safe-mode. - namesystem.adjustSafeModeBlockTotals( + bmSafeMode.adjustBlockTotals( // decrement safe if we had enough targets.length >= minReplication ? 
-1 : 0, // always decrement total blocks @@ -1111,7 +1117,7 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) { removeStoredBlock(block, node); invalidateBlocks.remove(node, block); } - namesystem.checkSafeMode(); + checkSafeMode(); } /** @@ -1777,6 +1783,76 @@ public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) { return leaseId; } + public void registerDatanode(DatanodeRegistration nodeReg) + throws IOException { + assert namesystem.hasWriteLock(); + datanodeManager.registerDatanode(nodeReg); + bmSafeMode.checkSafeMode(); + } + + /** + * Set the total number of blocks in the system. + * If safe mode is not currently on, this is a no-op. + */ + public void setBlockTotal(long total) { + if (bmSafeMode.isInSafeMode()) { + bmSafeMode.setBlockTotal(total); + bmSafeMode.checkSafeMode(); + } + } + + public boolean isInSafeMode() { + return bmSafeMode.isInSafeMode(); + } + + public String getSafeModeTip() { + return bmSafeMode.getSafeModeTip(); + } + + public void leaveSafeMode(boolean force) { + bmSafeMode.leaveSafeMode(force); + } + + void checkSafeMode() { + bmSafeMode.checkSafeMode(); + } + + /** + * Removes the blocks from blocksmap and updates the safemode blocks total + * + * @param blocks + * An instance of {@link BlocksMapUpdateInfo} which contains a list + * of blocks that need to be removed from blocksMap + */ + public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) { + assert namesystem.hasWriteLock(); + // In the case that we are a Standby tailing edits from the + // active while in safe-mode, we need to track the total number + // of blocks and safe blocks in the system. 
+ boolean trackBlockCounts = bmSafeMode.isSafeModeTrackingBlocks(); + int numRemovedComplete = 0, numRemovedSafe = 0; + + for (BlockInfo b : blocks.getToDeleteList()) { + if (trackBlockCounts) { + if (b.isComplete()) { + numRemovedComplete++; + if (checkMinReplication(b)) { + numRemovedSafe++; + } + } + } + removeBlock(b); + } + if (trackBlockCounts) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adjusting safe-mode totals for deletion." + + "decreasing safeBlocks by " + numRemovedSafe + + ", totalBlocks by " + numRemovedComplete); + } + bmSafeMode.adjustBlockTotals(-numRemovedSafe, -numRemovedComplete); + } + } + /** * StatefulBlockInfo is used to build the "toUC" list, which is a list of * updates to the information about under-construction blocks. @@ -2162,7 +2238,7 @@ private void processFirstBlockReport( if (namesystem.isInSnapshot(storedBlock)) { int numOfReplicas = storedBlock.getUnderConstructionFeature() .getNumExpectedLocations(); - namesystem.incrementSafeBlockCount(numOfReplicas); + bmSafeMode.incrementSafeBlockCount(numOfReplicas); } //and fall through to next clause } @@ -2543,7 +2619,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, // only complete blocks are counted towards that. // In the case that the block just became complete above, completeBlock() // handles the safe block count maintenance. - namesystem.incrementSafeBlockCount(numCurrentReplica); + bmSafeMode.incrementSafeBlockCount(numCurrentReplica); } } @@ -2618,7 +2694,7 @@ private Block addStoredBlock(final BlockInfo block, // Is no-op if not in safe mode. // In the case that the block just became complete above, completeBlock() // handles the safe block count maintenance. 
- namesystem.incrementSafeBlockCount(numCurrentReplica); + bmSafeMode.incrementSafeBlockCount(numCurrentReplica); } // if file is under construction, then done for now @@ -3048,7 +3124,7 @@ public void removeStoredBlock(Block block, DatanodeDescriptor node) { // BlockCollection bc = getBlockCollection(storedBlock); if (bc != null) { - namesystem.decrementSafeBlockCount(storedBlock); + bmSafeMode.decrementSafeBlockCount(storedBlock); updateNeededReplications(storedBlock, -1, 0); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java new file mode 100644 index 00000000000..f2ad00beaba --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerSafeMode.java @@ -0,0 +1,568 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.util.Daemon; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY; +import static org.apache.hadoop.util.Time.monotonicNow; + +/** + * Block manager safe mode info. + * + * During name node startup, counts the number of safe blocks, those + * that have at least the minimal number of replicas, and calculates the ratio + * of safe blocks to the total number of blocks in the system, which is the size + * of blocks. 
When the ratio reaches the {@link #threshold} and enough live data + * nodes have registered, it needs to wait for the safe mode {@link #extension} + * interval. After the extension period has passed, it will not leave safe mode + * until the safe blocks ratio reaches the {@link #threshold} and enough live + * data node registered. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class BlockManagerSafeMode { + enum BMSafeModeStatus { + PENDING_THRESHOLD, /** Pending on more safe blocks or live datanode. */ + EXTENSION, /** In extension period. */ + OFF /** Safe mode is off. */ + } + + static final Logger LOG = LoggerFactory.getLogger(BlockManagerSafeMode.class); + static final Step STEP_AWAITING_REPORTED_BLOCKS = + new Step(StepType.AWAITING_REPORTED_BLOCKS); + + private final BlockManager blockManager; + private final Namesystem namesystem; + private final boolean haEnabled; + private volatile BMSafeModeStatus status = BMSafeModeStatus.OFF; + + /** Safe mode threshold condition %.*/ + private final double threshold; + /** Number of blocks needed to satisfy safe mode threshold condition. */ + private long blockThreshold; + /** Total number of blocks. */ + private long blockTotal; + /** Number of safe blocks. */ + private long blockSafe; + /** Safe mode minimum number of datanodes alive. */ + private final int datanodeThreshold; + /** Min replication required by safe mode. */ + private final int safeReplication; + /** Threshold for populating needed replication queues. */ + private final double replQueueThreshold; + /** Number of blocks needed before populating replication queues. */ + private long blockReplQueueThreshold; + + /** How long (in ms) is the extension period. */ + private final int extension; + /** Timestamp of the first time when thresholds are met. */ + private final AtomicLong reachedTime = new AtomicLong(); + /** Timestamp of the safe mode initialized. */ + private long startTime; + /** the safe mode monitor thread. 
*/ + private final Daemon smmthread = new Daemon(new SafeModeMonitor()); + + /** time of the last status printout */ + private long lastStatusReport; + /** Counter for tracking startup progress of reported blocks. */ + private Counter awaitingReportedBlocksCounter; + + BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem, + Configuration conf) { + this.blockManager = blockManager; + this.namesystem = namesystem; + this.haEnabled = namesystem.isHaEnabled(); + this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, + DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT); + if (this.threshold > 1.0) { + LOG.warn("The threshold value should't be greater than 1, threshold: {}", + threshold); + } + this.datanodeThreshold = conf.getInt( + DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, + DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT); + int minReplication = + conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); + // DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting, + // setting this lower than the min replication is not recommended + // and/or dangerous for production setups. + // When it's unset, safeReplication will use dfs.namenode.replication.min + this.safeReplication = + conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY, + minReplication); + // default to safe mode threshold (i.e., don't populate queues before + // leaving safe mode) + this.replQueueThreshold = + conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, + (float) threshold); + + this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0); + + LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, threshold); + LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, + datanodeThreshold); + LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, extension); + } + + /** + * Initialize the safe mode information. 
+ * @param total initial total blocks + */ + void activate(long total) { + assert namesystem.hasWriteLock(); + assert status == BMSafeModeStatus.OFF; + + startTime = monotonicNow(); + setBlockTotal(total); + if (areThresholdsMet()) { + leaveSafeMode(true); + } else { + // enter safe mode + status = BMSafeModeStatus.PENDING_THRESHOLD; + initializeReplQueuesIfNecessary(); + reportStatus("STATE* Safe mode ON.", true); + lastStatusReport = monotonicNow(); + } + } + + /** + * @return true if it stays in start up safe mode else false. + */ + boolean isInSafeMode() { + if (status != BMSafeModeStatus.OFF) { + doConsistencyCheck(); + return true; + } else { + return false; + } + } + + /** + * The transition of the safe mode state machine. + * If safe mode is not currently on, this is a no-op. + */ + void checkSafeMode() { + assert namesystem.hasWriteLock(); + if (namesystem.inTransitionToActive()) { + return; + } + + switch (status) { + case PENDING_THRESHOLD: + if (areThresholdsMet()) { + if (extension > 0) { + // PENDING_THRESHOLD -> EXTENSION + status = BMSafeModeStatus.EXTENSION; + reachedTime.set(monotonicNow()); + smmthread.start(); + initializeReplQueuesIfNecessary(); + reportStatus("STATE* Safe mode extension entered.", true); + } else { + // PENDING_THRESHOLD -> OFF + leaveSafeMode(false); + } + } else { + initializeReplQueuesIfNecessary(); + reportStatus("STATE* Safe mode ON.", false); + } + break; + case EXTENSION: + reportStatus("STATE* Safe mode ON.", false); + break; + case OFF: + break; + default: + assert false : "Non-recognized block manager safe mode status: " + status; + } + } + + /** + * Adjust the total number of blocks safe and expected during safe mode. + * If safe mode is not currently on, this is a no-op. 
+ * @param deltaSafe the change in number of safe blocks + * @param deltaTotal the change in number of total blocks expected + */ + void adjustBlockTotals(int deltaSafe, int deltaTotal) { + assert namesystem.hasWriteLock(); + if (!isSafeModeTrackingBlocks()) { + return; + } + + long newBlockTotal; + synchronized (this) { + LOG.debug("Adjusting block totals from {}/{} to {}/{}", blockSafe, + blockTotal, blockSafe + deltaSafe, blockTotal + deltaTotal); + assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " + + blockSafe + " by " + deltaSafe + ": would be negative"; + assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " + + blockTotal + " by " + deltaTotal + ": would be negative"; + + blockSafe += deltaSafe; + newBlockTotal = blockTotal + deltaTotal; + } + setBlockTotal(newBlockTotal); + checkSafeMode(); + } + + /** + * Should we track blocks in safe mode. + *
+ * Never track blocks incrementally in non-HA code. + * + * In the HA case, the StandbyNode can be in safemode while the namespace + * is modified by the edit log tailer. In this case, the number of total + * blocks changes as edits are processed (eg blocks are added and deleted). + * However, we don't want to do the incremental tracking during the + * startup-time loading process -- only once the initial total has been + * set after the image has been loaded. + */ + boolean isSafeModeTrackingBlocks() { + assert namesystem.hasWriteLock(); + return haEnabled && status != BMSafeModeStatus.OFF; + } + + /** + * Set total number of blocks. + */ + void setBlockTotal(long total) { + assert namesystem.hasWriteLock(); + synchronized (this) { + this.blockTotal = total; + this.blockThreshold = (long) (total * threshold); + } + this.blockReplQueueThreshold = (long) (total * replQueueThreshold); + } + + String getSafeModeTip() { + String msg = ""; + + synchronized (this) { + if (blockSafe < blockThreshold) { + msg += String.format( + "The reported blocks %d needs additional %d" + + " blocks to reach the threshold %.4f of total blocks %d.%n", + blockSafe, (blockThreshold - blockSafe), threshold, blockTotal); + } else { + msg += String.format("The reported blocks %d has reached the threshold" + + " %.4f of total blocks %d. ", blockSafe, threshold, blockTotal); + } + } + + int numLive = blockManager.getDatanodeManager().getNumLiveDataNodes(); + if (numLive < datanodeThreshold) { + msg += String.format( + "The number of live datanodes %d needs an additional %d live " + + "datanodes to reach the minimum number %d.%n", + numLive, (datanodeThreshold - numLive), datanodeThreshold); + } else { + msg += String.format("The number of live datanodes %d has reached " + + "the minimum number %d. ", + numLive, datanodeThreshold); + } + + if (blockManager.getBytesInFuture() > 0) { + msg += "Name node detected blocks with generation stamps " + + "in future. 
This means that Name node metadata is inconsistent." + + "This can happen if Name node metadata files have been manually " + + "replaced. Exiting safe mode will cause loss of " + blockManager + .getBytesInFuture() + " byte(s). Please restart name node with " + + "right metadata or use \"hdfs dfsadmin -safemode forceExit" + + "if you are certain that the NameNode was started with the" + + "correct FsImage and edit logs. If you encountered this during" + + "a rollback, it is safe to exit with -safemode forceExit."; + return msg; + } + + final String turnOffTip = "Safe mode will be turned off automatically "; + switch(status) { + case PENDING_THRESHOLD: + msg += turnOffTip + "once the thresholds have been reached."; + break; + case EXTENSION: + msg += "In safe mode extension. "+ turnOffTip + "in " + + timeToLeaveExtension() / 1000 + " seconds."; + break; + case OFF: + msg += turnOffTip + "soon."; + break; + default: + assert false : "Non-recognized block manager safe mode status: " + status; + } + return msg; + } + + /** + * Leave start up safe mode. + * @param force - true to force exit + */ + void leaveSafeMode(boolean force) { + assert namesystem.hasWriteLock() : "Leaving safe mode needs write lock!"; + + // if not done yet, initialize replication queues. + // In the standby, do not populate repl queues + if (!blockManager.isPopulatingReplQueues() && + blockManager.shouldPopulateReplQueues()) { + blockManager.initializeReplQueues(); + } + + if (!force && blockManager.getBytesInFuture() > 0) { + LOG.error("Refusing to leave safe mode without a force flag. " + + "Exiting safe mode will cause a deletion of {} byte(s). 
Please use " + + "-forceExit flag to exit safe mode forcefully if data loss is " + + "acceptable.", blockManager.getBytesInFuture()); + return; + } + + if (status != BMSafeModeStatus.OFF) { + NameNode.stateChangeLog.info("STATE* Safe mode is OFF"); + } + status = BMSafeModeStatus.OFF; + + final long timeInSafemode = monotonicNow() - startTime; + NameNode.stateChangeLog.info("STATE* Leaving safe mode after {} secs", + timeInSafemode / 1000); + NameNode.getNameNodeMetrics().setSafeModeTime(timeInSafemode); + + final NetworkTopology nt = blockManager.getDatanodeManager() + .getNetworkTopology(); + NameNode.stateChangeLog.info("STATE* Network topology has {} racks and {}" + + " datanodes", nt.getNumOfRacks(), nt.getNumOfLeaves()); + NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has {} blocks", + blockManager.numOfUnderReplicatedBlocks()); + + namesystem.startSecretManagerIfNecessary(); + + // If startup has not yet completed, end safemode phase. + StartupProgress prog = NameNode.getStartupProgress(); + if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) { + prog.endStep(Phase.SAFEMODE, + BlockManagerSafeMode.STEP_AWAITING_REPORTED_BLOCKS); + prog.endPhase(Phase.SAFEMODE); + } + } + + /** + * Increment number of safe blocks if current block has reached minimal + * replication. + * If safe mode is not currently on, this is a no-op. + * @param storageNum current number of replicas or number of internal blocks + * of a striped block group + */ + synchronized void incrementSafeBlockCount(int storageNum) { + assert namesystem.hasWriteLock(); + if (status == BMSafeModeStatus.OFF) { + return; + } + + if (storageNum == safeReplication) { + this.blockSafe++; + + // Report startup progress only if we haven't completed startup yet. 
+ StartupProgress prog = NameNode.getStartupProgress(); + if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) { + if (this.awaitingReportedBlocksCounter == null) { + this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE, + STEP_AWAITING_REPORTED_BLOCKS); + } + this.awaitingReportedBlocksCounter.increment(); + } + + checkSafeMode(); + } + } + + /** + * Decrement number of safe blocks if current block has fallen below minimal + * replication. + * If safe mode is not currently on, this is a no-op. + */ + synchronized void decrementSafeBlockCount(BlockInfo b) { + assert namesystem.hasWriteLock(); + if (status == BMSafeModeStatus.OFF) { + return; + } + + BlockInfo storedBlock = blockManager.getStoredBlock(b); + if (storedBlock.isComplete() && + blockManager.countNodes(b).liveReplicas() == safeReplication - 1) { + this.blockSafe--; + assert blockSafe >= 0; + checkSafeMode(); + } + } + + void close() { + assert namesystem.hasWriteLock() : "Closing bmSafeMode needs write lock!"; + try { + smmthread.interrupt(); + smmthread.join(3000); + } catch (InterruptedException ignored) { + } + } + + /** + * Get time (counting in milliseconds) left to leave extension period. + * + * Negative value indicates the extension period has passed. + */ + private long timeToLeaveExtension() { + return reachedTime.get() + extension - monotonicNow(); + } + + /** Check if we are ready to initialize replication queues. */ + private void initializeReplQueuesIfNecessary() { + assert namesystem.hasWriteLock(); + // Whether it has reached the threshold for initializing replication queues. + boolean canInitializeReplQueues = blockManager.shouldPopulateReplQueues() && + blockSafe >= blockReplQueueThreshold; + if (canInitializeReplQueues && + !blockManager.isPopulatingReplQueues() && + !haEnabled) { + blockManager.initializeReplQueues(); + } + } + + /** + * @return true if both block and datanode threshold are met else false. 
+ */ + private boolean areThresholdsMet() { + assert namesystem.hasWriteLock(); + int datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes(); + synchronized (this) { + return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold; + } + } + + /** + * Checks consistency of the class state. + * This is costly so only runs if asserts are enabled. + */ + private void doConsistencyCheck() { + boolean assertsOn = false; + assert assertsOn = true; // set to true if asserts are on + if (!assertsOn) { + return; + } + + int activeBlocks = blockManager.getActiveBlockCount(); + synchronized (this) { + if (blockTotal != activeBlocks && + !(blockSafe >= 0 && blockSafe <= blockTotal)) { + LOG.warn("SafeMode is in inconsistent filesystem state. " + + "BlockManagerSafeMode data: blockTotal={}, blockSafe={}; " + + "BlockManager data: activeBlocks={}", + blockTotal, blockSafe, activeBlocks); + } + } + } + + /** + * Print status every 20 seconds. + */ + private void reportStatus(String msg, boolean rightNow) { + assert namesystem.hasWriteLock(); + long curTime = monotonicNow(); + if(!rightNow && (curTime - lastStatusReport < 20 * 1000)) { + return; + } + NameNode.stateChangeLog.info(msg + " \n" + getSafeModeTip()); + lastStatusReport = curTime; + } + + /** + * Periodically check whether it is time to leave safe mode. + * This thread starts when the threshold level is reached. + */ + private class SafeModeMonitor implements Runnable { + /** Interval in msec for checking safe mode. */ + private static final long RECHECK_INTERVAL = 1000; + + @Override + public void run() { + while (namesystem.isRunning()) { + try { + namesystem.writeLock(); + if (status == BMSafeModeStatus.OFF) { // Not in safe mode. 
+ break; + } + if (canLeave()) { + // EXTENSION -> OFF + leaveSafeMode(false); + break; + } + } finally { + namesystem.writeUnlock(); + } + + try { + Thread.sleep(RECHECK_INTERVAL); + } catch (InterruptedException ignored) { + } + } + + if (!namesystem.isRunning()) { + LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread"); + } + } + + /** + * Check whether the safe mode can be turned off by this monitor. + * + * Safe mode can be turned off iff + * the threshold is reached, and + * the extension time has passed. + */ + private boolean canLeave() { + if (timeToLeaveExtension() > 0) { + reportStatus("STATE* Safe mode ON, in safe mode extension.", false); + return false; + } else if (!areThresholdsMet()) { + reportStatus("STATE* Safe mode ON, thresholds not met.", false); + return false; + } else { + return true; + } + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 49f4100e650..f18607e197d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -87,7 +87,7 @@ public class DatanodeManager { private final Map- * An instance of {@link SafeModeInfo} is created when the name node - * enters safe mode. - *
- * During name node startup {@link SafeModeInfo} counts the number of - * safe blocks, those that have at least the minimal number of - * replicas, and calculates the ratio of safe blocks to the total number - * of blocks in the system, which is the size of blocks in - * {@link FSNamesystem#blockManager}. When the ratio reaches the - * {@link #threshold} it starts the SafeModeMonitor daemon in order - * to monitor whether the safe mode {@link #extension} is passed. - * Then it leaves safe mode and destroys itself. - *
- * If safe mode is turned on manually then the number of safe blocks is
- * not tracked because the name node is not intended to leave safe mode
- * automatically in the case.
- *
- * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
- */
- public class SafeModeInfo {
- // configuration fields
- /** Safe mode threshold condition %.*/
- private final double threshold;
- /** Safe mode minimum number of datanodes alive */
- private final int datanodeThreshold;
- /**
- * Safe mode extension after the threshold.
- * Make it volatile so that getSafeModeTip can read the latest value
- * without taking a lock.
- */
- private volatile int extension;
- /** Min replication required by safe mode. */
- private final int safeReplication;
- /** threshold for populating needed replication queues */
- private final double replQueueThreshold;
- // internal fields
- /** Time when threshold was reached.
- *
-1 safe mode is off
- *
0 safe mode is on, and threshold is not reached yet
- *
>0 safe mode is on, but we are in extension period
- */
- private long reached = -1;
- private long reachedTimestamp = -1;
- /** Total number of blocks. */
- int blockTotal;
- /** Number of safe blocks. */
- int blockSafe;
- /** Number of blocks needed to satisfy safe mode threshold condition */
- private int blockThreshold;
- /** Number of blocks needed before populating replication queues */
- private int blockReplQueueThreshold;
- /** time of the last status printout */
- private long lastStatusReport = 0;
- /**
- * Was safemode entered automatically because available resources were low.
- * Make it volatile so that getSafeModeTip can read the latest value
- * without taking a lock.
- */
- private volatile boolean resourcesLow = false;
- /** Should safemode adjust its block totals as blocks come in */
- private boolean shouldIncrementallyTrackBlocks = false;
- /** counter for tracking startup progress of reported blocks */
- private Counter awaitingReportedBlocksCounter;
-
- /**
- * Creates SafeModeInfo when the name node enters
- * automatic safe mode at startup.
- *
- * @param conf configuration
- */
- private SafeModeInfo(Configuration conf) {
- this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
- DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
- if(threshold > 1.0) {
- LOG.warn("The threshold value should't be greater than 1, threshold: " + threshold);
- }
- this.datanodeThreshold = conf.getInt(
- DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
- DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
- this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
- int minReplication =
- conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
- // DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting,
- // setting this lower than the min replication is not recommended
- // and/or dangerous for production setups.
- // When it's unset, safeReplication will use dfs.namenode.replication.min
- this.safeReplication =
- conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY,
- minReplication);
-
- LOG.info(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY + " = " + threshold);
- LOG.info(DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY + " = " + datanodeThreshold);
- LOG.info(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + " = " + extension);
-
- // default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
- this.replQueueThreshold =
- conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
- (float) threshold);
- this.blockTotal = 0;
- this.blockSafe = 0;
- }
-
- /**
- * In the HA case, the StandbyNode can be in safemode while the namespace
- * is modified by the edit log tailer. In this case, the number of total
- * blocks changes as edits are processed (eg blocks are added and deleted).
- * However, we don't want to do the incremental tracking during the
- * startup-time loading process -- only once the initial total has been
- * set after the image has been loaded.
- */
- private boolean shouldIncrementallyTrackBlocks() {
- return shouldIncrementallyTrackBlocks;
- }
-
- /**
- * Creates SafeModeInfo when safe mode is entered manually, or because
- * available resources are low.
- *
- * The {@link #threshold} is set to 1.5 so that it could never be reached.
- * {@link #blockTotal} is set to -1 to indicate that safe mode is manual.
- *
- * @see SafeModeInfo
- */
- private SafeModeInfo(boolean resourcesLow) {
- this.threshold = 1.5f; // this threshold can never be reached
- this.datanodeThreshold = Integer.MAX_VALUE;
- this.extension = Integer.MAX_VALUE;
- this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
- this.replQueueThreshold = 1.5f; // can never be reached
- this.blockTotal = -1;
- this.blockSafe = -1;
- this.resourcesLow = resourcesLow;
- enter();
- reportStatus("STATE* Safe mode is ON.", true);
- }
-
- /**
- * Check if safe mode is on.
- * @return true if in safe mode
- */
- private synchronized boolean isOn() {
- doConsistencyCheck();
- return this.reached >= 0;
- }
-
- /**
- * Enter safe mode.
- */
- private void enter() {
- this.reached = 0;
- this.reachedTimestamp = 0;
- }
-
- /**
- * Leave safe mode.
- *
- * Check for invalid, under- & over-replicated blocks in the end of startup.
- * @param force - true to force exit
- */
- private synchronized void leave(boolean force) {
- // if not done yet, initialize replication queues.
- // In the standby, do not populate repl queues
- if (!blockManager.isPopulatingReplQueues() && blockManager.shouldPopulateReplQueues()) {
- blockManager.initializeReplQueues();
- }
-
-
- if (!force && (blockManager.getBytesInFuture() > 0)) {
- LOG.error("Refusing to leave safe mode without a force flag. " +
- "Exiting safe mode will cause a deletion of " + blockManager
- .getBytesInFuture() + " byte(s). Please use " +
- "-forceExit flag to exit safe mode forcefully if data loss is " +
- "acceptable.");
- return;
- }
-
- long timeInSafemode = now() - startTime;
- NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
- + timeInSafemode/1000 + " secs");
- NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode);
-
- //Log the following only once (when transitioning from ON -> OFF)
- if (reached >= 0) {
- NameNode.stateChangeLog.info("STATE* Safe mode is OFF");
- }
- reached = -1;
- reachedTimestamp = -1;
- safeMode = null;
- final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
- NameNode.stateChangeLog.info("STATE* Network topology has "
- + nt.getNumOfRacks() + " racks and "
- + nt.getNumOfLeaves() + " datanodes");
- NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
- + blockManager.numOfUnderReplicatedBlocks() + " blocks");
-
- startSecretManagerIfNecessary();
-
- // If startup has not yet completed, end safemode phase.
- StartupProgress prog = NameNode.getStartupProgress();
- if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
- prog.endStep(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS);
- prog.endPhase(Phase.SAFEMODE);
- }
- }
-
- /**
- * Check whether we have reached the threshold for
- * initializing replication queues.
- */
- private synchronized boolean canInitializeReplQueues() {
- return blockManager.shouldPopulateReplQueues()
- && blockSafe >= blockReplQueueThreshold;
- }
-
- /**
- * Safe mode can be turned off iff
- * the threshold is reached and
- * the extension time have passed.
- * @return true if can leave or false otherwise.
- */
- private synchronized boolean canLeave() {
- if (reached == 0) {
- return false;
- }
-
- if (monotonicNow() - reached < extension) {
- reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
- return false;
- }
-
- if (needEnter()) {
- reportStatus("STATE* Safe mode ON, thresholds not met.", false);
- return false;
- }
-
- return true;
- }
-
- /**
- * There is no need to enter safe mode
- * if DFS is empty or {@link #threshold} == 0
- */
- private boolean needEnter() {
- return (threshold != 0 && blockSafe < blockThreshold) ||
- (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
- (!nameNodeHasResourcesAvailable());
- }
-
- /**
- * Check and trigger safe mode if needed.
- */
- private void checkMode() {
- // Have to have write-lock since leaving safemode initializes
- // repl queues, which requires write lock
- assert hasWriteLock();
- if (inTransitionToActive()) {
- return;
- }
- // if smmthread is already running, the block threshold must have been
- // reached before, there is no need to enter the safe mode again
- if (smmthread == null && needEnter()) {
- enter();
- // check if we are ready to initialize replication queues
- if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues()
- && !haEnabled) {
- blockManager.initializeReplQueues();
- }
- reportStatus("STATE* Safe mode ON.", false);
- return;
- }
- // the threshold is reached or was reached before
- if (!isOn() || // safe mode is off
- extension <= 0 || threshold <= 0) { // don't need to wait
- this.leave(false); // leave safe mode
- return;
- }
- if (reached > 0) { // threshold has already been reached before
- reportStatus("STATE* Safe mode ON.", false);
- return;
- }
- // start monitor
- reached = monotonicNow();
- reachedTimestamp = now();
- if (smmthread == null) {
- smmthread = new Daemon(new SafeModeMonitor());
- smmthread.start();
- reportStatus("STATE* Safe mode extension entered.", true);
- }
-
- // check if we are ready to initialize replication queues
- if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues() && !haEnabled) {
- blockManager.initializeReplQueues();
- }
- }
-
- /**
- * Set total number of blocks.
- */
- private synchronized void setBlockTotal(int total) {
- this.blockTotal = total;
- this.blockThreshold = (int) (blockTotal * threshold);
- this.blockReplQueueThreshold =
- (int) (blockTotal * replQueueThreshold);
- if (haEnabled) {
- // After we initialize the block count, any further namespace
- // modifications done while in safe mode need to keep track
- // of the number of total blocks in the system.
- this.shouldIncrementallyTrackBlocks = true;
- }
- if(blockSafe < 0)
- this.blockSafe = 0;
- checkMode();
- }
-
- /**
- * Increment number of safe blocks if current block has
- * reached minimal replication.
- * @param replication current replication
- */
- private synchronized void incrementSafeBlockCount(short replication) {
- if (replication == safeReplication) {
- this.blockSafe++;
-
- // Report startup progress only if we haven't completed startup yet.
- StartupProgress prog = NameNode.getStartupProgress();
- if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
- if (this.awaitingReportedBlocksCounter == null) {
- this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE,
- STEP_AWAITING_REPORTED_BLOCKS);
- }
- this.awaitingReportedBlocksCounter.increment();
- }
-
- checkMode();
- }
- }
-
- /**
- * Decrement number of safe blocks if current block has
- * fallen below minimal replication.
- * @param replication current replication
- */
- private synchronized void decrementSafeBlockCount(short replication) {
- if (replication == safeReplication-1) {
- this.blockSafe--;
- //blockSafe is set to -1 in manual / low resources safemode
- assert blockSafe >= 0 || isManual() || areResourcesLow();
- checkMode();
- }
- }
-
- /**
- * Check if safe mode was entered manually
- */
- private boolean isManual() {
- return extension == Integer.MAX_VALUE;
- }
-
- /**
- * Set manual safe mode.
- */
- private synchronized void setManual() {
- extension = Integer.MAX_VALUE;
- }
-
- /**
- * Check if safe mode was entered due to resources being low.
- */
- private boolean areResourcesLow() {
- return resourcesLow;
- }
-
- /**
- * Set that resources are low for this instance of safe mode.
- */
- private void setResourcesLow() {
- resourcesLow = true;
- }
-
- /**
- * A tip on how safe mode is to be turned off: manually or automatically.
- */
- String getTurnOffTip() {
- if(!isOn()) {
- return "Safe mode is OFF.";
- }
-
- //Manual OR low-resource safemode. (Admin intervention required)
- String adminMsg = "It was turned on manually. ";
- if (areResourcesLow()) {
- adminMsg = "Resources are low on NN. Please add or free up more "
- + "resources then turn off safe mode manually. NOTE: If you turn off"
- + " safe mode before adding resources, "
- + "the NN will immediately return to safe mode. ";
- }
- if (isManual() || areResourcesLow()) {
- return adminMsg
- + "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off.";
- }
-
- boolean thresholdsMet = true;
- int numLive = getNumLiveDataNodes();
- String msg = "";
- if (blockSafe < blockThreshold) {
- msg += String.format(
- "The reported blocks %d needs additional %d"
- + " blocks to reach the threshold %.4f of total blocks %d.%n",
- blockSafe, (blockThreshold - blockSafe), threshold, blockTotal);
- thresholdsMet = false;
- } else {
- msg += String.format("The reported blocks %d has reached the threshold"
- + " %.4f of total blocks %d. ", blockSafe, threshold, blockTotal);
- }
- if (numLive < datanodeThreshold) {
- msg += String.format(
- "The number of live datanodes %d needs an additional %d live "
- + "datanodes to reach the minimum number %d.%n",
- numLive, (datanodeThreshold - numLive), datanodeThreshold);
- thresholdsMet = false;
- } else {
- msg += String.format("The number of live datanodes %d has reached "
- + "the minimum number %d. ",
- numLive, datanodeThreshold);
- }
-
- if(blockManager.getBytesInFuture() > 0) {
- msg += "Name node detected blocks with generation stamps " +
- "in future. This means that Name node metadata is inconsistent." +
- "This can happen if Name node metadata files have been manually " +
- "replaced. Exiting safe mode will cause loss of " + blockManager
- .getBytesInFuture() + " byte(s). Please restart name node with " +
- "right metadata or use \"hdfs dfsadmin -safemode forceExit" +
- "if you are certain that the NameNode was started with the" +
- "correct FsImage and edit logs. If you encountered this during" +
- "a rollback, it is safe to exit with -safemode forceExit.";
- return msg;
- }
-
-
- msg += (reached > 0) ? "In safe mode extension. " : "";
- msg += "Safe mode will be turned off automatically ";
-
- if (!thresholdsMet) {
- msg += "once the thresholds have been reached.";
- } else if (reached + extension - monotonicNow() > 0) {
- msg += ("in " + (reached + extension - monotonicNow()) / 1000 + " seconds.");
- } else {
- msg += "soon.";
- }
-
- return msg;
- }
-
- /**
- * Print status every 20 seconds.
- */
- private void reportStatus(String msg, boolean rightNow) {
- long curTime = now();
- if(!rightNow && (curTime - lastStatusReport < 20 * 1000))
- return;
- NameNode.stateChangeLog.info(msg + " \n" + getTurnOffTip());
- lastStatusReport = curTime;
- }
-
- @Override
- public String toString() {
- String resText = "Current safe blocks = "
- + blockSafe
- + ". Target blocks = " + blockThreshold + " for threshold = %" + threshold
- + ". Minimal replication = " + safeReplication + ".";
- if (reached > 0)
- resText += " Threshold was reached " + new Date(reachedTimestamp) + ".";
- return resText;
- }
-
- /**
- * Checks consistency of the class state.
- * This is costly so only runs if asserts are enabled.
- */
- private void doConsistencyCheck() {
- boolean assertsOn = false;
- assert assertsOn = true; // set to true if asserts are on
- if (!assertsOn) return;
-
- if (blockTotal == -1 && blockSafe == -1) {
- return; // manual safe mode
- }
- int activeBlocks = blockManager.getActiveBlockCount();
- if ((blockTotal != activeBlocks) &&
- !(blockSafe >= 0 && blockSafe <= blockTotal)) {
- throw new AssertionError(
- " SafeMode: Inconsistent filesystem state: "
- + "SafeMode data: blockTotal=" + blockTotal
- + " blockSafe=" + blockSafe + "; "
- + "BlockManager data: active=" + activeBlocks);
- }
- }
-
- private synchronized void adjustBlockTotals(int deltaSafe, int deltaTotal) {
- if (!shouldIncrementallyTrackBlocks) {
- return;
- }
- assert haEnabled;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adjusting block totals from " +
- blockSafe + "/" + blockTotal + " to " +
- (blockSafe + deltaSafe) + "/" + (blockTotal + deltaTotal));
- }
- assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
- blockSafe + " by " + deltaSafe + ": would be negative";
- assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
- blockTotal + " by " + deltaTotal + ": would be negative";
-
- blockSafe += deltaSafe;
- setBlockTotal(blockTotal + deltaTotal);
- }
- }
-
- /**
- * Periodically check whether it is time to leave safe mode.
- * This thread starts when the threshold level is reached.
- *
- */
- class SafeModeMonitor implements Runnable {
- /** interval in msec for checking safe mode: {@value} */
- private static final long recheckInterval = 1000;
-
- /**
- */
- @Override
- public void run() {
- while (fsRunning) {
- writeLock();
- try {
- if (safeMode == null) { // Not in safe mode.
- break;
- }
- if (safeMode.canLeave()) {
- // Leave safe mode.
- safeMode.leave(false);
- smmthread = null;
- break;
- }
- } finally {
- writeUnlock();
- }
-
- try {
- Thread.sleep(recheckInterval);
- } catch (InterruptedException ie) {
- // Ignored
- }
- }
- if (!fsRunning) {
- LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
- }
- }
- }
-
boolean setSafeMode(SafeModeAction action) throws IOException {
if (action != SafeModeAction.SAFEMODE_GET) {
checkSuperuserPrivilege();
@@ -4624,9 +3993,9 @@ boolean setSafeMode(SafeModeAction action) throws IOException {
.getBytesInFuture() + " byte(s). Please use " +
"-forceExit flag to exit safe mode forcefully and data loss is " +
"acceptable.");
- return isInSafeMode();
+ } else {
+ leaveSafeMode();
}
- leaveSafeMode();
break;
case SAFEMODE_ENTER: // enter safe mode
enterSafeMode(false);
@@ -4635,7 +4004,6 @@ boolean setSafeMode(SafeModeAction action) throws IOException {
if (blockManager.getBytesInFuture() > 0) {
LOG.warn("Leaving safe mode due to forceExit. This will cause a data "
+ "loss of " + blockManager.getBytesInFuture() + " byte(s).");
- safeMode.leave(true);
blockManager.clearBytesInFuture();
} else {
LOG.warn("forceExit used when normal exist would suffice. Treating " +
@@ -4650,85 +4018,6 @@ boolean setSafeMode(SafeModeAction action) throws IOException {
return isInSafeMode();
}
- @Override
- public void checkSafeMode() {
- // safeMode is volatile, and may be set to null at any time
- SafeModeInfo safeMode = this.safeMode;
- if (safeMode != null) {
- safeMode.checkMode();
- }
- }
-
- @Override
- public boolean isInSafeMode() {
- // safeMode is volatile, and may be set to null at any time
- SafeModeInfo safeMode = this.safeMode;
- if (safeMode == null)
- return false;
- return safeMode.isOn();
- }
-
- @Override
- public boolean isInStartupSafeMode() {
- // safeMode is volatile, and may be set to null at any time
- SafeModeInfo safeMode = this.safeMode;
- if (safeMode == null)
- return false;
- // If the NN is in safemode, and not due to manual / low resources, we
- // assume it must be because of startup. If the NN had low resources during
- // startup, we assume it came out of startup safemode and it is now in low
- // resources safemode
- return !safeMode.isManual() && !safeMode.areResourcesLow()
- && safeMode.isOn();
- }
-
- @Override
- public void incrementSafeBlockCount(int replication) {
- // safeMode is volatile, and may be set to null at any time
- SafeModeInfo safeMode = this.safeMode;
- if (safeMode == null)
- return;
- safeMode.incrementSafeBlockCount((short)replication);
- }
-
- @Override
- public void decrementSafeBlockCount(BlockInfo b) {
- // safeMode is volatile, and may be set to null at any time
- SafeModeInfo safeMode = this.safeMode;
- if (safeMode == null) // mostly true
- return;
- BlockInfo storedBlock = getStoredBlock(b);
- if (storedBlock.isComplete()) {
- safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
- }
- }
-
- /**
- * Adjust the total number of blocks safe and expected during safe mode.
- * If safe mode is not currently on, this is a no-op.
- * @param deltaSafe the change in number of safe blocks
- * @param deltaTotal the change i nnumber of total blocks expected
- */
- @Override
- public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
- // safeMode is volatile, and may be set to null at any time
- SafeModeInfo safeMode = this.safeMode;
- if (safeMode == null)
- return;
- safeMode.adjustBlockTotals(deltaSafe, deltaTotal);
- }
-
- /**
- * Set the total number of blocks in the system.
- */
- public void setBlockTotal(long completeBlocksTotal) {
- // safeMode is volatile, and may be set to null at any time
- SafeModeInfo safeMode = this.safeMode;
- if (safeMode == null)
- return;
- safeMode.setBlockTotal((int) completeBlocksTotal);
- }
-
/**
* Get the total number of blocks in the system.
*/
@@ -4772,6 +4061,17 @@ public long getCompleteBlocksTotal() {
}
}
+
+ @Override
+ public boolean isInSafeMode() {
+ return isInManualOrResourceLowSafeMode() || blockManager.isInSafeMode();
+ }
+
+ @Override
+ public boolean isInStartupSafeMode() {
+ return !isInManualOrResourceLowSafeMode() && blockManager.isInSafeMode();
+ }
+
/**
* Enter safe mode. If resourcesLow is false, then we assume it is manual
* @throws IOException
@@ -4792,20 +4092,13 @@ void enterSafeMode(boolean resourcesLow) throws IOException {
if (isEditlogOpenForWrite) {
getEditLog().logSyncAll();
}
- if (!isInSafeMode()) {
- safeMode = new SafeModeInfo(resourcesLow);
- return;
- }
- if (resourcesLow) {
- safeMode.setResourcesLow();
- } else {
- safeMode.setManual();
- }
+ setManualAndResourceLowSafeMode(!resourcesLow, resourcesLow);
+ NameNode.stateChangeLog.info("STATE* Safe mode is ON.\n" +
+ getSafeModeTip());
if (isEditlogOpenForWrite) {
getEditLog().logSyncAll();
}
- NameNode.stateChangeLog.info("STATE* Safe mode is ON"
- + safeMode.getTurnOffTip());
+ NameNode.stateChangeLog.info("STATE* Safe mode is ON" + getSafeModeTip());
} finally {
writeUnlock();
}
@@ -4821,29 +4114,40 @@ void leaveSafeMode() {
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF");
return;
}
- safeMode.leave(false);
+ setManualAndResourceLowSafeMode(false, false);
+ blockManager.leaveSafeMode(true);
} finally {
writeUnlock();
}
}
-
+
String getSafeModeTip() {
- // There is no need to take readLock.
- // Don't use isInSafeMode as this.safeMode might be set to null.
- // after isInSafeMode returns.
- boolean inSafeMode;
- SafeModeInfo safeMode = this.safeMode;
- if (safeMode == null) {
- inSafeMode = false;
- } else {
- inSafeMode = safeMode.isOn();
+ String cmd = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off.";
+ synchronized (this) {
+ if (resourceLowSafeMode) {
+ return "Resources are low on NN. Please add or free up more resources "
+ + "then turn off safe mode manually. NOTE: If you turn off safe "
+ + "mode before adding resources, the NN will immediately return to "
+ + "safe mode. " + cmd;
+ } else if (manualSafeMode) {
+ return "It was turned on manually. " + cmd;
+ }
}
- if (!inSafeMode) {
- return "";
- } else {
- return safeMode.getTurnOffTip();
- }
+ return blockManager.getSafeModeTip();
+ }
+
+ /**
+ * @return true iff it is in manual safe mode or resource low safe mode.
+ */
+ private synchronized boolean isInManualOrResourceLowSafeMode() {
+ return manualSafeMode || resourceLowSafeMode;
+ }
+
+ private synchronized void setManualAndResourceLowSafeMode(boolean manual,
+ boolean resourceLow) {
+ this.manualSafeMode = manual;
+ this.resourceLowSafeMode = resourceLow;
}
CheckpointSignature rollEditLog() throws IOException {
@@ -6326,11 +5630,6 @@ public ReentrantReadWriteLock getFsLockForTests() {
public ReentrantLock getCpLockForTests() {
return cpLock;
}
-
- @VisibleForTesting
- public SafeModeInfo getSafeModeInfoForTests() {
- return safeMode;
- }
@VisibleForTesting
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
@@ -7391,11 +6690,5 @@ public long getBytesInFuture() {
return blockManager.getBytesInFuture();
}
- @VisibleForTesting
- synchronized void enableSafeModeForTesting(Configuration conf) {
- SafeModeInfo newSafemode = new SafeModeInfo(conf);
- newSafemode.enter();
- this.safeMode = newSafemode;
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 6b7196414b2..5b9a9f5d484 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -383,7 +383,7 @@ public NamenodeProtocols getRpcServer() {
return rpcServer;
}
- static void initMetrics(Configuration conf, NamenodeRole role) {
+ public static void initMetrics(Configuration conf, NamenodeRole role) {
metrics = NameNodeMetrics.create(conf, role);
}
@@ -1690,11 +1690,9 @@ synchronized HAServiceStatus getServiceStatus()
HAServiceState retState = state.getServiceState();
HAServiceStatus ret = new HAServiceStatus(retState);
if (retState == HAServiceState.STANDBY) {
- String safemodeTip = namesystem.getSafeModeTip();
- if (!safemodeTip.isEmpty()) {
- ret.setNotReadyToBecomeActive(
- "The NameNode is in safemode. " +
- safemodeTip);
+ if (namesystem.isInSafeMode()) {
+ ret.setNotReadyToBecomeActive("The NameNode is in safemode. " +
+ namesystem.getSafeModeTip());
} else {
ret.setReadyToBecomeActive();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index 5bc4033e562..84056d04345 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -45,12 +45,23 @@ public interface Namesystem extends RwLock, SafeMode {
BlockCollection getBlockCollection(long id);
- void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
-
void checkOperation(OperationCategory read) throws StandbyException;
+ void startSecretManagerIfNecessary();
+
boolean isInSnapshot(BlockInfo blockUC);
CacheManager getCacheManager();
HAContext getHAContext();
-}
\ No newline at end of file
+
+ /**
+ * @return true if the HA is enabled else false
+ */
+ boolean isHaEnabled();
+
+ /**
+ * @return Whether the namenode is transitioning to active state and is in the
+ * middle of the starting active services.
+ */
+ boolean inTransitionToActive();
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
index 06a8219f4ab..9eb5796276f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
@@ -18,18 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
/** SafeMode related operations. */
@InterfaceAudience.Private
public interface SafeMode {
- /**
- * Check safe mode conditions.
- * If the corresponding conditions are satisfied,
- * trigger the system to enter/leave safe mode.
- */
- public void checkSafeMode();
-
/** Is the system in safe mode? */
public boolean isInSafeMode();
@@ -38,13 +30,4 @@ public interface SafeMode {
* safe mode turned on automatically?
*/
public boolean isInStartupSafeMode();
-
- /**
- * Increment number of blocks that reached minimal replication.
- * @param replication current replication
- */
- public void incrementSafeBlockCount(int replication);
-
- /** Decrement number of blocks that reached minimal replication. */
- public void decrementSafeBlockCount(BlockInfo b);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
index 6cea7e89517..2485b32c54d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
@@ -219,7 +219,7 @@ public Boolean get() {
}
}, 10, 10000);
- final int safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
+ final long safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
assertTrue("Expected first block report to make some blocks safe.", safe > 0);
assertTrue("Did not expect first block report to make all blocks safe.", safe < 15);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 148135bae97..ed81a4d32f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerSafeMode.BMSafeModeStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -35,6 +36,7 @@
import org.junit.Assert;
import com.google.common.base.Preconditions;
+import org.mockito.internal.util.reflection.Whitebox;
public class BlockManagerTestUtil {
public static void setNodeReplicationLimit(final BlockManager blockManager,
@@ -306,4 +308,11 @@ public static void recheckDecommissionState(DatanodeManager dm)
throws ExecutionException, InterruptedException {
dm.getDecomManager().runMonitor();
}
+
+ public static void setStartupSafeModeForTest(BlockManager bm) {
+ BlockManagerSafeMode bmSafeMode = (BlockManagerSafeMode)Whitebox
+ .getInternalState(bm, "bmSafeMode");
+ Whitebox.setInternalState(bmSafeMode, "extension", Integer.MAX_VALUE);
+ Whitebox.setInternalState(bmSafeMode, "status", BMSafeModeStatus.EXTENSION);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java
new file mode 100644
index 00000000000..49c96eb52a0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerSafeMode.BMSafeModeStatus;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test is for testing {@link BlockManagerSafeMode} package local APIs.
+ *
+ * They use heavily mocked objects, treating the {@link BlockManagerSafeMode}
+ * as white-box. Tests are light-weight thus no multi-thread scenario or real
+ * mini-cluster is tested.
+ *
+ * @see org.apache.hadoop.hdfs.TestSafeMode
+ * @see org.apache.hadoop.hdfs.server.namenode.ha.TestHASafeMode
+ */
+public class TestBlockManagerSafeMode {
+ private static final int DATANODE_NUM = 3;
+ private static final long BLOCK_TOTAL = 10;
+ private static final double THRESHOLD = 0.99;
+ private static final long BLOCK_THRESHOLD = (long)(BLOCK_TOTAL * THRESHOLD);
+ private static final int EXTENSION = 1000; // 1 second
+
+ private BlockManager bm;
+ private DatanodeManager dn;
+ private BlockManagerSafeMode bmSafeMode;
+
+ /**
+ * Set up the mock context.
+ *
+ * - extension is always needed (default period is {@link #EXTENSION} ms)
+ * - datanode threshold is always reached via mock
+ * - safe block is 0 and it needs {@link #BLOCK_THRESHOLD} to reach threshold
+ * - write/read lock is always held by current thread
+ *
+ * @throws IOException
+ */
+ @Before
+ public void setupMockCluster() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ conf.setDouble(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
+ THRESHOLD);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY,
+ EXTENSION);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
+ DATANODE_NUM);
+
+ FSNamesystem fsn = mock(FSNamesystem.class);
+ Mockito.doReturn(true).when(fsn).hasWriteLock();
+ Mockito.doReturn(true).when(fsn).hasReadLock();
+ Mockito.doReturn(true).when(fsn).isRunning();
+ NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
+
+ bm = spy(new BlockManager(fsn, conf));
+ dn = spy(bm.getDatanodeManager());
+ Whitebox.setInternalState(bm, "datanodeManager", dn);
+ // the datanode threshold is always met
+ when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);
+
+ bmSafeMode = new BlockManagerSafeMode(bm, fsn, conf);
+ }
+
+ /**
+ * Test set block total.
+ *
+ * Setting the block total calls checkSafeMode for the first time, and
+ * bmSafeMode transitions from INITIALIZED to PENDING_THRESHOLD status.
+ */
+ @Test(timeout = 30000)
+ public void testInitialize() {
+ assertFalse("Block manager should not be in safe mode at beginning.",
+ bmSafeMode.isInSafeMode());
+ bmSafeMode.activate(BLOCK_TOTAL);
+ assertEquals(BMSafeModeStatus.PENDING_THRESHOLD, getSafeModeStatus());
+ assertTrue(bmSafeMode.isInSafeMode());
+ }
+
+ /**
+ * Test the state machine transitions driven by checkSafeMode().
+ *
+ * Each step forces the status, the safe block count and (where needed) the
+ * extension field to a known value, calls checkSafeMode() and asserts the
+ * resulting status. The steps are order dependent because they share the
+ * same bmSafeMode instance.
+ */
+ @Test(timeout = 30000)
+ public void testCheckSafeMode() {
+ bmSafeMode.activate(BLOCK_TOTAL);
+
+ // stays in PENDING_THRESHOLD: the safe block count (0 .. BLOCK_THRESHOLD - 1)
+ // is still below the block threshold
+ setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
+ for (long i = 0; i < BLOCK_THRESHOLD; i++) {
+ setBlockSafe(i);
+ bmSafeMode.checkSafeMode();
+ assertEquals(BMSafeModeStatus.PENDING_THRESHOLD, getSafeModeStatus());
+ }
+
+ // PENDING_THRESHOLD -> EXTENSION: block threshold reached while the
+ // extension configured in setup (EXTENSION ms) is still in place
+ setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
+ setBlockSafe(BLOCK_THRESHOLD);
+ bmSafeMode.checkSafeMode();
+ assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
+ // reset the "smmthread" field; presumably this drops the monitor thread
+ // created when entering EXTENSION so it does not affect the steps below
+ // -- TODO confirm against BlockManagerSafeMode
+ Whitebox.setInternalState(bmSafeMode, "smmthread", null);
+
+ // PENDING_THRESHOLD -> OFF: block threshold reached and extension is 0
+ setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
+ setBlockSafe(BLOCK_THRESHOLD);
+ Whitebox.setInternalState(bmSafeMode, "extension", 0);
+ bmSafeMode.checkSafeMode();
+ assertEquals(BMSafeModeStatus.OFF, getSafeModeStatus());
+
+ // stays in EXTENSION: the safe block count is back to 0, even though the
+ // extension is 0
+ setSafeModeStatus(BMSafeModeStatus.EXTENSION);
+ setBlockSafe(0);
+ Whitebox.setInternalState(bmSafeMode, "extension", 0);
+ bmSafeMode.checkSafeMode();
+ assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
+
+ // stays in EXTENSION: block threshold reached but the extension period
+ // (Integer.MAX_VALUE) is still pending
+ setSafeModeStatus(BMSafeModeStatus.EXTENSION);
+ setBlockSafe(BLOCK_THRESHOLD);
+ Whitebox.setInternalState(bmSafeMode, "extension", Integer.MAX_VALUE);
+ bmSafeMode.checkSafeMode();
+ assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
+ }
+
+ /**
+  * Test that the safe block count increases up to the block threshold.
+  *
+  * The extension is forced to 0, so once the block threshold is reached the
+  * block manager leaves safe mode and further increments are no-ops.
+  * The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> OFF
+  */
+ @Test(timeout = 30000)
+ public void testIncrementSafeBlockCount() {
+   bmSafeMode.activate(BLOCK_TOTAL);
+   Whitebox.setInternalState(bmSafeMode, "extension", 0);
+
+   for (long reported = 1; reported <= BLOCK_TOTAL; reported++) {
+     bmSafeMode.incrementSafeBlockCount(1);
+
+     final boolean thresholdMet = reported >= BLOCK_THRESHOLD;
+     if (thresholdMet) {
+       // safe mode is left as soon as the block threshold is met
+       assertFalse(bmSafeMode.isInSafeMode());
+       // with safe mode OFF the counter stays at the threshold
+       assertEquals(BLOCK_THRESHOLD, getblockSafe());
+     } else {
+       assertEquals(reported, getblockSafe());
+       assertTrue(bmSafeMode.isInSafeMode());
+     }
+   }
+ }
+
+ /**
+ * Test that the block safe increases up to block threshold.
+ *
+ * Once the block threshold is reached, the block manger leaves safe mode and
+ * increment will be a no-op.
+ * The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> EXTENSION-> OFF
+ */
+ @Test(timeout = 30000)
+ public void testIncrementSafeBlockCountWithExtension() throws Exception {
+ bmSafeMode.activate(BLOCK_TOTAL);
+
+ for (long i = 1; i <= BLOCK_TOTAL; i++) {
+ bmSafeMode.incrementSafeBlockCount(1);
+ if (i < BLOCK_THRESHOLD) {
+ assertTrue(bmSafeMode.isInSafeMode());
+ }
+ }
+ waitForExtensionPeriod();
+ assertFalse(bmSafeMode.isInSafeMode());
+ }
+
+ /**
+  * Test that decrementing lowers the safe block count.
+  *
+  * Starting from {@link #BLOCK_THRESHOLD} safe blocks, every decrement
+  * lowers the count by one and the block manager stays in safe mode.
+  * The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD
+  */
+ @Test(timeout = 30000)
+ public void testDecrementSafeBlockCount() {
+   bmSafeMode.activate(BLOCK_TOTAL);
+   Whitebox.setInternalState(bmSafeMode, "extension", 0);
+
+   mockBlockManagerForBlockSafeDecrement();
+   setBlockSafe(BLOCK_THRESHOLD);
+
+   // walk the expected remaining count from BLOCK_THRESHOLD - 1 down to 0
+   for (long remaining = BLOCK_THRESHOLD - 1; remaining >= 0; remaining--) {
+     final BlockInfo block = mock(BlockInfo.class);
+     bmSafeMode.decrementSafeBlockCount(block);
+
+     assertEquals(remaining, getblockSafe());
+     assertTrue(bmSafeMode.isInSafeMode());
+   }
+ }
+
+ /**
+  * Test interleaved increments and decrements of the safe block count.
+  *
+  * Every round does increment, decrement, increment. Once safe mode is OFF
+  * both operations are no-ops and the count stays at the block threshold.
+  * The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> OFF
+  */
+ @Test(timeout = 30000)
+ public void testIncrementAndDecrementSafeBlockCount() {
+   bmSafeMode.activate(BLOCK_TOTAL);
+   Whitebox.setInternalState(bmSafeMode, "extension", 0);
+
+   mockBlockManagerForBlockSafeDecrement();
+   for (long round = 1; round <= BLOCK_TOTAL; round++) {
+     // net effect of one round while in safe mode: +1 safe block
+     bmSafeMode.incrementSafeBlockCount(1);
+     bmSafeMode.decrementSafeBlockCount(mock(BlockInfo.class));
+     bmSafeMode.incrementSafeBlockCount(1);
+
+     final boolean thresholdMet = round >= BLOCK_THRESHOLD;
+     if (thresholdMet) {
+       // safe mode is left as soon as the block threshold is met
+       assertEquals(BLOCK_THRESHOLD, getblockSafe());
+       assertFalse(bmSafeMode.isInSafeMode());
+     } else {
+       assertEquals(round, getblockSafe());
+       assertTrue(bmSafeMode.isInSafeMode());
+     }
+   }
+ }
+
+ /**
+  * Test the safe mode monitor.
+  *
+  * With the block threshold reached, the block manager is still in safe mode
+  * right after checkSafeMode() and has left it after waiting for the
+  * extension period.
+  */
+ @Test(timeout = 30000)
+ public void testSafeModeMonitor() throws Exception {
+   bmSafeMode.activate(BLOCK_TOTAL);
+   setBlockSafe(BLOCK_THRESHOLD);
+
+   // PENDING_THRESHOLD -> EXTENSION
+   bmSafeMode.checkSafeMode();
+   final boolean inSafeModeDuringExtension = bmSafeMode.isInSafeMode();
+   assertTrue(inSafeModeDuringExtension);
+
+   waitForExtensionPeriod();
+   final boolean inSafeModeAfterExtension = bmSafeMode.isInSafeMode();
+   assertFalse(inSafeModeAfterExtension);
+ }
+
+ /**
+  * Test that the block manager won't leave safe mode while the datanode
+  * threshold is not met.
+  *
+  * With enough safe blocks but only one live datanode reported, safe mode
+  * stays on; once the live datanode count is back at the minimum, safe mode
+  * is left after the extension period.
+  */
+ @Test(timeout = 30000)
+ public void testDatanodeThreshodShouldBeMet() throws Exception {
+   bmSafeMode.activate(BLOCK_TOTAL);
+
+   // Not all datanodes have registered yet.
+   final int liveBeforeRegistration = 1;
+   when(dn.getNumLiveDataNodes()).thenReturn(liveBeforeRegistration);
+   setBlockSafe(BLOCK_THRESHOLD);
+   bmSafeMode.checkSafeMode();
+   assertTrue(bmSafeMode.isInSafeMode());
+
+   // All datanodes have registered: the datanode threshold is reached.
+   when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);
+   bmSafeMode.checkSafeMode();
+   waitForExtensionPeriod();
+   assertFalse(bmSafeMode.isInSafeMode());
+ }
+
+ /**
+ * Test block manager won't leave safe mode if there are orphan blocks.
+ */
+ @Test(timeout = 30000)
+ public void testStayInSafeModeWhenBytesInFuture() throws Exception {
+ bmSafeMode.activate(BLOCK_TOTAL);
+
+ when(bm.getBytesInFuture()).thenReturn(1L);
+ // safe blocks are enough
+ setBlockSafe(BLOCK_THRESHOLD);
+
+ // PENDING_THRESHOLD -> EXTENSION
+ bmSafeMode.checkSafeMode();
+ try {
+ waitForExtensionPeriod();
+ fail("Safe mode should not leave extension period with orphan blocks!");
+ } catch (TimeoutException e) {
+ assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
+ }
+ }
+
+ /**
+  * Test the safe mode tip at three points of the lifecycle.
+  *
+  * 1. right after activation, when no block has been reported yet
+  * 2. after the block threshold is reached and checkSafeMode() has run
+  * 3. after waiting for the extension period
+  *
+  * At every point the tip must also state that the live datanode count has
+  * reached the configured minimum.
+  *
+  * @throws Exception if waiting for the extension period fails
+  */
+ @Test(timeout = 30000)
+ public void testGetSafeModeTip() throws Exception {
+   bmSafeMode.activate(BLOCK_TOTAL);
+   String tip = bmSafeMode.getSafeModeTip();
+
+   // The live datanode count is stubbed to a constant in setup, so this part
+   // of the tip is the same at every stage.
+   final String datanodeTip =
+       String.format("The number of live datanodes %d has reached the " +
+           "minimum number %d. ", dn.getNumLiveDataNodes(), DATANODE_NUM);
+   // Shared format of the "block threshold reached" part of the tip.
+   final String thresholdReachedFormat =
+       "The reported blocks %d has reached the threshold"
+           + " %.4f of total blocks %d. ";
+
+   // stage 1: no safe blocks yet
+   assertTrue(tip.contains(
+       String.format(
+           "The reported blocks %d needs additional %d blocks to reach the " +
+           "threshold %.4f of total blocks %d.%n",
+           0, BLOCK_THRESHOLD, THRESHOLD, BLOCK_TOTAL)));
+   assertTrue(tip.contains(datanodeTip));
+   assertTrue(tip.contains("Safe mode will be turned off automatically once " +
+       "the thresholds have been reached."));
+
+   // stage 2: safe blocks are enough
+   setBlockSafe(BLOCK_THRESHOLD);
+   bmSafeMode.checkSafeMode();
+   tip = bmSafeMode.getSafeModeTip();
+   assertTrue(tip.contains(
+       String.format(thresholdReachedFormat,
+           getblockSafe(), THRESHOLD, BLOCK_TOTAL)));
+   assertTrue(tip.contains(datanodeTip));
+   assertTrue(tip.contains("In safe mode extension. Safe mode will be turned" +
+       " off automatically in"));
+
+   // stage 3: the extension period has been waited for
+   waitForExtensionPeriod();
+   tip = bmSafeMode.getSafeModeTip();
+   assertTrue(tip.contains(
+       String.format(thresholdReachedFormat,
+           getblockSafe(), THRESHOLD, BLOCK_TOTAL)));
+   assertTrue(tip.contains(datanodeTip));
+   assertTrue(tip.contains("Safe mode will be turned off automatically soon"));
+ }
+
+ /**
+ * Mock block manager internal state for decrement safe block
+ */
+ private void mockBlockManagerForBlockSafeDecrement() {
+ BlockInfo storedBlock = mock(BlockInfo.class);
+ when(storedBlock.isComplete()).thenReturn(true);
+ doReturn(storedBlock).when(bm).getStoredBlock(any(Block.class));
+ NumberReplicas numberReplicas = mock(NumberReplicas.class);
+ when(numberReplicas.liveReplicas()).thenReturn(0);
+ doReturn(numberReplicas).when(bm).countNodes(any(Block.class));
+ }
+
+ /**
+ * Wait the bmSafeMode monitor for the extension period.
+ * @throws InterruptedIOException
+ * @throws TimeoutException
+ */
+ private void waitForExtensionPeriod() throws Exception{
+ assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
+
+ GenericTestUtils.waitFor(new Supplier