HDFS-7412. Move RetryCache to NameNodeRpcServer. Contributed by Haohui Mai.
This commit is contained in:
parent
f636f9d943
commit
8e253cb930
|
@ -388,6 +388,8 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7331. Add Datanode network counts to datanode jmx page. (Charles Lamb
|
||||
via atm)
|
||||
|
||||
HDFS-7412. Move RetryCache to NameNodeRpcServer. (wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -526,7 +526,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
private volatile boolean startingActiveService = false;
|
||||
|
||||
private INodeId inodeId;
|
||||
|
||||
|
||||
private final RetryCache retryCache;
|
||||
|
||||
private final NNConf nnConf;
|
||||
|
@ -1943,28 +1943,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @param srcs file that will be concatenated
|
||||
* @throws IOException on error
|
||||
*/
|
||||
void concat(String target, String [] srcs)
|
||||
void concat(String target, String [] srcs, boolean logRetryCache)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
// Either there is no previous request in progress or it has failed
|
||||
if(FSNamesystem.LOG.isDebugEnabled()) {
|
||||
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
|
||||
" to " + target);
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
|
||||
try {
|
||||
concatInt(target, srcs, cacheEntry != null);
|
||||
success = true;
|
||||
concatInt(target, srcs, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2181,7 +2171,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* Create a symbolic link.
|
||||
*/
|
||||
void createSymlink(String target, String link,
|
||||
PermissionStatus dirPerms, boolean createParent)
|
||||
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
if (!DFSUtil.isValidName(link)) {
|
||||
throw new InvalidPathException("Invalid link name: " + link);
|
||||
|
@ -2189,19 +2179,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
if (FSDirectory.isReservedName(target)) {
|
||||
throw new InvalidPathException("Invalid target name: " + target);
|
||||
}
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
|
||||
createSymlinkInt(target, link, dirPerms, createParent, logRetryCache);
|
||||
success = true;
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "createSymlink", link, target, null);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2489,26 +2474,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
HdfsFileStatus startFile(String src, PermissionStatus permissions,
|
||||
String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize,
|
||||
CryptoProtocolVersion[] supportedVersions)
|
||||
CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
|
||||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, UnresolvedLinkException,
|
||||
FileNotFoundException, ParentNotDirectoryException, IOException {
|
||||
|
||||
HdfsFileStatus status = null;
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (HdfsFileStatus) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
try {
|
||||
status = startFileInt(src, permissions, holder, clientMachine, flag,
|
||||
createParent, replication, blockSize, supportedVersions,
|
||||
cacheEntry != null);
|
||||
logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "create", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, status != null, status);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
@ -3035,27 +3013,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
/**
|
||||
* Append to an existing file in the namespace.
|
||||
*/
|
||||
LocatedBlock appendFile(String src, String holder, String clientMachine)
|
||||
throws AccessControlException, SafeModeException,
|
||||
FileAlreadyExistsException, FileNotFoundException,
|
||||
ParentNotDirectoryException, IOException {
|
||||
LocatedBlock lb = null;
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (LocatedBlock) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
LocatedBlock appendFile(
|
||||
String src, String holder, String clientMachine, boolean logRetryCache)
|
||||
throws IOException {
|
||||
try {
|
||||
lb = appendFileInt(src, holder, clientMachine, cacheEntry != null);
|
||||
success = true;
|
||||
return lb;
|
||||
return appendFileInt(src, holder, clientMachine, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "append", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success, lb);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3717,20 +3682,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @deprecated Use {@link #renameTo(String, String, Options.Rename...)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
boolean renameTo(String src, String dst)
|
||||
boolean renameTo(String src, String dst, boolean logRetryCache)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return true; // Return previous response
|
||||
}
|
||||
boolean ret = false;
|
||||
try {
|
||||
ret = renameToInt(src, dst, cacheEntry != null);
|
||||
ret = renameToInt(src, dst, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "rename", src, dst, null);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -3775,7 +3734,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return status;
|
||||
}
|
||||
|
||||
/** @deprecated See {@link #renameTo(String, String)} */
|
||||
/** @deprecated See {@link #renameTo(String, String, boolean)} */
|
||||
@Deprecated
|
||||
private boolean renameToInternal(FSPermissionChecker pc, String src,
|
||||
String dst, boolean logRetryCache) throws IOException,
|
||||
|
@ -3808,7 +3767,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
|
||||
|
||||
/** Rename src to dst */
|
||||
void renameTo(final String srcArg, final String dstArg,
|
||||
void renameTo(final String srcArg, final String dstArg, boolean logRetryCache,
|
||||
Options.Rename... options) throws IOException, UnresolvedLinkException {
|
||||
String src = srcArg;
|
||||
String dst = dstArg;
|
||||
|
@ -3822,14 +3781,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
||||
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
|
||||
HdfsFileStatus resultingStat = null;
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
|
||||
try {
|
||||
|
@ -3837,13 +3792,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
checkNameNodeSafeMode("Cannot rename " + src);
|
||||
src = dir.resolvePath(pc, src, srcComponents);
|
||||
dst = dir.resolvePath(pc, dst, dstComponents);
|
||||
renameToInternal(pc, src, dst, cacheEntry != null,
|
||||
collectedBlocks, options);
|
||||
renameToInternal(pc, src, dst, logRetryCache, collectedBlocks, options);
|
||||
resultingStat = getAuditFileInfo(dst, false);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
if (!collectedBlocks.getToDeleteList().isEmpty()) {
|
||||
|
@ -3886,21 +3838,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @see ClientProtocol#delete(String, boolean) for detailed description and
|
||||
* description of exceptions
|
||||
*/
|
||||
boolean delete(String src, boolean recursive)
|
||||
boolean delete(String src, boolean recursive, boolean logRetryCache)
|
||||
throws AccessControlException, SafeModeException,
|
||||
UnresolvedLinkException, IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return true; // Return previous response
|
||||
}
|
||||
|
||||
boolean ret = false;
|
||||
try {
|
||||
ret = deleteInt(src, recursive, cacheEntry != null);
|
||||
ret = deleteInt(src, recursive, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "delete", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -5478,12 +5425,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
void saveNamespace() throws AccessControlException, IOException {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
checkSuperuserPrivilege();
|
||||
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
|
||||
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
|
@ -5493,10 +5436,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
+ "in order to create namespace image.");
|
||||
}
|
||||
getFSImage().saveNamespace(this);
|
||||
success = true;
|
||||
} finally {
|
||||
readUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
LOG.info("New namespace image has been created");
|
||||
}
|
||||
|
@ -6327,24 +6268,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
|
||||
NamenodeRegistration activeNamenode) throws IOException {
|
||||
checkOperation(OperationCategory.CHECKPOINT);
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (NamenodeCommand) cacheEntry.getPayload();
|
||||
}
|
||||
writeLock();
|
||||
NamenodeCommand cmd = null;
|
||||
try {
|
||||
checkOperation(OperationCategory.CHECKPOINT);
|
||||
checkNameNodeSafeMode("Checkpoint not started");
|
||||
|
||||
LOG.info("Start checkpoint for " + backupNode.getAddress());
|
||||
cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
|
||||
NamenodeCommand cmd = getFSImage().startCheckpoint(backupNode,
|
||||
activeNamenode);
|
||||
getEditLog().logSync();
|
||||
return cmd;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, cmd != null, cmd);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6362,22 +6297,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
void endCheckpoint(NamenodeRegistration registration,
|
||||
CheckpointSignature sig) throws IOException {
|
||||
checkOperation(OperationCategory.CHECKPOINT);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.CHECKPOINT);
|
||||
|
||||
checkNameNodeSafeMode("Checkpoint not ended");
|
||||
LOG.info("End checkpoint for " + registration.getAddress());
|
||||
getFSImage().endCheckpoint(sig);
|
||||
success = true;
|
||||
} finally {
|
||||
readUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6821,14 +6748,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @param newNodes datanodes in the pipeline
|
||||
* @throws IOException if any error occurs
|
||||
*/
|
||||
void updatePipeline(String clientName, ExtendedBlock oldBlock,
|
||||
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
|
||||
void updatePipeline(
|
||||
String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
|
||||
DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
|
||||
+ ", newGS=" + newBlock.getGenerationStamp()
|
||||
+ ", newLength=" + newBlock.getNumBytes()
|
||||
|
@ -6837,18 +6762,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
+ ")");
|
||||
waitForLoadingFSImage();
|
||||
writeLock();
|
||||
boolean success = false;
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Pipeline not updated");
|
||||
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
|
||||
+ oldBlock + " has different block identifier";
|
||||
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
|
||||
newStorageIDs, cacheEntry != null);
|
||||
success = true;
|
||||
newStorageIDs, logRetryCache);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
LOG.info("updatePipeline(" + oldBlock.getLocalBlock() + " => "
|
||||
|
@ -7844,15 +7766,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @param snapshotRoot The directory path where the snapshot is taken
|
||||
* @param snapshotName The name of the snapshot
|
||||
*/
|
||||
String createSnapshot(String snapshotRoot, String snapshotName)
|
||||
String createSnapshot(String snapshotRoot, String snapshotName,
|
||||
boolean logRetryCache)
|
||||
throws SafeModeException, IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (String) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
String snapshotPath = null;
|
||||
writeLock();
|
||||
try {
|
||||
|
@ -7878,11 +7797,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
} finally {
|
||||
dir.writeUnlock();
|
||||
}
|
||||
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
|
||||
cacheEntry != null);
|
||||
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName, logRetryCache);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
|
||||
|
@ -7900,16 +7817,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @throws SafeModeException
|
||||
* @throws IOException
|
||||
*/
|
||||
void renameSnapshot(String path, String snapshotOldName,
|
||||
String snapshotNewName) throws SafeModeException, IOException {
|
||||
void renameSnapshot(
|
||||
String path, String snapshotOldName, String snapshotNewName,
|
||||
boolean logRetryCache) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
writeLock();
|
||||
boolean success = false;
|
||||
|
||||
try {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
checkNameNodeSafeMode("Cannot rename snapshot for " + path);
|
||||
|
@ -7920,11 +7835,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
|
||||
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
|
||||
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
|
||||
cacheEntry != null);
|
||||
success = true;
|
||||
logRetryCache);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
|
||||
}
|
||||
getEditLog().logSync();
|
||||
|
||||
|
@ -8014,16 +7928,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @throws SafeModeException
|
||||
* @throws IOException
|
||||
*/
|
||||
void deleteSnapshot(String snapshotRoot, String snapshotName)
|
||||
void deleteSnapshot(String snapshotRoot, String snapshotName,
|
||||
boolean logRetryCache)
|
||||
throws SafeModeException, IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc = getPermissionChecker();
|
||||
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
|
||||
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
|
||||
writeLock();
|
||||
try {
|
||||
|
@ -8043,12 +7953,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
dir.writeUnlock();
|
||||
}
|
||||
removedINodes.clear();
|
||||
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
|
||||
cacheEntry != null);
|
||||
success = true;
|
||||
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName, logRetryCache);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
|
||||
}
|
||||
getEditLog().logSync();
|
||||
|
||||
|
@ -8246,20 +8154,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return new RollingUpgradeInfo(blockPoolId, false, startTime, finalizeTime);
|
||||
}
|
||||
|
||||
long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)
|
||||
long addCacheDirective(CacheDirectiveInfo directive,
|
||||
EnumSet<CacheFlag> flags, boolean logRetryCache)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc = isPermissionEnabled ?
|
||||
getPermissionChecker() : null;
|
||||
CacheEntryWithPayload cacheEntry =
|
||||
RetryCache.waitForCompletion(retryCache, null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (Long) cacheEntry.getPayload();
|
||||
}
|
||||
boolean success = false;
|
||||
|
||||
if (!flags.contains(CacheFlag.FORCE)) {
|
||||
cacheManager.waitForRescanIfNeeded();
|
||||
}
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
String effectiveDirectiveStr = null;
|
||||
Long result = null;
|
||||
|
@ -8275,8 +8180,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
CacheDirectiveInfo effectiveDirective =
|
||||
cacheManager.addDirective(directive, pc, flags);
|
||||
getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
|
||||
cacheEntry != null);
|
||||
getEditLog().logAddCacheDirectiveInfo(effectiveDirective, logRetryCache);
|
||||
result = effectiveDirective.getId();
|
||||
effectiveDirectiveStr = effectiveDirective.toString();
|
||||
success = true;
|
||||
|
@ -8288,21 +8192,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
if (isAuditEnabled() && isExternalInvocation()) {
|
||||
logAuditEvent(success, "addCacheDirective", effectiveDirectiveStr, null, null);
|
||||
}
|
||||
RetryCache.setState(cacheEntry, success, result);
|
||||
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void modifyCacheDirective(CacheDirectiveInfo directive,
|
||||
EnumSet<CacheFlag> flags) throws IOException {
|
||||
EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc = isPermissionEnabled ?
|
||||
getPermissionChecker() : null;
|
||||
boolean success = false;
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!flags.contains(CacheFlag.FORCE)) {
|
||||
cacheManager.waitForRescanIfNeeded();
|
||||
}
|
||||
|
@ -8314,8 +8215,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
"Cannot add cache directive", safeMode);
|
||||
}
|
||||
cacheManager.modifyDirective(directive, pc, flags);
|
||||
getEditLog().logModifyCacheDirectiveInfo(directive,
|
||||
cacheEntry != null);
|
||||
getEditLog().logModifyCacheDirectiveInfo(directive, logRetryCache);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
|
@ -8326,18 +8226,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
String idStr = "{id: " + directive.getId().toString() + "}";
|
||||
logAuditEvent(success, "modifyCacheDirective", idStr, directive.toString(), null);
|
||||
}
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
void removeCacheDirective(Long id) throws IOException {
|
||||
void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc = isPermissionEnabled ?
|
||||
getPermissionChecker() : null;
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
writeLock();
|
||||
try {
|
||||
|
@ -8347,16 +8243,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
"Cannot remove cache directives", safeMode);
|
||||
}
|
||||
cacheManager.removeDirective(id, pc);
|
||||
getEditLog().logRemoveCacheDirectiveInfo(id, cacheEntry != null);
|
||||
getEditLog().logRemoveCacheDirectiveInfo(id, logRetryCache);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
if (isAuditEnabled() && isExternalInvocation()) {
|
||||
String idStr = "{id: " + id.toString() + "}";
|
||||
String idStr = "{id: " + Long.toString(id) + "}";
|
||||
logAuditEvent(success, "removeCacheDirective", idStr, null,
|
||||
null);
|
||||
}
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
getEditLog().logSync();
|
||||
}
|
||||
|
@ -8385,14 +8280,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return results;
|
||||
}
|
||||
|
||||
public void addCachePool(CachePoolInfo req) throws IOException {
|
||||
public void addCachePool(CachePoolInfo req, boolean logRetryCache)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc = isPermissionEnabled ?
|
||||
getPermissionChecker() : null;
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
writeLock();
|
||||
boolean success = false;
|
||||
String poolInfoStr = null;
|
||||
|
@ -8407,27 +8300,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
CachePoolInfo info = cacheManager.addCachePool(req);
|
||||
poolInfoStr = info.toString();
|
||||
getEditLog().logAddCachePool(info, cacheEntry != null);
|
||||
getEditLog().logAddCachePool(info, logRetryCache);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
if (isAuditEnabled() && isExternalInvocation()) {
|
||||
logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
|
||||
}
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
|
||||
getEditLog().logSync();
|
||||
}
|
||||
|
||||
public void modifyCachePool(CachePoolInfo req) throws IOException {
|
||||
public void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc =
|
||||
isPermissionEnabled ? getPermissionChecker() : null;
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
writeLock();
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -8440,7 +8330,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
pc.checkSuperuserPrivilege();
|
||||
}
|
||||
cacheManager.modifyCachePool(req);
|
||||
getEditLog().logModifyCachePool(req, cacheEntry != null);
|
||||
getEditLog().logModifyCachePool(req, logRetryCache);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
|
@ -8448,20 +8338,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
String poolNameStr = "{poolName: " + req.getPoolName() + "}";
|
||||
logAuditEvent(success, "modifyCachePool", poolNameStr, req.toString(), null);
|
||||
}
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
|
||||
getEditLog().logSync();
|
||||
}
|
||||
|
||||
public void removeCachePool(String cachePoolName) throws IOException {
|
||||
public void removeCachePool(String cachePoolName, boolean logRetryCache)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
final FSPermissionChecker pc =
|
||||
isPermissionEnabled ? getPermissionChecker() : null;
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
writeLock();
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -8474,7 +8361,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
pc.checkSuperuserPrivilege();
|
||||
}
|
||||
cacheManager.removeCachePool(cachePoolName);
|
||||
getEditLog().logRemoveCachePool(cachePoolName, cacheEntry != null);
|
||||
getEditLog().logRemoveCachePool(cachePoolName, logRetryCache);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock();
|
||||
|
@ -8482,7 +8369,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
String poolNameStr = "{poolName: " + cachePoolName + "}";
|
||||
logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
|
||||
}
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
|
||||
getEditLog().logSync();
|
||||
|
@ -8675,15 +8561,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @throws UnresolvedLinkException if the path can't be resolved.
|
||||
* @throws SafeModeException if the Namenode is in safe mode.
|
||||
*/
|
||||
void createEncryptionZone(final String src, final String keyName)
|
||||
void createEncryptionZone(final String src, final String keyName,
|
||||
boolean logRetryCache)
|
||||
throws IOException, UnresolvedLinkException,
|
||||
SafeModeException, AccessControlException {
|
||||
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
if (provider == null) {
|
||||
throw new IOException(
|
||||
|
@ -8709,13 +8590,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
// If the provider supports pool for EDEKs, this will fill in the pool
|
||||
generateEncryptedDataEncryptionKey(keyName);
|
||||
createEncryptionZoneInt(src, metadata.getCipher(),
|
||||
keyName, cacheEntry != null);
|
||||
success = true;
|
||||
keyName, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "createEncryptionZone", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8820,22 +8698,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @throws UnresolvedLinkException
|
||||
* @throws IOException
|
||||
*/
|
||||
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
|
||||
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
|
||||
boolean logRetryCache)
|
||||
throws AccessControlException, SafeModeException,
|
||||
UnresolvedLinkException, IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
setXAttrInt(src, xAttr, flag, cacheEntry != null);
|
||||
success = true;
|
||||
setXAttrInt(src, xAttr, flag, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "setXAttr", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8985,20 +8856,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
* @throws UnresolvedLinkException
|
||||
* @throws IOException
|
||||
*/
|
||||
void removeXAttr(String src, XAttr xAttr) throws IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
|
||||
throws IOException {
|
||||
try {
|
||||
removeXAttrInt(src, xAttr, cacheEntry != null);
|
||||
success = true;
|
||||
removeXAttrInt(src, xAttr, logRetryCache);
|
||||
} catch (AccessControlException e) {
|
||||
logAuditEvent(false, "removeXAttr", src);
|
||||
throw e;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -138,6 +138,9 @@ import org.apache.hadoop.io.EnumSetWritable;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RetryCache;
|
||||
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
|
||||
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.ipc.WritableRpcEngine;
|
||||
import org.apache.hadoop.ipc.RefreshRegistry;
|
||||
|
@ -166,7 +169,6 @@ import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappin
|
|||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
|
||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.tracing.SpanReceiverInfo;
|
||||
import org.apache.hadoop.tracing.TraceAdminPB;
|
||||
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
|
||||
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
|
||||
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
|
||||
|
@ -190,7 +192,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
protected final FSNamesystem namesystem;
|
||||
protected final NameNode nn;
|
||||
private final NameNodeMetrics metrics;
|
||||
|
||||
|
||||
private final RetryCache retryCache;
|
||||
|
||||
private final boolean serviceAuthEnabled;
|
||||
|
||||
/** The RPC server that listens to requests from DataNodes */
|
||||
|
@ -207,6 +211,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
throws IOException {
|
||||
this.nn = nn;
|
||||
this.namesystem = nn.getNamesystem();
|
||||
this.retryCache = namesystem.getRetryCache();
|
||||
this.metrics = NameNode.getNameNodeMetrics();
|
||||
|
||||
int handlerCount =
|
||||
|
@ -510,14 +515,36 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
verifyRequest(registration);
|
||||
if(!nn.isRole(NamenodeRole.NAMENODE))
|
||||
throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
|
||||
return namesystem.startCheckpoint(registration, nn.setRegistration());
|
||||
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (NamenodeCommand) cacheEntry.getPayload();
|
||||
}
|
||||
NamenodeCommand ret = null;
|
||||
try {
|
||||
ret = namesystem.startCheckpoint(registration, nn.setRegistration());
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, ret != null, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
public void endCheckpoint(NamenodeRegistration registration,
|
||||
CheckpointSignature sig) throws IOException {
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
namesystem.endCheckpoint(registration, sig);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.endCheckpoint(registration, sig);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
|
@ -562,19 +589,32 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
String clientMachine = getClientMachine();
|
||||
if (stateChangeLog.isDebugEnabled()) {
|
||||
stateChangeLog.debug("*DIR* NameNode.create: file "
|
||||
+src+" for "+clientName+" at "+clientMachine);
|
||||
+src+" for "+clientName+" at "+clientMachine);
|
||||
}
|
||||
if (!checkPathLength(src)) {
|
||||
throw new IOException("create: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
}
|
||||
HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
|
||||
getRemoteUser().getShortUserName(), null, masked),
|
||||
clientName, clientMachine, flag.get(), createParent, replication,
|
||||
blockSize, supportedVersions);
|
||||
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (HdfsFileStatus) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
HdfsFileStatus status = null;
|
||||
try {
|
||||
PermissionStatus perm = new PermissionStatus(getRemoteUser()
|
||||
.getShortUserName(), null, masked);
|
||||
status = namesystem.startFile(src, perm, clientName, clientMachine,
|
||||
flag.get(), createParent, replication, blockSize, supportedVersions,
|
||||
cacheEntry != null);
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, status != null, status);
|
||||
}
|
||||
|
||||
metrics.incrFilesCreated();
|
||||
metrics.incrCreateFileOps();
|
||||
return fileStatus;
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
|
@ -585,7 +625,20 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
stateChangeLog.debug("*DIR* NameNode.append: file "
|
||||
+src+" for "+clientName+" at "+clientMachine);
|
||||
}
|
||||
LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (LocatedBlock) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
LocatedBlock info = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
info = namesystem.appendFile(src, clientName, clientMachine,
|
||||
cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success, info);
|
||||
}
|
||||
metrics.incrFilesAppended();
|
||||
return info;
|
||||
}
|
||||
|
@ -727,7 +780,19 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
|
||||
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
|
||||
throws IOException {
|
||||
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes, newStorageIDs);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes,
|
||||
newStorageIDs, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // DatanodeProtocol
|
||||
|
@ -756,7 +821,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
throw new IOException("rename: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
}
|
||||
boolean ret = namesystem.renameTo(src, dst);
|
||||
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return true; // Return previous response
|
||||
}
|
||||
|
||||
boolean ret = false;
|
||||
try {
|
||||
ret = namesystem.renameTo(src, dst, cacheEntry != null);
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, ret);
|
||||
}
|
||||
if (ret) {
|
||||
metrics.incrFilesRenamed();
|
||||
}
|
||||
|
@ -765,7 +841,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
|
||||
@Override // ClientProtocol
|
||||
public void concat(String trg, String[] src) throws IOException {
|
||||
namesystem.concat(trg, src);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
|
||||
try {
|
||||
namesystem.concat(trg, src, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
|
@ -778,7 +865,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
throw new IOException("rename: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
}
|
||||
namesystem.renameTo(src, dst, options);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.renameTo(src, dst, cacheEntry != null, options);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
metrics.incrFilesRenamed();
|
||||
}
|
||||
|
||||
|
@ -788,7 +885,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
|
||||
+ ", recursive=" + recursive);
|
||||
}
|
||||
boolean ret = namesystem.delete(src, recursive);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return true; // Return previous response
|
||||
}
|
||||
|
||||
boolean ret = false;
|
||||
try {
|
||||
ret = namesystem.delete(src, recursive, cacheEntry != null);
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, ret);
|
||||
}
|
||||
if (ret)
|
||||
metrics.incrDeleteFileOps();
|
||||
return ret;
|
||||
|
@ -904,7 +1011,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
|
||||
@Override // ClientProtocol
|
||||
public void saveNamespace() throws IOException {
|
||||
namesystem.saveNamespace();
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.saveNamespace();
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
|
@ -1025,6 +1142,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
@Override // ClientProtocol
|
||||
public void createSymlink(String target, String link, FsPermission dirPerms,
|
||||
boolean createParent) throws IOException {
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
|
||||
metrics.incrCreateSymlinkOps();
|
||||
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target
|
||||
* URI may refer to a non-HDFS file system.
|
||||
|
@ -1038,8 +1160,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
throw new IOException("Invalid symlink target");
|
||||
}
|
||||
final UserGroupInformation ugi = getRemoteUser();
|
||||
namesystem.createSymlink(target, link,
|
||||
new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
PermissionStatus perm = new PermissionStatus(ugi.getShortUserName(),
|
||||
null, dirPerms);
|
||||
namesystem.createSymlink(target, link, perm, createParent,
|
||||
cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
|
@ -1326,15 +1457,38 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
throw new IOException("createSnapshot: Pathname too long. Limit "
|
||||
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
||||
}
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
|
||||
null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (String) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
metrics.incrCreateSnapshotOps();
|
||||
return namesystem.createSnapshot(snapshotRoot, snapshotName);
|
||||
String ret = null;
|
||||
try {
|
||||
ret = namesystem.createSnapshot(snapshotRoot, snapshotName,
|
||||
cacheEntry != null);
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, ret != null, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteSnapshot(String snapshotRoot, String snapshotName)
|
||||
throws IOException {
|
||||
metrics.incrDeleteSnapshotOps();
|
||||
namesystem.deleteSnapshot(snapshotRoot, snapshotName);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.deleteSnapshot(snapshotRoot, snapshotName, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1358,7 +1512,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
throw new IOException("The new snapshot name is null or empty.");
|
||||
}
|
||||
metrics.incrRenameSnapshotOps();
|
||||
namesystem.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.renameSnapshot(snapshotRoot, snapshotOldName,
|
||||
snapshotNewName, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // Client Protocol
|
||||
|
@ -1382,18 +1547,53 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
@Override
|
||||
public long addCacheDirective(
|
||||
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
|
||||
return namesystem.addCacheDirective(path, flags);
|
||||
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
|
||||
(retryCache, null);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return (Long) cacheEntry.getPayload();
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
long ret = 0;
|
||||
try {
|
||||
ret = namesystem.addCacheDirective(path, flags, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void modifyCacheDirective(
|
||||
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
|
||||
namesystem.modifyCacheDirective(directive, flags);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.modifyCacheDirective(directive, flags, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeCacheDirective(long id) throws IOException {
|
||||
namesystem.removeCacheDirective(id);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.removeCacheDirective(id, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1407,17 +1607,47 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
|
||||
@Override
|
||||
public void addCachePool(CachePoolInfo info) throws IOException {
|
||||
namesystem.addCachePool(info);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.addCachePool(info, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void modifyCachePool(CachePoolInfo info) throws IOException {
|
||||
namesystem.modifyCachePool(info);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.modifyCachePool(info, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeCachePool(String cachePoolName) throws IOException {
|
||||
namesystem.removeCachePool(cachePoolName);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.removeCachePool(cachePoolName, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1461,7 +1691,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
@Override
|
||||
public void createEncryptionZone(String src, String keyName)
|
||||
throws IOException {
|
||||
namesystem.createEncryptionZone(src, keyName);
|
||||
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.createEncryptionZone(src, keyName, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1479,7 +1719,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
@Override
|
||||
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
|
||||
throws IOException {
|
||||
namesystem.setXAttr(src, xAttr, flag);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.setXAttr(src, xAttr, flag, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1495,7 +1745,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
|
||||
@Override
|
||||
public void removeXAttr(String src, XAttr xAttr) throws IOException {
|
||||
namesystem.removeXAttr(src, xAttr);
|
||||
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
|
||||
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
||||
return; // Return previous response
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
namesystem.removeXAttr(src, xAttr, cacheEntry != null);
|
||||
success = true;
|
||||
} finally {
|
||||
RetryCache.setState(cacheEntry, success);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,15 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -37,16 +33,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.net.StaticMapping;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestDefaultBlockPlacementPolicy {
|
||||
|
||||
private Configuration conf;
|
||||
private final short REPLICATION_FACTOR = (short) 3;
|
||||
private final int DEFAULT_BLOCK_SIZE = 1024;
|
||||
private static final short REPLICATION_FACTOR = (short) 3;
|
||||
private static final int DEFAULT_BLOCK_SIZE = 1024;
|
||||
private MiniDFSCluster cluster = null;
|
||||
private NamenodeProtocols nameNodeRpc = null;
|
||||
private FSNamesystem namesystem = null;
|
||||
|
@ -55,13 +49,12 @@ public class TestDefaultBlockPlacementPolicy {
|
|||
@Before
|
||||
public void setup() throws IOException {
|
||||
StaticMapping.resetMap();
|
||||
conf = new HdfsConfiguration();
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
final String[] racks = { "/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK2" };
|
||||
final String[] hosts = { "/host0", "/host1", "/host2", "/host3", "/host4" };
|
||||
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
|
||||
DEFAULT_BLOCK_SIZE / 2);
|
||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).racks(racks)
|
||||
.hosts(hosts).build();
|
||||
cluster.waitActive();
|
||||
|
@ -104,17 +97,14 @@ public class TestDefaultBlockPlacementPolicy {
|
|||
}
|
||||
|
||||
private void testPlacement(String clientMachine,
|
||||
String clientRack) throws AccessControlException,
|
||||
SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
|
||||
FileNotFoundException, ParentNotDirectoryException, IOException,
|
||||
NotReplicatedYetException {
|
||||
String clientRack) throws IOException {
|
||||
// write 5 files and check whether all times block placed
|
||||
for (int i = 0; i < 5; i++) {
|
||||
String src = "/test-" + i;
|
||||
// Create the file with client machine
|
||||
HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
|
||||
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
|
||||
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null);
|
||||
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
|
||||
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
|
||||
null, null, fileStatus.getFileId(), null);
|
||||
|
||||
|
|
|
@ -191,7 +191,7 @@ public class TestFsLimits {
|
|||
lazyInitFSDirectory();
|
||||
Class<?> generated = null;
|
||||
try {
|
||||
fs.renameTo(src, dst, new Rename[] { });
|
||||
fs.renameTo(src, dst, false, new Rename[] { });
|
||||
} catch (Throwable e) {
|
||||
generated = e.getClass();
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ public class TestFsLimits {
|
|||
lazyInitFSDirectory();
|
||||
Class<?> generated = null;
|
||||
try {
|
||||
fs.renameTo(src, dst);
|
||||
fs.renameTo(src, dst, false);
|
||||
} catch (Throwable e) {
|
||||
generated = e.getClass();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue