HDFS-2827. When the parent of a directory is the root, renaming the directory results in leases updated incorrectly. Contributed by Uma Maheswara Rao G

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1238700 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-01-31 17:36:42 +00:00
parent 7f23d72352
commit 8dfef7d2dc
3 changed files with 50 additions and 4 deletions

View File

@ -378,6 +378,10 @@ Release 0.23.1 - UNRELEASED
HDFS-2791. If block report races with closing of file, replica is
incorrectly marked corrupt. (todd)
HDFS-2827. When the parent of a directory is the root, renaming the
directory results in leases updated incorrectly. (Uma Maheswara Rao G
via szetszwo)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -3907,7 +3907,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (destinationExisted && dinfo.isDir()) {
Path spath = new Path(src);
overwrite = spath.getParent().toString() + Path.SEPARATOR;
Path parent = spath.getParent();
if (isRoot(parent)) {
overwrite = parent.toString();
} else {
overwrite = parent.toString() + Path.SEPARATOR;
}
replaceBy = dst + Path.SEPARATOR;
} else {
overwrite = src;
@ -3917,6 +3922,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
leaseManager.changeLease(src, dst, overwrite, replaceBy);
}
private boolean isRoot(Path path) {
return path.getParent() == null;
}
/**
* Serializes leases.
*/

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@ -28,6 +29,7 @@ import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -38,16 +40,19 @@ import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level;
@ -566,6 +571,34 @@ public class TestSaveNamespace {
}
}
/**
* Test for save namespace should succeed when parent directory renamed with
* open lease and destination directory exist.
* This test is a regression for HDFS-2827
*/
@Test
public void testSaveNamespaceWithRenamedLease() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
.numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
OutputStream out = null;
try {
fs.mkdirs(new Path("/test-target"));
out = fs.create(new Path("/test-source/foo")); // don't close
fs.rename(new Path("/test-source/"), new Path("/test-target/"));
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc().saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
} finally {
IOUtils.cleanup(LOG, out, fs);
if (cluster != null) {
cluster.shutdown();
}
}
}
private void doAnEdit(FSNamesystem fsn, int id) throws IOException {
// Make an edit
fsn.mkdirs(