From 3bd39b2ea15ad689e00e1064aed9f7f6e7db45cc Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Fri, 26 Jul 2013 01:19:25 +0000 Subject: [PATCH] HDFS-4979. Merge 1507170 from trunk git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1507173 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/ipc/RetryCache.java | 2 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 7 + .../server/blockmanagement/BlockManager.java | 100 +++-- .../hadoop/hdfs/server/namenode/FSImage.java | 2 - .../hdfs/server/namenode/FSNamesystem.java | 223 +++++++++-- .../server/namenode/NameNodeRpcServer.java | 2 +- .../src/main/resources/hdfs-default.xml | 39 ++ .../namenode/TestNamenodeRetryCache.java | 355 ++++++++++++++++++ 9 files changed, 649 insertions(+), 83 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java index 3858a307ad7..d89dceb508a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java @@ -290,4 +290,4 @@ public class RetryCache { cache.set.clear(); } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 5a1c48ba29a..1df4aa90661 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -109,6 +109,8 @@ Release 2.1.0-beta - 2013-07-02 HDFS-4974. Add Idempotent and AtMostOnce annotations to namenode protocol methods. (suresh) + + HDFS-4979. Implement retry cache on Namenode. 
(suresh) IMPROVEMENTS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index cca1df4d5db..b32c15f3d36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -488,4 +488,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log"; public static final long DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l; + + public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache"; + public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true; + public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis"; + public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes + public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent"; + public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 98fc9427c30..f20b8cc5589 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -87,7 +87,7 @@ import com.google.common.collect.Sets; public class BlockManager { static final Log LOG = LogFactory.getLog(BlockManager.class); - static final Log blockLog = NameNode.blockStateChangeLog; + public static final Log blockLog = NameNode.blockStateChangeLog; /** Default load factor of map */ public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f; @@ -2686,64 +2686,58 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block * The given node is reporting incremental information about some blocks. * This includes blocks that are starting to be received, completed being * received, or deleted. + * + * This method must be called with FSNamesystem lock held. 
*/ - public void processIncrementalBlockReport(final DatanodeID nodeID, - final String poolId, - final ReceivedDeletedBlockInfo blockInfos[] - ) throws IOException { - namesystem.writeLock(); + public void processIncrementalBlockReport(final DatanodeID nodeID, + final String poolId, final ReceivedDeletedBlockInfo blockInfos[]) + throws IOException { + assert namesystem.hasWriteLock(); int received = 0; int deleted = 0; int receiving = 0; - try { - final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); - if (node == null || !node.isAlive) { - blockLog - .warn("BLOCK* processIncrementalBlockReport" - + " is received from dead or unregistered node " - + nodeID); - throw new IOException( - "Got incremental block report from unregistered or dead node"); - } - - for (ReceivedDeletedBlockInfo rdbi : blockInfos) { - switch (rdbi.getStatus()) { - case DELETED_BLOCK: - removeStoredBlock(rdbi.getBlock(), node); - deleted++; - break; - case RECEIVED_BLOCK: - addBlock(node, rdbi.getBlock(), rdbi.getDelHints()); - received++; - break; - case RECEIVING_BLOCK: - receiving++; - processAndHandleReportedBlock(node, rdbi.getBlock(), - ReplicaState.RBW, null); - break; - default: - String msg = - "Unknown block status code reported by " + nodeID + - ": " + rdbi; - blockLog.warn(msg); - assert false : msg; // if assertions are enabled, throw. - break; - } - if (blockLog.isDebugEnabled()) { - blockLog.debug("BLOCK* block " - + (rdbi.getStatus()) + ": " + rdbi.getBlock() - + " is received from " + nodeID); - } - } - } finally { - namesystem.writeUnlock(); + final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); + if (node == null || !node.isAlive) { blockLog - .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from " - + nodeID - + " receiving: " + receiving + ", " - + " received: " + received + ", " - + " deleted: " + deleted); + .warn("BLOCK* processIncrementalBlockReport" + + " is received from dead or unregistered node " + + nodeID); + throw new IOException( + "Got incremental block report from unregistered or dead node"); } + + for (ReceivedDeletedBlockInfo rdbi : blockInfos) { + switch (rdbi.getStatus()) { + case DELETED_BLOCK: + removeStoredBlock(rdbi.getBlock(), node); + deleted++; + break; + case RECEIVED_BLOCK: + addBlock(node, rdbi.getBlock(), rdbi.getDelHints()); + received++; + break; + case RECEIVING_BLOCK: + receiving++; + processAndHandleReportedBlock(node, rdbi.getBlock(), + ReplicaState.RBW, null); + break; + default: + String msg = + "Unknown block status code reported by " + nodeID + + ": " + rdbi; + blockLog.warn(msg); + assert false : msg; // if assertions are enabled, throw. 
+ break; + } + if (blockLog.isDebugEnabled()) { + blockLog.debug("BLOCK* block " + + (rdbi.getStatus()) + ": " + rdbi.getBlock() + + " is received from " + nodeID); + } + } + blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from " + + nodeID + " receiving: " + receiving + ", " + " received: " + received + + ", " + " deleted: " + deleted); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 141abaade6d..591b8a1f897 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -66,8 +66,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.io.MD5Hash; -import org.apache.hadoop.util.IdGenerator; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 920b2239c4a..9f6d26b1543 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; @@ -55,6 +57,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY; @@ -195,8 
+201,12 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RetryCache; +import org.apache.hadoop.ipc.RetryCache.CacheEntry; +import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.metrics2.annotation.Metric; @@ -249,7 +259,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } }; - private boolean isAuditEnabled() { + @VisibleForTesting + public boolean isAuditEnabled() { return !isDefaultAuditLogger || auditLog.isInfoEnabled(); } @@ -422,6 +433,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private INodeId inodeId; + private final RetryCache retryCache; + /** * Set the last allocated inode id when fsimage or editlog is loaded. */ @@ -656,6 +669,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, this.auditLoggers = initAuditLoggers(conf); this.isDefaultAuditLogger = auditLoggers.size() == 1 && auditLoggers.get(0) instanceof DefaultAuditLogger; + this.retryCache = initRetryCache(conf); } catch(IOException e) { LOG.error(getClass().getSimpleName() + " initialization failed.", e); close(); @@ -666,6 +680,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw re; } } + + @VisibleForTesting + static RetryCache initRetryCache(Configuration conf) { + boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, + DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT); + LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled")); + if (enable) { + float heapPercent = conf.getFloat( + DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY, + DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT); + long entryExpiryMillis = conf.getLong( + DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY, + DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT); + LOG.info("Retry cache will use " + heapPercent + + " of total heap and retry cache entry expiry time is " + + entryExpiryMillis + " millis"); + long entryExpiryNanos = entryExpiryMillis * 1000 * 1000; + return new RetryCache("Namenode Retry Cache", heapPercent, + entryExpiryNanos); + } + return null; + } private List<AuditLogger> initAuditLoggers(Configuration conf) { // Initialize the custom access loggers if configured.
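The FSNamesystem hunks that follow all apply the same retry-cache idiom to each @AtMostOnce operation. Condensed into a sketch (doOperation and doOperationInternal are illustrative names, not methods from this patch; retryCache is the FSNamesystem field added above):

    void doOperation(String src) throws IOException {
      // Returns the existing entry for this (clientId, callId), waiting for an
      // in-progress attempt to finish; returns null when the cache is disabled.
      CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
      if (cacheEntry != null && cacheEntry.isSuccess()) {
        return; // a retry of an already-successful call: replay the outcome
      }
      // At this point there is no previous request in progress, or it failed.
      boolean success = false;
      try {
        doOperationInternal(src); // the actual namespace mutation
        success = true;
      } finally {
        // Record the outcome so a later retry with the same (clientId, callId)
        // is answered from the cache instead of being re-executed.
        RetryCache.setState(cacheEntry, success);
      }
    }

Because waitForCompletion blocks a retry that arrives while the first attempt is still executing, the code after the check only has to distinguish "never seen" from "seen and failed".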
@@ -726,7 +762,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (!haEnabled) { fsImage.openEditLogForWrite(); } - success = true; } finally { if (!success) { @@ -803,6 +838,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } finally { writeUnlock(); } + RetryCache.clear(retryCache); } /** @@ -1471,15 +1507,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ void concat(String target, String [] srcs) throws IOException, UnresolvedLinkException { + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } + + // Either there is no previous request in progress or it has failed if(FSNamesystem.LOG.isDebugEnabled()) { FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) + " to " + target); } + + boolean success = false; try { concatInt(target, srcs); + success = true; } catch (AccessControlException e) { logAuditEvent(false, "concat", Arrays.toString(srcs), target, null); throw e; + } finally { + RetryCache.setState(cacheEntry, success); } } @@ -1688,17 +1735,25 @@ public class FSNamesystem implements Namesystem, FSClusterStats, void createSymlink(String target, String link, PermissionStatus dirPerms, boolean createParent) throws IOException, UnresolvedLinkException { + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } if (!DFSUtil.isValidName(link)) { throw new InvalidPathException("Invalid link name: " + link); } if (FSDirectory.isReservedName(target)) { throw new InvalidPathException("Invalid target name: " + target); } + boolean success = false; try { createSymlinkInt(target, link, dirPerms, createParent); + success = true; } catch (AccessControlException e) { logAuditEvent(false, "createSymlink", link, target, null); throw e; + } finally { + RetryCache.setState(cacheEntry, success); } } @@ -1837,13 +1892,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } } - + /** * Create a new file entry in the namespace.
* - * For description of parameters and exceptions thrown see - * {@link ClientProtocol#create()}, except it returns valid file status - * upon success + * For description of parameters and exceptions thrown see + * {@link ClientProtocol#create()}, except it returns valid file status upon + * success + * + * For retryCache handling details see - + * {@link #getFileStatus(boolean, CacheEntryWithPayload)} + * */ HdfsFileStatus startFile(String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet<CreateFlag> flag, @@ -1851,13 +1910,23 @@ throws AccessControlException, SafeModeException, FileAlreadyExistsException, UnresolvedLinkException, FileNotFoundException, ParentNotDirectoryException, IOException { + HdfsFileStatus status = null; + CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, + null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return (HdfsFileStatus) cacheEntry.getPayload(); + } + try { - return startFileInt(src, permissions, holder, clientMachine, flag, + status = startFileInt(src, permissions, holder, clientMachine, flag, createParent, replication, blockSize); } catch (AccessControlException e) { logAuditEvent(false, "create", src); throw e; + } finally { + RetryCache.setState(cacheEntry, status != null, status); } + return status; } private HdfsFileStatus startFileInt(String src, PermissionStatus permissions, @@ -1880,7 +1949,7 @@ blockManager.verifyReplication(src, replication, clientMachine); boolean skipSync = false; - final HdfsFileStatus stat; + HdfsFileStatus stat = null; FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.WRITE); if (blockSize < minBlockSize) { @@ -1960,7 +2029,12 @@ } } else { if (overwrite) { - delete(src, true); // File exists - delete if overwrite + try { + deleteInt(src, true); // File exists - delete if overwrite + } catch (AccessControlException e) { + logAuditEvent(false, "delete", src); + throw e; + } } else { // If lease soft limit time is expired, recover the lease recoverLeaseInternal(myFile, src, holder, clientMachine, false); @@ -2209,16 +2283,28 @@ throws AccessControlException, SafeModeException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, IOException { + LocatedBlock lb = null; + CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, + null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return (LocatedBlock) cacheEntry.getPayload(); + } + + boolean success = false; try { - return appendFileInt(src, holder, clientMachine); + lb = appendFileInt(src, holder, clientMachine); + success = true; + return lb; } catch (AccessControlException e) { logAuditEvent(false, "append", src); throw e; + } finally { + RetryCache.setState(cacheEntry, success, lb); } } - private LocatedBlock appendFileInt(String src, String holder, String clientMachine) - throws AccessControlException, SafeModeException, + private LocatedBlock appendFileInt(String src, String holder, + String clientMachine) throws AccessControlException, SafeModeException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, IOException { if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -2781,12 +2867,20 @@ public class FSNamesystem implements Namesystem,
FSClusterStats, @Deprecated boolean renameTo(String src, String dst) throws IOException, UnresolvedLinkException { + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return true; // Return previous response + } + boolean ret = false; try { - return renameToInt(src, dst); + ret = renameToInt(src, dst); } catch (AccessControlException e) { logAuditEvent(false, "rename", src, dst, null); throw e; + } finally { + RetryCache.setState(cacheEntry, ret); } + return ret; } private boolean renameToInt(String src, String dst) @@ -2853,6 +2947,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, /** Rename src to dst */ void renameTo(String src, String dst, Options.Rename... options) throws IOException, UnresolvedLinkException { + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - " + src + " to " + dst); @@ -2865,6 +2963,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src); byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst); HdfsFileStatus resultingStat = null; + boolean success = false; writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -2875,8 +2974,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, dst = FSDirectory.resolvePath(dst, dstComponents, dir); renameToInternal(pc, src, dst, options); resultingStat = getAuditFileInfo(dst, false); + success = true; } finally { writeUnlock(); + RetryCache.setState(cacheEntry, success); } getEditLog().logSync(); if (resultingStat != null) { @@ -2908,12 +3009,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean delete(String src, boolean recursive) throws AccessControlException, SafeModeException, UnresolvedLinkException, IOException { + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return true; // Return previous response + } + boolean ret = false; try { - return deleteInt(src, recursive); + ret = deleteInt(src, recursive); } catch (AccessControlException e) { logAuditEvent(false, "delete", src); throw e; + } finally { + RetryCache.setState(cacheEntry, ret); } + return ret; } private boolean deleteInt(String src, boolean recursive) @@ -2937,6 +3046,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new AccessControlException(ioe); } } + /** * Remove a file/directory from the namespace. *

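Operations that must replay a return value to a retried client (startFile and appendFile above, startCheckpoint and createSnapshot below) use the payload-carrying variant of the same idiom. A condensed sketch (doCreate and doCreateInternal are illustrative names only, not methods from this patch):

    HdfsFileStatus doCreate(String src) throws IOException {
      CacheEntryWithPayload cacheEntry =
          RetryCache.waitForCompletion(retryCache, null);
      if (cacheEntry != null && cacheEntry.isSuccess()) {
        // Replay the result computed by the first, successful attempt.
        return (HdfsFileStatus) cacheEntry.getPayload();
      }
      HdfsFileStatus status = null;
      try {
        status = doCreateInternal(src); // perform the operation
      } finally {
        // Cache the result as the payload; a null status marks failure,
        // so a failed attempt is re-executed rather than replayed.
        RetryCache.setState(cacheEntry, status != null, status);
      }
      return status;
    }

Boolean-returning operations such as delete and renameTo, by contrast, cache only the success flag: a retry of a call that returned true gets true again, while a call that failed is re-executed, which is what TestNamenodeRetryCache#testDelete and #testRename1 below assert.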
@@ -2957,6 +3067,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + boolean ret = false; writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -2975,6 +3086,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (!dir.delete(src, collectedBlocks, removedINodes)) { return false; } + ret = true; } finally { writeUnlock(); } @@ -2992,7 +3104,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, NameNode.stateChangeLog.debug("DIR* Namesystem.delete: " + src +" is removed"); } - return true; + return ret; } /** @@ -3158,12 +3270,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ boolean mkdirs(String src, PermissionStatus permissions, boolean createParent) throws IOException, UnresolvedLinkException { + boolean ret = false; try { - return mkdirsInt(src, permissions, createParent); + ret = mkdirsInt(src, permissions, createParent); } catch (AccessControlException e) { logAuditEvent(false, "mkdirs", src); throw e; } + return ret; } private boolean mkdirsInt(String src, PermissionStatus permissions, @@ -3223,7 +3337,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } // validate that we have enough inodes. This is, at best, a - // heuristic because the mkdirs() operation migth need to + // heuristic because the mkdirs() operation might need to // create multiple inodes. checkFsObjectLimit(); @@ -4023,8 +4137,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws IOException if */ void saveNamespace() throws AccessControlException, IOException { + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } checkSuperuserPrivilege(); checkOperation(OperationCategory.UNCHECKED); + boolean success = false; readLock(); try { checkOperation(OperationCategory.UNCHECKED); @@ -4033,10 +4152,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, "in order to create namespace image."); } getFSImage().saveNamespace(this); - LOG.info("New namespace image has been created"); + success = true; } finally { readUnlock(); + RetryCache.setState(cacheEntry, success); } + LOG.info("New namespace image has been created"); } /** @@ -4594,6 +4715,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, try { Thread.sleep(recheckInterval); } catch (InterruptedException ie) { + // Ignored } } if (!fsRunning) { @@ -4852,30 +4974,51 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } - NamenodeCommand startCheckpoint( - NamenodeRegistration bnReg, // backup node - NamenodeRegistration nnReg) // active name-node - throws IOException { + NamenodeCommand startCheckpoint(NamenodeRegistration backupNode, + NamenodeRegistration activeNamenode) throws IOException { checkOperation(OperationCategory.CHECKPOINT); + CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, + null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return (NamenodeCommand) cacheEntry.getPayload(); + } writeLock(); + NamenodeCommand cmd = null; try { checkOperation(OperationCategory.CHECKPOINT); if (isInSafeMode()) { throw new SafeModeException("Checkpoint not started", safeMode); } - LOG.info("Start checkpoint for " + bnReg.getAddress()); - NamenodeCommand cmd = 
getFSImage().startCheckpoint(bnReg, nnReg); + LOG.info("Start checkpoint for " + backupNode.getAddress()); + cmd = getFSImage().startCheckpoint(backupNode, activeNamenode); getEditLog().logSync(); return cmd; } finally { writeUnlock(); + RetryCache.setState(cacheEntry, cmd != null, cmd); } } + public void processIncrementalBlockReport(final DatanodeID nodeID, + final String poolId, final ReceivedDeletedBlockInfo blockInfos[]) + throws IOException { + writeLock(); + try { + blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos); + } finally { + writeUnlock(); + } + } + void endCheckpoint(NamenodeRegistration registration, CheckpointSignature sig) throws IOException { + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } checkOperation(OperationCategory.CHECKPOINT); + boolean success = false; readLock(); try { checkOperation(OperationCategory.CHECKPOINT); @@ -4885,8 +5028,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } LOG.info("End checkpoint for " + registration.getAddress()); getFSImage().endCheckpoint(sig); + success = true; } finally { readUnlock(); + RetryCache.setState(cacheEntry, success); } } @@ -5404,6 +5549,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, void updatePipeline(String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException { + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } checkOperation(OperationCategory.WRITE); LOG.info("updatePipeline(block=" + oldBlock + ", newGenerationStamp=" + newBlock.getGenerationStamp() @@ -5412,17 +5561,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats, + ", clientName=" + clientName + ")"); writeLock(); + boolean success = false; try { checkOperation(OperationCategory.WRITE); - if (isInSafeMode()) { throw new SafeModeException("Pipeline not updated", safeMode); } assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and " + oldBlock + " has different block identifier"; updatePipelineInternal(clientName, oldBlock, newBlock, newNodes); + success = true; } finally { writeUnlock(); + RetryCache.setState(cacheEntry, success); } getEditLog().logSync(); LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock); @@ -6287,9 +6438,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ String createSnapshot(String snapshotRoot, String snapshotName) throws SafeModeException, IOException { + CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, + null); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return (String) cacheEntry.getPayload(); + } final FSPermissionChecker pc = getPermissionChecker(); writeLock(); - final String snapshotPath; + String snapshotPath = null; try { checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { @@ -6313,6 +6469,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, getEditLog().logCreateSnapshot(snapshotRoot, snapshotName); } finally { writeUnlock(); + RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath); } getEditLog().logSync(); @@ -6332,8 +6489,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ void renameSnapshot(String path, String snapshotOldName, String snapshotNewName) throws SafeModeException, IOException { + CacheEntry 
cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } final FSPermissionChecker pc = getPermissionChecker(); writeLock(); + boolean success = false; try { checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { @@ -6347,8 +6509,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName); getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName); + success = true; } finally { writeUnlock(); + RetryCache.setState(cacheEntry, success); } getEditLog().logSync(); @@ -6441,6 +6605,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, void deleteSnapshot(String snapshotRoot, String snapshotName) throws SafeModeException, IOException { final FSPermissionChecker pc = getPermissionChecker(); + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } + boolean success = false; writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -6464,8 +6633,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, this.removeBlocks(collectedBlocks); collectedBlocks.clear(); getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName); + success = true; } finally { writeUnlock(); + RetryCache.setState(cacheEntry, success); } getEditLog().logSync(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 397421696db..b22b13c9f17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -951,7 +951,7 @@ class NameNodeRpcServer implements NamenodeProtocols { +"from "+nodeReg+" "+receivedAndDeletedBlocks.length +" blocks."); } - namesystem.getBlockManager().processIncrementalBlockReport( + namesystem.processIncrementalBlockReport( nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 11d632f317e..06eca701264 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1352,4 +1352,43 @@ + +<property> + <name>dfs.namenode.enable.retrycache</name> + <value>true</value> + <description> + This enables the retry cache on the namenode. The namenode tracks the + response for each non-idempotent request; if a client retries such a + request, the response is served from the retry cache. These operations + are tagged with the annotation @AtMostOnce in namenode protocols. It is + recommended that this flag be set to true. Setting it to false will result + in clients getting failure responses to retried requests. This flag must + be enabled in an HA setup for transparent failovers. + + The expiration time of cache entries is configurable using + dfs.namenode.retrycache.expirytime.millis. + </description> +</property> + +<property> + <name>dfs.namenode.retrycache.expirytime.millis</name> + <value>600000</value> + <description> + The time for which retry cache entries are retained.
+ </description> +</property> + +<property> + <name>dfs.namenode.retrycache.heap.percent</name> + <value>0.03f</value> + <description> + This parameter configures the heap size allocated for the retry cache + (excluding the cached responses). This corresponds to approximately + 4096 entries for every 64MB of namenode process java heap size. + Assuming a retry cache entry expiration time (configured using + dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this + enables the retry cache to support 7 operations per second sustained + for 10 minutes. As the heap size is increased, the operation rate + increases linearly. + </description> +</property> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java new file mode 100644 index 00000000000..b581c9ae206 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + + + +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.ipc.RPC.RpcKind; +import org.apache.hadoop.ipc.RetryCache; +import org.apache.hadoop.ipc.RpcConstants; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.util.StringUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests for ensuring the namenode retry cache works correctly for + * non-idempotent requests. + * + * The retry cache works by tracking previously received requests, keyed by the + * ClientId and CallId received in RPC requests, and storing the response. The + * response is replayed on retry when the same request is received again. + * + * The test works by manipulating the Rpc {@link Server}'s current RPC call. For + * testing retried requests, an Rpc callId is generated only once using + * {@link #newCall()} and reused for many method calls.
For testing a non-retried + * request, a new callId is generated using {@link #newCall()}. + */ +public class TestNamenodeRetryCache { + private static final byte[] CLIENT_ID = StringUtils.getUuidBytes(); + private static MiniDFSCluster cluster; + private static FSNamesystem namesystem; + private static PermissionStatus perm = new PermissionStatus( + "TestNamenodeRetryCache", null, FsPermission.getDefault()); + private static FileSystem filesystem; + private static int callId = 100; + + /** Start a cluster */ + @BeforeClass + public static void setup() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512"); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true); + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + namesystem = cluster.getNamesystem(); + filesystem = cluster.getFileSystem(); + } + + /** Cleanup after the test + * @throws IOException + * @throws UnresolvedLinkException + * @throws SafeModeException + * @throws AccessControlException */ + @After + public void cleanup() throws IOException { + namesystem.delete("/", true); + } + + public static void incrementCallId() { + callId++; + } + + /** Set the current Server RPC call */ + public static void newCall() { + Server.Call call = new Server.Call(++callId, 1, null, null, + RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID); + Server.getCurCall().set(call); + } + + public static void resetCall() { + Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null, + null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID); + Server.getCurCall().set(call); + } + + private void concatSetup(String file1, String file2) throws Exception { + DFSTestUtil.createFile(filesystem, new Path(file1), 512, (short)1, 0L); + DFSTestUtil.createFile(filesystem, new Path(file2), 512, (short)1, 0L); + } + + /** + * Tests for concat call + */ + @Test + public void testConcat() throws Exception { + resetCall(); + String file1 = "/testNamenodeRetryCache/testConcat/file1"; + String file2 = "/testNamenodeRetryCache/testConcat/file2"; + + // Two retried concat calls succeed + concatSetup(file1, file2); + newCall(); + namesystem.concat(file1, new String[]{file2}); + namesystem.concat(file1, new String[]{file2}); + namesystem.concat(file1, new String[]{file2}); + + // A non-retried concat request fails + newCall(); + try { + // Second non-retry call should fail with an exception + namesystem.concat(file1, new String[]{file2}); + Assert.fail("testConcat - expected exception is not thrown"); + } catch (IOException e) { + // Expected + } + } + + /** + * Tests for delete call + */ + @Test + public void testDelete() throws Exception { + String dir = "/testNamenodeRetryCache/testDelete"; + // Create a directory, then delete it with one call and two retries + newCall(); + namesystem.mkdirs(dir, perm, true); + newCall(); + Assert.assertTrue(namesystem.delete(dir, false)); + Assert.assertTrue(namesystem.delete(dir, false)); + Assert.assertTrue(namesystem.delete(dir, false)); + + // A non-retried call fails and returns false + newCall(); + Assert.assertFalse(namesystem.delete(dir, false)); + } + + /** + * Test for createSymlink + */ + @Test + public void testCreateSymlink() throws Exception { + String target = "/testNamenodeRetryCache/testCreateSymlink/target"; + + // Two retried symlink calls succeed + newCall(); + namesystem.createSymlink(target, "/a/b", perm, true); + namesystem.createSymlink(target, "/a/b", perm, true); + namesystem.createSymlink(target, "/a/b",
perm, true); + + // A non-retried call fails + newCall(); + try { + // Second non-retry call should fail with an exception + namesystem.createSymlink(target, "/a/b", perm, true); + Assert.fail("testCreateSymlink - expected exception is not thrown"); + } catch (IOException e) { + // Expected + } + } + + /** + * Test for create file + */ + @Test + public void testCreate() throws Exception { + String src = "/testNamenodeRetryCache/testCreate/file"; + // Two retried calls succeed + newCall(); + HdfsFileStatus status = namesystem.startFile(src, perm, "holder", + "clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512); + Assert.assertEquals(status, namesystem.startFile(src, perm, + "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE), + true, (short) 1, 512)); + Assert.assertEquals(status, namesystem.startFile(src, perm, + "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE), + true, (short) 1, 512)); + + // A non-retried call fails + newCall(); + try { + namesystem.startFile(src, perm, "holder", "clientmachine", + EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512); + Assert.fail("testCreate - expected exception is not thrown"); + } catch (IOException e) { + // expected + } + } + + /** + * Test for append + */ + @Test + public void testAppend() throws Exception { + String src = "/testNamenodeRetryCache/testAppend/src"; + resetCall(); + // Create a file with partial block + DFSTestUtil.createFile(filesystem, new Path(src), 128, (short)1, 0L); + + // Retried append requests succeed + newCall(); + LocatedBlock b = namesystem.appendFile(src, "holder", "clientMachine"); + Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine")); + Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine")); + + // A non-retried call fails + newCall(); + try { + namesystem.appendFile(src, "holder", "clientMachine"); + Assert.fail("testAppend - expected exception is not thrown"); + } catch (Exception e) { + // Expected + } + } + + /** + * Test for rename1 + */ + @SuppressWarnings("deprecation") + @Test + public void testRename1() throws Exception { + String src = "/testNamenodeRetryCache/testRename1/src"; + String target = "/testNamenodeRetryCache/testRename1/target"; + resetCall(); + namesystem.mkdirs(src, perm, true); + + // Retried renames succeed + newCall(); + Assert.assertTrue(namesystem.renameTo(src, target)); + Assert.assertTrue(namesystem.renameTo(src, target)); + Assert.assertTrue(namesystem.renameTo(src, target)); + + // A non-retried request fails + newCall(); + Assert.assertFalse(namesystem.renameTo(src, target)); + } + + /** + * Test for rename2 + */ + @Test + public void testRename2() throws Exception { + String src = "/testNamenodeRetryCache/testRename2/src"; + String target = "/testNamenodeRetryCache/testRename2/target"; + resetCall(); + namesystem.mkdirs(src, perm, true); + + // Retried renames succeed + newCall(); + namesystem.renameTo(src, target, Rename.NONE); + namesystem.renameTo(src, target, Rename.NONE); + namesystem.renameTo(src, target, Rename.NONE); + + // A non-retried request fails + newCall(); + try { + namesystem.renameTo(src, target, Rename.NONE); + Assert.fail("testRename2 - expected exception is not thrown"); + } catch (IOException e) { + // expected + } + } + + /** + * Tests for createSnapshot, renameSnapshot and deleteSnapshot + */ + @Test + public void testSnapshotMethods() throws Exception { + String dir = "/testNamenodeRetryCache/testCreateSnapshot/src"; + resetCall(); + namesystem.mkdirs(dir, perm, true); + namesystem.allowSnapshot(dir); + + // Test retry
of create snapshot + newCall(); + String name = namesystem.createSnapshot(dir, "snap1"); + Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1")); + Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1")); + Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1")); + + // Non-retried calls should fail + newCall(); + try { + namesystem.createSnapshot(dir, "snap1"); + Assert.fail("testSnapshotMethods expected exception is not thrown"); + } catch (IOException e) { + // expected + } + + // Test retry of rename snapshot + newCall(); + namesystem.renameSnapshot(dir, "snap1", "snap2"); + namesystem.renameSnapshot(dir, "snap1", "snap2"); + namesystem.renameSnapshot(dir, "snap1", "snap2"); + + // Non-retried calls should fail + newCall(); + try { + namesystem.renameSnapshot(dir, "snap1", "snap2"); + Assert.fail("testSnapshotMethods expected exception is not thrown"); + } catch (IOException e) { + // expected + } + + // Test retry of delete snapshot + newCall(); + namesystem.deleteSnapshot(dir, "snap2"); + namesystem.deleteSnapshot(dir, "snap2"); + namesystem.deleteSnapshot(dir, "snap2"); + + // Non-retried calls should fail + newCall(); + try { + namesystem.deleteSnapshot(dir, "snap2"); + Assert.fail("testSnapshotMethods expected exception is not thrown"); + } catch (IOException e) { + // expected + } + } + + @Test + public void testRetryCacheConfig() { + // By default the retry cache should be enabled + Configuration conf = new HdfsConfiguration(); + Assert.assertNotNull(FSNamesystem.initRetryCache(conf)); + + // If the retry cache is disabled, it should not be created + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false); + Assert.assertNull(FSNamesystem.initRetryCache(conf)); + } +}
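As a rough cross-check of the figures quoted in the dfs.namenode.retrycache.heap.percent description above, here is a sketch of the arithmetic. The ~512 bytes per cache entry is an assumption inferred from the documented numbers (0.03 x 64 MB / 4096 is roughly 492 bytes), not a value stated anywhere in the patch:

    public class RetryCacheSizing {
      public static void main(String[] args) {
        long heapBytes = 64L * 1024 * 1024;             // 64 MB of namenode heap
        double percent = 0.03;                          // retrycache.heap.percent
        long cacheBytes = (long) (heapBytes * percent); // ~2 MB for the cache
        long entryBytes = 512;                          // assumed per-entry cost
        long entries = cacheBytes / entryBytes;         // ~3900, i.e. ~4096
        long expirySeconds = 600;                       // 10-minute entry expiry
        double opsPerSec = entries / (double) expirySeconds; // ~6.5, i.e. ~7
        System.out.println(entries + " entries, ~" + opsPerSec + " ops/sec");
      }
    }

Within rounding, this matches the "approximately 4096 entries for every 64MB" and "7 operations per second sustained for 10 minutes" claims in the description.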