HDFS-15539. When disallowing snapshot on a dir, throw exception if its trash root is not empty (#2258)

This commit is contained in:
Siyao Meng 2020-09-14 13:31:34 -07:00 committed by GitHub
parent c78d18023d
commit 6d3312de47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 0 deletions

View File

@ -2068,6 +2068,12 @@ public class DistributedFileSystem extends FileSystem
new FileSystemLinkResolver<Void>() {
@Override
public Void doCall(final Path p) throws IOException {
String ssTrashRoot =
new Path(p, FileSystem.TRASH_PREFIX).toUri().getPath();
if (dfs.exists(ssTrashRoot)) {
throw new IOException("Found trash root under path " + p + ". "
+ "Please remove or move the trash root and then try again.");
}
dfs.disallowSnapshot(getPathName(p));
return null;
}

View File

@ -2269,6 +2269,7 @@ public class TestDistributedFileSystem {
assertEquals(trashRootsAfter2.size() + 1, trashRootsAfter3.size());
// Cleanup
dfs.delete(new Path(testDir, FileSystem.TRASH_PREFIX), true);
dfs.disallowSnapshot(testDir);
dfs.delete(testDir, true);
} finally {
@ -2323,6 +2324,7 @@ public class TestDistributedFileSystem {
assertEquals(trashRoots, trashRootsAfter);
// Cleanup
dfs.delete(new Path(testDir, FileSystem.TRASH_PREFIX), true);
dfs.disallowSnapshot(testDir);
dfs.delete(testDir, true);
} finally {
@ -2452,4 +2454,34 @@ public class TestDistributedFileSystem {
}
}
}
@Test
public void testDisallowSnapshotShouldThrowWhenTrashRootExists()
throws Exception {
Configuration conf = getTestConfiguration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
DistributedFileSystem dfs = cluster.getFileSystem();
Path testDir = new Path("/disallowss/test1/");
Path file0path = new Path(testDir, "file-0");
dfs.create(file0path);
dfs.allowSnapshot(testDir);
// Create trash root manually
Path testDirTrashRoot = new Path(testDir, FileSystem.TRASH_PREFIX);
dfs.mkdirs(testDirTrashRoot);
// Try disallowing snapshot, should throw
LambdaTestUtils.intercept(IOException.class,
() -> dfs.disallowSnapshot(testDir));
// Remove the trash root and try again, should pass this time
dfs.delete(testDirTrashRoot, true);
dfs.disallowSnapshot(testDir);
// Cleanup
dfs.delete(testDir, true);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}