HDFS-4979. Implement retry cache on Namenode. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1507170 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2013-07-26 01:09:27 +00:00
parent 42e8cbf92c
commit 1b531c1dbb
9 changed files with 649 additions and 83 deletions

RetryCache.java (org.apache.hadoop.ipc)

@ -290,4 +290,4 @@ public class RetryCache {
cache.set.clear();
}
}
}
}
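
Everywhere below, this commit wraps each @AtMostOnce operation in the same retry cache idiom. Distilled from the FSNamesystem hunks that follow (doOperation() is a placeholder, not a real method), the shape is roughly:

    // Minimal sketch of the pattern applied throughout this commit.
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // A retry of an already-successful call: replay the response
    }
    boolean success = false;
    try {
      doOperation(); // the actual namesystem operation
      success = true;
    } finally {
      // Record the outcome so a later retry can be answered from the cache
      RetryCache.setState(cacheEntry, success);
    }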

CHANGES.txt (hadoop-hdfs)

@ -336,6 +336,8 @@ Release 2.1.0-beta - 2013-07-02
HDFS-4974. Add Idempotent and AtMostOnce annotations to namenode
protocol methods. (suresh)
HDFS-4979. Implement retry cache on Namenode. (suresh)
IMPROVEMENTS

DFSConfigKeys.java (org.apache.hadoop.hdfs)

@ -488,4 +488,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log";
public static final long DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l;
public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache";
public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true;
public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis";
public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
}
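
For illustration, a minimal sketch of setting these keys programmatically; the values shown merely restate the defaults above, which already apply when the keys are absent:

    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
        10 * 60 * 1000L); // 10 minutes
    conf.setFloat(DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY, 0.03f);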

BlockManager.java (org.apache.hadoop.hdfs.server.blockmanagement)

@ -87,7 +87,7 @@ import com.google.common.collect.Sets;
public class BlockManager {
static final Log LOG = LogFactory.getLog(BlockManager.class);
static final Log blockLog = NameNode.blockStateChangeLog;
public static final Log blockLog = NameNode.blockStateChangeLog;
/** Default load factor of map */
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
@ -2686,64 +2686,58 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
* The given node is reporting incremental information about some blocks.
* This includes blocks that are starting to be received, completed being
* received, or deleted.
*
* This method must be called with FSNamesystem lock held.
*/
public void processIncrementalBlockReport(final DatanodeID nodeID,
final String poolId,
final ReceivedDeletedBlockInfo blockInfos[]
) throws IOException {
namesystem.writeLock();
public void processIncrementalBlockReport(final DatanodeID nodeID,
final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
throws IOException {
assert namesystem.hasWriteLock();
int received = 0;
int deleted = 0;
int receiving = 0;
try {
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
blockLog
.warn("BLOCK* processIncrementalBlockReport"
+ " is received from dead or unregistered node "
+ nodeID);
throw new IOException(
"Got incremental block report from unregistered or dead node");
}
for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
removeStoredBlock(rdbi.getBlock(), node);
deleted++;
break;
case RECEIVED_BLOCK:
addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
received++;
break;
case RECEIVING_BLOCK:
receiving++;
processAndHandleReportedBlock(node, rdbi.getBlock(),
ReplicaState.RBW, null);
break;
default:
String msg =
"Unknown block status code reported by " + nodeID +
": " + rdbi;
blockLog.warn(msg);
assert false : msg; // if assertions are enabled, throw.
break;
}
if (blockLog.isDebugEnabled()) {
blockLog.debug("BLOCK* block "
+ (rdbi.getStatus()) + ": " + rdbi.getBlock()
+ " is received from " + nodeID);
}
}
} finally {
namesystem.writeUnlock();
blockLog
.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+ nodeID
+ " receiving: " + receiving + ", "
+ " received: " + received + ", "
+ " deleted: " + deleted);
}
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
blockLog
.warn("BLOCK* processIncrementalBlockReport"
+ " is received from dead or unregistered node "
+ nodeID);
throw new IOException(
"Got incremental block report from unregistered or dead node");
}
for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
removeStoredBlock(rdbi.getBlock(), node);
deleted++;
break;
case RECEIVED_BLOCK:
addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
received++;
break;
case RECEIVING_BLOCK:
receiving++;
processAndHandleReportedBlock(node, rdbi.getBlock(),
ReplicaState.RBW, null);
break;
default:
String msg =
"Unknown block status code reported by " + nodeID +
": " + rdbi;
blockLog.warn(msg);
assert false : msg; // if assertions are enabled, throw.
break;
}
if (blockLog.isDebugEnabled()) {
blockLog.debug("BLOCK* block "
+ (rdbi.getStatus()) + ": " + rdbi.getBlock()
+ " is received from " + nodeID);
}
}
blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+ nodeID + " receiving: " + receiving + ", " + " received: " + received
+ ", " + " deleted: " + deleted);
}
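
Note the locking contract change in this hunk: processIncrementalBlockReport no longer takes the namesystem write lock itself, it asserts that the caller already holds it. Because this is an assert, the check is only enforced when the JVM runs with assertions enabled (-ea). A caller honoring the new contract has the same shape as the FSNamesystem wrapper added later in this commit:

    namesystem.writeLock();
    try {
      blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
    } finally {
      namesystem.writeUnlock();
    }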
/**

FSImage.java (org.apache.hadoop.hdfs.server.namenode)

@ -63,8 +63,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.IdGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;

FSNamesystem.java (org.apache.hadoop.hdfs.server.namenode)

@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@ -55,6 +57,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
@ -195,8 +201,12 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.annotation.Metric;
@ -264,7 +274,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
};
private boolean isAuditEnabled() {
@VisibleForTesting
public boolean isAuditEnabled() {
return !isDefaultAuditLogger || auditLog.isInfoEnabled();
}
@ -437,6 +448,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private INodeId inodeId;
private final RetryCache retryCache;
/**
* Set the last allocated inode id when fsimage or editlog is loaded.
*/
@ -671,6 +684,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
this.retryCache = initRetryCache(conf);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
@ -681,6 +695,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw re;
}
}
@VisibleForTesting
static RetryCache initRetryCache(Configuration conf) {
boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
if (enable) {
float heapPercent = conf.getFloat(
DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY,
DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT);
long entryExpiryMillis = conf.getLong(
DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT);
LOG.info("Retry cache will use " + heapPercent
+ " of total heap and retry cache entry expiry time is "
+ entryExpiryMillis + " millis");
long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
return new RetryCache("Namenode Retry Cache", heapPercent,
entryExpiryNanos);
}
return null;
}
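
A small aside on the conversion above: entryExpiryMillis * 1000 * 1000 is safe for realistic expiry times, and an equivalent, arguably clearer form (not what the patch uses) is the JDK's TimeUnit:

    long entryExpiryNanos = TimeUnit.MILLISECONDS.toNanos(entryExpiryMillis);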
private List<AuditLogger> initAuditLoggers(Configuration conf) {
// Initialize the custom access loggers if configured.
@ -741,7 +777,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (!haEnabled) {
fsImage.openEditLogForWrite();
}
success = true;
} finally {
if (!success) {
@ -818,6 +853,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} finally {
writeUnlock();
}
RetryCache.clear(retryCache);
}
/**
@ -1487,15 +1523,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void concat(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
// Either there is no previous request in progress or it has failed
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
boolean success = false;
try {
concatInt(target, srcs);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
throw e;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@ -1704,17 +1751,25 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
if (!DFSUtil.isValidName(link)) {
throw new InvalidPathException("Invalid link name: " + link);
}
if (FSDirectory.isReservedName(target)) {
throw new InvalidPathException("Invalid target name: " + target);
}
boolean success = false;
try {
createSymlinkInt(target, link, dirPerms, createParent);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
throw e;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@ -1853,13 +1908,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
}
/**
* Create a new file entry in the namespace.
*
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create()}, except it returns valid file status
* upon success
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create()}, except it returns valid file status upon
* success
*
* For retryCache handling details see -
* {@link #getFileStatus(boolean, CacheEntryWithPayload)}
*
*/
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
@ -1867,13 +1926,23 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
HdfsFileStatus status = null;
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload();
}
try {
return startFileInt(src, permissions, holder, clientMachine, flag,
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, status != null, status);
}
return status;
}
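
startFile is the first use of the payload-carrying variant of the retry cache pattern; append, startCheckpoint and createSnapshot below follow the same shape. Sketched generically (T stands for the response type being cached, doOperation() for the actual call):

    CacheEntryWithPayload cacheEntry =
        RetryCache.waitForCompletion(retryCache, null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (T) cacheEntry.getPayload(); // replay the stored response
    }
    T result = null;
    try {
      result = doOperation();
    } finally {
      // Store the response alongside the success flag for future retries
      RetryCache.setState(cacheEntry, result != null, result);
    }
    return result;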
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
@ -1896,7 +1965,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
blockManager.verifyReplication(src, replication, clientMachine);
boolean skipSync = false;
final HdfsFileStatus stat;
HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
if (blockSize < minBlockSize) {
@ -1977,7 +2046,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
} else {
if (overwrite) {
delete(src, true); // File exists - delete if overwrite
try {
deleteInt(src, true); // File exists - delete if overwrite
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
}
} else {
// If lease soft limit time is expired, recover the lease
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@ -2226,16 +2300,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
LocatedBlock lb = null;
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LocatedBlock) cacheEntry.getPayload();
}
boolean success = false;
try {
return appendFileInt(src, holder, clientMachine);
lb = appendFileInt(src, holder, clientMachine);
success = true;
return lb;
} catch (AccessControlException e) {
logAuditEvent(false, "append", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, success, lb);
}
}
private LocatedBlock appendFileInt(String src, String holder, String clientMachine)
throws AccessControlException, SafeModeException,
private LocatedBlock appendFileInt(String src, String holder,
String clientMachine) throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -2798,12 +2884,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
@Deprecated
boolean renameTo(String src, String dst)
throws IOException, UnresolvedLinkException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
boolean ret = false;
try {
return renameToInt(src, dst);
ret = renameToInt(src, dst);
} catch (AccessControlException e) {
logAuditEvent(false, "rename", src, dst, null);
throw e;
} finally {
RetryCache.setState(cacheEntry, ret);
}
return ret;
}
private boolean renameToInt(String src, String dst)
@ -2870,6 +2964,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** Rename src to dst */
void renameTo(String src, String dst, Options.Rename... options)
throws IOException, UnresolvedLinkException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ src + " to " + dst);
@ -2882,6 +2980,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
HdfsFileStatus resultingStat = null;
boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2892,8 +2991,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
renameToInternal(pc, src, dst, options);
resultingStat = getAuditFileInfo(dst, false);
success = true;
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
if (resultingStat != null) {
@ -2925,12 +3026,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean delete(String src, boolean recursive)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
boolean ret = false;
try {
return deleteInt(src, recursive);
ret = deleteInt(src, recursive);
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, ret);
}
return ret;
}
private boolean deleteInt(String src, boolean recursive)
@ -2954,6 +3063,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new AccessControlException(ioe);
}
}
/**
* Remove a file/directory from the namespace.
* <p>
@ -2974,6 +3084,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean ret = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2992,6 +3103,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (!dir.delete(src, collectedBlocks, removedINodes)) {
return false;
}
ret = true;
} finally {
writeUnlock();
}
@ -3009,7 +3121,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
+ src +" is removed");
}
return true;
return ret;
}
/**
@ -3175,12 +3287,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
boolean mkdirs(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
boolean ret = false;
try {
return mkdirsInt(src, permissions, createParent);
ret = mkdirsInt(src, permissions, createParent);
} catch (AccessControlException e) {
logAuditEvent(false, "mkdirs", src);
throw e;
}
return ret;
}
private boolean mkdirsInt(String src, PermissionStatus permissions,
@ -3240,7 +3354,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
// validate that we have enough inodes. This is, at best, a
// heuristic because the mkdirs() operation migth need to
// heuristic because the mkdirs() operation might need to
// create multiple inodes.
checkFsObjectLimit();
@ -4040,8 +4154,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException if
*/
void saveNamespace() throws AccessControlException, IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
checkSuperuserPrivilege();
checkOperation(OperationCategory.UNCHECKED);
boolean success = false;
readLock();
try {
checkOperation(OperationCategory.UNCHECKED);
@ -4050,10 +4169,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
"in order to create namespace image.");
}
getFSImage().saveNamespace(this);
LOG.info("New namespace image has been created");
success = true;
} finally {
readUnlock();
RetryCache.setState(cacheEntry, success);
}
LOG.info("New namespace image has been created");
}
/**
@ -4611,6 +4732,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
Thread.sleep(recheckInterval);
} catch (InterruptedException ie) {
// Ignored
}
}
if (!fsRunning) {
@ -4869,30 +4991,51 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
NamenodeCommand startCheckpoint(
NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
NamenodeRegistration activeNamenode) throws IOException {
checkOperation(OperationCategory.CHECKPOINT);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (NamenodeCommand) cacheEntry.getPayload();
}
writeLock();
NamenodeCommand cmd = null;
try {
checkOperation(OperationCategory.CHECKPOINT);
if (isInSafeMode()) {
throw new SafeModeException("Checkpoint not started", safeMode);
}
LOG.info("Start checkpoint for " + bnReg.getAddress());
NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
LOG.info("Start checkpoint for " + backupNode.getAddress());
cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
getEditLog().logSync();
return cmd;
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, cmd != null, cmd);
}
}
public void processIncrementalBlockReport(final DatanodeID nodeID,
final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
throws IOException {
writeLock();
try {
blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
} finally {
writeUnlock();
}
}
void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
checkOperation(OperationCategory.CHECKPOINT);
boolean success = false;
readLock();
try {
checkOperation(OperationCategory.CHECKPOINT);
@ -4902,8 +5045,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
LOG.info("End checkpoint for " + registration.getAddress());
getFSImage().endCheckpoint(sig);
success = true;
} finally {
readUnlock();
RetryCache.setState(cacheEntry, success);
}
}
@ -5421,6 +5566,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
checkOperation(OperationCategory.WRITE);
LOG.info("updatePipeline(block=" + oldBlock
+ ", newGenerationStamp=" + newBlock.getGenerationStamp()
@ -5429,17 +5578,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ ", clientName=" + clientName
+ ")");
writeLock();
boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Pipeline not updated", safeMode);
}
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
success = true;
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
@ -6304,9 +6455,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
String createSnapshot(String snapshotRoot, String snapshotName)
throws SafeModeException, IOException {
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (String) cacheEntry.getPayload();
}
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
final String snapshotPath;
String snapshotPath = null;
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
@ -6330,6 +6486,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
}
getEditLog().logSync();
@ -6349,8 +6506,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void renameSnapshot(String path, String snapshotOldName,
String snapshotNewName) throws SafeModeException, IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
@ -6364,8 +6526,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
success = true;
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
@ -6458,6 +6622,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void deleteSnapshot(String snapshotRoot, String snapshotName)
throws SafeModeException, IOException {
final FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -6481,8 +6650,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.removeBlocks(collectedBlocks);
collectedBlocks.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
success = true;
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();

NameNodeRpcServer.java (org.apache.hadoop.hdfs.server.namenode)

@ -960,7 +960,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
+" blocks.");
}
namesystem.getBlockManager().processIncrementalBlockReport(
namesystem.processIncrementalBlockReport(
nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
}

hdfs-default.xml

@ -1352,4 +1352,43 @@
</description>
</property>
<property>
<name>dfs.namenode.enable.retrycache</name>
<value>true</value>
<description>
This enables the retry cache on the namenode. The namenode tracks the
response for each non-idempotent request. If a client retries such a
request, the response is replayed from the retry cache instead of the
operation being executed again. Such operations are tagged with the
@AtMostOnce annotation in namenode protocols. It is recommended that
this flag be set to true. Setting it to false will result in clients
getting failure responses for retried requests. This flag must be
enabled in an HA setup for transparent failovers.
The entries in the cache have an expiration time configurable via
dfs.namenode.retrycache.expirytime.millis.
</description>
</property>
<property>
<name>dfs.namenode.retrycache.expirytime.millis</name>
<value>600000</value>
<description>
The time for which retry cache entries are retained.
</description>
</property>
<property>
<name>dfs.namenode.retrycache.heap.percent</name>
<value>0.03f</value>
<description>
This parameter configures the heap size allocated for the retry cache
(excluding the cached responses). This corresponds to approximately
4096 entries for every 64MB of namenode process java heap size.
Assuming a retry cache entry expiration time (configured via
dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this
allows the retry cache to support about 7 operations per second,
sustained for 10 minutes. The supported operation rate increases
linearly with the heap size.
</description>
</property>
</configuration>
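
The sizing in the description works out as follows (the per-entry size is implied by the stated figures, not given explicitly in the patch):

    0.03 x 64 MB ≈ 1.9 MB of heap reserved for the cache
    1.9 MB / 4096 entries ≈ 480 bytes implied per cache entry
    4096 entries / 600 s (the default expiry) ≈ 7 operations per second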

TestNamenodeRetryCache.java (org.apache.hadoop.hdfs.server.namenode)

@ -0,0 +1,355 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests for ensuring the namenode retry cache works correctly for
* non-idempotent requests.
*
* Retry cache works by tracking previously received requests, keyed by the
* ClientId and CallId in the RPC request, and storing the corresponding
* response. When the same request is received again (a retry), the stored
* response is replayed.
*
* The test works by manipulating the Rpc {@link Server} current RPC call. To
* simulate retried requests, a call is created once using {@link #newCall()}
* and reused across several method invocations. To simulate a non-retried
* request, a fresh call is generated with another {@link #newCall()}.
*/
public class TestNamenodeRetryCache {
private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
private static MiniDFSCluster cluster;
private static FSNamesystem namesystem;
private static PermissionStatus perm = new PermissionStatus(
"TestNamenodeRetryCache", null, FsPermission.getDefault());
private static FileSystem filesystem;
private static int callId = 100;
/** Start a cluster */
@BeforeClass
public static void setup() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
namesystem = cluster.getNamesystem();
filesystem = cluster.getFileSystem();
}
/** Cleanup after the test
* @throws IOException
* @throws UnresolvedLinkException
* @throws SafeModeException
* @throws AccessControlException */
@After
public void cleanup() throws IOException {
namesystem.delete("/", true);
}
public static void incrementCallId() {
callId++;
}
/** Set the current Server RPC call */
public static void newCall() {
Server.Call call = new Server.Call(++callId, 1, null, null,
RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
Server.getCurCall().set(call);
}
public static void resetCall() {
Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
Server.getCurCall().set(call);
}
private void concatSetup(String file1, String file2) throws Exception {
DFSTestUtil.createFile(filesystem, new Path(file1), 512, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file2), 512, (short)1, 0L);
}
/**
* Tests for concat call
*/
@Test
public void testConcat() throws Exception {
resetCall();
String file1 = "/testNamenodeRetryCache/testConcat/file1";
String file2 = "/testNamenodeRetryCache/testConcat/file2";
// Two retried concat calls succeed
concatSetup(file1, file2);
newCall();
namesystem.concat(file1, new String[]{file2});
namesystem.concat(file1, new String[]{file2});
namesystem.concat(file1, new String[]{file2});
// A non-retried concat request fails
newCall();
try {
// Not a retry (new call id), so this concat must fail: file2 is gone
namesystem.concat(file1, new String[]{file2});
Assert.fail("testConcat - expected exception is not thrown");
} catch (IOException e) {
// Expected
}
}
/**
* Tests for delete call
*/
@Test
public void testDelete() throws Exception {
String dir = "/testNamenodeRetryCache/testDelete";
// Create a directory
newCall();
namesystem.mkdirs(dir, perm, true);
// Retried delete calls return the same successful response
newCall();
Assert.assertTrue(namesystem.delete(dir, false));
Assert.assertTrue(namesystem.delete(dir, false));
Assert.assertTrue(namesystem.delete(dir, false));
// A non-retried call fails, returning false
newCall();
Assert.assertFalse(namesystem.delete(dir, false));
}
/**
* Test for createSymlink
*/
@Test
public void testCreateSymlink() throws Exception {
String target = "/testNamenodeRetryCache/testCreateSymlink/target";
// Two retried symlink calls succeed
newCall();
namesystem.createSymlink(target, "/a/b", perm, true);
namesystem.createSymlink(target, "/a/b", perm, true);
namesystem.createSymlink(target, "/a/b", perm, true);
// non-retried call fails
newCall();
try {
// Not a retry (new call id): the link already exists, so the call fails
namesystem.createSymlink(target, "/a/b", perm, true);
Assert.fail("testCreateSymlink - expected exception is not thrown");
} catch (IOException e) {
// Expected
}
}
/**
* Test for create file
*/
@Test
public void testCreate() throws Exception {
String src = "/testNamenodeRetryCache/testCreate/file";
// Two retried calls succeed
newCall();
HdfsFileStatus status = namesystem.startFile(src, perm, "holder",
"clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
Assert.assertEquals(status, namesystem.startFile(src, perm,
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
true, (short) 1, 512));
Assert.assertEquals(status, namesystem.startFile(src, perm,
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
true, (short) 1, 512));
// A non-retried call fails
newCall();
try {
namesystem.startFile(src, perm, "holder", "clientmachine",
EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
Assert.fail("testCreate - expected exception is not thrown");
} catch (IOException e) {
// expected
}
}
/**
* Test for append
*/
@Test
public void testAppend() throws Exception {
String src = "/testNamenodeRetryCache/testAppend/src";
resetCall();
// Create a file with partial block
DFSTestUtil.createFile(filesystem, new Path(src), 128, (short)1, 0L);
// Retried append requests succeed
newCall();
LocatedBlock b = namesystem.appendFile(src, "holder", "clientMachine");
Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
// non-retried call fails
newCall();
try {
namesystem.appendFile(src, "holder", "clientMachine");
Assert.fail("testAppend - expected exception is not thrown");
} catch (Exception e) {
// Expected
}
}
/**
* Test for rename1
*/
@SuppressWarnings("deprecation")
@Test
public void testRename1() throws Exception {
String src = "/testNamenodeRetryCache/testRename1/src";
String target = "/testNamenodeRetryCache/testRename1/target";
resetCall();
namesystem.mkdirs(src, perm, true);
// Retried renames succeed
newCall();
Assert.assertTrue(namesystem.renameTo(src, target));
Assert.assertTrue(namesystem.renameTo(src, target));
Assert.assertTrue(namesystem.renameTo(src, target));
// A non-retried request fails
newCall();
Assert.assertFalse(namesystem.renameTo(src, target));
}
/**
* Test for rename2
*/
@Test
public void testRename2() throws Exception {
String src = "/testNamenodeRetryCache/testRename2/src";
String target = "/testNamenodeRetryCache/testRename2/target";
resetCall();
namesystem.mkdirs(src, perm, true);
// Retried renames succeed
newCall();
namesystem.renameTo(src, target, Rename.NONE);
namesystem.renameTo(src, target, Rename.NONE);
namesystem.renameTo(src, target, Rename.NONE);
// A non-retried request fails
newCall();
try {
namesystem.renameTo(src, target, Rename.NONE);
Assert.fail("testRename 2 expected exception is not thrown");
} catch (IOException e) {
// expected
}
}
/**
* Tests for createSnapshot, renameSnapshot and deleteSnapshot
*/
@Test
public void testSnapshotMethods() throws Exception {
String dir = "/testNamenodeRetryCache/testCreateSnapshot/src";
resetCall();
namesystem.mkdirs(dir, perm, true);
namesystem.allowSnapshot(dir);
// Test retry of create snapshot
newCall();
String name = namesystem.createSnapshot(dir, "snap1");
Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
// Non retried calls should fail
newCall();
try {
namesystem.createSnapshot(dir, "snap1");
Assert.fail("testSnapshotMethods expected exception is not thrown");
} catch (IOException e) {
// expected
}
// Test retry of rename snapshot
newCall();
namesystem.renameSnapshot(dir, "snap1", "snap2");
namesystem.renameSnapshot(dir, "snap1", "snap2");
namesystem.renameSnapshot(dir, "snap1", "snap2");
// Non retried calls should fail
newCall();
try {
namesystem.renameSnapshot(dir, "snap1", "snap2");
Assert.fail("testSnapshotMethods expected exception is not thrown");
} catch (IOException e) {
// expected
}
// Test retry of delete snapshot
newCall();
namesystem.deleteSnapshot(dir, "snap2");
namesystem.deleteSnapshot(dir, "snap2");
namesystem.deleteSnapshot(dir, "snap2");
// Non retried calls should fail
newCall();
try {
namesystem.deleteSnapshot(dir, "snap2");
Assert.fail("testSnapshotMethods expected exception is not thrown");
} catch (IOException e) {
// expected
}
}
@Test
public void testRetryCacheConfig() {
// By default retry configuration should be enabled
Configuration conf = new HdfsConfiguration();
Assert.assertNotNull(FSNamesystem.initRetryCache(conf));
// If retry cache is disabled, it should not be created
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false);
Assert.assertNull(FSNamesystem.initRetryCache(conf));
}
}
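
Assuming the usual Maven Surefire setup of the hadoop-hdfs module, one way to run just this test is:

    mvn test -Dtest=TestNamenodeRetryCache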