HDFS-16764. [SBN Read] ObserverNamenode should throw ObserverRetryOnActiveException instead of FileNotFoundException during processing of addBlock rpc (#4872)

Signed-off-by: Erik Krogen <xkrogen@apache.org>
Co-authored-by: zengqiang.xu <zengqiang.xu@shopee.com>
This commit is contained in:
ZanderXu 2022-12-21 07:50:58 +08:00 committed by GitHub
parent 7ff326129d
commit 8d221255f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 4 deletions

View File

@ -3044,12 +3044,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
LocatedBlock[] onRetryBlock = new LocatedBlock[1]; LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FSDirWriteFileOp.ValidateAddBlockResult r; FSDirWriteFileOp.ValidateAddBlockResult r;
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker(); final FSPermissionChecker pc = getPermissionChecker();
FSPermissionChecker.setOperationType(operationName); FSPermissionChecker.setOperationType(operationName);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.WRITE);
r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName, r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
previous, onRetryBlock); previous, onRetryBlock);
} finally { } finally {
@ -3095,12 +3095,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final byte storagePolicyID; final byte storagePolicyID;
final List<DatanodeStorageInfo> chosen; final List<DatanodeStorageInfo> chosen;
final BlockType blockType; final BlockType blockType;
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker(); final FSPermissionChecker pc = getPermissionChecker();
FSPermissionChecker.setOperationType(null); FSPermissionChecker.setOperationType(null);
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); // Changing this operation category to WRITE instead of making getAdditionalDatanode as a
// read method is aim to let Active NameNode to handle this RPC, because Active NameNode
// contains a more complete DN selection context than Observer NameNode.
checkOperation(OperationCategory.WRITE);
//check safe mode //check safe mode
checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk); checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk);
final INodesInPath iip = dir.resolvePath(pc, src, fileId); final INodesInPath iip = dir.resolvePath(pc, src, fileId);

View File

@ -29,26 +29,32 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KE
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Test; import org.junit.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
@ -158,6 +164,43 @@ public class TestNameNodeRpcServer {
} }
} }
@Test
@Timeout(30000)
public void testObserverHandleAddBlock() throws Exception {
String baseDir = GenericTestUtils.getRandomizedTempPath();
Configuration conf = new HdfsConfiguration();
MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf).setNumNameNodes(3);
builder.getDfsBuilder().numDataNodes(3);
try (MiniQJMHACluster qjmhaCluster = builder.baseDir(baseDir).build()) {
MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();
dfsCluster.waitActive();
dfsCluster.transitionToActive(0);
dfsCluster.transitionToObserver(2);
NameNode activeNN = dfsCluster.getNameNode(0);
NameNode observerNN = dfsCluster.getNameNode(2);
// Stop the editLogTailer of Observer NameNode
observerNN.getNamesystem().getEditLogTailer().stop();
DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
Path testPath = new Path("/testObserverHandleAddBlock/file.txt");
try (FSDataOutputStream ignore = dfs.create(testPath)) {
HdfsFileStatus fileStatus = activeNN.getRpcServer().getFileInfo(testPath.toUri().getPath());
assertNotNull(fileStatus);
assertNull(observerNN.getRpcServer().getFileInfo(testPath.toUri().getPath()));
LambdaTestUtils.intercept(ObserverRetryOnActiveException.class, () -> {
observerNN.getRpcServer().addBlock(testPath.toUri().getPath(),
dfs.getClient().getClientName(), null, null,
fileStatus.getFileId(), null, EnumSet.noneOf(AddBlockFlag.class));
});
} finally {
dfs.delete(testPath, true);
}
}
}
/** /**
* A test to make sure that if an authorized user adds "clientIp:" to their * A test to make sure that if an authorized user adds "clientIp:" to their
* caller context, it will be used to make locality decisions on the NN. * caller context, it will be used to make locality decisions on the NN.