HDFS-5299. Merge change r1529660 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1529661 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 2013-10-06 18:42:08 +00:00
parent 763521305d
commit a59090bbdc
3 changed files with 68 additions and 18 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt View File

@@ -159,6 +159,9 @@ Release 2.1.2 - UNRELEASED
    HDFS-5306. Datanode https port is not available at the namenode. (Suresh
    Srinivas via brandonli)

+   HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened.
+   (Vinay via jing9)
+
Release 2.1.1-beta - 2013-09-23

  INCOMPATIBLE CHANGES
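
Every FSNamesystem hunk below applies the same reordering: the HA state check (checkOperation) and, where one is taken, the getPermissionChecker() call now run before the retry-cache lookup (RetryCache.waitForCompletion). Previously a method such as updatePipeline registered a retry-cache entry first; if checkOperation then threw StandbyException, the entry was never marked complete, so when that NameNode later became active and the client retried the RPC with the same call id, the retry blocked in waitForCompletion on the incomplete entry. That is the hang reported in HDFS-5299. A minimal sketch of the reordering, reusing names from the updatePipeline diff below (the Before/After method names are only for illustration, and the bodies are simplified, not the real ones):

  // Before: the retry-cache entry is created first; if checkOperation() then
  // throws StandbyException, nothing ever completes the entry, and a retried
  // call with the same call id waits on it indefinitely.
  void updatePipelineBefore(String clientName, ExtendedBlock oldBlock,
      ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException {
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    checkOperation(OperationCategory.WRITE); // may throw StandbyException
    // ... perform the update, then mark the entry complete in a finally block
  }

  // After: a standby NameNode rejects the call before any retry-cache entry
  // exists, so the retried call starts cleanly once a NameNode is active.
  void updatePipelineAfter(String clientName, ExtendedBlock oldBlock,
      ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException {
    checkOperation(OperationCategory.WRITE); // may throw StandbyException
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    // ... perform the update, then mark the entry complete in a finally block
  }

The new testUpdatePipelineWithFailOver test at the bottom exercises exactly this path: with both NameNodes in standby the first updatePipeline fails with StandbyException, and after transitionToActive(0) the retried call must complete instead of hanging.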

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java View File

@@ -1777,16 +1777,16 @@ private void setTimesInt(String src, long mtime, long atime)
  void createSymlink(String target, String link,
      PermissionStatus dirPerms, boolean createParent)
      throws IOException, UnresolvedLinkException {
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return; // Return previous response
-    }
    if (!DFSUtil.isValidName(link)) {
      throw new InvalidPathException("Invalid link name: " + link);
    }
    if (FSDirectory.isReservedName(target)) {
      throw new InvalidPathException("Invalid target name: " + target);
    }
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
    boolean success = false;
    try {
      createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
@@ -3006,10 +3006,6 @@ private boolean renameToInternal(FSPermissionChecker pc, String src,
  /** Rename src to dst */
  void renameTo(String src, String dst, Options.Rename... options)
      throws IOException, UnresolvedLinkException {
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (cacheEntry != null && cacheEntry.isSuccess()) {
-      return; // Return previous response
-    }
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
          + src + " to " + dst);
@@ -3017,8 +3013,13 @@ void renameTo(String src, String dst, Options.Rename... options)
    if (!DFSUtil.isValidName(dst)) {
      throw new InvalidPathException("Invalid name: " + dst);
    }
-    FSPermissionChecker pc = getPermissionChecker();
+    final FSPermissionChecker pc = getPermissionChecker();
    checkOperation(OperationCategory.WRITE);
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
    byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
    byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
    HdfsFileStatus resultingStat = null;
@@ -4232,12 +4233,13 @@ DatanodeInfo[] datanodeReport(final DatanodeReportType type
   * @throws IOException if
   */
  void saveNamespace() throws AccessControlException, IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+    checkSuperuserPrivilege();
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
-    checkSuperuserPrivilege();
-    checkOperation(OperationCategory.UNCHECKED);
    boolean success = false;
    readLock();
    try {
@@ -5123,11 +5125,11 @@ public void processIncrementalBlockReport(final DatanodeID nodeID,
  void endCheckpoint(NamenodeRegistration registration,
      CheckpointSignature sig) throws IOException {
+    checkOperation(OperationCategory.CHECKPOINT);
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
-    checkOperation(OperationCategory.CHECKPOINT);
    boolean success = false;
    readLock();
    try {
@@ -5659,11 +5661,11 @@ LocatedBlock updateBlockForPipeline(ExtendedBlock block,
  void updatePipeline(String clientName, ExtendedBlock oldBlock,
      ExtendedBlock newBlock, DatanodeID[] newNodes)
      throws IOException {
+    checkOperation(OperationCategory.WRITE);
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
-    checkOperation(OperationCategory.WRITE);
    LOG.info("updatePipeline(block=" + oldBlock
        + ", newGenerationStamp=" + newBlock.getGenerationStamp()
        + ", newLength=" + newBlock.getNumBytes()
@@ -6597,12 +6599,13 @@ void disallowSnapshot(String path) throws SafeModeException, IOException {
   */
  String createSnapshot(String snapshotRoot, String snapshotName)
      throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
        null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (String) cacheEntry.getPayload();
    }
-    final FSPermissionChecker pc = getPermissionChecker();
    writeLock();
    String snapshotPath = null;
    try {
@@ -6649,11 +6652,12 @@ String createSnapshot(String snapshotRoot, String snapshotName)
   */
  void renameSnapshot(String path, String snapshotOldName,
      String snapshotNewName) throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
-    final FSPermissionChecker pc = getPermissionChecker();
    writeLock();
    boolean success = false;
    try {
@@ -6694,10 +6698,10 @@ void renameSnapshot(String path, String snapshotOldName,
  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
      throws IOException {
    SnapshottableDirectoryStatus[] status = null;
+    final FSPermissionChecker checker = getPermissionChecker();
    readLock();
    try {
      checkOperation(OperationCategory.READ);
-      FSPermissionChecker checker = getPermissionChecker();
      final String user = checker.isSuperUser()? null : checker.getUser();
      status = snapshotManager.getSnapshottableDirListing(user);
    } finally {
@@ -6765,13 +6769,14 @@ private void checkSubtreeReadPermission(final FSPermissionChecker pc,
   */
  void deleteSnapshot(String snapshotRoot, String snapshotName)
      throws SafeModeException, IOException {
+    checkOperation(OperationCategory.WRITE);
    final FSPermissionChecker pc = getPermissionChecker();
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return; // Return previous response
    }
    boolean success = false;
-    checkOperation(OperationCategory.WRITE);
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java View File

@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.EnumSet;
@@ -35,11 +36,15 @@
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -47,7 +52,9 @@
import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
import org.junit.Assert;
@@ -75,12 +82,13 @@ public class TestNamenodeRetryCache {
      "TestNamenodeRetryCache", null, FsPermission.getDefault());
  private static DistributedFileSystem filesystem;
  private static int callId = 100;
-  private static Configuration conf = new HdfsConfiguration();
+  private static Configuration conf;
  private static final int BlockSize = 512;

  /** Start a cluster */
  @Before
  public void setup() throws Exception {
+    conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BlockSize);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
    cluster = new MiniDFSCluster.Builder(conf).build();
@@ -293,6 +301,40 @@ public void testRename2() throws Exception {
    }
  }

+  /**
+   * Make sure a retry call does not hang because of the exception thrown in the
+   * first call.
+   */
+  @Test(timeout = 60000)
+  public void testUpdatePipelineWithFailOver() throws Exception {
+    cluster.shutdown();
+    namesystem = null;
+    filesystem = null;
+    cluster = new MiniDFSCluster.Builder(conf).nnTopology(
+        MiniDFSNNTopology.simpleHATopology()).numDataNodes(1).build();
+    FSNamesystem ns0 = cluster.getNamesystem(0);
+    ExtendedBlock oldBlock = new ExtendedBlock();
+    ExtendedBlock newBlock = new ExtendedBlock();
+    DatanodeID[] newNodes = new DatanodeID[2];
+    newCall();
+    try {
+      ns0.updatePipeline("testClient", oldBlock, newBlock, newNodes);
+      fail("Expect StandbyException from the updatePipeline call");
+    } catch (StandbyException e) {
+      // expected, since in the beginning both nn are in standby state
+      GenericTestUtils.assertExceptionContains(
+          HAServiceState.STANDBY.toString(), e);
+    }
+
+    cluster.transitionToActive(0);
+    try {
+      ns0.updatePipeline("testClient", oldBlock, newBlock, newNodes);
+    } catch (IOException e) {
+      // ignore call should not hang.
+    }
+  }
+
  /**
   * Test for createSnapshot
   */