From 36d1c49486587c2dbb193e8538b1d4510c462fa6 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Wed, 21 Dec 2011 03:03:23 +0000 Subject: [PATCH] HDFS-2693. Fix synchronization issues around state transition. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1221582 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES.HDFS-1623.txt | 2 + .../java/org/apache/hadoop/hdfs/HAUtil.java | 12 ++ .../server/blockmanagement/BlockManager.java | 26 ++-- .../hdfs/server/namenode/BackupNode.java | 29 ++-- .../hdfs/server/namenode/FSNamesystem.java | 133 ++++++++++++++++-- .../hadoop/hdfs/server/namenode/NameNode.java | 37 ++++- .../server/namenode/NameNodeRpcServer.java | 66 +-------- .../server/namenode/ha/EditLogTailer.java | 48 ++++--- .../hdfs/server/namenode/ha/HAContext.java | 27 ++++ .../hdfs/server/namenode/ha/HAState.java | 13 +- .../hdfs/server/namenode/ha/StandbyState.java | 3 + .../apache/hadoop/hdfs/MiniDFSCluster.java | 19 +++ .../hadoop/hdfs/TestDFSClientFailover.java | 14 +- .../hadoop/hdfs/TestFileCorruption.java | 10 +- .../hdfs/server/namenode/NameNodeAdapter.java | 12 +- .../hdfs/server/namenode/TestBackupNode.java | 3 + .../server/namenode/ha/TestEditLogTailer.java | 2 + .../ha/TestEditLogsDuringFailover.java | 2 + .../namenode/ha/TestHAStateTransitions.java | 59 ++++++++ .../server/namenode/ha/TestStandbyIsHot.java | 3 + .../namenode/metrics/TestNameNodeMetrics.java | 14 +- .../apache/hadoop/test/GenericTestUtils.java | 32 +++++ 22 files changed, 432 insertions(+), 134 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index fdd6d6e06eb..6ffb0dfc4d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -71,3 +71,5 @@ HDFS-2677. Web UI should indicate the NN state. (eli via todd) HDFS-2678. When a FailoverProxyProvider is used, DFSClient should not retry connection ten times before failing over (atm via todd) HDFS-2682. When a FailoverProxyProvider is used, Client should not retry for 45 times if it is timing out to connect to server. (Uma Maheswara Rao G via todd) + +HDFS-2693. Fix synchronization issues around state transition (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index 24537a3d38c..6a619712c48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -99,4 +99,16 @@ public class HAUtil { return null; } + /** + * This is used only by tests at the moment. + * @return true if the NN should allow read operations while in standby mode. 
+ */ + public static boolean shouldAllowStandbyReads(Configuration conf) { + return conf.getBoolean("dfs.ha.allow.stale.reads", false); + } + + public static void setAllowStandbyReads(Configuration conf, boolean val) { + conf.setBoolean("dfs.ha.allow.stale.reads", val); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 03a851a7b5a..abefbb562d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -817,22 +817,18 @@ public class BlockManager { */ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, final DatanodeInfo dn) throws IOException { - namesystem.writeLock(); - try { - final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock()); - if (storedBlock == null) { - // Check if the replica is in the blockMap, if not - // ignore the request for now. This could happen when BlockScanner - // thread of Datanode reports bad block before Block reports are sent - // by the Datanode on startup - NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: " - + blk + " not found."); - return; - } - markBlockAsCorrupt(storedBlock, dn); - } finally { - namesystem.writeUnlock(); + assert namesystem.hasWriteLock(); + final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock()); + if (storedBlock == null) { + // Check if the replica is in the blockMap, if not + // ignore the request for now. This could happen when BlockScanner + // thread of Datanode reports bad block before Block reports are sent + // by the Datanode on startup + NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: " + + blk + " not found."); + return; } + markBlockAsCorrupt(storedBlock, dn); } private void markBlockAsCorrupt(BlockInfo storedBlock, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index 3acec16874f..c54743962ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -244,18 +244,17 @@ public class BackupNode extends NameNode { @Override public void startLogSegment(NamenodeRegistration registration, long txid) throws IOException { - nn.checkOperation(OperationCategory.JOURNAL); + namesystem.checkOperation(OperationCategory.JOURNAL); verifyRequest(registration); - verifyRequest(registration); - getBNImage().namenodeStartedLogSegment(txid); + getBNImage().namenodeStartedLogSegment(txid); } @Override public void journal(NamenodeRegistration nnReg, long firstTxId, int numTxns, byte[] records) throws IOException { - nn.checkOperation(OperationCategory.JOURNAL); + namesystem.checkOperation(OperationCategory.JOURNAL); verifyRequest(nnReg); if(!nnRpcAddress.equals(nnReg.getAddress())) throw new IOException("Journal request from unexpected name-node: " @@ -401,13 +400,21 @@ public class BackupNode extends NameNode { return clusterId; } - @Override // NameNode - protected void checkOperation(OperationCategory op) - throws StandbyException { - if (OperationCategory.JOURNAL != op) { - 
String msg = "Operation category " + op - + " is not supported at the BackupNode"; - throw new StandbyException(msg); + @Override + protected NameNodeHAContext createHAContext() { + return new BNHAContext(); + } + + private class BNHAContext extends NameNodeHAContext { + @Override // NameNode + public void checkOperation(OperationCategory op) + throws StandbyException { + if (OperationCategory.JOURNAL != op && + !(OperationCategory.READ == op && allowStaleStandbyReads)) { + String msg = "Operation category " + op + + " is not supported at the BackupNode"; + throw new StandbyException(msg); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 02bf2118cdf..4c4aac3e951 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -149,6 +149,7 @@ import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; +import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReceivedDeleteMessage; import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage; import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage; @@ -170,6 +171,7 @@ import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -563,6 +565,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, dir.fsImage.editLog.close(); } + + void checkOperation(OperationCategory op) throws StandbyException { + haContext.checkOperation(op); + } + public static Collection getNamespaceDirs(Configuration conf) { return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY); } @@ -793,7 +800,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return serverDefaults.getBlockSize(); } - FsServerDefaults getServerDefaults() { + FsServerDefaults getServerDefaults() throws StandbyException { + checkOperation(OperationCategory.READ); return serverDefaults; } @@ -820,6 +828,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, HdfsFileStatus resultingStat = null; writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Cannot set permission for " + src, safeMode); } @@ -849,6 +859,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, HdfsFileStatus resultingStat = null; writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Cannot set owner for " + src, safeMode); } @@ -939,13 +951,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } else { // second attempt is with write lock writeLock(); // writelock is needed to set accesstime } - - // if the namenode is in safemode, then 
do not update access time - if (isInSafeMode()) { - doAccessTime = false; - } - try { + checkOperation(OperationCategory.READ); + + // if the namenode is in safemode, then do not update access time + if (isInSafeMode()) { + doAccessTime = false; + } + long now = now(); INodeFile inode = dir.getFileINode(src); if (inode == null) { @@ -1013,6 +1026,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, HdfsFileStatus resultingStat = null; writeLock(); try { + checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { throw new SafeModeException("Cannot concat " + target, safeMode); } @@ -1144,6 +1158,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } writeLock(); try { + checkOperation(OperationCategory.WRITE); + // Write access is required to set access and modification times if (isPermissionEnabled) { checkPathAccess(src, FsAction.WRITE); @@ -1174,6 +1190,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, HdfsFileStatus resultingStat = null; writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (!createParent) { verifyParentDir(link); } @@ -1243,6 +1261,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, final boolean isFile; writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Cannot set replication for " + src, safeMode); } @@ -1273,6 +1293,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws IOException, UnresolvedLinkException { readLock(); try { + checkOperation(OperationCategory.READ); if (isPermissionEnabled) { checkTraverse(filename); } @@ -1315,6 +1336,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, FileNotFoundException, ParentNotDirectoryException, IOException { writeLock(); try { + checkOperation(OperationCategory.WRITE); + startFileInternal(src, permissions, holder, clientMachine, flag, createParent, replication, blockSize); } finally { @@ -1495,6 +1518,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws IOException { writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException( "Cannot recover the lease of " + src, safeMode); @@ -1614,6 +1639,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, LocatedBlock lb = null; writeLock(); try { + checkOperation(OperationCategory.WRITE); + lb = startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), false, blockManager.maxReplication, 0); @@ -1678,6 +1705,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Cannot add block to " + src, safeMode); } @@ -1711,6 +1740,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, // Allocate a new block and record it in the INode. 
writeLock(); try { + checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { throw new SafeModeException("Cannot add block to " + src, safeMode); } @@ -1757,6 +1787,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, final List chosen; readLock(); try { + checkOperation(OperationCategory.WRITE); //check safe mode if (isInSafeMode()) { throw new SafeModeException("Cannot add datanode; src=" + src @@ -1798,6 +1829,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, UnresolvedLinkException, IOException { writeLock(); try { + checkOperation(OperationCategory.WRITE); // // Remove the block from the pending creates list // @@ -1873,6 +1905,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean success = false; writeLock(); try { + checkOperation(OperationCategory.WRITE); + success = completeFileInternal(src, holder, ExtendedBlock.getLocalBlock(last)); } finally { @@ -2012,6 +2046,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } writeLock(); try { + checkOperation(OperationCategory.WRITE); + status = renameToInternal(src, dst); if (status && auditLog.isInfoEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(dst, false); @@ -2067,6 +2103,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } writeLock(); try { + checkOperation(OperationCategory.WRITE); + renameToInternal(src, dst, options); if (auditLog.isInfoEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(dst, false); @@ -2145,6 +2183,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, writeLock(); try { + checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { throw new SafeModeException("Cannot delete " + src, safeMode); } @@ -2222,11 +2261,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * * @return object containing information regarding the file * or null if file not found + * @throws StandbyException */ HdfsFileStatus getFileInfo(String src, boolean resolveLink) - throws AccessControlException, UnresolvedLinkException { + throws AccessControlException, UnresolvedLinkException, + StandbyException { readLock(); try { + checkOperation(OperationCategory.READ); + if (!DFSUtil.isValidName(src)) { throw new InvalidPathException("Invalid file name: " + src); } @@ -2250,6 +2293,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } writeLock(); try { + checkOperation(OperationCategory.WRITE); + status = mkdirsInternal(src, permissions, createParent); } finally { writeUnlock(); @@ -2304,9 +2349,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } ContentSummary getContentSummary(String src) throws AccessControlException, - FileNotFoundException, UnresolvedLinkException { + FileNotFoundException, UnresolvedLinkException, StandbyException { readLock(); try { + checkOperation(OperationCategory.READ); + if (isPermissionEnabled) { checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE); } @@ -2325,6 +2372,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws IOException, UnresolvedLinkException { writeLock(); try { + checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { throw new SafeModeException("Cannot set quota on " + path, safeMode); } @@ -2349,6 +2397,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, + src + " for " + clientName); writeLock(); try { + checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { throw new SafeModeException("Cannot fsync 
file " + src, safeMode); } @@ -2558,6 +2607,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats, String src = ""; writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (haContext.getState().equals(NameNode.STANDBY_STATE)) { + // TODO(HA) we'll never get here, since we check for WRITE operation above! + if (isGenStampInFuture(newgenerationstamp)) { + LOG.info("Required GS=" + newgenerationstamp + + ", Queuing commitBlockSynchronization message"); + getPendingDataNodeMessages().queueMessage( + new PendingDataNodeMessages.CommitBlockSynchronizationMessage( + lastblock, newgenerationstamp, newlength, closeFile, deleteblock, + newtargets, newgenerationstamp)); + return; + } + } + if (isInSafeMode()) { throw new SafeModeException( "Cannot commitBlockSynchronization while in safe mode", @@ -2658,6 +2721,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, void renewLease(String holder) throws IOException { writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Cannot renew lease for " + holder, safeMode); } @@ -2685,6 +2750,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, DirectoryListing dl; readLock(); try { + checkOperation(OperationCategory.READ); + if (isPermissionEnabled) { if (dir.isDir(src)) { checkPathAccess(src, FsAction.READ_EXECUTE); @@ -3699,6 +3766,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws IOException { writeLock(); try { + checkOperation(OperationCategory.CHECKPOINT); + if (isInSafeMode()) { throw new SafeModeException("Checkpoint not started", safeMode); } @@ -3715,6 +3784,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, CheckpointSignature sig) throws IOException { readLock(); try { + checkOperation(OperationCategory.CHECKPOINT); + if (isInSafeMode()) { throw new SafeModeException("Checkpoint not ended", safeMode); } @@ -3976,6 +4047,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return pendingFile; } + /** + * Client is reporting some bad block locations. 
+ */ + void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + writeLock(); + try { + checkOperation(OperationCategory.WRITE); + + NameNode.stateChangeLog.info("*DIR* NameNode.reportBadBlocks"); + for (int i = 0; i < blocks.length; i++) { + ExtendedBlock blk = blocks[i].getBlock(); + DatanodeInfo[] nodes = blocks[i].getLocations(); + for (int j = 0; j < nodes.length; j++) { + DatanodeInfo dn = nodes[j]; + blockManager.findAndMarkBlockAsCorrupt(blk, dn); + } + } + } finally { + writeUnlock(); + } + } + /** * Get a new generation stamp together with an access token for * a block under construction @@ -3993,6 +4086,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, LocatedBlock locatedBlock; writeLock(); try { + checkOperation(OperationCategory.WRITE); + // check vadility of parameters checkUCBlock(block, clientName); @@ -4022,6 +4117,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws IOException { writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Pipeline not updated", safeMode); } @@ -4222,6 +4319,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, readLock(); try { + checkOperation(OperationCategory.READ); + if (!isPopulatingReplQueues()) { throw new IOException("Cannot run listCorruptFileBlocks because " + "replication queues have not been initialized."); @@ -4314,6 +4413,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, Token token; writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Cannot issue delegation token", safeMode); } @@ -4358,6 +4459,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, long expiryTime; writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Cannot renew delegation token", safeMode); } @@ -4388,6 +4491,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws IOException { writeLock(); try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { throw new SafeModeException("Cannot cancel delegation token", safeMode); } @@ -4727,4 +4832,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, public EditLogTailer getEditLogTailer() { return editLogTailer; } + + @VisibleForTesting + void setFsLockForTests(ReentrantReadWriteLock lock) { + this.fsLock = lock; + } + + @VisibleForTesting + ReentrantReadWriteLock getFsLockForTests() { + return fsLock; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 7d8ecd993b3..54d4d2f2901 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -182,6 +182,7 @@ public class NameNode { private HAState state; private final boolean haEnabled; private final HAContext haContext; + protected boolean allowStaleStandbyReads; /** httpServer */ @@ -531,7 +532,8 @@ public class NameNode { this.role = role; String nsId = getNameServiceId(conf); this.haEnabled = HAUtil.isHAEnabled(conf, nsId); - this.haContext = new NameNodeHAContext(); + this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf); + this.haContext = createHAContext(); try { initializeGenericKeys(conf, 
nsId); initialize(conf); @@ -553,6 +555,10 @@ public class NameNode { } } + protected HAContext createHAContext() { + return new NameNodeHAContext(); + } + /** * Wait for service to finish. * (Normally, it runs forever.) @@ -914,11 +920,6 @@ public class NameNode { return state.getServiceState(); } - /** Check if an operation of given category is allowed */ - protected synchronized void checkOperation(final OperationCategory op) - throws StandbyException { - state.checkOperation(haContext, op); - } /** * Class used as expose {@link NameNode} as context to {@link HAState} @@ -928,7 +929,7 @@ public class NameNode { * appropriate action is needed todo either shutdown the node or recover * from failure. */ - private class NameNodeHAContext implements HAContext { + protected class NameNodeHAContext implements HAContext { @Override public void setState(HAState s) { state = s; @@ -961,6 +962,28 @@ public class NameNode { // TODO(HA): Are we guaranteed to be the only active here? namesystem.stopStandbyServices(); } + + @Override + public void writeLock() { + namesystem.writeLock(); + } + + @Override + public void writeUnlock() { + namesystem.writeUnlock(); + } + + /** Check if an operation of given category is allowed */ + @Override + public void checkOperation(final OperationCategory op) + throws StandbyException { + state.checkOperation(haContext, op); + } + + @Override + public boolean allowStaleReads() { + return allowStaleStandbyReads; + } } public boolean isStandbyState() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index cfea22f5a26..b7433ef7b65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -126,7 +126,7 @@ class NameNodeRpcServer implements NamenodeProtocols { private static final Log stateChangeLog = NameNode.stateChangeLog; // Dependencies from other parts of NN. 
-  private final FSNamesystem namesystem;
+  protected final FSNamesystem namesystem;
   protected final NameNode nn;
   private final NameNodeMetrics metrics;
@@ -318,7 +318,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void errorReport(NamenodeRegistration registration,
                           int errorCode,
                           String msg) throws IOException {
-    nn.checkOperation(OperationCategory.WRITE);
+    // nn.checkOperation(OperationCategory.WRITE);
+    // TODO: I don't think this should be checked - it's just for logging
+    // and dropping backups
     verifyRequest(registration);
     LOG.info("Error report from " + registration + ": " + msg);
     if(errorCode == FATAL)
@@ -346,28 +348,24 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // NamenodeProtocol
   public void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
-    nn.checkOperation(OperationCategory.CHECKPOINT);
     namesystem.endCheckpoint(registration, sig);
   }

   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
-    nn.checkOperation(OperationCategory.WRITE);
     return namesystem.getDelegationToken(renewer);
   }

   @Override // ClientProtocol
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
-    nn.checkOperation(OperationCategory.WRITE);
     return namesystem.renewDelegationToken(token);
   }

   @Override // ClientProtocol
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    nn.checkOperation(OperationCategory.WRITE);
     namesystem.cancelDelegationToken(token);
   }
@@ -376,7 +374,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
                                           long offset,
                                           long length)
       throws IOException {
-    nn.checkOperation(OperationCategory.READ);
     metrics.incrGetBlockLocations();
     return namesystem.getBlockLocations(getClientMachine(),
                                         src, offset, length);
@@ -384,7 +381,6 @@ class NameNodeRpcServer implements NamenodeProtocols {

   @Override // ClientProtocol
   public FsServerDefaults getServerDefaults() throws IOException {
-    nn.checkOperation(OperationCategory.READ);
     return namesystem.getServerDefaults();
   }
@@ -396,7 +392,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
                      boolean createParent,
                      short replication,
                      long blockSize) throws IOException {
-    nn.checkOperation(OperationCategory.WRITE);
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.create: file "
@@ -417,7 +412,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public LocatedBlock append(String src, String clientName)
       throws IOException {
-    nn.checkOperation(OperationCategory.WRITE);
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.append: file "
@@ -430,7 +424,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public boolean recoverLease(String src, String clientName) throws IOException {
-    nn.checkOperation(OperationCategory.WRITE);
     String clientMachine = getClientMachine();
     return namesystem.recoverLease(src, clientName, clientMachine);
   }
@@ -438,21 +431,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public boolean setReplication(String src, short replication)
     throws IOException {
-    nn.checkOperation(OperationCategory.WRITE);
     return namesystem.setReplication(src, replication);
   }

   @Override // ClientProtocol
   public void setPermission(String src, FsPermission permissions)
       throws IOException {
-    nn.checkOperation(OperationCategory.WRITE);
namesystem.setPermission(src, permissions); } @Override // ClientProtocol public void setOwner(String src, String username, String groupname) throws IOException { - nn.checkOperation(OperationCategory.WRITE); namesystem.setOwner(src, username, groupname); } @@ -462,7 +452,6 @@ class NameNodeRpcServer implements NamenodeProtocols { ExtendedBlock previous, DatanodeInfo[] excludedNodes) throws IOException { - nn.checkOperation(OperationCategory.WRITE); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " +src+" for "+clientName); @@ -486,7 +475,6 @@ class NameNodeRpcServer implements NamenodeProtocols { final DatanodeInfo[] existings, final DatanodeInfo[] excludes, final int numAdditionalNodes, final String clientName ) throws IOException { - nn.checkOperation(OperationCategory.WRITE); if (LOG.isDebugEnabled()) { LOG.debug("getAdditionalDatanode: src=" + src + ", blk=" + blk @@ -514,7 +502,6 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void abandonBlock(ExtendedBlock b, String src, String holder) throws IOException { - nn.checkOperation(OperationCategory.WRITE); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: " +b+" of file "+src); @@ -527,7 +514,6 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public boolean complete(String src, String clientName, ExtendedBlock last) throws IOException { - nn.checkOperation(OperationCategory.WRITE); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName); @@ -543,22 +529,12 @@ class NameNodeRpcServer implements NamenodeProtocols { */ @Override // ClientProtocol, DatanodeProtocol public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { - nn.checkOperation(OperationCategory.WRITE); - stateChangeLog.info("*DIR* NameNode.reportBadBlocks"); - for (int i = 0; i < blocks.length; i++) { - ExtendedBlock blk = blocks[i].getBlock(); - DatanodeInfo[] nodes = blocks[i].getLocations(); - for (int j = 0; j < nodes.length; j++) { - DatanodeInfo dn = nodes[j]; - namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn); - } - } + namesystem.reportBadBlocks(blocks); } @Override // ClientProtocol public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName) throws IOException { - nn.checkOperation(OperationCategory.WRITE); return namesystem.updateBlockForPipeline(block, clientName); } @@ -567,7 +543,6 @@ class NameNodeRpcServer implements NamenodeProtocols { public void updatePipeline(String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException { - nn.checkOperation(OperationCategory.WRITE); namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes); } @@ -576,18 +551,6 @@ class NameNodeRpcServer implements NamenodeProtocols { long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) throws IOException { - nn.checkOperation(OperationCategory.WRITE); - if (nn.isStandbyState()) { - if (namesystem.isGenStampInFuture(newgenerationstamp)) { - LOG.info("Required GS=" + newgenerationstamp - + ", Queuing commitBlockSynchronization message"); - namesystem.getPendingDataNodeMessages().queueMessage( - new PendingDataNodeMessages.CommitBlockSynchronizationMessage( - block, newgenerationstamp, newlength, closeFile, deleteblock, - newtargets, newgenerationstamp)); - return; - } - } 
namesystem.commitBlockSynchronization(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets); } @@ -595,14 +558,12 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public long getPreferredBlockSize(String filename) throws IOException { - nn.checkOperation(OperationCategory.READ); return namesystem.getPreferredBlockSize(filename); } @Deprecated @Override // ClientProtocol public boolean rename(String src, String dst) throws IOException { - nn.checkOperation(OperationCategory.WRITE); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); } @@ -619,14 +580,12 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void concat(String trg, String[] src) throws IOException { - nn.checkOperation(OperationCategory.WRITE); namesystem.concat(trg, src); } @Override // ClientProtocol public void rename2(String src, String dst, Options.Rename... options) throws IOException { - nn.checkOperation(OperationCategory.WRITE); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); } @@ -640,7 +599,6 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public boolean delete(String src, boolean recursive) throws IOException { - nn.checkOperation(OperationCategory.WRITE); if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* Namenode.delete: src=" + src + ", recursive=" + recursive); @@ -665,7 +623,6 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { - nn.checkOperation(OperationCategory.WRITE); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src); } @@ -680,14 +637,12 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void renewLease(String clientName) throws IOException { - nn.checkOperation(OperationCategory.WRITE); namesystem.renewLease(clientName); } @Override // ClientProtocol public DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException { - nn.checkOperation(OperationCategory.READ); DirectoryListing files = namesystem.getListing( src, startAfter, needLocation); if (files != null) { @@ -699,21 +654,19 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public HdfsFileStatus getFileInfo(String src) throws IOException { - nn.checkOperation(OperationCategory.READ); metrics.incrFileInfoOps(); return namesystem.getFileInfo(src, true); } @Override // ClientProtocol public HdfsFileStatus getFileLinkInfo(String src) throws IOException { - nn.checkOperation(OperationCategory.READ); metrics.incrFileInfoOps(); return namesystem.getFileInfo(src, false); } @Override // ClientProtocol public long[] getStats() throws IOException { - nn.checkOperation(OperationCategory.READ); + namesystem.checkOperation(OperationCategory.READ); return namesystem.getStats(); } @@ -793,7 +746,6 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException { - nn.checkOperation(OperationCategory.READ); String[] cookieTab = new String[] { cookie }; Collection fbs = namesystem.listCorruptFileBlocks(path, cookieTab); @@ -820,34 +772,29 @@ class NameNodeRpcServer implements NamenodeProtocols { 
@Override // ClientProtocol public ContentSummary getContentSummary(String path) throws IOException { - nn.checkOperation(OperationCategory.READ); return namesystem.getContentSummary(path); } @Override // ClientProtocol public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException { - nn.checkOperation(OperationCategory.WRITE); namesystem.setQuota(path, namespaceQuota, diskspaceQuota); } @Override // ClientProtocol public void fsync(String src, String clientName) throws IOException { - nn.checkOperation(OperationCategory.WRITE); namesystem.fsync(src, clientName); } @Override // ClientProtocol public void setTimes(String src, long mtime, long atime) throws IOException { - nn.checkOperation(OperationCategory.WRITE); namesystem.setTimes(src, mtime, atime); } @Override // ClientProtocol public void createSymlink(String target, String link, FsPermission dirPerms, boolean createParent) throws IOException { - nn.checkOperation(OperationCategory.WRITE); metrics.incrCreateSymlinkOps(); /* We enforce the MAX_PATH_LENGTH limit even though a symlink target * URI may refer to a non-HDFS file system. @@ -867,7 +814,6 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public String getLinkTarget(String path) throws IOException { - nn.checkOperation(OperationCategory.READ); metrics.incrGetLinkTargetOps(); try { HdfsFileStatus stat = namesystem.getFileInfo(path, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 8a837eea725..b7b1adb479c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -85,27 +85,37 @@ public class EditLogTailer { Preconditions.checkState(tailerThread == null || !tailerThread.isAlive(), "Tailer thread should not be running once failover starts"); - doTailEdits(); + try { + doTailEdits(); + } catch (InterruptedException e) { + throw new IOException(e); + } } - private void doTailEdits() throws IOException { - // TODO(HA) in a transition from active to standby, - // the following is wrong and ends up causing all of the - // last log segment to get re-read - long lastTxnId = image.getLastAppliedTxId(); - - if (LOG.isDebugEnabled()) { - LOG.debug("lastTxnId: " + lastTxnId); - } - Collection streams = editLog - .selectInputStreams(lastTxnId + 1, 0, false); - if (LOG.isDebugEnabled()) { - LOG.debug("edit streams to load from: " + streams.size()); - } - - long editsLoaded = image.loadEdits(streams, namesystem); - if (LOG.isDebugEnabled()) { - LOG.debug("editsLoaded: " + editsLoaded); + private void doTailEdits() throws IOException, InterruptedException { + // Write lock needs to be interruptible here because the + // transitionToActive RPC takes the write lock before calling + // tailer.stop() -- so if we're not interruptible, it will + // deadlock. 
+    namesystem.writeLockInterruptibly();
+    try {
+      long lastTxnId = image.getLastAppliedTxId();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("lastTxnId: " + lastTxnId);
+      }
+      Collection<EditLogInputStream> streams = editLog
+          .selectInputStreams(lastTxnId + 1, 0, false);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("edit streams to load from: " + streams.size());
+      }
+
+      long editsLoaded = image.loadEdits(streams, namesystem);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("editsLoaded: " + editsLoaded);
+      }
+    } finally {
+      namesystem.writeUnlock();
     }
   }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java
index 58d7773d514..dce1cfb34a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAContext.java
@@ -3,6 +3,8 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 import java.io.IOException;

 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.ipc.StandbyException;

 /**
  * Context that is to be used by {@link HAState} for getting/setting the
@@ -27,4 +29,29 @@ public interface HAContext {

   /** Stop the services when exiting standby state */
   public void stopStandbyServices() throws IOException;
+
+  /**
+   * Take a write-lock on the underlying namesystem
+   * so that no concurrent state transitions or edits
+   * can be made.
+   */
+  void writeLock();
+
+  /**
+   * Unlock the lock taken by {@link #writeLock()}
+   */
+  void writeUnlock();
+
+  /**
+   * Verify that the given operation category is allowed in the
+   * current state. This is to allow NN implementations (eg BackupNode)
+   * to override it with node-specific handling.
+ */ + void checkOperation(OperationCategory op) throws StandbyException; + + /** + * @return true if the node should allow stale reads (ie reads + * while the namespace is not up to date) + */ + boolean allowStaleReads(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java index 7dfab914939..20c09d5f4e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java @@ -54,9 +54,14 @@ abstract public class HAState { */ protected final void setStateInternal(final HAContext context, final HAState s) throws ServiceFailedException { - exitState(context); - context.setState(s); - s.enterState(context); + context.writeLock(); + try { + exitState(context); + context.setState(s); + s.enterState(context); + } finally { + context.writeUnlock(); + } } /** @@ -107,4 +112,4 @@ abstract public class HAState { public String toString() { return state.toString(); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java index eb34f0f43a4..b22b2e43ed8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java @@ -73,6 +73,9 @@ public class StandbyState extends HAState { @Override public void checkOperation(HAContext context, OperationCategory op) throws StandbyException { + if (op == OperationCategory.READ && context.allowStaleReads()) { + return; + } String msg = "Operation category " + op + " is not supported in state " + context.getState(); throw new StandbyException(msg); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 57dafa807eb..f4732986364 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -307,6 +307,14 @@ public class MiniDFSCluster { private boolean waitSafeMode = true; private boolean federation; + /** + * A unique instance identifier for the cluster. This + * is used to disambiguate HA filesystems in the case where + * multiple MiniDFSClusters are used in the same test suite. 
+ */ + private int instanceId; + private static int instanceCount = 0; + /** * Stores the information related to a namenode in the cluster */ @@ -325,6 +333,9 @@ public class MiniDFSCluster { */ public MiniDFSCluster() { nameNodes = new NameNodeInfo[0]; // No namenode in the cluster + synchronized (MiniDFSCluster.class) { + instanceId = instanceCount++; + } } /** @@ -510,6 +521,10 @@ public class MiniDFSCluster { boolean waitSafeMode, boolean setupHostsFile, MiniDFSNNTopology nnTopology) throws IOException { + synchronized (MiniDFSCluster.class) { + instanceId = instanceCount++; + } + this.conf = conf; base_dir = new File(determineDfsBaseDir()); data_dir = new File(base_dir, "data"); @@ -737,6 +752,10 @@ public class MiniDFSCluster { } return uri; } + + public int getInstanceId() { + return instanceId; + } /** * @return Configuration of for the given namenode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java index 1146ae7b7a2..90739693f5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java @@ -46,7 +46,7 @@ public class TestDFSClientFailover { private Configuration conf = new Configuration(); private MiniDFSCluster cluster; - private static final String LOGICAL_HOSTNAME = "ha-nn-uri"; + private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d"; @Before public void setUpCluster() throws IOException { @@ -91,7 +91,8 @@ public class TestDFSClientFailover { // Check that it functions even if the URL becomes canonicalized // to include a port number. - Path withPort = new Path("hdfs://" + LOGICAL_HOSTNAME + ":" + + Path withPort = new Path("hdfs://" + + getLogicalHostname(cluster) + ":" + NameNode.DEFAULT_PORT + "/" + TEST_FILE.toUri().getPath()); FileSystem fs2 = withPort.getFileSystem(fs.getConf()); assertTrue(fs2.exists(withPort)); @@ -126,6 +127,7 @@ public class TestDFSClientFailover { String nameNodeId1 = "nn1"; String nameNodeId2 = "nn2"; + String logicalName = getLogicalHostname(cluster); conf = new Configuration(conf); String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort(); @@ -138,11 +140,15 @@ public class TestDFSClientFailover { conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nsId); conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nsId), nameNodeId1 + "," + nameNodeId2); - conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + LOGICAL_HOSTNAME, + conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." 
+ logicalName, ConfiguredFailoverProxyProvider.class.getName()); - FileSystem fs = FileSystem.get(new URI("hdfs://" + LOGICAL_HOSTNAME), conf); + FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); return fs; } + private static String getLogicalHostname(MiniDFSCluster cluster) { + return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId()); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java index d5ba1992a8f..2c25855accb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java @@ -146,8 +146,14 @@ public class TestFileCorruption extends TestCase { // report corrupted block by the third datanode DatanodeRegistration dnR = DataNodeTestUtils.getDNRegistrationForBP(dataNode, blk.getBlockPoolId()); - cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt( - blk, new DatanodeInfo(dnR)); + FSNamesystem ns = cluster.getNamesystem(); + ns.writeLock(); + try { + cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt( + blk, new DatanodeInfo(dnR)); + } finally { + ns.writeUnlock(); + } // open the file fs.open(FILE_PATH); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index c7cc61dc137..d05df3eaef1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -29,7 +30,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.AccessControlException; +import org.mockito.Mockito; /** * This is a utility class to expose NameNode functionality for unit tests. 
@@ -52,7 +55,8 @@ public class NameNodeAdapter { } public static HdfsFileStatus getFileInfo(NameNode namenode, String src, - boolean resolveLink) throws AccessControlException, UnresolvedLinkException { + boolean resolveLink) throws AccessControlException, UnresolvedLinkException, + StandbyException { return namenode.getNamesystem().getFileInfo(src, resolveLink); } @@ -134,4 +138,10 @@ public class NameNodeAdapter { public static long[] getStats(final FSNamesystem fsn) { return fsn.getStats(); } + + public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) { + ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests()); + fsn.setFsLockForTests(spy); + return spy; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java index 545d4b5660b..67f821288df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; @@ -120,6 +121,7 @@ public class TestBackupNode extends TestCase { */ public void testBackupNodeTailsEdits() throws Exception { Configuration conf = new HdfsConfiguration(); + HAUtil.setAllowStandbyReads(conf, true); MiniDFSCluster cluster = null; FileSystem fileSys = null; BackupNode backup = null; @@ -245,6 +247,7 @@ public class TestBackupNode extends TestCase { Path file3 = new Path("/backup.dat"); Configuration conf = new HdfsConfiguration(); + HAUtil.setAllowStandbyReads(conf, true); short replication = (short)conf.getInt("dfs.replication", 3); int numDatanodes = Math.max(3, replication); conf.set(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "0"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index b22ef02b864..4c398916790 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; @@ -52,6 +53,7 @@ public class TestEditLogTailer { public void testTailer() throws IOException, InterruptedException, ServiceFailedException { Configuration conf = new HdfsConfiguration(); + HAUtil.setAllowStandbyReads(conf, true); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java index 1bbe33b72d8..952df211a74 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; @@ -52,6 +53,7 @@ public class TestEditLogsDuringFailover { @Test public void testStartup() throws Exception { Configuration conf = new Configuration(); + HAUtil.setAllowStandbyReads(conf, true); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(0) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java index 7ac3c658de9..6eac5756b61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.junit.Assert.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,8 +30,12 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.TestDFSClientFailover; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread; import org.junit.Test; +import org.mockito.Mockito; /** * Tests state transition from active->standby, and manual failover @@ -133,4 +139,57 @@ public class TestHAStateTransitions { cluster.shutdown(); } } + + /** + * Regression test for HDFS-2693: when doing state transitions, we need to + * lock the FSNamesystem so that we don't end up doing any writes while it's + * "in between" states. + * This test case starts up several client threads which do mutation operations + * while flipping a NN back and forth from active to standby. 
+ */ + @Test(timeout=120000) + public void testTransitionSynchronization() throws Exception { + Configuration conf = new Configuration(); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + try { + cluster.waitActive(); + ReentrantReadWriteLock spyLock = NameNodeAdapter.spyOnFsLock( + cluster.getNameNode(0).getNamesystem()); + Mockito.doAnswer(new GenericTestUtils.SleepAnswer(50)) + .when(spyLock).writeLock(); + + final FileSystem fs = TestDFSClientFailover.configureFailoverFs( + cluster, conf); + + TestContext ctx = new TestContext(); + for (int i = 0; i < 50; i++) { + final int finalI = i; + ctx.addThread(new RepeatingTestThread(ctx) { + @Override + public void doAnAction() throws Exception { + Path p = new Path("/test-" + finalI); + fs.mkdirs(p); + fs.delete(p, true); + } + }); + } + + ctx.addThread(new RepeatingTestThread(ctx) { + @Override + public void doAnAction() throws Exception { + cluster.transitionToStandby(0); + Thread.sleep(50); + cluster.transitionToActive(0); + } + }); + ctx.startThreads(); + ctx.waitFor(20000); + ctx.stop(); + } finally { + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java index 036e914cee9..22604275882 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.TestDFSClientFailover; @@ -54,6 +55,8 @@ public class TestStandbyIsHot { @Test public void testStandbyIsHot() throws Exception { Configuration conf = new Configuration(); + // We read from the standby to watch block locations + HAUtil.setAllowStandbyReads(conf, true); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(3) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java index c8c528d0bbd..b9f49145569 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java @@ -167,7 +167,12 @@ public class TestNameNodeMetrics extends TestCase { // Corrupt first replica of the block LocatedBlock block = NameNodeAdapter.getBlockLocations( cluster.getNameNode(), file.toString(), 0, 1).get(0); - bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]); + cluster.getNamesystem().writeLock(); + try { + bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]); + } finally { + cluster.getNamesystem().writeUnlock(); + } updateMetrics(); MetricsRecordBuilder rb = getMetrics(NS_METRICS); 
assertGauge("CorruptBlocks", 1L, rb); @@ -204,7 +209,12 @@ public class TestNameNodeMetrics extends TestCase { // Corrupt the only replica of the block to result in a missing block LocatedBlock block = NameNodeAdapter.getBlockLocations( cluster.getNameNode(), file.toString(), 0, 1).get(0); - bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]); + cluster.getNamesystem().writeLock(); + try { + bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]); + } finally { + cluster.getNamesystem().writeUnlock(); + } updateMetrics(); MetricsRecordBuilder rb = getMetrics(NS_METRICS); assertGauge("UnderReplicatedBlocks", 1L, rb); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java index 13e96830847..ea2b11e2b17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java @@ -20,6 +20,7 @@ package org.apache.hadoop.test; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Random; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; @@ -176,4 +177,35 @@ public abstract class GenericTestUtils { } } + /** + * An Answer implementation which sleeps for a random number of milliseconds + * between 0 and a configurable value before delegating to the real + * implementation of the method. This can be useful for drawing out race + * conditions. + */ + public static class SleepAnswer implements Answer { + private final int maxSleepTime; + private static Random r = new Random(); + + public SleepAnswer(int maxSleepTime) { + this.maxSleepTime = maxSleepTime; + } + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + boolean interrupted = false; + try { + Thread.sleep(r.nextInt(maxSleepTime)); + } catch (InterruptedException ie) { + interrupted = true; + } + try { + return invocation.callRealMethod(); + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + } }