HDFS-16222. Fix ViewDFS with mount points for HDFS only API. (#3422). Contributed by Ayush Saxena.

Signed-off-by: Vinayakumar B <vinayakumarb@apache.org>
This commit is contained in:
Ayush Saxena 2021-10-03 10:02:35 +05:30 committed by GitHub
parent bf9106c812
commit 5f0452602f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 6 deletions

View File

@ -347,12 +347,15 @@ public class ViewFileSystemOverloadScheme extends ViewFileSystem {
res = fsState.resolve(getUriPath(path), true);
FileSystem fs = res.isInternalDir() ?
(fsState.getRootFallbackLink() != null ?
((ChRootedFileSystem) fsState
.getRootFallbackLink().getTargetFileSystem()).getMyFs() :
fsState.getRootFallbackLink().getTargetFileSystem() :
fsGetter().get(path.toUri(), conf)) :
((ChRootedFileSystem) res.targetFileSystem).getMyFs();
return new MountPathInfo<FileSystem>(res.remainingPath, res.resolvedPath,
fs);
res.targetFileSystem;
if (fs instanceof ChRootedFileSystem) {
ChRootedFileSystem chFs = (ChRootedFileSystem) fs;
return new MountPathInfo<>(chFs.fullPath(res.remainingPath),
chFs.getMyFs());
}
return new MountPathInfo<FileSystem>(res.remainingPath, fs);
} catch (FileNotFoundException e) {
// No link configured with passed path.
throw new NotInMountpointException(path,
@ -368,7 +371,7 @@ public class ViewFileSystemOverloadScheme extends ViewFileSystem {
private Path pathOnTarget;
private T targetFs;
public MountPathInfo(Path pathOnTarget, String resolvedPath, T targetFs) {
public MountPathInfo(Path pathOnTarget, T targetFs) {
this.pathOnTarget = pathOnTarget;
this.targetFs = targetFs;
}

View File

@ -32,6 +32,10 @@ import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestViewDistributedFileSystem extends TestDistributedFileSystem{
@Override
HdfsConfiguration getTestConfiguration() {
@ -118,4 +122,73 @@ public class TestViewDistributedFileSystem extends TestDistributedFileSystem{
}
}
}
@Test
public void testRenameWithOptionsWithMountEntries() throws IOException {
Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
URI defaultUri =
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
conf.set("fs.viewfs.mounttable." + defaultUri.getHost() + ".linkFallback",
defaultUri.toString());
Path target = new Path(defaultUri.toString(), "/src");
ConfigUtil.addLink(conf, defaultUri.getHost(), "/source",
target.toUri());
FileSystem defaultFs = FileSystem.get(defaultUri, conf);
defaultFs.mkdirs(target);
try (ViewDistributedFileSystem fileSystem = (ViewDistributedFileSystem) FileSystem
.get(conf)) {
final Path testDir = new Path("/source");
Path filePath = new Path(testDir, "file");
Path renamedFilePath = new Path(testDir, "fileRename");
// Create a file.
fileSystem.create(filePath).close();
// Check the file exists before rename is called.
assertTrue(fileSystem.exists(filePath));
fileSystem.rename(filePath, renamedFilePath, Options.Rename.NONE);
// Check the file is not present at source location post a rename.
assertFalse(fileSystem.exists(filePath));
// Check the file is there at target location post rename.
assertTrue(fileSystem.exists(renamedFilePath));
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testQuota() throws IOException {
Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
URI defaultUri =
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
conf.set("fs.viewfs.mounttable." + defaultUri.getHost() + ".linkFallback",
defaultUri.toString());
Path target = new Path(defaultUri.toString(), "/src");
// /source -> /src
ConfigUtil.addLink(conf, defaultUri.getHost(), "/source",
target.toUri());
FileSystem defaultFs = FileSystem.get(defaultUri, conf);
defaultFs.mkdirs(target);
try (ViewDistributedFileSystem fileSystem = (ViewDistributedFileSystem) FileSystem
.get(conf)) {
final Path testDir = new Path("/source");
// Set Quota via ViewDFS
fileSystem.setQuota(testDir, 10L, 10L);
// Check quota through actual DFS
assertEquals(10,
defaultFs.getQuotaUsage(target).getSpaceQuota());
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}