HDFS-15305. Extend ViewFS and provide ViewFileSystemOverloadScheme implementation with scheme configurable. Contributed by Uma Maheswara Rao G.
(cherry picked from commit 9c8236d04d
)
This commit is contained in:
parent
e5e824bd6f
commit
0d0aac1a2a
|
@ -42,4 +42,6 @@ public interface FsConstants {
|
||||||
*/
|
*/
|
||||||
public static final URI VIEWFS_URI = URI.create("viewfs:///");
|
public static final URI VIEWFS_URI = URI.create("viewfs:///");
|
||||||
public static final String VIEWFS_SCHEME = "viewfs";
|
public static final String VIEWFS_SCHEME = "viewfs";
|
||||||
|
String FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN =
|
||||||
|
"fs.viewfs.overload.scheme.target.%s.impl";
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.fs.viewfs;
|
package org.apache.hadoop.fs.viewfs;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
|
|
||||||
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
|
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
|
||||||
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT;
|
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -34,9 +34,9 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.Map.Entry;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
@ -94,16 +94,49 @@ public class ViewFileSystem extends FileSystem {
|
||||||
return readOnlyMountTable(operation, p.toString());
|
return readOnlyMountTable(operation, p.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* File system instance getter.
|
||||||
|
*/
|
||||||
|
static class FsGetter {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets new file system instance of given uri.
|
||||||
|
*/
|
||||||
|
public FileSystem getNewInstance(URI uri, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
return FileSystem.newInstance(uri, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets file system instance of given uri.
|
||||||
|
*/
|
||||||
|
public FileSystem get(URI uri, Configuration conf) throws IOException {
|
||||||
|
return FileSystem.get(uri, conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets file system creator instance.
|
||||||
|
*/
|
||||||
|
protected FsGetter fsGetter() {
|
||||||
|
return new FsGetter();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Caching children filesystems. HADOOP-15565.
|
* Caching children filesystems. HADOOP-15565.
|
||||||
*/
|
*/
|
||||||
static class InnerCache {
|
static class InnerCache {
|
||||||
private Map<Key, FileSystem> map = new HashMap<>();
|
private Map<Key, FileSystem> map = new HashMap<>();
|
||||||
|
private FsGetter fsCreator;
|
||||||
|
|
||||||
|
InnerCache(FsGetter fsCreator) {
|
||||||
|
this.fsCreator = fsCreator;
|
||||||
|
}
|
||||||
|
|
||||||
FileSystem get(URI uri, Configuration config) throws IOException {
|
FileSystem get(URI uri, Configuration config) throws IOException {
|
||||||
Key key = new Key(uri);
|
Key key = new Key(uri);
|
||||||
if (map.get(key) == null) {
|
if (map.get(key) == null) {
|
||||||
FileSystem fs = FileSystem.newInstance(uri, config);
|
FileSystem fs = fsCreator.getNewInstance(uri, config);
|
||||||
map.put(key, fs);
|
map.put(key, fs);
|
||||||
return fs;
|
return fs;
|
||||||
} else {
|
} else {
|
||||||
|
@ -191,7 +224,7 @@ public class ViewFileSystem extends FileSystem {
|
||||||
|
|
||||||
final long creationTime; // of the the mount table
|
final long creationTime; // of the the mount table
|
||||||
final UserGroupInformation ugi; // the user/group of user who created mtable
|
final UserGroupInformation ugi; // the user/group of user who created mtable
|
||||||
URI myUri;
|
private URI myUri;
|
||||||
private Path workingDir;
|
private Path workingDir;
|
||||||
Configuration config;
|
Configuration config;
|
||||||
InodeTree<FileSystem> fsState; // the fs state; ie the mount table
|
InodeTree<FileSystem> fsState; // the fs state; ie the mount table
|
||||||
|
@ -253,13 +286,13 @@ public class ViewFileSystem extends FileSystem {
|
||||||
config = conf;
|
config = conf;
|
||||||
enableInnerCache = config.getBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE,
|
enableInnerCache = config.getBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE,
|
||||||
CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT);
|
CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT);
|
||||||
final InnerCache innerCache = new InnerCache();
|
FsGetter fsGetter = fsGetter();
|
||||||
|
final InnerCache innerCache = new InnerCache(fsGetter);
|
||||||
// Now build client side view (i.e. client side mount table) from config.
|
// Now build client side view (i.e. client side mount table) from config.
|
||||||
final String authority = theUri.getAuthority();
|
final String authority = theUri.getAuthority();
|
||||||
try {
|
try {
|
||||||
myUri = new URI(FsConstants.VIEWFS_SCHEME, authority, "/", null, null);
|
myUri = new URI(getScheme(), authority, "/", null, null);
|
||||||
fsState = new InodeTree<FileSystem>(conf, authority) {
|
fsState = new InodeTree<FileSystem>(conf, authority) {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected FileSystem getTargetFileSystem(final URI uri)
|
protected FileSystem getTargetFileSystem(final URI uri)
|
||||||
throws URISyntaxException, IOException {
|
throws URISyntaxException, IOException {
|
||||||
|
@ -267,7 +300,7 @@ public class ViewFileSystem extends FileSystem {
|
||||||
if (enableInnerCache) {
|
if (enableInnerCache) {
|
||||||
fs = innerCache.get(uri, config);
|
fs = innerCache.get(uri, config);
|
||||||
} else {
|
} else {
|
||||||
fs = FileSystem.get(uri, config);
|
fs = fsGetter.get(uri, config);
|
||||||
}
|
}
|
||||||
return new ChRootedFileSystem(fs, uri);
|
return new ChRootedFileSystem(fs, uri);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,204 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.viewfs;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FsConstants;
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
|
||||||
|
/******************************************************************************
|
||||||
|
* This class is extended from the ViewFileSystem for the overloaded scheme
|
||||||
|
* file system. Mount link configurations and in-memory mount table
|
||||||
|
* building behaviors are inherited from ViewFileSystem. Unlike ViewFileSystem
|
||||||
|
* scheme (viewfs://), the users would be able to use any scheme.
|
||||||
|
*
|
||||||
|
* To use this class, the following configurations need to be added in
|
||||||
|
* core-site.xml file.
|
||||||
|
* 1) fs.<scheme>.impl
|
||||||
|
* = org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme
|
||||||
|
* 2) fs.viewfs.overload.scheme.target.<scheme>.impl
|
||||||
|
* = <hadoop compatible file system implementation class name for the
|
||||||
|
* <scheme>"
|
||||||
|
*
|
||||||
|
* Here <scheme> can be any scheme, but with that scheme there should be a
|
||||||
|
* hadoop compatible file system available. Second configuration value should
|
||||||
|
* be the respective scheme's file system implementation class.
|
||||||
|
* Example: if scheme is configured with "hdfs", then the 2nd configuration
|
||||||
|
* class name will be org.apache.hadoop.hdfs.DistributedFileSystem.
|
||||||
|
* if scheme is configured with "s3a", then the 2nd configuration class name
|
||||||
|
* will be org.apache.hadoop.fs.s3a.S3AFileSystem.
|
||||||
|
*
|
||||||
|
* Use Case 1:
|
||||||
|
* ===========
|
||||||
|
* If users want some of their existing cluster (hdfs://Cluster)
|
||||||
|
* data to mount with other hdfs and object store clusters(hdfs://NN1,
|
||||||
|
* o3fs://bucket1.volume1/, s3a://bucket1/)
|
||||||
|
*
|
||||||
|
* fs.viewfs.mounttable.Cluster./user = hdfs://NN1/user
|
||||||
|
* fs.viewfs.mounttable.Cluster./data = o3fs://bucket1.volume1/data
|
||||||
|
* fs.viewfs.mounttable.Cluster./backup = s3a://bucket1/backup/
|
||||||
|
*
|
||||||
|
* Op1: Create file hdfs://Cluster/user/fileA will go to hdfs://NN1/user/fileA
|
||||||
|
* Op2: Create file hdfs://Cluster/data/datafile will go to
|
||||||
|
* o3fs://bucket1.volume1/data/datafile
|
||||||
|
* Op3: Create file hdfs://Cluster/backup/data.zip will go to
|
||||||
|
* s3a://bucket1/backup/data.zip
|
||||||
|
*
|
||||||
|
* Use Case 2:
|
||||||
|
* ===========
|
||||||
|
* If users want some of their existing cluster (s3a://bucketA/)
|
||||||
|
* data to mount with other hdfs and object store clusters
|
||||||
|
* (hdfs://NN1, o3fs://bucket1.volume1/)
|
||||||
|
*
|
||||||
|
* fs.viewfs.mounttable.bucketA./user = hdfs://NN1/user
|
||||||
|
* fs.viewfs.mounttable.bucketA./data = o3fs://bucket1.volume1/data
|
||||||
|
* fs.viewfs.mounttable.bucketA./salesDB = s3a://bucketA/salesDB/
|
||||||
|
*
|
||||||
|
* Op1: Create file s3a://bucketA/user/fileA will go to hdfs://NN1/user/fileA
|
||||||
|
* Op2: Create file s3a://bucketA/data/datafile will go to
|
||||||
|
* o3fs://bucket1.volume1/data/datafile
|
||||||
|
* Op3: Create file s3a://bucketA/salesDB/dbfile will go to
|
||||||
|
* s3a://bucketA/salesDB/dbfile
|
||||||
|
*****************************************************************************/
|
||||||
|
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase", "Hive" })
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class ViewFileSystemOverloadScheme extends ViewFileSystem {
|
||||||
|
private URI myUri;
|
||||||
|
public ViewFileSystemOverloadScheme() throws IOException {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getScheme() {
|
||||||
|
return myUri.getScheme();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(URI theUri, Configuration conf) throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Initializing the ViewFileSystemOverloadScheme with the uri: "
|
||||||
|
+ theUri);
|
||||||
|
}
|
||||||
|
this.myUri = theUri;
|
||||||
|
super.initialize(theUri, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is overridden because in ViewFileSystemOverloadScheme if
|
||||||
|
* overloaded cheme matches with mounted target fs scheme, file system should
|
||||||
|
* be created without going into fs.<scheme>.impl based resolution. Otherwise
|
||||||
|
* it will end up in an infinite loop as the target will be resolved again
|
||||||
|
* to ViewFileSystemOverloadScheme as fs.<scheme>.impl points to
|
||||||
|
* ViewFileSystemOverloadScheme. So, below method will initialize the
|
||||||
|
* fs.viewfs.overload.scheme.target.<scheme>.impl. Other schemes can
|
||||||
|
* follow fs.newInstance
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected FsGetter fsGetter() {
|
||||||
|
|
||||||
|
return new FsGetter() {
|
||||||
|
@Override
|
||||||
|
public FileSystem getNewInstance(URI uri, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
if (uri.getScheme().equals(getScheme())) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"The file system initialized uri scheme is matching with the "
|
||||||
|
+ "given target uri scheme. The target uri is: " + uri);
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Avoid looping when target fs scheme is matching to overloaded
|
||||||
|
* scheme.
|
||||||
|
*/
|
||||||
|
return createFileSystem(uri, conf);
|
||||||
|
} else {
|
||||||
|
return FileSystem.newInstance(uri, conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When ViewFileSystemOverloadScheme scheme and target uri scheme are
|
||||||
|
* matching, it will not take advantage of FileSystem cache as it will
|
||||||
|
* create instance directly. For caching needs please set
|
||||||
|
* "fs.viewfs.enable.inner.cache" to true.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public FileSystem get(URI uri, Configuration conf) throws IOException {
|
||||||
|
if (uri.getScheme().equals(getScheme())) {
|
||||||
|
// Avoid looping when target fs scheme is matching to overloaded
|
||||||
|
// scheme.
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"The file system initialized uri scheme is matching with the "
|
||||||
|
+ "given target uri scheme. So, the target file system "
|
||||||
|
+ "instances will not be cached. To cache fs instances, "
|
||||||
|
+ "please set fs.viewfs.enable.inner.cache to true. "
|
||||||
|
+ "The target uri is: " + uri);
|
||||||
|
}
|
||||||
|
return createFileSystem(uri, conf);
|
||||||
|
} else {
|
||||||
|
return FileSystem.get(uri, conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileSystem createFileSystem(URI uri, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
final String fsImplConf = String.format(
|
||||||
|
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
|
||||||
|
uri.getScheme());
|
||||||
|
Class<?> clazz = conf.getClass(fsImplConf, null);
|
||||||
|
if (clazz == null) {
|
||||||
|
throw new UnsupportedFileSystemException(
|
||||||
|
String.format("%s=null: %s: %s", fsImplConf,
|
||||||
|
"No overload scheme fs configured", uri.getScheme()));
|
||||||
|
}
|
||||||
|
FileSystem fs = (FileSystem) newInstance(clazz, uri, conf);
|
||||||
|
fs.initialize(uri, conf);
|
||||||
|
return fs;
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> T newInstance(Class<T> theClass, URI uri,
|
||||||
|
Configuration conf) {
|
||||||
|
T result;
|
||||||
|
try {
|
||||||
|
Constructor<T> meth = theClass.getConstructor();
|
||||||
|
meth.setAccessible(true);
|
||||||
|
result = meth.newInstance();
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
Throwable cause = e.getCause();
|
||||||
|
if (cause instanceof RuntimeException) {
|
||||||
|
throw (RuntimeException) cause;
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException(cause);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
|
@ -862,7 +862,8 @@ public abstract class FileSystemContractBaseTest {
|
||||||
found);
|
found);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertListStatusFinds(Path dir, Path subdir) throws IOException {
|
protected void assertListStatusFinds(Path dir, Path subdir)
|
||||||
|
throws IOException {
|
||||||
FileStatus[] stats = fs.listStatus(dir);
|
FileStatus[] stats = fs.listStatus(dir);
|
||||||
boolean found = false;
|
boolean found = false;
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.viewfs;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||||
|
import org.apache.hadoop.fs.FsConstants;
|
||||||
|
import org.apache.hadoop.fs.LocalFileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Test the TestViewFileSystemOverloadSchemeLF using a file with authority:
|
||||||
|
* file://mountTableName/ i.e, the authority is used to load a mount table.
|
||||||
|
*/
|
||||||
|
public class TestViewFileSystemOverloadSchemeLocalFileSystem {
|
||||||
|
private static final String FILE = "file";
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(TestViewFileSystemOverloadSchemeLocalFileSystem.class);
|
||||||
|
private FileSystem fsTarget;
|
||||||
|
private Configuration conf;
|
||||||
|
private Path targetTestRoot;
|
||||||
|
private FileSystemTestHelper fileSystemTestHelper =
|
||||||
|
new FileSystemTestHelper();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.set(String.format("fs.%s.impl", FILE),
|
||||||
|
ViewFileSystemOverloadScheme.class.getName());
|
||||||
|
conf.set(String.format(
|
||||||
|
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN, FILE),
|
||||||
|
LocalFileSystem.class.getName());
|
||||||
|
fsTarget = new LocalFileSystem();
|
||||||
|
fsTarget.initialize(new URI("file:///"), conf);
|
||||||
|
// create the test root on local_fs
|
||||||
|
targetTestRoot = fileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
|
||||||
|
fsTarget.delete(targetTestRoot, true);
|
||||||
|
fsTarget.mkdirs(targetTestRoot);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests write file and read file with ViewFileSystemOverloadScheme.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLocalTargetLinkWriteSimple() throws IOException {
|
||||||
|
LOG.info("Starting testLocalTargetLinkWriteSimple");
|
||||||
|
final String testString = "Hello Local!...";
|
||||||
|
final Path lfsRoot = new Path("/lfsRoot");
|
||||||
|
ConfigUtil.addLink(conf, lfsRoot.toString(),
|
||||||
|
URI.create(targetTestRoot + "/local"));
|
||||||
|
try (FileSystem lViewFs = FileSystem.get(URI.create("file:///"), conf)) {
|
||||||
|
final Path testPath = new Path(lfsRoot, "test.txt");
|
||||||
|
try (FSDataOutputStream fsDos = lViewFs.create(testPath)) {
|
||||||
|
fsDos.writeUTF(testString);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (FSDataInputStream lViewIs = lViewFs.open(testPath)) {
|
||||||
|
Assert.assertEquals(testString, lViewIs.readUTF());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests create file and delete file with ViewFileSystemOverloadScheme.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLocalFsCreateAndDelete() throws Exception {
|
||||||
|
LOG.info("Starting testLocalFsCreateAndDelete");
|
||||||
|
ConfigUtil.addLink(conf, "mt", "/lfsroot",
|
||||||
|
URI.create(targetTestRoot + "/wd2"));
|
||||||
|
final URI mountURI = URI.create("file://mt/");
|
||||||
|
try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
|
||||||
|
Path testPath = new Path(mountURI.toString() + "/lfsroot/test");
|
||||||
|
lViewFS.createNewFile(testPath);
|
||||||
|
Assert.assertTrue(lViewFS.exists(testPath));
|
||||||
|
lViewFS.delete(testPath, true);
|
||||||
|
Assert.assertFalse(lViewFS.exists(testPath));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests root level file with linkMergeSlash with
|
||||||
|
* ViewFileSystemOverloadScheme.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLocalFsLinkSlashMerge() throws Exception {
|
||||||
|
LOG.info("Starting testLocalFsLinkSlashMerge");
|
||||||
|
ConfigUtil.addLinkMergeSlash(conf, "mt",
|
||||||
|
URI.create(targetTestRoot + "/wd2"));
|
||||||
|
final URI mountURI = URI.create("file://mt/");
|
||||||
|
try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
|
||||||
|
Path fileOnRoot = new Path(mountURI.toString() + "/NewFile");
|
||||||
|
lViewFS.createNewFile(fileOnRoot);
|
||||||
|
Assert.assertTrue(lViewFS.exists(fileOnRoot));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests with linkMergeSlash and other mounts in
|
||||||
|
* ViewFileSystemOverloadScheme.
|
||||||
|
*/
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testLocalFsLinkSlashMergeWithOtherMountLinks() throws Exception {
|
||||||
|
LOG.info("Starting testLocalFsLinkSlashMergeWithOtherMountLinks");
|
||||||
|
ConfigUtil.addLink(conf, "mt", "/lfsroot",
|
||||||
|
URI.create(targetTestRoot + "/wd2"));
|
||||||
|
ConfigUtil.addLinkMergeSlash(conf, "mt",
|
||||||
|
URI.create(targetTestRoot + "/wd2"));
|
||||||
|
final URI mountURI = URI.create("file://mt/");
|
||||||
|
FileSystem.get(mountURI, conf);
|
||||||
|
Assert.fail("A merge slash cannot be configured with other mount links.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (null != fsTarget) {
|
||||||
|
fsTarget.delete(fileSystemTestHelper.getTestRootPath(fsTarget), true);
|
||||||
|
fsTarget.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
import org.apache.hadoop.fs.permission.AclUtil;
|
import org.apache.hadoop.fs.permission.AclUtil;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.fs.viewfs.ViewFileSystem.FsGetter;
|
||||||
import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
|
import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
@ -1274,7 +1275,8 @@ abstract public class ViewFileSystemBaseTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testViewFileSystemInnerCache() throws Exception {
|
public void testViewFileSystemInnerCache() throws Exception {
|
||||||
ViewFileSystem.InnerCache cache = new ViewFileSystem.InnerCache();
|
ViewFileSystem.InnerCache cache =
|
||||||
|
new ViewFileSystem.InnerCache(new FsGetter());
|
||||||
FileSystem fs = cache.get(fsTarget.getUri(), conf);
|
FileSystem fs = cache.get(fsTarget.getUri(), conf);
|
||||||
|
|
||||||
// InnerCache caches filesystem.
|
// InnerCache caches filesystem.
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.viewfs;
|
||||||
|
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||||
|
import org.apache.hadoop.fs.FsConstants;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.TestHDFSFileSystemContract;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests ViewFileSystemOverloadScheme with file system contract tests.
|
||||||
|
*/
|
||||||
|
public class TestViewFileSystemOverloadSchemeHdfsFileSystemContract
|
||||||
|
extends TestHDFSFileSystemContract {
|
||||||
|
|
||||||
|
private static MiniDFSCluster cluster;
|
||||||
|
private static String defaultWorkingDirectory;
|
||||||
|
private static Configuration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void init() throws IOException {
|
||||||
|
final File basedir = GenericTestUtils.getRandomizedTestDir();
|
||||||
|
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
|
||||||
|
FileSystemContractBaseTest.TEST_UMASK);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf, basedir)
|
||||||
|
.numDataNodes(2)
|
||||||
|
.build();
|
||||||
|
defaultWorkingDirectory =
|
||||||
|
"/user/" + UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
conf.set(String.format("fs.%s.impl", "hdfs"),
|
||||||
|
ViewFileSystemOverloadScheme.class.getName());
|
||||||
|
conf.set(String.format(
|
||||||
|
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
|
||||||
|
"hdfs"),
|
||||||
|
DistributedFileSystem.class.getName());
|
||||||
|
URI defaultFSURI =
|
||||||
|
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), "/user",
|
||||||
|
defaultFSURI);
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), "/append",
|
||||||
|
defaultFSURI);
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(),
|
||||||
|
"/FileSystemContractBaseTest/",
|
||||||
|
new URI(defaultFSURI.toString() + "/FileSystemContractBaseTest/"));
|
||||||
|
fs = FileSystem.get(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfter() throws Exception {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
cluster = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getDefaultWorkingDirectory() {
|
||||||
|
return defaultWorkingDirectory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Test
|
||||||
|
public void testAppend() throws IOException {
|
||||||
|
AppendTestUtil.testAppend(fs, new Path("/append/f"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Test(expected = AccessControlException.class)
|
||||||
|
public void testRenameRootDirForbidden() throws Exception {
|
||||||
|
super.testRenameRootDirForbidden();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Test
|
||||||
|
public void testListStatusRootDir() throws Throwable {
|
||||||
|
assumeTrue(rootDirTestEnabled());
|
||||||
|
Path dir = path("/");
|
||||||
|
Path child = path("/FileSystemContractBaseTest");
|
||||||
|
assertListStatusFinds(dir, child);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore // This test same as above in this case.
|
||||||
|
public void testLSRootDir() throws Throwable {
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,410 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.viewfs;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FsConstants;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RawLocalFileSystem;
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.test.PathUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests ViewFileSystemOverloadScheme with configured mount links.
|
||||||
|
*/
|
||||||
|
public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
|
||||||
|
private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
|
||||||
|
private static final String HDFS_SCHEME = "hdfs";
|
||||||
|
private Configuration conf = null;
|
||||||
|
private MiniDFSCluster cluster = null;
|
||||||
|
private URI defaultFSURI;
|
||||||
|
private File localTargetDir;
|
||||||
|
private static final String TEST_ROOT_DIR = PathUtils
|
||||||
|
.getTestDirName(TestViewFileSystemOverloadSchemeWithHdfsScheme.class);
|
||||||
|
private static final String HDFS_USER_FOLDER = "/HDFSUser";
|
||||||
|
private static final String LOCAL_FOLDER = "/local";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void startCluster() throws IOException {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
|
||||||
|
true);
|
||||||
|
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
|
||||||
|
ViewFileSystemOverloadScheme.class.getName());
|
||||||
|
conf.set(String.format(
|
||||||
|
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
|
||||||
|
HDFS_SCHEME), DistributedFileSystem.class.getName());
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
|
cluster.waitClusterUp();
|
||||||
|
defaultFSURI =
|
||||||
|
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
|
||||||
|
localTargetDir = new File(TEST_ROOT_DIR, "/root/");
|
||||||
|
Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme.
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
if (cluster != null) {
|
||||||
|
FileSystem.closeAll();
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createLinks(boolean needFalbackLink, Path hdfsTargetPath,
|
||||||
|
Path localTragetPath) {
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER,
|
||||||
|
hdfsTargetPath.toUri());
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER,
|
||||||
|
localTragetPath.toUri());
|
||||||
|
if (needFalbackLink) {
|
||||||
|
ConfigUtil.addLinkFallback(conf, defaultFSURI.getAuthority(),
|
||||||
|
hdfsTargetPath.toUri());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows.
|
||||||
|
* hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
|
||||||
|
*
|
||||||
|
* create file /HDFSUser/testfile should create in hdfs
|
||||||
|
* create file /local/test should create directory in local fs
|
||||||
|
*/
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testMountLinkWithLocalAndHDFS() throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
|
||||||
|
createLinks(false, hdfsTargetPath, localTragetPath);
|
||||||
|
|
||||||
|
// /HDFSUser/testfile
|
||||||
|
Path hdfsFile = new Path(HDFS_USER_FOLDER + "/testfile");
|
||||||
|
// /local/test
|
||||||
|
Path localDir = new Path(LOCAL_FOLDER + "/test");
|
||||||
|
|
||||||
|
try (ViewFileSystemOverloadScheme fs
|
||||||
|
= (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
|
||||||
|
Assert.assertEquals(2, fs.getMountPoints().length);
|
||||||
|
fs.createNewFile(hdfsFile); // /HDFSUser/testfile
|
||||||
|
fs.mkdirs(localDir); // /local/test
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize HDFS and test files exist in ls or not
|
||||||
|
try (DistributedFileSystem dfs = new DistributedFileSystem()) {
|
||||||
|
dfs.initialize(defaultFSURI, conf);
|
||||||
|
Assert.assertTrue(dfs.exists(
|
||||||
|
new Path(Path.getPathWithoutSchemeAndAuthority(hdfsTargetPath),
|
||||||
|
hdfsFile.getName()))); // should be in hdfs.
|
||||||
|
Assert.assertFalse(dfs.exists(
|
||||||
|
new Path(Path.getPathWithoutSchemeAndAuthority(localTragetPath),
|
||||||
|
localDir.getName()))); // should not be in local fs.
|
||||||
|
}
|
||||||
|
|
||||||
|
try (RawLocalFileSystem lfs = new RawLocalFileSystem()) {
|
||||||
|
lfs.initialize(localTragetPath.toUri(), conf);
|
||||||
|
Assert.assertFalse(lfs.exists(
|
||||||
|
new Path(Path.getPathWithoutSchemeAndAuthority(hdfsTargetPath),
|
||||||
|
hdfsFile.getName()))); // should not be in hdfs.
|
||||||
|
Assert.assertTrue(lfs.exists(
|
||||||
|
new Path(Path.getPathWithoutSchemeAndAuthority(localTragetPath),
|
||||||
|
localDir.getName()))); // should be in local fs.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows.
|
||||||
|
* hdfs://localhost:xxx/HDFSUser --> nonexistent://NonExistent/User/
|
||||||
|
* It should fail to add non existent fs link.
|
||||||
|
*/
|
||||||
|
@Test(expected = IOException.class, timeout = 30000)
|
||||||
|
public void testMountLinkWithNonExistentLink() throws Exception {
|
||||||
|
final String userFolder = "/User";
|
||||||
|
final Path nonExistTargetPath =
|
||||||
|
new Path("nonexistent://NonExistent" + userFolder);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Below addLink will create following mount points
|
||||||
|
* hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
|
||||||
|
*/
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), userFolder,
|
||||||
|
nonExistTargetPath.toUri());
|
||||||
|
FileSystem.get(conf);
|
||||||
|
Assert.fail("Expected to fail with non existent link");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows.
|
||||||
|
* hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
|
||||||
|
* ListStatus on / should list the mount links.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testListStatusOnRootShouldListAllMountLinks() throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
|
||||||
|
createLinks(false, hdfsTargetPath, localTragetPath);
|
||||||
|
|
||||||
|
try (FileSystem fs = FileSystem.get(conf)) {
|
||||||
|
FileStatus[] ls = fs.listStatus(new Path("/"));
|
||||||
|
Assert.assertEquals(2, ls.length);
|
||||||
|
String lsPath1 =
|
||||||
|
Path.getPathWithoutSchemeAndAuthority(ls[0].getPath()).toString();
|
||||||
|
String lsPath2 =
|
||||||
|
Path.getPathWithoutSchemeAndAuthority(ls[1].getPath()).toString();
|
||||||
|
Assert.assertTrue(
|
||||||
|
HDFS_USER_FOLDER.equals(lsPath1) || LOCAL_FOLDER.equals(lsPath1));
|
||||||
|
Assert.assertTrue(
|
||||||
|
HDFS_USER_FOLDER.equals(lsPath2) || LOCAL_FOLDER.equals(lsPath2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows
|
||||||
|
* hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
|
||||||
|
* ListStatus non mount directory should fail.
|
||||||
|
*/
|
||||||
|
@Test(expected = IOException.class, timeout = 30000)
|
||||||
|
public void testListStatusOnNonMountedPath() throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
|
||||||
|
createLinks(false, hdfsTargetPath, localTragetPath);
|
||||||
|
|
||||||
|
try (FileSystem fs = FileSystem.get(conf)) {
|
||||||
|
fs.listStatus(new Path("/nonMount"));
|
||||||
|
Assert.fail("It should fail as no mount link with /nonMount");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows hdfs://localhost:xxx/HDFSUser -->
|
||||||
|
* hdfs://localhost:xxx/HDFSUser/ hdfs://localhost:xxx/local -->
|
||||||
|
* file://TEST_ROOT_DIR/root/ fallback --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* Creating file or directory at non root level should succeed with fallback
|
||||||
|
* links.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testWithLinkFallBack() throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
|
||||||
|
createLinks(true, hdfsTargetPath, localTragetPath);
|
||||||
|
|
||||||
|
try (FileSystem fs = FileSystem.get(conf)) {
|
||||||
|
fs.createNewFile(new Path("/nonMount/myfile"));
|
||||||
|
FileStatus[] ls = fs.listStatus(new Path("/nonMount"));
|
||||||
|
Assert.assertEquals(1, ls.length);
|
||||||
|
Assert.assertEquals(
|
||||||
|
Path.getPathWithoutSchemeAndAuthority(ls[0].getPath()).getName(),
|
||||||
|
"myfile");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows
|
||||||
|
* hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
|
||||||
|
*
|
||||||
|
* It cannot find any mount link. ViewFS expects a mount point from root.
|
||||||
|
*/
|
||||||
|
@Test(expected = NotInMountpointException.class, timeout = 30000)
|
||||||
|
public void testCreateOnRootShouldFailWhenMountLinkConfigured()
|
||||||
|
throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
|
||||||
|
createLinks(false, hdfsTargetPath, localTragetPath);
|
||||||
|
|
||||||
|
try (FileSystem fs = FileSystem.get(conf)) {
|
||||||
|
fs.createNewFile(new Path("/newFileOnRoot"));
|
||||||
|
Assert.fail("It should fail as root is read only in viewFS.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows
|
||||||
|
* hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
|
||||||
|
* fallback --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
*
|
||||||
|
* It will find fallback link, but root is not accessible and read only.
|
||||||
|
*/
|
||||||
|
@Test(expected = AccessControlException.class, timeout = 30000)
|
||||||
|
public void testCreateOnRootShouldFailEvenFallBackMountLinkConfigured()
|
||||||
|
throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
createLinks(true, hdfsTargetPath, localTragetPath);
|
||||||
|
try (FileSystem fs = FileSystem.get(conf)) {
|
||||||
|
fs.createNewFile(new Path("/onRootWhenFallBack"));
|
||||||
|
Assert.fail(
|
||||||
|
"It should fail as root is read only in viewFS, even when configured"
|
||||||
|
+ " with fallback.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows
|
||||||
|
* hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
|
||||||
|
* fallback --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
*
|
||||||
|
* Note: Above links created because to make fs initialization success.
|
||||||
|
* Otherwise will not proceed if no mount links.
|
||||||
|
*
|
||||||
|
* Don't set fs.viewfs.overload.scheme.target.hdfs.impl property.
|
||||||
|
* So, OverloadScheme target fs initialization will fail.
|
||||||
|
*/
|
||||||
|
@Test(expected = UnsupportedFileSystemException.class, timeout = 30000)
|
||||||
|
public void testInvalidOverloadSchemeTargetFS() throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
conf = new Configuration();
|
||||||
|
createLinks(true, hdfsTargetPath, localTragetPath);
|
||||||
|
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
|
||||||
|
defaultFSURI.toString());
|
||||||
|
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
|
||||||
|
ViewFileSystemOverloadScheme.class.getName());
|
||||||
|
|
||||||
|
try (FileSystem fs = FileSystem.get(conf)) {
|
||||||
|
fs.createNewFile(new Path("/onRootWhenFallBack"));
|
||||||
|
Assert.fail("OverloadScheme target fs should be valid.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows
|
||||||
|
* hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
|
||||||
|
*
|
||||||
|
* It should be able to create file using ViewFileSystemOverloadScheme.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testViewFsOverloadSchemeWhenInnerCacheDisabled()
|
||||||
|
throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
createLinks(false, hdfsTargetPath, localTragetPath);
|
||||||
|
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
|
||||||
|
try (FileSystem fs = FileSystem.get(conf)) {
|
||||||
|
Path testFile = new Path(HDFS_USER_FOLDER + "/testFile");
|
||||||
|
fs.createNewFile(testFile);
|
||||||
|
Assert.assertTrue(fs.exists(testFile));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows
|
||||||
|
* hdfs://localhost:xxx/HDFSUser0 --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/HDFSUser1 --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
*
|
||||||
|
* 1. With cache, only one hdfs child file system instance should be there.
|
||||||
|
* 2. Without cache, there should 2 hdfs instances.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testViewFsOverloadSchemeWithInnerCache()
|
||||||
|
throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
|
||||||
|
hdfsTargetPath.toUri());
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
|
||||||
|
hdfsTargetPath.toUri());
|
||||||
|
|
||||||
|
// 1. Only 1 hdfs child file system should be there with cache.
|
||||||
|
try (ViewFileSystemOverloadScheme vfs =
|
||||||
|
(ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
|
||||||
|
Assert.assertEquals(1, vfs.getChildFileSystems().length);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Two hdfs file systems should be there if no cache.
|
||||||
|
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
|
||||||
|
try (ViewFileSystemOverloadScheme vfs =
|
||||||
|
(ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
|
||||||
|
Assert.assertEquals(2, vfs.getChildFileSystems().length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows
|
||||||
|
* hdfs://localhost:xxx/HDFSUser0 --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
* hdfs://localhost:xxx/HDFSUser1 --> hdfs://localhost:xxx/HDFSUser/
|
||||||
|
*
|
||||||
|
* When InnerCache disabled, all matching ViewFileSystemOverloadScheme
|
||||||
|
* initialized scheme file systems would not use FileSystem cache.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 3000)
|
||||||
|
public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets()
|
||||||
|
throws Exception {
|
||||||
|
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
|
||||||
|
hdfsTargetPath.toUri());
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
|
||||||
|
hdfsTargetPath.toUri());
|
||||||
|
|
||||||
|
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
|
||||||
|
// Two hdfs file systems should be there if no cache.
|
||||||
|
try (ViewFileSystemOverloadScheme vfs =
|
||||||
|
(ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
|
||||||
|
Assert.assertEquals(2, vfs.getChildFileSystems().length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create mount links as follows
|
||||||
|
* hdfs://localhost:xxx/local0 --> file://localPath/
|
||||||
|
* hdfs://localhost:xxx/local1 --> file://localPath/
|
||||||
|
*
|
||||||
|
* When InnerCache disabled, all non matching ViewFileSystemOverloadScheme
|
||||||
|
* initialized scheme file systems should continue to take advantage of
|
||||||
|
* FileSystem cache.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 3000)
|
||||||
|
public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
|
||||||
|
throws Exception {
|
||||||
|
final Path localTragetPath = new Path(localTargetDir.toURI());
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 0,
|
||||||
|
localTragetPath.toUri());
|
||||||
|
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 1,
|
||||||
|
localTragetPath.toUri());
|
||||||
|
|
||||||
|
// Only one local file system should be there if no InnerCache, but fs
|
||||||
|
// cache should work.
|
||||||
|
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
|
||||||
|
try (ViewFileSystemOverloadScheme vfs =
|
||||||
|
(ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
|
||||||
|
Assert.assertEquals(1, vfs.getChildFileSystems().length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue