HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened. Contributed by Vinay.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529660 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao committed 2013-10-06 18:39:03 +00:00
commit 7317e97bd7
parent 44f7ee7192
3 changed files with 68 additions and 18 deletions
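
The root cause: several FSNamesystem methods called RetryCache.waitForCompletion() before checkOperation(), so a call rejected on a standby NameNode left behind a retry-cache entry that was never completed, and the client's retried call then blocked forever waiting on that entry. The patch reorders each method so the HA-state and permission checks run before the retry cache is touched. A minimal sketch of the resulting pattern (simplified from FSNamesystem; the finally-block bookkeeping is not visible in the hunks below and is assumed to match the surrounding code):

    // Sketch of the ordering this patch establishes (simplified).
    void updatePipeline(String clientName, ExtendedBlock oldBlock,
        ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException {
      // Fail fast on a standby NameNode, before any retry-cache entry exists.
      checkOperation(OperationCategory.WRITE);
      CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
      if (cacheEntry != null && cacheEntry.isSuccess()) {
        return; // a previous attempt of this call already succeeded
      }
      boolean success = false;
      try {
        // ... perform the actual pipeline update under the write lock ...
        success = true;
      } finally {
        // Always complete the entry; an incomplete entry is exactly what a
        // retried call blocks on inside waitForCompletion().
        RetryCache.setState(cacheEntry, success);
      }
    }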

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -400,6 +400,9 @@ Release 2.1.2 - UNRELEASED
     HDFS-5306. Datanode https port is not available at the namenode. (Suresh
     Srinivas via brandonli)
 
+    HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened.
+    (Vinay via jing9)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1793,16 +1793,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return; // Return previous response
-    }
     if (!DFSUtil.isValidName(link)) {
       throw new InvalidPathException("Invalid link name: " + link);
     }
     if (FSDirectory.isReservedName(target)) {
       throw new InvalidPathException("Invalid target name: " + target);
     }
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
     boolean success = false;
     try {
       createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
@@ -3023,10 +3023,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return; // Return previous response
-    }
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
           + src + " to " + dst);
@@ -3034,8 +3030,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if (!DFSUtil.isValidName(dst)) {
       throw new InvalidPathException("Invalid name: " + dst);
     }
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+
     byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
     HdfsFileStatus resultingStat = null;
@@ -4249,12 +4250,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+    checkSuperuserPrivilege();
+
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkSuperuserPrivilege();
-    checkOperation(OperationCategory.UNCHECKED);
     boolean success = false;
     readLock();
     try {
@@ -5140,11 +5142,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void endCheckpoint(NamenodeRegistration registration,
                      CheckpointSignature sig) throws IOException {
+    checkOperation(OperationCategory.CHECKPOINT);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkOperation(OperationCategory.CHECKPOINT);
     boolean success = false;
     readLock();
     try {
@@ -5676,11 +5678,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void updatePipeline(String clientName, ExtendedBlock oldBlock,
       ExtendedBlock newBlock, DatanodeID[] newNodes)
       throws IOException {
+    checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    checkOperation(OperationCategory.WRITE);
     LOG.info("updatePipeline(block=" + oldBlock
              + ", newGenerationStamp=" + newBlock.getGenerationStamp()
              + ", newLength=" + newBlock.getNumBytes()
@@ -6614,12 +6616,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   String createSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
         null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (String) cacheEntry.getPayload();
     }
-    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     String snapshotPath = null;
     try {
@@ -6666,11 +6669,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void renameSnapshot(String path, String snapshotOldName,
       String snapshotNewName) throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
-    final FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     boolean success = false;
     try {
@@ -6711,10 +6715,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
       throws IOException {
     SnapshottableDirectoryStatus[] status = null;
+    final FSPermissionChecker checker = getPermissionChecker();
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      FSPermissionChecker checker = getPermissionChecker();
       final String user = checker.isSuperUser()? null : checker.getUser();
       status = snapshotManager.getSnapshottableDirListing(user);
     } finally {
@@ -6782,13 +6786,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
+
     boolean success = false;
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.EnumSet;
@@ -35,11 +36,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.ipc.ClientId;
@@ -47,7 +52,9 @@ import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RetryCache.CacheEntry;
 import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.LightWeightCache;
 import org.junit.After;
 import org.junit.Assert;
@@ -75,12 +82,13 @@ public class TestNamenodeRetryCache {
       "TestNamenodeRetryCache", null, FsPermission.getDefault());
   private static DistributedFileSystem filesystem;
   private static int callId = 100;
-  private static Configuration conf = new HdfsConfiguration();
+  private static Configuration conf;
   private static final int BlockSize = 512;
 
   /** Start a cluster */
   @Before
   public void setup() throws Exception {
+    conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
     cluster = new MiniDFSCluster.Builder(conf).build();
@@ -293,6 +301,40 @@ public class TestNamenodeRetryCache {
     }
   }
 
+  /**
+   * Make sure a retry call does not hang because of the exception thrown in
+   * the first call.
+   */
+  @Test(timeout = 60000)
+  public void testUpdatePipelineWithFailOver() throws Exception {
+    cluster.shutdown();
+    namesystem = null;
+    filesystem = null;
+    cluster = new MiniDFSCluster.Builder(conf).nnTopology(
+        MiniDFSNNTopology.simpleHATopology()).numDataNodes(1).build();
+    FSNamesystem ns0 = cluster.getNamesystem(0);
+    ExtendedBlock oldBlock = new ExtendedBlock();
+    ExtendedBlock newBlock = new ExtendedBlock();
+    DatanodeID[] newNodes = new DatanodeID[2];
+
+    newCall();
+    try {
+      ns0.updatePipeline("testClient", oldBlock, newBlock, newNodes);
+      fail("Expect StandbyException from the updatePipeline call");
+    } catch (StandbyException e) {
+      // expected, since at this point both NNs are still in standby state
+      GenericTestUtils.assertExceptionContains(
+          HAServiceState.STANDBY.toString(), e);
+    }
+
+    cluster.transitionToActive(0);
+    try {
+      ns0.updatePipeline("testClient", oldBlock, newBlock, newNodes);
+    } catch (IOException e) {
+      // ignore; the retried call must not hang
+    }
+  }
+
   /**
    * Test for createSnapshot
    */