HDFS-7412. Move RetryCache to NameNodeRpcServer. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-11-24 11:11:15 -08:00
parent 4435ac9af5
commit 9db888b896
5 changed files with 383 additions and 267 deletions

View File

@ -131,6 +131,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7331. Add Datanode network counts to datanode jmx page. (Charles Lamb
via atm)
HDFS-7412. Move RetryCache to NameNodeRpcServer. (wheat9)
OPTIMIZATIONS
BUG FIXES

View File

@ -515,7 +515,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private volatile boolean startingActiveService = false;
private INodeId inodeId;
private final RetryCache retryCache;
private final NNConf nnConf;
@ -1933,28 +1933,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param srcs file that will be concatenated
* @throws IOException on error
*/
void concat(String target, String [] srcs)
void concat(String target, String [] srcs, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
// Either there is no previous request in progress or it has failed
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
boolean success = false;
try {
concatInt(target, srcs, cacheEntry != null);
success = true;
concatInt(target, srcs, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
throw e;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@ -2172,7 +2162,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
@SuppressWarnings("deprecation")
void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if (!FileSystem.areSymlinksEnabled()) {
throw new UnsupportedOperationException("Symlinks not supported");
@ -2183,19 +2173,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (FSDirectory.isReservedName(target)) {
throw new InvalidPathException("Invalid target name: " + target);
}
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
createSymlinkInt(target, link, dirPerms, createParent, logRetryCache);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
throw e;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@ -2483,26 +2468,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions)
CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
HdfsFileStatus status = null;
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload();
}
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions,
cacheEntry != null);
logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, status != null, status);
}
return status;
}
@ -3029,27 +3007,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Append to an existing file in the namespace.
*/
LocatedBlock appendFile(String src, String holder, String clientMachine)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
LocatedBlock lb = null;
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LocatedBlock) cacheEntry.getPayload();
}
boolean success = false;
LocatedBlock appendFile(
String src, String holder, String clientMachine, boolean logRetryCache)
throws IOException {
try {
lb = appendFileInt(src, holder, clientMachine, cacheEntry != null);
success = true;
return lb;
return appendFileInt(src, holder, clientMachine, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "append", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, success, lb);
}
}
@ -3717,20 +3682,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @deprecated Use {@link #renameTo(String, String, Options.Rename...)} instead.
*/
@Deprecated
boolean renameTo(String src, String dst)
boolean renameTo(String src, String dst, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
boolean ret = false;
try {
ret = renameToInt(src, dst, cacheEntry != null);
ret = renameToInt(src, dst, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "rename", src, dst, null);
throw e;
} finally {
RetryCache.setState(cacheEntry, ret);
}
return ret;
}
@ -3775,7 +3734,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return status;
}
/** @deprecated See {@link #renameTo(String, String)} */
/** @deprecated See {@link #renameTo(String, String, boolean)} */
@Deprecated
private boolean renameToInternal(FSPermissionChecker pc, String src,
String dst, boolean logRetryCache) throws IOException,
@ -3808,7 +3767,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/** Rename src to dst */
void renameTo(final String srcArg, final String dstArg,
void renameTo(final String srcArg, final String dstArg, boolean logRetryCache,
Options.Rename... options) throws IOException, UnresolvedLinkException {
String src = srcArg;
String dst = dstArg;
@ -3822,14 +3781,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
HdfsFileStatus resultingStat = null;
boolean success = false;
writeLock();
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
try {
@ -3837,13 +3792,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkNameNodeSafeMode("Cannot rename " + src);
src = dir.resolvePath(pc, src, srcComponents);
dst = dir.resolvePath(pc, dst, dstComponents);
renameToInternal(pc, src, dst, cacheEntry != null,
collectedBlocks, options);
renameToInternal(pc, src, dst, logRetryCache, collectedBlocks, options);
resultingStat = getAuditFileInfo(dst, false);
success = true;
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
if (!collectedBlocks.getToDeleteList().isEmpty()) {
@ -3886,21 +3838,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @see ClientProtocol#delete(String, boolean) for detailed description and
* description of exceptions
*/
boolean delete(String src, boolean recursive)
boolean delete(String src, boolean recursive, boolean logRetryCache)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
boolean ret = false;
try {
ret = deleteInt(src, recursive, cacheEntry != null);
ret = deleteInt(src, recursive, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, ret);
}
return ret;
}
@ -5478,12 +5425,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void saveNamespace() throws AccessControlException, IOException {
checkOperation(OperationCategory.UNCHECKED);
checkSuperuserPrivilege();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
readLock();
try {
checkOperation(OperationCategory.UNCHECKED);
@ -5493,10 +5436,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ "in order to create namespace image.");
}
getFSImage().saveNamespace(this);
success = true;
} finally {
readUnlock();
RetryCache.setState(cacheEntry, success);
}
LOG.info("New namespace image has been created");
}
@ -6327,24 +6268,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
NamenodeRegistration activeNamenode) throws IOException {
checkOperation(OperationCategory.CHECKPOINT);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (NamenodeCommand) cacheEntry.getPayload();
}
writeLock();
NamenodeCommand cmd = null;
try {
checkOperation(OperationCategory.CHECKPOINT);
checkNameNodeSafeMode("Checkpoint not started");
LOG.info("Start checkpoint for " + backupNode.getAddress());
cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
NamenodeCommand cmd = getFSImage().startCheckpoint(backupNode,
activeNamenode);
getEditLog().logSync();
return cmd;
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, cmd != null, cmd);
}
}
@ -6362,22 +6297,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
checkOperation(OperationCategory.CHECKPOINT);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
readLock();
try {
checkOperation(OperationCategory.CHECKPOINT);
checkNameNodeSafeMode("Checkpoint not ended");
LOG.info("End checkpoint for " + registration.getAddress());
getFSImage().endCheckpoint(sig);
success = true;
} finally {
readUnlock();
RetryCache.setState(cacheEntry, success);
}
}
@ -6821,14 +6748,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param newNodes datanodes in the pipeline
* @throws IOException if any error occurs
*/
void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
void updatePipeline(
String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
+ ", newGS=" + newBlock.getGenerationStamp()
+ ", newLength=" + newBlock.getNumBytes()
@ -6837,18 +6762,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ ")");
waitForLoadingFSImage();
writeLock();
boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Pipeline not updated");
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
newStorageIDs, cacheEntry != null);
success = true;
newStorageIDs, logRetryCache);
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
LOG.info("updatePipeline(" + oldBlock.getLocalBlock() + " => "
@ -7844,15 +7766,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param snapshotRoot The directory path where the snapshot is taken
* @param snapshotName The name of the snapshot
*/
String createSnapshot(String snapshotRoot, String snapshotName)
String createSnapshot(String snapshotRoot, String snapshotName,
boolean logRetryCache)
throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (String) cacheEntry.getPayload();
}
String snapshotPath = null;
writeLock();
try {
@ -7878,11 +7797,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} finally {
dir.writeUnlock();
}
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName, logRetryCache);
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
}
getEditLog().logSync();
@ -7900,16 +7817,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws SafeModeException
* @throws IOException
*/
void renameSnapshot(String path, String snapshotOldName,
String snapshotNewName) throws SafeModeException, IOException {
void renameSnapshot(
String path, String snapshotOldName, String snapshotNewName,
boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
writeLock();
boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename snapshot for " + path);
@ -7920,11 +7835,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
cacheEntry != null);
success = true;
logRetryCache);
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
@ -8014,16 +7928,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws SafeModeException
* @throws IOException
*/
void deleteSnapshot(String snapshotRoot, String snapshotName)
void deleteSnapshot(String snapshotRoot, String snapshotName,
boolean logRetryCache)
throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
writeLock();
try {
@ -8043,12 +7953,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
dir.writeUnlock();
}
removedINodes.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
success = true;
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName, logRetryCache);
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
@ -8246,20 +8154,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return new RollingUpgradeInfo(blockPoolId, false, startTime, finalizeTime);
}
long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)
long addCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
CacheEntryWithPayload cacheEntry =
RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (Long) cacheEntry.getPayload();
}
boolean success = false;
if (!flags.contains(CacheFlag.FORCE)) {
cacheManager.waitForRescanIfNeeded();
}
boolean success = false;
writeLock();
String effectiveDirectiveStr = null;
Long result = null;
@ -8275,8 +8180,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
CacheDirectiveInfo effectiveDirective =
cacheManager.addDirective(directive, pc, flags);
getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
cacheEntry != null);
getEditLog().logAddCacheDirectiveInfo(effectiveDirective, logRetryCache);
result = effectiveDirective.getId();
effectiveDirectiveStr = effectiveDirective.toString();
success = true;
@ -8288,21 +8192,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addCacheDirective", effectiveDirectiveStr, null, null);
}
RetryCache.setState(cacheEntry, success, result);
}
return result;
}
void modifyCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException {
EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
boolean success = false;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
if (!flags.contains(CacheFlag.FORCE)) {
cacheManager.waitForRescanIfNeeded();
}
@ -8314,8 +8215,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
"Cannot add cache directive", safeMode);
}
cacheManager.modifyDirective(directive, pc, flags);
getEditLog().logModifyCacheDirectiveInfo(directive,
cacheEntry != null);
getEditLog().logModifyCacheDirectiveInfo(directive, logRetryCache);
success = true;
} finally {
writeUnlock();
@ -8326,18 +8226,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String idStr = "{id: " + directive.getId().toString() + "}";
logAuditEvent(success, "modifyCacheDirective", idStr, directive.toString(), null);
}
RetryCache.setState(cacheEntry, success);
}
}
void removeCacheDirective(Long id) throws IOException {
void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
writeLock();
try {
@ -8347,16 +8243,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
"Cannot remove cache directives", safeMode);
}
cacheManager.removeDirective(id, pc);
getEditLog().logRemoveCacheDirectiveInfo(id, cacheEntry != null);
getEditLog().logRemoveCacheDirectiveInfo(id, logRetryCache);
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
String idStr = "{id: " + id.toString() + "}";
String idStr = "{id: " + Long.toString(id) + "}";
logAuditEvent(success, "removeCacheDirective", idStr, null,
null);
}
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
}
@ -8385,14 +8280,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return results;
}
public void addCachePool(CachePoolInfo req) throws IOException {
public void addCachePool(CachePoolInfo req, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
writeLock();
boolean success = false;
String poolInfoStr = null;
@ -8407,27 +8300,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
CachePoolInfo info = cacheManager.addCachePool(req);
poolInfoStr = info.toString();
getEditLog().logAddCachePool(info, cacheEntry != null);
getEditLog().logAddCachePool(info, logRetryCache);
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
}
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
}
public void modifyCachePool(CachePoolInfo req) throws IOException {
public void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc =
isPermissionEnabled ? getPermissionChecker() : null;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
writeLock();
boolean success = false;
try {
@ -8440,7 +8330,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
pc.checkSuperuserPrivilege();
}
cacheManager.modifyCachePool(req);
getEditLog().logModifyCachePool(req, cacheEntry != null);
getEditLog().logModifyCachePool(req, logRetryCache);
success = true;
} finally {
writeUnlock();
@ -8448,20 +8338,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String poolNameStr = "{poolName: " + req.getPoolName() + "}";
logAuditEvent(success, "modifyCachePool", poolNameStr, req.toString(), null);
}
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
}
public void removeCachePool(String cachePoolName) throws IOException {
public void removeCachePool(String cachePoolName, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc =
isPermissionEnabled ? getPermissionChecker() : null;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
writeLock();
boolean success = false;
try {
@ -8474,7 +8361,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
pc.checkSuperuserPrivilege();
}
cacheManager.removeCachePool(cachePoolName);
getEditLog().logRemoveCachePool(cachePoolName, cacheEntry != null);
getEditLog().logRemoveCachePool(cachePoolName, logRetryCache);
success = true;
} finally {
writeUnlock();
@ -8482,7 +8369,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String poolNameStr = "{poolName: " + cachePoolName + "}";
logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
}
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
@ -8675,15 +8561,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws UnresolvedLinkException if the path can't be resolved.
* @throws SafeModeException if the Namenode is in safe mode.
*/
void createEncryptionZone(final String src, final String keyName)
void createEncryptionZone(final String src, final String keyName,
boolean logRetryCache)
throws IOException, UnresolvedLinkException,
SafeModeException, AccessControlException {
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
if (provider == null) {
throw new IOException(
@ -8709,13 +8590,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If the provider supports pool for EDEKs, this will fill in the pool
generateEncryptedDataEncryptionKey(keyName);
createEncryptionZoneInt(src, metadata.getCipher(),
keyName, cacheEntry != null);
success = true;
keyName, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "createEncryptionZone", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@ -8820,22 +8698,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws UnresolvedLinkException
* @throws IOException
*/
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
boolean logRetryCache)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
setXAttrInt(src, xAttr, flag, cacheEntry != null);
success = true;
setXAttrInt(src, xAttr, flag, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "setXAttr", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@ -8985,20 +8856,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws UnresolvedLinkException
* @throws IOException
*/
void removeXAttr(String src, XAttr xAttr) throws IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
throws IOException {
try {
removeXAttrInt(src, xAttr, cacheEntry != null);
success = true;
removeXAttrInt(src, xAttr, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "removeXAttr", src);
throw e;
} finally {
RetryCache.setState(cacheEntry, success);
}
}

View File

@ -138,6 +138,9 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
@ -166,7 +169,6 @@ import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappin
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@ -190,7 +192,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
protected final FSNamesystem namesystem;
protected final NameNode nn;
private final NameNodeMetrics metrics;
private final RetryCache retryCache;
private final boolean serviceAuthEnabled;
/** The RPC server that listens to requests from DataNodes */
@ -207,6 +211,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
this.nn = nn;
this.namesystem = nn.getNamesystem();
this.retryCache = namesystem.getRetryCache();
this.metrics = NameNode.getNameNodeMetrics();
int handlerCount =
@ -505,14 +510,36 @@ class NameNodeRpcServer implements NamenodeProtocols {
verifyRequest(registration);
if(!nn.isRole(NamenodeRole.NAMENODE))
throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
return namesystem.startCheckpoint(registration, nn.setRegistration());
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (NamenodeCommand) cacheEntry.getPayload();
}
NamenodeCommand ret = null;
try {
ret = namesystem.startCheckpoint(registration, nn.setRegistration());
} finally {
RetryCache.setState(cacheEntry, ret != null, ret);
}
return ret;
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
namesystem.checkSuperuserPrivilege();
namesystem.endCheckpoint(registration, sig);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.endCheckpoint(registration, sig);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
@ -557,19 +584,32 @@ class NameNodeRpcServer implements NamenodeProtocols {
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
+src+" for "+clientName+" at "+clientMachine);
+src+" for "+clientName+" at "+clientMachine);
}
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
getRemoteUser().getShortUserName(), null, masked),
clientName, clientMachine, flag.get(), createParent, replication,
blockSize, supportedVersions);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload();
}
HdfsFileStatus status = null;
try {
PermissionStatus perm = new PermissionStatus(getRemoteUser()
.getShortUserName(), null, masked);
status = namesystem.startFile(src, perm, clientName, clientMachine,
flag.get(), createParent, replication, blockSize, supportedVersions,
cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, status != null, status);
}
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
return fileStatus;
return status;
}
@Override // ClientProtocol
@ -580,7 +620,20 @@ class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine);
}
LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LocatedBlock) cacheEntry.getPayload();
}
LocatedBlock info = null;
boolean success = false;
try {
info = namesystem.appendFile(src, clientName, clientMachine,
cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success, info);
}
metrics.incrFilesAppended();
return info;
}
@ -722,7 +775,19 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException {
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes, newStorageIDs);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes,
newStorageIDs, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // DatanodeProtocol
@ -751,7 +816,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
boolean ret = namesystem.renameTo(src, dst);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
boolean ret = false;
try {
ret = namesystem.renameTo(src, dst, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, ret);
}
if (ret) {
metrics.incrFilesRenamed();
}
@ -760,7 +836,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException {
namesystem.concat(trg, src);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.concat(trg, src, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
@ -773,7 +860,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.renameTo(src, dst, options);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.renameTo(src, dst, cacheEntry != null, options);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
metrics.incrFilesRenamed();
}
@ -783,7 +880,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+ ", recursive=" + recursive);
}
boolean ret = namesystem.delete(src, recursive);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
boolean ret = false;
try {
ret = namesystem.delete(src, recursive, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, ret);
}
if (ret)
metrics.incrDeleteFileOps();
return ret;
@ -899,7 +1006,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void saveNamespace() throws IOException {
namesystem.saveNamespace();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.saveNamespace();
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
@ -1020,6 +1137,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
metrics.incrCreateSymlinkOps();
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target
* URI may refer to a non-HDFS file system.
@ -1033,8 +1155,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("Invalid symlink target");
}
final UserGroupInformation ugi = getRemoteUser();
namesystem.createSymlink(target, link,
new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
boolean success = false;
try {
PermissionStatus perm = new PermissionStatus(ugi.getShortUserName(),
null, dirPerms);
namesystem.createSymlink(target, link, perm, createParent,
cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // ClientProtocol
@ -1322,15 +1453,38 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("createSnapshot: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (String) cacheEntry.getPayload();
}
metrics.incrCreateSnapshotOps();
return namesystem.createSnapshot(snapshotRoot, snapshotName);
String ret = null;
try {
ret = namesystem.createSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, ret != null, ret);
}
return ret;
}
@Override
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
metrics.incrDeleteSnapshotOps();
namesystem.deleteSnapshot(snapshotRoot, snapshotName);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.deleteSnapshot(snapshotRoot, snapshotName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override
@ -1354,7 +1508,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("The new snapshot name is null or empty.");
}
metrics.incrRenameSnapshotOps();
namesystem.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.renameSnapshot(snapshotRoot, snapshotOldName,
snapshotNewName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override // Client Protocol
@ -1378,18 +1543,53 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public long addCacheDirective(
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
return namesystem.addCacheDirective(path, flags);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (Long) cacheEntry.getPayload();
}
boolean success = false;
long ret = 0;
try {
ret = namesystem.addCacheDirective(path, flags, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success, ret);
}
return ret;
}
@Override
public void modifyCacheDirective(
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
namesystem.modifyCacheDirective(directive, flags);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.modifyCacheDirective(directive, flags, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override
public void removeCacheDirective(long id) throws IOException {
namesystem.removeCacheDirective(id);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.removeCacheDirective(id, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override
@ -1403,17 +1603,47 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public void addCachePool(CachePoolInfo info) throws IOException {
namesystem.addCachePool(info);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.addCachePool(info, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override
public void modifyCachePool(CachePoolInfo info) throws IOException {
namesystem.modifyCachePool(info);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.modifyCachePool(info, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override
public void removeCachePool(String cachePoolName) throws IOException {
namesystem.removeCachePool(cachePoolName);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.removeCachePool(cachePoolName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override
@ -1457,7 +1687,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public void createEncryptionZone(String src, String keyName)
throws IOException {
namesystem.createEncryptionZone(src, keyName);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
boolean success = false;
try {
namesystem.createEncryptionZone(src, keyName, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override
@ -1475,7 +1715,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException {
namesystem.setXAttr(src, xAttr, flag);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.setXAttr(src, xAttr, flag, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override
@ -1491,7 +1741,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public void removeXAttr(String src, XAttr xAttr) throws IOException {
namesystem.removeXAttr(src, xAttr);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
boolean success = false;
try {
namesystem.removeXAttr(src, xAttr, cacheEntry != null);
success = true;
} finally {
RetryCache.setState(cacheEntry, success);
}
}
@Override

View File

@ -19,15 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -37,16 +33,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.AccessControlException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestDefaultBlockPlacementPolicy {
private Configuration conf;
private final short REPLICATION_FACTOR = (short) 3;
private final int DEFAULT_BLOCK_SIZE = 1024;
private static final short REPLICATION_FACTOR = (short) 3;
private static final int DEFAULT_BLOCK_SIZE = 1024;
private MiniDFSCluster cluster = null;
private NamenodeProtocols nameNodeRpc = null;
private FSNamesystem namesystem = null;
@ -55,13 +49,12 @@ public class TestDefaultBlockPlacementPolicy {
@Before
public void setup() throws IOException {
StaticMapping.resetMap();
conf = new HdfsConfiguration();
Configuration conf = new HdfsConfiguration();
final String[] racks = { "/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK2" };
final String[] hosts = { "/host0", "/host1", "/host2", "/host3", "/host4" };
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
DEFAULT_BLOCK_SIZE / 2);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).racks(racks)
.hosts(hosts).build();
cluster.waitActive();
@ -104,17 +97,14 @@ public class TestDefaultBlockPlacementPolicy {
}
private void testPlacement(String clientMachine,
String clientRack) throws AccessControlException,
SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException,
NotReplicatedYetException {
String clientRack) throws IOException {
// write 5 files and check whether all times block placed
for (int i = 0; i < 5; i++) {
String src = "/test-" + i;
// Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null);
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null);

View File

@ -191,7 +191,7 @@ public class TestFsLimits {
lazyInitFSDirectory();
Class<?> generated = null;
try {
fs.renameTo(src, dst, new Rename[] { });
fs.renameTo(src, dst, false, new Rename[] { });
} catch (Throwable e) {
generated = e.getClass();
}
@ -204,7 +204,7 @@ public class TestFsLimits {
lazyInitFSDirectory();
Class<?> generated = null;
try {
fs.renameTo(src, dst);
fs.renameTo(src, dst, false);
} catch (Throwable e) {
generated = e.getClass();
}