HDFS-8984. Move replication queues related methods in FSNamesystem to BlockManager. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-09-04 11:39:58 -07:00
parent 21178f0b1b
commit 67bce1e827
12 changed files with 80 additions and 65 deletions

View File

@ -549,6 +549,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8384. Allow NN to startup if there are files having a lease but are not
under construction. (jing9)
HDFS-8984. Move replication queues related methods in FSNamesystem to
BlockManager. (wheat9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -71,6 +71,7 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@ -126,6 +127,10 @@ public class BlockManager implements BlockStatsMXBean {
private volatile long corruptReplicaBlocksCount = 0L;
private volatile long underReplicatedBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
/** flag indicating whether replication queues have been initialized */
private boolean initializedReplQueues;
private final AtomicLong excessBlocksCount = new AtomicLong(0L);
private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
private final long startupDelayBlockDeletionInMs;
@ -1083,7 +1088,7 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
* datanode and log the operation
*/
void addToInvalidates(final Block block, final DatanodeInfo datanode) {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
invalidateBlocks.add(block, datanode, true);
@ -1094,7 +1099,7 @@ void addToInvalidates(final Block block, final DatanodeInfo datanode) {
* datanodes.
*/
private void addToInvalidates(Block b) {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
StringBuilder datanodes = new StringBuilder();
@ -1115,7 +1120,7 @@ private void addToInvalidates(Block b) {
* is wiped.
*/
void removeFromInvalidates(final DatanodeInfo datanode) {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
invalidateBlocks.remove(datanode);
@ -1202,7 +1207,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|| corruptedDuringWrite) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node);
} else if (namesystem.isPopulatingReplQueues()) {
} else if (isPopulatingReplQueues()) {
// add the block to neededReplication
updateNeededReplications(b.getStored(), -1, 0);
}
@ -2475,7 +2480,7 @@ private void addStoredBlockImmediate(BlockInfo storedBlock,
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
|| isPopulatingReplQueues()) {
addStoredBlock(storedBlock, storageInfo, null, false);
return;
}
@ -2577,7 +2582,7 @@ private Block addStoredBlock(final BlockInfo block,
}
// do not try to handle over/under-replicated blocks during first safe mode
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return storedBlock;
}
@ -3314,7 +3319,7 @@ int countLiveNodes(BlockInfo b) {
*/
void processOverReplicatedBlocksOnReCommission(
final DatanodeDescriptor srcNode) {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
final Iterator<BlockInfo> it = srcNode.getBlockIterator();
@ -3408,7 +3413,7 @@ private void updateNeededReplications(final BlockInfo block,
final int curReplicasDelta, int expectedReplicasDelta) {
namesystem.writeLock();
try {
if (!namesystem.isPopulatingReplQueues()) {
if (!isPopulatingReplQueues()) {
return;
}
NumberReplicas repl = countNodes(block);
@ -3653,7 +3658,7 @@ public void run() {
while (namesystem.isRunning()) {
try {
// Process replication work only when active NN is out of safe mode.
if (namesystem.isPopulatingReplQueues()) {
if (isPopulatingReplQueues()) {
computeDatanodeWork();
processPendingReplications();
rescanPostponedMisreplicatedBlocks();
@ -3781,4 +3786,35 @@ public BlockReportLeaseManager getBlockReportLeaseManager() {
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return datanodeManager.getDatanodeStatistics().getStorageTypeStats();
}
/**
 * Builds the replication queues and records that they are ready.
 *
 * <p>Logs the start of the work, runs {@code processMisReplicatedBlocks()},
 * and only afterwards raises the {@code initializedReplQueues} flag, so the
 * flag is never set before the scan has returned.
 */
public void initializeReplQueues() {
  LOG.info("initializing replication queues");
  processMisReplicatedBlocks();
  this.initializedReplQueues = true;
}
/**
 * Reports whether the replication queues are currently being populated.
 *
 * @return {@code true} only when {@code shouldPopulateReplQueues()} holds
 *         and the replication queues have been initialized; {@code false}
 *         otherwise
 */
public boolean isPopulatingReplQueues() {
  // Short-circuits: the flag is consulted only if the HA state allows it.
  return shouldPopulateReplQueues() && initializedReplQueues;
}
/**
 * Overrides the flag that records whether the replication queues have been
 * initialized.
 *
 * @param v the new value of the flag
 */
public void setInitializedReplQueues(boolean v) {
  initializedReplQueues = v;
}
/**
 * Asks the current HA state whether replication queues should be populated.
 *
 * @return {@code false} when the namesystem has no HA context or the context
 *         has no state; otherwise whatever the state's
 *         {@code shouldPopulateReplQueues()} returns
 */
public boolean shouldPopulateReplQueues() {
  final HAContext context = namesystem.getHAContext();
  final boolean stateKnown = context != null && context.getState() != null;
  return stateKnown && context.getState().shouldPopulateReplQueues();
}
}

View File

@ -1199,7 +1199,7 @@ void checkIfClusterIsNowMultiRack(DatanodeDescriptor node) {
if (!hasClusterEverBeenMultiRack && networktopology.getNumOfRacks() > 1) {
String message = "DN " + node + " joining cluster has expanded a formerly " +
"single-rack cluster to be multi-rack. ";
if (namesystem.isPopulatingReplQueues()) {
if (blockManager.isPopulatingReplQueues()) {
message += "Re-checking all blocks for replication, since they should " +
"now be replicated cross-rack";
LOG.info(message);
@ -1209,7 +1209,7 @@ void checkIfClusterIsNowMultiRack(DatanodeDescriptor node) {
LOG.debug(message);
}
hasClusterEverBeenMultiRack = true;
if (namesystem.isPopulatingReplQueues()) {
if (blockManager.isPopulatingReplQueues()) {
blockManager.processMisReplicatedBlocks();
}
}

View File

@ -570,7 +570,7 @@ private void processBlocksForDecomInternal(
if (blockManager.isNeededReplication(block, liveReplicas)) {
if (!blockManager.neededReplications.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 &&
namesystem.isPopulatingReplQueues()) {
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
curReplicas,

View File

@ -495,9 +495,6 @@ private void logAuditEvent(boolean succeeded,
private final boolean haEnabled;
/** flag indicating whether replication queues have been initialized */
boolean initializedReplQueues = false;
/**
* Whether the namenode is in the middle of starting the active service
*/
@ -1030,7 +1027,7 @@ void startCommonServices(Configuration conf, HAContext haContext) throws IOExcep
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
assert safeMode != null && !isPopulatingReplQueues();
assert safeMode != null && !blockManager.isPopulatingReplQueues();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
long completeBlocksTotal = getCompleteBlocksTotal();
@ -1097,7 +1094,7 @@ void startActiveServices() throws IOException {
// Only need to re-process the queue if not in SafeMode.
if (!isInSafeMode()) {
LOG.info("Reprocessing replication and invalidation queues");
initializeReplQueues();
blockManager.initializeReplQueues();
}
if (LOG.isDebugEnabled()) {
@ -1152,15 +1149,6 @@ void startActiveServices() throws IOException {
}
}
/**
* Initialize replication queues.
*/
private void initializeReplQueues() {
LOG.info("initializing replication queues");
blockManager.processMisReplicatedBlocks();
initializedReplQueues = true;
}
private boolean inActiveState() {
return haContext != null &&
haContext.getState().getServiceState() == HAServiceState.ACTIVE;
@ -1217,8 +1205,8 @@ void stopActiveServices() {
blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
// Don't want to keep replication queues when not in Active.
blockManager.clearQueues();
blockManager.setInitializedReplQueues(false);
}
initializedReplQueues = false;
} finally {
writeUnlock();
}
@ -4232,8 +4220,8 @@ private void enter() {
private synchronized void leave() {
// if not done yet, initialize replication queues.
// In the standby, do not populate repl queues
if (!isPopulatingReplQueues() && shouldPopulateReplQueues()) {
initializeReplQueues();
if (!blockManager.isPopulatingReplQueues() && blockManager.shouldPopulateReplQueues()) {
blockManager.initializeReplQueues();
}
long timeInSafemode = now() - startTime;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
@ -4269,7 +4257,7 @@ private synchronized void leave() {
* initializing replication queues.
*/
private synchronized boolean canInitializeReplQueues() {
return shouldPopulateReplQueues()
return blockManager.shouldPopulateReplQueues()
&& blockSafe >= blockReplQueueThreshold;
}
@ -4322,9 +4310,9 @@ private void checkMode() {
if (smmthread == null && needEnter()) {
enter();
// check if we are ready to initialize replication queues
if (canInitializeReplQueues() && !isPopulatingReplQueues()
if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues()
&& !haEnabled) {
initializeReplQueues();
blockManager.initializeReplQueues();
}
reportStatus("STATE* Safe mode ON.", false);
return;
@ -4349,8 +4337,8 @@ private void checkMode() {
}
// check if we are ready to initialize replication queues
if (canInitializeReplQueues() && !isPopulatingReplQueues() && !haEnabled) {
initializeReplQueues();
if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues() && !haEnabled) {
blockManager.initializeReplQueues();
}
}
@ -4653,24 +4641,6 @@ public boolean isInStartupSafeMode() {
&& safeMode.isOn();
}
/**
* Check if replication queues are to be populated
* @return true when node is HAState.Active and not in the very first safemode
*/
@Override
public boolean isPopulatingReplQueues() {
if (!shouldPopulateReplQueues()) {
return false;
}
return initializedReplQueues;
}
private boolean shouldPopulateReplQueues() {
if(haContext == null || haContext.getState() == null)
return false;
return haContext.getState().shouldPopulateReplQueues();
}
@Override
public void incrementSafeBlockCount(int replication) {
// safeMode is volatile, and may be set to null at any time
@ -5488,7 +5458,7 @@ Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
readLock();
try {
checkOperation(OperationCategory.READ);
if (!isPopulatingReplQueues()) {
if (!blockManager.isPopulatingReplQueues()) {
throw new IOException("Cannot run listCorruptFileBlocks because " +
"replication queues have not been initialized.");
}
@ -6164,6 +6134,11 @@ public CacheManager getCacheManager() {
return cacheManager;
}
/**
 * Exposes the HA context held by this namesystem.
 *
 * @return the {@code haContext} field as-is; it is null-checked by other
 *         code in this class, so callers should not assume it is set
 */
@Override
public HAContext getHAContext() {
  return this.haContext;
}
@Override // NameNodeMXBean
public String getCorruptFiles() {
List<String> list = new ArrayList<String>();

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
@ -51,4 +52,5 @@ public interface Namesystem extends RwLock, SafeMode {
boolean isInSnapshot(BlockInfo blockUC);
CacheManager getCacheManager();
HAContext getHAContext();
}

View File

@ -39,9 +39,6 @@ public interface SafeMode {
*/
public boolean isInStartupSafeMode();
/** Check whether replication queues are being populated. */
public boolean isPopulatingReplQueues();
/**
* Increment number of blocks that reached minimal replication.
* @param replication current replication

View File

@ -1221,7 +1221,6 @@ public void testUpdateDoesNotCauseSkippedReplication() {
public void testAddStoredBlockDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasWriteLock()).thenReturn(true);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
@ -1271,7 +1270,6 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
@ -1334,7 +1332,6 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());

View File

@ -243,7 +243,7 @@ public static int getSafeModeSafeBlocks(NameNode nn) {
* @return Replication queue initialization status
*/
public static boolean safeModeInitializedReplQueues(NameNode nn) {
return nn.getNamesystem().isPopulatingReplQueues();
return nn.getNamesystem().getBlockManager().isPopulatingReplQueues();
}
public static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
@ -123,13 +124,15 @@ public void testReplQueuesActiveAfterStartupSafemode() throws IOException, Inter
FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage);
FSNamesystem fsn = Mockito.spy(fsNamesystem);
BlockManager bm = fsn.getBlockManager();
Whitebox.setInternalState(bm, "namesystem", fsn);
// Make shouldPopulateReplQueues return true
HAContext haContext = Mockito.mock(HAContext.class);
HAState haState = Mockito.mock(HAState.class);
Mockito.when(haContext.getState()).thenReturn(haState);
Mockito.when(haState.shouldPopulateReplQueues()).thenReturn(true);
Whitebox.setInternalState(fsn, "haContext", haContext);
Mockito.when(fsn.getHAContext()).thenReturn(haContext);
//Make NameNode.getNameNodeMetrics() not return null
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
@ -137,15 +140,15 @@ public void testReplQueuesActiveAfterStartupSafemode() throws IOException, Inter
fsn.enterSafeMode(false);
assertTrue("FSNamesystem didn't enter safemode", fsn.isInSafeMode());
assertTrue("Replication queues were being populated during very first "
+ "safemode", !fsn.isPopulatingReplQueues());
+ "safemode", !bm.isPopulatingReplQueues());
fsn.leaveSafeMode();
assertTrue("FSNamesystem didn't leave safemode", !fsn.isInSafeMode());
assertTrue("Replication queues weren't being populated even after leaving "
+ "safemode", fsn.isPopulatingReplQueues());
+ "safemode", bm.isPopulatingReplQueues());
fsn.enterSafeMode(false);
assertTrue("FSNamesystem didn't enter safemode", fsn.isInSafeMode());
assertTrue("Replication queues weren't being populated after entering "
+ "safemode 2nd time", fsn.isPopulatingReplQueues());
+ "safemode 2nd time", bm.isPopulatingReplQueues());
}
@Test

View File

@ -210,7 +210,8 @@ public void testListCorruptFileBlocksInSafeMode() throws Exception {
fs = cluster.getFileSystem();
// wait until replication queues have been initialized
while (!cluster.getNameNode().namesystem.isPopulatingReplQueues()) {
while (!cluster.getNameNode().namesystem.getBlockManager()
.isPopulatingReplQueues()) {
try {
LOG.info("waiting for replication queues");
Thread.sleep(1000);