Revert "HADOOP-13055. Implement linkMergeSlash and linkFallback for ViewFileSystem"

This reverts commit 218a21a0f0.
This commit is contained in:
Yongjun Zhang 2018-05-08 22:22:06 -07:00
parent f13ec73c29
commit 21bf02023c
9 changed files with 69 additions and 904 deletions

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.fs.viewfs;
import java.net.URI;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
@ -69,72 +68,7 @@ public class ConfigUtil {
addLink( conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
src, target);
}
/**
* Add a LinkMergeSlash to the config for the specified mount table.
* @param conf
* @param mountTableName
* @param target
*/
public static void addLinkMergeSlash(Configuration conf,
final String mountTableName, final URI target) {
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH, target.toString());
}
/**
* Add a LinkMergeSlash to the config for the default mount table.
* @param conf
* @param target
*/
public static void addLinkMergeSlash(Configuration conf, final URI target) {
addLinkMergeSlash(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
target);
}
/**
* Add a LinkFallback to the config for the specified mount table.
* @param conf
* @param mountTableName
* @param target
*/
public static void addLinkFallback(Configuration conf,
final String mountTableName, final URI target) {
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
Constants.CONFIG_VIEWFS_LINK_FALLBACK, target.toString());
}
/**
* Add a LinkFallback to the config for the default mount table.
* @param conf
* @param target
*/
public static void addLinkFallback(Configuration conf, final URI target) {
addLinkFallback(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
target);
}
/**
* Add a LinkMerge to the config for the specified mount table.
* @param conf
* @param mountTableName
* @param targets
*/
public static void addLinkMerge(Configuration conf,
final String mountTableName, final URI[] targets) {
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
Constants.CONFIG_VIEWFS_LINK_MERGE, Arrays.toString(targets));
}
/**
* Add a LinkMerge to the config for the default mount table.
* @param conf
* @param targets
*/
public static void addLinkMerge(Configuration conf, final URI[] targets) {
addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets);
}
/**
*
* @param conf

View File

@ -51,17 +51,12 @@ public interface Constants {
/**
* Config variable for specifying a simple link
*/
String CONFIG_VIEWFS_LINK = "link";
/**
* Config variable for specifying a fallback for link mount points.
*/
String CONFIG_VIEWFS_LINK_FALLBACK = "linkFallback";
public static final String CONFIG_VIEWFS_LINK = "link";
/**
* Config variable for specifying a merge link
*/
String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
/**
* Config variable for specifying an nfly link. Nfly writes to multiple
@ -73,9 +68,10 @@ public interface Constants {
* Config variable for specifying a merge of the root of the mount-table
* with the root of another file system.
*/
String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
public static final String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
FsPermission PERMISSION_555 = new FsPermission((short) 0555);
static public final FsPermission PERMISSION_555 =
new FsPermission((short) 0555);
String CONFIG_VIEWFS_RENAME_STRATEGY = "fs.viewfs.rename.strategy";
}

View File

@ -17,15 +17,12 @@
*/
package org.apache.hadoop.fs.viewfs;
import com.google.common.base.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -64,12 +61,8 @@ abstract class InodeTree<T> {
}
static final Path SlashPath = new Path("/");
// the root of the mount table
private final INode<T> root;
// the fallback filesystem
private final INodeLink<T> rootFallbackLink;
// the homedir for this mount table
private final String homedirPrefix;
private final INodeDir<T> root; // the root of the mount table
private final String homedirPrefix; // the homedir for this mount table
private List<MountPoint<T>> mountPoints = new ArrayList<MountPoint<T>>();
static class MountPoint<T> {
@ -92,7 +85,7 @@ abstract class InodeTree<T> {
}
/**
* Internal class for INode tree.
* Internal class for inode tree
* @param <T>
*/
abstract static class INode<T> {
@ -101,58 +94,21 @@ abstract class InodeTree<T> {
public INode(String pathToNode, UserGroupInformation aUgi) {
fullPath = pathToNode;
}
// INode forming the internal mount table directory tree
// for ViewFileSystem. This internal directory tree is
// constructed based on the mount table config entries
// and is read only.
abstract boolean isInternalDir();
// INode linking to another filesystem. Represented
// via mount table link config entries.
boolean isLink() {
return !isInternalDir();
}
}
/**
* Internal class to represent an internal dir of the mount table.
* Internal class to represent an internal dir of the mount table
* @param <T>
*/
static class INodeDir<T> extends INode<T> {
private final Map<String, INode<T>> children = new HashMap<>();
private T internalDirFs = null; //filesystem of this internal directory
private boolean isRoot = false;
final Map<String,INode<T>> children = new HashMap<String,INode<T>>();
T InodeDirFs = null; // file system of this internal directory of mountT
boolean isRoot = false;
INodeDir(final String pathToNode, final UserGroupInformation aUgi) {
super(pathToNode, aUgi);
}
@Override
boolean isInternalDir() {
return true;
}
T getInternalDirFs() {
return internalDirFs;
}
void setInternalDirFs(T internalDirFs) {
this.internalDirFs = internalDirFs;
}
void setRoot(boolean root) {
isRoot = root;
}
boolean isRoot() {
return isRoot;
}
Map<String, INode<T>> getChildren() {
return Collections.unmodifiableMap(children);
}
INode<T> resolveInternal(final String pathComponent) {
return children.get(pathComponent);
}
@ -163,7 +119,7 @@ abstract class InodeTree<T> {
throw new FileAlreadyExistsException();
}
final INodeDir<T> newDir = new INodeDir<T>(fullPath +
(isRoot() ? "" : "/") + pathComponent, aUgi);
(isRoot ? "" : "/") + pathComponent, aUgi);
children.put(pathComponent, newDir);
return newDir;
}
@ -177,43 +133,10 @@ abstract class InodeTree<T> {
}
}
/**
* Mount table link type.
*/
enum LinkType {
/**
* Link entry pointing to a single filesystem uri.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.link.<link_name>
* Refer: {@link Constants#CONFIG_VIEWFS_LINK}
*/
SINGLE,
/**
* Fallback filesystem for the paths not mounted by
* any single link entries.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkFallback
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_FALLBACK}
*/
SINGLE_FALLBACK,
/**
* Link entry pointing to an union of two or more filesystem uris.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMerge.<link_name>
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE}
*/
MERGE,
/**
* Link entry for merging mount table's root with the
* root of another filesystem.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMergeSlash
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE_SLASH}
*/
MERGE_SLASH,
/**
* Link entry to write to multiple filesystems and read
* from the closest filesystem.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkNfly
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_NFLY}
*/
NFLY;
NFLY
}
/**
@ -265,15 +188,6 @@ abstract class InodeTree<T> {
}
return new Path(result.toString());
}
@Override
boolean isInternalDir() {
return false;
}
public T getTargetFileSystem() {
return targetFileSystem;
}
}
private void createLink(final String src, final String target,
@ -289,10 +203,7 @@ abstract class InodeTree<T> {
}
final String[] srcPaths = breakIntoPathComponents(src);
// Make sure root is of INodeDir type before
// adding any regular links to it.
Preconditions.checkState(root.isInternalDir());
INodeDir<T> curInode = getRootDir();
INodeDir<T> curInode = root;
int i;
// Ignore first initial slash, process all except last component
for (i = 1; i < srcPaths.length - 1; i++) {
@ -300,15 +211,15 @@ abstract class InodeTree<T> {
INode<T> nextInode = curInode.resolveInternal(iPath);
if (nextInode == null) {
INodeDir<T> newDir = curInode.addDir(iPath, aUgi);
newDir.setInternalDirFs(getTargetFileSystem(newDir));
newDir.InodeDirFs = getTargetFileSystem(newDir);
nextInode = newDir;
}
if (nextInode.isLink()) {
if (nextInode instanceof INodeLink) {
// Error - expected a dir but got a link
throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
" already exists as link");
} else {
assert(nextInode.isInternalDir());
assert (nextInode instanceof INodeDir);
curInode = (INodeDir<T>) nextInode;
}
}
@ -334,11 +245,6 @@ abstract class InodeTree<T> {
newLink = new INodeLink<T>(fullPath, aUgi,
getTargetFileSystem(new URI(target)), new URI(target));
break;
case SINGLE_FALLBACK:
case MERGE_SLASH:
// Link fallback and link merge slash configuration
// are handled specially at InodeTree.
throw new IllegalArgumentException("Unexpected linkType: " + linkType);
case MERGE:
case NFLY:
final URI[] targetUris = StringUtils.stringToURI(
@ -367,77 +273,6 @@ abstract class InodeTree<T> {
protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
throws UnsupportedFileSystemException, URISyntaxException, IOException;
private INodeDir<T> getRootDir() {
Preconditions.checkState(root.isInternalDir());
return (INodeDir<T>)root;
}
private INodeLink<T> getRootLink() {
Preconditions.checkState(root.isLink());
return (INodeLink<T>)root;
}
private boolean hasFallbackLink() {
return rootFallbackLink != null;
}
private INodeLink<T> getRootFallbackLink() {
Preconditions.checkState(root.isInternalDir());
return rootFallbackLink;
}
/**
* An internal class representing the ViewFileSystem mount table
* link entries and their attributes.
* @see LinkType
*/
private static class LinkEntry {
private final String src;
private final String target;
private final LinkType linkType;
private final String settings;
private final UserGroupInformation ugi;
private final Configuration config;
LinkEntry(String src, String target, LinkType linkType, String settings,
UserGroupInformation ugi, Configuration config) {
this.src = src;
this.target = target;
this.linkType = linkType;
this.settings = settings;
this.ugi = ugi;
this.config = config;
}
String getSrc() {
return src;
}
String getTarget() {
return target;
}
LinkType getLinkType() {
return linkType;
}
boolean isLinkType(LinkType type) {
return this.linkType == type;
}
String getSettings() {
return settings;
}
UserGroupInformation getUgi() {
return ugi;
}
Configuration getConfig() {
return config;
}
}
/**
* Create Inode Tree from the specified mount-table specified in Config
* @param config - the mount table keys are prefixed with
@ -451,59 +286,39 @@ abstract class InodeTree<T> {
protected InodeTree(final Configuration config, final String viewName)
throws UnsupportedFileSystemException, URISyntaxException,
FileAlreadyExistsException, IOException {
String mountTableName = viewName;
if (mountTableName == null) {
mountTableName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
String vName = viewName;
if (vName == null) {
vName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
}
homedirPrefix = ConfigUtil.getHomeDirValue(config, mountTableName);
homedirPrefix = ConfigUtil.getHomeDirValue(config, vName);
root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
root.InodeDirFs = getTargetFileSystem(root);
root.isRoot = true;
boolean isMergeSlashConfigured = false;
String mergeSlashTarget = null;
List<LinkEntry> linkEntries = new LinkedList<>();
final String mountTablePrefix =
Constants.CONFIG_VIEWFS_PREFIX + "." + mountTableName + ".";
final String mtPrefix = Constants.CONFIG_VIEWFS_PREFIX + "." +
vName + ".";
final String linkPrefix = Constants.CONFIG_VIEWFS_LINK + ".";
final String linkFallbackPrefix = Constants.CONFIG_VIEWFS_LINK_FALLBACK;
final String linkMergePrefix = Constants.CONFIG_VIEWFS_LINK_MERGE + ".";
final String linkMergeSlashPrefix =
Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH;
boolean gotMountTableEntry = false;
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
for (Entry<String, String> si : config) {
final String key = si.getKey();
if (key.startsWith(mountTablePrefix)) {
if (key.startsWith(mtPrefix)) {
gotMountTableEntry = true;
LinkType linkType;
String src = key.substring(mountTablePrefix.length());
LinkType linkType = LinkType.SINGLE;
String src = key.substring(mtPrefix.length());
String settings = null;
if (src.startsWith(linkPrefix)) {
src = src.substring(linkPrefix.length());
if (src.equals(SlashPath.toString())) {
throw new UnsupportedFileSystemException("Unexpected mount table "
+ "link entry '" + key + "'. Use "
+ Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH + " instead!");
+ "link entry '" + key + "'. "
+ Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH + " is not "
+ "supported yet.");
}
linkType = LinkType.SINGLE;
} else if (src.startsWith(linkFallbackPrefix)) {
if (src.length() != linkFallbackPrefix.length()) {
throw new IOException("ViewFs: Mount points initialization error." +
" Invalid " + Constants.CONFIG_VIEWFS_LINK_FALLBACK +
" entry in config: " + src);
}
linkType = LinkType.SINGLE_FALLBACK;
} else if (src.startsWith(linkMergePrefix)) { // A merge link
src = src.substring(linkMergePrefix.length());
linkType = LinkType.MERGE;
} else if (src.startsWith(linkMergeSlashPrefix)) {
// This is a LinkMergeSlash entry. This entry should
// not have any additional source path.
if (src.length() != linkMergeSlashPrefix.length()) {
throw new IOException("ViewFs: Mount points initialization error." +
" Invalid " + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH +
" entry in config: " + src);
}
linkType = LinkType.MERGE_SLASH;
src = src.substring(linkMergePrefix.length());
} else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
// prefix.settings.src
src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
@ -523,69 +338,14 @@ abstract class InodeTree<T> {
throw new IOException("ViewFs: Cannot initialize: Invalid entry in " +
"Mount table in config: " + src);
}
final String target = si.getValue();
if (linkType != LinkType.MERGE_SLASH) {
if (isMergeSlashConfigured) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a merge slash link. "
+ "A regular link should not be added.");
}
linkEntries.add(
new LinkEntry(src, target, linkType, settings, ugi, config));
} else {
if (!linkEntries.isEmpty()) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with regular links. "
+ "A merge slash link should not be configured.");
}
if (isMergeSlashConfigured) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a merge slash link. "
+ "Multiple merge slash links for the same mount table is "
+ "not allowed.");
}
isMergeSlashConfigured = true;
mergeSlashTarget = target;
}
final String target = si.getValue(); // link or merge link
createLink(src, target, linkType, settings, ugi, config);
}
}
if (isMergeSlashConfigured) {
Preconditions.checkNotNull(mergeSlashTarget);
root = new INodeLink<T>(mountTableName, ugi,
getTargetFileSystem(new URI(mergeSlashTarget)),
new URI(mergeSlashTarget));
mountPoints.add(new MountPoint<T>("/", (INodeLink<T>) root));
rootFallbackLink = null;
} else {
root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
getRootDir().setInternalDirFs(getTargetFileSystem(getRootDir()));
getRootDir().setRoot(true);
INodeLink<T> fallbackLink = null;
for (LinkEntry le : linkEntries) {
if (le.isLinkType(LinkType.SINGLE_FALLBACK)) {
if (fallbackLink != null) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a link fallback. "
+ "Multiple fallback links for the same mount table is "
+ "not allowed.");
}
fallbackLink = new INodeLink<T>(mountTableName, ugi,
getTargetFileSystem(new URI(le.getTarget())),
new URI(le.getTarget()));
} else {
createLink(le.getSrc(), le.getTarget(), le.getLinkType(),
le.getSettings(), le.getUgi(), le.getConfig());
}
}
rootFallbackLink = fallbackLink;
}
if (!gotMountTableEntry) {
throw new IOException(
"ViewFs: Cannot initialize: Empty Mount table in config for " +
"viewfs://" + mountTableName + "/");
"viewfs://" + vName + "/");
}
}
@ -622,7 +382,7 @@ abstract class InodeTree<T> {
/**
* Resolve the pathname p relative to root InodeDir
* @param p - input path
* @param p - inout path
* @param resolveLastComponent
* @return ResolveResult which allows further resolution of the remaining path
* @throws FileNotFoundException
@ -631,53 +391,27 @@ abstract class InodeTree<T> {
throws FileNotFoundException {
String[] path = breakIntoPathComponents(p);
if (path.length <= 1) { // special case for when path is "/"
T targetFs = root.isInternalDir() ?
getRootDir().getInternalDirFs() : getRootLink().getTargetFileSystem();
ResolveResult<T> res = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
targetFs, root.fullPath, SlashPath);
ResolveResult<T> res =
new ResolveResult<T>(ResultKind.INTERNAL_DIR,
root.InodeDirFs, root.fullPath, SlashPath);
return res;
}
/**
* linkMergeSlash has been configured. The root of this mount table has
* been linked to the root directory of a file system.
* The first non-slash path component should be name of the mount table.
*/
if (root.isLink()) {
Path remainingPath;
StringBuilder remainingPathStr = new StringBuilder();
// ignore first slash
for (int i = 1; i < path.length; i++) {
remainingPathStr.append("/").append(path[i]);
}
remainingPath = new Path(remainingPathStr.toString());
ResolveResult<T> res = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
return res;
}
Preconditions.checkState(root.isInternalDir());
INodeDir<T> curInode = getRootDir();
INodeDir<T> curInode = root;
int i;
// ignore first slash
for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
INode<T> nextInode = curInode.resolveInternal(path[i]);
if (nextInode == null) {
if (hasFallbackLink()) {
return new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
getRootFallbackLink().getTargetFileSystem(),
root.fullPath, new Path(p));
} else {
StringBuilder failedAt = new StringBuilder(path[0]);
for (int j = 1; j <= i; ++j) {
failedAt.append('/').append(path[j]);
}
throw (new FileNotFoundException(
"File/Directory does not exist: " + failedAt.toString()));
StringBuilder failedAt = new StringBuilder(path[0]);
for (int j = 1; j <= i; ++j) {
failedAt.append('/').append(path[j]);
}
throw (new FileNotFoundException("File/Directory does not exist: "
+ failedAt.toString()));
}
if (nextInode.isLink()) {
if (nextInode instanceof INodeLink) {
final INodeLink<T> link = (INodeLink<T>) nextInode;
final Path remainingPath;
if (i >= path.length - 1) {
@ -691,9 +425,9 @@ abstract class InodeTree<T> {
}
final ResolveResult<T> res =
new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
link.targetFileSystem, nextInode.fullPath, remainingPath);
return res;
} else if (nextInode.isInternalDir()) {
} else if (nextInode instanceof INodeDir) {
curInode = (INodeDir<T>) nextInode;
}
}
@ -715,7 +449,7 @@ abstract class InodeTree<T> {
}
final ResolveResult<T> res =
new ResolveResult<T>(ResultKind.INTERNAL_DIR,
curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
curInode.InodeDirFs, curInode.fullPath, remainingPath);
return res;
}

View File

@ -1037,12 +1037,12 @@ public class ViewFileSystem extends FileSystem {
public FileStatus[] listStatus(Path f) throws AccessControlException,
FileNotFoundException, IOException {
checkPathIsSlash(f);
FileStatus[] result = new FileStatus[theInternalDir.getChildren().size()];
FileStatus[] result = new FileStatus[theInternalDir.children.size()];
int i = 0;
for (Entry<String, INode<FileSystem>> iEntry :
theInternalDir.getChildren().entrySet()) {
for (Entry<String, INode<FileSystem>> iEntry :
theInternalDir.children.entrySet()) {
INode<FileSystem> inode = iEntry.getValue();
if (inode.isLink()) {
if (inode instanceof INodeLink ) {
INodeLink<FileSystem> link = (INodeLink<FileSystem>) inode;
result[i++] = new FileStatus(0, false, 0, 0,
@ -1065,12 +1065,11 @@ public class ViewFileSystem extends FileSystem {
@Override
public boolean mkdirs(Path dir, FsPermission permission)
throws AccessControlException, FileAlreadyExistsException {
if (theInternalDir.isRoot() && dir == null) {
if (theInternalDir.isRoot && dir == null) {
throw new FileAlreadyExistsException("/ already exits");
}
// Note dir starts with /
if (theInternalDir.getChildren().containsKey(
dir.toString().substring(1))) {
if (theInternalDir.children.containsKey(dir.toString().substring(1))) {
return true; // this is the stupid semantics of FileSystem
}
throw readOnlyMountTable("mkdirs", dir);

View File

@ -899,13 +899,13 @@ public class ViewFs extends AbstractFileSystem {
throws IOException {
// look up i internalDirs children - ignore first Slash
INode<AbstractFileSystem> inode =
theInternalDir.getChildren().get(f.toUri().toString().substring(1));
theInternalDir.children.get(f.toUri().toString().substring(1));
if (inode == null) {
throw new FileNotFoundException(
"viewFs internal mount table - missing entry:" + f);
}
FileStatus result;
if (inode.isLink()) {
if (inode instanceof INodeLink) {
INodeLink<AbstractFileSystem> inodelink =
(INodeLink<AbstractFileSystem>) inode;
result = new FileStatus(0, false, 0, 0, creationTime, creationTime,
@ -947,14 +947,14 @@ public class ViewFs extends AbstractFileSystem {
public FileStatus[] listStatus(final Path f) throws AccessControlException,
IOException {
checkPathIsSlash(f);
FileStatus[] result = new FileStatus[theInternalDir.getChildren().size()];
FileStatus[] result = new FileStatus[theInternalDir.children.size()];
int i = 0;
for (Entry<String, INode<AbstractFileSystem>> iEntry :
theInternalDir.getChildren().entrySet()) {
for (Entry<String, INode<AbstractFileSystem>> iEntry :
theInternalDir.children.entrySet()) {
INode<AbstractFileSystem> inode = iEntry.getValue();
if (inode.isLink()) {
if (inode instanceof INodeLink ) {
INodeLink<AbstractFileSystem> link =
(INodeLink<AbstractFileSystem>) inode;
@ -979,7 +979,7 @@ public class ViewFs extends AbstractFileSystem {
public void mkdir(final Path dir, final FsPermission permission,
final boolean createParent) throws AccessControlException,
FileAlreadyExistsException {
if (theInternalDir.isRoot() && dir == null) {
if (theInternalDir.isRoot && dir == null) {
throw new FileAlreadyExistsException("/ already exits");
}
throw readOnlyMountTable("mkdir", dir);

View File

@ -1005,8 +1005,8 @@ abstract public class ViewFileSystemBaseTest {
+ mtPrefix + Constants.CONFIG_VIEWFS_LINK + "." + "/");
} catch (Exception e) {
if (e instanceof UnsupportedFileSystemException) {
String msg = " Use " + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH +
" instead";
String msg = Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH
+ " is not supported yet.";
assertThat(e.getMessage(), containsString(msg));
} else {
fail("Unexpected exception: " + e.getMessage());

View File

@ -91,7 +91,7 @@ In order to provide transparency with the old world, the ViewFs file system (i.e
ViewFs implements the Hadoop file system interface just like HDFS and the local file system. It is a trivial file system in the sense that it only allows linking to other file systems. Because ViewFs implements the Hadoop file system interface, it works transparently Hadoop tools. For example, all the shell commands work with ViewFs as with HDFS and local file system.
In the configuration of each cluster, the default file system is set to the mount table for that cluster as shown below (compare it with the configuration in [Single Namenode Clusters](#Single_Namenode_Clusters)).
The mount points of a mount table are specified in the standard Hadoop configuration files. In the configuration of each cluster, the default file system is set to the mount table for that cluster as shown below (compare it with the configuration in [Single Namenode Clusters](#Single_Namenode_Clusters)).
```xml
<property>

View File

@ -1,264 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test for viewfs with LinkFallback mount table entries.
*/
public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
private static FileSystem fsDefault;
private static MiniDFSCluster cluster;
private static final int NAME_SPACES_COUNT = 3;
private static final int DATA_NODES_COUNT = 3;
private static final int FS_INDEX_DEFAULT = 0;
private static final String LINK_FALLBACK_CLUSTER_1_NAME = "Cluster1";
private static final FileSystem[] FS_HDFS = new FileSystem[NAME_SPACES_COUNT];
private static final Configuration CONF = new Configuration();
private static final File TEST_DIR = GenericTestUtils.getTestDir(
TestViewFileSystemLinkFallback.class.getSimpleName());
private static final String TEST_BASE_PATH =
"/tmp/TestViewFileSystemLinkFallback";
private final static Logger LOG = LoggerFactory.getLogger(
TestViewFileSystemLinkFallback.class);
@Override
protected FileSystemTestHelper createFileSystemHelper() {
return new FileSystemTestHelper(TEST_BASE_PATH);
}
@BeforeClass
public static void clusterSetupAtBeginning() throws IOException,
LoginException, URISyntaxException {
SupportsBlocks = true;
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
true);
cluster = new MiniDFSCluster.Builder(CONF)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
NAME_SPACES_COUNT))
.numDataNodes(DATA_NODES_COUNT)
.build();
cluster.waitClusterUp();
for (int i = 0; i < NAME_SPACES_COUNT; i++) {
FS_HDFS[i] = cluster.getFileSystem(i);
}
fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
}
@AfterClass
public static void clusterShutdownAtEnd() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Override
@Before
public void setUp() throws Exception {
fsTarget = fsDefault;
super.setUp();
}
/**
* Override this so that we don't set the targetTestRoot to any path under the
* root of the FS, and so that we don't try to delete the test dir, but rather
* only its contents.
*/
@Override
void initializeTargetTestRoot() throws IOException {
targetTestRoot = fsDefault.makeQualified(new Path("/"));
for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
fsDefault.delete(status.getPath(), true);
}
}
@Override
void setupMountPoints() {
super.setupMountPoints();
ConfigUtil.addLinkFallback(conf, LINK_FALLBACK_CLUSTER_1_NAME,
targetTestRoot.toUri());
}
@Override
int getExpectedDelegationTokenCount() {
return 1; // all point to the same fs so 1 unique token
}
@Override
int getExpectedDelegationTokenCountWithCredentials() {
return 1;
}
@Test
public void testConfLinkFallback() throws Exception {
Path testBasePath = new Path(TEST_BASE_PATH);
Path testLevel2Dir = new Path(TEST_BASE_PATH, "dir1/dirA");
Path testBaseFile = new Path(testBasePath, "testBaseFile.log");
Path testBaseFileRelative = new Path(testLevel2Dir,
"../../testBaseFile.log");
Path testLevel2File = new Path(testLevel2Dir, "testLevel2File.log");
fsTarget.mkdirs(testLevel2Dir);
fsTarget.createNewFile(testBaseFile);
FSDataOutputStream dataOutputStream = fsTarget.append(testBaseFile);
dataOutputStream.write(1);
dataOutputStream.close();
fsTarget.createNewFile(testLevel2File);
dataOutputStream = fsTarget.append(testLevel2File);
dataOutputStream.write("test link fallback".toString().getBytes());
dataOutputStream.close();
String clusterName = "ClusterFallback";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
Configuration conf = new Configuration();
ConfigUtil.addLinkFallback(conf, clusterName, fsTarget.getUri());
FileSystem vfs = FileSystem.get(viewFsUri, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());
FileStatus baseFileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+ testBaseFile.toUri().toString()));
LOG.info("BaseFileStat: " + baseFileStat);
FileStatus baseFileRelStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+ testBaseFileRelative.toUri().toString()));
LOG.info("BaseFileRelStat: " + baseFileRelStat);
Assert.assertEquals("Unexpected file length for " + testBaseFile,
1, baseFileStat.getLen());
Assert.assertEquals("Unexpected file length for " + testBaseFileRelative,
baseFileStat.getLen(), baseFileRelStat.getLen());
FileStatus level2FileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+ testLevel2File.toUri().toString()));
LOG.info("Level2FileStat: " + level2FileStat);
vfs.close();
}
@Test
public void testConfLinkFallbackWithRegularLinks() throws Exception {
Path testBasePath = new Path(TEST_BASE_PATH);
Path testLevel2Dir = new Path(TEST_BASE_PATH, "dir1/dirA");
Path testBaseFile = new Path(testBasePath, "testBaseFile.log");
Path testLevel2File = new Path(testLevel2Dir, "testLevel2File.log");
fsTarget.mkdirs(testLevel2Dir);
fsTarget.createNewFile(testBaseFile);
fsTarget.createNewFile(testLevel2File);
FSDataOutputStream dataOutputStream = fsTarget.append(testLevel2File);
dataOutputStream.write("test link fallback".toString().getBytes());
dataOutputStream.close();
String clusterName = "ClusterFallback";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
Configuration conf = new Configuration();
ConfigUtil.addLink(conf, clusterName,
"/internalDir/linkToDir2",
new Path(targetTestRoot, "dir2").toUri());
ConfigUtil.addLink(conf, clusterName,
"/internalDir/internalDirB/linkToDir3",
new Path(targetTestRoot, "dir3").toUri());
ConfigUtil.addLink(conf, clusterName,
"/danglingLink",
new Path(targetTestRoot, "missingTarget").toUri());
ConfigUtil.addLink(conf, clusterName,
"/linkToAFile",
new Path(targetTestRoot, "aFile").toUri());
System.out.println("ViewFs link fallback " + fsTarget.getUri());
ConfigUtil.addLinkFallback(conf, clusterName, targetTestRoot.toUri());
FileSystem vfs = FileSystem.get(viewFsUri, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());
FileStatus baseFileStat = vfs.getFileStatus(
new Path(viewFsUri.toString() + testBaseFile.toUri().toString()));
LOG.info("BaseFileStat: " + baseFileStat);
Assert.assertEquals("Unexpected file length for " + testBaseFile,
0, baseFileStat.getLen());
FileStatus level2FileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+ testLevel2File.toUri().toString()));
LOG.info("Level2FileStat: " + level2FileStat);
dataOutputStream = vfs.append(testLevel2File);
dataOutputStream.write("Writing via viewfs fallback path".getBytes());
dataOutputStream.close();
FileStatus level2FileStatAfterWrite = vfs.getFileStatus(
new Path(viewFsUri.toString() + testLevel2File.toUri().toString()));
Assert.assertTrue("Unexpected file length for " + testLevel2File,
level2FileStatAfterWrite.getLen() > level2FileStat.getLen());
vfs.close();
}
@Test
public void testConfLinkFallbackWithMountPoint() throws Exception {
TEST_DIR.mkdirs();
Configuration conf = new Configuration();
String clusterName = "ClusterX";
String mountPoint = "/user";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
String expectedErrorMsg = "Invalid linkFallback entry in config: " +
"linkFallback./user";
String mountTableEntry = Constants.CONFIG_VIEWFS_PREFIX + "."
+ clusterName + "." + Constants.CONFIG_VIEWFS_LINK_FALLBACK
+ "." + mountPoint;
conf.set(mountTableEntry, TEST_DIR.toURI().toString());
try {
FileSystem.get(viewFsUri, conf);
fail("Shouldn't allow linkMergeSlash to take extra mount points!");
} catch (IOException e) {
assertTrue("Unexpected error: " + e.getMessage(),
e.getMessage().contains(expectedErrorMsg));
}
}
}

View File

@ -1,234 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import javax.security.auth.login.LoginException;
/**
* Test for viewfs with LinkMergeSlash mount table entries.
*/
public class TestViewFileSystemLinkMergeSlash extends ViewFileSystemBaseTest {
private static FileSystem fsDefault;
private static MiniDFSCluster cluster;
private static final int NAME_SPACES_COUNT = 3;
private static final int DATA_NODES_COUNT = 3;
private static final int FS_INDEX_DEFAULT = 0;
private static final String LINK_MERGE_SLASH_CLUSTER_1_NAME = "ClusterLMS1";
private static final String LINK_MERGE_SLASH_CLUSTER_2_NAME = "ClusterLMS2";
private static final FileSystem[] FS_HDFS = new FileSystem[NAME_SPACES_COUNT];
private static final Configuration CONF = new Configuration();
private static final File TEST_DIR = GenericTestUtils.getTestDir(
TestViewFileSystemLinkMergeSlash.class.getSimpleName());
private static final String TEST_TEMP_PATH =
"/tmp/TestViewFileSystemLinkMergeSlash";
private final static Logger LOG = LoggerFactory.getLogger(
TestViewFileSystemLinkMergeSlash.class);
@Override
protected FileSystemTestHelper createFileSystemHelper() {
return new FileSystemTestHelper(TEST_TEMP_PATH);
}
@BeforeClass
public static void clusterSetupAtBeginning() throws IOException,
LoginException, URISyntaxException {
SupportsBlocks = true;
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
true);
cluster = new MiniDFSCluster.Builder(CONF)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
NAME_SPACES_COUNT))
.numDataNodes(DATA_NODES_COUNT)
.build();
cluster.waitClusterUp();
for (int i = 0; i < NAME_SPACES_COUNT; i++) {
FS_HDFS[i] = cluster.getFileSystem(i);
}
fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
}
@AfterClass
public static void clusterShutdownAtEnd() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Override
@Before
public void setUp() throws Exception {
fsTarget = fsDefault;
super.setUp();
}
/**
* Override this so that we don't set the targetTestRoot to any path under the
* root of the FS, and so that we don't try to delete the test dir, but rather
* only its contents.
*/
@Override
void initializeTargetTestRoot() throws IOException {
targetTestRoot = fsDefault.makeQualified(new Path("/"));
for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
fsDefault.delete(status.getPath(), true);
}
}
@Override
void setupMountPoints() {
super.setupMountPoints();
ConfigUtil.addLinkMergeSlash(conf, LINK_MERGE_SLASH_CLUSTER_1_NAME,
targetTestRoot.toUri());
ConfigUtil.addLinkMergeSlash(conf, LINK_MERGE_SLASH_CLUSTER_2_NAME,
targetTestRoot.toUri());
}
@Override
int getExpectedDelegationTokenCount() {
return 1; // all point to the same fs so 1 unique token
}
@Override
int getExpectedDelegationTokenCountWithCredentials() {
return 1;
}
@Test
public void testConfLinkMergeSlash() throws Exception {
TEST_DIR.mkdirs();
String clusterName = "ClusterMerge";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
String testFileName = "testLinkMergeSlash";
File infile = new File(TEST_DIR, testFileName);
final byte[] content = "HelloWorld".getBytes();
FileOutputStream fos = null;
try {
fos = new FileOutputStream(infile);
fos.write(content);
} finally {
if (fos != null) {
fos.close();
}
}
assertEquals((long)content.length, infile.length());
Configuration conf = new Configuration();
ConfigUtil.addLinkMergeSlash(conf, clusterName, TEST_DIR.toURI());
FileSystem vfs = FileSystem.get(viewFsUri, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());
FileStatus stat = vfs.getFileStatus(new Path(viewFsUri.toString() +
testFileName));
LOG.info("File stat: " + stat);
vfs.close();
}
@Test
public void testConfLinkMergeSlashWithRegularLinks() throws Exception {
TEST_DIR.mkdirs();
String clusterName = "ClusterMerge";
String expectedErrorMsg1 = "Mount table ClusterMerge has already been " +
"configured with a merge slash link";
String expectedErrorMsg2 = "Mount table ClusterMerge has already been " +
"configured with regular links";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
Configuration conf = new Configuration();
ConfigUtil.addLinkMergeSlash(conf, clusterName, TEST_DIR.toURI());
ConfigUtil.addLink(conf, clusterName, "testDir", TEST_DIR.toURI());
try {
FileSystem.get(viewFsUri, conf);
fail("Shouldn't allow both merge slash link and regular link on same "
+ "mount table.");
} catch (IOException e) {
assertTrue("Unexpected error message: " + e.getMessage(),
e.getMessage().contains(expectedErrorMsg1) || e.getMessage()
.contains(expectedErrorMsg2));
}
}
@Test
public void testConfLinkMergeSlashWithMountPoint() throws Exception {
TEST_DIR.mkdirs();
Configuration conf = new Configuration();
String clusterName = "ClusterX";
String mountPoint = "/user";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
String expectedErrorMsg = "Invalid linkMergeSlash entry in config: " +
"linkMergeSlash./user";
String mountTableEntry = Constants.CONFIG_VIEWFS_PREFIX + "."
+ clusterName + "." + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH
+ "." + mountPoint;
conf.set(mountTableEntry, TEST_DIR.toURI().toString());
try {
FileSystem.get(viewFsUri, conf);
fail("Shouldn't allow linkMergeSlash to take extra mount points!");
} catch (IOException e) {
assertTrue(e.getMessage().contains(expectedErrorMsg));
}
}
@Test
public void testChildFileSystems() throws Exception {
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME,
LINK_MERGE_SLASH_CLUSTER_1_NAME, "/", null, null);
FileSystem fs = FileSystem.get(viewFsUri, conf);
FileSystem[] childFs = fs.getChildFileSystems();
Assert.assertEquals("Unexpected number of child filesystems!",
1, childFs.length);
Assert.assertEquals("Unexpected child filesystem!",
DistributedFileSystem.class, childFs[0].getClass());
}
}