HDFS-9129. Move the safemode block count into BlockManager. Contributed by Mingliang Liu.

This commit is contained in:
Jing Zhao 2015-12-01 16:09:19 -08:00
parent 58f6f54eea
commit a49cc74b4c
21 changed files with 1208 additions and 845 deletions

View File

@ -879,6 +879,9 @@ Release 2.9.0 - UNRELEASED
IMPROVEMENTS
HDFS-9129. Move the safemode block count into BlockManager. (Mingliang Liu
via jing9)
OPTIMIZATIONS
BUG FIXES

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@ -125,6 +126,8 @@ public class BlockManager implements BlockStatsMXBean {
private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
@ -380,6 +383,8 @@ public class BlockManager implements BlockStatsMXBean {
this.numberOfBytesInFutureBlocks = new AtomicLong();
this.inRollBack = isInRollBackMode(NameNode.getStartupOption(conf));
bmSafeMode = new BlockManagerSafeMode(this, namesystem, conf);
LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication);
LOG.info("minReplication = " + minReplication);
@ -488,15 +493,17 @@ public class BlockManager implements BlockStatsMXBean {
: false;
}
public void activate(Configuration conf) {
public void activate(Configuration conf, long blockTotal) {
pendingReplications.start();
datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
}
public void close() {
bmSafeMode.close();
try {
replicationThread.interrupt();
replicationThread.join(3000);
@ -741,11 +748,11 @@ public class BlockManager implements BlockStatsMXBean {
// count. (We may not have the minimum replica count yet if this is
// a "forced" completion when a file is getting closed by an
// OP_CLOSE edit on the standby).
namesystem.adjustSafeModeBlockTotals(0, 1);
bmSafeMode.adjustBlockTotals(0, 1);
final int minStorage = curBlock.isStriped() ?
((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication;
namesystem.incrementSafeBlockCount(
Math.min(numNodes, minStorage), curBlock);
bmSafeMode.incrementSafeBlockCount(Math.min(numNodes, minStorage),
curBlock);
}
/**
@ -805,7 +812,7 @@ public class BlockManager implements BlockStatsMXBean {
// Adjust safe-mode totals, since under-construction blocks don't
// count in safe-mode.
namesystem.adjustSafeModeBlockTotals(
bmSafeMode.adjustBlockTotals(
// decrement safe if we had enough
hasMinStorage(lastBlock, targets.length) ? -1 : 0,
// always decrement total blocks
@ -1188,7 +1195,7 @@ public class BlockManager implements BlockStatsMXBean {
invalidateBlocks.remove(node, b);
}
}
namesystem.checkSafeMode();
checkSafeMode();
}
/**
@ -1933,6 +1940,74 @@ public class BlockManager implements BlockStatsMXBean {
return leaseId;
}
public void registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
assert namesystem.hasWriteLock();
datanodeManager.registerDatanode(nodeReg);
bmSafeMode.checkSafeMode();
}
/**
* Set the total number of blocks in the system.
* If safe mode is not currently on, this is a no-op.
*/
public void setBlockTotal(long total) {
if (bmSafeMode.isInSafeMode()) {
bmSafeMode.setBlockTotal(total);
bmSafeMode.checkSafeMode();
}
}
public boolean isInSafeMode() {
return bmSafeMode.isInSafeMode();
}
public String getSafeModeTip() {
return bmSafeMode.getSafeModeTip();
}
public void leaveSafeMode(boolean force) {
bmSafeMode.leaveSafeMode(force);
}
void checkSafeMode() {
bmSafeMode.checkSafeMode();
}
/**
* Removes the blocks from blocksmap and updates the safemode blocks total.
* @param blocks An instance of {@link BlocksMapUpdateInfo} which contains a
* list of blocks that need to be removed from blocksMap
*/
public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
assert namesystem.hasWriteLock();
// In the case that we are a Standby tailing edits from the
// active while in safe-mode, we need to track the total number
// of blocks and safe blocks in the system.
boolean trackBlockCounts = bmSafeMode.isSafeModeTrackingBlocks();
int numRemovedComplete = 0, numRemovedSafe = 0;
for (BlockInfo b : blocks.getToDeleteList()) {
if (trackBlockCounts) {
if (b.isComplete()) {
numRemovedComplete++;
if (hasMinStorage(b, b.numNodes())) {
numRemovedSafe++;
}
}
}
removeBlock(b);
}
if (trackBlockCounts) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adjusting safe-mode totals for deletion."
+ "decreasing safeBlocks by " + numRemovedSafe
+ ", totalBlocks by " + numRemovedComplete);
}
bmSafeMode.adjustBlockTotals(-numRemovedSafe, -numRemovedComplete);
}
}
/**
* StatefulBlockInfo is used to build the "toUC" list, which is a list of
* updates to the information about under-construction blocks.
@ -2333,7 +2408,7 @@ public class BlockManager implements BlockStatsMXBean {
if (namesystem.isInSnapshot(storedBlock)) {
int numOfReplicas = storedBlock.getUnderConstructionFeature()
.getNumExpectedLocations();
namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
bmSafeMode.incrementSafeBlockCount(numOfReplicas, storedBlock);
}
//and fall through to next clause
}
@ -2732,7 +2807,7 @@ public class BlockManager implements BlockStatsMXBean {
// only complete blocks are counted towards that.
// In the case that the block just became complete above, completeBlock()
// handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
bmSafeMode.incrementSafeBlockCount(numCurrentReplica, storedBlock);
}
}
@ -2808,7 +2883,7 @@ public class BlockManager implements BlockStatsMXBean {
// Is no-op if not in safe mode.
// In the case that the block just became complete above, completeBlock()
// handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
bmSafeMode.incrementSafeBlockCount(numCurrentReplica, storedBlock);
}
// if file is under construction, then done for now
@ -3352,7 +3427,7 @@ public class BlockManager implements BlockStatsMXBean {
//
BlockCollection bc = getBlockCollection(storedBlock);
if (bc != null) {
namesystem.decrementSafeBlockCount(storedBlock);
bmSafeMode.decrementSafeBlockCount(storedBlock);
updateNeededReplications(storedBlock, -1, 0);
}

View File

@ -0,0 +1,573 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
/**
* Block manager safe mode info.
*
* During name node startup, counts the number of <em>safe blocks</em>, those
* that have at least the minimal number of replicas, and calculates the ratio
* of safe blocks to the total number of blocks in the system, which is the size
* of blocks. When the ratio reaches the {@link #threshold} and enough live data
* nodes have registered, it needs to wait for the safe mode {@link #extension}
* interval. After the extension period has passed, it will not leave safe mode
* until the safe blocks ratio reaches the {@link #threshold} and enough live
* data node registered.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class BlockManagerSafeMode {
enum BMSafeModeStatus {
PENDING_THRESHOLD, /** Pending on more safe blocks or live datanode. */
EXTENSION, /** In extension period. */
OFF /** Safe mode is off. */
}
static final Logger LOG = LoggerFactory.getLogger(BlockManagerSafeMode.class);
static final Step STEP_AWAITING_REPORTED_BLOCKS =
new Step(StepType.AWAITING_REPORTED_BLOCKS);
private final BlockManager blockManager;
private final Namesystem namesystem;
private final boolean haEnabled;
private volatile BMSafeModeStatus status = BMSafeModeStatus.OFF;
/** Safe mode threshold condition %.*/
private final double threshold;
/** Number of blocks needed to satisfy safe mode threshold condition. */
private long blockThreshold;
/** Total number of blocks. */
private long blockTotal;
/** Number of safe blocks. */
private long blockSafe;
/** Safe mode minimum number of datanodes alive. */
private final int datanodeThreshold;
/** Min replication required by safe mode. */
private final int safeReplication;
/** Threshold for populating needed replication queues. */
private final double replQueueThreshold;
/** Number of blocks needed before populating replication queues. */
private long blockReplQueueThreshold;
/** How long (in ms) is the extension period. */
private final int extension;
/** Timestamp of the first time when thresholds are met. */
private final AtomicLong reachedTime = new AtomicLong();
/** Timestamp of the safe mode initialized. */
private long startTime;
/** the safe mode monitor thread. */
private final Daemon smmthread = new Daemon(new SafeModeMonitor());
/** time of the last status printout */
private long lastStatusReport;
/** Counter for tracking startup progress of reported blocks. */
private Counter awaitingReportedBlocksCounter;
BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem,
Configuration conf) {
this.blockManager = blockManager;
this.namesystem = namesystem;
this.haEnabled = namesystem.isHaEnabled();
this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
if (this.threshold > 1.0) {
LOG.warn("The threshold value should't be greater than 1, threshold: {}",
threshold);
}
this.datanodeThreshold = conf.getInt(
DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
int minReplication =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
// DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting,
// setting this lower than the min replication is not recommended
// and/or dangerous for production setups.
// When it's unset, safeReplication will use dfs.namenode.replication.min
this.safeReplication =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY,
minReplication);
// default to safe mode threshold (i.e., don't populate queues before
// leaving safe mode)
this.replQueueThreshold =
conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
(float) threshold);
this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, threshold);
LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
datanodeThreshold);
LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, extension);
}
/**
* Initialize the safe mode information.
* @param total initial total blocks
*/
void activate(long total) {
assert namesystem.hasWriteLock();
assert status == BMSafeModeStatus.OFF;
startTime = monotonicNow();
setBlockTotal(total);
if (areThresholdsMet()) {
leaveSafeMode(true);
} else {
// enter safe mode
status = BMSafeModeStatus.PENDING_THRESHOLD;
initializeReplQueuesIfNecessary();
reportStatus("STATE* Safe mode ON.", true);
lastStatusReport = monotonicNow();
}
}
/**
* @return true if it stays in start up safe mode else false.
*/
boolean isInSafeMode() {
if (status != BMSafeModeStatus.OFF) {
doConsistencyCheck();
return true;
} else {
return false;
}
}
/**
* The transition of the safe mode state machine.
* If safe mode is not currently on, this is a no-op.
*/
void checkSafeMode() {
assert namesystem.hasWriteLock();
if (namesystem.inTransitionToActive()) {
return;
}
switch (status) {
case PENDING_THRESHOLD:
if (areThresholdsMet()) {
if (extension > 0) {
// PENDING_THRESHOLD -> EXTENSION
status = BMSafeModeStatus.EXTENSION;
reachedTime.set(monotonicNow());
smmthread.start();
initializeReplQueuesIfNecessary();
reportStatus("STATE* Safe mode extension entered.", true);
} else {
// PENDING_THRESHOLD -> OFF
leaveSafeMode(false);
}
} else {
initializeReplQueuesIfNecessary();
reportStatus("STATE* Safe mode ON.", false);
}
break;
case EXTENSION:
reportStatus("STATE* Safe mode ON.", false);
break;
case OFF:
break;
default:
assert false : "Non-recognized block manager safe mode status: " + status;
}
}
/**
* Adjust the total number of blocks safe and expected during safe mode.
* If safe mode is not currently on, this is a no-op.
* @param deltaSafe the change in number of safe blocks
* @param deltaTotal the change in number of total blocks expected
*/
void adjustBlockTotals(int deltaSafe, int deltaTotal) {
assert namesystem.hasWriteLock();
if (!isSafeModeTrackingBlocks()) {
return;
}
long newBlockTotal;
synchronized (this) {
LOG.debug("Adjusting block totals from {}/{} to {}/{}", blockSafe,
blockTotal, blockSafe + deltaSafe, blockTotal + deltaTotal);
assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
blockSafe + " by " + deltaSafe + ": would be negative";
assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
blockTotal + " by " + deltaTotal + ": would be negative";
blockSafe += deltaSafe;
newBlockTotal = blockTotal + deltaTotal;
}
setBlockTotal(newBlockTotal);
checkSafeMode();
}
/**
* Should we track blocks in safe mode.
* <p/>
* Never track blocks incrementally in non-HA code.
* <p/>
* In the HA case, the StandbyNode can be in safemode while the namespace
* is modified by the edit log tailer. In this case, the number of total
* blocks changes as edits are processed (eg blocks are added and deleted).
* However, we don't want to do the incremental tracking during the
* startup-time loading process -- only once the initial total has been
* set after the image has been loaded.
*/
boolean isSafeModeTrackingBlocks() {
assert namesystem.hasWriteLock();
return haEnabled && status != BMSafeModeStatus.OFF;
}
/**
* Set total number of blocks.
*/
void setBlockTotal(long total) {
assert namesystem.hasWriteLock();
synchronized (this) {
this.blockTotal = total;
this.blockThreshold = (long) (total * threshold);
}
this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}
String getSafeModeTip() {
String msg = "";
synchronized (this) {
if (blockSafe < blockThreshold) {
msg += String.format(
"The reported blocks %d needs additional %d"
+ " blocks to reach the threshold %.4f of total blocks %d.%n",
blockSafe, (blockThreshold - blockSafe), threshold, blockTotal);
} else {
msg += String.format("The reported blocks %d has reached the threshold"
+ " %.4f of total blocks %d. ", blockSafe, threshold, blockTotal);
}
}
int numLive = blockManager.getDatanodeManager().getNumLiveDataNodes();
if (numLive < datanodeThreshold) {
msg += String.format(
"The number of live datanodes %d needs an additional %d live "
+ "datanodes to reach the minimum number %d.%n",
numLive, (datanodeThreshold - numLive), datanodeThreshold);
} else {
msg += String.format("The number of live datanodes %d has reached "
+ "the minimum number %d. ",
numLive, datanodeThreshold);
}
if (blockManager.getBytesInFuture() > 0) {
msg += "Name node detected blocks with generation stamps " +
"in future. This means that Name node metadata is inconsistent." +
"This can happen if Name node metadata files have been manually " +
"replaced. Exiting safe mode will cause loss of " + blockManager
.getBytesInFuture() + " byte(s). Please restart name node with " +
"right metadata or use \"hdfs dfsadmin -safemode forceExit" +
"if you are certain that the NameNode was started with the" +
"correct FsImage and edit logs. If you encountered this during" +
"a rollback, it is safe to exit with -safemode forceExit.";
return msg;
}
final String turnOffTip = "Safe mode will be turned off automatically ";
switch(status) {
case PENDING_THRESHOLD:
msg += turnOffTip + "once the thresholds have been reached.";
break;
case EXTENSION:
msg += "In safe mode extension. "+ turnOffTip + "in " +
timeToLeaveExtension() / 1000 + " seconds.";
break;
case OFF:
msg += turnOffTip + "soon.";
break;
default:
assert false : "Non-recognized block manager safe mode status: " + status;
}
return msg;
}
/**
* Leave start up safe mode.
* @param force - true to force exit
*/
void leaveSafeMode(boolean force) {
assert namesystem.hasWriteLock() : "Leaving safe mode needs write lock!";
// if not done yet, initialize replication queues.
// In the standby, do not populate repl queues
if (!blockManager.isPopulatingReplQueues() &&
blockManager.shouldPopulateReplQueues()) {
blockManager.initializeReplQueues();
}
if (!force && blockManager.getBytesInFuture() > 0) {
LOG.error("Refusing to leave safe mode without a force flag. " +
"Exiting safe mode will cause a deletion of {} byte(s). Please use " +
"-forceExit flag to exit safe mode forcefully if data loss is " +
"acceptable.", blockManager.getBytesInFuture());
return;
}
if (status != BMSafeModeStatus.OFF) {
NameNode.stateChangeLog.info("STATE* Safe mode is OFF");
}
status = BMSafeModeStatus.OFF;
final long timeInSafemode = monotonicNow() - startTime;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after {} secs",
timeInSafemode / 1000);
NameNode.getNameNodeMetrics().setSafeModeTime(timeInSafemode);
final NetworkTopology nt = blockManager.getDatanodeManager()
.getNetworkTopology();
NameNode.stateChangeLog.info("STATE* Network topology has {} racks and {}" +
" datanodes", nt.getNumOfRacks(), nt.getNumOfLeaves());
NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has {} blocks",
blockManager.numOfUnderReplicatedBlocks());
namesystem.startSecretManagerIfNecessary();
// If startup has not yet completed, end safemode phase.
StartupProgress prog = NameNode.getStartupProgress();
if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
prog.endStep(Phase.SAFEMODE,
BlockManagerSafeMode.STEP_AWAITING_REPORTED_BLOCKS);
prog.endPhase(Phase.SAFEMODE);
}
}
/**
* Increment number of safe blocks if current block has reached minimal
* replication.
* If safe mode is not currently on, this is a no-op.
* @param storageNum current number of replicas or number of internal blocks
* of a striped block group
* @param storedBlock current storedBlock which is either a
* BlockInfoContiguous or a BlockInfoStriped
*/
synchronized void incrementSafeBlockCount(int storageNum,
BlockInfo storedBlock) {
assert namesystem.hasWriteLock();
if (status == BMSafeModeStatus.OFF) {
return;
}
final int safe = storedBlock.isStriped() ?
((BlockInfoStriped)storedBlock).getRealDataBlockNum() : safeReplication;
if (storageNum == safe) {
this.blockSafe++;
// Report startup progress only if we haven't completed startup yet.
StartupProgress prog = NameNode.getStartupProgress();
if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
if (this.awaitingReportedBlocksCounter == null) {
this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE,
STEP_AWAITING_REPORTED_BLOCKS);
}
this.awaitingReportedBlocksCounter.increment();
}
checkSafeMode();
}
}
/**
* Decrement number of safe blocks if current block has fallen below minimal
* replication.
* If safe mode is not currently on, this is a no-op.
*/
synchronized void decrementSafeBlockCount(BlockInfo b) {
assert namesystem.hasWriteLock();
if (status == BMSafeModeStatus.OFF) {
return;
}
BlockInfo storedBlock = blockManager.getStoredBlock(b);
if (storedBlock.isComplete() &&
blockManager.countNodes(b).liveReplicas() == safeReplication - 1) {
this.blockSafe--;
assert blockSafe >= 0;
checkSafeMode();
}
}
void close() {
assert namesystem.hasWriteLock() : "Closing bmSafeMode needs write lock!";
try {
smmthread.interrupt();
smmthread.join(3000);
} catch (InterruptedException ignored) {
}
}
/**
* Get time (counting in milliseconds) left to leave extension period.
*
* Negative value indicates the extension period has passed.
*/
private long timeToLeaveExtension() {
return reachedTime.get() + extension - monotonicNow();
}
/** Check if we are ready to initialize replication queues. */
private void initializeReplQueuesIfNecessary() {
assert namesystem.hasWriteLock();
// Whether it has reached the threshold for initializing replication queues.
boolean canInitializeReplQueues = blockManager.shouldPopulateReplQueues() &&
blockSafe >= blockReplQueueThreshold;
if (canInitializeReplQueues &&
!blockManager.isPopulatingReplQueues() &&
!haEnabled) {
blockManager.initializeReplQueues();
}
}
/**
* @return true if both block and datanode threshold are met else false.
*/
private boolean areThresholdsMet() {
assert namesystem.hasWriteLock();
int datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes();
synchronized (this) {
return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
}
}
/**
* Checks consistency of the class state.
* This is costly so only runs if asserts are enabled.
*/
private void doConsistencyCheck() {
boolean assertsOn = false;
assert assertsOn = true; // set to true if asserts are on
if (!assertsOn) {
return;
}
int activeBlocks = blockManager.getActiveBlockCount();
synchronized (this) {
if (blockTotal != activeBlocks &&
!(blockSafe >= 0 && blockSafe <= blockTotal)) {
LOG.warn("SafeMode is in inconsistent filesystem state. " +
"BlockManagerSafeMode data: blockTotal={}, blockSafe={}; " +
"BlockManager data: activeBlocks={}",
blockTotal, blockSafe, activeBlocks);
}
}
}
/**
* Print status every 20 seconds.
*/
private void reportStatus(String msg, boolean rightNow) {
assert namesystem.hasWriteLock();
long curTime = monotonicNow();
if(!rightNow && (curTime - lastStatusReport < 20 * 1000)) {
return;
}
NameNode.stateChangeLog.info(msg + " \n" + getSafeModeTip());
lastStatusReport = curTime;
}
/**
* Periodically check whether it is time to leave safe mode.
* This thread starts when the threshold level is reached.
*/
private class SafeModeMonitor implements Runnable {
/** Interval in msec for checking safe mode. */
private static final long RECHECK_INTERVAL = 1000;
@Override
public void run() {
while (namesystem.isRunning()) {
try {
namesystem.writeLock();
if (status == BMSafeModeStatus.OFF) { // Not in safe mode.
break;
}
if (canLeave()) {
// EXTENSION -> OFF
leaveSafeMode(false);
break;
}
} finally {
namesystem.writeUnlock();
}
try {
Thread.sleep(RECHECK_INTERVAL);
} catch (InterruptedException ignored) {
}
}
if (!namesystem.isRunning()) {
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
}
}
/**
* Check whether the safe mode can be turned off by this monitor.
*
* Safe mode can be turned off iff
* the threshold is reached, and
* the extension time has passed.
*/
private boolean canLeave() {
if (timeToLeaveExtension() > 0) {
reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
return false;
} else if (!areThresholdsMet()) {
reportStatus("STATE* Safe mode ON, thresholds not met.", false);
return false;
} else {
return true;
}
}
}
}

View File

@ -88,7 +88,7 @@ public class DatanodeManager {
private final Map<String, DatanodeDescriptor> datanodeMap
= new HashMap<>();
/** Cluster network topology */
/** Cluster network topology. */
private final NetworkTopology networktopology;
/** Host names to datanode descriptors mapping. */
@ -105,7 +105,7 @@ public class DatanodeManager {
private final int defaultIpcPort;
/** Read include/exclude files*/
/** Read include/exclude files. */
private final HostFileManager hostFileManager = new HostFileManager();
/** The period to wait for datanode heartbeat.*/
@ -560,7 +560,7 @@ public class DatanodeManager {
if (LOG.isDebugEnabled()) {
LOG.debug("remove datanode " + nodeInfo);
}
namesystem.checkSafeMode();
blockManager.checkSafeMode();
}
/**

View File

@ -256,7 +256,8 @@ class Checkpointer extends Daemon {
if(backupNode.namesystem.getBlocksTotal() > 0) {
long completeBlocksTotal =
backupNode.namesystem.getCompleteBlocksTotal();
backupNode.namesystem.setBlockTotal(completeBlocksTotal);
backupNode.namesystem.getBlockManager().setBlockTotal(
completeBlocksTotal);
}
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
if (!backupNode.namesystem.isRollingUpgrade()) {

View File

@ -150,7 +150,7 @@ class FSDirDeleteOp {
if (filesRemoved) {
fsn.removeLeasesAndINodes(removedUCFiles, removedINodes, false);
fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
}
}

View File

@ -312,7 +312,8 @@ class FSDirRenameOp {
unprotectedRenameTo(fsd, src, dst, srcIIP, dstIIP, timestamp,
collectedBlocks, options);
if (!collectedBlocks.getToDeleteList().isEmpty()) {
fsd.getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
fsd.getFSNamesystem().getBlockManager()
.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
}
}

View File

@ -191,7 +191,7 @@ final class FSDirTruncateOp {
}
assert onBlockBoundary == (truncateBlock == null) :
"truncateBlock is null iff on block boundary: " + truncateBlock;
fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
fsn.getBlockManager().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
}
/**

View File

@ -747,7 +747,8 @@ public class FSEditLogLoader {
deleteSnapshotOp.snapshotName,
new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
collectedBlocks, removedINodes, null));
fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
fsNamesys.getBlockManager().removeBlocksAndUpdateSafemodeTotal(
collectedBlocks);
collectedBlocks.clear();
fsNamesys.dir.removeFromInodeMap(removedINodes);
removedINodes.clear();

View File

@ -383,7 +383,7 @@ public class NameNode implements NameNodeStatusMXBean {
return rpcServer;
}
static void initMetrics(Configuration conf, NamenodeRole role) {
public static void initMetrics(Configuration conf, NamenodeRole role) {
metrics = NameNodeMetrics.create(conf, role);
}
@ -1682,11 +1682,9 @@ public class NameNode implements NameNodeStatusMXBean {
HAServiceState retState = state.getServiceState();
HAServiceStatus ret = new HAServiceStatus(retState);
if (retState == HAServiceState.STANDBY) {
String safemodeTip = namesystem.getSafeModeTip();
if (!safemodeTip.isEmpty()) {
ret.setNotReadyToBecomeActive(
"The NameNode is in safemode. " +
safemodeTip);
if (namesystem.isInSafeMode()) {
ret.setNotReadyToBecomeActive("The NameNode is in safemode. " +
namesystem.getSafeModeTip());
} else {
ret.setReadyToBecomeActive();
}

View File

@ -48,10 +48,10 @@ public interface Namesystem extends RwLock, SafeMode {
BlockCollection getBlockCollection(long id);
void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
void checkOperation(OperationCategory read) throws StandbyException;
void startSecretManagerIfNecessary();
/**
* Gets the erasure coding policy for the path
* @param src
@ -67,4 +67,15 @@ public interface Namesystem extends RwLock, SafeMode {
CacheManager getCacheManager();
HAContext getHAContext();
/**
* @return true if the HA is enabled else false
*/
boolean isHaEnabled();
/**
* @return Whether the namenode is transitioning to active state and is in the
* middle of the starting active services.
*/
boolean inTransitionToActive();
}

View File

@ -18,18 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
/** SafeMode related operations. */
@InterfaceAudience.Private
public interface SafeMode {
/**
* Check safe mode conditions.
* If the corresponding conditions are satisfied,
* trigger the system to enter/leave safe mode.
*/
public void checkSafeMode();
/** Is the system in safe mode? */
public boolean isInSafeMode();
@ -38,14 +30,4 @@ public interface SafeMode {
* safe mode turned on automatically?
*/
public boolean isInStartupSafeMode();
/**
* Increment number of blocks that reached minimal replication.
* @param replication current replication
* @param storedBlock current stored Block
*/
public void incrementSafeBlockCount(int replication, BlockInfo storedBlock);
/** Decrement number of blocks that reached minimal replication. */
public void decrementSafeBlockCount(BlockInfo b);
}

View File

@ -219,7 +219,7 @@ public class TestSafeMode {
}
}, 10, 10000);
final int safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
final long safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
assertTrue("Expected first block report to make some blocks safe.", safe > 0);
assertTrue("Did not expect first block report to make all blocks safe.", safe < 15);

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerSafeMode.BMSafeModeStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -35,6 +36,7 @@ import org.apache.hadoop.util.Daemon;
import org.junit.Assert;
import com.google.common.base.Preconditions;
import org.mockito.internal.util.reflection.Whitebox;
public class BlockManagerTestUtil {
public static void setNodeReplicationLimit(final BlockManager blockManager,
@ -314,4 +316,11 @@ public class BlockManagerTestUtil {
Block block, DatanodeStorageInfo[] targets) {
node.addBlockToBeReplicated(block, targets);
}
public static void setStartupSafeModeForTest(BlockManager bm) {
BlockManagerSafeMode bmSafeMode = (BlockManagerSafeMode)Whitebox
.getInternalState(bm, "bmSafeMode");
Whitebox.setInternalState(bmSafeMode, "extension", Integer.MAX_VALUE);
Whitebox.setInternalState(bmSafeMode, "status", BMSafeModeStatus.EXTENSION);
}
}

View File

@ -0,0 +1,420 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerSafeMode.BMSafeModeStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* This test is for testing {@link BlockManagerSafeMode} package local APIs.
*
* They use heavily mocked objects, treating the {@link BlockManagerSafeMode}
* as white-box. Tests are light-weight thus no multi-thread scenario or real
* mini-cluster is tested.
*
* @see org.apache.hadoop.hdfs.TestSafeMode
* @see org.apache.hadoop.hdfs.server.namenode.ha.TestHASafeMode
* @see org.apache.hadoop.hdfs.TestSafeModeWithStripedFile
*/
public class TestBlockManagerSafeMode {
private static final int DATANODE_NUM = 3;
private static final long BLOCK_TOTAL = 10;
private static final double THRESHOLD = 0.99;
private static final long BLOCK_THRESHOLD = (long)(BLOCK_TOTAL * THRESHOLD);
private static final int EXTENSION = 1000; // 1 second
private BlockManager bm;
private DatanodeManager dn;
private BlockManagerSafeMode bmSafeMode;
/**
* Set up the mock context.
*
* - extension is always needed (default period is {@link #EXTENSION} ms
* - datanode threshold is always reached via mock
* - safe block is 0 and it needs {@link #BLOCK_THRESHOLD} to reach threshold
* - write/read lock is always held by current thread
*
* @throws IOException
*/
@Before
public void setupMockCluster() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setDouble(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
THRESHOLD);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY,
EXTENSION);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
DATANODE_NUM);
FSNamesystem fsn = mock(FSNamesystem.class);
Mockito.doReturn(true).when(fsn).hasWriteLock();
Mockito.doReturn(true).when(fsn).hasReadLock();
Mockito.doReturn(true).when(fsn).isRunning();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
bm = spy(new BlockManager(fsn, conf));
dn = spy(bm.getDatanodeManager());
Whitebox.setInternalState(bm, "datanodeManager", dn);
// the datanode threshold is always met
when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);
bmSafeMode = new BlockManagerSafeMode(bm, fsn, conf);
}
/**
* Test set block total.
*
* The block total is set which will call checkSafeMode for the first time
* and bmSafeMode transfers from INITIALIZED to PENDING_THRESHOLD status
*/
@Test(timeout = 30000)
public void testInitialize() {
assertFalse("Block manager should not be in safe mode at beginning.",
bmSafeMode.isInSafeMode());
bmSafeMode.activate(BLOCK_TOTAL);
assertEquals(BMSafeModeStatus.PENDING_THRESHOLD, getSafeModeStatus());
assertTrue(bmSafeMode.isInSafeMode());
}
/**
* Test the state machine transition.
*/
@Test(timeout = 30000)
public void testCheckSafeMode() {
bmSafeMode.activate(BLOCK_TOTAL);
// stays in PENDING_THRESHOLD: pending block threshold
setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
for (long i = 0; i < BLOCK_THRESHOLD; i++) {
setBlockSafe(i);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.PENDING_THRESHOLD, getSafeModeStatus());
}
// PENDING_THRESHOLD -> EXTENSION
setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
setBlockSafe(BLOCK_THRESHOLD);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
Whitebox.setInternalState(bmSafeMode, "smmthread", null);
// PENDING_THRESHOLD -> OFF
setSafeModeStatus(BMSafeModeStatus.PENDING_THRESHOLD);
setBlockSafe(BLOCK_THRESHOLD);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.OFF, getSafeModeStatus());
// stays in EXTENSION
setSafeModeStatus(BMSafeModeStatus.EXTENSION);
setBlockSafe(0);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
// stays in EXTENSION: pending extension period
setSafeModeStatus(BMSafeModeStatus.EXTENSION);
setBlockSafe(BLOCK_THRESHOLD);
Whitebox.setInternalState(bmSafeMode, "extension", Integer.MAX_VALUE);
bmSafeMode.checkSafeMode();
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
}
/**
* Test that the block safe increases up to block threshold.
*
* Once the block threshold is reached, the block manger leaves safe mode and
* increment will be a no-op.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> OFF
*/
@Test(timeout = 30000)
public void testIncrementSafeBlockCount() {
bmSafeMode.activate(BLOCK_TOTAL);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
for (long i = 1; i <= BLOCK_TOTAL; i++) {
BlockInfo blockInfo = mock(BlockInfo.class);
doReturn(false).when(blockInfo).isStriped();
bmSafeMode.incrementSafeBlockCount(1, blockInfo);
if (i < BLOCK_THRESHOLD) {
assertEquals(i, getblockSafe());
assertTrue(bmSafeMode.isInSafeMode());
} else {
// block manager leaves safe mode if block threshold is met
assertFalse(bmSafeMode.isInSafeMode());
// the increment will be a no-op if safe mode is OFF
assertEquals(BLOCK_THRESHOLD, getblockSafe());
}
}
}
/**
* Test that the block safe increases up to block threshold.
*
* Once the block threshold is reached, the block manger leaves safe mode and
* increment will be a no-op.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> EXTENSION-> OFF
*/
@Test(timeout = 30000)
public void testIncrementSafeBlockCountWithExtension() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
for (long i = 1; i <= BLOCK_TOTAL; i++) {
BlockInfo blockInfo = mock(BlockInfo.class);
doReturn(false).when(blockInfo).isStriped();
bmSafeMode.incrementSafeBlockCount(1, blockInfo);
if (i < BLOCK_THRESHOLD) {
assertTrue(bmSafeMode.isInSafeMode());
}
}
waitForExtensionPeriod();
assertFalse(bmSafeMode.isInSafeMode());
}
/**
* Test that the block safe decreases the block safe.
*
* The block manager stays in safe mode.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD
*/
@Test(timeout = 30000)
public void testDecrementSafeBlockCount() {
bmSafeMode.activate(BLOCK_TOTAL);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
mockBlockManagerForBlockSafeDecrement();
setBlockSafe(BLOCK_THRESHOLD);
for (long i = BLOCK_THRESHOLD; i > 0; i--) {
BlockInfo blockInfo = mock(BlockInfo.class);
bmSafeMode.decrementSafeBlockCount(blockInfo);
assertEquals(i - 1, getblockSafe());
assertTrue(bmSafeMode.isInSafeMode());
}
}
/**
* Test when the block safe increment and decrement interleave.
*
* Both the increment and decrement will be a no-op if the safe mode is OFF.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> OFF
*/
@Test(timeout = 30000)
public void testIncrementAndDecrementSafeBlockCount() {
bmSafeMode.activate(BLOCK_TOTAL);
Whitebox.setInternalState(bmSafeMode, "extension", 0);
mockBlockManagerForBlockSafeDecrement();
for (long i = 1; i <= BLOCK_TOTAL; i++) {
BlockInfo blockInfo = mock(BlockInfo.class);
doReturn(false).when(blockInfo).isStriped();
bmSafeMode.incrementSafeBlockCount(1, blockInfo);
bmSafeMode.decrementSafeBlockCount(blockInfo);
bmSafeMode.incrementSafeBlockCount(1, blockInfo);
if (i < BLOCK_THRESHOLD) {
assertEquals(i, getblockSafe());
assertTrue(bmSafeMode.isInSafeMode());
} else {
// block manager leaves safe mode if block threshold is met
assertEquals(BLOCK_THRESHOLD, getblockSafe());
assertFalse(bmSafeMode.isInSafeMode());
}
}
}
/**
* Test the safe mode monitor.
*
* The monitor will make block manager leave the safe mode after extension
* period.
*/
@Test(timeout = 30000)
public void testSafeModeMonitor() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
setBlockSafe(BLOCK_THRESHOLD);
// PENDING_THRESHOLD -> EXTENSION
bmSafeMode.checkSafeMode();
assertTrue(bmSafeMode.isInSafeMode());
waitForExtensionPeriod();
assertFalse(bmSafeMode.isInSafeMode());
}
/**
* Test block manager won't leave safe mode if datanode threshold is not met.
*/
@Test(timeout = 30000)
public void testDatanodeThreshodShouldBeMet() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
// All datanode have not registered yet.
when(dn.getNumLiveDataNodes()).thenReturn(1);
setBlockSafe(BLOCK_THRESHOLD);
bmSafeMode.checkSafeMode();
assertTrue(bmSafeMode.isInSafeMode());
// The datanode number reaches threshold after all data nodes register
when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);
bmSafeMode.checkSafeMode();
waitForExtensionPeriod();
assertFalse(bmSafeMode.isInSafeMode());
}
/**
* Test block manager won't leave safe mode if there are orphan blocks.
*/
@Test(timeout = 30000)
public void testStayInSafeModeWhenBytesInFuture() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
when(bm.getBytesInFuture()).thenReturn(1L);
// safe blocks are enough
setBlockSafe(BLOCK_THRESHOLD);
// PENDING_THRESHOLD -> EXTENSION
bmSafeMode.checkSafeMode();
try {
waitForExtensionPeriod();
fail("Safe mode should not leave extension period with orphan blocks!");
} catch (TimeoutException e) {
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
}
}
/**
* Test get safe mode tip.
*/
@Test(timeout = 30000)
public void testGetSafeModeTip() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL);
String tip = bmSafeMode.getSafeModeTip();
assertTrue(tip.contains(
String.format(
"The reported blocks %d needs additional %d blocks to reach the " +
"threshold %.4f of total blocks %d.%n",
0, BLOCK_THRESHOLD, THRESHOLD, BLOCK_TOTAL)));
assertTrue(tip.contains(
String.format("The number of live datanodes %d has reached the " +
"minimum number %d. ", dn.getNumLiveDataNodes(), DATANODE_NUM)));
assertTrue(tip.contains("Safe mode will be turned off automatically once " +
"the thresholds have been reached."));
// safe blocks are enough
setBlockSafe(BLOCK_THRESHOLD);
bmSafeMode.checkSafeMode();
tip = bmSafeMode.getSafeModeTip();
assertTrue(tip.contains(
String.format("The reported blocks %d has reached the threshold"
+ " %.4f of total blocks %d. ",
getblockSafe(), THRESHOLD, BLOCK_TOTAL)));
assertTrue(tip.contains(
String.format("The number of live datanodes %d has reached the " +
"minimum number %d. ", dn.getNumLiveDataNodes(), DATANODE_NUM)));
assertTrue(tip.contains("In safe mode extension. Safe mode will be turned" +
" off automatically in"));
waitForExtensionPeriod();
tip = bmSafeMode.getSafeModeTip();
System.out.println(tip);
assertTrue(tip.contains(
String.format("The reported blocks %d has reached the threshold"
+ " %.4f of total blocks %d. ",
getblockSafe(), THRESHOLD, BLOCK_TOTAL)));
assertTrue(tip.contains(
String.format("The number of live datanodes %d has reached the " +
"minimum number %d. ", dn.getNumLiveDataNodes(), DATANODE_NUM)));
assertTrue(tip.contains("Safe mode will be turned off automatically soon"));
}
/**
* Mock block manager internal state for decrement safe block
*/
private void mockBlockManagerForBlockSafeDecrement() {
BlockInfo storedBlock = mock(BlockInfo.class);
when(storedBlock.isComplete()).thenReturn(true);
doReturn(storedBlock).when(bm).getStoredBlock(any(Block.class));
NumberReplicas numberReplicas = mock(NumberReplicas.class);
when(numberReplicas.liveReplicas()).thenReturn(0);
doReturn(numberReplicas).when(bm).countNodes(any(Block.class));
}
/**
* Wait the bmSafeMode monitor for the extension period.
* @throws InterruptedIOException
* @throws TimeoutException
*/
private void waitForExtensionPeriod() throws Exception{
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return getSafeModeStatus() != BMSafeModeStatus.EXTENSION;
}
}, EXTENSION / 10, EXTENSION * 2);
}
private void setSafeModeStatus(BMSafeModeStatus status) {
Whitebox.setInternalState(bmSafeMode, "status", status);
}
private BMSafeModeStatus getSafeModeStatus() {
return (BMSafeModeStatus)Whitebox.getInternalState(bmSafeMode, "status");
}
private void setBlockSafe(long blockSafe) {
Whitebox.setInternalState(bmSafeMode, "blockSafe", blockSafe);
}
private long getblockSafe() {
return (long)Whitebox.getInternalState(bmSafeMode, "blockSafe");
}
}

View File

@ -1359,7 +1359,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.hasReadLock()).thenReturn(true);
when(mockNS.hasWriteLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -236,12 +235,13 @@ public class NameNodeAdapter {
* @return the number of blocks marked safe by safemode, or -1
* if safemode is not running.
*/
public static int getSafeModeSafeBlocks(NameNode nn) {
SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests();
if (smi == null) {
public static long getSafeModeSafeBlocks(NameNode nn) {
if (!nn.getNamesystem().isInSafeMode()) {
return -1;
}
return smi.blockSafe;
Object bmSafeMode = Whitebox.getInternalState(
nn.getNamesystem().getBlockManager(), "bmSafeMode");
return (long)Whitebox.getInternalState(bmSafeMode, "blockSafe");
}
/**

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
@ -332,7 +331,11 @@ public class TestFSNamesystem {
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
SafeModeInfo safemodeInfo = fsn.getSafeModeInfoForTests();
assertTrue(safemodeInfo.toString().contains("Minimal replication = 2"));
Object bmSafeMode = Whitebox.getInternalState(fsn.getBlockManager(),
"bmSafeMode");
int safeReplication = (int)Whitebox.getInternalState(bmSafeMode,
"safeReplication");
assertEquals(2, safeReplication);
}
}

View File

@ -103,8 +103,6 @@ public class TestNameNodeMetadataConsistency {
// we also need to tell block manager that we are in the startup path
FSNamesystem spyNameSystem = spy(cluster.getNameNode().getNamesystem());
spyNameSystem.enableSafeModeForTesting(conf);
Whitebox.setInternalState(cluster.getNameNode()
.getNamesystem().getBlockManager(),
"namesystem", spyNameSystem);

View File

@ -32,7 +32,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -56,7 +55,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
@ -126,10 +124,13 @@ public class TestHASafeMode {
.synchronizedMap(new HashMap<Path, Boolean>());
final Path test = new Path("/test");
// let nn0 enter safemode
cluster.getConfiguration(0).setInt(
DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 3);
NameNodeAdapter.enterSafeMode(nn0, false);
SafeModeInfo safeMode = (SafeModeInfo) Whitebox.getInternalState(
nn0.getNamesystem(), "safeMode");
Whitebox.setInternalState(safeMode, "extension", Integer.valueOf(30000));
Whitebox.setInternalState(nn0.getNamesystem(), "manualSafeMode", false);
BlockManagerTestUtil.setStartupSafeModeForTest(nn0.getNamesystem()
.getBlockManager());
assertTrue(nn0.getNamesystem().isInStartupSafeMode());
LOG.info("enter safemode");
new Thread() {
@Override