HDFS-9498. Move code that tracks blocks with future generation stamps to BlockManagerSafeMode. (Contributed by Mingliang Liu)

This commit is contained in:
Arpit Agarwal 2016-01-06 10:30:59 -08:00
parent b9936689c9
commit 67c9780609
12 changed files with 215 additions and 154 deletions

View File

@ -1,4 +1,4 @@
Hadoop HDFS Change Log Hadoop HDFS Change Log
Trunk (Unreleased) Trunk (Unreleased)
@ -1800,6 +1800,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7779. Support changing ownership, group and replication in HDFS Web HDFS-7779. Support changing ownership, group and replication in HDFS Web
UI. (Ravi Prakash via wheat9) UI. (Ravi Prakash via wheat9)
HDFS-9498. Move code that tracks blocks with future generation stamps
to BlockManagerSafeMode. (Mingliang Liu via Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -73,7 +73,6 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason; import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@ -309,12 +308,6 @@ public class BlockManager implements BlockStatsMXBean {
/** Check whether there are any non-EC blocks using StripedID */ /** Check whether there are any non-EC blocks using StripedID */
private boolean hasNonEcBlockUsingStripedID = false; private boolean hasNonEcBlockUsingStripedID = false;
/** Keeps track of how many bytes are in Future Generation blocks. */
private AtomicLong numberOfBytesInFutureBlocks;
/** Reports if Name node was started with Rollback option. */
private boolean inRollBack = false;
public BlockManager(final Namesystem namesystem, final Configuration conf) public BlockManager(final Namesystem namesystem, final Configuration conf)
throws IOException { throws IOException {
this.namesystem = namesystem; this.namesystem = namesystem;
@ -393,8 +386,6 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT, DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT); DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
this.blockReportLeaseManager = new BlockReportLeaseManager(conf); this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
this.numberOfBytesInFutureBlocks = new AtomicLong();
this.inRollBack = isInRollBackMode(NameNode.getStartupOption(conf));
bmSafeMode = new BlockManagerSafeMode(this, namesystem, conf); bmSafeMode = new BlockManagerSafeMode(this, namesystem, conf);
@ -1964,14 +1955,18 @@ public class BlockManager implements BlockStatsMXBean {
return bmSafeMode.getSafeModeTip(); return bmSafeMode.getSafeModeTip();
} }
public void leaveSafeMode(boolean force) { public boolean leaveSafeMode(boolean force) {
bmSafeMode.leaveSafeMode(force); return bmSafeMode.leaveSafeMode(force);
} }
void checkSafeMode() { void checkSafeMode() {
bmSafeMode.checkSafeMode(); bmSafeMode.checkSafeMode();
} }
/**
 * Returns the byte count of blocks with future generation stamps, as
 * tracked by the safe mode helper this block manager delegates to.
 *
 * @return the value reported by {@code bmSafeMode.getBytesInFuture()}
 */
public long getBytesInFuture() {
  final long bytesInFuture = bmSafeMode.getBytesInFuture();
  return bytesInFuture;
}
/** /**
* Removes the blocks from blocksmap and updates the safemode blocks total. * Removes the blocks from blocksmap and updates the safemode blocks total.
* @param blocks An instance of {@link BlocksMapUpdateInfo} which contains a * @param blocks An instance of {@link BlocksMapUpdateInfo} which contains a
@ -2370,12 +2365,7 @@ public class BlockManager implements BlockStatsMXBean {
// If block does not belong to any file, we check if it violates // If block does not belong to any file, we check if it violates
// an integrity assumption of Name node // an integrity assumption of Name node
if (storedBlock == null) { if (storedBlock == null) {
if (namesystem.isInStartupSafeMode() bmSafeMode.checkBlocksWithFutureGS(iblk);
&& !shouldPostponeBlocksFromFuture
&& !inRollBack
&& namesystem.isGenStampInFuture(iblk)) {
numberOfBytesInFutureBlocks.addAndGet(iblk.getBytesOnDisk());
}
continue; continue;
} }
@ -4254,39 +4244,8 @@ public class BlockManager implements BlockStatsMXBean {
return haContext.getState().shouldPopulateReplQueues(); return haContext.getState().shouldPopulateReplQueues();
} }
/** boolean getShouldPostponeBlocksFromFuture() {
* Returns the number of bytes that reside in blocks with Generation Stamps return shouldPostponeBlocksFromFuture;
* greater than generation stamp known to Namenode.
*
* @return Bytes in future
*/
public long getBytesInFuture() {
return numberOfBytesInFutureBlocks.get();
}
/**
* Clears the bytes in future counter.
*/
public void clearBytesInFuture() {
numberOfBytesInFutureBlocks.set(0);
}
/**
* Returns true if Namenode was started with a RollBack option.
*
* @param option - StartupOption
* @return boolean
*/
private boolean isInRollBackMode(HdfsServerConstants.StartupOption option) {
if (option == HdfsServerConstants.StartupOption.ROLLBACK) {
return true;
}
if ((option == HdfsServerConstants.StartupOption.ROLLINGUPGRADE) &&
(option.getRollingUpgradeStartupOption() ==
HdfsServerConstants.RollingUpgradeStartupOption.ROLLBACK)) {
return true;
}
return false;
} }
// async processing of an action, used for IBRs. // async processing of an action, used for IBRs.

View File

@ -21,6 +21,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@ -106,6 +109,11 @@ class BlockManagerSafeMode {
/** Counter for tracking startup progress of reported blocks. */ /** Counter for tracking startup progress of reported blocks. */
private Counter awaitingReportedBlocksCounter; private Counter awaitingReportedBlocksCounter;
/** Keeps track of how many bytes are in Future Generation blocks. */
private final AtomicLong numberOfBytesInFutureBlocks = new AtomicLong();
/** Reports if Name node was started with Rollback option. */
private final boolean inRollBack;
BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem, BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem,
Configuration conf) { Configuration conf) {
this.blockManager = blockManager; this.blockManager = blockManager;
@ -135,9 +143,10 @@ class BlockManagerSafeMode {
this.replQueueThreshold = this.replQueueThreshold =
conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
(float) threshold); (float) threshold);
this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0); this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
this.inRollBack = isInRollBackMode(NameNode.getStartupOption(conf));
LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, threshold); LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, threshold);
LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, LOG.info("{} = {}", DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
datanodeThreshold); datanodeThreshold);
@ -300,15 +309,15 @@ class BlockManagerSafeMode {
numLive, datanodeThreshold); numLive, datanodeThreshold);
} }
if (blockManager.getBytesInFuture() > 0) { if (getBytesInFuture() > 0) {
msg += "Name node detected blocks with generation stamps " + msg += "Name node detected blocks with generation stamps " +
"in future. This means that Name node metadata is inconsistent." + "in future. This means that Name node metadata is inconsistent. " +
"This can happen if Name node metadata files have been manually " + "This can happen if Name node metadata files have been manually " +
"replaced. Exiting safe mode will cause loss of " + blockManager "replaced. Exiting safe mode will cause loss of " +
.getBytesInFuture() + " byte(s). Please restart name node with " + getBytesInFuture() + " byte(s). Please restart name node with " +
"right metadata or use \"hdfs dfsadmin -safemode forceExit" + "right metadata or use \"hdfs dfsadmin -safemode forceExit\" " +
"if you are certain that the NameNode was started with the" + "if you are certain that the NameNode was started with the " +
"correct FsImage and edit logs. If you encountered this during" + "correct FsImage and edit logs. If you encountered this during " +
"a rollback, it is safe to exit with -safemode forceExit."; "a rollback, it is safe to exit with -safemode forceExit.";
return msg; return msg;
} }
@ -333,11 +342,31 @@ class BlockManagerSafeMode {
/** /**
* Leave start up safe mode. * Leave start up safe mode.
*
* @param force - true to force exit * @param force - true to force exit
* @return true if it leaves safe mode successfully else false
*/ */
void leaveSafeMode(boolean force) { boolean leaveSafeMode(boolean force) {
assert namesystem.hasWriteLock() : "Leaving safe mode needs write lock!"; assert namesystem.hasWriteLock() : "Leaving safe mode needs write lock!";
final long bytesInFuture = numberOfBytesInFutureBlocks.get();
if (bytesInFuture > 0) {
if (force) {
LOG.warn("Leaving safe mode due to forceExit. This will cause a data "
+ "loss of {} byte(s).", bytesInFuture);
numberOfBytesInFutureBlocks.set(0);
} else {
LOG.error("Refusing to leave safe mode without a force flag. " +
"Exiting safe mode will cause a deletion of {} byte(s). Please " +
"use -forceExit flag to exit safe mode forcefully if data loss is" +
" acceptable.", bytesInFuture);
return false;
}
} else if (force) {
LOG.warn("forceExit used when normal exist would suffice. Treating " +
"force exit as normal safe mode exit.");
}
// if not done yet, initialize replication queues. // if not done yet, initialize replication queues.
// In the standby, do not populate repl queues // In the standby, do not populate repl queues
if (!blockManager.isPopulatingReplQueues() && if (!blockManager.isPopulatingReplQueues() &&
@ -345,14 +374,6 @@ class BlockManagerSafeMode {
blockManager.initializeReplQueues(); blockManager.initializeReplQueues();
} }
if (!force && blockManager.getBytesInFuture() > 0) {
LOG.error("Refusing to leave safe mode without a force flag. " +
"Exiting safe mode will cause a deletion of {} byte(s). Please use " +
"-forceExit flag to exit safe mode forcefully if data loss is " +
"acceptable.", blockManager.getBytesInFuture());
return;
}
if (status != BMSafeModeStatus.OFF) { if (status != BMSafeModeStatus.OFF) {
NameNode.stateChangeLog.info("STATE* Safe mode is OFF"); NameNode.stateChangeLog.info("STATE* Safe mode is OFF");
} }
@ -379,6 +400,8 @@ class BlockManagerSafeMode {
BlockManagerSafeMode.STEP_AWAITING_REPORTED_BLOCKS); BlockManagerSafeMode.STEP_AWAITING_REPORTED_BLOCKS);
prog.endPhase(Phase.SAFEMODE); prog.endPhase(Phase.SAFEMODE);
} }
return true;
} }
/** /**
@ -436,6 +459,35 @@ class BlockManagerSafeMode {
} }
} }
/**
 * Accounts for a reported replica whose generation stamp (GS) is in the
 * future. When the safe mode status is OFF nothing is recorded.
 *
 * The replica's on-disk bytes are added to the future-bytes counter only if
 * the block manager is not postponing blocks from the future, the Namenode
 * was not started in rollback mode, and the namesystem reports the
 * replica's generation stamp as being in the future.
 *
 * The namesystem write lock is asserted on entry.
 *
 * @param brr block report replica which belongs to no file in BlockManager
 */
void checkBlocksWithFutureGS(BlockReportReplica brr) {
  assert namesystem.hasWriteLock();
  if (status != BMSafeModeStatus.OFF) {
    // Either condition disables tracking; the GS check runs only when
    // neither holds, exactly as in a chained && expression.
    final boolean trackingDisabled =
        blockManager.getShouldPostponeBlocksFromFuture() || inRollBack;
    if (!trackingDisabled && namesystem.isGenStampInFuture(brr)) {
      numberOfBytesInFutureBlocks.addAndGet(brr.getBytesOnDisk());
    }
  }
}
/**
 * Reports how many bytes belong to blocks whose generation stamp is greater
 * than the generation stamp known to the Namenode.
 *
 * @return the current value of the future-block byte counter
 */
long getBytesInFuture() {
  final long bytesInFuture = numberOfBytesInFutureBlocks.get();
  return bytesInFuture;
}
void close() { void close() {
assert namesystem.hasWriteLock() : "Closing bmSafeMode needs write lock!"; assert namesystem.hasWriteLock() : "Closing bmSafeMode needs write lock!";
try { try {
@ -454,6 +506,19 @@ class BlockManagerSafeMode {
return reachedTime.get() + extension - monotonicNow(); return reachedTime.get() + extension - monotonicNow();
} }
/**
 * Tells whether the given startup option denotes a rollback start.
 *
 * That is the case for a plain ROLLBACK option, and for a ROLLINGUPGRADE
 * option whose rolling upgrade startup option is ROLLBACK.
 *
 * @param option the startup option the Namenode was started with
 * @return true if the option is one of the two rollback forms, else false
 */
private static boolean isInRollBackMode(StartupOption option) {
  if (option == StartupOption.ROLLBACK) {
    return true;
  }
  if (option != StartupOption.ROLLINGUPGRADE) {
    return false;
  }
  // Only a rolling upgrade can carry a nested rollback request.
  return option.getRollingUpgradeStartupOption()
      == RollingUpgradeStartupOption.ROLLBACK;
}
/** Check if we are ready to initialize replication queues. */ /** Check if we are ready to initialize replication queues. */
private void initializeReplQueuesIfNecessary() { private void initializeReplQueuesIfNecessary() {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();

View File

@ -986,6 +986,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Override @Override
public void startSecretManagerIfNecessary() { public void startSecretManagerIfNecessary() {
assert hasWriteLock() : "Starting secret manager needs write lock";
boolean shouldRun = shouldUseDelegationTokens() && boolean shouldRun = shouldUseDelegationTokens() &&
!isInSafeMode() && getEditLog().isOpenForWrite(); !isInSafeMode() && getEditLog().isOpenForWrite();
boolean running = dtSecretManager.isRunning(); boolean running = dtSecretManager.isRunning();
@ -4006,29 +4007,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkSuperuserPrivilege(); checkSuperuserPrivilege();
switch(action) { switch(action) {
case SAFEMODE_LEAVE: // leave safe mode case SAFEMODE_LEAVE: // leave safe mode
if (blockManager.getBytesInFuture() > 0) { leaveSafeMode(false);
LOG.error("Refusing to leave safe mode without a force flag. " +
"Exiting safe mode will cause a deletion of " + blockManager
.getBytesInFuture() + " byte(s). Please use " +
"-forceExit flag to exit safe mode forcefully and data loss is " +
"acceptable.");
} else {
leaveSafeMode();
}
break; break;
case SAFEMODE_ENTER: // enter safe mode case SAFEMODE_ENTER: // enter safe mode
enterSafeMode(false); enterSafeMode(false);
break; break;
case SAFEMODE_FORCE_EXIT: case SAFEMODE_FORCE_EXIT:
if (blockManager.getBytesInFuture() > 0) { leaveSafeMode(true);
LOG.warn("Leaving safe mode due to forceExit. This will cause a data "
+ "loss of " + blockManager.getBytesInFuture() + " byte(s).");
blockManager.clearBytesInFuture();
} else {
LOG.warn("forceExit used when normal exist would suffice. Treating " +
"force exit as normal safe mode exit.");
}
leaveSafeMode();
break; break;
default: default:
LOG.error("Unexpected safe mode action"); LOG.error("Unexpected safe mode action");
@ -4125,16 +4110,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/** /**
* Leave safe mode. * Leave safe mode.
* @param force true if to leave safe mode forcefully with -forceExit option
*/ */
void leaveSafeMode() { void leaveSafeMode(boolean force) {
writeLock(); writeLock();
try { try {
if (!isInSafeMode()) { if (!isInSafeMode()) {
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF"); NameNode.stateChangeLog.info("STATE* Safe mode is already OFF");
return; return;
} }
setManualAndResourceLowSafeMode(false, false); if (blockManager.leaveSafeMode(force)) {
blockManager.leaveSafeMode(true); setManualAndResourceLowSafeMode(false, false);
startSecretManagerIfNecessary();
}
} finally { } finally {
writeUnlock(); writeUnlock();
} }

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerSafeMode.BMSafeModeStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerSafeMode.BMSafeModeStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -31,7 +32,6 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import java.io.IOException; import java.io.IOException;
@ -42,7 +42,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -92,9 +91,10 @@ public class TestBlockManagerSafeMode {
DATANODE_NUM); DATANODE_NUM);
FSNamesystem fsn = mock(FSNamesystem.class); FSNamesystem fsn = mock(FSNamesystem.class);
Mockito.doReturn(true).when(fsn).hasWriteLock(); doReturn(true).when(fsn).hasWriteLock();
Mockito.doReturn(true).when(fsn).hasReadLock(); doReturn(true).when(fsn).hasReadLock();
Mockito.doReturn(true).when(fsn).isRunning(); doReturn(true).when(fsn).isRunning();
doReturn(true).when(fsn).isGenStampInFuture(any(Block.class));
NameNode.initMetrics(conf, NamenodeRole.NAMENODE); NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
bm = spy(new BlockManager(fsn, conf)); bm = spy(new BlockManager(fsn, conf));
@ -110,7 +110,7 @@ public class TestBlockManagerSafeMode {
* Test set block total. * Test set block total.
* *
* The block total is set which will call checkSafeMode for the first time * The block total is set which will call checkSafeMode for the first time
* and bmSafeMode transfers from INITIALIZED to PENDING_THRESHOLD status * and bmSafeMode transfers from OFF to PENDING_THRESHOLD status
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testInitialize() { public void testInitialize() {
@ -170,7 +170,7 @@ public class TestBlockManagerSafeMode {
* *
* Once the block threshold is reached, the block manager leaves safe mode and * Once the block threshold is reached, the block manager leaves safe mode and
* increment will be a no-op. * increment will be a no-op.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> OFF * The safe mode status lifecycle: OFF -> PENDING_THRESHOLD -> OFF
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testIncrementSafeBlockCount() { public void testIncrementSafeBlockCount() {
@ -198,7 +198,7 @@ public class TestBlockManagerSafeMode {
* *
* Once the block threshold is reached, the block manager leaves safe mode and * Once the block threshold is reached, the block manager leaves safe mode and
* increment will be a no-op. * increment will be a no-op.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> EXTENSION-> OFF * The safe mode status lifecycle: OFF -> PENDING_THRESHOLD -> EXTENSION-> OFF
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testIncrementSafeBlockCountWithExtension() throws Exception { public void testIncrementSafeBlockCountWithExtension() throws Exception {
@ -220,7 +220,7 @@ public class TestBlockManagerSafeMode {
* Test that the block safe decreases the block safe. * Test that the block safe decreases the block safe.
* *
* The block manager stays in safe mode. * The block manager stays in safe mode.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD * The safe mode status lifecycle: OFF -> PENDING_THRESHOLD
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testDecrementSafeBlockCount() { public void testDecrementSafeBlockCount() {
@ -242,7 +242,7 @@ public class TestBlockManagerSafeMode {
* Test when the block safe increment and decrement interleave. * Test when the block safe increment and decrement interleave.
* *
* Both the increment and decrement will be a no-op if the safe mode is OFF. * Both the increment and decrement will be a no-op if the safe mode is OFF.
* The safe mode status lifecycle: INITIALIZED -> PENDING_THRESHOLD -> OFF * The safe mode status lifecycle: OFF -> PENDING_THRESHOLD -> OFF
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testIncrementAndDecrementSafeBlockCount() { public void testIncrementAndDecrementSafeBlockCount() {
@ -309,24 +309,31 @@ public class TestBlockManagerSafeMode {
} }
/** /**
* Test block manager won't leave safe mode if there are orphan blocks. * Test block manager won't leave safe mode if there are blocks with
* generation stamp (GS) in future.
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testStayInSafeModeWhenBytesInFuture() throws Exception { public void testStayInSafeModeWhenBytesInFuture() throws Exception {
bmSafeMode.activate(BLOCK_TOTAL); bmSafeMode.activate(BLOCK_TOTAL);
when(bm.getBytesInFuture()).thenReturn(1L); // Inject blocks with future GS
injectBlocksWithFugureGS(100L);
assertEquals(100L, bmSafeMode.getBytesInFuture());
// safe blocks are enough // safe blocks are enough
setBlockSafe(BLOCK_THRESHOLD); setBlockSafe(BLOCK_THRESHOLD);
// PENDING_THRESHOLD -> EXTENSION // PENDING_THRESHOLD -> EXTENSION
bmSafeMode.checkSafeMode(); bmSafeMode.checkSafeMode();
try {
waitForExtensionPeriod(); assertFalse("Shouldn't leave safe mode in case of blocks with future GS! ",
fail("Safe mode should not leave extension period with orphan blocks!"); bmSafeMode.leaveSafeMode(false));
} catch (TimeoutException e) { assertTrue("Leaving safe mode forcefully should succeed regardless of " +
assertEquals(BMSafeModeStatus.EXTENSION, getSafeModeStatus()); "blocks with future GS.", bmSafeMode.leaveSafeMode(true));
} assertEquals("Number of blocks with future GS should have been cleared " +
"after leaving safe mode", 0L, bmSafeMode.getBytesInFuture());
assertTrue("Leaving safe mode should succeed after blocks with future GS " +
"are cleared.", bmSafeMode.leaveSafeMode(false));
} }
/** /**
@ -353,7 +360,7 @@ public class TestBlockManagerSafeMode {
tip = bmSafeMode.getSafeModeTip(); tip = bmSafeMode.getSafeModeTip();
assertTrue(tip.contains( assertTrue(tip.contains(
String.format("The reported blocks %d has reached the threshold" String.format("The reported blocks %d has reached the threshold"
+ " %.4f of total blocks %d. ", + " %.4f of total blocks %d. ",
getblockSafe(), THRESHOLD, BLOCK_TOTAL))); getblockSafe(), THRESHOLD, BLOCK_TOTAL)));
assertTrue(tip.contains( assertTrue(tip.contains(
String.format("The number of live datanodes %d has reached the " + String.format("The number of live datanodes %d has reached the " +
@ -363,7 +370,6 @@ public class TestBlockManagerSafeMode {
waitForExtensionPeriod(); waitForExtensionPeriod();
tip = bmSafeMode.getSafeModeTip(); tip = bmSafeMode.getSafeModeTip();
System.out.println(tip);
assertTrue(tip.contains( assertTrue(tip.contains(
String.format("The reported blocks %d has reached the threshold" String.format("The reported blocks %d has reached the threshold"
+ " %.4f of total blocks %d. ", + " %.4f of total blocks %d. ",
@ -375,7 +381,55 @@ public class TestBlockManagerSafeMode {
} }
/** /**
* Mock block manager internal state for decrement safe block * Test get safe mode tip in case of blocks with future GS.
*/
@Test(timeout = 30000)
public void testGetSafeModeTipForBlocksWithFutureGS() throws Exception {
  final long injectedBytes = 40L;
  // The same warning is expected in the tip both before and after the
  // block threshold is reached, so it is built once up front.
  final String futureGsWarning =
      "Name node detected blocks with generation stamps " +
      "in future. This means that Name node metadata is inconsistent. " +
      "This can happen if Name node metadata files have been manually " +
      "replaced. Exiting safe mode will cause loss of " +
      injectedBytes + " byte(s). Please restart name node with " +
      "right metadata or use \"hdfs dfsadmin -safemode forceExit\" " +
      "if you are certain that the NameNode was started with the " +
      "correct FsImage and edit logs. If you encountered this during " +
      "a rollback, it is safe to exit with -safemode forceExit.";
  final String autoLeaveHint = "Safe mode will be turned off";

  bmSafeMode.activate(BLOCK_TOTAL);
  injectBlocksWithFugureGS(injectedBytes);

  // No safe blocks reported yet: the threshold is still pending.
  String tip = bmSafeMode.getSafeModeTip();
  final String pendingThresholdText = String.format(
      "The reported blocks %d needs additional %d blocks to reach the " +
      "threshold %.4f of total blocks %d.%n",
      0, BLOCK_THRESHOLD, THRESHOLD, BLOCK_TOTAL);
  assertTrue(tip.contains(pendingThresholdText));
  assertTrue(tip.contains(futureGsWarning));
  assertFalse(tip.contains(autoLeaveHint));

  // blocks with future GS were already injected before.
  setBlockSafe(BLOCK_THRESHOLD);
  tip = bmSafeMode.getSafeModeTip();
  final String reachedThresholdText = String.format(
      "The reported blocks %d has reached the threshold"
          + " %.4f of total blocks %d. ",
      getblockSafe(), THRESHOLD, BLOCK_TOTAL);
  assertTrue(tip.contains(reachedThresholdText));
  assertTrue(tip.contains(futureGsWarning));
  assertFalse(tip.contains(autoLeaveHint));
}
/**
* Mock block manager internal state for decrement safe block.
*/ */
private void mockBlockManagerForBlockSafeDecrement() { private void mockBlockManagerForBlockSafeDecrement() {
BlockInfo storedBlock = mock(BlockInfo.class); BlockInfo storedBlock = mock(BlockInfo.class);
@ -402,6 +456,12 @@ public class TestBlockManagerSafeMode {
}, EXTENSION / 10, EXTENSION * 2); }, EXTENSION / 10, EXTENSION * 2);
} }
/**
 * Feeds the safe mode helper one mocked block report replica that reports
 * the given number of bytes on disk.
 *
 * @param numBytesInFuture value the mocked replica returns from
 *                         getBytesOnDisk()
 */
private void injectBlocksWithFugureGS(long numBytesInFuture) {
  final BlockReportReplica futureReplica = mock(BlockReportReplica.class);
  doReturn(numBytesInFuture).when(futureReplica).getBytesOnDisk();
  bmSafeMode.checkBlocksWithFutureGS(futureReplica);
}
private void setSafeModeStatus(BMSafeModeStatus status) { private void setSafeModeStatus(BMSafeModeStatus status) {
Whitebox.setInternalState(bmSafeMode, "status", status); Whitebox.setInternalState(bmSafeMode, "status", status);
} }

View File

@ -96,7 +96,7 @@ public class NameNodeAdapter {
} }
public static void leaveSafeMode(NameNode namenode) { public static void leaveSafeMode(NameNode namenode) {
namenode.getNamesystem().leaveSafeMode(); namenode.getNamesystem().leaveSafeMode(false);
} }
public static void abortEditLogs(NameNode nn) { public static void abortEditLogs(NameNode nn) {

View File

@ -1608,7 +1608,7 @@ public class TestCheckpoint {
FSNamesystem fsns = cluster.getNamesystem(); FSNamesystem fsns = cluster.getNamesystem();
fsns.enterSafeMode(false); fsns.enterSafeMode(false);
fsns.saveNamespace(0, 0); fsns.saveNamespace(0, 0);
fsns.leaveSafeMode(); fsns.leaveSafeMode(false);
secondary = startSecondaryNameNode(conf); secondary = startSecondaryNameNode(conf);

View File

@ -306,7 +306,7 @@ public class TestEditLogRace {
assertEquals(fsimage.getStorage().getMostRecentCheckpointTxId(), assertEquals(fsimage.getStorage().getMostRecentCheckpointTxId(),
editLog.getLastWrittenTxId() - 1); editLog.getLastWrittenTxId() - 1);
namesystem.leaveSafeMode(); namesystem.leaveSafeMode(false);
LOG.info("Save " + i + ": complete"); LOG.info("Save " + i + ": complete");
} }
} finally { } finally {

View File

@ -464,7 +464,7 @@ public class TestFSEditLogLoader {
fns.enterSafeMode(false); fns.enterSafeMode(false);
fns.saveNamespace(0, 0); fns.saveNamespace(0, 0);
fns.leaveSafeMode(); fns.leaveSafeMode(false);
// Add a striped block to the file // Add a striped block to the file
BlockInfoStriped stripedBlk = new BlockInfoStriped( BlockInfoStriped stripedBlk = new BlockInfoStriped(
@ -542,7 +542,7 @@ public class TestFSEditLogLoader {
file.toCompleteFile(System.currentTimeMillis()); file.toCompleteFile(System.currentTimeMillis());
fns.enterSafeMode(false); fns.enterSafeMode(false);
fns.saveNamespace(0, 0); fns.saveNamespace(0, 0);
fns.leaveSafeMode(); fns.leaveSafeMode(false);
//update the last block //update the last block
long newBlkNumBytes = 1024*8; long newBlkNumBytes = 1024*8;

View File

@ -105,7 +105,7 @@ public class TestFSNamesystem {
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
FSNamesystem fsn = new FSNamesystem(conf, fsImage); FSNamesystem fsn = new FSNamesystem(conf, fsImage);
fsn.leaveSafeMode(); fsn.leaveSafeMode(false);
assertTrue("After leaving safemode FSNamesystem.isInStartupSafeMode still " assertTrue("After leaving safemode FSNamesystem.isInStartupSafeMode still "
+ "returned true", !fsn.isInStartupSafeMode()); + "returned true", !fsn.isInStartupSafeMode());
assertTrue("After leaving safemode FSNamesystem.isInSafeMode still returned" assertTrue("After leaving safemode FSNamesystem.isInSafeMode still returned"
@ -145,7 +145,7 @@ public class TestFSNamesystem {
assertTrue("FSNamesystem didn't enter safemode", fsn.isInSafeMode()); assertTrue("FSNamesystem didn't enter safemode", fsn.isInSafeMode());
assertTrue("Replication queues were being populated during very first " assertTrue("Replication queues were being populated during very first "
+ "safemode", !bm.isPopulatingReplQueues()); + "safemode", !bm.isPopulatingReplQueues());
fsn.leaveSafeMode(); fsn.leaveSafeMode(false);
assertTrue("FSNamesystem didn't leave safemode", !fsn.isInSafeMode()); assertTrue("FSNamesystem didn't leave safemode", !fsn.isInSafeMode());
assertTrue("Replication queues weren't being populated even after leaving " assertTrue("Replication queues weren't being populated even after leaving "
+ "safemode", bm.isPopulatingReplQueues()); + "safemode", bm.isPopulatingReplQueues());

View File

@ -490,7 +490,7 @@ public class TestINodeFile {
// Apply editlogs to fsimage, ensure inodeUnderConstruction is handled // Apply editlogs to fsimage, ensure inodeUnderConstruction is handled
fsn.enterSafeMode(false); fsn.enterSafeMode(false);
fsn.saveNamespace(0, 0); fsn.saveNamespace(0, 0);
fsn.leaveSafeMode(); fsn.leaveSafeMode(false);
outStream.close(); outStream.close();

View File

@ -23,26 +23,26 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.hamcrest.CoreMatchers; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
public class TestNameNodeMetadataConsistency { public class TestNameNodeMetadataConsistency {
private static final Path filePath1 = new Path("/testdata1.txt"); private static final Path filePath1 = new Path("/testdata1.txt");
private static final Path filePath2 = new Path("/testdata2.txt"); private static final Path filePath2 = new Path("/testdata2.txt");
private static final String TEST_DATA_IN_FUTURE = "This is test data";
private static final int SCAN_INTERVAL = 1; private static final int SCAN_INTERVAL = 1;
private static final int SCAN_WAIT = 3; private static final int SCAN_WAIT = 3;
@ -75,59 +75,45 @@ public class TestNameNodeMetadataConsistency {
@Test @Test
public void testGenerationStampInFuture() throws public void testGenerationStampInFuture() throws
IOException, InterruptedException { IOException, InterruptedException {
String testData = " This is test data";
int datalen = testData.length();
cluster.waitActive(); cluster.waitActive();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
OutputStream ostream = fs.create(filePath1); OutputStream ostream = fs.create(filePath1);
ostream.write(testData.getBytes()); ostream.write(TEST_DATA_IN_FUTURE.getBytes());
ostream.close(); ostream.close();
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath1);
long genStamp = block.getGenerationStamp();
// Re-write the Generation Stamp to a Generation Stamp in future. // Re-write the Generation Stamp to a Generation Stamp in future.
cluster.changeGenStampOfBlock(0, block, genStamp + 1); ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath1);
MiniDFSCluster.DataNodeProperties dnProps = cluster.stopDataNode(0); final long genStamp = block.getGenerationStamp();
final int datanodeIndex = 0;
cluster.changeGenStampOfBlock(datanodeIndex, block, genStamp + 1);
// stop the data node so that it won't remove block
final DataNodeProperties dnProps = cluster.stopDataNode(datanodeIndex);
// Simulate Namenode forgetting a Block
// Simulate Namenode forgetting a Block
cluster.restartNameNode(true); cluster.restartNameNode(true);
BlockInfo bInfo = cluster.getNameNode().getNamesystem().getBlockManager
().getStoredBlock(block.getLocalBlock());
cluster.getNameNode().getNamesystem().writeLock(); cluster.getNameNode().getNamesystem().writeLock();
BlockInfo bInfo = cluster.getNameNode().getNamesystem().getBlockManager()
.getStoredBlock(block.getLocalBlock());
cluster.getNameNode().getNamesystem().getBlockManager() cluster.getNameNode().getNamesystem().getBlockManager()
.removeBlock(bInfo); .removeBlock(bInfo);
cluster.getNameNode().getNamesystem().writeUnlock(); cluster.getNameNode().getNamesystem().writeUnlock();
// we also need to tell block manager that we are in the startup path // we also need to tell block manager that we are in the startup path
FSNamesystem spyNameSystem = spy(cluster.getNameNode().getNamesystem()); BlockManagerTestUtil.setStartupSafeModeForTest(
Whitebox.setInternalState(cluster.getNameNode() cluster.getNameNode().getNamesystem().getBlockManager());
.getNamesystem().getBlockManager(),
"namesystem", spyNameSystem);
Whitebox.setInternalState(cluster.getNameNode(),
"namesystem", spyNameSystem);
Mockito.doReturn(true).when(spyNameSystem).isInStartupSafeMode();
// Since Data Node is already shutdown we didn't remove blocks
cluster.restartDataNode(dnProps); cluster.restartDataNode(dnProps);
waitTil(TimeUnit.SECONDS.toMillis(SCAN_WAIT)); waitTil(TimeUnit.SECONDS.toMillis(SCAN_WAIT));
cluster.triggerBlockReports(); cluster.triggerBlockReports();
// Give some buffer to process the block reports
waitTil(TimeUnit.SECONDS.toMillis(SCAN_WAIT)); waitTil(TimeUnit.SECONDS.toMillis(SCAN_WAIT));
// Make sure that we find all written bytes in future block // Make sure that we find all written bytes in future block
assertEquals(datalen, cluster.getNameNode().getBytesWithFutureGenerationStamps()); assertEquals(TEST_DATA_IN_FUTURE.length(),
cluster.getNameNode().getBytesWithFutureGenerationStamps());
// Assert safemode reason // Assert safemode reason
String safeModeMessage = cluster.getNameNode().getNamesystem() assertTrue(cluster.getNameNode().getNamesystem().getSafeModeTip().contains(
.getSafeModeTip(); "Name node detected blocks with generation stamps in future"));
assertThat(safeModeMessage, CoreMatchers.containsString("Name node " +
"detected blocks with generation stamps in future"));
} }
/** /**