HDFS-16192: ViewDistributedFileSystem#rename wrongly using src in the place of dst. (#3353)

Co-authored-by: Uma Maheswara Rao G <umagangumalla@cloudera.com>
This commit is contained in:
Uma Maheswara Rao G 2021-08-30 21:25:03 -07:00 committed by GitHub
parent 265a48e245
commit 164608b546
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 7 deletions

View File

@ -1872,4 +1872,20 @@ public class FileUtil {
final Path path, final CharSequence charseq) throws IOException {
return write(fileContext, path, charseq, StandardCharsets.UTF_8);
}
@InterfaceAudience.LimitedPrivate({"ViewDistributedFileSystem"})
@InterfaceStability.Unstable
/**
* Used in ViewDistributedFileSystem rename API to get access to the protected
* API of FileSystem interface. Even though Rename with options API
* deprecated, we are still using as part of trash. If any filesystem provided
* implementation to this protected FileSystem API, we can't invoke it with
* out casting to the specific filesystem. This util method is proposed to get
* the access to FileSystem#rename with options.
*/
@SuppressWarnings("deprecation")
public static void rename(FileSystem srcFs, Path src, Path dst,
final Options.Rename... options) throws IOException {
srcFs.rename(src, dst, options);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
@ -537,14 +538,11 @@ public class ViewDistributedFileSystem extends DistributedFileSystem {
return;
}
// TODO: revisit
ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountSrcPathInfo =
this.vfs.getMountPathInfo(src, getConf());
checkDFS(mountSrcPathInfo.getTargetFs(), "rename");
ViewFileSystemOverloadScheme.MountPathInfo<FileSystem> mountDstPathInfo =
this.vfs.getMountPathInfo(src, getConf());
checkDFS(mountDstPathInfo.getTargetFs(), "rename");
this.vfs.getMountPathInfo(dst, getConf());
//Check both in same cluster.
if (!mountSrcPathInfo.getTargetFs().getUri()
@ -553,9 +551,9 @@ public class ViewDistributedFileSystem extends DistributedFileSystem {
"Can't rename across file systems.");
}
((DistributedFileSystem) mountSrcPathInfo.getTargetFs())
.rename(mountSrcPathInfo.getPathOnTarget(),
mountDstPathInfo.getPathOnTarget(), options);
FileUtil.rename(mountSrcPathInfo.getTargetFs(),
mountSrcPathInfo.getPathOnTarget(), mountDstPathInfo.getPathOnTarget(),
options);
}
@Override

View File

@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.test.Whitebox;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@ -89,4 +91,31 @@ public class TestViewDistributedFileSystem extends TestDistributedFileSystem{
}
}
}
@Test
public void testRenameWithOptions() throws IOException {
Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
URI defaultUri =
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
conf.set("fs.viewfs.mounttable." + defaultUri.getHost() + ".linkFallback",
defaultUri.toString());
conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 30000);
try (ViewDistributedFileSystem fileSystem =
(ViewDistributedFileSystem) FileSystem.get(conf)) {
final Path testDir = new Path("/test");
final Path renameDir = new Path("/testRename");
fileSystem.mkdirs(testDir);
fileSystem.rename(testDir, renameDir, Options.Rename.TO_TRASH);
Assert.assertTrue(fileSystem.exists(renameDir));
Assert.assertFalse(fileSystem.exists(testDir));
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}