HDFS-4979. Merge 1507170 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1507173 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ca8723f488
commit
3bd39b2ea1
|
@ -290,4 +290,4 @@ public class RetryCache {
|
|||
cache.set.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,6 +109,8 @@ Release 2.1.0-beta - 2013-07-02
|
|||
|
||||
HDFS-4974. Add Idempotent and AtMostOnce annotations to namenode
|
||||
protocol methods. (suresh)
|
||||
|
||||
HDFS-4979. Implement retry cache on Namenode. (suresh)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
|
|
|
@ -488,4 +488,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
|
||||
public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log";
|
||||
public static final long DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l;
|
||||
|
||||
public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache";
|
||||
public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true;
|
||||
public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis";
|
||||
public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
|
||||
public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
|
||||
public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ import com.google.common.collect.Sets;
|
|||
public class BlockManager {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(BlockManager.class);
|
||||
static final Log blockLog = NameNode.blockStateChangeLog;
|
||||
public static final Log blockLog = NameNode.blockStateChangeLog;
|
||||
|
||||
/** Default load factor of map */
|
||||
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
|
||||
|
@ -2686,64 +2686,58 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|||
* The given node is reporting incremental information about some blocks.
|
||||
* This includes blocks that are starting to be received, completed being
|
||||
* received, or deleted.
|
||||
*
|
||||
* This method must be called with FSNamesystem lock held.
|
||||
*/
|
||||
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
||||
final String poolId,
|
||||
final ReceivedDeletedBlockInfo blockInfos[]
|
||||
) throws IOException {
|
||||
namesystem.writeLock();
|
||||
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
||||
final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
|
||||
throws IOException {
|
||||
assert namesystem.hasWriteLock();
|
||||
int received = 0;
|
||||
int deleted = 0;
|
||||
int receiving = 0;
|
||||
try {
|
||||
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
||||
if (node == null || !node.isAlive) {
|
||||
blockLog
|
||||
.warn("BLOCK* processIncrementalBlockReport"
|
||||
+ " is received from dead or unregistered node "
|
||||
+ nodeID);
|
||||
throw new IOException(
|
||||
"Got incremental block report from unregistered or dead node");
|
||||
}
|
||||
|
||||
for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
|
||||
switch (rdbi.getStatus()) {
|
||||
case DELETED_BLOCK:
|
||||
removeStoredBlock(rdbi.getBlock(), node);
|
||||
deleted++;
|
||||
break;
|
||||
case RECEIVED_BLOCK:
|
||||
addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
|
||||
received++;
|
||||
break;
|
||||
case RECEIVING_BLOCK:
|
||||
receiving++;
|
||||
processAndHandleReportedBlock(node, rdbi.getBlock(),
|
||||
ReplicaState.RBW, null);
|
||||
break;
|
||||
default:
|
||||
String msg =
|
||||
"Unknown block status code reported by " + nodeID +
|
||||
": " + rdbi;
|
||||
blockLog.warn(msg);
|
||||
assert false : msg; // if assertions are enabled, throw.
|
||||
break;
|
||||
}
|
||||
if (blockLog.isDebugEnabled()) {
|
||||
blockLog.debug("BLOCK* block "
|
||||
+ (rdbi.getStatus()) + ": " + rdbi.getBlock()
|
||||
+ " is received from " + nodeID);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
||||
if (node == null || !node.isAlive) {
|
||||
blockLog
|
||||
.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
|
||||
+ nodeID
|
||||
+ " receiving: " + receiving + ", "
|
||||
+ " received: " + received + ", "
|
||||
+ " deleted: " + deleted);
|
||||
.warn("BLOCK* processIncrementalBlockReport"
|
||||
+ " is received from dead or unregistered node "
|
||||
+ nodeID);
|
||||
throw new IOException(
|
||||
"Got incremental block report from unregistered or dead node");
|
||||
}
|
||||
|
||||
for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
|
||||
switch (rdbi.getStatus()) {
|
||||
case DELETED_BLOCK:
|
||||
removeStoredBlock(rdbi.getBlock(), node);
|
||||
deleted++;
|
||||
break;
|
||||
case RECEIVED_BLOCK:
|
||||
addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
|
||||
received++;
|
||||
break;
|
||||
case RECEIVING_BLOCK:
|
||||
receiving++;
|
||||
processAndHandleReportedBlock(node, rdbi.getBlock(),
|
||||
ReplicaState.RBW, null);
|
||||
break;
|
||||
default:
|
||||
String msg =
|
||||
"Unknown block status code reported by " + nodeID +
|
||||
": " + rdbi;
|
||||
blockLog.warn(msg);
|
||||
assert false : msg; // if assertions are enabled, throw.
|
||||
break;
|
||||
}
|
||||
if (blockLog.isDebugEnabled()) {
|
||||
blockLog.debug("BLOCK* block "
|
||||
+ (rdbi.getStatus()) + ": " + rdbi.getBlock()
|
||||
+ " is received from " + nodeID);
|
||||
}
|
||||
}
|
||||
blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
|
||||
+ nodeID + " receiving: " + receiving + ", " + " received: " + received
|
||||
+ ", " + " deleted: " + deleted);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -66,8 +66,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|||
import org.apache.hadoop.hdfs.util.Canceler;
|
||||
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
import org.apache.hadoop.util.IdGenerator;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
|
|
@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
||||
|
@ -55,6 +57,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
|
||||
|
@ -195,8 +201,12 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
|
|||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RetryCache;
|
||||
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
|
||||
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
|
@ -249,7 +259,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
};
|
||||
|
||||
private boolean isAuditEnabled() {
|
||||
@VisibleForTesting
|
||||
public boolean isAuditEnabled() {
|
||||
return !isDefaultAuditLogger || auditLog.isInfoEnabled();
|
||||
}
|
||||
|
||||
|
@ -422,6 +433,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
|
||||
private INodeId inodeId;
|
||||
|
||||
private final RetryCache retryCache;
|
||||
|
||||
/**
|
||||
* Set the last allocated inode id when fsimage or editlog is loaded.
|
||||
*/
|
||||
|
@ -656,6 +669,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
this.auditLoggers = initAuditLoggers(conf);
|
||||
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
|
||||
auditLoggers.get(0) instanceof DefaultAuditLogger;
|
||||
this.retryCache = initRetryCache(conf);
|
||||
} catch(IOException e) {
|
||||
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
|
||||
close();
|
||||
|
@ -666,6 +680,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
throw re;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static RetryCache initRetryCache(Configuration conf) {
|
||||
boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
|
||||
DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
|
||||
LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
|
||||
if (enable) {
|
||||
float heapPercent = conf.getFloat(
|
||||
DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY,
|
||||
DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT);
|
||||
long entryExpiryMillis = conf.getLong(
|
||||
DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
|
||||
DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT);
|
||||
LOG.info("Retry cache will use " + heapPercent
|
||||
+ " of total heap and retry cache entry expiry time is "
|
||||
+ entryExpiryMillis + " millis");
|
||||
long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
|
||||
return new RetryCache("Namenode Retry Cache", heapPercent,
|
||||
entryExpiryNanos);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<AuditLogger> initAuditLoggers(Configuration conf) {
|
||||
// Initialize the custom access loggers if configured.
|
||||
|
@ -726,7 +762,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
if (!haEnabled) {
|
||||
fsImage.openEditLogForWrite();
|
||||
}
|
||||
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
|
@ -803,6 +838,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
RetryCache.clear(retryCache);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1471,15 +1507,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
*/
|
||||
void concat(String target, String [] srcs)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
// Either there is no previous request in progres or it has failed
|
||||
if(FSNamesystem.LOG.isDebugEnabled()) {
|
||||
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
|
||||
" to " + target);
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
concatInt(target, srcs);
|
||||
success = true;
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1688,17 +1735,25 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
void createSymlink(String target, String link,
|
||||
PermissionStatus dirPerms, boolean createParent)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
if (!DFSUtil.isValidName(link)) {
|
||||
throw new InvalidPathException("Invalid link name: " + link);
|
||||
}
|
||||
if (FSDirectory.isReservedName(target)) {
|
||||
throw new InvalidPathException("Invalid target name: " + target);
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
createSymlinkInt(target, link, dirPerms, createParent);
|
||||
success = true;
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "createSymlink", link, target, null);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1837,13 +1892,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a new file entry in the namespace.
|
||||
*
|
||||
* For description of parameters and exceptions thrown see
|
||||
* {@link ClientProtocol#create()}, except it returns valid file status
|
||||
* upon success
|
||||
* For description of parameters and exceptions thrown see
|
||||
* {@link ClientProtocol#create()}, except it returns valid file status upon
|
||||
* success
|
||||
*
|
||||
* For retryCache handling details see -
|
||||
* {@link #getFileStatus(boolean, CacheEntryWithPayload)}
|
||||
*
|
||||
*/
|
||||
HdfsFileStatus startFile(String src, PermissionStatus permissions,
|
||||
String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
||||
|
@ -1851,13 +1910,23 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, UnresolvedLinkException,
|
||||
FileNotFoundException, ParentNotDirectoryException, IOException {
|
||||
HdfsFileStatus status = null;
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (HdfsFileStatus) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
try {
|
||||
return startFileInt(src, permissions, holder, clientMachine, flag,
|
||||
status = startFileInt(src, permissions, holder, clientMachine, flag,
|
||||
createParent, replication, blockSize);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "create", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, status != null, status);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
|
||||
|
@ -1880,7 +1949,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
blockManager.verifyReplication(src, replication, clientMachine);
|
||||
|
||||
boolean skipSync = false;
|
||||
final HdfsFileStatus stat;
|
||||
HdfsFileStatus stat = null;
|
||||
FSPermissionChecker pc = getPermissionChecker();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
if (blockSize < minBlockSize) {
|
||||
|
@ -1960,7 +2029,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
} else {
|
||||
if (overwrite) {
|
||||
delete(src, true); // File exists - delete if overwrite
|
||||
try {
|
||||
deleteInt(src, true); // File exists - delete if overwrite
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "delete", src);
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
// If lease soft limit time is expired, recover the lease
|
||||
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
|
||||
|
@ -2209,16 +2283,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, FileNotFoundException,
|
||||
ParentNotDirectoryException, IOException {
|
||||
LocatedBlock lb = null;
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (LocatedBlock) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
return appendFileInt(src, holder, clientMachine);
|
||||
lb = appendFileInt(src, holder, clientMachine);
|
||||
success = true;
|
||||
return lb;
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "append", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success, lb);
|
||||
}
|
||||
}
|
||||
|
||||
private LocatedBlock appendFileInt(String src, String holder, String clientMachine)
|
||||
throws AccessControlException, SafeModeException,
|
||||
private LocatedBlock appendFileInt(String src, String holder,
|
||||
String clientMachine) throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, FileNotFoundException,
|
||||
ParentNotDirectoryException, IOException {
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
|
@ -2781,12 +2867,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
@Deprecated
|
||||
boolean renameTo(String src, String dst)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return true; // Return previous response
|
||||
}
|
||||
boolean ret = false;
|
||||
try {
|
||||
return renameToInt(src, dst);
|
||||
ret = renameToInt(src, dst);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "rename", src, dst, null);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private boolean renameToInt(String src, String dst)
|
||||
|
@ -2853,6 +2947,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
/** Rename src to dst */
|
||||
void renameTo(String src, String dst, Options.Rename... options)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
|
||||
+ src + " to " + dst);
|
||||
|
@ -2865,6 +2963,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
|
||||
HdfsFileStatus resultingStat = null;
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
|
@ -2875,8 +2974,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
|
||||
renameToInternal(pc, src, dst, options);
|
||||
resultingStat = getAuditFileInfo(dst, false);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
if (resultingStat != null) {
|
||||
|
@ -2908,12 +3009,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
boolean delete(String src, boolean recursive)
|
||||
throws AccessControlException, SafeModeException,
|
||||
UnresolvedLinkException, IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return true; // Return previous response
|
||||
}
|
||||
boolean ret = false;
|
||||
try {
|
||||
return deleteInt(src, recursive);
|
||||
ret = deleteInt(src, recursive);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "delete", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private boolean deleteInt(String src, boolean recursive)
|
||||
|
@ -2937,6 +3046,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
throw new AccessControlException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a file/directory from the namespace.
|
||||
* <p>
|
||||
|
@ -2957,6 +3067,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
FSPermissionChecker pc = getPermissionChecker();
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
boolean ret = false;
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
|
@ -2975,6 +3086,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
if (!dir.delete(src, collectedBlocks, removedINodes)) {
|
||||
return false;
|
||||
}
|
||||
ret = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
@ -2992,7 +3104,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
|
||||
+ src +" is removed");
|
||||
}
|
||||
return true;
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3158,12 +3270,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
*/
|
||||
boolean mkdirs(String src, PermissionStatus permissions,
|
||||
boolean createParent) throws IOException, UnresolvedLinkException {
|
||||
boolean ret = false;
|
||||
try {
|
||||
return mkdirsInt(src, permissions, createParent);
|
||||
ret = mkdirsInt(src, permissions, createParent);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "mkdirs", src);
|
||||
throw e;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private boolean mkdirsInt(String src, PermissionStatus permissions,
|
||||
|
@ -3223,7 +3337,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
|
||||
// validate that we have enough inodes. This is, at best, a
|
||||
// heuristic because the mkdirs() operation migth need to
|
||||
// heuristic because the mkdirs() operation might need to
|
||||
// create multiple inodes.
|
||||
checkFsObjectLimit();
|
||||
|
||||
|
@ -4023,8 +4137,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
* @throws IOException if
|
||||
*/
|
||||
void saveNamespace() throws AccessControlException, IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
checkSuperuserPrivilege();
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
boolean success = false;
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
|
@ -4033,10 +4152,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
"in order to create namespace image.");
|
||||
}
|
||||
getFSImage().saveNamespace(this);
|
||||
LOG.info("New namespace image has been created");
|
||||
success = true;
|
||||
} finally {
|
||||
readUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
LOG.info("New namespace image has been created");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4594,6 +4715,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
try {
|
||||
Thread.sleep(recheckInterval);
|
||||
} catch (InterruptedException ie) {
|
||||
// Ignored
|
||||
}
|
||||
}
|
||||
if (!fsRunning) {
|
||||
|
@ -4852,30 +4974,51 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
}
|
||||
|
||||
NamenodeCommand startCheckpoint(
|
||||
NamenodeRegistration bnReg, // backup node
|
||||
NamenodeRegistration nnReg) // active name-node
|
||||
throws IOException {
|
||||
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
|
||||
NamenodeRegistration activeNamenode) throws IOException {
|
||||
checkOperation(OperationCategory.CHECKPOINT);
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (NamenodeCommand) cacheEntry.getPayload();
|
||||
}
|
||||
writeLock();
|
||||
NamenodeCommand cmd = null;
|
||||
try {
|
||||
checkOperation(OperationCategory.CHECKPOINT);
|
||||
|
||||
if (isInSafeMode()) {
|
||||
throw new SafeModeException("Checkpoint not started", safeMode);
|
||||
}
|
||||
LOG.info("Start checkpoint for " + bnReg.getAddress());
|
||||
NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
|
||||
LOG.info("Start checkpoint for " + backupNode.getAddress());
|
||||
cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
|
||||
getEditLog().logSync();
|
||||
return cmd;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, cmd != null, cmd);
|
||||
}
|
||||
}
|
||||
|
||||
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
||||
final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
|
||||
throws IOException {
|
||||
writeLock();
|
||||
try {
|
||||
blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
void endCheckpoint(NamenodeRegistration registration,
|
||||
CheckpointSignature sig) throws IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
checkOperation(OperationCategory.CHECKPOINT);
|
||||
boolean success = false;
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.CHECKPOINT);
|
||||
|
@ -4885,8 +5028,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
LOG.info("End checkpoint for " + registration.getAddress());
|
||||
getFSImage().endCheckpoint(sig);
|
||||
success = true;
|
||||
} finally {
|
||||
readUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5404,6 +5549,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
void updatePipeline(String clientName, ExtendedBlock oldBlock,
|
||||
ExtendedBlock newBlock, DatanodeID[] newNodes)
|
||||
throws IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
LOG.info("updatePipeline(block=" + oldBlock
|
||||
+ ", newGenerationStamp=" + newBlock.getGenerationStamp()
|
||||
|
@ -5412,17 +5561,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
+ ", clientName=" + clientName
|
||||
+ ")");
|
||||
writeLock();
|
||||
boolean success = false;
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
|
||||
if (isInSafeMode()) {
|
||||
throw new SafeModeException("Pipeline not updated", safeMode);
|
||||
}
|
||||
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
|
||||
+ oldBlock + " has different block identifier";
|
||||
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
|
||||
|
@ -6287,9 +6438,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
*/
|
||||
String createSnapshot(String snapshotRoot, String snapshotName)
|
||||
throws SafeModeException, IOException {
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (String) cacheEntry.getPayload();
|
||||
}
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
writeLock();
|
||||
final String snapshotPath;
|
||||
String snapshotPath = null;
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
if (isInSafeMode()) {
|
||||
|
@ -6313,6 +6469,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
|
||||
|
@ -6332,8 +6489,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
*/
|
||||
void renameSnapshot(String path, String snapshotOldName,
|
||||
String snapshotNewName) throws SafeModeException, IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
writeLock();
|
||||
boolean success = false;
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
if (isInSafeMode()) {
|
||||
|
@ -6347,8 +6509,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
|
||||
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
|
||||
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
|
||||
|
@ -6441,6 +6605,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
void deleteSnapshot(String snapshotRoot, String snapshotName)
|
||||
throws SafeModeException, IOException {
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
|
@ -6464,8 +6633,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
this.removeBlocks(collectedBlocks);
|
||||
collectedBlocks.clear();
|
||||
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
|
||||
|
|
|
@ -951,7 +951,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
|
||||
+" blocks.");
|
||||
}
|
||||
namesystem.getBlockManager().processIncrementalBlockReport(
|
||||
namesystem.processIncrementalBlockReport(
|
||||
nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
|
||||
}
|
||||
|
||||
|
|
|
@ -1352,4 +1352,43 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.enable.retrycache</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
This enables the retry cache on the namenode. Namenode tracks for
|
||||
non-idempotent requests the corresponding response. If a client retries the
|
||||
request, the response from the retry cache is sent. Such operations
|
||||
are tagged with annotation @AtMostOnce in namenode protocols. It is
|
||||
recommended that this flag be set to true. Setting it to false, will result
|
||||
in clients getting failure responses to retried request. This flag must
|
||||
be enabled in HA setup for transparent fail-overs.
|
||||
|
||||
The entries in the cache have expiration time configurable
|
||||
using dfs.namenode.retrycache.expirytime.millis.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.retrycache.expirytime.millis</name>
|
||||
<value>600000</value>
|
||||
<description>
|
||||
The time for which retry cache entries are retained.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.retrycache.heap.percent</name>
|
||||
<value>0.03f</value>
|
||||
<description>
|
||||
This parameter configures the heap size allocated for retry cache
|
||||
(excluding the response cached). This corresponds to approximately
|
||||
4096 entries for every 64MB of namenode process java heap size.
|
||||
Assuming retry cache entry expiration time (configured using
|
||||
dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this
|
||||
enables retry cache to support 7 operations per second sustained
|
||||
for 10 minutes. As the heap size is increased, the operation rate
|
||||
linearly increases.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -0,0 +1,355 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||
import org.apache.hadoop.ipc.RetryCache;
|
||||
import org.apache.hadoop.ipc.RpcConstants;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests for ensuring the namenode retry cache works correctly for
|
||||
* non-idempotent requests.
|
||||
*
|
||||
* Retry cache works based on tracking previously received request based on the
|
||||
* ClientId and CallId received in RPC requests and storing the response. The
|
||||
* response is replayed on retry when the same request is received again.
|
||||
*
|
||||
* The test works by manipulating the Rpc {@link Server} current RPC call. For
|
||||
* testing retried requests, an Rpc callId is generated only once using
|
||||
* {@link #newCall()} and reused for many method calls. For testing non-retried
|
||||
* request, a new callId is generated using {@link #newCall()}.
|
||||
*/
|
||||
public class TestNamenodeRetryCache {
|
||||
private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
|
||||
private static MiniDFSCluster cluster;
|
||||
private static FSNamesystem namesystem;
|
||||
private static PermissionStatus perm = new PermissionStatus(
|
||||
"TestNamenodeRetryCache", null, FsPermission.getDefault());
|
||||
private static FileSystem filesystem;
|
||||
private static int callId = 100;
|
||||
|
||||
/** Start a cluster */
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
namesystem = cluster.getNamesystem();
|
||||
filesystem = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
/** Cleanup after the test
|
||||
* @throws IOException
|
||||
* @throws UnresolvedLinkException
|
||||
* @throws SafeModeException
|
||||
* @throws AccessControlException */
|
||||
@After
|
||||
public void cleanup() throws IOException {
|
||||
namesystem.delete("/", true);
|
||||
}
|
||||
|
||||
public static void incrementCallId() {
|
||||
callId++;
|
||||
}
|
||||
|
||||
/** Set the current Server RPC call */
|
||||
public static void newCall() {
|
||||
Server.Call call = new Server.Call(++callId, 1, null, null,
|
||||
RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
|
||||
Server.getCurCall().set(call);
|
||||
}
|
||||
|
||||
public static void resetCall() {
|
||||
Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
|
||||
null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
|
||||
Server.getCurCall().set(call);
|
||||
}
|
||||
|
||||
private void concatSetup(String file1, String file2) throws Exception {
|
||||
DFSTestUtil.createFile(filesystem, new Path(file1), 512, (short)1, 0L);
|
||||
DFSTestUtil.createFile(filesystem, new Path(file2), 512, (short)1, 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests for concat call
|
||||
*/
|
||||
@Test
|
||||
public void testConcat() throws Exception {
|
||||
resetCall();
|
||||
String file1 = "/testNamenodeRetryCache/testConcat/file1";
|
||||
String file2 = "/testNamenodeRetryCache/testConcat/file2";
|
||||
|
||||
// Two retried concat calls succeed
|
||||
concatSetup(file1, file2);
|
||||
newCall();
|
||||
namesystem.concat(file1, new String[]{file2});
|
||||
namesystem.concat(file1, new String[]{file2});
|
||||
namesystem.concat(file1, new String[]{file2});
|
||||
|
||||
// A non-retried concat request fails
|
||||
newCall();
|
||||
try {
|
||||
// Second non-retry call should fail with an exception
|
||||
namesystem.concat(file1, new String[]{file2});
|
||||
Assert.fail("testConcat - expected exception is not thrown");
|
||||
} catch (IOException e) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests for delete call
|
||||
*/
|
||||
@Test
|
||||
public void testDelete() throws Exception {
|
||||
String dir = "/testNamenodeRetryCache/testDelete";
|
||||
// Two retried calls to create a non existent file
|
||||
newCall();
|
||||
namesystem.mkdirs(dir, perm, true);
|
||||
newCall();
|
||||
Assert.assertTrue(namesystem.delete(dir, false));
|
||||
Assert.assertTrue(namesystem.delete(dir, false));
|
||||
Assert.assertTrue(namesystem.delete(dir, false));
|
||||
|
||||
// non-retried call fails and gets false as return
|
||||
newCall();
|
||||
Assert.assertFalse(namesystem.delete(dir, false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for createSymlink
|
||||
*/
|
||||
@Test
|
||||
public void testCreateSymlink() throws Exception {
|
||||
String target = "/testNamenodeRetryCache/testCreateSymlink/target";
|
||||
|
||||
// Two retried symlink calls succeed
|
||||
newCall();
|
||||
namesystem.createSymlink(target, "/a/b", perm, true);
|
||||
namesystem.createSymlink(target, "/a/b", perm, true);
|
||||
namesystem.createSymlink(target, "/a/b", perm, true);
|
||||
|
||||
// non-retried call fails
|
||||
newCall();
|
||||
try {
|
||||
// Second non-retry call should fail with an exception
|
||||
namesystem.createSymlink(target, "/a/b", perm, true);
|
||||
Assert.fail("testCreateSymlink - expected exception is not thrown");
|
||||
} catch (IOException e) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for create file
|
||||
*/
|
||||
@Test
|
||||
public void testCreate() throws Exception {
|
||||
String src = "/testNamenodeRetryCache/testCreate/file";
|
||||
// Two retried calls succeed
|
||||
newCall();
|
||||
HdfsFileStatus status = namesystem.startFile(src, perm, "holder",
|
||||
"clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
|
||||
Assert.assertEquals(status, namesystem.startFile(src, perm,
|
||||
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
|
||||
true, (short) 1, 512));
|
||||
Assert.assertEquals(status, namesystem.startFile(src, perm,
|
||||
"holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
|
||||
true, (short) 1, 512));
|
||||
|
||||
// A non-retried call fails
|
||||
newCall();
|
||||
try {
|
||||
namesystem.startFile(src, perm, "holder", "clientmachine",
|
||||
EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
|
||||
Assert.fail("testCreate - expected exception is not thrown");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for rename1
|
||||
*/
|
||||
@Test
|
||||
public void testAppend() throws Exception {
|
||||
String src = "/testNamenodeRetryCache/testAppend/src";
|
||||
resetCall();
|
||||
// Create a file with partial block
|
||||
DFSTestUtil.createFile(filesystem, new Path(src), 128, (short)1, 0L);
|
||||
|
||||
// Retried append requests succeed
|
||||
newCall();
|
||||
LocatedBlock b = namesystem.appendFile(src, "holder", "clientMachine");
|
||||
Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
|
||||
Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
|
||||
|
||||
// non-retried call fails
|
||||
newCall();
|
||||
try {
|
||||
namesystem.appendFile(src, "holder", "clientMachine");
|
||||
Assert.fail("testAppend - expected exception is not thrown");
|
||||
} catch (Exception e) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for rename1
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testRename1() throws Exception {
|
||||
String src = "/testNamenodeRetryCache/testRename1/src";
|
||||
String target = "/testNamenodeRetryCache/testRename1/target";
|
||||
resetCall();
|
||||
namesystem.mkdirs(src, perm, true);
|
||||
|
||||
// Retried renames succeed
|
||||
newCall();
|
||||
Assert.assertTrue(namesystem.renameTo(src, target));
|
||||
Assert.assertTrue(namesystem.renameTo(src, target));
|
||||
Assert.assertTrue(namesystem.renameTo(src, target));
|
||||
|
||||
// A non-retried request fails
|
||||
newCall();
|
||||
Assert.assertFalse(namesystem.renameTo(src, target));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for rename2
|
||||
*/
|
||||
@Test
|
||||
public void testRename2() throws Exception {
|
||||
String src = "/testNamenodeRetryCache/testRename2/src";
|
||||
String target = "/testNamenodeRetryCache/testRename2/target";
|
||||
resetCall();
|
||||
namesystem.mkdirs(src, perm, true);
|
||||
|
||||
// Retried renames succeed
|
||||
newCall();
|
||||
namesystem.renameTo(src, target, Rename.NONE);
|
||||
namesystem.renameTo(src, target, Rename.NONE);
|
||||
namesystem.renameTo(src, target, Rename.NONE);
|
||||
|
||||
// A non-retried request fails
|
||||
newCall();
|
||||
try {
|
||||
namesystem.renameTo(src, target, Rename.NONE);
|
||||
Assert.fail("testRename 2 expected exception is not thrown");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for crateSnapshot
|
||||
*/
|
||||
@Test
|
||||
public void testSnapshotMethods() throws Exception {
|
||||
String dir = "/testNamenodeRetryCache/testCreateSnapshot/src";
|
||||
resetCall();
|
||||
namesystem.mkdirs(dir, perm, true);
|
||||
namesystem.allowSnapshot(dir);
|
||||
|
||||
// Test retry of create snapshot
|
||||
newCall();
|
||||
String name = namesystem.createSnapshot(dir, "snap1");
|
||||
Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
|
||||
Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
|
||||
Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
|
||||
|
||||
// Non retried calls should fail
|
||||
newCall();
|
||||
try {
|
||||
namesystem.createSnapshot(dir, "snap1");
|
||||
Assert.fail("testSnapshotMethods expected exception is not thrown");
|
||||
} catch (IOException e) {
|
||||
// exptected
|
||||
}
|
||||
|
||||
// Test retry of rename snapshot
|
||||
newCall();
|
||||
namesystem.renameSnapshot(dir, "snap1", "snap2");
|
||||
namesystem.renameSnapshot(dir, "snap1", "snap2");
|
||||
namesystem.renameSnapshot(dir, "snap1", "snap2");
|
||||
|
||||
// Non retried calls should fail
|
||||
newCall();
|
||||
try {
|
||||
namesystem.renameSnapshot(dir, "snap1", "snap2");
|
||||
Assert.fail("testSnapshotMethods expected exception is not thrown");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
// Test retry of delete snapshot
|
||||
newCall();
|
||||
namesystem.deleteSnapshot(dir, "snap2");
|
||||
namesystem.deleteSnapshot(dir, "snap2");
|
||||
namesystem.deleteSnapshot(dir, "snap2");
|
||||
|
||||
// Non retried calls should fail
|
||||
newCall();
|
||||
try {
|
||||
namesystem.deleteSnapshot(dir, "snap2");
|
||||
Assert.fail("testSnapshotMethods expected exception is not thrown");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryCacheConfig() {
|
||||
// By default retry configuration should be enabled
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
Assert.assertNotNull(FSNamesystem.initRetryCache(conf));
|
||||
|
||||
// If retry cache is disabled, it should not be created
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false);
|
||||
Assert.assertNull(FSNamesystem.initRetryCache(conf));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue