HDFS-6870. Merge r1619192 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619193 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-08-20 18:14:43 +00:00
parent 8991c7d1b2
commit f87e8dc4b8
3 changed files with 57 additions and 5 deletions

View File

@ -268,6 +268,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6868. portmap and nfs3 are documented as hadoop commands instead of hdfs HDFS-6868. portmap and nfs3 are documented as hadoop commands instead of hdfs
(brandonli) (brandonli)
HDFS-6870. Blocks and INodes could leak for Rename with overwrite flag. (Yi
Liu via jing9)
Release 2.5.0 - UNRELEASED Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -646,15 +646,20 @@ public class FSDirectory implements Closeable {
tx.updateMtimeAndLease(timestamp); tx.updateMtimeAndLease(timestamp);
// Collect the blocks and remove the lease for previous dst // Collect the blocks and remove the lease for previous dst
long filesDeleted = -1; boolean filesDeleted = false;
if (removedDst != null) { if (removedDst != null) {
undoRemoveDst = false; undoRemoveDst = false;
if (removedNum > 0) { if (removedNum > 0) {
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
List<INode> removedINodes = new ChunkedArrayList<INode>(); List<INode> removedINodes = new ChunkedArrayList<INode>();
filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID, if (!removedDst.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes, removedDst.destroyAndCollectBlocks(collectedBlocks, removedINodes);
true).get(Quota.NAMESPACE); filesDeleted = true;
} else {
filesDeleted = removedDst.cleanSubtree(Snapshot.CURRENT_STATE_ID,
dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes,
true).get(Quota.NAMESPACE) >= 0;
}
getFSNamesystem().removePathAndBlocks(src, collectedBlocks, getFSNamesystem().removePathAndBlocks(src, collectedBlocks,
removedINodes, false); removedINodes, false);
} }
@ -667,7 +672,7 @@ public class FSDirectory implements Closeable {
} }
tx.updateQuotasInSourceTree(); tx.updateQuotasInSourceTree();
return filesDeleted >= 0; return filesDeleted;
} }
} finally { } finally {
if (undoRemoveSrc) { if (undoRemoveSrc) {

View File

@ -27,6 +27,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.Test; import org.junit.Test;
@ -125,4 +128,45 @@ public class TestDFSRename {
if (cluster != null) {cluster.shutdown();} if (cluster != null) {cluster.shutdown();}
} }
} }
/**
* Check the blocks of dst file are cleaned after rename with overwrite
*/
@Test(timeout = 120000)
public void testRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
numDataNodes(replFactor).build();
DistributedFileSystem dfs = cluster.getFileSystem();
try {
long fileLen = blockSize*3;
String src = "/foo/src";
String dst = "/foo/dst";
Path srcPath = new Path(src);
Path dstPath = new Path(dst);
DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), dst, 0, fileLen);
BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode()).
getBlockManager();
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
getLocalBlock()) != null);
dfs.rename(srcPath, dstPath, Rename.OVERWRITE);
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock().
getLocalBlock()) == null);
} finally {
if (dfs != null) {
dfs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
} }