HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits. Contributed by Ming Ma.

(cherry picked from commit 7817674a3a)
(cherry picked from commit 17fb442a4c)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

(cherry picked from commit fad2a062ddbb955a42dd5a90d64781617287f8df)
This commit is contained in:
Jing Zhao 2015-05-29 11:05:13 -07:00 committed by Vinod Kumar Vavilapalli
parent 563dbd29eb
commit 46b9393cab
4 changed files with 58 additions and 20 deletions

View File

@ -147,6 +147,9 @@ Release 2.6.1 - UNRELEASED
HDFS-8431. hdfs crypto class not found in Windows.
(Anu Engineer via cnauroth)
HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits.
(Ming Ma via jing9)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -2003,7 +2003,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
writeLock();
try {
@ -2563,7 +2562,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean skipSync = false;
HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
if (blockSize < minBlockSize) {
throw new IOException("Specified block size is less than configured" +
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
@ -3137,7 +3135,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlock lb = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
@ -3806,7 +3803,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new IOException("Invalid name: " + dst);
}
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
boolean status = false;
@ -3879,7 +3875,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
final FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
@ -4003,7 +3998,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
List<INode> removedINodes = new ChunkedArrayList<INode>();
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean ret = false;
@ -7048,7 +7042,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException {
checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
@ -8141,7 +8134,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void renameSnapshot(String path, String snapshotOldName,
String snapshotNewName) throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -8255,7 +8247,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void deleteSnapshot(String snapshotRoot, String snapshotName)
throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
@ -8501,7 +8492,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)
throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
CacheEntryWithPayload cacheEntry =
@ -8548,7 +8538,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void modifyCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
boolean success = false;
@ -8584,7 +8573,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
void removeCacheDirective(Long id) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
@ -8639,7 +8627,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
public void addCachePool(CachePoolInfo req) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
@ -8674,7 +8661,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
public void modifyCachePool(CachePoolInfo req) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc =
isPermissionEnabled ? getPermissionChecker() : null;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
@ -8708,7 +8694,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
public void removeCachePool(String cachePoolName) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc =
isPermissionEnabled ? getPermissionChecker() : null;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
@ -8975,7 +8960,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
String src = srcArg;
HdfsFileStatus resultingStat = null;
checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
final byte[][] pathComponents =
FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
@ -9098,7 +9082,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FSPermissionChecker pc = getPermissionChecker();
XAttrPermissionFilter.checkPermissionForApi(pc, xAttr,
FSDirectory.isReservedRawName(src));
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {
@ -9260,7 +9243,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FSPermissionChecker pc = getPermissionChecker();
XAttrPermissionFilter.checkPermissionForApi(pc, xAttr,
FSDirectory.isReservedRawName(src));
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock();
try {

View File

@ -576,6 +576,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
getRemoteUser().getShortUserName(), null, masked),
clientName, clientMachine, flag.get(), createParent, replication,
@ -594,6 +595,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine);
}
namesystem.checkOperation(OperationCategory.WRITE);
LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
metrics.incrFilesAppended();
return info;
@ -749,6 +751,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes, newStorageIDs);
}
@ -781,6 +784,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
boolean ret = namesystem.renameTo(src, dst);
if (ret) {
metrics.incrFilesRenamed();
@ -791,6 +795,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.concat(trg, src);
}
@ -805,6 +810,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.renameTo(src, dst, options);
metrics.incrFilesRenamed();
}
@ -816,6 +822,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+ ", recursive=" + recursive);
}
namesystem.checkOperation(OperationCategory.WRITE);
boolean ret = namesystem.delete(src, recursive);
if (ret)
metrics.incrDeleteFileOps();
@ -1088,6 +1095,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrCreateSymlinkOps();
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target
* URI may refer to a non-HDFS file system.
@ -1408,6 +1416,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("createSnapshot: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrCreateSnapshotOps();
return namesystem.createSnapshot(snapshotRoot, snapshotName);
}
@ -1416,6 +1425,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrDeleteSnapshotOps();
namesystem.deleteSnapshot(snapshotRoot, snapshotName);
}
@ -1444,6 +1454,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (snapshotNewName == null || snapshotNewName.isEmpty()) {
throw new IOException("The new snapshot name is null or empty.");
}
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrRenameSnapshotOps();
namesystem.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
}
@ -1472,6 +1483,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public long addCacheDirective(
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
return namesystem.addCacheDirective(path, flags);
}
@ -1479,12 +1491,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void modifyCacheDirective(
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.modifyCacheDirective(directive, flags);
}
@Override // ClientProtocol
public void removeCacheDirective(long id) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.removeCacheDirective(id);
}
@ -1501,18 +1515,21 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override //ClientProtocol
public void addCachePool(CachePoolInfo info) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.addCachePool(info);
}
@Override // ClientProtocol
public void modifyCachePool(CachePoolInfo info) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.modifyCachePool(info);
}
@Override // ClientProtocol
public void removeCachePool(String cachePoolName) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.removeCachePool(cachePoolName);
}
@ -1565,6 +1582,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void createEncryptionZone(String src, String keyName)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.createEncryptionZone(src, keyName);
}
@ -1586,6 +1604,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.setXAttr(src, xAttr, flag);
}
@ -1605,6 +1624,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void removeXAttr(String src, XAttr xAttr) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
namesystem.removeXAttr(src, xAttr);
}

View File

@ -213,7 +213,8 @@ public class TestRetryCacheWithHA {
abstract class AtMostOnceOp {
private final String name;
final DFSClient client;
int expectedUpdateCount = 0;
AtMostOnceOp(String name, DFSClient client) {
this.name = name;
this.client = client;
@ -223,6 +224,9 @@ public class TestRetryCacheWithHA {
abstract void invoke() throws Exception;
abstract boolean checkNamenodeBeforeReturn() throws Exception;
abstract Object getResult();
int getExpectedCacheUpdateCount() {
return expectedUpdateCount;
}
}
/** createSnapshot operaiton */
@ -601,7 +605,7 @@ public class TestRetryCacheWithHA {
class DeleteOp extends AtMostOnceOp {
private final String target;
private boolean deleted;
DeleteOp(DFSClient client, String target) {
super("delete", client);
this.target = target;
@ -611,12 +615,14 @@ public class TestRetryCacheWithHA {
void prepare() throws Exception {
Path p = new Path(target);
if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
}
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
deleted = client.delete(target, true);
}
@ -652,12 +658,14 @@ public class TestRetryCacheWithHA {
void prepare() throws Exception {
Path p = new Path(target);
if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
}
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.createSymlink(target, link, false);
}
@ -769,11 +777,13 @@ public class TestRetryCacheWithHA {
@Override
void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@ -815,12 +825,15 @@ public class TestRetryCacheWithHA {
@Override
void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
expectedUpdateCount++;
id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
@ -871,12 +884,15 @@ public class TestRetryCacheWithHA {
@Override
void prepare() throws Exception {
expectedUpdateCount++;
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
expectedUpdateCount++;
id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.removeCacheDirective(id);
}
@ -918,6 +934,7 @@ public class TestRetryCacheWithHA {
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool));
}
@ -950,11 +967,13 @@ public class TestRetryCacheWithHA {
@Override
void prepare() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
}
@ -987,11 +1006,13 @@ public class TestRetryCacheWithHA {
@Override
void prepare() throws Exception {
expectedUpdateCount++;
client.addCachePool(new CachePoolInfo(pool));
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.removeCachePool(pool);
}
@ -1026,12 +1047,14 @@ public class TestRetryCacheWithHA {
void prepare() throws Exception {
Path p = new Path(src);
if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
}
}
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.setXAttr(src, "user.key", "value".getBytes(),
EnumSet.of(XAttrSetFlag.CREATE));
}
@ -1068,7 +1091,9 @@ public class TestRetryCacheWithHA {
void prepare() throws Exception {
Path p = new Path(src);
if (!dfs.exists(p)) {
expectedUpdateCount++;
DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
expectedUpdateCount++;
client.setXAttr(src, "user.key", "value".getBytes(),
EnumSet.of(XAttrSetFlag.CREATE));
}
@ -1076,6 +1101,7 @@ public class TestRetryCacheWithHA {
@Override
void invoke() throws Exception {
expectedUpdateCount++;
client.removeXAttr(src, "user.key");
}
@ -1312,6 +1338,13 @@ public class TestRetryCacheWithHA {
assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
// Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
long expectedUpdateCount = op.getExpectedCacheUpdateCount();
if (expectedUpdateCount > 0) {
assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
updatedNN0);
assertEquals("CacheUpdated on NN0: " + updatedNN1, expectedUpdateCount,
updatedNN1);
}
}
/**