HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits. Contributed by Ming Ma.

(cherry picked from commit 7817674a3a)
This commit is contained in:
Jing Zhao 2015-05-29 11:05:13 -07:00
parent 09d68d645a
commit 17fb442a4c
4 changed files with 58 additions and 23 deletions

View File

@ -485,6 +485,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7401. Add block info to DFSInputStream' WARN message when it adds HDFS-7401. Add block info to DFSInputStream' WARN message when it adds
node to deadNodes (Arshad Mohammad via vinayakumarb) node to deadNodes (Arshad Mohammad via vinayakumarb)
HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits.
(Ming Ma via jing9)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1877,7 +1877,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/ */
void concat(String target, String [] srcs, boolean logRetryCache) void concat(String target, String [] srcs, boolean logRetryCache)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage(); waitForLoadingFSImage();
HdfsFileStatus stat = null; HdfsFileStatus stat = null;
boolean success = false; boolean success = false;
@ -2369,7 +2368,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new InvalidPathException(src); throw new InvalidPathException(src);
} }
blockManager.verifyReplication(src, replication, clientMachine); blockManager.verifyReplication(src, replication, clientMachine);
checkOperation(OperationCategory.WRITE);
if (blockSize < minBlockSize) { if (blockSize < minBlockSize) {
throw new IOException("Specified block size is less than configured" + throw new IOException("Specified block size is less than configured" +
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
@ -2782,7 +2780,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
LocatedBlock lb = null; LocatedBlock lb = null;
HdfsFileStatus stat = null; HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
@ -3077,7 +3074,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean renameTo(String src, String dst, boolean logRetryCache) boolean renameTo(String src, String dst, boolean logRetryCache)
throws IOException { throws IOException {
waitForLoadingFSImage(); waitForLoadingFSImage();
checkOperation(OperationCategory.WRITE);
FSDirRenameOp.RenameOldResult ret = null; FSDirRenameOp.RenameOldResult ret = null;
writeLock(); writeLock();
try { try {
@ -3103,7 +3099,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean logRetryCache, Options.Rename... options) boolean logRetryCache, Options.Rename... options)
throws IOException { throws IOException {
waitForLoadingFSImage(); waitForLoadingFSImage();
checkOperation(OperationCategory.WRITE);
Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null; Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null;
writeLock(); writeLock();
try { try {
@ -3140,7 +3135,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean delete(String src, boolean recursive, boolean logRetryCache) boolean delete(String src, boolean recursive, boolean logRetryCache)
throws IOException { throws IOException {
waitForLoadingFSImage(); waitForLoadingFSImage();
checkOperation(OperationCategory.WRITE);
BlocksMapUpdateInfo toRemovedBlocks = null; BlocksMapUpdateInfo toRemovedBlocks = null;
writeLock(); writeLock();
boolean ret = false; boolean ret = false;
@ -5756,8 +5750,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock, String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache) DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
LOG.info("updatePipeline(" + oldBlock.getLocalBlock() LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
+ ", newGS=" + newBlock.getGenerationStamp() + ", newGS=" + newBlock.getGenerationStamp()
+ ", newLength=" + newBlock.getNumBytes() + ", newLength=" + newBlock.getNumBytes()
@ -6738,7 +6730,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void renameSnapshot( void renameSnapshot(
String path, String snapshotOldName, String snapshotNewName, String path, String snapshotOldName, String snapshotNewName,
boolean logRetryCache) throws IOException { boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
boolean success = false; boolean success = false;
writeLock(); writeLock();
try { try {
@ -6826,7 +6817,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/ */
void deleteSnapshot(String snapshotRoot, String snapshotName, void deleteSnapshot(String snapshotRoot, String snapshotName,
boolean logRetryCache) throws IOException { boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
boolean success = false; boolean success = false;
writeLock(); writeLock();
BlocksMapUpdateInfo blocksToBeDeleted = null; BlocksMapUpdateInfo blocksToBeDeleted = null;
@ -7051,7 +7041,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
long addCacheDirective(CacheDirectiveInfo directive, long addCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags, boolean logRetryCache) EnumSet<CacheFlag> flags, boolean logRetryCache)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
CacheDirectiveInfo effectiveDirective = null; CacheDirectiveInfo effectiveDirective = null;
if (!flags.contains(CacheFlag.FORCE)) { if (!flags.contains(CacheFlag.FORCE)) {
cacheManager.waitForRescanIfNeeded(); cacheManager.waitForRescanIfNeeded();
@ -7079,7 +7068,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void modifyCacheDirective(CacheDirectiveInfo directive, void modifyCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException { EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
boolean success = false; boolean success = false;
if (!flags.contains(CacheFlag.FORCE)) { if (!flags.contains(CacheFlag.FORCE)) {
cacheManager.waitForRescanIfNeeded(); cacheManager.waitForRescanIfNeeded();
@ -7103,7 +7091,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
void removeCacheDirective(long id, boolean logRetryCache) throws IOException { void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
boolean success = false; boolean success = false;
writeLock(); writeLock();
try { try {
@ -7142,7 +7129,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void addCachePool(CachePoolInfo req, boolean logRetryCache) void addCachePool(CachePoolInfo req, boolean logRetryCache)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
writeLock(); writeLock();
boolean success = false; boolean success = false;
String poolInfoStr = null; String poolInfoStr = null;
@ -7164,7 +7150,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void modifyCachePool(CachePoolInfo req, boolean logRetryCache) void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
writeLock(); writeLock();
boolean success = false; boolean success = false;
try { try {
@ -7186,7 +7171,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void removeCachePool(String cachePoolName, boolean logRetryCache) void removeCachePool(String cachePoolName, boolean logRetryCache)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
writeLock(); writeLock();
boolean success = false; boolean success = false;
try { try {
@ -7381,7 +7365,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String src = srcArg; String src = srcArg;
HdfsFileStatus resultingStat = null; HdfsFileStatus resultingStat = null;
checkSuperuserPrivilege(); checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
final byte[][] pathComponents = final byte[][] pathComponents =
FSDirectory.getPathComponentsForReservedPath(src); FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
@ -7467,7 +7450,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag, void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
boolean logRetryCache) boolean logRetryCache)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
HdfsFileStatus auditStat = null; HdfsFileStatus auditStat = null;
writeLock(); writeLock();
try { try {
@ -7515,7 +7497,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void removeXAttr(String src, XAttr xAttr, boolean logRetryCache) void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
HdfsFileStatus auditStat = null; HdfsFileStatus auditStat = null;
writeLock(); writeLock();
try { try {

View File

@ -611,7 +611,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("create: Pathname too long. Limit " throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
} }
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload(); return (HdfsFileStatus) cacheEntry.getPayload();
@ -642,6 +642,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* NameNode.append: file " stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine); +src+" for "+clientName+" at "+clientMachine);
} }
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null); null);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -789,6 +790,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response return; // Return previous response
@ -833,7 +835,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("rename: Pathname too long. Limit " throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
} }
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response return true; // Return previous response
@ -854,6 +856,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol @Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException { public void concat(String trg, String[] src) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response return; // Return previous response
@ -879,6 +882,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("rename: Pathname too long. Limit " throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
} }
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response return; // Return previous response
@ -916,6 +920,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+ ", recursive=" + recursive); + ", recursive=" + recursive);
} }
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response return true; // Return previous response
@ -1200,6 +1205,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void createSymlink(String target, String link, FsPermission dirPerms, public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException { boolean createParent) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response return; // Return previous response
@ -1531,6 +1537,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("createSnapshot: Pathname too long. Limit " throw new IOException("createSnapshot: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
} }
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
null); null);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -1552,6 +1559,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void deleteSnapshot(String snapshotRoot, String snapshotName) public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrDeleteSnapshotOps(); metrics.incrDeleteSnapshotOps();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -1590,6 +1598,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (snapshotNewName == null || snapshotNewName.isEmpty()) { if (snapshotNewName == null || snapshotNewName.isEmpty()) {
throw new IOException("The new snapshot name is null or empty."); throw new IOException("The new snapshot name is null or empty.");
} }
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrRenameSnapshotOps(); metrics.incrRenameSnapshotOps();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -1629,6 +1638,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public long addCacheDirective( public long addCacheDirective(
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException { CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
(retryCache, null); (retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -1650,6 +1660,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void modifyCacheDirective( public void modifyCacheDirective(
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException { CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; return;
@ -1667,6 +1678,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol @Override // ClientProtocol
public void removeCacheDirective(long id) throws IOException { public void removeCacheDirective(long id) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; return;
@ -1693,6 +1705,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override //ClientProtocol @Override //ClientProtocol
public void addCachePool(CachePoolInfo info) throws IOException { public void addCachePool(CachePoolInfo info) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response return; // Return previous response
@ -1709,6 +1722,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol @Override // ClientProtocol
public void modifyCachePool(CachePoolInfo info) throws IOException { public void modifyCachePool(CachePoolInfo info) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response return; // Return previous response
@ -1725,6 +1739,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol @Override // ClientProtocol
public void removeCachePool(String cachePoolName) throws IOException { public void removeCachePool(String cachePoolName) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; return;
@ -1787,6 +1802,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void createEncryptionZone(String src, String keyName) public void createEncryptionZone(String src, String keyName)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; return;
@ -1818,6 +1834,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag) public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response return; // Return previous response
@ -1847,6 +1864,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol @Override // ClientProtocol
public void removeXAttr(String src, XAttr xAttr) throws IOException { public void removeXAttr(String src, XAttr xAttr) throws IOException {
checkNNStartup(); checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) { if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response return; // Return previous response

View File

@ -215,6 +215,7 @@ public class TestRetryCacheWithHA {
abstract class AtMostOnceOp { abstract class AtMostOnceOp {
private final String name; private final String name;
final DFSClient client; final DFSClient client;
int expectedUpdateCount = 0;
AtMostOnceOp(String name, DFSClient client) { AtMostOnceOp(String name, DFSClient client) {
this.name = name; this.name = name;
@ -225,6 +226,9 @@ public class TestRetryCacheWithHA {
abstract void invoke() throws Exception; abstract void invoke() throws Exception;
abstract boolean checkNamenodeBeforeReturn() throws Exception; abstract boolean checkNamenodeBeforeReturn() throws Exception;
abstract Object getResult(); abstract Object getResult();
int getExpectedCacheUpdateCount() {
return expectedUpdateCount;
}
} }
/** createSnapshot operaiton */ /** createSnapshot operaiton */
@ -614,12 +618,14 @@ public class TestRetryCacheWithHA {
void prepare() throws Exception { void prepare() throws Exception {
Path p = new Path(target); Path p = new Path(target);
if (!dfs.exists(p)) { if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0); DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
} }
} }
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
deleted = client.delete(target, true); deleted = client.delete(target, true);
} }
@ -655,12 +661,14 @@ public class TestRetryCacheWithHA {
void prepare() throws Exception { void prepare() throws Exception {
Path p = new Path(target); Path p = new Path(target);
if (!dfs.exists(p)) { if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0); DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
} }
} }
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
client.createSymlink(target, link, false); client.createSymlink(target, link, false);
} }
@ -773,11 +781,13 @@ public class TestRetryCacheWithHA {
@Override @Override
void prepare() throws Exception { void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool())); dfs.addCachePool(new CachePoolInfo(directive.getPool()));
} }
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
} }
@ -819,12 +829,15 @@ public class TestRetryCacheWithHA {
@Override @Override
void prepare() throws Exception { void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool())); dfs.addCachePool(new CachePoolInfo(directive.getPool()));
expectedUpdateCount++;
id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
} }
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
client.modifyCacheDirective( client.modifyCacheDirective(
new CacheDirectiveInfo.Builder(). new CacheDirectiveInfo.Builder().
setId(id). setId(id).
@ -875,12 +888,15 @@ public class TestRetryCacheWithHA {
@Override @Override
void prepare() throws Exception { void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool())); dfs.addCachePool(new CachePoolInfo(directive.getPool()));
expectedUpdateCount++;
id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE)); id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
} }
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
client.removeCacheDirective(id); client.removeCacheDirective(id);
} }
@ -922,6 +938,7 @@ public class TestRetryCacheWithHA {
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool)); client.addCachePool(new CachePoolInfo(pool));
} }
@ -954,11 +971,13 @@ public class TestRetryCacheWithHA {
@Override @Override
void prepare() throws Exception { void prepare() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool).setLimit(10l)); client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
} }
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l)); client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
} }
@ -991,11 +1010,13 @@ public class TestRetryCacheWithHA {
@Override @Override
void prepare() throws Exception { void prepare() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool)); client.addCachePool(new CachePoolInfo(pool));
} }
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
client.removeCachePool(pool); client.removeCachePool(pool);
} }
@ -1030,12 +1051,14 @@ public class TestRetryCacheWithHA {
void prepare() throws Exception { void prepare() throws Exception {
Path p = new Path(src); Path p = new Path(src);
if (!dfs.exists(p)) { if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0); DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
} }
} }
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
client.setXAttr(src, "user.key", "value".getBytes(), client.setXAttr(src, "user.key", "value".getBytes(),
EnumSet.of(XAttrSetFlag.CREATE)); EnumSet.of(XAttrSetFlag.CREATE));
} }
@ -1072,7 +1095,9 @@ public class TestRetryCacheWithHA {
void prepare() throws Exception { void prepare() throws Exception {
Path p = new Path(src); Path p = new Path(src);
if (!dfs.exists(p)) { if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0); DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
expectedUpdateCount++;
client.setXAttr(src, "user.key", "value".getBytes(), client.setXAttr(src, "user.key", "value".getBytes(),
EnumSet.of(XAttrSetFlag.CREATE)); EnumSet.of(XAttrSetFlag.CREATE));
} }
@ -1080,6 +1105,7 @@ public class TestRetryCacheWithHA {
@Override @Override
void invoke() throws Exception { void invoke() throws Exception {
expectedUpdateCount++;
client.removeXAttr(src, "user.key"); client.removeXAttr(src, "user.key");
} }
@ -1316,6 +1342,13 @@ public class TestRetryCacheWithHA {
assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0); assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
// Cache updated metrics on NN0 should be >0 since NN1 applied the editlog // Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0); assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
long expectedUpdateCount = op.getExpectedCacheUpdateCount();
if (expectedUpdateCount > 0) {
assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
updatedNN0);
assertEquals("CacheUpdated on NN0: " + updatedNN1, expectedUpdateCount,
updatedNN1);
}
} }
/** /**