diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java
index cfef1c38279..07c16b22358 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java
@@ -42,4 +42,6 @@ public interface FsConstants {
    */
   public static final URI VIEWFS_URI = URI.create("viewfs:///");
   public static final String VIEWFS_SCHEME = "viewfs";
+  String FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN =
+      "fs.viewfs.overload.scheme.target.%s.impl";
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index d7fcb349d86..f0527432983 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.fs.viewfs;
 
-import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
 import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
 import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT;
+import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -34,9 +34,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -94,16 +94,49 @@ public class ViewFileSystem extends FileSystem {
     return readOnlyMountTable(operation, p.toString());
   }
 
+  /**
+   * File system instance getter.
+   */
+  static class FsGetter {
+
+    /**
+     * Gets new file system instance of given uri.
+     */
+    public FileSystem getNewInstance(URI uri, Configuration conf)
+        throws IOException {
+      return FileSystem.newInstance(uri, conf);
+    }
+
+    /**
+     * Gets file system instance of given uri.
+     */
+    public FileSystem get(URI uri, Configuration conf) throws IOException {
+      return FileSystem.get(uri, conf);
+    }
+  }
+
+  /**
+   * Gets file system creator instance.
+   */
+  protected FsGetter fsGetter() {
+    return new FsGetter();
+  }
+
   /**
    * Caching children filesystems. HADOOP-15565.
   */
   static class InnerCache {
     private Map<Key, FileSystem> map = new HashMap<>();
+    private FsGetter fsCreator;
+
+    InnerCache(FsGetter fsCreator) {
+      this.fsCreator = fsCreator;
+    }
 
     FileSystem get(URI uri, Configuration config) throws IOException {
       Key key = new Key(uri);
       if (map.get(key) == null) {
-        FileSystem fs = FileSystem.newInstance(uri, config);
+        FileSystem fs = fsCreator.getNewInstance(uri, config);
         map.put(key, fs);
         return fs;
       } else {
@@ -191,7 +224,7 @@ public class ViewFileSystem extends FileSystem {
   final long creationTime; // of the mount table
   final UserGroupInformation ugi; // the user/group of user who created mtable
-  URI myUri;
+  private URI myUri;
   private Path workingDir;
   Configuration config;
   InodeTree<FileSystem> fsState;  // the fs state; ie the mount table
@@ -253,13 +286,13 @@ public class ViewFileSystem extends FileSystem {
     config = conf;
     enableInnerCache = config.getBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE,
         CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT);
-    final InnerCache innerCache = new InnerCache();
+    FsGetter fsGetter = fsGetter();
+    final InnerCache innerCache = new InnerCache(fsGetter);
     // Now build client side view (i.e. client side mount table) from config.
     final String authority = theUri.getAuthority();
     try {
-      myUri = new URI(FsConstants.VIEWFS_SCHEME, authority, "/", null, null);
+      myUri = new URI(getScheme(), authority, "/", null, null);
       fsState = new InodeTree<FileSystem>(conf, authority) {
-
         @Override
         protected FileSystem getTargetFileSystem(final URI uri)
             throws URISyntaxException, IOException {
@@ -267,7 +300,7 @@ public class ViewFileSystem extends FileSystem {
           if (enableInnerCache) {
             fs = innerCache.get(uri, config);
           } else {
-            fs = FileSystem.get(uri, config);
+            fs = fsGetter.get(uri, config);
           }
           return new ChRootedFileSystem(fs, uri);
         }
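The `FsGetter` indirection above is the seam the rest of this patch hooks into: every child file system is now created through `fsGetter()` instead of through static `FileSystem.get`/`FileSystem.newInstance` calls, so a subclass can intercept target creation. A minimal sketch of what that enables is below; the `CustomViewFileSystem` name is hypothetical, and the class must live in `org.apache.hadoop.fs.viewfs` because `FsGetter` is package-private:

```java
package org.apache.hadoop.fs.viewfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical subclass, for illustration only: it shows the override point
// that ViewFileSystemOverloadScheme (added below) actually uses.
public class CustomViewFileSystem extends ViewFileSystem {
  @Override
  protected FsGetter fsGetter() {
    return new FsGetter() {
      @Override
      public FileSystem get(URI uri, Configuration conf) throws IOException {
        // A real subclass could special-case certain schemes here;
        // this sketch just delegates to the default cached lookup.
        return FileSystem.get(uri, conf);
      }
    };
  }
}
```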
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java
new file mode 100644
index 00000000000..2dda540b703
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+/******************************************************************************
+ * This class is extended from the ViewFileSystem for the overloaded scheme
+ * file system. Mount link configurations and in-memory mount table
+ * building behaviors are inherited from ViewFileSystem. Unlike the
+ * ViewFileSystem scheme (viewfs://), users can use any scheme.
+ *
+ * To use this class, the following configurations need to be added in the
+ * core-site.xml file.
+ * 1) fs.<scheme>.impl
+ *    = org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme
+ * 2) fs.viewfs.overload.scheme.target.<scheme>.impl
+ *    = "<hadoop compatible file system implementation class name for the
+ *    scheme>"
+ *
+ * Here <scheme> can be any scheme, but a hadoop compatible file system must
+ * be available for that scheme. The second configuration value should be the
+ * respective scheme's file system implementation class.
+ * Example: if the scheme is configured with "hdfs", then the 2nd
+ * configuration class name will be org.apache.hadoop.hdfs.DistributedFileSystem.
+ * If the scheme is configured with "s3a", then the 2nd configuration class
+ * name will be org.apache.hadoop.fs.s3a.S3AFileSystem.
+ *
+ * Use Case 1:
+ * ===========
+ * If users want some of their existing cluster (hdfs://Cluster)
+ * data to be mounted with other hdfs and object store clusters (hdfs://NN1,
+ * o3fs://bucket1.volume1/, s3a://bucket1/)
+ *
+ * fs.viewfs.mounttable.Cluster./user = hdfs://NN1/user
+ * fs.viewfs.mounttable.Cluster./data = o3fs://bucket1.volume1/data
+ * fs.viewfs.mounttable.Cluster./backup = s3a://bucket1/backup/
+ *
+ * Op1: Create file hdfs://Cluster/user/fileA will go to hdfs://NN1/user/fileA
+ * Op2: Create file hdfs://Cluster/data/datafile will go to
+ *      o3fs://bucket1.volume1/data/datafile
+ * Op3: Create file hdfs://Cluster/backup/data.zip will go to
+ *      s3a://bucket1/backup/data.zip
+ *
+ * Use Case 2:
+ * ===========
+ * If users want some of their existing cluster (s3a://bucketA/)
+ * data to be mounted with other hdfs and object store clusters
+ * (hdfs://NN1, o3fs://bucket1.volume1/)
+ *
+ * fs.viewfs.mounttable.bucketA./user = hdfs://NN1/user
+ * fs.viewfs.mounttable.bucketA./data = o3fs://bucket1.volume1/data
+ * fs.viewfs.mounttable.bucketA./salesDB = s3a://bucketA/salesDB/
+ *
+ * Op1: Create file s3a://bucketA/user/fileA will go to hdfs://NN1/user/fileA
+ * Op2: Create file s3a://bucketA/data/datafile will go to
+ *      o3fs://bucket1.volume1/data/datafile
+ * Op3: Create file s3a://bucketA/salesDB/dbfile will go to
+ *      s3a://bucketA/salesDB/dbfile
+ *****************************************************************************/
+@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase", "Hive" })
+@InterfaceStability.Evolving
+public class ViewFileSystemOverloadScheme extends ViewFileSystem {
+  private URI myUri;
+
+  public ViewFileSystemOverloadScheme() throws IOException {
+    super();
+  }
+
+  @Override
+  public String getScheme() {
+    return myUri.getScheme();
+  }
+
+  @Override
+  public void initialize(URI theUri, Configuration conf) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing the ViewFileSystemOverloadScheme with the uri: "
+          + theUri);
+    }
+    this.myUri = theUri;
+    super.initialize(theUri, conf);
+  }
+
+  /**
+   * This method is overridden because, in ViewFileSystemOverloadScheme, if
+   * the overloaded scheme matches the mounted target fs scheme, the file
+   * system should be created without going into fs.<scheme>.impl based
+   * resolution. Otherwise it will end up in an infinite loop, as the target
+   * will be resolved again to ViewFileSystemOverloadScheme since
+   * fs.<scheme>.impl points to ViewFileSystemOverloadScheme. So, the method
+   * below initializes the fs with the class configured in
+   * fs.viewfs.overload.scheme.target.<scheme>.impl. Other schemes can follow
+   * regular fs.newInstance.
+   */
+  @Override
+  protected FsGetter fsGetter() {
+
+    return new FsGetter() {
+      @Override
+      public FileSystem getNewInstance(URI uri, Configuration conf)
+          throws IOException {
+        if (uri.getScheme().equals(getScheme())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "The file system initialized uri scheme is matching with the "
+                    + "given target uri scheme. The target uri is: " + uri);
+          }
+          /*
+           * Avoid looping when target fs scheme is matching to overloaded
+           * scheme.
+           */
+          return createFileSystem(uri, conf);
+        } else {
+          return FileSystem.newInstance(uri, conf);
+        }
+      }
+
+      /**
+       * When the ViewFileSystemOverloadScheme scheme and the target uri
+       * scheme are matching, it will not take advantage of the FileSystem
+       * cache as it will create the instance directly. For caching needs,
+       * please set "fs.viewfs.enable.inner.cache" to true.
+       */
+      @Override
+      public FileSystem get(URI uri, Configuration conf) throws IOException {
+        if (uri.getScheme().equals(getScheme())) {
+          // Avoid looping when target fs scheme is matching to overloaded
+          // scheme.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "The file system initialized uri scheme is matching with the "
+                    + "given target uri scheme. So, the target file system "
+                    + "instances will not be cached. To cache fs instances, "
+                    + "please set fs.viewfs.enable.inner.cache to true. "
+                    + "The target uri is: " + uri);
+          }
" + + "The target uri is: " + uri); + } + return createFileSystem(uri, conf); + } else { + return FileSystem.get(uri, conf); + } + } + + private FileSystem createFileSystem(URI uri, Configuration conf) + throws IOException { + final String fsImplConf = String.format( + FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN, + uri.getScheme()); + Class clazz = conf.getClass(fsImplConf, null); + if (clazz == null) { + throw new UnsupportedFileSystemException( + String.format("%s=null: %s: %s", fsImplConf, + "No overload scheme fs configured", uri.getScheme())); + } + FileSystem fs = (FileSystem) newInstance(clazz, uri, conf); + fs.initialize(uri, conf); + return fs; + } + + private T newInstance(Class theClass, URI uri, + Configuration conf) { + T result; + try { + Constructor meth = theClass.getConstructor(); + meth.setAccessible(true); + result = meth.newInstance(); + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new RuntimeException(cause); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return result; + } + }; + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java index a4ccee3f7f5..8065b3f61f5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java @@ -862,7 +862,8 @@ public abstract class FileSystemContractBaseTest { found); } - private void assertListStatusFinds(Path dir, Path subdir) throws IOException { + protected void assertListStatusFinds(Path dir, Path subdir) + throws IOException { FileStatus[] stats = fs.listStatus(dir); boolean found = false; StringBuilder builder = new StringBuilder(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java new file mode 100644 index 00000000000..7e38903f1a7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
index a4ccee3f7f5..8065b3f61f5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
@@ -862,7 +862,8 @@ public abstract class FileSystemContractBaseTest {
         found);
   }
 
-  private void assertListStatusFinds(Path dir, Path subdir) throws IOException {
+  protected void assertListStatusFinds(Path dir, Path subdir)
+      throws IOException {
     FileStatus[] stats = fs.listStatus(dir);
     boolean found = false;
     StringBuilder builder = new StringBuilder();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java
new file mode 100644
index 00000000000..7e38903f1a7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests ViewFileSystemOverloadScheme with a local file system using a URI
+ * with an authority: file://mountTableName/, i.e., the authority is used to
+ * load a mount table.
+ */
+public class TestViewFileSystemOverloadSchemeLocalFileSystem {
+  private static final String FILE = "file";
+  private static final Log LOG =
+      LogFactory.getLog(TestViewFileSystemOverloadSchemeLocalFileSystem.class);
+  private FileSystem fsTarget;
+  private Configuration conf;
+  private Path targetTestRoot;
+  private FileSystemTestHelper fileSystemTestHelper =
+      new FileSystemTestHelper();
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set(String.format("fs.%s.impl", FILE),
+        ViewFileSystemOverloadScheme.class.getName());
+    conf.set(String.format(
+        FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN, FILE),
+        LocalFileSystem.class.getName());
+    fsTarget = new LocalFileSystem();
+    fsTarget.initialize(new URI("file:///"), conf);
+    // create the test root on local_fs
+    targetTestRoot = fileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
+    fsTarget.delete(targetTestRoot, true);
+    fsTarget.mkdirs(targetTestRoot);
+  }
+
+  /**
+   * Tests write file and read file with ViewFileSystemOverloadScheme.
+   */
+  @Test
+  public void testLocalTargetLinkWriteSimple() throws IOException {
+    LOG.info("Starting testLocalTargetLinkWriteSimple");
+    final String testString = "Hello Local!...";
+    final Path lfsRoot = new Path("/lfsRoot");
+    ConfigUtil.addLink(conf, lfsRoot.toString(),
+        URI.create(targetTestRoot + "/local"));
+    try (FileSystem lViewFs = FileSystem.get(URI.create("file:///"), conf)) {
+      final Path testPath = new Path(lfsRoot, "test.txt");
+      try (FSDataOutputStream fsDos = lViewFs.create(testPath)) {
+        fsDos.writeUTF(testString);
+      }
+
+      try (FSDataInputStream lViewIs = lViewFs.open(testPath)) {
+        Assert.assertEquals(testString, lViewIs.readUTF());
+      }
+    }
+  }
+
+  /**
+   * Tests create file and delete file with ViewFileSystemOverloadScheme.
+   */
+  @Test
+  public void testLocalFsCreateAndDelete() throws Exception {
+    LOG.info("Starting testLocalFsCreateAndDelete");
+    ConfigUtil.addLink(conf, "mt", "/lfsroot",
+        URI.create(targetTestRoot + "/wd2"));
+    final URI mountURI = URI.create("file://mt/");
+    try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
+      Path testPath = new Path(mountURI.toString() + "/lfsroot/test");
+      lViewFS.createNewFile(testPath);
+      Assert.assertTrue(lViewFS.exists(testPath));
+      lViewFS.delete(testPath, true);
+      Assert.assertFalse(lViewFS.exists(testPath));
+    }
+  }
+
+  /**
+   * Tests root level file with linkMergeSlash with
+   * ViewFileSystemOverloadScheme.
+   */
+  @Test
+  public void testLocalFsLinkSlashMerge() throws Exception {
+    LOG.info("Starting testLocalFsLinkSlashMerge");
+    ConfigUtil.addLinkMergeSlash(conf, "mt",
+        URI.create(targetTestRoot + "/wd2"));
+    final URI mountURI = URI.create("file://mt/");
+    try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
+      Path fileOnRoot = new Path(mountURI.toString() + "/NewFile");
+      lViewFS.createNewFile(fileOnRoot);
+      Assert.assertTrue(lViewFS.exists(fileOnRoot));
+    }
+  }
+
+  /**
+   * Tests with linkMergeSlash and other mounts in
+   * ViewFileSystemOverloadScheme.
+   */
+  @Test(expected = IOException.class)
+  public void testLocalFsLinkSlashMergeWithOtherMountLinks() throws Exception {
+    LOG.info("Starting testLocalFsLinkSlashMergeWithOtherMountLinks");
+    ConfigUtil.addLink(conf, "mt", "/lfsroot",
+        URI.create(targetTestRoot + "/wd2"));
+    ConfigUtil.addLinkMergeSlash(conf, "mt",
+        URI.create(targetTestRoot + "/wd2"));
+    final URI mountURI = URI.create("file://mt/");
+    FileSystem.get(mountURI, conf);
+    Assert.fail("A merge slash cannot be configured with other mount links.");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (null != fsTarget) {
+      fsTarget.delete(fileSystemTestHelper.getTestRootPath(fsTarget), true);
+      fsTarget.close();
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
index 510faf79318..dcbd7f51bdc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.AclUtil;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem.FsGetter;
 import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
@@ -1274,7 +1275,8 @@ abstract public class ViewFileSystemBaseTest {
 
   @Test
   public void testViewFileSystemInnerCache() throws Exception {
-    ViewFileSystem.InnerCache cache = new ViewFileSystem.InnerCache();
+    ViewFileSystem.InnerCache cache =
+        new ViewFileSystem.InnerCache(new FsGetter());
     FileSystem fs = cache.get(fsTarget.getUri(), conf);
 
     // InnerCache caches filesystem.
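The local file system test above compresses to the following standalone wiring: overload the `file` scheme, point its target at `LocalFileSystem`, and add a link in the default mount table. A sketch only; the `/tmp/lfs-target` directory is illustrative and must exist and be writable:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;

public class LocalOverloadSchemeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // file:// now resolves to ViewFileSystemOverloadScheme...
    conf.set("fs.file.impl",
        "org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme");
    // ...and file:// targets are backed by LocalFileSystem.
    conf.set("fs.viewfs.overload.scheme.target.file.impl",
        "org.apache.hadoop.fs.LocalFileSystem");
    // No authority in file:/// means the default mount table is used.
    ConfigUtil.addLink(conf, "/data", URI.create("file:///tmp/lfs-target"));

    try (FileSystem fs = FileSystem.get(URI.create("file:///"), conf)) {
      // This file physically lands under /tmp/lfs-target.
      fs.createNewFile(new Path("/data/hello.txt"));
    }
  }
}
```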
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeHdfsFileSystemContract.java
new file mode 100644
index 00000000000..03c29c927e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeHdfsFileSystemContract.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import static org.junit.Assume.assumeTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestHDFSFileSystemContract;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests ViewFileSystemOverloadScheme with file system contract tests.
+ */
+public class TestViewFileSystemOverloadSchemeHdfsFileSystemContract
+    extends TestHDFSFileSystemContract {
+
+  private static MiniDFSCluster cluster;
+  private static String defaultWorkingDirectory;
+  private static Configuration conf = new HdfsConfiguration();
+
+  @BeforeClass
+  public static void init() throws IOException {
+    final File basedir = GenericTestUtils.getRandomizedTestDir();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
+        FileSystemContractBaseTest.TEST_UMASK);
+    cluster = new MiniDFSCluster.Builder(conf, basedir)
+        .numDataNodes(2)
+        .build();
+    defaultWorkingDirectory =
+        "/user/" + UserGroupInformation.getCurrentUser().getShortUserName();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf.set(String.format("fs.%s.impl", "hdfs"),
+        ViewFileSystemOverloadScheme.class.getName());
+    conf.set(String.format(
+        FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
+        "hdfs"),
+        DistributedFileSystem.class.getName());
+    URI defaultFSURI =
+        URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), "/user",
+        defaultFSURI);
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), "/append",
+        defaultFSURI);
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(),
+        "/FileSystemContractBaseTest/",
+        new URI(defaultFSURI.toString() + "/FileSystemContractBaseTest/"));
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterClass
+  public static void tearDownAfter() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Override
+  protected String getDefaultWorkingDirectory() {
+    return defaultWorkingDirectory;
+  }
+
+  @Override
+  @Test
+  public void testAppend() throws IOException {
+    AppendTestUtil.testAppend(fs, new Path("/append/f"));
+  }
+
+  @Override
+  @Test(expected = AccessControlException.class)
+  public void testRenameRootDirForbidden() throws Exception {
+    super.testRenameRootDirForbidden();
+  }
+
+  @Override
+  @Test
+  public void testListStatusRootDir() throws Throwable {
+    assumeTrue(rootDirTestEnabled());
+    Path dir = path("/");
+    Path child = path("/FileSystemContractBaseTest");
+    assertListStatusFinds(dir, child);
+  }
+
+  @Override
+  @Ignore // This test is the same as testListStatusRootDir in this case.
+  public void testLSRootDir() throws Throwable {
+  }
+}
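One subtlety the contract test relies on: the mount table name must match the authority of the URIs being resolved, which is why `setUp` keys every `addLink` on `defaultFSURI.getAuthority()`. A condensed sketch of that wiring, with `fs.defaultFS` (e.g. `hdfs://localhost:<port>`) assumed to point at a running cluster:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.viewfs.ConfigUtil;

// Hypothetical helper mirroring the test setup above.
public class MountTableWiring {
  static FileSystem wire(Configuration conf) throws Exception {
    conf.set("fs.hdfs.impl",
        "org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme");
    conf.set("fs.viewfs.overload.scheme.target.hdfs.impl",
        "org.apache.hadoop.hdfs.DistributedFileSystem");
    // Name the mount table after the authority of fs.defaultFS so that
    // plain paths like /user/... keep resolving through the mount table.
    URI defaultFS = URI.create(conf.get("fs.defaultFS"));
    ConfigUtil.addLink(conf, defaultFS.getAuthority(), "/user", defaultFS);
    return FileSystem.get(conf);
  }
}
```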
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java
new file mode 100644
index 00000000000..bf0a8fe95c2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests ViewFileSystemOverloadScheme with configured mount links.
+ */
+public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
+  private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
+  private static final String HDFS_SCHEME = "hdfs";
+  private Configuration conf = null;
+  private MiniDFSCluster cluster = null;
+  private URI defaultFSURI;
+  private File localTargetDir;
+  private static final String TEST_ROOT_DIR = PathUtils
+      .getTestDirName(TestViewFileSystemOverloadSchemeWithHdfsScheme.class);
+  private static final String HDFS_USER_FOLDER = "/HDFSUser";
+  private static final String LOCAL_FOLDER = "/local";
+
+  @Before
+  public void startCluster() throws IOException {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
+        ViewFileSystemOverloadScheme.class.getName());
+    conf.set(String.format(
+        FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
+        HDFS_SCHEME), DistributedFileSystem.class.getName());
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitClusterUp();
+    defaultFSURI =
+        URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
+    localTargetDir = new File(TEST_ROOT_DIR, "/root/");
+    Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme.
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      FileSystem.closeAll();
+      cluster.shutdown();
+    }
+  }
+
+  private void createLinks(boolean needFallbackLink, Path hdfsTargetPath,
+      Path localTargetPath) {
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER,
+        hdfsTargetPath.toUri());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER,
+        localTargetPath.toUri());
+    if (needFallbackLink) {
+      ConfigUtil.addLinkFallback(conf, defaultFSURI.getAuthority(),
+          hdfsTargetPath.toUri());
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   *
+   * Creating file /HDFSUser/testfile should create it in hdfs.
+   * Creating directory /local/test should create it in the local fs.
+   */
+  @Test(timeout = 30000)
+  public void testMountLinkWithLocalAndHDFS() throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+
+    createLinks(false, hdfsTargetPath, localTargetPath);
+
+    // /HDFSUser/testfile
+    Path hdfsFile = new Path(HDFS_USER_FOLDER + "/testfile");
+    // /local/test
+    Path localDir = new Path(LOCAL_FOLDER + "/test");
+
+    try (ViewFileSystemOverloadScheme fs
+        = (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(2, fs.getMountPoints().length);
+      fs.createNewFile(hdfsFile); // /HDFSUser/testfile
+      fs.mkdirs(localDir); // /local/test
+    }
+
+    // Initialize HDFS and test whether the files exist in ls or not
+    try (DistributedFileSystem dfs = new DistributedFileSystem()) {
+      dfs.initialize(defaultFSURI, conf);
+      Assert.assertTrue(dfs.exists(
+          new Path(Path.getPathWithoutSchemeAndAuthority(hdfsTargetPath),
+              hdfsFile.getName()))); // should be in hdfs.
+      Assert.assertFalse(dfs.exists(
+          new Path(Path.getPathWithoutSchemeAndAuthority(localTargetPath),
+              localDir.getName()))); // should not be in local fs.
+    }
+
+    try (RawLocalFileSystem lfs = new RawLocalFileSystem()) {
+      lfs.initialize(localTargetPath.toUri(), conf);
+      Assert.assertFalse(lfs.exists(
+          new Path(Path.getPathWithoutSchemeAndAuthority(hdfsTargetPath),
+              hdfsFile.getName()))); // should not be in hdfs.
+      Assert.assertTrue(lfs.exists(
+          new Path(Path.getPathWithoutSchemeAndAuthority(localTargetPath),
+              localDir.getName()))); // should be in local fs.
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> nonexistent://NonExistent/User/
+   * It should fail to add a non-existent fs link.
+   */
+  @Test(expected = IOException.class, timeout = 30000)
+  public void testMountLinkWithNonExistentLink() throws Exception {
+    final String userFolder = "/User";
+    final Path nonExistTargetPath =
+        new Path("nonexistent://NonExistent" + userFolder);
+
+    /**
+     * Below addLink will create the following mount point:
+     * hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
+     */
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), userFolder,
+        nonExistTargetPath.toUri());
+    FileSystem.get(conf);
+    Assert.fail("Expected to fail with non existent link");
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * ListStatus on / should list the mount links.
+   */
+  @Test(timeout = 30000)
+  public void testListStatusOnRootShouldListAllMountLinks() throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+
+    createLinks(false, hdfsTargetPath, localTargetPath);
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      FileStatus[] ls = fs.listStatus(new Path("/"));
+      Assert.assertEquals(2, ls.length);
+      String lsPath1 =
+          Path.getPathWithoutSchemeAndAuthority(ls[0].getPath()).toString();
+      String lsPath2 =
+          Path.getPathWithoutSchemeAndAuthority(ls[1].getPath()).toString();
+      Assert.assertTrue(
+          HDFS_USER_FOLDER.equals(lsPath1) || LOCAL_FOLDER.equals(lsPath1));
+      Assert.assertTrue(
+          HDFS_USER_FOLDER.equals(lsPath2) || LOCAL_FOLDER.equals(lsPath2));
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * ListStatus on a non mount directory should fail.
+   */
+  @Test(expected = IOException.class, timeout = 30000)
+  public void testListStatusOnNonMountedPath() throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+
+    createLinks(false, hdfsTargetPath, localTargetPath);
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      fs.listStatus(new Path("/nonMount"));
+      Assert.fail("It should fail as there is no mount link with /nonMount");
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * fallback --> hdfs://localhost:xxx/HDFSUser/
+   * Creating a file or directory at non root level should succeed with
+   * fallback links.
+   */
+  @Test(timeout = 30000)
+  public void testWithLinkFallBack() throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+
+    createLinks(true, hdfsTargetPath, localTargetPath);
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      fs.createNewFile(new Path("/nonMount/myfile"));
+      FileStatus[] ls = fs.listStatus(new Path("/nonMount"));
+      Assert.assertEquals(1, ls.length);
+      Assert.assertEquals("myfile",
+          Path.getPathWithoutSchemeAndAuthority(ls[0].getPath()).getName());
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   *
+   * It cannot find any mount link. ViewFS expects a mount point from root.
+   */
+  @Test(expected = NotInMountpointException.class, timeout = 30000)
+  public void testCreateOnRootShouldFailWhenMountLinkConfigured()
+      throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+
+    createLinks(false, hdfsTargetPath, localTargetPath);
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      fs.createNewFile(new Path("/newFileOnRoot"));
+      Assert.fail("It should fail as root is read only in viewFS.");
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * fallback --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * It will find the fallback link, but root is not accessible and read only.
+   */
+  @Test(expected = AccessControlException.class, timeout = 30000)
+  public void testCreateOnRootShouldFailEvenFallBackMountLinkConfigured()
+      throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+    createLinks(true, hdfsTargetPath, localTargetPath);
+    try (FileSystem fs = FileSystem.get(conf)) {
+      fs.createNewFile(new Path("/onRootWhenFallBack"));
+      Assert.fail(
+          "It should fail as root is read only in viewFS, even when configured"
+              + " with fallback.");
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * fallback --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * Note: the above links are created to make fs initialization succeed;
+   * it will not proceed without mount links.
+   *
+   * Don't set the fs.viewfs.overload.scheme.target.hdfs.impl property.
+   * So, the OverloadScheme target fs initialization will fail.
+   */
+  @Test(expected = UnsupportedFileSystemException.class, timeout = 30000)
+  public void testInvalidOverloadSchemeTargetFS() throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+    conf = new Configuration();
+    createLinks(true, hdfsTargetPath, localTargetPath);
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
+        defaultFSURI.toString());
+    conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
+        ViewFileSystemOverloadScheme.class.getName());
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      fs.createNewFile(new Path("/onRootWhenFallBack"));
+      Assert.fail("OverloadScheme target fs should be valid.");
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   *
+   * It should be able to create a file using ViewFileSystemOverloadScheme.
+   */
+  @Test(timeout = 30000)
+  public void testViewFsOverloadSchemeWhenInnerCacheDisabled()
+      throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+    createLinks(false, hdfsTargetPath, localTargetPath);
+    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    try (FileSystem fs = FileSystem.get(conf)) {
+      Path testFile = new Path(HDFS_USER_FOLDER + "/testFile");
+      fs.createNewFile(testFile);
+      Assert.assertTrue(fs.exists(testFile));
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser0 --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/HDFSUser1 --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * 1. With cache, only one hdfs child file system instance should be there.
+   * 2. Without cache, there should be 2 hdfs instances.
+   */
+  @Test(timeout = 30000)
+  public void testViewFsOverloadSchemeWithInnerCache()
+      throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
+        hdfsTargetPath.toUri());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
+        hdfsTargetPath.toUri());
+
+    // 1. Only 1 hdfs child file system should be there with cache.
+    try (ViewFileSystemOverloadScheme vfs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(1, vfs.getChildFileSystems().length);
+    }
+
+    // 2. Two hdfs file systems should be there if no cache.
+    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    try (ViewFileSystemOverloadScheme vfs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(2, vfs.getChildFileSystems().length);
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser0 --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/HDFSUser1 --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * When InnerCache is disabled, target file systems whose scheme matches
+   * the ViewFileSystemOverloadScheme initialized scheme will not use the
+   * FileSystem cache.
+   */
+  @Test(timeout = 30000)
+  public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets()
+      throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
+        hdfsTargetPath.toUri());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
+        hdfsTargetPath.toUri());
+
+    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // Two hdfs file systems should be there if no cache.
+    try (ViewFileSystemOverloadScheme vfs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(2, vfs.getChildFileSystems().length);
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/local0 --> file://localPath/
+   * hdfs://localhost:xxx/local1 --> file://localPath/
+   *
+   * When InnerCache is disabled, target file systems whose scheme does not
+   * match the ViewFileSystemOverloadScheme initialized scheme should continue
+   * to take advantage of the FileSystem cache.
+   */
+  @Test(timeout = 30000)
+  public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
+      throws Exception {
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 0,
+        localTargetPath.toUri());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 1,
+        localTargetPath.toUri());
+
+    // Only one local file system should be there if no InnerCache, but fs
+    // cache should work.
+    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    try (ViewFileSystemOverloadScheme vfs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(1, vfs.getChildFileSystems().length);
+    }
+  }
+}
\ No newline at end of file
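For completeness, the failure mode exercised by testInvalidOverloadSchemeTargetFS can be reproduced outside the test harness. A sketch under the assumption that a cluster is reachable at the illustrative `hdfs://localhost:8020` address; the point is only that omitting `fs.viewfs.overload.scheme.target.hdfs.impl` surfaces as UnsupportedFileSystemException:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.viewfs.ConfigUtil;

public class MissingTargetImplSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl",
        "org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme");
    // Deliberately omit fs.viewfs.overload.scheme.target.hdfs.impl.
    // Authority and target below are illustrative.
    ConfigUtil.addLink(conf, "localhost:8020", "/user",
        URI.create("hdfs://localhost:8020/user"));
    try {
      FileSystem.get(URI.create("hdfs://localhost:8020/"), conf);
    } catch (UnsupportedFileSystemException e) {
      // Expected: no overload scheme target fs configured for "hdfs".
      System.out.println("As expected: " + e.getMessage());
    }
  }
}
```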