From 788fca4124ecac818a20bfc2607676849cf0d94f Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Wed, 18 Dec 2013 21:30:37 +0000 Subject: [PATCH] HDFS-5496. Make replication queue initialization asynchronous. Contributed by Vinay. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1552109 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/util/LightWeightGSet.java | 9 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +- .../server/blockmanagement/BlockManager.java | 172 ++++++++++++++---- .../server/blockmanagement/BlocksMap.java | 16 +- .../hdfs/server/namenode/FSNamesystem.java | 71 +++----- .../hdfs/server/namenode/NameNodeAdapter.java | 9 +- 7 files changed, 195 insertions(+), 89 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java index f1661d750d7..1767d85abad 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java @@ -244,13 +244,14 @@ public class LightWeightGSet implements GSet { out.println("\n]"); } - private class SetIterator implements Iterator { + public class SetIterator implements Iterator { /** The starting modification for fail-fast. */ private int iterModification = modification; /** The current index of the entry array. */ private int index = -1; private LinkedElement cur = null; private LinkedElement next = nextNonemptyEntry(); + private boolean trackModification = true; /** Find the next nonempty entry starting at (index + 1). */ private LinkedElement nextNonemptyEntry() { @@ -259,7 +260,7 @@ public class LightWeightGSet implements GSet { } private void ensureNext() { - if (modification != iterModification) { + if (trackModification && modification != iterModification) { throw new ConcurrentModificationException("modification=" + modification + " != iterModification = " + iterModification); } @@ -304,6 +305,10 @@ public class LightWeightGSet implements GSet { iterModification++; cur = null; } + + public void setTrackModification(boolean trackModification) { + this.trackModification = trackModification; + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6090ef78e4c..b7375f44304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -242,6 +242,9 @@ Trunk (Unreleased) HDFS-5629. Support HTTPS in JournalNode and SecondaryNameNode. (Haohui Mai via jing9) + HDFS-5496. Make replication queue initialization asynchronous. (Vinay via + jing9) + OPTIMIZATIONS HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index ab170058808..7961e1d3679 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -398,7 +398,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 1000; public static final String DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max"; public static final int DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500; - + /* Maximum number of blocks to process for initializing replication queues */ + public static final String DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit"; + public static final int DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000; public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit"; public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false; public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 3f4780a0e63..5190138db5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -229,6 +229,22 @@ public class BlockManager { */ private boolean shouldPostponeBlocksFromFuture = false; + /** + * Process replication queues asynchronously to allow namenode safemode exit + * and failover to be faster. HDFS-5496 + */ + private Daemon replicationQueuesInitializer = null; + /** + * Number of blocks to process asychronously for replication queues + * initialization once aquired the namesystem lock. Remaining blocks will be + * processed again after aquiring lock again. + */ + private int numBlocksPerIteration; + /** + * Progress of the Replication queues initialisation. + */ + private double replicationQueuesInitProgress = 0.0; + /** for block replicas placement */ private BlockPlacementPolicy blockplacement; @@ -305,6 +321,9 @@ public class BlockManager { this.maxNumBlocksToLog = conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); + this.numBlocksPerIteration = conf.getInt( + DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT, + DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT); LOG.info("defaultReplication = " + defaultReplication); LOG.info("maxReplication = " + maxReplication); @@ -2319,45 +2338,127 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block */ public void processMisReplicatedBlocks() { assert namesystem.hasWriteLock(); - - long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0, nrPostponed = 0, - nrUnderConstruction = 0; + stopReplicationInitializer(); neededReplications.clear(); - for (BlockInfo block : blocksMap.getBlocks()) { - MisReplicationResult res = processMisReplicatedBlock(block); - if (LOG.isTraceEnabled()) { - LOG.trace("block " + block + ": " + res); + replicationQueuesInitializer = new Daemon() { + + @Override + public void run() { + try { + processMisReplicatesAsync(); + } catch (InterruptedException ie) { + LOG.info("Interrupted while processing replication queues."); + } catch (Exception e) { + LOG.error("Error while processing replication queues async", e); + } } - switch (res) { - case UNDER_REPLICATED: - nrUnderReplicated++; - break; - case OVER_REPLICATED: - nrOverReplicated++; - break; - case INVALID: - nrInvalid++; - break; - case POSTPONE: - nrPostponed++; - postponeBlock(block); - break; - case UNDER_CONSTRUCTION: - nrUnderConstruction++; - break; - case OK: - break; - default: - throw new AssertionError("Invalid enum value: " + res); + }; + replicationQueuesInitializer.setName("Replication Queue Initializer"); + replicationQueuesInitializer.start(); + } + + /* + * Stop the ongoing initialisation of replication queues + */ + private void stopReplicationInitializer() { + if (replicationQueuesInitializer != null) { + replicationQueuesInitializer.interrupt(); + try { + replicationQueuesInitializer.join(); + } catch (final InterruptedException e) { + LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning.."); + return; + } finally { + replicationQueuesInitializer = null; } } - - LOG.info("Total number of blocks = " + blocksMap.size()); - LOG.info("Number of invalid blocks = " + nrInvalid); - LOG.info("Number of under-replicated blocks = " + nrUnderReplicated); - LOG.info("Number of over-replicated blocks = " + nrOverReplicated + - ((nrPostponed > 0) ? ( " (" + nrPostponed + " postponed)") : "")); - LOG.info("Number of blocks being written = " + nrUnderConstruction); + } + + /* + * Since the BlocksMapGset does not throw the ConcurrentModificationException + * and supports further iteration after modification to list, there is a + * chance of missing the newly added block while iterating. Since every + * addition to blocksMap will check for mis-replication, missing mis-replication + * check for new blocks will not be a problem. + */ + private void processMisReplicatesAsync() throws InterruptedException { + long nrInvalid = 0, nrOverReplicated = 0; + long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0; + long startTimeMisReplicatedScan = Time.now(); + Iterator blocksItr = blocksMap.getBlocks().iterator(); + long totalBlocks = blocksMap.size(); + replicationQueuesInitProgress = 0; + long totalProcessed = 0; + while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) { + int processed = 0; + namesystem.writeLockInterruptibly(); + try { + while (processed < numBlocksPerIteration && blocksItr.hasNext()) { + BlockInfo block = blocksItr.next(); + MisReplicationResult res = processMisReplicatedBlock(block); + if (LOG.isTraceEnabled()) { + LOG.trace("block " + block + ": " + res); + } + switch (res) { + case UNDER_REPLICATED: + nrUnderReplicated++; + break; + case OVER_REPLICATED: + nrOverReplicated++; + break; + case INVALID: + nrInvalid++; + break; + case POSTPONE: + nrPostponed++; + postponeBlock(block); + break; + case UNDER_CONSTRUCTION: + nrUnderConstruction++; + break; + case OK: + break; + default: + throw new AssertionError("Invalid enum value: " + res); + } + processed++; + } + totalProcessed += processed; + // there is a possibility that if any of the blocks deleted/added during + // initialisation, then progress might be different. + replicationQueuesInitProgress = Math.min((double) totalProcessed + / totalBlocks, 1.0); + + if (!blocksItr.hasNext()) { + LOG.info("Total number of blocks = " + blocksMap.size()); + LOG.info("Number of invalid blocks = " + nrInvalid); + LOG.info("Number of under-replicated blocks = " + nrUnderReplicated); + LOG.info("Number of over-replicated blocks = " + nrOverReplicated + + ((nrPostponed > 0) ? (" (" + nrPostponed + " postponed)") : "")); + LOG.info("Number of blocks being written = " + nrUnderConstruction); + NameNode.stateChangeLog + .info("STATE* Replication Queue initialization " + + "scan for invalid, over- and under-replicated blocks " + + "completed in " + (Time.now() - startTimeMisReplicatedScan) + + " msec"); + break; + } + } finally { + namesystem.writeUnlock(); + } + } + if (Thread.currentThread().isInterrupted()) { + LOG.info("Interrupted while processing replication queues."); + } + } + + /** + * Get the progress of the Replication queues initialisation + * + * @return Returns values between 0 and 1 for the progress. + */ + public double getReplicationQueuesInitProgress() { + return replicationQueuesInitProgress; } /** @@ -3308,6 +3409,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block } public void shutdown() { + stopReplicationInitializer(); blocksMap.close(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 99dd965ef98..732f61344b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -22,6 +22,7 @@ import java.util.Iterator; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; +import org.apache.hadoop.util.LightWeightGSet.SetIterator; /** * This class maintains the map from a block to its metadata. @@ -62,7 +63,20 @@ class BlocksMap { BlocksMap(int capacity) { // Use 2% of total memory to size the GSet capacity this.capacity = capacity; - this.blocks = new LightWeightGSet(capacity); + this.blocks = new LightWeightGSet(capacity) { + @Override + public Iterator iterator() { + SetIterator iterator = new SetIterator(); + /* + * Not tracking any modifications to set. As this set will be used + * always under FSNameSystem lock, modifications will not cause any + * ConcurrentModificationExceptions. But there is a chance of missing + * newly added elements during iteration. + */ + iterator.setTrackModification(false); + return iterator; + } + }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index bb5bcfec092..e4a7560ac7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -482,7 +482,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private HAContext haContext; private final boolean haEnabled; - + + /** flag indicating whether replication queues have been initialized */ + boolean initializedReplQueues = false; + /** * Whether the namenode is in the middle of starting the active service */ @@ -914,8 +917,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, try { nnResourceChecker = new NameNodeResourceChecker(conf); checkAvailableResources(); - assert safeMode != null && - !safeMode.isPopulatingReplQueues(); + assert safeMode != null && !isPopulatingReplQueues(); StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.SAFEMODE); prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS, @@ -971,12 +973,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, blockManager.clearQueues(); blockManager.processAllPendingDNMessages(); - if (!isInSafeMode() || - (isInSafeMode() && safeMode.isPopulatingReplQueues())) { + // Only need to re-process the queue, If not in SafeMode. + if (!isInSafeMode()) { LOG.info("Reprocessing replication and invalidation queues"); - blockManager.processMisReplicatedBlocks(); + initializeReplQueues(); } - + if (LOG.isDebugEnabled()) { LOG.debug("NameNode metadata after re-processing " + "replication and invalidation queues during failover:\n" + @@ -1015,7 +1017,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats, startingActiveService = false; } } - + + /** + * Initialize replication queues. + */ + private void initializeReplQueues() { + LOG.info("initializing replication queues"); + blockManager.processMisReplicatedBlocks(); + initializedReplQueues = true; + } + /** * @return Whether the namenode is transitioning to active state and is in the * middle of the {@link #startActiveServices()} @@ -1061,6 +1072,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } cacheManager.deactivate(); blockManager.getDatanodeManager().setShouldSendCachingCommands(false); + // Don't want to keep replication queues when not in Active. + blockManager.clearQueues(); + initializedReplQueues = false; } finally { writeUnlock(); } @@ -4548,7 +4562,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private int safeReplication; /** threshold for populating needed replication queues */ private double replQueueThreshold; - // internal fields /** Time when threshold was reached. *
-1 safe mode is off @@ -4566,8 +4579,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private int blockReplQueueThreshold; /** time of the last status printout */ private long lastStatusReport = 0; - /** flag indicating whether replication queues have been initialized */ - boolean initializedReplQueues = false; /** Was safemode entered automatically because available resources were low. */ private boolean resourcesLow = false; /** Should safemode adjust its block totals as blocks come in */ @@ -4627,7 +4638,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * * @see SafeModeInfo */ - private SafeModeInfo(boolean resourcesLow, boolean isReplQueuesInited) { + private SafeModeInfo(boolean resourcesLow) { this.threshold = 1.5f; // this threshold can never be reached this.datanodeThreshold = Integer.MAX_VALUE; this.extension = Integer.MAX_VALUE; @@ -4636,7 +4647,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, this.blockTotal = -1; this.blockSafe = -1; this.resourcesLow = resourcesLow; - this.initializedReplQueues = isReplQueuesInited; enter(); reportStatus("STATE* Safe mode is ON.", true); } @@ -4650,13 +4660,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return this.reached >= 0; } - /** - * Check if we are populating replication queues. - */ - private synchronized boolean isPopulatingReplQueues() { - return initializedReplQueues; - } - /** * Enter safe mode. */ @@ -4703,21 +4706,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } - /** - * Initialize replication queues. - */ - private synchronized void initializeReplQueues() { - LOG.info("initializing replication queues"); - assert !isPopulatingReplQueues() : "Already initialized repl queues"; - long startTimeMisReplicatedScan = now(); - blockManager.processMisReplicatedBlocks(); - initializedReplQueues = true; - NameNode.stateChangeLog.info("STATE* Replication Queue initialization " - + "scan for invalid, over- and under-replicated blocks " - + "completed in " + (now() - startTimeMisReplicatedScan) - + " msec"); - } - /** * Check whether we have reached the threshold for * initializing replication queues. @@ -4765,7 +4753,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (smmthread == null && needEnter()) { enter(); // check if we are ready to initialize replication queues - if (canInitializeReplQueues() && !isPopulatingReplQueues()) { + if (canInitializeReplQueues() && !isPopulatingReplQueues() + && !haEnabled) { initializeReplQueues(); } reportStatus("STATE* Safe mode ON.", false); @@ -4790,7 +4779,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } // check if we are ready to initialize replication queues - if (canInitializeReplQueues() && !isPopulatingReplQueues()) { + if (canInitializeReplQueues() && !isPopulatingReplQueues() && !haEnabled) { initializeReplQueues(); } } @@ -5100,11 +5089,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (!shouldPopulateReplQueues()) { return false; } - // safeMode is volatile, and may be set to null at any time - SafeModeInfo safeMode = this.safeMode; - if (safeMode == null) - return true; - return safeMode.isPopulatingReplQueues(); + return initializedReplQueues; } private boolean shouldPopulateReplQueues() { @@ -5224,7 +5209,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, getEditLog().logSyncAll(); } if (!isInSafeMode()) { - safeMode = new SafeModeInfo(resourcesLow, isPopulatingReplQueues()); + safeMode = new SafeModeInfo(resourcesLow); return; } if (resourcesLow) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 4f5c2b6732b..2e45845c48b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -228,15 +228,10 @@ public class NameNodeAdapter { } /** - * @return true if safemode is not running, or if safemode has already - * initialized the replication queues + * @return Replication queue initialization status */ public static boolean safeModeInitializedReplQueues(NameNode nn) { - SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests(); - if (smi == null) { - return true; - } - return smi.initializedReplQueues; + return nn.getNamesystem().isPopulatingReplQueues(); } public static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {