HDFS-9129. Move the safemode block count into BlockManager. Contributed by Mingliang Liu.

This commit is contained in:
Jing Zhao 2015-12-07 13:46:12 -08:00
parent 4ac1564418
commit c993327483
21 changed files with 1199 additions and 839 deletions

View File

@ -20,6 +20,9 @@ Release 2.9.0 - UNRELEASED
HDFS-8947. NameNode, DataNode and NFS gateway to support JvmPauseMonitor as HDFS-8947. NameNode, DataNode and NFS gateway to support JvmPauseMonitor as
a service. (Sunil G via Stevel) a service. (Sunil G via Stevel)
HDFS-9129. Move the safemode block count into BlockManager. (Mingliang Liu
via jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@ -116,6 +117,8 @@ public class BlockManager implements BlockStatsMXBean {
private final Namesystem namesystem; private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
private final DatanodeManager datanodeManager; private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager; private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager; private final BlockTokenSecretManager blockTokenSecretManager;
@ -364,6 +367,8 @@ public class BlockManager implements BlockStatsMXBean {
this.numberOfBytesInFutureBlocks = new AtomicLong(); this.numberOfBytesInFutureBlocks = new AtomicLong();
this.inRollBack = isInRollBackMode(NameNode.getStartupOption(conf)); this.inRollBack = isInRollBackMode(NameNode.getStartupOption(conf));
bmSafeMode = new BlockManagerSafeMode(this, namesystem, conf);
LOG.info("defaultReplication = " + defaultReplication); LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication); LOG.info("maxReplication = " + maxReplication);
LOG.info("minReplication = " + minReplication); LOG.info("minReplication = " + minReplication);
@ -465,15 +470,17 @@ public class BlockManager implements BlockStatsMXBean {
: false; : false;
} }
public void activate(Configuration conf) { public void activate(Configuration conf, long blockTotal) {
pendingReplications.start(); pendingReplications.start();
datanodeManager.activate(conf); datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor"); this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start(); this.replicationThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this); mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
} }
public void close() { public void close() {
bmSafeMode.close();
try { try {
replicationThread.interrupt(); replicationThread.interrupt();
replicationThread.join(3000); replicationThread.join(3000);
@ -709,9 +716,8 @@ public class BlockManager implements BlockStatsMXBean {
// count. (We may not have the minimum replica count yet if this is // count. (We may not have the minimum replica count yet if this is
// a "forced" completion when a file is getting closed by an // a "forced" completion when a file is getting closed by an
// OP_CLOSE edit on the standby). // OP_CLOSE edit on the standby).
namesystem.adjustSafeModeBlockTotals(0, 1); bmSafeMode.adjustBlockTotals(0, 1);
namesystem.incrementSafeBlockCount( bmSafeMode.incrementSafeBlockCount(Math.min(numNodes, minReplication));
Math.min(numNodes, minReplication));
} }
/** /**
@ -768,7 +774,7 @@ public class BlockManager implements BlockStatsMXBean {
// Adjust safe-mode totals, since under-construction blocks don't // Adjust safe-mode totals, since under-construction blocks don't
// count in safe-mode. // count in safe-mode.
namesystem.adjustSafeModeBlockTotals( bmSafeMode.adjustBlockTotals(
// decrement safe if we had enough // decrement safe if we had enough
targets.length >= minReplication ? -1 : 0, targets.length >= minReplication ? -1 : 0,
// always decrement total blocks // always decrement total blocks
@ -1111,7 +1117,7 @@ public class BlockManager implements BlockStatsMXBean {
removeStoredBlock(block, node); removeStoredBlock(block, node);
invalidateBlocks.remove(node, block); invalidateBlocks.remove(node, block);
} }
namesystem.checkSafeMode(); checkSafeMode();
} }
/** /**
@ -1777,6 +1783,76 @@ public class BlockManager implements BlockStatsMXBean {
return leaseId; return leaseId;
} }
public void registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
assert namesystem.hasWriteLock();
datanodeManager.registerDatanode(nodeReg);
bmSafeMode.checkSafeMode();
}
/**
* Set the total number of blocks in the system.
* If safe mode is not currently on, this is a no-op.
*/
public void setBlockTotal(long total) {
if (bmSafeMode.isInSafeMode()) {
bmSafeMode.setBlockTotal(total);
bmSafeMode.checkSafeMode();
}
}
public boolean isInSafeMode() {
return bmSafeMode.isInSafeMode();
}
public String getSafeModeTip() {
return bmSafeMode.getSafeModeTip();
}
public void leaveSafeMode(boolean force) {
bmSafeMode.leaveSafeMode(force);
}
void checkSafeMode() {
bmSafeMode.checkSafeMode();
}
/**
* Removes the blocks from blocksmap and updates the safemode blocks total
*
* @param blocks
* An instance of {@link BlocksMapUpdateInfo} which contains a list
* of blocks that need to be removed from blocksMap
*/
public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
assert namesystem.hasWriteLock();
// In the case that we are a Standby tailing edits from the
// active while in safe-mode, we need to track the total number
// of blocks and safe blocks in the system.
boolean trackBlockCounts = bmSafeMode.isSafeModeTrackingBlocks();
int numRemovedComplete = 0, numRemovedSafe = 0;
for (BlockInfo b : blocks.getToDeleteList()) {
if (trackBlockCounts) {
if (b.isComplete()) {
numRemovedComplete++;
if (checkMinReplication(b)) {
numRemovedSafe++;
}
}
}
removeBlock(b);
}
if (trackBlockCounts) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adjusting safe-mode totals for deletion."
+ "decreasing safeBlocks by " + numRemovedSafe
+ ", totalBlocks by " + numRemovedComplete);
}
bmSafeMode.adjustBlockTotals(-numRemovedSafe, -numRemovedComplete);
}
}
/** /**
* StatefulBlockInfo is used to build the "toUC" list, which is a list of * StatefulBlockInfo is used to build the "toUC" list, which is a list of
* updates to the information about under-construction blocks. * updates to the information about under-construction blocks.
@ -2162,7 +2238,7 @@ public class BlockManager implements BlockStatsMXBean {
if (namesystem.isInSnapshot(storedBlock)) { if (namesystem.isInSnapshot(storedBlock)) {
int numOfReplicas = storedBlock.getUnderConstructionFeature() int numOfReplicas = storedBlock.getUnderConstructionFeature()
.getNumExpectedLocations(); .getNumExpectedLocations();
namesystem.incrementSafeBlockCount(numOfReplicas); bmSafeMode.incrementSafeBlockCount(numOfReplicas);
} }
//and fall through to next clause //and fall through to next clause
} }
@ -2543,7 +2619,7 @@ public class BlockManager implements BlockStatsMXBean {
// only complete blocks are counted towards that. // only complete blocks are counted towards that.
// In the case that the block just became complete above, completeBlock() // In the case that the block just became complete above, completeBlock()
// handles the safe block count maintenance. // handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica); bmSafeMode.incrementSafeBlockCount(numCurrentReplica);
} }
} }
@ -2618,7 +2694,7 @@ public class BlockManager implements BlockStatsMXBean {
// Is no-op if not in safe mode. // Is no-op if not in safe mode.
// In the case that the block just became complete above, completeBlock() // In the case that the block just became complete above, completeBlock()
// handles the safe block count maintenance. // handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica); bmSafeMode.incrementSafeBlockCount(numCurrentReplica);
} }
// if file is under construction, then done for now // if file is under construction, then done for now
@ -3048,7 +3124,7 @@ public class BlockManager implements BlockStatsMXBean {
// //
BlockCollection bc = getBlockCollection(storedBlock); BlockCollection bc = getBlockCollection(storedBlock);
if (bc != null) { if (bc != null) {
namesystem.decrementSafeBlockCount(storedBlock); bmSafeMode.decrementSafeBlockCount(storedBlock);
updateNeededReplications(storedBlock, -1, 0); updateNeededReplications(storedBlock, -1, 0);
} }

View File

@ -0,0 +1,568 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
/**
* Block manager safe mode info.
*
* During name node startup, counts the number of <em>safe blocks</em>, those
* that have at least the minimal number of replicas, and calculates the ratio
* of safe blocks to the total number of blocks in the system, which is the size
* of blocks. When the ratio reaches the {@link #threshold} and enough live data
* nodes have registered, it needs to wait for the safe mode {@link #extension}
* interval. After the extension period has passed, it will not leave safe mode
* until the safe blocks ratio reaches the {@link #threshold} and enough live
* data node registered.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class BlockManagerSafeMode {
enum BMSafeModeStatus {
PENDING_THRESHOLD, /** Pending on more safe blocks or live datanode. */
EXTENSION, /** In extension period. */
OFF /** Safe mode is off. */
}
static final Logger LOG = LoggerFactory.getLogger(BlockManagerSafeMode.class);
static final Step STEP_AWAITING_REPORTED_BLOCKS =
new Step(StepType.AWAITING_REPORTED_BLOCKS);
private final BlockManager blockManager;
private final Namesystem namesystem;
private final boolean haEnabled;
private volatile BMSafeModeStatus status = BMSafeModeStatus.OFF;
/** Safe mode threshold condition %.*/
private final double threshold;
/** Number of blocks needed to satisfy safe mode threshold condition. */
private long blockThreshold;
/** Total number of blocks. */
private long blockTotal;
/** Number of safe blocks. */
private long blockSafe;
/** Safe mode minimum number of datanodes alive. */
private final int datanodeThreshold;
/** Min replication required by safe mode. */
private final int safeReplication;
/** Threshold for populating needed replication queues. */
private final double replQueueThreshold;
/** Number of blocks needed before populating replication queues. */
private long blockReplQueueThreshold;
/** How long (in ms) is the extension period. */
private final int extension;
/** Timestamp of the first time when thresholds are met. */
private final AtomicLong reachedTime = new AtomicLong();
/** Timestamp of the safe mode initialized. */
private long startTime;
/** the safe mode monitor thread. */
private final Daemon smmthread = new Daemon(new SafeModeMonitor());
/** time of the last status printout */
private long lastStatusReport;
/** Counter for tracking startup progress of reported blocks. */
private Counter awaitingReportedBlocksCounter;
BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem,
Configuration conf) {
this.blockManager = blockManager;
this.namesystem = namesystem;
this.haEnabled = namesystem.isHaEnabled();
this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
if (this.threshold > 1.0) {
LOG.warn("The threshold value should't be greater than 1, threshold: {}",
threshold);
}
this.datanodeThreshold = conf.getInt(
DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
int minReplication =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
// DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting,
// setting this lower than the min replication is not recommended
// and/or dangerous for production setups.
// When it's unset, safeReplication will use dfs.namenode.replication.min
this.safeReplication =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY,
minReplication);
// default to safe mode threshold (i.e., don't populate queues before
// leaving safe mode)
this.replQueueThreshold =
conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
(float) threshold);
this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, threshold);
LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
datanodeThreshold);
LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, extension);
}
/**
* Initialize the safe mode information.
* @param total initial total blocks
*/
void activate(long total) {
assert namesystem.hasWriteLock();
assert status == BMSafeModeStatus.OFF;
startTime = monotonicNow();
setBlockTotal(total);
if (areThresholdsMet()) {
leaveSafeMode(true);
} else {
// enter safe mode
status = BMSafeModeStatus.PENDING_THRESHOLD;
initializeReplQueuesIfNecessary();
reportStatus("STATE* Safe mode ON.", true);
lastStatusReport = monotonicNow();
}
}
/**
* @return true if it stays in start up safe mode else false.
*/
boolean isInSafeMode() {
if (status != BMSafeModeStatus.OFF) {
doConsistencyCheck();
return true;
} else {
return false;
}
}
/**
* The transition of the safe mode state machine.
* If safe mode is not currently on, this is a no-op.
*/
void checkSafeMode() {
assert namesystem.hasWriteLock();
if (namesystem.inTransitionToActive()) {
return;
}
switch (status) {
case PENDING_THRESHOLD:
if (areThresholdsMet()) {
if (extension > 0) {
// PENDING_THRESHOLD -> EXTENSION
status = BMSafeModeStatus.EXTENSION;
reachedTime.set(monotonicNow());
smmthread.start();
initializeReplQueuesIfNecessary();
reportStatus("STATE* Safe mode extension entered.", true);
} else {
// PENDING_THRESHOLD -> OFF
leaveSafeMode(false);
}
} else {
initializeReplQueuesIfNecessary();
reportStatus("STATE* Safe mode ON.", false);
}
break;
case EXTENSION:
reportStatus("STATE* Safe mode ON.", false);
break;
case OFF:
break;
default:
assert false : "Non-recognized block manager safe mode status: " + status;
}
}
/**
* Adjust the total number of blocks safe and expected during safe mode.
* If safe mode is not currently on, this is a no-op.
* @param deltaSafe the change in number of safe blocks
* @param deltaTotal the change in number of total blocks expected
*/
void adjustBlockTotals(int deltaSafe, int deltaTotal) {
assert namesystem.hasWriteLock();
if (!isSafeModeTrackingBlocks()) {
return;
}
long newBlockTotal;
synchronized (this) {
LOG.debug("Adjusting block totals from {}/{} to {}/{}", blockSafe,
blockTotal, blockSafe + deltaSafe, blockTotal + deltaTotal);
assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
blockSafe + " by " + deltaSafe + ": would be negative";
assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
blockTotal + " by " + deltaTotal + ": would be negative";
blockSafe += deltaSafe;
newBlockTotal = blockTotal + deltaTotal;
}
setBlockTotal(newBlockTotal);
checkSafeMode();
}
/**
* Should we track blocks in safe mode.
* <p/>
* Never track blocks incrementally in non-HA code.
* <p/>
* In the HA case, the StandbyNode can be in safemode while the namespace
* is modified by the edit log tailer. In this case, the number of total
* blocks changes as edits are processed (eg blocks are added and deleted).
* However, we don't want to do the incremental tracking during the
* startup-time loading process -- only once the initial total has been
* set after the image has been loaded.
*/
boolean isSafeModeTrackingBlocks() {
assert namesystem.hasWriteLock();
return haEnabled && status != BMSafeModeStatus.OFF;
}
/**
* Set total number of blocks.
*/
void setBlockTotal(long total) {
assert namesystem.hasWriteLock();
synchronized (this) {
this.blockTotal = total;
this.blockThreshold = (long) (total * threshold);
}
this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}
String getSafeModeTip() {
String msg = "";
synchronized (this) {
if (blockSafe < blockThreshold) {
msg += String.format(
"The reported blocks %d needs additional %d"
+ " blocks to reach the threshold %.4f of total blocks %d.%n",
blockSafe, (blockThreshold - blockSafe), threshold, blockTotal);
} else {
msg += String.format("The reported blocks %d has reached the threshold"
+ " %.4f of total blocks %d. ", blockSafe, threshold, blockTotal);
}
}
int numLive = blockManager.getDatanodeManager().getNumLiveDataNodes();
if (numLive < datanodeThreshold) {
msg += String.format(
"The number of live datanodes %d needs an additional %d live "
+ "datanodes to reach the minimum number %d.%n",
numLive, (datanodeThreshold - numLive), datanodeThreshold);
} else {
msg += String.format("The number of live datanodes %d has reached "
+ "the minimum number %d. ",
numLive, datanodeThreshold);
}
if (blockManager.getBytesInFuture() > 0) {
msg += "Name node detected blocks with generation stamps " +
"in future. This means that Name node metadata is inconsistent." +
"This can happen if Name node metadata files have been manually " +
"replaced. Exiting safe mode will cause loss of " + blockManager
.getBytesInFuture() + " byte(s). Please restart name node with " +
"right metadata or use \"hdfs dfsadmin -safemode forceExit" +
"if you are certain that the NameNode was started with the" +
"correct FsImage and edit logs. If you encountered this during" +
"a rollback, it is safe to exit with -safemode forceExit.";
return msg;
}
final String turnOffTip = "Safe mode will be turned off automatically ";
switch(status) {
case PENDING_THRESHOLD:
msg += turnOffTip + "once the thresholds have been reached.";
break;
case EXTENSION:
msg += "In safe mode extension. "+ turnOffTip + "in " +
timeToLeaveExtension() / 1000 + " seconds.";
break;
case OFF:
msg += turnOffTip + "soon.";
break;
default:
assert false : "Non-recognized block manager safe mode status: " + status;
}
return msg;
}
/**
* Leave start up safe mode.
* @param force - true to force exit
*/
void leaveSafeMode(boolean force) {
assert namesystem.hasWriteLock() : "Leaving safe mode needs write lock!";
// if not done yet, initialize replication queues.
// In the standby, do not populate repl queues
if (!blockManager.isPopulatingReplQueues() &&
blockManager.shouldPopulateReplQueues()) {
blockManager.initializeReplQueues();
}
if (!force && blockManager.getBytesInFuture() > 0) {
LOG.error("Refusing to leave safe mode without a force flag. " +
"Exiting safe mode will cause a deletion of {} byte(s). Please use " +
"-forceExit flag to exit safe mode forcefully if data loss is " +
"acceptable.", blockManager.getBytesInFuture());
return;
}
if (status != BMSafeModeStatus.OFF) {
NameNode.stateChangeLog.info("STATE* Safe mode is OFF");
}
status = BMSafeModeStatus.OFF;
final long timeInSafemode = monotonicNow() - startTime;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after {} secs",
timeInSafemode / 1000);
NameNode.getNameNodeMetrics().setSafeModeTime(timeInSafemode);
final NetworkTopology nt = blockManager.getDatanodeManager()
.getNetworkTopology();
NameNode.stateChangeLog.info("STATE* Network topology has {} racks and {}" +
" datanodes", nt.getNumOfRacks(), nt.getNumOfLeaves());
NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has {} blocks",
blockManager.numOfUnderReplicatedBlocks());
namesystem.startSecretManagerIfNecessary();
// If startup has not yet completed, end safemode phase.
StartupProgress prog = NameNode.getStartupProgress();
if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
prog.endStep(Phase.SAFEMODE,
BlockManagerSafeMode.STEP_AWAITING_REPORTED_BLOCKS);
prog.endPhase(Phase.SAFEMODE);
}
}
/**
* Increment number of safe blocks if current block has reached minimal
* replication.
* If safe mode is not currently on, this is a no-op.
* @param storageNum current number of replicas or number of internal blocks
* of a striped block group
*/
synchronized void incrementSafeBlockCount(int storageNum) {
assert namesystem.hasWriteLock();
if (status == BMSafeModeStatus.OFF) {
return;
}
if (storageNum == safeReplication) {
this.blockSafe++;
// Report startup progress only if we haven't completed startup yet.
StartupProgress prog = NameNode.getStartupProgress();
if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
if (this.awaitingReportedBlocksCounter == null) {
this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE,
STEP_AWAITING_REPORTED_BLOCKS);
}
this.awaitingReportedBlocksCounter.increment();
}
checkSafeMode();
}
}
/**
* Decrement number of safe blocks if current block has fallen below minimal
* replication.
* If safe mode is not currently on, this is a no-op.
*/
synchronized void decrementSafeBlockCount(BlockInfo b) {
assert namesystem.hasWriteLock();
if (status == BMSafeModeStatus.OFF) {
return;
}
BlockInfo storedBlock = blockManager.getStoredBlock(b);
if (storedBlock.isComplete() &&
blockManager.countNodes(b).liveReplicas() == safeReplication - 1) {
this.blockSafe--;
assert blockSafe >= 0;
checkSafeMode();
}
}
void close() {
assert namesystem.hasWriteLock() : "Closing bmSafeMode needs write lock!";
try {
smmthread.interrupt();
smmthread.join(3000);
} catch (InterruptedException ignored) {
}
}
/**
* Get time (counting in milliseconds) left to leave extension period.
*
* Negative value indicates the extension period has passed.
*/
private long timeToLeaveExtension() {
return reachedTime.get() + extension - monotonicNow();
}
/** Check if we are ready to initialize replication queues. */
private void initializeReplQueuesIfNecessary() {
assert namesystem.hasWriteLock();
// Whether it has reached the threshold for initializing replication queues.
boolean canInitializeReplQueues = blockManager.shouldPopulateReplQueues() &&
blockSafe >= blockReplQueueThreshold;
if (canInitializeReplQueues &&
!blockManager.isPopulatingReplQueues() &&
!haEnabled) {
blockManager.initializeReplQueues();
}
}
/**
* @return true if both block and datanode threshold are met else false.
*/
private boolean areThresholdsMet() {
assert namesystem.hasWriteLock();
int datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes();
synchronized (this) {
return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
}
}
/**
* Checks consistency of the class state.
* This is costly so only runs if asserts are enabled.
*/
private void doConsistencyCheck() {
boolean assertsOn = false;
assert assertsOn = true; // set to true if asserts are on
if (!assertsOn) {
return;
}
int activeBlocks = blockManager.getActiveBlockCount();
synchronized (this) {
if (blockTotal != activeBlocks &&
!(blockSafe >= 0 && blockSafe <= blockTotal)) {
LOG.warn("SafeMode is in inconsistent filesystem state. " +
"BlockManagerSafeMode data: blockTotal={}, blockSafe={}; " +
"BlockManager data: activeBlocks={}",
blockTotal, blockSafe, activeBlocks);
}
}
}
/**
* Print status every 20 seconds.
*/
private void reportStatus(String msg, boolean rightNow) {
assert namesystem.hasWriteLock();
long curTime = monotonicNow();
if(!rightNow && (curTime - lastStatusReport < 20 * 1000)) {
return;
}
NameNode.stateChangeLog.info(msg + " \n" + getSafeModeTip());
lastStatusReport = curTime;
}
/**
* Periodically check whether it is time to leave safe mode.
* This thread starts when the threshold level is reached.
*/
private class SafeModeMonitor implements Runnable {
/** Interval in msec for checking safe mode. */
private static final long RECHECK_INTERVAL = 1000;
@Override
public void run() {
while (namesystem.isRunning()) {
try {
namesystem.writeLock();
if (status == BMSafeModeStatus.OFF) { // Not in safe mode.
break;
}
if (canLeave()) {
// EXTENSION -> OFF
leaveSafeMode(false);
break;
}
} finally {
namesystem.writeUnlock();
}
try {
Thread.sleep(RECHECK_INTERVAL);
} catch (InterruptedException ignored) {
}
}
if (!namesystem.isRunning()) {
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
}
}
/**
* Check whether the safe mode can be turned off by this monitor.
*
* Safe mode can be turned off iff
* the threshold is reached, and
* the extension time has passed.
*/
private boolean canLeave() {
if (timeToLeaveExtension() > 0) {
reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
return false;
} else if (!areThresholdsMet()) {
reportStatus("STATE* Safe mode ON, thresholds not met.", false);
return false;
} else {
return true;
}
}
}
}

View File

@ -87,7 +87,7 @@ public class DatanodeManager {
private final Map<String, DatanodeDescriptor> datanodeMap private final Map<String, DatanodeDescriptor> datanodeMap
= new HashMap<>(); = new HashMap<>();
/** Cluster network topology */ /** Cluster network topology. */
private final NetworkTopology networktopology; private final NetworkTopology networktopology;
/** Host names to datanode descriptors mapping. */ /** Host names to datanode descriptors mapping. */
@ -104,7 +104,7 @@ public class DatanodeManager {
private final int defaultIpcPort; private final int defaultIpcPort;
/** Read include/exclude files*/ /** Read include/exclude files. */
private final HostFileManager hostFileManager = new HostFileManager(); private final HostFileManager hostFileManager = new HostFileManager();
/** The period to wait for datanode heartbeat.*/ /** The period to wait for datanode heartbeat.*/
@ -555,7 +555,7 @@ public class DatanodeManager {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("remove datanode " + nodeInfo); LOG.debug("remove datanode " + nodeInfo);
} }
namesystem.checkSafeMode(); blockManager.checkSafeMode();
} }
/** /**

View File

@ -256,7 +256,8 @@ class Checkpointer extends Daemon {
if(backupNode.namesystem.getBlocksTotal() > 0) { if(backupNode.namesystem.getBlocksTotal() > 0) {
long completeBlocksTotal = long completeBlocksTotal =
backupNode.namesystem.getCompleteBlocksTotal(); backupNode.namesystem.getCompleteBlocksTotal();
backupNode.namesystem.setBlockTotal(completeBlocksTotal); backupNode.namesystem.getBlockManager().setBlockTotal(
completeBlocksTotal);
} }
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid); bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
if (!backupNode.namesystem.isRollingUpgrade()) { if (!backupNode.namesystem.isRollingUpgrade()) {

View File

@ -150,7 +150,7 @@ class FSDirDeleteOp {
if (filesRemoved) { if (filesRemoved) {
fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, false); fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, false);
fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks); fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
} }
} }

View File

@ -312,7 +312,8 @@ class FSDirRenameOp {
unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp, unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp,
collectedBlocks, options); collectedBlocks, options);
if (!collectedBlocks.getToDeleteList().isEmpty()) { if (!collectedBlocks.getToDeleteList().isEmpty()) {
fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks); fsd.getFSNamesystem().getBlockManager()
.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
} }
} }

View File

@ -184,7 +184,7 @@ final class FSDirTruncateOp {
} }
assert onBlockBoundary == (truncateBlock == null) : assert onBlockBoundary == (truncateBlock == null) :
"truncateBlock is null iff on block boundary: " + truncateBlock; "truncateBlock is null iff on block boundary: " + truncateBlock;
fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks); fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
} }
/** /**

View File

@ -734,7 +734,8 @@ public class FSEditLogLoader {
deleteSnapshotOp.snapshotName, deleteSnapshotOp.snapshotName,
new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(), new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
collectedBlocks, removedINodes, null)); collectedBlocks, removedINodes, null));
fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks); fsNamesys.getBlockManager().removeBlocksAndUpdateSafemodeTotal(
collectedBlocks);
collectedBlocks.clear(); collectedBlocks.clear();
fsNamesys.dir.removeFromInodeMap(removedINodes); fsNamesys.dir.removeFromInodeMap(removedINodes);
removedINodes.clear(); removedINodes.clear();

View File

@ -383,7 +383,7 @@ public class NameNode implements NameNodeStatusMXBean {
return rpcServer; return rpcServer;
} }
static void initMetrics(Configuration conf, NamenodeRole role) { public static void initMetrics(Configuration conf, NamenodeRole role) {
metrics = NameNodeMetrics.create(conf, role); metrics = NameNodeMetrics.create(conf, role);
} }
@ -1690,11 +1690,9 @@ public class NameNode implements NameNodeStatusMXBean {
HAServiceState retState = state.getServiceState(); HAServiceState retState = state.getServiceState();
HAServiceStatus ret = new HAServiceStatus(retState); HAServiceStatus ret = new HAServiceStatus(retState);
if (retState == HAServiceState.STANDBY) { if (retState == HAServiceState.STANDBY) {
String safemodeTip = namesystem.getSafeModeTip(); if (namesystem.isInSafeMode()) {
if (!safemodeTip.isEmpty()) { ret.setNotReadyToBecomeActive("The NameNode is in safemode. " +
ret.setNotReadyToBecomeActive( namesystem.getSafeModeTip());
"The NameNode is in safemode. " +
safemodeTip);
} else { } else {
ret.setReadyToBecomeActive(); ret.setReadyToBecomeActive();
} }

View File

@ -45,12 +45,23 @@ public interface Namesystem extends RwLock, SafeMode {
BlockCollection getBlockCollection(long id); BlockCollection getBlockCollection(long id);
void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
void checkOperation(OperationCategory read) throws StandbyException; void checkOperation(OperationCategory read) throws StandbyException;
void startSecretManagerIfNecessary();
boolean isInSnapshot(BlockInfo blockUC); boolean isInSnapshot(BlockInfo blockUC);
CacheManager getCacheManager(); CacheManager getCacheManager();
HAContext getHAContext(); HAContext getHAContext();
/**
* @return true if the HA is enabled else false
*/
boolean isHaEnabled();
/**
* @return Whether the namenode is transitioning to active state and is in the
* middle of the starting active services.
*/
boolean inTransitionToActive();
} }

View File

@ -18,18 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
/** SafeMode related operations. */ /** SafeMode related operations. */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface SafeMode { public interface SafeMode {
/**
* Check safe mode conditions.
* If the corresponding conditions are satisfied,
* trigger the system to enter/leave safe mode.
*/
public void checkSafeMode();
/** Is the system in safe mode? */ /** Is the system in safe mode? */
public boolean isInSafeMode(); public boolean isInSafeMode();
@ -38,13 +30,4 @@ public interface SafeMode {
* safe mode turned on automatically? * safe mode turned on automatically?
*/ */
public boolean isInStartupSafeMode(); public boolean isInStartupSafeMode();
/**
* Increment number of blocks that reached minimal replication.
* @param replication current replication
*/
public void incrementSafeBlockCount(int replication);
/** Decrement number of blocks that reached minimal replication. */
public void decrementSafeBlockCount(BlockInfo b);
} }

View File

@ -219,7 +219,7 @@ public class TestSafeMode {
} }
}, 10, 10000); }, 10, 10000);
final int safe = NameNodeAdapter.getSafeModeSafeBlocks(nn); final long safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
assertTrue("Expected first block report to make some blocks safe.", safe > 0); assertTrue("Expected first block report to make some blocks safe.", safe > 0);
assertTrue("Did not expect first block report to make all blocks safe.", safe < 15); assertTrue("Did not expect first block report to make all blocks safe.", safe < 15);

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerSafeMode.BMSafeModeStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -35,6 +36,7 @@ import org.apache.hadoop.util.Daemon;
import org.junit.Assert; import org.junit.Assert;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.mockito.internal.util.reflection.Whitebox;
public class BlockManagerTestUtil { public class BlockManagerTestUtil {
public static void setNodeReplicationLimit(final BlockManager blockManager, public static void setNodeReplicationLimit(final BlockManager blockManager,
@ -306,4 +308,11 @@ public class BlockManagerTestUtil {
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
dm.getDecomManager().runMonitor(); dm.getDecomManager().runMonitor();
} }
public static void setStartupSafeModeForTest(BlockManager bm) {
BlockManagerSafeMode bmSafeMode = (BlockManagerSafeMode)Whitebox
.getInternalState(bm, "bmSafeMode");
Whitebox.setInternalState(bmSafeMode, "extension", Integer.MAX_VALUE);
Whitebox.setInternalState(bmSafeMode, "status", BMSafeModeStatus.EXTENSION);
}
} }

View File

@ -0,0 +1,414 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerSafeMode.BMSafeModeStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* This test is for testing {@link BlockManagerSafeMode} package local APIs.
*
* They use heavily mocked objects, treating the {@link BlockManagerSafeMode}
* as white-box. Tests are light-weight thus no multi-thread scenario or real
* mini-cluster is tested.
*
* @see org.apache.hadoop.hdfs.TestSafeMode
* @see org.apache.hadoop.hdfs.server.namenode.ha.TestHASafeMode
*/
public class TestBlockManagerSafeMode {
private static final int DATANODE_NUM = 3;
private static final long BLOCK_TOTAL = 10;
private static final double THRESHOLD = 0.99;
private static final long BLOCK_THRESHOLD = (long)(BLOCK_TOTAL * THRESHOLD);
private static final int EXTENSION = 1000; // 1 second
private BlockManager bm;
private DatanodeManager dn;
private BlockManagerSafeMode bmSafeMode;
/**
* Set up the mock context.
*
* - extension is always needed (default period is {@link #EXTENSION} ms
* - datanode threshold is always reached via mock
* - safe block is 0 and it needs {@link #BLOCK_THRESHOLD} to reach threshold
* - write/read lock is always held by current thread
*
* @throws IOException
*/
@Before
public void setupMockCluster() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setDouble(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
THRESHOLD);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY,
EXTENSION);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
DATANODE_NUM);
FSNamesystem fsn = mock(FSNamesystem.class);
Mockito.doReturn(true).when(fsn).hasWriteLock();
Mockito.doReturn(true).when(fsn).hasReadLock();
Mockito.doReturn(true).when(fsn).isRunning();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
bm = spy(new BlockManager(fsn, conf));
dn = spy(bm.getDatanodeManager());
Whitebox.setInternalState(bm, "datanodeManager", dn);
// the datanode threshold is always met
when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);
bmSafeMode = new BlockManagerSafeMode(bm, fsn, conf);
}
/**
* Test set block total.
*
* The block total is set which will call checkSafeMode for the first time
* and bmSafeMode transfers from INITIALIZED to PENDING_THRESHOLD status
*/
@Test(timeout = 30000)
public void testInitialize() {
assertFalse("Block manager should not be in safe mode at beginning.",
bmSafeMode.isInSafeMode());
bmSafeMode.activate(BLOCK_TOTAL);
assertEquals(BMSafeModeStatus.PENDING_THRESHOLD, getSafeModeStatus());
assertTrue(bmSafeMode.isInSafeMode());
}
/**
* Test the state machine transition.
*/
@Test(timeout = 30000)
public void testCheckSafeMode() {
bmSafeMode.activate(BLOCK_TOTAL);
// stays in PENDING_THRESHOLD: pending block threshold
setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
for (long i = 0; i < BLOCK_THRESHOLD; i++) {
setBlockSafe(i);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.PENDING_THRESHOLD, getSafeModeStatus());
}
// PENDING_THRESHOLD -> EXTENSION
setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
setBlockSafe(BLOCK_THRESHOLD);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
Whitebox.setInternalState(bmSafeMode, "smmthread", null);
// PENDING_THRESHOLD -> OFF
setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
setBlockSafe(BLOCK_THRESHOLD);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.OFF, getSafeModeStatus());
// stays in EXTENSION
setSafeModeStatus(BMSafeModeStatus.EXTENSION);
setBlockSafe(0);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
// stays in EXTENSION: pending extension period
setSafeModeStatus(BMSafeModeStatus.EXTENSION);
setBlockSafe(BLOCK_THRESHOLD);
Whitebox.setInternalState(bmSafeMode, "extension", Integer.MAX_VALUE);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
}
/**
* Test that the block safe increases up to block threshold.
*
* Once the block threshold is reached, the block manger leaves safe mode and
* increment will be a no-op.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> OFF
*/
@Test(timeout = 30000)
public void testIncrementSafeBlockCount() {
bmSafeMode.activate(BLOCK_TOTAL);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
for (long i = 1; i <= BLOCK_TOTAL; i++) {
bmSafeMode.incrementSafeBlockCount(1);
if (i < BLOCK_THRESHOLD) {
assertEquals(i, getblockSafe());
assertTrue(bmSafeMode.isInSafeMode());
} else {
// block manager leaves safe mode if block threshold is met
assertFalse(bmSafeMode.isInSafeMode());
// the increment will be a no-op if safe mode is OFF
assertEquals(BLOCK_THRESHOLD, getblockSafe());
}
}
}
/**
* Test that the block safe increases up to block threshold.
*
* Once the block threshold is reached, the block manger leaves safe mode and
* increment will be a no-op.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> EXTENSION-> OFF
*/
@Test(timeout = 30000)
public void testIncrementSafeBlockCountWithExtension() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
for (long i = 1; i <= BLOCK_TOTAL; i++) {
bmSafeMode.incrementSafeBlockCount(1);
if (i < BLOCK_THRESHOLD) {
assertTrue(bmSafeMode.isInSafeMode());
}
}
waitForExtensionPeriod();
assertFalse(bmSafeMode.isInSafeMode());
}
/**
* Test that the block safe decreases the block safe.
*
* The block manager stays in safe mode.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD
*/
@Test(timeout = 30000)
public void testDecrementSafeBlockCount() {
bmSafeMode.activate(BLOCK_TOTAL);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
mockBlockManagerForBlockSafeDecrement();
setBlockSafe(BLOCK_THRESHOLD);
for (long i = BLOCK_THRESHOLD; i > 0; i--) {
BlockInfo blockInfo = mock(BlockInfo.class);
bmSafeMode.decrementSafeBlockCount(blockInfo);
assertEquals(i - 1, getblockSafe());
assertTrue(bmSafeMode.isInSafeMode());
}
}
/**
* Test when the block safe increment and decrement interleave.
*
* Both the increment and decrement will be a no-op if the safe mode is OFF.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> OFF
*/
@Test(timeout = 30000)
public void testIncrementAndDecrementSafeBlockCount() {
bmSafeMode.activate(BLOCK_TOTAL);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
mockBlockManagerForBlockSafeDecrement();
for (long i = 1; i <= BLOCK_TOTAL; i++) {
BlockInfo blockInfo = mock(BlockInfo.class);
bmSafeMode.incrementSafeBlockCount(1);
bmSafeMode.decrementSafeBlockCount(blockInfo);
bmSafeMode.incrementSafeBlockCount(1);
if (i < BLOCK_THRESHOLD) {
assertEquals(i, getblockSafe());
assertTrue(bmSafeMode.isInSafeMode());
} else {
// block manager leaves safe mode if block threshold is met
assertEquals(BLOCK_THRESHOLD, getblockSafe());
assertFalse(bmSafeMode.isInSafeMode());
}
}
}
/**
* Test the safe mode monitor.
*
* The monitor will make block manager leave the safe mode after extension
* period.
*/
@Test(timeout = 30000)
public void testSafeModeMonitor() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
setBlockSafe(BLOCK_THRESHOLD);
// PENDING_THRESHOLD -> EXTENSION
bmSafeMode.checkSafeMode();
assertTrue(bmSafeMode.isInSafeMode());
waitForExtensionPeriod();
assertFalse(bmSafeMode.isInSafeMode());
}
/**
* Test block manager won't leave safe mode if datanode threshold is not met.
*/
@Test(timeout = 30000)
public void testDatanodeThreshodShouldBeMet() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
// All datanode have not registered yet.
when(dn.getNumLiveDataNodes()).thenReturn(1);
setBlockSafe(BLOCK_THRESHOLD);
bmSafeMode.checkSafeMode();
assertTrue(bmSafeMode.isInSafeMode());
// The datanode number reaches threshold after all data nodes register
when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);
bmSafeMode.checkSafeMode();
waitForExtensionPeriod();
assertFalse(bmSafeMode.isInSafeMode());
}
/**
* Test block manager won't leave safe mode if there are orphan blocks.
*/
@Test(timeout = 30000)
public void testStayInSafeModeWhenBytesInFuture() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
when(bm.getBytesInFuture()).thenReturn(1L);
// safe blocks are enough
setBlockSafe(BLOCK_THRESHOLD);
// PENDING_THRESHOLD -> EXTENSION
bmSafeMode.checkSafeMode();
try {
waitForExtensionPeriod();
fail("Safe mode should not leave extension period with orphan blocks!");
} catch (TimeoutException e) {
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
}
}
/**
* Test get safe mode tip.
*/
@Test(timeout = 30000)
public void testGetSafeModeTip() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
String tip = bmSafeMode.getSafeModeTip();
assertTrue(tip.contains(
String.format(
"The reported blocks %d needs additional %d blocks to reach the " +
"threshold %.4f of total blocks %d.%n",
0, BLOCK_THRESHOLD, THRESHOLD, BLOCK_TOTAL)));
assertTrue(tip.contains(
String.format("The number of live datanodes %d has reached the " +
"minimum number %d. ", dn.getNumLiveDataNodes(), DATANODE_NUM)));
assertTrue(tip.contains("Safe mode will be turned off automatically once " +
"the thresholds have been reached."));
// safe blocks are enough
setBlockSafe(BLOCK_THRESHOLD);
bmSafeMode.checkSafeMode();
tip = bmSafeMode.getSafeModeTip();
assertTrue(tip.contains(
String.format("The reported blocks %d has reached the threshold"
+ " %.4f of total blocks %d. ",
getblockSafe(), THRESHOLD, BLOCK_TOTAL)));
assertTrue(tip.contains(
String.format("The number of live datanodes %d has reached the " +
"minimum number %d. ", dn.getNumLiveDataNodes(), DATANODE_NUM)));
assertTrue(tip.contains("In safe mode extension. Safe mode will be turned" +
" off automatically in"));
waitForExtensionPeriod();
tip = bmSafeMode.getSafeModeTip();
System.out.println(tip);
assertTrue(tip.contains(
String.format("The reported blocks %d has reached the threshold"
+ " %.4f of total blocks %d. ",
getblockSafe(), THRESHOLD, BLOCK_TOTAL)));
assertTrue(tip.contains(
String.format("The number of live datanodes %d has reached the " +
"minimum number %d. ", dn.getNumLiveDataNodes(), DATANODE_NUM)));
assertTrue(tip.contains("Safe mode will be turned off automatically soon"));
}
/**
* Mock block manager internal state for decrement safe block
*/
private void mockBlockManagerForBlockSafeDecrement() {
BlockInfo storedBlock = mock(BlockInfo.class);
when(storedBlock.isComplete()).thenReturn(true);
doReturn(storedBlock).when(bm).getStoredBlock(any(Block.class));
NumberReplicas numberReplicas = mock(NumberReplicas.class);
when(numberReplicas.liveReplicas()).thenReturn(0);
doReturn(numberReplicas).when(bm).countNodes(any(Block.class));
}
/**
* Wait the bmSafeMode monitor for the extension period.
* @throws InterruptedIOException
* @throws TimeoutException
*/
private void waitForExtensionPeriod() throws Exception{
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return getSafeModeStatus() != BMSafeModeStatus.EXTENSION;
}
}, EXTENSION / 10, EXTENSION * 2);
}
private void setSafeModeStatus(BMSafeModeStatus status) {
Whitebox.setInternalState(bmSafeMode, "status", status);
}
private BMSafeModeStatus getSafeModeStatus() {
return (BMSafeModeStatus)Whitebox.getInternalState(bmSafeMode, "status");
}
private void setBlockSafe(long blockSafe) {
Whitebox.setInternalState(bmSafeMode, "blockSafe", blockSafe);
}
private long getblockSafe() {
return (long)Whitebox.getInternalState(bmSafeMode, "blockSafe");
}
}

View File

@ -1359,7 +1359,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication() testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
throws IOException { throws IOException {
Namesystem mockNS = mock(Namesystem.class); Namesystem mockNS = mock(Namesystem.class);
when(mockNS.hasReadLock()).thenReturn(true); when(mockNS.hasWriteLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer; import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -231,12 +230,13 @@ public class NameNodeAdapter {
* @return the number of blocks marked safe by safemode, or -1 * @return the number of blocks marked safe by safemode, or -1
* if safemode is not running. * if safemode is not running.
*/ */
public static int getSafeModeSafeBlocks(NameNode nn) { public static long getSafeModeSafeBlocks(NameNode nn) {
SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests(); if (!nn.getNamesystem().isInSafeMode()) {
if (smi == null) {
return -1; return -1;
} }
return smi.blockSafe; Object bmSafeMode = Whitebox.getInternalState(
nn.getNamesystem().getBlockManager(), "bmSafeMode");
return (long)Whitebox.getInternalState(bmSafeMode, "blockSafe");
} }
/** /**

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState; import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@ -332,7 +331,11 @@ public class TestFSNamesystem {
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
FSNamesystem fsn = new FSNamesystem(conf, fsImage); FSNamesystem fsn = new FSNamesystem(conf, fsImage);
SafeModeInfo safemodeInfo = fsn.getSafeModeInfoForTests();
assertTrue(safemodeInfo.toString().contains("Minimal replication = 2")); Object bmSafeMode = Whitebox.getInternalState(fsn.getBlockManager(),
"bmSafeMode");
int safeReplication = (int)Whitebox.getInternalState(bmSafeMode,
"safeReplication");
assertEquals(2, safeReplication);
} }
} }

View File

@ -103,8 +103,6 @@ public class TestNameNodeMetadataConsistency {
// we also need to tell block manager that we are in the startup path // we also need to tell block manager that we are in the startup path
FSNamesystem spyNameSystem = spy(cluster.getNameNode().getNamesystem()); FSNamesystem spyNameSystem = spy(cluster.getNameNode().getNamesystem());
spyNameSystem.enableSafeModeForTesting(conf);
Whitebox.setInternalState(cluster.getNameNode() Whitebox.setInternalState(cluster.getNameNode()
.getNamesystem().getBlockManager(), .getNamesystem().getBlockManager(),
"namesystem", spyNameSystem); "namesystem", spyNameSystem);

View File

@ -32,7 +32,6 @@ import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -56,7 +55,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -126,10 +124,13 @@ public class TestHASafeMode {
.synchronizedMap(new HashMap<Path, Boolean>()); .synchronizedMap(new HashMap<Path, Boolean>());
final Path test = new Path("/test"); final Path test = new Path("/test");
// let nn0 enter safemode // let nn0 enter safemode
cluster.getConfiguration(0).setInt(
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 3);
NameNodeAdapter.enterSafeMode(nn0, false); NameNodeAdapter.enterSafeMode(nn0, false);
SafeModeInfo safeMode = (SafeModeInfo) Whitebox.getInternalState( Whitebox.setInternalState(nn0.getNamesystem(), "manualSafeMode", false);
nn0.getNamesystem(), "safeMode"); BlockManagerTestUtil.setStartupSafeModeForTest(nn0.getNamesystem()
Whitebox.setInternalState(safeMode, "extension", Integer.valueOf(30000)); .getBlockManager());
assertTrue(nn0.getNamesystem().isInStartupSafeMode());
LOG.info("enter safemode"); LOG.info("enter safemode");
new Thread() { new Thread() {
@Override @Override