HDFS-15525. Make trash root inside each snapshottable directory for WebHDFS (#2220)

This commit is contained in:
Siyao Meng 2020-08-18 03:28:19 -07:00 committed by GitHub
parent fefacf2578
commit b65e43fe38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 6 deletions

View File

@ -1345,19 +1345,55 @@ protected Response get(
}
}
/**
* Get the snapshot root of a given file or directory if it exists.
* e.g. if /snapdir1 is a snapshottable directory and path given is
* /snapdir1/path/to/file, this method would return /snapdir1
* @param pathStr String of path to a file or a directory.
* @return Not null if found in a snapshot root directory.
* @throws IOException
*/
String getSnapshotRoot(String pathStr) throws IOException {
SnapshottableDirectoryStatus[] dirStatusList =
getRpcClientProtocol().getSnapshottableDirListing();
if (dirStatusList == null) {
return null;
}
for (SnapshottableDirectoryStatus dirStatus : dirStatusList) {
String currDir = dirStatus.getFullPath().toString();
if (pathStr.startsWith(currDir)) {
return currDir;
}
}
return null;
}
private String getTrashRoot(Configuration conf, String fullPath)
throws IOException {
UserGroupInformation ugi= UserGroupInformation.getCurrentUser();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String parentSrc = getParent(fullPath);
String ssTrashRoot = "";
boolean isSnapshotTrashRootEnabled = getRpcClientProtocol()
.getServerDefaults().getSnapshotTrashRootEnabled();
if (isSnapshotTrashRootEnabled) {
String ssRoot = getSnapshotRoot(fullPath);
if (ssRoot != null) {
ssTrashRoot = DFSUtilClient.getSnapshotTrashRoot(ssRoot, ugi);
}
}
EncryptionZone ez = getRpcClientProtocol().getEZForPath(
parentSrc != null ? parentSrc : fullPath);
String trashRoot;
String ezTrashRoot = "";
if (ez != null) {
trashRoot = DFSUtilClient.getEZTrashRoot(ez, ugi);
} else {
trashRoot = DFSUtilClient.getTrashRoot(conf, ugi);
ezTrashRoot = DFSUtilClient.getEZTrashRoot(ez, ugi);
}
// Choose the longest path
if (ssTrashRoot.isEmpty() && ezTrashRoot.isEmpty()) {
return DFSUtilClient.getTrashRoot(conf, ugi);
} else {
return ssTrashRoot.length() > ezTrashRoot.length() ?
ssTrashRoot : ezTrashRoot;
}
return trashRoot;
}
/**

View File

@ -1569,6 +1569,34 @@ public void testGetTrashRoot() throws Exception {
assertEquals(expectedPath.toUri().getPath(), trashPath.toUri().getPath());
}
@Test
public void testGetSnapshotTrashRoot() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
conf.setBoolean("dfs.namenode.snapshot.trashroot.enabled", true);
final String currentUser =
UserGroupInformation.getCurrentUser().getShortUserName();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
Path ssDir1 = new Path("/ssDir1");
assertTrue(webFS.mkdirs(ssDir1));
Path trashPath = webFS.getTrashRoot(ssDir1);
Path expectedPath = new Path(FileSystem.USER_HOME_PREFIX,
new Path(currentUser, FileSystem.TRASH_PREFIX));
assertEquals(expectedPath.toUri().getPath(), trashPath.toUri().getPath());
// Enable snapshot
webFS.allowSnapshot(ssDir1);
Path trashPathAfter = webFS.getTrashRoot(ssDir1);
Path expectedPathAfter = new Path(ssDir1,
new Path(FileSystem.TRASH_PREFIX, currentUser));
assertEquals(expectedPathAfter.toUri().getPath(),
trashPathAfter.toUri().getPath());
// Cleanup
webFS.disallowSnapshot(ssDir1);
webFS.delete(ssDir1, true);
}
@Test
public void testGetEZTrashRoot() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();