diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java index a7024c5ef9f..6ef92b8b69e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java @@ -236,6 +236,25 @@ public class MetricsAsserts { return captor.getValue(); } + /** + * Assert a float gauge metric as expected + * @param name of the metric + * @param expected value of the metric + * @param rb the record builder mock used to getMetrics + */ + public static void assertGauge(String name, float expected, + MetricsRecordBuilder rb) { + Assert.assertEquals("Bad value for metric " + name, + expected, getFloatGauge(name, rb), EPSILON); + } + + public static float getFloatGauge(String name, MetricsRecordBuilder rb) { + ArgumentCaptor captor = ArgumentCaptor.forClass(Float.class); + verify(rb, atLeast(0)).addGauge(eqName(info(name, "")), captor.capture()); + checkCaptured(captor, name); + return captor.getValue(); + } + /** * Check that this metric was captured exactly once. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 35b2cf24e58..6c2d72c978a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -317,6 +317,8 @@ Release 2.1.0-beta - 2013-07-02 HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup. (Junping Du via szetszwo) + + HDFS-4372. Track NameNode startup progress. 
(cnauroth) IMPROVEMENTS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java index ebb3f5d242d..98fb76216ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java @@ -31,6 +31,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.Credentials; @@ -127,14 +132,15 @@ public class DelegationTokenSecretManager * Store the current state of the SecretManager for persistence * * @param out Output stream for writing into fsimage. 
+ * @param sdPath String storage directory path * @throws IOException */ - public synchronized void saveSecretManagerState(DataOutputStream out) - throws IOException { + public synchronized void saveSecretManagerState(DataOutputStream out, + String sdPath) throws IOException { out.writeInt(currentId); - saveAllKeys(out); + saveAllKeys(out, sdPath); out.writeInt(delegationTokenSequenceNumber); - saveCurrentTokens(out); + saveCurrentTokens(out, sdPath); } /** @@ -237,8 +243,13 @@ public class DelegationTokenSecretManager /** * Private helper methods to save delegation keys and tokens in fsimage */ - private synchronized void saveCurrentTokens(DataOutputStream out) - throws IOException { + private synchronized void saveCurrentTokens(DataOutputStream out, + String sdPath) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.DELEGATION_TOKENS, sdPath); + prog.beginStep(Phase.SAVING_CHECKPOINT, step); + prog.setTotal(Phase.SAVING_CHECKPOINT, step, currentTokens.size()); + Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(currentTokens.size()); Iterator iter = currentTokens.keySet() .iterator(); @@ -247,20 +258,29 @@ public class DelegationTokenSecretManager id.write(out); DelegationTokenInformation info = currentTokens.get(id); out.writeLong(info.getRenewDate()); + counter.increment(); } + prog.endStep(Phase.SAVING_CHECKPOINT, step); } /* * Save the current state of allKeys */ - private synchronized void saveAllKeys(DataOutputStream out) + private synchronized void saveAllKeys(DataOutputStream out, String sdPath) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.DELEGATION_KEYS, sdPath); + prog.beginStep(Phase.SAVING_CHECKPOINT, step); + prog.setTotal(Phase.SAVING_CHECKPOINT, step, allKeys.size()); + Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); out.writeInt(allKeys.size()); Iterator iter = 
allKeys.keySet().iterator(); while (iter.hasNext()) { Integer key = iter.next(); allKeys.get(key).write(out); + counter.increment(); } + prog.endStep(Phase.SAVING_CHECKPOINT, step); } /** @@ -268,13 +288,20 @@ public class DelegationTokenSecretManager */ private synchronized void loadCurrentTokens(DataInput in) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.DELEGATION_TOKENS); + prog.beginStep(Phase.LOADING_FSIMAGE, step); int numberOfTokens = in.readInt(); + prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfTokens); + Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numberOfTokens; i++) { DelegationTokenIdentifier id = new DelegationTokenIdentifier(); id.readFields(in); long expiryTime = in.readLong(); addPersistedDelegationToken(id, expiryTime); + counter.increment(); } + prog.endStep(Phase.LOADING_FSIMAGE, step); } /** @@ -283,12 +310,19 @@ public class DelegationTokenSecretManager * @throws IOException */ private synchronized void loadAllKeys(DataInput in) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.DELEGATION_KEYS); + prog.beginStep(Phase.LOADING_FSIMAGE, step); int numberOfKeys = in.readInt(); + prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfKeys); + Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); for (int i = 0; i < numberOfKeys; i++) { DelegationKey value = new DelegationKey(); value.readFields(in); addKey(value); + counter.increment(); } + prog.endStep(Phase.LOADING_FSIMAGE, step); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 2b949541237..1ee581cde9f 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -406,8 +406,6 @@ public class EditLogFileInputStream extends EditLogInputStream { @Override public long length() { - Preconditions.checkState(advertisedSize != -1, - "must get input stream before length is available"); return advertisedSize; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java index 6fdf9389972..6fbcbfeed60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java @@ -33,7 +33,18 @@ import java.io.IOException; @InterfaceStability.Evolving public abstract class EditLogInputStream implements Closeable { private FSEditLogOp cachedOp = null; - + + /** + * Returns the name of the currently active underlying stream. The default + * implementation returns the same value as getName unless overridden by the + * subclass. + * + * @return String name of the currently active underlying stream + */ + public String getCurrentStreamName() { + return getName(); + } + /** * @return the name of the EditLogInputStream */ @@ -157,7 +168,9 @@ public abstract class EditLogInputStream implements Closeable { public abstract long getPosition(); /** - * Return the size of the current edits log. + * Return the size of the current edits log or -1 if unknown. 
+ * + * @return long size of the current edits log or -1 if unknown */ public abstract long length() throws IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 29a22fc9686..27d3c6529ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -69,7 +69,12 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; import org.apache.hadoop.hdfs.util.Holder; +import org.apache.hadoop.util.StringUtils; import com.google.common.base.Joiner; @@ -93,6 +98,9 @@ public class FSEditLogLoader { */ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, MetaRecoveryContext recovery) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + Step step = createStartupProgressStep(edits); + prog.beginStep(Phase.LOADING_EDITS, step); fsNamesys.writeLock(); try { long startTime = now(); @@ -106,6 +114,7 @@ public class FSEditLogLoader { } finally { edits.close(); fsNamesys.writeUnlock(); + prog.endStep(Phase.LOADING_EDITS, step); } } @@ -131,6 +140,10 @@ public class FSEditLogLoader { long numEdits = 0; long lastTxId = in.getLastTxId(); long numTxns = 
(lastTxId - expectedStartingTxId) + 1; + StartupProgress prog = NameNode.getStartupProgress(); + Step step = createStartupProgressStep(in); + prog.setTotal(Phase.LOADING_EDITS, step, numTxns); + Counter counter = prog.getCounter(Phase.LOADING_EDITS, step); long lastLogTime = now(); long lastInodeId = fsNamesys.getLastInodeId(); @@ -191,7 +204,7 @@ public class FSEditLogLoader { } // Now that the operation has been successfully decoded and // applied, update our bookkeeping. - incrOpCount(op.opCode, opCounts); + incrOpCount(op.opCode, opCounts, step, counter); if (op.hasTransactionId()) { lastAppliedTxId = op.getTransactionId(); expectedTxId = lastAppliedTxId + 1; @@ -682,7 +695,8 @@ public class FSEditLogLoader { } private void incrOpCount(FSEditLogOpCodes opCode, - EnumMap> opCounts) { + EnumMap> opCounts, Step step, + Counter counter) { Holder holder = opCounts.get(opCode); if (holder == null) { holder = new Holder(1); @@ -690,6 +704,7 @@ public class FSEditLogLoader { } else { holder.held++; } + counter.increment(); } /** @@ -861,4 +876,20 @@ public class FSEditLogLoader { public long getLastAppliedTxId() { return lastAppliedTxId; } + + /** + * Creates a Step used for updating startup progress, populated with + * information from the given edits. The step always includes the log's name. + * If the log has a known length, then the length is included in the step too. + * + * @param edits EditLogInputStream to use for populating step + * @return Step populated with information from edits + * @throws IOException thrown if there is an I/O error + */ + private static Step createStartupProgressStep(EditLogInputStream edits) + throws IOException { + long length = edits.length(); + String name = edits.getCurrentStreamName(); + return length != -1 ? 
new Step(name, length) : new Step(name); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 1b18fb330d4..2ef05b54837 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -53,6 +53,8 @@ import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -62,6 +64,7 @@ import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.util.IdGenerator; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; @@ -584,6 +587,12 @@ public class FSImage implements Closeable { isUpgradeFinalized = inspector.isUpgradeFinalized(); List imageFiles = inspector.getLatestImages(); + + StartupProgress prog = NameNode.getStartupProgress(); + prog.beginPhase(Phase.LOADING_FSIMAGE); + File phaseFile = imageFiles.get(0).getFile(); + prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath()); + prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length()); boolean needToSave = inspector.needToSave(); Iterable editStreams = null; @@ -633,6 +642,7 
@@ public class FSImage implements Closeable { FSEditLog.closeAllStreams(editStreams); throw new IOException("Failed to load an FSImage file!"); } + prog.endPhase(Phase.LOADING_FSIMAGE); long txnsAdvanced = loadEdits(editStreams, target, recovery); needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(), txnsAdvanced); @@ -707,6 +717,8 @@ public class FSImage implements Closeable { public long loadEdits(Iterable editStreams, FSNamesystem target, MetaRecoveryContext recovery) throws IOException { LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams)); + StartupProgress prog = NameNode.getStartupProgress(); + prog.beginPhase(Phase.LOADING_EDITS); long prevLastAppliedTxId = lastAppliedTxId; try { @@ -733,6 +745,7 @@ public class FSImage implements Closeable { // update the counts updateCountForQuota(target.dir.rootDir); } + prog.endPhase(Phase.LOADING_EDITS); return lastAppliedTxId - prevLastAppliedTxId; } @@ -946,6 +959,8 @@ public class FSImage implements Closeable { protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid, Canceler canceler) throws IOException { + StartupProgress prog = NameNode.getStartupProgress(); + prog.beginPhase(Phase.SAVING_CHECKPOINT); if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) { throw new IOException("No image directories available!"); } @@ -991,6 +1006,7 @@ public class FSImage implements Closeable { ctx.markComplete(); ctx = null; } + prog.endPhase(Phase.SAVING_CHECKPOINT); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index e9213ff4722..74f5219c491 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -58,6 +58,11 
@@ import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.Text; @@ -233,6 +238,9 @@ public class FSImageFormat { checkNotLoaded(); assert curFile != null : "curFile is null"; + StartupProgress prog = NameNode.getStartupProgress(); + Step step = new Step(StepType.INODES); + prog.beginStep(Phase.LOADING_FSIMAGE, step); long startTime = now(); // @@ -322,18 +330,24 @@ public class FSImageFormat { // load all inodes LOG.info("Number of files = " + numFiles); + prog.setTotal(Phase.LOADING_FSIMAGE, step, numFiles); + Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); if (LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION, imgVersion)) { if (supportSnapshot) { - loadLocalNameINodesWithSnapshot(in); + loadLocalNameINodesWithSnapshot(numFiles, in, counter); } else { - loadLocalNameINodes(numFiles, in); + loadLocalNameINodes(numFiles, in, counter); } } else { - loadFullNameINodes(numFiles, in); + loadFullNameINodes(numFiles, in, counter); } - loadFilesUnderConstruction(in, supportSnapshot); + loadFilesUnderConstruction(in, supportSnapshot, counter); + prog.endStep(Phase.LOADING_FSIMAGE, step); + // Now that the step is finished, set counter equal to total to adjust + // for possible under-counting due to reference 
inodes. + prog.setCount(Phase.LOADING_FSIMAGE, step, numFiles); loadSecretManagerState(in); @@ -367,18 +381,20 @@ public class FSImageFormat { * Load fsimage files when 1) only local names are stored, * and 2) snapshot is supported. * + * @param numFiles number of files expected to be read * @param in Image input stream + * @param counter Counter to increment for namenode startup progress */ - private void loadLocalNameINodesWithSnapshot(DataInput in) - throws IOException { + private void loadLocalNameINodesWithSnapshot(long numFiles, DataInput in, + Counter counter) throws IOException { assert LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION, getLayoutVersion()); assert LayoutVersion.supports(Feature.SNAPSHOT, getLayoutVersion()); // load root - loadRoot(in); + loadRoot(in, counter); // load rest of the nodes recursively - loadDirectoryWithSnapshot(in); + loadDirectoryWithSnapshot(in, counter); } /** @@ -386,22 +402,23 @@ public class FSImageFormat { * * @param numFiles number of files expected to be read * @param in image input stream + * @param counter Counter to increment for namenode startup progress * @throws IOException */ - private void loadLocalNameINodes(long numFiles, DataInput in) + private void loadLocalNameINodes(long numFiles, DataInput in, Counter counter) throws IOException { assert LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION, getLayoutVersion()); assert numFiles > 0; // load root - loadRoot(in); + loadRoot(in, counter); // have loaded the first file (the root) numFiles--; // load rest of the nodes directory by directory while (numFiles > 0) { - numFiles -= loadDirectory(in); + numFiles -= loadDirectory(in, counter); } if (numFiles != 0) { throw new IOException("Read unexpect number of files: " + -numFiles); @@ -412,24 +429,27 @@ public class FSImageFormat { * Load information about root, and use the information to update the root * directory of NameSystem. * @param in The {@link DataInput} instance to read. 
+ * @param counter Counter to increment for namenode startup progress */ - private void loadRoot(DataInput in) throws IOException { + private void loadRoot(DataInput in, Counter counter) + throws IOException { // load root if (in.readShort() != 0) { throw new IOException("First node is not root"); } - final INodeDirectory root = loadINode(null, false, in).asDirectory(); + final INodeDirectory root = loadINode(null, false, in, counter) + .asDirectory(); // update the root's attributes updateRootAttr(root); } /** Load children nodes for the parent directory. */ - private int loadChildren(INodeDirectory parent, DataInput in) - throws IOException { + private int loadChildren(INodeDirectory parent, DataInput in, + Counter counter) throws IOException { int numChildren = in.readInt(); for (int i = 0; i < numChildren; i++) { // load single inode - INode newNode = loadINodeWithLocalName(false, in, true); + INode newNode = loadINodeWithLocalName(false, in, true, counter); addToParent(parent, newNode); } return numChildren; @@ -438,8 +458,9 @@ public class FSImageFormat { /** * Load a directory when snapshot is supported. * @param in The {@link DataInput} instance to read. + * @param counter Counter to increment for namenode startup progress */ - private void loadDirectoryWithSnapshot(DataInput in) + private void loadDirectoryWithSnapshot(DataInput in, Counter counter) throws IOException { // Step 1. Identify the parent INode long inodeId = in.readLong(); @@ -470,7 +491,7 @@ public class FSImageFormat { } // Step 3. Load children nodes under parent - loadChildren(parent, in); + loadChildren(parent, in, counter); // Step 4. 
load Directory Diff List SnapshotFSImageFormat.loadDirectoryDiffList(parent, in, this); @@ -479,7 +500,7 @@ public class FSImageFormat { // directories int numSubTree = in.readInt(); for (int i = 0; i < numSubTree; i++) { - loadDirectoryWithSnapshot(in); + loadDirectoryWithSnapshot(in, counter); } } @@ -487,14 +508,15 @@ public class FSImageFormat { * Load all children of a directory * * @param in + * @param counter Counter to increment for namenode startup progress * @return number of child inodes read * @throws IOException */ - private int loadDirectory(DataInput in) throws IOException { + private int loadDirectory(DataInput in, Counter counter) throws IOException { String parentPath = FSImageSerialization.readString(in); final INodeDirectory parent = INodeDirectory.valueOf( namesystem.dir.rootDir.getNode(parentPath, true), parentPath); - return loadChildren(parent, in); + return loadChildren(parent, in, counter); } /** @@ -502,10 +524,11 @@ public class FSImageFormat { * * @param numFiles total number of files to load * @param in data input stream + * @param counter Counter to increment for namenode startup progress * @throws IOException if any error occurs */ - private void loadFullNameINodes(long numFiles, - DataInput in) throws IOException { + private void loadFullNameINodes(long numFiles, DataInput in, Counter counter) + throws IOException { byte[][] pathComponents; byte[][] parentPath = {{}}; FSDirectory fsDir = namesystem.dir; @@ -513,7 +536,7 @@ public class FSImageFormat { for (long i = 0; i < numFiles; i++) { pathComponents = FSImageSerialization.readPathComponents(in); final INode newNode = loadINode( - pathComponents[pathComponents.length-1], false, in); + pathComponents[pathComponents.length-1], false, in, counter); if (isRoot(pathComponents)) { // it is the root // update the root's attributes @@ -580,10 +603,16 @@ public class FSImageFormat { return namesystem.dir; } + public INode loadINodeWithLocalName(boolean isSnapshotINode, DataInput in, + 
boolean updateINodeMap) throws IOException { + return loadINodeWithLocalName(isSnapshotINode, in, updateINodeMap, null); + } + public INode loadINodeWithLocalName(boolean isSnapshotINode, - DataInput in, boolean updateINodeMap) throws IOException { + DataInput in, boolean updateINodeMap, Counter counter) + throws IOException { final byte[] localName = FSImageSerialization.readLocalName(in); - INode inode = loadINode(localName, isSnapshotINode, in); + INode inode = loadINode(localName, isSnapshotINode, in, counter); if (updateINodeMap && LayoutVersion.supports(Feature.ADD_INODE_ID, getLayoutVersion())) { namesystem.dir.addToInodeMap(inode); @@ -595,10 +624,11 @@ public class FSImageFormat { * load an inode from fsimage except for its name * * @param in data input stream from which image is read + * @param counter Counter to increment for namenode startup progress * @return an inode */ INode loadINode(final byte[] localName, boolean isSnapshotINode, - DataInput in) throws IOException { + DataInput in, Counter counter) throws IOException { final int imgVersion = getLayoutVersion(); if (LayoutVersion.supports(Feature.SNAPSHOT, imgVersion)) { namesystem.getFSDirectory().verifyINodeName(localName); @@ -650,6 +680,9 @@ public class FSImageFormat { final PermissionStatus permissions = PermissionStatus.read(in); // return + if (counter != null) { + counter.increment(); + } final INodeFile file = new INodeFile(inodeId, localName, permissions, modificationTime, atime, blocks, replication, blockSize); return fileDiffs != null? new INodeFileWithSnapshot(file, fileDiffs) @@ -679,6 +712,9 @@ public class FSImageFormat { final PermissionStatus permissions = PermissionStatus.read(in); //return + if (counter != null) { + counter.increment(); + } final INodeDirectory dir = nsQuota >= 0 || dsQuota >= 0? 
new INodeDirectoryWithQuota(inodeId, localName, permissions, modificationTime, nsQuota, dsQuota) @@ -691,10 +727,16 @@ public class FSImageFormat { final String symlink = Text.readString(in); final PermissionStatus permissions = PermissionStatus.read(in); + if (counter != null) { + counter.increment(); + } return new INodeSymlink(inodeId, localName, permissions, modificationTime, atime, symlink); } else if (numBlocks == -3) { //reference + // Intentionally do not increment counter, because it is too difficult at + // this point to assess whether or not this is a reference that counts + // toward quota. final boolean isWithName = in.readBoolean(); // lastSnapshotId for WithName node, dstSnapshotId for DstReference node @@ -761,7 +803,7 @@ public class FSImageFormat { } private void loadFilesUnderConstruction(DataInput in, - boolean supportSnapshot) throws IOException { + boolean supportSnapshot, Counter counter) throws IOException { FSDirectory fsDir = namesystem.dir; int size = in.readInt(); @@ -770,6 +812,7 @@ public class FSImageFormat { for (int i = 0; i < size; i++) { INodeFileUnderConstruction cons = FSImageSerialization .readINodeUnderConstruction(in, namesystem, getLayoutVersion()); + counter.increment(); // verify that file exists in namespace String path = cons.getLocalName(); @@ -888,6 +931,13 @@ public class FSImageFormat { final FSNamesystem sourceNamesystem = context.getSourceNamesystem(); FSDirectory fsDir = sourceNamesystem.dir; + String sdPath = newFile.getParentFile().getParentFile().getAbsolutePath(); + Step step = new Step(StepType.INODES, sdPath); + StartupProgress prog = NameNode.getStartupProgress(); + prog.beginStep(Phase.SAVING_CHECKPOINT, step); + prog.setTotal(Phase.SAVING_CHECKPOINT, step, + fsDir.rootDir.numItemsInTree()); + Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); long startTime = now(); // // Write out data @@ -922,14 +972,18 @@ public class FSImageFormat { " using " + compression); // save the root - 
FSImageSerialization.saveINode2Image(fsDir.rootDir, out, false, - referenceMap); + saveINode2Image(fsDir.rootDir, out, false, referenceMap, counter); // save the rest of the nodes - saveImage(fsDir.rootDir, out, true); + saveImage(fsDir.rootDir, out, true, counter); + prog.endStep(Phase.SAVING_CHECKPOINT, step); + // Now that the step is finished, set counter equal to total to adjust + // for possible under-counting due to reference inodes. + prog.setCount(Phase.SAVING_CHECKPOINT, step, + fsDir.rootDir.numItemsInTree()); // save files under construction sourceNamesystem.saveFilesUnderConstruction(out); context.checkCancelled(); - sourceNamesystem.saveSecretManagerState(out); + sourceNamesystem.saveSecretManagerState(out, sdPath); context.checkCancelled(); out.flush(); context.checkCancelled(); @@ -950,17 +1004,18 @@ public class FSImageFormat { * Save children INodes. * @param children The list of children INodes * @param out The DataOutputStream to write + * @param counter Counter to increment for namenode startup progress * @return Number of children that are directory */ - private int saveChildren(ReadOnlyList children, DataOutputStream out) - throws IOException { + private int saveChildren(ReadOnlyList children, DataOutputStream out, + Counter counter) throws IOException { // Write normal children INode. out.writeInt(children.size()); int dirNum = 0; int i = 0; for(INode child : children) { // print all children first - FSImageSerialization.saveINode2Image(child, out, false, referenceMap); + saveINode2Image(child, out, false, referenceMap, counter); if (child.isDirectory()) { dirNum++; } @@ -983,9 +1038,10 @@ public class FSImageFormat { * @param toSaveSubtree Whether or not to save the subtree to fsimage. For * reference node, its subtree may already have been * saved before. 
+ * @param counter Counter to increment for namenode startup progress */ private void saveImage(INodeDirectory current, DataOutputStream out, - boolean toSaveSubtree) throws IOException { + boolean toSaveSubtree, Counter counter) throws IOException { // write the inode id of the directory out.writeLong(current.getId()); @@ -1014,7 +1070,7 @@ public class FSImageFormat { } // 3. Write children INode - dirNum += saveChildren(children, out); + dirNum += saveChildren(children, out, counter); // 4. Write DirectoryDiff lists, if there is any. SnapshotFSImageFormat.saveDirectoryDiffList(current, out, referenceMap); @@ -1029,16 +1085,39 @@ public class FSImageFormat { // make sure we only save the subtree under a reference node once boolean toSave = child.isReference() ? referenceMap.toProcessSubtree(child.getId()) : true; - saveImage(child.asDirectory(), out, toSave); + saveImage(child.asDirectory(), out, toSave, counter); } if (snapshotDirs != null) { for (INodeDirectory subDir : snapshotDirs) { // make sure we only save the subtree under a reference node once boolean toSave = subDir.getParentReference() != null ? referenceMap.toProcessSubtree(subDir.getId()) : true; - saveImage(subDir, out, toSave); + saveImage(subDir, out, toSave, counter); } } } + + /** + * Saves inode and increments progress counter. 
+ * + * @param inode INode to save + * @param out DataOutputStream to receive inode + * @param writeUnderConstruction boolean true if this is under construction + * @param referenceMap ReferenceMap containing reference inodes + * @param counter Counter to increment for namenode startup progress + * @throws IOException thrown if there is an I/O error + */ + private void saveINode2Image(INode inode, DataOutputStream out, + boolean writeUnderConstruction, ReferenceMap referenceMap, + Counter counter) throws IOException { + FSImageSerialization.saveINode2Image(inode, out, writeUnderConstruction, + referenceMap); + // Intentionally do not increment counter for reference inodes, because it + // is too difficult at this point to assess whether or not this is a + // reference that counts toward quota. + if (!(inode instanceof INodeReference)) { + counter.increment(); + } + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 2a69d20fbc9..134c12ee61e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -169,6 +169,12 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status; +import 
org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAState; @@ -328,7 +334,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); final DelegationTokenSecretManager dtSecretManager; private final boolean alwaysUseDelegationTokensForTests; - + + private static final Step STEP_AWAITING_REPORTED_BLOCKS = + new Step(StepType.AWAITING_REPORTED_BLOCKS); + // Tracks whether the default audit logger is the only configured audit // logger; this allows isAuditEnabled() to return false in case the // underlying logger is disabled, and avoid some unnecessary work. @@ -716,8 +725,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats, try { // We shouldn't be calling saveNamespace if we've come up in standby state. MetaRecoveryContext recovery = startOpt.createRecoveryContext(); - if (fsImage.recoverTransitionRead(startOpt, this, recovery) && !haEnabled) { + boolean needToSave = + fsImage.recoverTransitionRead(startOpt, this, recovery) && !haEnabled; + if (needToSave) { fsImage.saveNamespace(this); + } else { + // No need to save, so mark the phase done. 
+ StartupProgress prog = NameNode.getStartupProgress(); + prog.beginPhase(Phase.SAVING_CHECKPOINT); + prog.endPhase(Phase.SAVING_CHECKPOINT); } // This will start a new log segment and write to the seen_txid file, so // we shouldn't do it when coming up in standby state @@ -776,6 +792,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, checkAvailableResources(); assert safeMode != null && !safeMode.isPopulatingReplQueues(); + StartupProgress prog = NameNode.getStartupProgress(); + prog.beginPhase(Phase.SAFEMODE); + prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS, + getCompleteBlocksTotal()); setBlockTotal(); blockManager.activate(conf); } finally { @@ -4077,6 +4097,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private boolean resourcesLow = false; /** Should safemode adjust its block totals as blocks come in */ private boolean shouldIncrementallyTrackBlocks = false; + /** counter for tracking startup progress of reported blocks */ + private Counter awaitingReportedBlocksCounter; /** * Creates SafeModeInfo when the name node enters @@ -4197,6 +4219,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, + blockManager.numOfUnderReplicatedBlocks() + " blocks"); startSecretManagerIfNecessary(); + + // If startup has not yet completed, end safemode phase. + StartupProgress prog = NameNode.getStartupProgress(); + if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) { + prog.endStep(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS); + prog.endPhase(Phase.SAFEMODE); + } } /** @@ -4314,6 +4343,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private synchronized void incrementSafeBlockCount(short replication) { if (replication == safeReplication) { this.blockSafe++; + + // Report startup progress only if we haven't completed startup yet. 
+ StartupProgress prog = NameNode.getStartupProgress(); + if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) { + if (this.awaitingReportedBlocksCounter == null) { + this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE, + STEP_AWAITING_REPORTED_BLOCKS); + } + this.awaitingReportedBlocksCounter.increment(); + } + checkMode(); } } @@ -5671,9 +5711,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, /** * @param out save state of the secret manager + * @param sdPath String storage directory path */ - void saveSecretManagerState(DataOutputStream out) throws IOException { - dtSecretManager.saveSecretManagerState(out); + void saveSecretManagerState(DataOutputStream out, String sdPath) + throws IOException { + dtSecretManager.saveSecretManagerState(out, sdPath); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 93cf482ab21..08f73b29fa6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAState; import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgressMetrics; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -260,6 +262,10 @@ public class NameNode { } static NameNodeMetrics metrics; + private static final 
StartupProgress startupProgress = new StartupProgress(); + static { + StartupProgressMetrics.register(startupProgress); + } /** Return the {@link FSNamesystem} object. * @return {@link FSNamesystem} object. @@ -279,7 +285,16 @@ public class NameNode { public static NameNodeMetrics getNameNodeMetrics() { return metrics; } - + + /** + * Returns object used for reporting namenode startup progress. + * + * @return StartupProgress for reporting namenode startup progress + */ + public static StartupProgress getStartupProgress() { + return startupProgress; + } + public static InetSocketAddress getAddress(String address) { return NetUtils.createSocketAddr(address, DEFAULT_PORT); } @@ -432,15 +447,19 @@ public class NameNode { loginAsNameNodeUser(conf); NameNode.initMetrics(conf, this.getRole()); + + if (NamenodeRole.NAMENODE == role) { + startHttpServer(conf); + validateConfigurationSettingsOrAbort(conf); + } loadNamesystem(conf); rpcServer = createRpcServer(conf); - - try { - validateConfigurationSettings(conf); - } catch (IOException e) { - LOG.fatal(e.toString()); - throw e; + if (NamenodeRole.NAMENODE == role) { + httpServer.setNameNodeAddress(getNameNodeAddress()); + httpServer.setFSImage(getFSImage()); + } else { + validateConfigurationSettingsOrAbort(conf); } startCommonServices(conf); @@ -477,10 +496,31 @@ public class NameNode { } } + /** + * Validate NameNode configuration. Log a fatal error and abort if + * configuration is invalid. 
+ * + * @param conf Configuration to validate + * @throws IOException thrown if conf is invalid + */ + private void validateConfigurationSettingsOrAbort(Configuration conf) + throws IOException { + try { + validateConfigurationSettings(conf); + } catch (IOException e) { + LOG.fatal(e.toString()); + throw e; + } + } + /** Start the services common to active and standby states */ private void startCommonServices(Configuration conf) throws IOException { namesystem.startCommonServices(conf, haContext); - startHttpServer(conf); + if (NamenodeRole.NAMENODE != role) { + startHttpServer(conf); + httpServer.setNameNodeAddress(getNameNodeAddress()); + httpServer.setFSImage(getFSImage()); + } rpcServer.start(); plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, ServicePlugin.class); @@ -548,6 +588,7 @@ public class NameNode { private void startHttpServer(final Configuration conf) throws IOException { httpServer = new NameNodeHttpServer(conf, this, getHttpServerAddress(conf)); httpServer.start(); + httpServer.setStartupProgress(startupProgress); setHttpServerAddress(conf); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java index 32e9fd15f05..7c27a6bf4d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.web.AuthFilter; 
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; @@ -55,6 +56,7 @@ public class NameNodeHttpServer { public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address"; public static final String FSIMAGE_ATTRIBUTE_KEY = "name.system.image"; protected static final String NAMENODE_ATTRIBUTE_KEY = "name.node"; + public static final String STARTUP_PROGRESS_ATTRIBUTE_KEY = "startup.progress"; public NameNodeHttpServer( Configuration conf, @@ -146,9 +148,6 @@ public class NameNodeHttpServer { .getPort()); } httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn); - httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY, - NetUtils.getConnectAddress(nn.getNameNodeAddress())); - httpServer.setAttribute(FSIMAGE_ATTRIBUTE_KEY, nn.getFSImage()); httpServer.setAttribute(JspHelper.CURRENT_CONF, conf); setupServlets(httpServer, conf); httpServer.start(); @@ -166,6 +165,34 @@ public class NameNodeHttpServer { return httpAddress; } + /** + * Sets fsimage for use by servlets. + * + * @param fsImage FSImage to set + */ + public void setFSImage(FSImage fsImage) { + httpServer.setAttribute(FSIMAGE_ATTRIBUTE_KEY, fsImage); + } + + /** + * Sets address of namenode for use by servlets. + * + * @param nameNodeAddress InetSocketAddress to set + */ + public void setNameNodeAddress(InetSocketAddress nameNodeAddress) { + httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY, + NetUtils.getConnectAddress(nameNodeAddress)); + } + + /** + * Sets startup progress of namenode for use by servlets. 
+ * + * @param prog StartupProgress to set + */ + public void setStartupProgress(StartupProgress prog) { + httpServer.setAttribute(STARTUP_PROGRESS_ATTRIBUTE_KEY, prog); + } + private static void setupServlets(HttpServer httpServer, Configuration conf) { httpServer.addInternalServlet("getDelegationToken", GetDelegationTokenServlet.PATH_SPEC, @@ -207,4 +234,15 @@ public class NameNodeHttpServer { return (InetSocketAddress)context.getAttribute( NAMENODE_ADDRESS_ATTRIBUTE_KEY); } + + /** + * Returns StartupProgress associated with ServletContext. + * + * @param context ServletContext to get + * @return StartupProgress associated with context + */ + public static StartupProgress getStartupProgressFromContext( + ServletContext context) { + return (StartupProgress)context.getAttribute(STARTUP_PROGRESS_ATTRIBUTE_KEY); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java index 7d8135669d0..023f98e5ad9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java @@ -122,6 +122,11 @@ class RedundantEditLogInputStream extends EditLogInputStream { }); } + @Override + public String getCurrentStreamName() { + return streams[curIdx].getCurrentStreamName(); + } + @Override public String getName() { StringBuilder bld = new StringBuilder(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/AbstractTracking.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/AbstractTracking.java new file mode 100644 index 00000000000..668ce175294 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/AbstractTracking.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Abstract base of internal data structures used for tracking progress. For + * primitive long properties, {@link Long#MIN_VALUE} is used as a sentinel value + * to indicate that the property is undefined. + */ +@InterfaceAudience.Private +abstract class AbstractTracking implements Cloneable { + long beginTime = Long.MIN_VALUE; + long endTime = Long.MIN_VALUE; + + /** + * Subclass instances may call this method during cloning to copy the values of + * all properties stored in this base class. 
+ * + * @param dest AbstractTracking destination for copying properties + */ + protected void copy(AbstractTracking dest) { + dest.beginTime = beginTime; + dest.endTime = endTime; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Phase.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Phase.java new file mode 100644 index 00000000000..750947278e9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Phase.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Indicates a particular phase of the namenode startup sequence. The phases + * are listed here in their execution order. + */ +@InterfaceAudience.Private +public enum Phase { + /** + * The namenode is loading the fsimage file into memory. + */ + LOADING_FSIMAGE("LoadingFsImage", "Loading fsimage"), + + /** + * The namenode is loading the edits file and applying its operations to the + * in-memory metadata. 
+ */ + LOADING_EDITS("LoadingEdits", "Loading edits"), + + /** + * The namenode is saving a new checkpoint. + */ + SAVING_CHECKPOINT("SavingCheckpoint", "Saving checkpoint"), + + /** + * The namenode has entered safemode, awaiting block reports from data nodes. + */ + SAFEMODE("SafeMode", "Safe mode"); + + private final String name, description; + + /** + * Returns phase description. + * + * @return String description + */ + public String getDescription() { + return description; + } + + /** + * Returns phase name. + * + * @return String phase name + */ + public String getName() { + return name; + } + + /** + * Private constructor of enum. + * + * @param name String phase name + * @param description String phase description + */ + private Phase(String name, String description) { + this.name = name; + this.description = description; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/PhaseTracking.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/PhaseTracking.java new file mode 100644 index 00000000000..3bdce3a00fd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/PhaseTracking.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Internal data structure used to track progress of a {@link Phase}. + */ +@InterfaceAudience.Private +final class PhaseTracking extends AbstractTracking { + String file; + long size = Long.MIN_VALUE; + ConcurrentMap steps = + new ConcurrentHashMap(); + + @Override + public PhaseTracking clone() { + PhaseTracking clone = new PhaseTracking(); + super.copy(clone); + clone.file = file; + clone.size = size; + for (Map.Entry entry: steps.entrySet()) { + clone.steps.put(entry.getKey(), entry.getValue().clone()); + } + return clone; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgress.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgress.java new file mode 100644 index 00000000000..f242bc57119 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgress.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import static org.apache.hadoop.util.Time.monotonicNow; + +import java.util.EnumSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * StartupProgress is used in various parts of the namenode codebase to indicate + * startup progress. Its methods provide ways to indicate begin and end of a + * {@link Phase} or {@link Step} within a phase. Additional methods provide ways + * to associate a step or phase with optional information, such as a file name or + * file size. It also provides counters, which can be incremented by the caller + * to indicate progress through a long-running task. + * + * This class is thread-safe. Any number of threads may call any methods, even + * for the same phase or step, without risk of corrupting internal state. For + * all begin/end methods and set methods, the last one in wins, overwriting any + * prior writes. Instances of {@link Counter} provide an atomic increment + * operation to prevent lost updates. + * + * After startup completes, the tracked data is frozen. Any subsequent updates + * or counter increments are no-ops. + * + * For read access, call {@link #createView()} to create a consistent view with + * a clone of the data. 
+ */ +@InterfaceAudience.Private +public class StartupProgress { + // package-private for access by StartupProgressView + Map phases = + new ConcurrentHashMap(); + + /** + * Allows a caller to increment a counter for tracking progress. + */ + public static interface Counter { + /** + * Atomically increments this counter, adding 1 to the current value. + */ + void increment(); + } + + /** + * Creates a new StartupProgress by initializing internal data structure for + * tracking progress of all defined phases. + */ + public StartupProgress() { + for (Phase phase: EnumSet.allOf(Phase.class)) { + phases.put(phase, new PhaseTracking()); + } + } + + /** + * Begins execution of the specified phase. + * + * @param phase Phase to begin + */ + public void beginPhase(Phase phase) { + if (!isComplete()) { + phases.get(phase).beginTime = monotonicNow(); + } + } + + /** + * Begins execution of the specified step within the specified phase. + * + * @param phase Phase containing the step + * @param step Step to begin + */ + public void beginStep(Phase phase, Step step) { + if (!isComplete()) { + lazyInitStep(phase, step).beginTime = monotonicNow(); + } + } + + /** + * Ends execution of the specified phase. + * + * @param phase Phase to end + */ + public void endPhase(Phase phase) { + if (!isComplete()) { + phases.get(phase).endTime = monotonicNow(); + } + } + + /** + * Ends execution of the specified step within the specified phase. + * + * @param phase Phase containing the step + * @param step Step to end + */ + public void endStep(Phase phase, Step step) { + if (!isComplete()) { + lazyInitStep(phase, step).endTime = monotonicNow(); + } + } + + /** + * Returns the current run status of the specified phase.
+ * + * @param phase Phase to get + * @return Status run status of phase + */ + public Status getStatus(Phase phase) { + PhaseTracking tracking = phases.get(phase); + if (tracking.beginTime == Long.MIN_VALUE) { + return Status.PENDING; + } else if (tracking.endTime == Long.MIN_VALUE) { + return Status.RUNNING; + } else { + return Status.COMPLETE; + } + } + + /** + * Returns a counter associated with the specified phase and step. Typical + * usage is to increment a counter within a tight loop. Callers may use this + * method to obtain a counter once and then increment that instance repeatedly + * within a loop. This prevents redundant lookup operations and object + * creation within the tight loop. Incrementing the counter is an atomic + * operation, so there is no risk of lost updates even if multiple threads + * increment the same counter. + * + * @param phase Phase to get + * @param step Step to get + * @return Counter associated with phase and step + */ + public Counter getCounter(Phase phase, Step step) { + final StepTracking tracking = lazyInitStep(phase, step); + if (!isComplete()) { + return new Counter() { + @Override + public void increment() { + tracking.count.incrementAndGet(); + } + }; + } else { + return new Counter() { + @Override + public void increment() { + // no-op, because startup has completed + } + }; + } + } + + /** + * Sets counter to the specified value. + * + * @param phase Phase to set + * @param step Step to set + * @param count long to set + */ + public void setCount(Phase phase, Step step, long count) { + lazyInitStep(phase, step).count.set(count); + } + + /** + * Sets the optional file name associated with the specified phase. For + * example, this can be used while loading fsimage to indicate the full path to + * the fsimage file. 
+ * + * @param phase Phase to set + * @param file String file name to set + */ + public void setFile(Phase phase, String file) { + if (!isComplete()) { + phases.get(phase).file = file; + } + } + + /** + * Sets the optional size in bytes associated with the specified phase. For + * example, this can be used while loading fsimage to indicate the size of the + * fsimage file. + * + * @param phase Phase to set + * @param size long to set + */ + public void setSize(Phase phase, long size) { + if (!isComplete()) { + phases.get(phase).size = size; + } + } + + /** + * Sets the total associated with the specified phase and step. For example, + * this can be used while loading edits to indicate the number of operations to + * be applied. + * + * @param phase Phase to set + * @param step Step to set + * @param total long to set + */ + public void setTotal(Phase phase, Step step, long total) { + if (!isComplete()) { + lazyInitStep(phase, step).total = total; + } + } + + /** + * Creates a {@link StartupProgressView} containing data cloned from this + * StartupProgress. Subsequent updates to this StartupProgress will not be + * shown in the view. This gives a consistent, unchanging view for callers + * that need to perform multiple related read operations. Calculations that + * require aggregation, such as overall percent complete, will not be impacted + * by mutations performed in other threads mid-way through the calculation. + * + * @return StartupProgressView containing cloned data + */ + public StartupProgressView createView() { + return new StartupProgressView(this); + } + + /** + * Returns true if the entire startup process has completed, determined by + * checking if each phase is complete. 
+ * + * @return boolean true if the entire startup process has completed + */ + private boolean isComplete() { + for (Phase phase: EnumSet.allOf(Phase.class)) { + if (getStatus(phase) != Status.COMPLETE) { + return false; + } + } + return true; + } + + /** + * Lazily initializes the internal data structure for tracking the specified + * phase and step. Returns either the newly initialized data structure or the + * existing one. Initialization is atomic, so there is no risk of lost updates + * even if multiple threads attempt to initialize the same step simultaneously. + * + * @param phase Phase to initialize + * @param step Step to initialize + * @return StepTracking newly initialized, or existing if found + */ + private StepTracking lazyInitStep(Phase phase, Step step) { + ConcurrentMap steps = phases.get(phase).steps; + if (!steps.containsKey(step)) { + steps.putIfAbsent(step, new StepTracking()); + } + return steps.get(step); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressMetrics.java new file mode 100644 index 00000000000..c637b84866f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressMetrics.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; +import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgressView; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + +/** + * Links {@link StartupProgress} to a {@link MetricsSource} to expose its + * information via JMX. + */ +@InterfaceAudience.Private +public class StartupProgressMetrics implements MetricsSource { + + private static final MetricsInfo STARTUP_PROGRESS_METRICS_INFO = + info("StartupProgress", "NameNode startup progress"); + + private final StartupProgress startupProgress; + + /** + * Registers StartupProgressMetrics linked to the given StartupProgress. + * + * @param prog StartupProgress to link + */ + public static void register(StartupProgress prog) { + new StartupProgressMetrics(prog); + } + + /** + * Creates a new StartupProgressMetrics registered with the metrics system. 
+ * + * @param startupProgress StartupProgress to link + */ + public StartupProgressMetrics(StartupProgress startupProgress) { + this.startupProgress = startupProgress; + DefaultMetricsSystem.instance().register( + STARTUP_PROGRESS_METRICS_INFO.name(), + STARTUP_PROGRESS_METRICS_INFO.description(), this); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + StartupProgressView prog = startupProgress.createView(); + MetricsRecordBuilder builder = collector.addRecord( + STARTUP_PROGRESS_METRICS_INFO); + + builder.addCounter(info("ElapsedTime", "overall elapsed time"), + prog.getElapsedTime()); + builder.addGauge(info("PercentComplete", "overall percent complete"), + prog.getPercentComplete()); + + for (Phase phase: prog.getPhases()) { + addCounter(builder, phase, "Count", " count", prog.getCount(phase)); + addCounter(builder, phase, "ElapsedTime", " elapsed time", + prog.getElapsedTime(phase)); + addCounter(builder, phase, "Total", " total", prog.getTotal(phase)); + addGauge(builder, phase, "PercentComplete", " percent complete", + prog.getPercentComplete(phase)); + } + } + + /** + * Adds a counter with a name built by using the specified phase's name as + * prefix and then appending the specified suffix. + * + * @param builder MetricsRecordBuilder to receive counter + * @param phase Phase to add + * @param nameSuffix String suffix of metric name + * @param descSuffix String suffix of metric description + * @param value long counter value + */ + private static void addCounter(MetricsRecordBuilder builder, Phase phase, + String nameSuffix, String descSuffix, long value) { + MetricsInfo metricsInfo = info(phase.getName() + nameSuffix, + phase.getDescription() + descSuffix); + builder.addCounter(metricsInfo, value); + } + + /** + * Adds a gauge with a name built by using the specified phase's name as prefix + * and then appending the specified suffix. 
+ * + * @param builder MetricsRecordBuilder to receive counter + * @param phase Phase to add + * @param nameSuffix String suffix of metric name + * @param descSuffix String suffix of metric description + * @param value float gauge value + */ + private static void addGauge(MetricsRecordBuilder builder, Phase phase, + String nameSuffix, String descSuffix, float value) { + MetricsInfo metricsInfo = info(phase.getName() + nameSuffix, + phase.getDescription() + descSuffix); + builder.addGauge(metricsInfo, value); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressView.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressView.java new file mode 100644 index 00000000000..3101741d958 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressView.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.Time; + +/** + * StartupProgressView is an immutable, consistent, read-only view of namenode + * startup progress. Callers obtain an instance by calling + * {@link StartupProgress#createView()} to clone current startup progress state. + * Subsequent updates to startup progress will not alter the view. This isolates + * the reader from ongoing updates and establishes a guarantee that the values + * returned by the view are consistent and unchanging across multiple related + * read operations. Calculations that require aggregation, such as overall + * percent complete, will not be impacted by mutations performed in other threads + * mid-way through the calculation. + * + * Methods that return primitive long may return {@link Long#MIN_VALUE} as a + * sentinel value to indicate that the property is undefined. + */ +@InterfaceAudience.Private +public class StartupProgressView { + + private final Map phases; + + /** + * Returns the sum of the counter values for all steps in the specified phase. + * + * @param phase Phase to get + * @return long sum of counter values for all steps + */ + public long getCount(Phase phase) { + long sum = 0; + for (Step step: getSteps(phase)) { + sum += getCount(phase, step); + } + return sum; + } + + /** + * Returns the counter value for the specified phase and step. + * + * @param phase Phase to get + * @param step Step to get + * @return long counter value for phase and step + */ + public long getCount(Phase phase, Step step) { + StepTracking tracking = getStepTracking(phase, step); + return tracking != null ? tracking.count.get() : 0; + } + + /** + * Returns overall elapsed time, calculated as time between start of loading + * fsimage and end of safemode. 
+ * + * @return long elapsed time + */ + public long getElapsedTime() { + return getElapsedTime(phases.get(Phase.LOADING_FSIMAGE), + phases.get(Phase.SAFEMODE)); + } + + /** + * Returns elapsed time for the specified phase, calculated as (end - begin) if + * phase is complete or (now - begin) if phase is running or 0 if the phase is + * still pending. + * + * @param phase Phase to get + * @return long elapsed time + */ + public long getElapsedTime(Phase phase) { + return getElapsedTime(phases.get(phase)); + } + + /** + * Returns elapsed time for the specified phase and step, calculated as + * (end - begin) if step is complete or (now - begin) if step is running or 0 + * if the step is still pending. + * + * @param phase Phase to get + * @param step Step to get + * @return long elapsed time + */ + public long getElapsedTime(Phase phase, Step step) { + return getElapsedTime(getStepTracking(phase, step)); + } + + /** + * Returns the optional file name associated with the specified phase, possibly + * null. + * + * @param phase Phase to get + * @return String optional file name, possibly null + */ + public String getFile(Phase phase) { + return phases.get(phase).file; + } + + /** + * Returns overall percent complete, calculated by aggregating percent complete + * of all phases. This is an approximation that assumes all phases have equal + * running time. In practice, this isn't true, but there isn't sufficient + * information available to predict proportional weights for each phase. 
+ * + * @return float percent complete + */ + public float getPercentComplete() { + if (getStatus(Phase.SAFEMODE) == Status.COMPLETE) { + return 1.0f; + } else { + float total = 0.0f; + int numPhases = 0; + for (Phase phase: phases.keySet()) { + ++numPhases; + total += getPercentComplete(phase); + } + return getBoundedPercent(total / numPhases); + } + } + + /** + * Returns percent complete for the specified phase, calculated by aggregating + * the counter values and totals for all steps within the phase. + * + * @param phase Phase to get + * @return float percent complete + */ + public float getPercentComplete(Phase phase) { + if (getStatus(phase) == Status.COMPLETE) { + return 1.0f; + } else { + long total = getTotal(phase); + long count = 0; + for (Step step: getSteps(phase)) { + count += getCount(phase, step); + } + return total > 0 ? getBoundedPercent(1.0f * count / total) : 0.0f; + } + } + + /** + * Returns percent complete for the specified phase and step, calculated as + * counter value divided by total. + * + * @param phase Phase to get + * @param step Step to get + * @return float percent complete + */ + public float getPercentComplete(Phase phase, Step step) { + if (getStatus(phase) == Status.COMPLETE) { + return 1.0f; + } else { + long total = getTotal(phase, step); + long count = getCount(phase, step); + return total > 0 ? getBoundedPercent(1.0f * count / total) : 0.0f; + } + } + + /** + * Returns all phases. + * + * @return Iterable containing all phases + */ + public Iterable getPhases() { + return EnumSet.allOf(Phase.class); + } + + /** + * Returns all steps within a phase. + * + * @param phase Phase to get + * @return Iterable all steps + */ + public Iterable getSteps(Phase phase) { + return new TreeSet(phases.get(phase).steps.keySet()); + } + + /** + * Returns the optional size in bytes associated with the specified phase, + * possibly Long.MIN_VALUE if undefined. 
+ * + * @param phase Phase to get + * @return long optional size in bytes, possibly Long.MIN_VALUE + */ + public long getSize(Phase phase) { + return phases.get(phase).size; + } + + /** + * Returns the current run status of the specified phase. + * + * @param phase Phase to get + * @return Status run status of phase + */ + public Status getStatus(Phase phase) { + PhaseTracking tracking = phases.get(phase); + if (tracking.beginTime == Long.MIN_VALUE) { + return Status.PENDING; + } else if (tracking.endTime == Long.MIN_VALUE) { + return Status.RUNNING; + } else { + return Status.COMPLETE; + } + } + + /** + * Returns the sum of the totals for all steps in the specified phase. + * + * @param phase Phase to get + * @return long sum of totals for all steps + */ + public long getTotal(Phase phase) { + long sum = 0; + for (StepTracking tracking: phases.get(phase).steps.values()) { + if (tracking.total != Long.MIN_VALUE) { + sum += tracking.total; + } + } + return sum; + } + + /** + * Returns the total for the specified phase and step. + * + * @param phase Phase to get + * @param step Step to get + * @return long total + */ + public long getTotal(Phase phase, Step step) { + StepTracking tracking = getStepTracking(phase, step); + return tracking != null && tracking.total != Long.MIN_VALUE ? + tracking.total : 0; + } + + /** + * Creates a new StartupProgressView by cloning data from the specified + * StartupProgress. + * + * @param prog StartupProgress to clone + */ + StartupProgressView(StartupProgress prog) { + phases = new HashMap(); + for (Map.Entry entry: prog.phases.entrySet()) { + phases.put(entry.getKey(), entry.getValue().clone()); + } + } + + /** + * Returns elapsed time, calculated as (end - begin) if both are defined or + * (now - begin) if end is undefined or 0 if both are undefined. Begin and end + * time come from the same AbstractTracking instance. 
+ * + * @param tracking AbstractTracking containing begin and end time + * @return long elapsed time + */ + private long getElapsedTime(AbstractTracking tracking) { + return getElapsedTime(tracking, tracking); + } + + /** + * Returns elapsed time, calculated as (end - begin) if both are defined or + * (now - begin) if end is undefined or 0 if both are undefined. Begin and end + * time may come from different AbstractTracking instances. + * + * @param beginTracking AbstractTracking containing begin time + * @param endTracking AbstractTracking containing end time + * @return long elapsed time + */ + private long getElapsedTime(AbstractTracking beginTracking, + AbstractTracking endTracking) { + final long elapsed; + if (beginTracking != null && beginTracking.beginTime != Long.MIN_VALUE && + endTracking != null && endTracking.endTime != Long.MIN_VALUE) { + elapsed = endTracking.endTime - beginTracking.beginTime; + } else if (beginTracking != null && + beginTracking.beginTime != Long.MIN_VALUE) { + elapsed = Time.monotonicNow() - beginTracking.beginTime; + } else { + elapsed = 0; + } + return Math.max(0, elapsed); + } + + /** + * Returns the StepTracking internal data structure for the specified phase + * and step, possibly null if not found. + * + * @param phase Phase to get + * @param step Step to get + * @return StepTracking for phase and step, possibly null + */ + private StepTracking getStepTracking(Phase phase, Step step) { + PhaseTracking phaseTracking = phases.get(phase); + Map steps = phaseTracking != null ? + phaseTracking.steps : null; + return steps != null ? steps.get(step) : null; + } + + /** + * Returns the given value restricted to the range [0.0, 1.0]. 
+ * + * @param percent float value to restrict + * @return float value restricted to range [0.0, 1.0] + */ + private static float getBoundedPercent(float percent) { + return Math.max(0.0f, Math.min(1.0f, percent)); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Status.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Status.java new file mode 100644 index 00000000000..052e937e42f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Status.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Indicates run status of a {@link Phase}. + */ +@InterfaceAudience.Private +public enum Status { + /** + * The phase has not yet started running. + */ + PENDING, + + /** + * The phase is running right now. + */ + RUNNING, + + /** + * The phase has already completed. 
+ */ + COMPLETE +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Step.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Step.java new file mode 100644 index 00000000000..9b23e09e493 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/Step.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang.builder.CompareToBuilder; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A step performed by the namenode during a {@link Phase} of startup. 
+ */ +@InterfaceAudience.Private +public class Step implements Comparable { + private static final AtomicInteger SEQUENCE = new AtomicInteger(); + + private final String file; + private final int sequenceNumber; + private final long size; + private final StepType type; + + /** + * Creates a new Step. + * + * @param type StepType type of step + */ + public Step(StepType type) { + this(type, null, Long.MIN_VALUE); + } + + /** + * Creates a new Step. + * + * @param file String file + */ + public Step(String file) { + this(null, file, Long.MIN_VALUE); + } + + /** + * Creates a new Step. + * + * @param file String file + * @param size long size in bytes + */ + public Step(String file, long size) { + this(null, file, size); + } + + /** + * Creates a new Step. + * + * @param type StepType type of step + * @param file String file + */ + public Step(StepType type, String file) { + this(type, file, Long.MIN_VALUE); + } + + /** + * Creates a new Step. + * + * @param type StepType type of step + * @param file String file + * @param size long size in bytes + */ + public Step(StepType type, String file, long size) { + this.file = file; + this.sequenceNumber = SEQUENCE.incrementAndGet(); + this.size = size; + this.type = type; + } + + @Override + public int compareTo(Step other) { + // Sort steps by file and then sequentially within the file to achieve the + // desired order. There is no concurrent map structure in the JDK that + // maintains insertion order, so instead we attach a sequence number to each + // step and sort on read. 
+ return new CompareToBuilder().append(file, other.file) + .append(sequenceNumber, other.sequenceNumber).toComparison(); + } + + @Override + public boolean equals(Object otherObj) { + if (otherObj == null || otherObj.getClass() != getClass()) { + return false; + } + Step other = (Step)otherObj; + return new EqualsBuilder().append(this.file, other.file) + .append(this.size, other.size).append(this.type, other.type).isEquals(); + } + + /** + * Returns the optional file name, possibly null. + * + * @return String optional file name, possibly null + */ + public String getFile() { + return file; + } + + /** + * Returns the optional size in bytes, possibly Long.MIN_VALUE if undefined. + * + * @return long optional size in bytes, possibly Long.MIN_VALUE + */ + public long getSize() { + return size; + } + + /** + * Returns the optional step type, possibly null. + * + * @return StepType optional step type, possibly null + */ + public StepType getType() { + return type; + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(file).append(size).append(type) + .toHashCode(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepTracking.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepTracking.java new file mode 100644 index 00000000000..bc224ec5670 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepTracking.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Internal data structure used to track progress of a {@link Step}. + */ +@InterfaceAudience.Private +final class StepTracking extends AbstractTracking { + AtomicLong count = new AtomicLong(); + long total = Long.MIN_VALUE; + + @Override + public StepTracking clone() { + StepTracking clone = new StepTracking(); + super.copy(clone); + clone.count = new AtomicLong(count.get()); + clone.total = total; + return clone; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java new file mode 100644 index 00000000000..2ef9c8e7013 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Indicates a particular type of {@link Step}. + */ +@InterfaceAudience.Private +public enum StepType { + /** + * The namenode has entered safemode and is awaiting block reports from + * datanodes. + */ + AWAITING_REPORTED_BLOCKS("AwaitingReportedBlocks", "awaiting reported blocks"), + + /** + * The namenode is performing an operation related to delegation keys. + */ + DELEGATION_KEYS("DelegationKeys", "delegation keys"), + + /** + * The namenode is performing an operation related to delegation tokens. + */ + DELEGATION_TOKENS("DelegationTokens", "delegation tokens"), + + /** + * The namenode is performing an operation related to inodes. + */ + INODES("Inodes", "inodes"); + + private final String name, description; + + /** + * Private constructor of enum. + * + * @param name String step type name + * @param description String step type description + */ + private StepType(String name, String description) { + this.name = name; + this.description = description; + } + + /** + * Returns step type description. + * + * @return String step type description + */ + public String getDescription() { + return description; + } + + /** + * Returns step type name. 
+ * + * @return String step type name + */ + public String getName() { + return name; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/package-info.java new file mode 100644 index 00000000000..e6a8b97fb3f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/package-info.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package provides a mechanism for tracking {@link NameNode} startup + * progress. The package models NameNode startup as a series of {@link Phase}s, + * with each phase further sub-divided into multiple {@link Step}s. All phases + * are coarse-grained and typically known in advance, implied by the structure of + * the NameNode codebase (example: loading fsimage). Steps are more granular and + * typically only known at runtime after startup begins (example: loading a + * specific fsimage file with a known length from a particular location). 
+ * + * {@link StartupProgress} provides a thread-safe data structure for + * recording status information and counters. Various parts of the NameNode + * codebase use this to describe the NameNode's activities during startup. + * + * {@link StartupProgressView} provides an immutable, consistent view of the + * current state of NameNode startup progress. This can be used to present the + * data to a user. + * + * {@link StartupProgressMetrics} exposes startup progress information via JMX + * through the standard metrics system. + */ +@InterfaceAudience.Private +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.server.namenode.NameNode; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressTestHelper.java new file mode 100644 index 00000000000..3dcd7e38467 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StartupProgressTestHelper.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.startupprogress; + +import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase.*; +import static org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType.*; + +/** + * Utility methods that help with writing tests covering startup progress. + */ +public class StartupProgressTestHelper { + + /** + * Increments a counter a certain number of times. + * + * @param prog StartupProgress to increment + * @param phase Phase to increment + * @param step Step to increment + * @param delta long number of times to increment + */ + public static void incrementCounter(StartupProgress prog, Phase phase, + Step step, long delta) { + StartupProgress.Counter counter = prog.getCounter(phase, step); + for (long i = 0; i < delta; ++i) { + counter.increment(); + } + } + + /** + * Sets up StartupProgress to a state part-way through the startup sequence. + * + * @param prog StartupProgress to set + */ + public static void setStartupProgressForRunningState(StartupProgress prog) { + prog.beginPhase(LOADING_FSIMAGE); + Step loadingFsImageInodes = new Step(INODES); + prog.beginStep(LOADING_FSIMAGE, loadingFsImageInodes); + prog.setTotal(LOADING_FSIMAGE, loadingFsImageInodes, 100L); + incrementCounter(prog, LOADING_FSIMAGE, loadingFsImageInodes, 100L); + prog.endStep(LOADING_FSIMAGE, loadingFsImageInodes); + prog.endPhase(LOADING_FSIMAGE); + + prog.beginPhase(LOADING_EDITS); + Step loadingEditsFile = new Step("file", 1000L); + prog.beginStep(LOADING_EDITS, loadingEditsFile); + prog.setTotal(LOADING_EDITS, loadingEditsFile, 200L); + incrementCounter(prog, LOADING_EDITS, loadingEditsFile, 100L); + } + + /** + * Sets up StartupProgress to final state after startup sequence has completed. 
+ * + * @param prog StartupProgress to set + */ + public static void setStartupProgressForFinalState(StartupProgress prog) { + prog.beginPhase(LOADING_FSIMAGE); + Step loadingFsImageInodes = new Step(INODES); + prog.beginStep(LOADING_FSIMAGE, loadingFsImageInodes); + prog.setTotal(LOADING_FSIMAGE, loadingFsImageInodes, 100L); + incrementCounter(prog, LOADING_FSIMAGE, loadingFsImageInodes, 100L); + prog.endStep(LOADING_FSIMAGE, loadingFsImageInodes); + prog.endPhase(LOADING_FSIMAGE); + + prog.beginPhase(LOADING_EDITS); + Step loadingEditsFile = new Step("file", 1000L); + prog.beginStep(LOADING_EDITS, loadingEditsFile); + prog.setTotal(LOADING_EDITS, loadingEditsFile, 200L); + incrementCounter(prog, LOADING_EDITS, loadingEditsFile, 200L); + prog.endStep(LOADING_EDITS, loadingEditsFile); + prog.endPhase(LOADING_EDITS); + + prog.beginPhase(SAVING_CHECKPOINT); + Step savingCheckpointInodes = new Step(INODES); + prog.beginStep(SAVING_CHECKPOINT, savingCheckpointInodes); + prog.setTotal(SAVING_CHECKPOINT, savingCheckpointInodes, 300L); + incrementCounter(prog, SAVING_CHECKPOINT, savingCheckpointInodes, 300L); + prog.endStep(SAVING_CHECKPOINT, savingCheckpointInodes); + prog.endPhase(SAVING_CHECKPOINT); + + prog.beginPhase(SAFEMODE); + Step awaitingBlocks = new Step(AWAITING_REPORTED_BLOCKS); + prog.beginStep(SAFEMODE, awaitingBlocks); + prog.setTotal(SAFEMODE, awaitingBlocks, 400L); + incrementCounter(prog, SAFEMODE, awaitingBlocks, 400L); + prog.endStep(SAFEMODE, awaitingBlocks); + prog.endPhase(SAFEMODE); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/TestStartupProgress.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/TestStartupProgress.java new file mode 100644 index 00000000000..6172be68b74 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/TestStartupProgress.java 
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.startupprogress;
+
+import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase.*;
+import static org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgressTestHelper.*;
+import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Status.*;
+import static org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType.*;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StartupProgress} and the {@link StartupProgressView}
+ * snapshots it creates.
+ */
+public class TestStartupProgress {
+
+  private StartupProgress startupProgress;
+
+  /** Creates a fresh StartupProgress for each test. */
+  @Before
+  public void setUp() {
+    startupProgress = new StartupProgress();
+  }
+
+  /** Counts are kept per step; an existing view keeps its old values. */
+  @Test(timeout=10000)
+  public void testCounter() {
+    startupProgress.beginPhase(LOADING_FSIMAGE);
+    Step loadingFsImageInodes = new Step(INODES);
+    startupProgress.beginStep(LOADING_FSIMAGE, loadingFsImageInodes);
+    incrementCounter(startupProgress, LOADING_FSIMAGE, loadingFsImageInodes,
+      100L);
+    startupProgress.endStep(LOADING_FSIMAGE, loadingFsImageInodes);
+    Step loadingFsImageDelegationKeys = new Step(DELEGATION_KEYS);
+    startupProgress.beginStep(LOADING_FSIMAGE, loadingFsImageDelegationKeys);
+    incrementCounter(startupProgress, LOADING_FSIMAGE,
+      loadingFsImageDelegationKeys, 200L);
+    startupProgress.endStep(LOADING_FSIMAGE, loadingFsImageDelegationKeys);
+    startupProgress.endPhase(LOADING_FSIMAGE);
+
+    startupProgress.beginPhase(LOADING_EDITS);
+    Step loadingEditsFile = new Step("file", 1000L);
+    startupProgress.beginStep(LOADING_EDITS, loadingEditsFile);
+    incrementCounter(startupProgress, LOADING_EDITS, loadingEditsFile, 5000L);
+
+    StartupProgressView view = startupProgress.createView();
+    assertNotNull(view);
+    assertEquals(100L, view.getCount(LOADING_FSIMAGE, loadingFsImageInodes));
+    assertEquals(200L, view.getCount(LOADING_FSIMAGE,
+      loadingFsImageDelegationKeys));
+    assertEquals(5000L, view.getCount(LOADING_EDITS, loadingEditsFile));
+    assertEquals(0L, view.getCount(SAVING_CHECKPOINT,
+      new Step(INODES)));
+
+    // Increment a counter again and check that the existing view was not
+    // modified, but a new view shows the updated value.
+    incrementCounter(startupProgress, LOADING_EDITS, loadingEditsFile, 1000L);
+    startupProgress.endStep(LOADING_EDITS, loadingEditsFile);
+    startupProgress.endPhase(LOADING_EDITS);
+
+    assertEquals(5000L, view.getCount(LOADING_EDITS, loadingEditsFile));
+    view = startupProgress.createView();
+    assertNotNull(view);
+    assertEquals(6000L, view.getCount(LOADING_EDITS, loadingEditsFile));
+  }
+
+  /** Elapsed time stays fixed once a phase/step ends, and grows until then. */
+  @Test(timeout=10000)
+  public void testElapsedTime() throws Exception {
+    startupProgress.beginPhase(LOADING_FSIMAGE);
+    Step loadingFsImageInodes = new Step(INODES);
+    startupProgress.beginStep(LOADING_FSIMAGE, loadingFsImageInodes);
+    Thread.sleep(50L); // brief sleep to fake elapsed time
+    startupProgress.endStep(LOADING_FSIMAGE, loadingFsImageInodes);
+    Step loadingFsImageDelegationKeys = new Step(DELEGATION_KEYS);
+    startupProgress.beginStep(LOADING_FSIMAGE, loadingFsImageDelegationKeys);
+    Thread.sleep(50L); // brief sleep to fake elapsed time
+    startupProgress.endStep(LOADING_FSIMAGE, loadingFsImageDelegationKeys);
+    startupProgress.endPhase(LOADING_FSIMAGE);
+
+    startupProgress.beginPhase(LOADING_EDITS);
+    Step loadingEditsFile = new Step("file", 1000L);
+    startupProgress.beginStep(LOADING_EDITS, loadingEditsFile);
+    startupProgress.setTotal(LOADING_EDITS, loadingEditsFile, 10000L);
+    incrementCounter(startupProgress, LOADING_EDITS, loadingEditsFile, 5000L);
+    Thread.sleep(50L); // brief sleep to fake elapsed time
+
+    StartupProgressView view = startupProgress.createView();
+    assertNotNull(view);
+    assertTrue(view.getElapsedTime() > 0);
+
+    assertTrue(view.getElapsedTime(LOADING_FSIMAGE) > 0);
+    assertTrue(view.getElapsedTime(LOADING_FSIMAGE,
+      loadingFsImageInodes) > 0);
+    assertTrue(view.getElapsedTime(LOADING_FSIMAGE,
+      loadingFsImageDelegationKeys) > 0);
+
+    assertTrue(view.getElapsedTime(LOADING_EDITS) > 0);
+    assertTrue(view.getElapsedTime(LOADING_EDITS, loadingEditsFile) > 0);
+
+    assertTrue(view.getElapsedTime(SAVING_CHECKPOINT) == 0);
+    assertTrue(view.getElapsedTime(SAVING_CHECKPOINT,
+      new Step(INODES)) == 0);
+
+    // Brief sleep, then check that completed phases/steps have the same elapsed
+    // time, but running phases/steps have updated elapsed time.
+    long totalTime = view.getElapsedTime();
+    long loadingFsImageTime = view.getElapsedTime(LOADING_FSIMAGE);
+    long loadingFsImageInodesTime = view.getElapsedTime(LOADING_FSIMAGE,
+      loadingFsImageInodes);
+    long loadingFsImageDelegationKeysTime = view.getElapsedTime(LOADING_FSIMAGE,
+      loadingFsImageDelegationKeys);
+    long loadingEditsTime = view.getElapsedTime(LOADING_EDITS);
+    long loadingEditsFileTime = view.getElapsedTime(LOADING_EDITS,
+      loadingEditsFile);
+
+    Thread.sleep(50L);
+
+    assertTrue(totalTime < view.getElapsedTime());
+    assertEquals(loadingFsImageTime, view.getElapsedTime(LOADING_FSIMAGE));
+    assertEquals(loadingFsImageInodesTime, view.getElapsedTime(LOADING_FSIMAGE,
+      loadingFsImageInodes));
+    assertEquals(loadingFsImageDelegationKeysTime, view.getElapsedTime(
+      LOADING_FSIMAGE, loadingFsImageDelegationKeys));
+    assertTrue(loadingEditsTime < view.getElapsedTime(LOADING_EDITS));
+    assertTrue(loadingEditsFileTime < view.getElapsedTime(LOADING_EDITS,
+      loadingEditsFile));
+  }
+
+  /** Updates made after every phase has completed must have no effect. */
+  @Test(timeout=10000)
+  public void testFrozenAfterStartupCompletes() {
+    // Do some updates and counter increments.
+    startupProgress.beginPhase(LOADING_FSIMAGE);
+    startupProgress.setFile(LOADING_FSIMAGE, "file1");
+    startupProgress.setSize(LOADING_FSIMAGE, 1000L);
+    Step step = new Step(INODES);
+    startupProgress.beginStep(LOADING_FSIMAGE, step);
+    startupProgress.setTotal(LOADING_FSIMAGE, step, 10000L);
+    incrementCounter(startupProgress, LOADING_FSIMAGE, step, 100L);
+    startupProgress.endStep(LOADING_FSIMAGE, step);
+    startupProgress.endPhase(LOADING_FSIMAGE);
+
+    // Force completion of phases, so that entire startup process is completed.
+    for (Phase phase: EnumSet.allOf(Phase.class)) {
+      if (startupProgress.getStatus(phase) != Status.COMPLETE) {
+        startupProgress.beginPhase(phase);
+        startupProgress.endPhase(phase);
+      }
+    }
+
+    StartupProgressView before = startupProgress.createView();
+
+    // Attempt more updates and counter increments.
+    startupProgress.beginPhase(LOADING_FSIMAGE);
+    startupProgress.setFile(LOADING_FSIMAGE, "file2");
+    startupProgress.setSize(LOADING_FSIMAGE, 2000L);
+    startupProgress.beginStep(LOADING_FSIMAGE, step);
+    startupProgress.setTotal(LOADING_FSIMAGE, step, 20000L);
+    incrementCounter(startupProgress, LOADING_FSIMAGE, step, 100L);
+    startupProgress.endStep(LOADING_FSIMAGE, step);
+    startupProgress.endPhase(LOADING_FSIMAGE);
+
+    StartupProgressView after = startupProgress.createView();
+
+    // Expect that data was frozen after completion of entire startup process, so
+    // second set of updates and counter increments should have had no effect.
+    assertEquals(before.getCount(LOADING_FSIMAGE),
+      after.getCount(LOADING_FSIMAGE));
+    assertEquals(before.getCount(LOADING_FSIMAGE, step),
+      after.getCount(LOADING_FSIMAGE, step));
+    assertEquals(before.getElapsedTime(), after.getElapsedTime());
+    assertEquals(before.getElapsedTime(LOADING_FSIMAGE),
+      after.getElapsedTime(LOADING_FSIMAGE));
+    assertEquals(before.getElapsedTime(LOADING_FSIMAGE, step),
+      after.getElapsedTime(LOADING_FSIMAGE, step));
+    assertEquals(before.getFile(LOADING_FSIMAGE),
+      after.getFile(LOADING_FSIMAGE));
+    assertEquals(before.getSize(LOADING_FSIMAGE),
+      after.getSize(LOADING_FSIMAGE));
+    assertEquals(before.getTotal(LOADING_FSIMAGE),
+      after.getTotal(LOADING_FSIMAGE));
+    assertEquals(before.getTotal(LOADING_FSIMAGE, step),
+      after.getTotal(LOADING_FSIMAGE, step));
+  }
+
+  /** A new instance lists every phase as PENDING, with no steps or data. */
+  @Test(timeout=10000)
+  public void testInitialState() {
+    StartupProgressView view = startupProgress.createView();
+    assertNotNull(view);
+    assertEquals(0L, view.getElapsedTime());
+    assertEquals(0.0f, view.getPercentComplete(), 0.001f);
+    List<Phase> phases = new ArrayList<Phase>();
+
+    for (Phase phase: view.getPhases()) {
+      phases.add(phase);
+      assertEquals(0L, view.getElapsedTime(phase));
+      assertNull(view.getFile(phase));
+      assertEquals(0.0f, view.getPercentComplete(phase), 0.001f);
+      assertEquals(Long.MIN_VALUE, view.getSize(phase));
+      assertEquals(PENDING, view.getStatus(phase));
+      assertEquals(0L, view.getTotal(phase));
+
+      for (Step step: view.getSteps(phase)) {
+        fail(String.format("unexpected step %s in phase %s at initial state",
+          step, phase));
+      }
+    }
+
+    assertArrayEquals(EnumSet.allOf(Phase.class).toArray(), phases.toArray());
+  }
+
+  /** Percent complete per step, per phase and overall; ending forces 1.0. */
+  @Test(timeout=10000)
+  public void testPercentComplete() {
+    startupProgress.beginPhase(LOADING_FSIMAGE);
+    Step loadingFsImageInodes = new Step(INODES);
+    startupProgress.beginStep(LOADING_FSIMAGE, loadingFsImageInodes);
+    startupProgress.setTotal(LOADING_FSIMAGE, loadingFsImageInodes, 1000L);
+    incrementCounter(startupProgress, LOADING_FSIMAGE, loadingFsImageInodes,
+      100L);
+    Step loadingFsImageDelegationKeys = new Step(DELEGATION_KEYS);
+    startupProgress.beginStep(LOADING_FSIMAGE, loadingFsImageDelegationKeys);
+    startupProgress.setTotal(LOADING_FSIMAGE, loadingFsImageDelegationKeys,
+      800L);
+    incrementCounter(startupProgress, LOADING_FSIMAGE,
+      loadingFsImageDelegationKeys, 200L);
+
+    startupProgress.beginPhase(LOADING_EDITS);
+    Step loadingEditsFile = new Step("file", 1000L);
+    startupProgress.beginStep(LOADING_EDITS, loadingEditsFile);
+    startupProgress.setTotal(LOADING_EDITS, loadingEditsFile, 10000L);
+    incrementCounter(startupProgress, LOADING_EDITS, loadingEditsFile, 5000L);
+
+    StartupProgressView view = startupProgress.createView();
+    assertNotNull(view);
+    assertEquals(0.167f, view.getPercentComplete(), 0.001f);
+    assertEquals(0.167f, view.getPercentComplete(LOADING_FSIMAGE), 0.001f);
+    assertEquals(0.10f, view.getPercentComplete(LOADING_FSIMAGE,
+      loadingFsImageInodes), 0.001f);
+    assertEquals(0.25f, view.getPercentComplete(LOADING_FSIMAGE,
+      loadingFsImageDelegationKeys), 0.001f);
+    assertEquals(0.5f, view.getPercentComplete(LOADING_EDITS), 0.001f);
+    assertEquals(0.5f, view.getPercentComplete(LOADING_EDITS, loadingEditsFile),
+      0.001f);
+    assertEquals(0.0f, view.getPercentComplete(SAVING_CHECKPOINT), 0.001f);
+    assertEquals(0.0f, view.getPercentComplete(SAVING_CHECKPOINT,
+      new Step(INODES)), 0.001f);
+
+    // End steps/phases, and confirm that they jump to 100% completion.
+    startupProgress.endStep(LOADING_FSIMAGE, loadingFsImageInodes);
+    startupProgress.endStep(LOADING_FSIMAGE, loadingFsImageDelegationKeys);
+    startupProgress.endPhase(LOADING_FSIMAGE);
+    startupProgress.endStep(LOADING_EDITS, loadingEditsFile);
+    startupProgress.endPhase(LOADING_EDITS);
+
+    view = startupProgress.createView();
+    assertNotNull(view);
+    assertEquals(0.5f, view.getPercentComplete(), 0.001f);
+    assertEquals(1.0f, view.getPercentComplete(LOADING_FSIMAGE), 0.001f);
+    assertEquals(1.0f, view.getPercentComplete(LOADING_FSIMAGE,
+      loadingFsImageInodes), 0.001f);
+    assertEquals(1.0f, view.getPercentComplete(LOADING_FSIMAGE,
+      loadingFsImageDelegationKeys), 0.001f);
+    assertEquals(1.0f, view.getPercentComplete(LOADING_EDITS), 0.001f);
+    assertEquals(1.0f, view.getPercentComplete(LOADING_EDITS, loadingEditsFile),
+      0.001f);
+    assertEquals(0.0f, view.getPercentComplete(SAVING_CHECKPOINT), 0.001f);
+    assertEquals(0.0f, view.getPercentComplete(SAVING_CHECKPOINT,
+      new Step(INODES)), 0.001f);
+  }
+
+  /** Status is COMPLETE, RUNNING or PENDING according to begin/end calls. */
+  @Test(timeout=10000)
+  public void testStatus() {
+    startupProgress.beginPhase(LOADING_FSIMAGE);
+    startupProgress.endPhase(LOADING_FSIMAGE);
+    startupProgress.beginPhase(LOADING_EDITS);
+    StartupProgressView view = startupProgress.createView();
+    assertNotNull(view);
+    assertEquals(COMPLETE, view.getStatus(LOADING_FSIMAGE));
+    assertEquals(RUNNING, view.getStatus(LOADING_EDITS));
+    assertEquals(PENDING, view.getStatus(SAVING_CHECKPOINT));
+  }
+
+  /** Steps come back in sorted order regardless of the order they began. */
+  @Test(timeout=10000)
+  public void testStepSequence() {
+    // Test that steps are returned in the correct sort order (by file and then
+    // sequence number) by starting a few steps in a randomly shuffled order and
+    // then asserting that they are returned in the expected order.
+    Step[] expectedSteps = new Step[] {
+      new Step(INODES, "file1"),
+      new Step(DELEGATION_KEYS, "file1"),
+      new Step(INODES, "file2"),
+      new Step(DELEGATION_KEYS, "file2"),
+      new Step(INODES, "file3"),
+      new Step(DELEGATION_KEYS, "file3")
+    };
+
+    List<Step> shuffledSteps = new ArrayList<Step>(Arrays.asList(expectedSteps));
+    Collections.shuffle(shuffledSteps);
+
+    startupProgress.beginPhase(SAVING_CHECKPOINT);
+    for (Step step: shuffledSteps) {
+      startupProgress.beginStep(SAVING_CHECKPOINT, step);
+    }
+
+    List<Step> actualSteps = new ArrayList<Step>(expectedSteps.length);
+    StartupProgressView view = startupProgress.createView();
+    assertNotNull(view);
+    for (Step step: view.getSteps(SAVING_CHECKPOINT)) {
+      actualSteps.add(step);
+    }
+
+    assertArrayEquals(expectedSteps, actualSteps.toArray());
+  }
+
+  /** Concurrent updates from many threads must not lose counter increments. */
+  @Test(timeout=10000)
+  public void testThreadSafety() throws Exception {
+    // Test for thread safety by starting multiple threads that mutate the same
+    // StartupProgress instance in various ways. We expect no internal
+    // corruption of data structures and no lost updates on counter increments.
+    int numThreads = 100;
+
+    // Data tables used by each thread to determine values to pass to APIs.
+    Phase[] phases = { LOADING_FSIMAGE, LOADING_FSIMAGE, LOADING_EDITS,
+      LOADING_EDITS };
+    Step[] steps = new Step[] { new Step(INODES), new Step(DELEGATION_KEYS),
+      new Step(INODES), new Step(DELEGATION_KEYS) };
+    String[] files = { "file1", "file1", "file2", "file2" };
+    long[] sizes = { 1000L, 1000L, 2000L, 2000L };
+    long[] totals = { 10000L, 20000L, 30000L, 40000L };
+
+    ExecutorService exec = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      for (int i = 0; i < numThreads; ++i) {
+        final Phase phase = phases[i % phases.length];
+        final Step step = steps[i % steps.length];
+        final String file = files[i % files.length];
+        final long size = sizes[i % sizes.length];
+        final long total = totals[i % totals.length];
+
+        exec.submit(new Callable<Void>() {
+          @Override
+          public Void call() {
+            startupProgress.beginPhase(phase);
+            startupProgress.setFile(phase, file);
+            startupProgress.setSize(phase, size);
+            startupProgress.setTotal(phase, step, total);
+            incrementCounter(startupProgress, phase, step, 100L);
+            startupProgress.endStep(phase, step);
+            startupProgress.endPhase(phase);
+            return null;
+          }
+        });
+      }
+    } finally {
+      exec.shutdown();
+      assertTrue(exec.awaitTermination(10000L, TimeUnit.MILLISECONDS));
+    }
+
+    StartupProgressView view = startupProgress.createView();
+    assertNotNull(view);
+    assertEquals("file1", view.getFile(LOADING_FSIMAGE));
+    assertEquals(1000L, view.getSize(LOADING_FSIMAGE));
+    assertEquals(10000L, view.getTotal(LOADING_FSIMAGE, new Step(INODES)));
+    assertEquals(2500L, view.getCount(LOADING_FSIMAGE, new Step(INODES)));
+    assertEquals(20000L, view.getTotal(LOADING_FSIMAGE,
+      new Step(DELEGATION_KEYS)));
+    assertEquals(2500L, view.getCount(LOADING_FSIMAGE,
+      new Step(DELEGATION_KEYS)));
+
+    assertEquals("file2", view.getFile(LOADING_EDITS));
+    assertEquals(2000L, view.getSize(LOADING_EDITS));
+    assertEquals(30000L, view.getTotal(LOADING_EDITS, new Step(INODES)));
+    assertEquals(2500L, view.getCount(LOADING_EDITS, new Step(INODES)));
+    assertEquals(40000L, view.getTotal(LOADING_EDITS,
+      new Step(DELEGATION_KEYS)));
+    assertEquals(2500L, view.getCount(LOADING_EDITS, new Step(DELEGATION_KEYS)));
+  }
+
+  /** Totals set on individual steps are reported back by the view. */
+  @Test(timeout=10000)
+  public void testTotal() {
+    startupProgress.beginPhase(LOADING_FSIMAGE);
+    Step loadingFsImageInodes = new Step(INODES);
+    startupProgress.beginStep(LOADING_FSIMAGE, loadingFsImageInodes);
+    startupProgress.setTotal(LOADING_FSIMAGE, loadingFsImageInodes, 1000L);
+    startupProgress.endStep(LOADING_FSIMAGE, loadingFsImageInodes);
+    Step loadingFsImageDelegationKeys = new Step(DELEGATION_KEYS);
+    startupProgress.beginStep(LOADING_FSIMAGE, loadingFsImageDelegationKeys);
+    startupProgress.setTotal(LOADING_FSIMAGE, loadingFsImageDelegationKeys,
+      800L);
+    startupProgress.endStep(LOADING_FSIMAGE, loadingFsImageDelegationKeys);
+    startupProgress.endPhase(LOADING_FSIMAGE);
+
+    startupProgress.beginPhase(LOADING_EDITS);
+    Step loadingEditsFile = new Step("file", 1000L);
+    startupProgress.beginStep(LOADING_EDITS, loadingEditsFile);
+    startupProgress.setTotal(LOADING_EDITS, loadingEditsFile, 10000L);
+    startupProgress.endStep(LOADING_EDITS, loadingEditsFile);
+    startupProgress.endPhase(LOADING_EDITS);
+
+    StartupProgressView view = startupProgress.createView();
+    assertNotNull(view);
+    assertEquals(1000L, view.getTotal(LOADING_FSIMAGE, loadingFsImageInodes));
+    assertEquals(800L, view.getTotal(LOADING_FSIMAGE,
+      loadingFsImageDelegationKeys));
+    assertEquals(10000L, view.getTotal(LOADING_EDITS, loadingEditsFile));
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/TestStartupProgressMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/TestStartupProgressMetrics.java
new file mode 100644
index 00000000000..4fe3d15c4a8
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/TestStartupProgressMetrics.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.startupprogress;
+
+import static org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase.*;
+import static org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgressTestHelper.*;
+import static org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType.*;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks the metrics reported through StartupProgressMetrics for the initial,
+ * running and final startup states.
+ */
+public class TestStartupProgressMetrics {
+
+  private StartupProgress startupProgress;
+  private StartupProgressMetrics metrics;
+
+  /** Mocks the metrics system and wires fresh progress/metrics objects. */
+  @Before
+  public void setUp() {
+    mockMetricsSystem();
+    startupProgress = new StartupProgress();
+    metrics = new StartupProgressMetrics(startupProgress);
+  }
+
+  /** Before any phase begins, counters are 0 and gauges are 0.0. */
+  @Test
+  public void testInitialState() {
+    MetricsRecordBuilder rb = getMetrics(metrics, true);
+    assertCounter("ElapsedTime", 0L, rb);
+    assertGauge("PercentComplete", 0.0f, rb);
+    assertCounter("LoadingFsImageCount", 0L, rb);
+    assertCounter("LoadingFsImageElapsedTime", 0L, rb);
+    assertCounter("LoadingFsImageTotal", 0L, rb);
+    assertGauge("LoadingFsImagePercentComplete", 0.0f, rb);
+    assertCounter("LoadingEditsCount", 0L, rb);
+    assertCounter("LoadingEditsElapsedTime", 0L, rb);
+    assertCounter("LoadingEditsTotal", 0L, rb);
+    assertGauge("LoadingEditsPercentComplete", 0.0f, rb);
+    assertCounter("SavingCheckpointCount", 0L, rb);
+    assertCounter("SavingCheckpointElapsedTime", 0L, rb);
+    assertCounter("SavingCheckpointTotal", 0L, rb);
+    assertGauge("SavingCheckpointPercentComplete", 0.0f, rb);
+    assertCounter("SafeModeCount", 0L, rb);
+    assertCounter("SafeModeElapsedTime", 0L, rb);
+    assertCounter("SafeModeTotal", 0L, rb);
+    assertGauge("SafeModePercentComplete", 0.0f, rb);
+  }
+
+  /** Mid-startup: begun phases report progress, the rest report zeros. */
+  @Test
+  public void testRunningState() {
+    setStartupProgressForRunningState(startupProgress);
+
+    MetricsRecordBuilder rb = getMetrics(metrics, true);
+    assertNonNegative("ElapsedTime", rb);
+    assertGauge("PercentComplete", 0.375f, rb);
+    assertCounter("LoadingFsImageCount", 100L, rb);
+    assertNonNegative("LoadingFsImageElapsedTime", rb);
+    assertCounter("LoadingFsImageTotal", 100L, rb);
+    assertGauge("LoadingFsImagePercentComplete", 1.0f, rb);
+    assertCounter("LoadingEditsCount", 100L, rb);
+    assertNonNegative("LoadingEditsElapsedTime", rb);
+    assertCounter("LoadingEditsTotal", 200L, rb);
+    assertGauge("LoadingEditsPercentComplete", 0.5f, rb);
+    assertCounter("SavingCheckpointCount", 0L, rb);
+    assertCounter("SavingCheckpointElapsedTime", 0L, rb);
+    assertCounter("SavingCheckpointTotal", 0L, rb);
+    assertGauge("SavingCheckpointPercentComplete", 0.0f, rb);
+    assertCounter("SafeModeCount", 0L, rb);
+    assertCounter("SafeModeElapsedTime", 0L, rb);
+    assertCounter("SafeModeTotal", 0L, rb);
+    assertGauge("SafeModePercentComplete", 0.0f, rb);
+  }
+
+  /** After all phases end, counts equal totals and percent complete is 1.0. */
+  @Test
+  public void testFinalState() {
+    setStartupProgressForFinalState(startupProgress);
+
+    MetricsRecordBuilder rb = getMetrics(metrics, true);
+    assertNonNegative("ElapsedTime", rb);
+    assertGauge("PercentComplete", 1.0f, rb);
+    assertCounter("LoadingFsImageCount", 100L, rb);
+    assertNonNegative("LoadingFsImageElapsedTime", rb);
+    assertCounter("LoadingFsImageTotal", 100L, rb);
+    assertGauge("LoadingFsImagePercentComplete", 1.0f, rb);
+    assertCounter("LoadingEditsCount", 200L, rb);
+    assertNonNegative("LoadingEditsElapsedTime", rb);
+    assertCounter("LoadingEditsTotal", 200L, rb);
+    assertGauge("LoadingEditsPercentComplete", 1.0f, rb);
+    assertCounter("SavingCheckpointCount", 300L, rb);
+    assertNonNegative("SavingCheckpointElapsedTime", rb);
+    assertCounter("SavingCheckpointTotal", 300L, rb);
+    assertGauge("SavingCheckpointPercentComplete", 1.0f, rb);
+    assertCounter("SafeModeCount", 400L, rb);
+    assertNonNegative("SafeModeElapsedTime", rb);
+    assertCounter("SafeModeTotal", 400L, rb);
+    assertGauge("SafeModePercentComplete", 1.0f, rb);
+  }
+
+  /** Asserts that the named long counter is not negative. */
+  private static void assertNonNegative(String name, MetricsRecordBuilder rb) {
+    assertTrue(getLongCounter(name, rb) >= 0L);
+  }
+}