HDFS-4147. When there is a snapshot in a subtree, deletion of the subtree should fail. Contributed by Jing Zhao

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1405688 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-11-05 01:22:54 +00:00
parent f84000900a
commit b3bc2fb76e
4 changed files with 143 additions and 26 deletions

View File

@ -53,3 +53,6 @@ Branch-2802 Snapshot (Unreleased)
HDFS-4149. Implement the disallowSnapshot(..) in FSNamesystem and add
resetSnapshottable(..) to SnapshotManager. (szetszwo)
HDFS-4147. When there is a snapshot in a subtree, deletion of the subtree
should fail. (Jing Zhao via szetszwo)

View File

@ -997,7 +997,7 @@ public class FSDirectory implements Closeable {
* @return true on successful deletion; else false
*/
boolean delete(String src, List<Block>collectedBlocks)
throws UnresolvedLinkException {
throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
}
@ -1006,7 +1006,22 @@ public class FSDirectory implements Closeable {
int filesRemoved;
writeLock();
try {
filesRemoved = unprotectedDelete(src, collectedBlocks, now);
INode[] inodes = rootDir.getExistingPathINodes(normalizePath(src), false);
if (checkPathINodes(inodes, src) == 0) {
filesRemoved = 0;
} else {
// Before removing the node, first check if the targetNode is for a
// snapshottable dir with snapshots, or its descendants have
// snapshottable dir with snapshots
INode targetNode = inodes[inodes.length-1];
INode snapshotNode = hasSnapshot(targetNode);
if (snapshotNode != null) {
throw new IOException("The direcotry " + targetNode.getFullPathName()
+ " cannot be deleted since " + snapshotNode.getFullPathName()
+ " is snapshottable and already has snapshots");
}
filesRemoved = unprotectedDelete(inodes, collectedBlocks, now);
}
} finally {
writeUnlock();
}
@ -1020,6 +1035,23 @@ public class FSDirectory implements Closeable {
return true;
}
private int checkPathINodes(INode[] inodes, String src) {
if (inodes == null || inodes.length == 0
|| inodes[inodes.length - 1] == null) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+ "failed to remove " + src + " because it does not exist");
}
return 0;
} else if (inodes.length == 1) { // src is the root
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
+ "failed to remove " + src
+ " because the root is not allowed to be deleted");
return 0;
}
return inodes.length;
}
/**
* @return true if the path is a non-empty directory; otherwise, return false.
*/
@ -1050,7 +1082,14 @@ public class FSDirectory implements Closeable {
throws UnresolvedLinkException {
assert hasWriteLock();
List<Block> collectedBlocks = new ArrayList<Block>();
int filesRemoved = unprotectedDelete(src, collectedBlocks, mtime);
int filesRemoved = 0;
INode[] inodes = rootDir.getExistingPathINodes(normalizePath(src), false);
if (checkPathINodes(inodes, src) == 0) {
filesRemoved = 0;
} else {
filesRemoved = unprotectedDelete(inodes, collectedBlocks, mtime);
}
if (filesRemoved > 0) {
getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
}
@ -1059,32 +1098,16 @@ public class FSDirectory implements Closeable {
/**
* Delete a path from the name space
* Update the count at each ancestor directory with quota
* @param src a string representation of a path to an inode
* @param inodes the INode array resolved from the path
* @param collectedBlocks blocks collected from the deleted path
* @param mtime the time the inode is removed
* @return the number of inodes deleted; 0 if no inodes are deleted.
*/
int unprotectedDelete(String src, List<Block> collectedBlocks,
long mtime) throws UnresolvedLinkException {
int unprotectedDelete(INode[] inodes, List<Block> collectedBlocks,
long mtime) {
assert hasWriteLock();
src = normalizePath(src);
INode[] inodes = rootDir.getExistingPathINodes(src, false);
INode targetNode = inodes[inodes.length-1];
if (targetNode == null) { // non-existent src
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+"failed to remove "+src+" because it does not exist");
}
return 0;
}
if (inodes.length == 1) { // src is the root
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
"failed to remove " + src +
" because the root is not allowed to be deleted");
return 0;
}
int pos = inodes.length - 1;
// Remove the node from the namespace
targetNode = removeChild(inodes, pos);
@ -1096,10 +1119,34 @@ public class FSDirectory implements Closeable {
int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+src+" is removed");
+ targetNode.getFullPathName() + " is removed");
}
return filesRemoved;
}
/**
* Check if the given INode (or one of its descendants) is snapshottable and
* already has snapshots.
*
* @param target The given INode
* @return The INode which is snapshottable and already has snapshots.
*/
private static INode hasSnapshot(INode target) {
if (target instanceof INodeDirectory) {
INodeDirectory targetDir = (INodeDirectory) target;
if (targetDir.isSnapshottable()
&& ((INodeDirectorySnapshottable) targetDir).getNumSnapshots() > 0) {
return target;
}
for (INode child : targetDir.getChildren()) {
INode snapshotDir = hasSnapshot(child);
if (snapshotDir != null) {
return snapshotDir;
}
}
}
return null;
}
/**
* Replaces the specified INode.

View File

@ -75,7 +75,7 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithQuota {
setSnapshotQuota(snapshotQuota);
}
int getNumSnapshots() {
public int getNumSnapshots() {
return snapshots.size();
}

View File

@ -33,9 +33,12 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
/**
* This class tests snapshot functionality. One or multiple snapshots are
@ -50,7 +53,8 @@ public class TestSnapshot {
private final Path dir = new Path("/TestSnapshot");
private final Path sub1 = new Path(dir, "sub1");
private final Path subsub1 = new Path(sub1, "subsub1");
protected Configuration conf;
protected MiniDFSCluster cluster;
protected FSNamesystem fsn;
@ -61,6 +65,9 @@ public class TestSnapshot {
* records a snapshot root.
*/
protected static ArrayList<Path> snapshotList = new ArrayList<Path>();
@Rule
public ExpectedException exception = ExpectedException.none();
@Before
public void setUp() throws Exception {
@ -186,6 +193,66 @@ public class TestSnapshot {
testSnapshot(sub1, mList);
}
/**
* Creating snapshots for a directory that is not snapshottable must fail.
*
* TODO: Listing/Deleting snapshots for a directory that is not snapshottable
* should also fail.
*/
@Test
public void testSnapshottableDirectory() throws Exception {
Path file0 = new Path(sub1, "file0");
Path file1 = new Path(sub1, "file1");
DFSTestUtil.createFile(hdfs, file0, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file1, BLOCKSIZE, REPLICATION, seed);
exception.expect(RemoteException.class);
hdfs.createSnapshot("s1", sub1.toString());
}
/**
* Deleting snapshottable directory with snapshots must fail.
*/
@Test
public void testDeleteDirectoryWithSnapshot() throws Exception {
Path file0 = new Path(sub1, "file0");
Path file1 = new Path(sub1, "file1");
DFSTestUtil.createFile(hdfs, file0, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file1, BLOCKSIZE, REPLICATION, seed);
// Allow snapshot for sub1, and create snapshot for it
hdfs.allowSnapshot(sub1.toString());
hdfs.createSnapshot("s1", sub1.toString());
// Deleting a snapshottable dir with snapshots should fail
exception.expect(RemoteException.class);
hdfs.delete(sub1, true);
}
/**
* Deleting directory with snapshottable descendant with snapshots must fail.
*/
@Test
public void testDeleteDirectoryWithSnapshot2() throws Exception {
Path file0 = new Path(sub1, "file0");
Path file1 = new Path(sub1, "file1");
DFSTestUtil.createFile(hdfs, file0, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, file1, BLOCKSIZE, REPLICATION, seed);
Path subfile1 = new Path(subsub1, "file0");
Path subfile2 = new Path(subsub1, "file1");
DFSTestUtil.createFile(hdfs, subfile1, BLOCKSIZE, REPLICATION, seed);
DFSTestUtil.createFile(hdfs, subfile2, BLOCKSIZE, REPLICATION, seed);
// Allow snapshot for subsub1, and create snapshot for it
hdfs.allowSnapshot(subsub1.toString());
hdfs.createSnapshot("s1", subsub1.toString());
// Deleting dir while its descedant subsub1 having snapshots should fail
exception.expect(RemoteException.class);
hdfs.delete(dir, true);
}
/**
* Base class to present changes applied to current file/dir. A modification
* can be file creation, deletion, or other modifications such as appending on