HDFS-8984. Move replication queues related methods in FSNamesystem to BlockManager. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-09-04 11:39:58 -07:00
parent 8928729c80
commit 715b9c6499
12 changed files with 80 additions and 65 deletions

View File

@ -894,6 +894,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8384. Allow NN to startup if there are files having a lease but are not
under construction. (jing9)
HDFS-8984. Move replication queues related methods in FSNamesystem to
BlockManager. (wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@ -127,6 +128,10 @@ public class BlockManager implements BlockStatsMXBean {
private volatile long corruptReplicaBlocksCount = 0L;
private volatile long underReplicatedBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
/** flag indicating whether replication queues have been initialized */
private boolean initializedReplQueues;
private final AtomicLong excessBlocksCount = new AtomicLong(0L);
private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
private final long startupDelayBlockDeletionInMs;
@ -1092,7 +1097,7 @@ public class BlockManager implements BlockStatsMXBean {
* datanode and log the operation
*/
void addToInvalidates(final Block block, final DatanodeInfo datanode) {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
invalidateBlocks.add(block, datanode, true);
@ -1103,7 +1108,7 @@ public class BlockManager implements BlockStatsMXBean {
* datanodes.
*/
private void addToInvalidates(Block b) {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
StringBuilder datanodes = new StringBuilder();
@ -1124,7 +1129,7 @@ public class BlockManager implements BlockStatsMXBean {
* is wiped.
*/
void removeFromInvalidates(final DatanodeInfo datanode) {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
invalidateBlocks.remove(datanode);
@ -1211,7 +1216,7 @@ public class BlockManager implements BlockStatsMXBean {
|| corruptedDuringWrite) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node);
} else if (namesystem.isPopulatingReplQueues()) {
} else if (isPopulatingReplQueues()) {
// add the block to neededReplication
updateNeededReplications(b.getStored(), -1, 0);
}
@ -2484,7 +2489,7 @@ public class BlockManager implements BlockStatsMXBean {
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
|| isPopulatingReplQueues()) {
addStoredBlock(storedBlock, storageInfo, null, false);
return;
}
@ -2586,7 +2591,7 @@ public class BlockManager implements BlockStatsMXBean {
}
// do not try to handle over/under-replicated blocks during first safe mode
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return storedBlock;
}
@ -3323,7 +3328,7 @@ public class BlockManager implements BlockStatsMXBean {
*/
void processOverReplicatedBlocksOnReCommission(
final DatanodeDescriptor srcNode) {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
final Iterator<BlockInfo> it = srcNode.getBlockIterator();
@ -3417,7 +3422,7 @@ public class BlockManager implements BlockStatsMXBean {
final int curReplicasDelta, int expectedReplicasDelta) {
namesystem.writeLock();
try {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
NumberReplicas repl = countNodes(block);
@ -3662,7 +3667,7 @@ public class BlockManager implements BlockStatsMXBean {
while (namesystem.isRunning()) {
try {
// Process replication work only when active NN is out of safe mode.
if (namesystem.isPopulatingReplQueues()) {
if (isPopulatingReplQueues()) {
computeDatanodeWork();
processPendingReplications();
rescanPostponedMisreplicatedBlocks();
@ -3790,4 +3795,35 @@ public class BlockManager implements BlockStatsMXBean {
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return datanodeManager.getDatanodeStatistics().getStorageTypeStats();
}
/**
* Initialize replication queues.
*/
public void initializeReplQueues() {
LOG.info("initializing replication queues");
processMisReplicatedBlocks();
initializedReplQueues = true;
}
/**
* Check if replication queues are to be populated
* @return true when node is HAState.Active and not in the very first safemode
*/
public boolean isPopulatingReplQueues() {
if (!shouldPopulateReplQueues()) {
return false;
}
return initializedReplQueues;
}
public void setInitializedReplQueues(boolean v) {
this.initializedReplQueues = v;
}
public boolean shouldPopulateReplQueues() {
HAContext haContext = namesystem.getHAContext();
if (haContext == null || haContext.getState() == null)
return false;
return haContext.getState().shouldPopulateReplQueues();
}
}

View File

@ -1200,7 +1200,7 @@ public class DatanodeManager {
if (!hasClusterEverBeenMultiRack && networktopology.getNumOfRacks() > 1) {
String message = "DN " + node + " joining cluster has expanded a formerly " +
"single-rack cluster to be multi-rack. ";
if (namesystem.isPopulatingReplQueues()) {
if (blockManager.isPopulatingReplQueues()) {
message += "Re-checking all blocks for replication, since they should " +
"now be replicated cross-rack";
LOG.info(message);
@ -1210,7 +1210,7 @@ public class DatanodeManager {
LOG.debug(message);
}
hasClusterEverBeenMultiRack = true;
if (namesystem.isPopulatingReplQueues()) {
if (blockManager.isPopulatingReplQueues()) {
blockManager.processMisReplicatedBlocks();
}
}

View File

@ -546,7 +546,7 @@ public class DecommissionManager {
if (blockManager.isNeededReplication(block, liveReplicas)) {
if (!blockManager.neededReplications.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 &&
namesystem.isPopulatingReplQueues()) {
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
curReplicas,

View File

@ -506,9 +506,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final boolean haEnabled;
/** flag indicating whether replication queues have been initialized */
boolean initializedReplQueues = false;
/**
* Whether the namenode is in the middle of starting the active service
*/
@ -1038,7 +1035,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
assert safeMode != null && !isPopulatingReplQueues();
assert safeMode != null && !blockManager.isPopulatingReplQueues();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
long completeBlocksTotal = getCompleteBlocksTotal();
@ -1105,7 +1102,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// Only need to re-process the queue, If not in SafeMode.
if (!isInSafeMode()) {
LOG.info("Reprocessing replication and invalidation queues");
initializeReplQueues();
blockManager.initializeReplQueues();
}
if (LOG.isDebugEnabled()) {
@ -1164,15 +1161,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return haContext != null &&
haContext.getState().getServiceState() == HAServiceState.ACTIVE;
}
/**
* Initialize replication queues.
*/
private void initializeReplQueues() {
LOG.info("initializing replication queues");
blockManager.processMisReplicatedBlocks();
initializedReplQueues = true;
}
/**
* @return Whether the namenode is transitioning to active state and is in the
@ -1225,8 +1213,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
// Don't want to keep replication queues when not in Active.
blockManager.clearQueues();
blockManager.setInitializedReplQueues(false);
}
initializedReplQueues = false;
} finally {
writeUnlock();
}
@ -4237,8 +4225,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private synchronized void leave() {
// if not done yet, initialize replication queues.
// In the standby, do not populate repl queues
if (!isPopulatingReplQueues() && shouldPopulateReplQueues()) {
initializeReplQueues();
if (!blockManager.isPopulatingReplQueues() && blockManager.shouldPopulateReplQueues()) {
blockManager.initializeReplQueues();
}
long timeInSafemode = now() - startTime;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
@ -4274,7 +4262,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* initializing replication queues.
*/
private synchronized boolean canInitializeReplQueues() {
return shouldPopulateReplQueues()
return blockManager.shouldPopulateReplQueues()
&& blockSafe >= blockReplQueueThreshold;
}
@ -4327,9 +4315,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (smmthread == null && needEnter()) {
enter();
// check if we are ready to initialize replication queues
if (canInitializeReplQueues() && !isPopulatingReplQueues()
if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues()
&& !haEnabled) {
initializeReplQueues();
blockManager.initializeReplQueues();
}
reportStatus("STATE* Safe mode ON.", false);
return;
@ -4354,8 +4342,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
// check if we are ready to initialize replication queues
if (canInitializeReplQueues() && !isPopulatingReplQueues() && !haEnabled) {
initializeReplQueues();
if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues() && !haEnabled) {
blockManager.initializeReplQueues();
}
}
@ -4658,24 +4646,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
&& safeMode.isOn();
}
/**
* Check if replication queues are to be populated
* @return true when node is HAState.Active and not in the very first safemode
*/
@Override
public boolean isPopulatingReplQueues() {
if (!shouldPopulateReplQueues()) {
return false;
}
return initializedReplQueues;
}
private boolean shouldPopulateReplQueues() {
if(haContext == null || haContext.getState() == null)
return false;
return haContext.getState().shouldPopulateReplQueues();
}
@Override
public void incrementSafeBlockCount(int replication) {
// safeMode is volatile, and may be set to null at any time
@ -5493,7 +5463,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
readLock();
try {
checkOperation(OperationCategory.READ);
if (!isPopulatingReplQueues()) {
if (!blockManager.isPopulatingReplQueues()) {
throw new IOException("Cannot run listCorruptFileBlocks because " +
"replication queues have not been initialized.");
}
@ -6169,6 +6139,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return cacheManager;
}
@Override
public HAContext getHAContext() {
return haContext;
}
@Override // NameNodeMXBean
public String getCorruptFiles() {
List<String> list = new ArrayList<String>();

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
@ -51,4 +52,5 @@ public interface Namesystem extends RwLock, SafeMode {
boolean isInSnapshot(BlockInfo blockUC);
CacheManager getCacheManager();
HAContext getHAContext();
}

View File

@ -39,9 +39,6 @@ public interface SafeMode {
*/
public boolean isInStartupSafeMode();
/** Check whether replication queues are being populated. */
public boolean isPopulatingReplQueues();
/**
* Increment number of blocks that reached minimal replication.
* @param replication current replication

View File

@ -1221,7 +1221,6 @@ public class TestReplicationPolicy {
public void testAddStoredBlockDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasWriteLock()).thenReturn(true);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
@ -1271,7 +1270,6 @@ public class TestReplicationPolicy {
testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
@ -1334,7 +1332,6 @@ public class TestReplicationPolicy {
public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());

View File

@ -243,7 +243,7 @@ public class NameNodeAdapter {
* @return Replication queue initialization status
*/
public static boolean safeModeInitializedReplQueues(NameNode nn) {
return nn.getNamesystem().isPopulatingReplQueues();
return nn.getNamesystem().getBlockManager().isPopulatingReplQueues();
}
public static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
@ -123,13 +124,15 @@ public class TestFSNamesystem {
FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage);
FSNamesystem fsn = Mockito.spy(fsNamesystem);
BlockManager bm = fsn.getBlockManager();
Whitebox.setInternalState(bm, "namesystem", fsn);
//Make shouldPopulaeReplQueues return true
HAContext haContext = Mockito.mock(HAContext.class);
HAState haState = Mockito.mock(HAState.class);
Mockito.when(haContext.getState()).thenReturn(haState);
Mockito.when(haState.shouldPopulateReplQueues()).thenReturn(true);
Whitebox.setInternalState(fsn, "haContext", haContext);
Mockito.when(fsn.getHAContext()).thenReturn(haContext);
//Make NameNode.getNameNodeMetrics() not return null
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
@ -137,15 +140,15 @@ public class TestFSNamesystem {
fsn.enterSafeMode(false);
assertTrue("FSNamesystem didn't enter safemode", fsn.isInSafeMode());
assertTrue("Replication queues were being populated during very first "
+ "safemode", !fsn.isPopulatingReplQueues());
+ "safemode", !bm.isPopulatingReplQueues());
fsn.leaveSafeMode();
assertTrue("FSNamesystem didn't leave safemode", !fsn.isInSafeMode());
assertTrue("Replication queues weren't being populated even after leaving "
+ "safemode", fsn.isPopulatingReplQueues());
+ "safemode", bm.isPopulatingReplQueues());
fsn.enterSafeMode(false);
assertTrue("FSNamesystem didn't enter safemode", fsn.isInSafeMode());
assertTrue("Replication queues weren't being populated after entering "
+ "safemode 2nd time", fsn.isPopulatingReplQueues());
+ "safemode 2nd time", bm.isPopulatingReplQueues());
}
@Test

View File

@ -210,7 +210,8 @@ public class TestListCorruptFileBlocks {
fs = cluster.getFileSystem();
// wait until replication queues have been initialized
while (!cluster.getNameNode().namesystem.isPopulatingReplQueues()) {
while (!cluster.getNameNode().namesystem.getBlockManager()
.isPopulatingReplQueues()) {
try {
LOG.info("waiting for replication queues");
Thread.sleep(1000);