From 1f3bc63e6772be81bc9a6a7d93ed81d2a9e066c0 Mon Sep 17 00:00:00 2001 From: Chris Douglas Date: Tue, 5 Sep 2017 23:30:18 -0700 Subject: [PATCH] HADOOP-12077. Provide a multi-URI replication Inode for ViewFs. Contributed by Gera Shegalov --- .../apache/hadoop/fs/viewfs/ConfigUtil.java | 27 + .../apache/hadoop/fs/viewfs/Constants.java | 8 +- .../apache/hadoop/fs/viewfs/InodeTree.java | 64 +- .../apache/hadoop/fs/viewfs/NflyFSystem.java | 951 ++++++++++++++++++ .../hadoop/fs/viewfs/ViewFileSystem.java | 37 +- .../org/apache/hadoop/fs/viewfs/ViewFs.java | 10 +- .../TestViewFileSystemLocalFileSystem.java | 77 +- .../hadoop/fs/viewfs/TestViewFsConfig.java | 13 +- .../fs/viewfs/TestViewFileSystemHdfs.java | 151 ++- 9 files changed, 1275 insertions(+), 63 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java index 6900df24f74..a5fc62e4243 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs.viewfs; import java.net.URI; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; /** * Utilities for config variables of the viewFs See {@link ViewFs} @@ -68,6 +69,32 @@ public class ConfigUtil { src, target); } + /** + * + * @param conf + * @param mountTableName + * @param src + * @param settings + * @param targets + */ + public static void addLinkNfly(Configuration conf, String mountTableName, + String src, String settings, final URI ... targets) { + + settings = settings == null + ? "minReplication=2,repairOnRead=true" + : settings; + + conf.set(getConfigViewFsPrefix(mountTableName) + "." + + Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src, + StringUtils.uriToString(targets)); + } + + public static void addLinkNfly(final Configuration conf, final String src, + final URI ... targets) { + addLinkNfly(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, src, null, + targets); + } + /** * Add config variable for homedir for default mount table * @param conf - add to this conf diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java index 9882a8e9338..1a07c10d03c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java @@ -57,7 +57,13 @@ public interface Constants { * Config variable for specifying a merge link */ public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge"; - + + /** + * Config variable for specifying an nfly link. Nfly writes to multiple + * locations, and allows reads from the closest one. + */ + String CONFIG_VIEWFS_LINK_NFLY = "linkNfly"; + /** * Config variable for specifying a merge of the root of the mount-table * with the root of another file system. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java index c62d5cc61a8..665c9c9acb6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java @@ -133,6 +133,12 @@ abstract class InodeTree { } } + enum LinkType { + SINGLE, + MERGE, + NFLY + } + /** * An internal class to represent a mount link. * A mount link can be single dir link or a merge dir link. @@ -146,19 +152,17 @@ abstract class InodeTree { * is changed later it is then ignored (a dir with null entries) */ static class INodeLink extends INode { - final boolean isMergeLink; // true if MergeLink final URI[] targetDirLinkList; final T targetFileSystem; // file system object created from the link. /** - * Construct a mergeLink. + * Construct a mergeLink or nfly. */ INodeLink(final String pathToNode, final UserGroupInformation aUgi, final T targetMergeFs, final URI[] aTargetDirLinkList) { super(pathToNode, aUgi); targetFileSystem = targetMergeFs; targetDirLinkList = aTargetDirLinkList; - isMergeLink = true; } /** @@ -170,7 +174,6 @@ abstract class InodeTree { targetFileSystem = targetFs; targetDirLinkList = new URI[1]; targetDirLinkList[0] = aTargetDirLink; - isMergeLink = false; } /** @@ -188,7 +191,9 @@ abstract class InodeTree { } private void createLink(final String src, final String target, - final boolean isLinkMerge, final UserGroupInformation aUgi) + final LinkType linkType, final String settings, + final UserGroupInformation aUgi, + final Configuration config) throws URISyntaxException, IOException, FileAlreadyExistsException, UnsupportedFileSystemException { // Validate that src is valid absolute path @@ -235,18 +240,20 @@ abstract class InodeTree { final INodeLink newLink; final String fullPath = curInode.fullPath + (curInode == root ? "" : "/") + iPath; - if (isLinkMerge) { // Target is list of URIs - String[] targetsList = StringUtils.getStrings(target); - URI[] targetsListURI = new URI[targetsList.length]; - int k = 0; - for (String itarget : targetsList) { - targetsListURI[k++] = new URI(itarget); - } - newLink = new INodeLink(fullPath, aUgi, - getTargetFileSystem(targetsListURI), targetsListURI); - } else { + switch (linkType) { + case SINGLE: newLink = new INodeLink(fullPath, aUgi, getTargetFileSystem(new URI(target)), new URI(target)); + break; + case MERGE: + case NFLY: + final URI[] targetUris = StringUtils.stringToURI( + StringUtils.getStrings(target)); + newLink = new INodeLink(fullPath, aUgi, + getTargetFileSystem(settings, targetUris), targetUris); + break; + default: + throw new IllegalArgumentException(linkType + ": Infeasible linkType"); } curInode.addLink(iPath, newLink); mountPoints.add(new MountPoint(src, newLink)); @@ -257,14 +264,14 @@ abstract class InodeTree { * 3 abstract methods. * @throws IOException */ - protected abstract T getTargetFileSystem(final URI uri) + protected abstract T getTargetFileSystem(URI uri) throws UnsupportedFileSystemException, URISyntaxException, IOException; - protected abstract T getTargetFileSystem(final INodeDir dir) + protected abstract T getTargetFileSystem(INodeDir dir) throws URISyntaxException; - protected abstract T getTargetFileSystem(final URI[] mergeFsURIList) - throws UnsupportedFileSystemException, URISyntaxException; + protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs) + throws UnsupportedFileSystemException, URISyntaxException, IOException; /** * Create Inode Tree from the specified mount-table specified in Config @@ -298,8 +305,9 @@ abstract class InodeTree { final String key = si.getKey(); if (key.startsWith(mtPrefix)) { gotMountTableEntry = true; - boolean isMergeLink = false; + LinkType linkType = LinkType.SINGLE; String src = key.substring(mtPrefix.length()); + String settings = null; if (src.startsWith(linkPrefix)) { src = src.substring(linkPrefix.length()); if (src.equals(SlashPath.toString())) { @@ -309,8 +317,20 @@ abstract class InodeTree { + "supported yet."); } } else if (src.startsWith(linkMergePrefix)) { // A merge link - isMergeLink = true; + linkType = LinkType.MERGE; src = src.substring(linkMergePrefix.length()); + } else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) { + // prefix.settings.src + src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1); + // settings.src + settings = src.substring(0, src.indexOf('.')); + // settings + + // settings.src + src = src.substring(settings.length() + 1); + // src + + linkType = LinkType.NFLY; } else if (src.startsWith(Constants.CONFIG_VIEWFS_HOMEDIR)) { // ignore - we set home dir from config continue; @@ -319,7 +339,7 @@ abstract class InodeTree { "Mount table in config: " + src); } final String target = si.getValue(); // link or merge link - createLink(src, target, isMergeLink, ugi); + createLink(src, target, linkType, settings, ugi, config); } } if (!gotMountTableEntry) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java new file mode 100644 index 00000000000..53966b8afbf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java @@ -0,0 +1,951 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.viewfs; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.net.ScriptBasedMapping; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; + +/** + * Nfly is a multi filesystem mount point. + */ +@Private +final class NflyFSystem extends FileSystem { + private static final Log LOG = LogFactory.getLog(NflyFSystem.class); + private static final String NFLY_TMP_PREFIX = "_nfly_tmp_"; + + enum NflyKey { + // minimum replication, if local filesystem is included +1 is recommended + minReplication, + + // forces to check all the replicas and fetch the one with the most recent + // time stamp + // + readMostRecent, + + // create missing replica from far to near, including local? + repairOnRead + } + + private static final int DEFAULT_MIN_REPLICATION = 2; + private static URI nflyURI = URI.create("nfly:///"); + + private final NflyNode[] nodes; + private final int minReplication; + private final EnumSet nflyFlags; + private final Node myNode; + private final NetworkTopology topology; + + /** + * URI's authority is used as an approximation of the distance from the + * client. It's sufficient for DC but not accurate because worker nodes can be + * closer. + */ + private static class NflyNode extends NodeBase { + private final ChRootedFileSystem fs; + NflyNode(String hostName, String rackName, URI uri, + Configuration conf) throws IOException { + this(hostName, rackName, new ChRootedFileSystem(uri, conf)); + } + + NflyNode(String hostName, String rackName, ChRootedFileSystem fs) { + super(hostName, rackName); + this.fs = fs; + } + + ChRootedFileSystem getFs() { + return fs; + } + + @Override + public boolean equals(Object o) { + // satisfy findbugs + return super.equals(o); + } + + @Override + public int hashCode() { + // satisfy findbugs + return super.hashCode(); + } + + } + + private static final class MRNflyNode + extends NflyNode implements Comparable { + + private FileStatus status; + + private MRNflyNode(NflyNode n) { + super(n.getName(), n.getNetworkLocation(), n.fs); + } + + private void updateFileStatus(Path f) throws IOException { + final FileStatus tmpStatus = getFs().getFileStatus(f); + status = tmpStatus == null + ? notFoundStatus(f) + : tmpStatus; + } + + // TODO allow configurable error margin for FileSystems with different + // timestamp precisions + @Override + public int compareTo(MRNflyNode other) { + if (status == null) { + return other.status == null ? 0 : 1; // move non-null towards head + } else if (other.status == null) { + return -1; // move this towards head + } else { + final long mtime = status.getModificationTime(); + final long their = other.status.getModificationTime(); + return Long.compare(their, mtime); // move more recent towards head + } + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof MRNflyNode)) { + return false; + } + MRNflyNode other = (MRNflyNode) o; + return 0 == compareTo(other); + } + + @Override + public int hashCode() { + // satisfy findbugs + return super.hashCode(); + } + + private FileStatus nflyStatus() throws IOException { + return new NflyStatus(getFs(), status); + } + + private FileStatus cloneStatus() throws IOException { + return new FileStatus(status.getLen(), + status.isDirectory(), + status.getReplication(), + status.getBlockSize(), + status.getModificationTime(), + status.getAccessTime(), + null, null, null, + status.isSymlink() ? status.getSymlink() : null, + status.getPath()); + } + } + + private MRNflyNode[] workSet() { + final MRNflyNode[] res = new MRNflyNode[nodes.length]; + for (int i = 0; i < res.length; i++) { + res[i] = new MRNflyNode(nodes[i]); + } + return res; + } + + + /** + * Utility to replace null with DEFAULT_RACK. + * + * @param rackString rack value, can be null + * @return non-null rack string + */ + private static String getRack(String rackString) { + return rackString == null ? NetworkTopology.DEFAULT_RACK : rackString; + } + + /** + * Creates a new Nfly instance. + * + * @param uris the list of uris in the mount point + * @param conf configuration object + * @param minReplication minimum copies to commit a write op + * @param nflyFlags modes such readMostRecent + * @throws IOException + */ + private NflyFSystem(URI[] uris, Configuration conf, int minReplication, + EnumSet nflyFlags) throws IOException { + if (uris.length < minReplication) { + throw new IOException(minReplication + " < " + uris.length + + ": Minimum replication < #destinations"); + } + setConf(conf); + final String localHostName = InetAddress.getLocalHost().getHostName(); + + // build a list for topology resolution + final List hostStrings = new ArrayList(uris.length + 1); + for (URI uri : uris) { + final String uriHost = uri.getHost(); + // assume local file system or another closest filesystem if no authority + hostStrings.add(uriHost == null ? localHostName : uriHost); + } + // resolve the client node + hostStrings.add(localHostName); + + final DNSToSwitchMapping tmpDns = ReflectionUtils.newInstance(conf.getClass( + CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + ScriptBasedMapping.class, DNSToSwitchMapping.class), conf); + + // this is an ArrayList + final List rackStrings = tmpDns.resolve(hostStrings); + nodes = new NflyNode[uris.length]; + final Iterator rackIter = rackStrings.iterator(); + for (int i = 0; i < nodes.length; i++) { + nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i], + conf); + } + // sort all the uri's by distance from myNode, the local file system will + // automatically be the the first one. + // + myNode = new NodeBase(localHostName, getRack(rackIter.next())); + topology = NetworkTopology.getInstance(conf); + topology.sortByDistance(myNode, nodes, nodes.length); + + this.minReplication = minReplication; + this.nflyFlags = nflyFlags; + statistics = getStatistics(nflyURI.getScheme(), getClass()); + } + + /** + * Transactional output stream. When creating path /dir/file + * 1) create invisible /real/dir_i/_nfly_tmp_file + * 2) when more than min replication was written, write is committed by + * renaming all successfully written files to /real/dir_i/file + */ + private final class NflyOutputStream extends OutputStream { + // actual path + private final Path nflyPath; + // tmp path before commit + private final Path tmpPath; + // broadcast set + private final FSDataOutputStream[] outputStreams; + // status set: 1 working, 0 problem + private final BitSet opSet; + private final boolean useOverwrite; + + private NflyOutputStream(Path f, FsPermission permission, boolean overwrite, + int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + nflyPath = f; + tmpPath = getNflyTmpPath(f); + outputStreams = new FSDataOutputStream[nodes.length]; + for (int i = 0; i < outputStreams.length; i++) { + outputStreams[i] = nodes[i].fs.create(tmpPath, permission, true, + bufferSize, replication, blockSize, progress); + } + opSet = new BitSet(outputStreams.length); + opSet.set(0, outputStreams.length); + useOverwrite = false; + } + + // + // TODO consider how to clean up and throw an exception early when the clear + // bits under min replication + // + + private void mayThrow(List ioExceptions) throws IOException { + final IOException ioe = MultipleIOException + .createIOException(ioExceptions); + if (opSet.cardinality() < minReplication) { + throw ioe; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Exceptions occurred: " + ioe); + } + } + } + + + @Override + public void write(int d) throws IOException { + final List ioExceptions = new ArrayList(); + for (int i = opSet.nextSetBit(0); + i >=0; + i = opSet.nextSetBit(i + 1)) { + try { + outputStreams[i].write(d); + } catch (Throwable t) { + osException(i, "write", t, ioExceptions); + } + } + mayThrow(ioExceptions); + } + + private void osException(int i, String op, Throwable t, + List ioExceptions) { + opSet.clear(i); + processThrowable(nodes[i], op, t, ioExceptions, tmpPath, nflyPath); + } + + @Override + public void write(byte[] bytes, int offset, int len) throws IOException { + final List ioExceptions = new ArrayList(); + for (int i = opSet.nextSetBit(0); + i >= 0; + i = opSet.nextSetBit(i + 1)) { + try { + outputStreams[i].write(bytes, offset, len); + } catch (Throwable t) { + osException(i, "write", t, ioExceptions); + } + } + mayThrow(ioExceptions); + } + + @Override + public void flush() throws IOException { + final List ioExceptions = new ArrayList(); + for (int i = opSet.nextSetBit(0); + i >= 0; + i = opSet.nextSetBit(i + 1)) { + try { + outputStreams[i].flush(); + } catch (Throwable t) { + osException(i, "flush", t, ioExceptions); + } + } + mayThrow(ioExceptions); + } + + @Override + public void close() throws IOException { + final List ioExceptions = new ArrayList(); + for (int i = opSet.nextSetBit(0); + i >= 0; + i = opSet.nextSetBit(i + 1)) { + try { + outputStreams[i].close(); + } catch (Throwable t) { + osException(i, "close", t, ioExceptions); + } + } + if (opSet.cardinality() < minReplication) { + cleanupAllTmpFiles(); + throw new IOException("Failed to sufficiently replicate: min=" + + minReplication + " actual=" + opSet.cardinality()); + } else { + commit(); + } + } + + private void cleanupAllTmpFiles() throws IOException { + for (int i = 0; i < outputStreams.length; i++) { + try { + nodes[i].fs.delete(tmpPath); + } catch (Throwable t) { + processThrowable(nodes[i], "delete", t, null, tmpPath); + } + } + } + + private void commit() throws IOException { + final List ioExceptions = new ArrayList(); + for (int i = opSet.nextSetBit(0); + i >= 0; + i = opSet.nextSetBit(i + 1)) { + final NflyNode nflyNode = nodes[i]; + try { + if (useOverwrite) { + nflyNode.fs.delete(nflyPath); + } + nflyNode.fs.rename(tmpPath, nflyPath); + + } catch (Throwable t) { + osException(i, "commit", t, ioExceptions); + } + } + + if (opSet.cardinality() < minReplication) { + // cleanup should be done outside. If rename failed, it's unlikely that + // delete will work either. It's the same kind of metadata-only op + // + throw MultipleIOException.createIOException(ioExceptions); + } + + // best effort to have a consistent timestamp + final long commitTime = System.currentTimeMillis(); + for (int i = opSet.nextSetBit(0); + i >= 0; + i = opSet.nextSetBit(i + 1)) { + try { + nodes[i].fs.setTimes(nflyPath, commitTime, commitTime); + } catch (Throwable t) { + LOG.info("Failed to set timestamp: " + nodes[i] + " " + nflyPath); + } + } + } + } + + private Path getNflyTmpPath(Path f) { + return new Path(f.getParent(), NFLY_TMP_PREFIX + f.getName()); + } + + /** + * // TODO + * Some file status implementations have expensive deserialization or metadata + * retrieval. This probably does not go beyond RawLocalFileSystem. Wrapping + * the the real file status to preserve this behavior. Otherwise, calling + * realStatus getters in constructor defeats this design. + */ + static final class NflyStatus extends FileStatus { + private static final long serialVersionUID = 0x21f276d8; + + private final FileStatus realStatus; + private final String strippedRoot; + + private NflyStatus(ChRootedFileSystem realFs, FileStatus realStatus) + throws IOException { + this.realStatus = realStatus; + this.strippedRoot = realFs.stripOutRoot(realStatus.getPath()); + } + + String stripRoot() throws IOException { + return strippedRoot; + } + + @Override + public long getLen() { + return realStatus.getLen(); + } + + @Override + public boolean isFile() { + return realStatus.isFile(); + } + + @Override + public boolean isDirectory() { + return realStatus.isDirectory(); + } + + @Override + public boolean isSymlink() { + return realStatus.isSymlink(); + } + + @Override + public long getBlockSize() { + return realStatus.getBlockSize(); + } + + @Override + public short getReplication() { + return realStatus.getReplication(); + } + + @Override + public long getModificationTime() { + return realStatus.getModificationTime(); + } + + @Override + public long getAccessTime() { + return realStatus.getAccessTime(); + } + + @Override + public FsPermission getPermission() { + return realStatus.getPermission(); + } + + @Override + public String getOwner() { + return realStatus.getOwner(); + } + + @Override + public String getGroup() { + return realStatus.getGroup(); + } + + @Override + public Path getPath() { + return realStatus.getPath(); + } + + @Override + public void setPath(Path p) { + realStatus.setPath(p); + } + + @Override + public Path getSymlink() throws IOException { + return realStatus.getSymlink(); + } + + @Override + public void setSymlink(Path p) { + realStatus.setSymlink(p); + } + + @Override + public boolean equals(Object o) { + return realStatus.equals(o); + } + + @Override + public int hashCode() { + return realStatus.hashCode(); + } + + @Override + public String toString() { + return realStatus.toString(); + } + } + + @Override + public URI getUri() { + return nflyURI; + } + + /** + * Category: READ. + * + * @param f the file name to open + * @param bufferSize the size of the buffer to be used. + * @return input stream according to nfly flags (closest, most recent) + * @throws IOException + * @throws FileNotFoundException iff all destinations generate this exception + */ + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + // TODO proxy stream for reads + final List ioExceptions = + new ArrayList(nodes.length); + int numNotFounds = 0; + final MRNflyNode[] mrNodes = workSet(); + + // naively iterate until one can be opened + // + for (final MRNflyNode nflyNode : mrNodes) { + try { + if (nflyFlags.contains(NflyKey.repairOnRead) + || nflyFlags.contains(NflyKey.readMostRecent)) { + // calling file status to avoid pulling bytes prematurely + nflyNode.updateFileStatus(f); + } else { + return nflyNode.getFs().open(f, bufferSize); + } + } catch (FileNotFoundException fnfe) { + nflyNode.status = notFoundStatus(f); + numNotFounds++; + processThrowable(nflyNode, "open", fnfe, ioExceptions, f); + } catch (Throwable t) { + processThrowable(nflyNode, "open", t, ioExceptions, f); + } + } + + if (nflyFlags.contains(NflyKey.readMostRecent)) { + // sort from most recent to least recent + Arrays.sort(mrNodes); + } + + final FSDataInputStream fsdisAfterRepair = repairAndOpen(mrNodes, f, + bufferSize); + + if (fsdisAfterRepair != null) { + return fsdisAfterRepair; + } + + mayThrowFileNotFound(ioExceptions, numNotFounds); + throw MultipleIOException.createIOException(ioExceptions); + } + + private static FileStatus notFoundStatus(Path f) { + return new FileStatus(-1, false, 0, 0, 0, f); + } + + /** + * Iterate all available nodes in the proximity order to attempt repair of all + * FileNotFound nodes. + * + * @param mrNodes work set copy of nodes + * @param f path to repair and open + * @param bufferSize buffer size for read RPC + * @return the closest/most recent replica stream AFTER repair + */ + private FSDataInputStream repairAndOpen(MRNflyNode[] mrNodes, Path f, + int bufferSize) { + long maxMtime = 0L; + for (final MRNflyNode srcNode : mrNodes) { + if (srcNode.status == null // not available + || srcNode.status.getLen() < 0L) { // not found + continue; // not available + } + if (srcNode.status.getModificationTime() > maxMtime) { + maxMtime = srcNode.status.getModificationTime(); + } + + // attempt to repair all notFound nodes with srcNode + // + for (final MRNflyNode dstNode : mrNodes) { + if (dstNode.status == null // not available + || srcNode.compareTo(dstNode) == 0) { // same mtime + continue; + } + + try { + // status is absolute from the underlying mount, making it chrooted + // + final FileStatus srcStatus = srcNode.cloneStatus(); + srcStatus.setPath(f); + final Path tmpPath = getNflyTmpPath(f); + FileUtil.copy(srcNode.getFs(), srcStatus, dstNode.getFs(), tmpPath, + false, // don't delete + true, // overwrite + getConf()); + dstNode.getFs().delete(f, false); + if (dstNode.getFs().rename(tmpPath, f)) { + try { + dstNode.getFs().setTimes(f, srcNode.status.getModificationTime(), + srcNode.status.getAccessTime()); + } finally { + // save getFileStatus rpc + srcStatus.setPath(dstNode.getFs().makeQualified(f)); + dstNode.status = srcStatus; + } + } + } catch (IOException ioe) { + // can blame the source by statusSet.clear(ai), however, it would + // cost an extra RPC, so just rely on the loop below that will attempt + // an open anyhow + // + LOG.info(f + " " + srcNode + "->" + dstNode + ": Failed to repair", + ioe); + } + } + } + + // Since Java7, QuickSort is used instead of MergeSort. + // QuickSort may not be stable and thus the equal most recent nodes, may no + // longer appear in the NetworkTopology order. + // + if (maxMtime > 0) { + final List mrList = new ArrayList(); + for (final MRNflyNode openNode : mrNodes) { + if (openNode.status != null && openNode.status.getLen() >= 0L) { + if (openNode.status.getModificationTime() == maxMtime) { + mrList.add(openNode); + } + } + } + // assert mrList.size > 0 + final MRNflyNode[] readNodes = mrList.toArray(new MRNflyNode[0]); + topology.sortByDistance(myNode, readNodes, readNodes.length); + for (final MRNflyNode rNode : readNodes) { + try { + return rNode.getFs().open(f, bufferSize); + } catch (IOException e) { + LOG.info(f + ": Failed to open at " + rNode.getFs().getUri()); + } + } + } + return null; + } + + private void mayThrowFileNotFound(List ioExceptions, + int numNotFounds) throws FileNotFoundException { + if (numNotFounds == nodes.length) { + throw (FileNotFoundException)ioExceptions.get(nodes.length - 1); + } + } + + // WRITE + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return new FSDataOutputStream(new NflyOutputStream(f, permission, overwrite, + bufferSize, replication, blockSize, progress), statistics); + } + + // WRITE + @Override + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + return null; + } + + // WRITE + @Override + public boolean rename(Path src, Path dst) throws IOException { + final List ioExceptions = new ArrayList(); + int numNotFounds = 0; + boolean succ = true; + for (final NflyNode nflyNode : nodes) { + try { + succ &= nflyNode.fs.rename(src, dst); + } catch (FileNotFoundException fnfe) { + numNotFounds++; + processThrowable(nflyNode, "rename", fnfe, ioExceptions, src, dst); + } catch (Throwable t) { + processThrowable(nflyNode, "rename", t, ioExceptions, src, dst); + succ = false; + } + } + + mayThrowFileNotFound(ioExceptions, numNotFounds); + + // if all destinations threw exceptions throw, otherwise return + // + if (ioExceptions.size() == nodes.length) { + throw MultipleIOException.createIOException(ioExceptions); + } + + return succ; + } + + // WRITE + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + final List ioExceptions = new ArrayList(); + int numNotFounds = 0; + boolean succ = true; + for (final NflyNode nflyNode : nodes) { + try { + succ &= nflyNode.fs.delete(f); + } catch (FileNotFoundException fnfe) { + numNotFounds++; + processThrowable(nflyNode, "delete", fnfe, ioExceptions, f); + } catch (Throwable t) { + processThrowable(nflyNode, "delete", t, ioExceptions, f); + succ = false; + } + } + mayThrowFileNotFound(ioExceptions, numNotFounds); + + // if all destinations threw exceptions throw, otherwise return + // + if (ioExceptions.size() == nodes.length) { + throw MultipleIOException.createIOException(ioExceptions); + } + + return succ; + } + + + /** + * Returns the closest non-failing destination's result. + * + * @param f given path + * @return array of file statuses according to nfly modes + * @throws FileNotFoundException + * @throws IOException + */ + @Override + public FileStatus[] listStatus(Path f) throws FileNotFoundException, + IOException { + final List ioExceptions = + new ArrayList(nodes.length); + + final MRNflyNode[] mrNodes = workSet(); + if (nflyFlags.contains(NflyKey.readMostRecent)) { + int numNotFounds = 0; + for (final MRNflyNode nflyNode : mrNodes) { + try { + nflyNode.updateFileStatus(f); + } catch (FileNotFoundException fnfe) { + numNotFounds++; + processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f); + } catch (Throwable t) { + processThrowable(nflyNode, "listStatus", t, ioExceptions, f); + } + } + mayThrowFileNotFound(ioExceptions, numNotFounds); + Arrays.sort(mrNodes); + } + + int numNotFounds = 0; + for (final MRNflyNode nflyNode : mrNodes) { + try { + final FileStatus[] realStats = nflyNode.getFs().listStatus(f); + final FileStatus[] nflyStats = new FileStatus[realStats.length]; + for (int i = 0; i < realStats.length; i++) { + nflyStats[i] = new NflyStatus(nflyNode.getFs(), realStats[i]); + } + return nflyStats; + } catch (FileNotFoundException fnfe) { + numNotFounds++; + processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f); + } catch (Throwable t) { + processThrowable(nflyNode, "listStatus", t, ioExceptions, f); + } + } + mayThrowFileNotFound(ioExceptions, numNotFounds); + throw MultipleIOException.createIOException(ioExceptions); + } + + @Override + public RemoteIterator listLocatedStatus(Path f) + throws FileNotFoundException, IOException { + // TODO important for splits + return super.listLocatedStatus(f); + } + + @Override + public void setWorkingDirectory(Path newDir) { + for (final NflyNode nflyNode : nodes) { + nflyNode.fs.setWorkingDirectory(newDir); + } + } + + @Override + public Path getWorkingDirectory() { + return nodes[0].fs.getWorkingDirectory(); // 0 is as good as any + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + boolean succ = true; + for (final NflyNode nflyNode : nodes) { + succ &= nflyNode.fs.mkdirs(f, permission); + } + return succ; + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + // TODO proxy stream for reads + final List ioExceptions = + new ArrayList(nodes.length); + int numNotFounds = 0; + final MRNflyNode[] mrNodes = workSet(); + + long maxMtime = Long.MIN_VALUE; + int maxMtimeIdx = Integer.MIN_VALUE; + + // naively iterate until one can be returned + // + for (int i = 0; i < mrNodes.length; i++) { + MRNflyNode nflyNode = mrNodes[i]; + try { + nflyNode.updateFileStatus(f); + if (nflyFlags.contains(NflyKey.readMostRecent)) { + final long nflyTime = nflyNode.status.getModificationTime(); + if (nflyTime > maxMtime) { + maxMtime = nflyTime; + maxMtimeIdx = i; + } + } else { + return nflyNode.nflyStatus(); + } + } catch (FileNotFoundException fnfe) { + numNotFounds++; + processThrowable(nflyNode, "getFileStatus", fnfe, ioExceptions, f); + } catch (Throwable t) { + processThrowable(nflyNode, "getFileStatus", t, ioExceptions, f); + } + } + + if (maxMtimeIdx >= 0) { + return mrNodes[maxMtimeIdx].nflyStatus(); + } + + mayThrowFileNotFound(ioExceptions, numNotFounds); + throw MultipleIOException.createIOException(ioExceptions); + } + + private static void processThrowable(NflyNode nflyNode, String op, + Throwable t, List ioExceptions, + Path... f) { + final String errMsg = Arrays.toString(f) + + ": failed to " + op + " " + nflyNode.fs.getUri(); + final IOException ioex; + if (t instanceof FileNotFoundException) { + ioex = new FileNotFoundException(errMsg); + ioex.initCause(t); + } else { + ioex = new IOException(errMsg, t); + } + + if (ioExceptions != null) { + ioExceptions.add(ioex); + } + } + + /** + * Initializes an nfly mountpoint in viewfs. + * + * @param uris destinations to replicate writes to + * @param conf file system configuration + * @param settings comma-separated list of k=v pairs. + * @return an Nfly filesystem + * @throws IOException + */ + static FileSystem createFileSystem(URI[] uris, Configuration conf, + String settings) throws IOException { + // assert settings != null + int minRepl = DEFAULT_MIN_REPLICATION; + EnumSet nflyFlags = EnumSet.noneOf(NflyKey.class); + final String[] kvPairs = StringUtils.split(settings); + for (String kv : kvPairs) { + final String[] kvPair = StringUtils.split(kv, '='); + if (kvPair.length != 2) { + throw new IllegalArgumentException(kv); + } + NflyKey nflyKey = NflyKey.valueOf(kvPair[0]); + switch (nflyKey) { + case minReplication: + minRepl = Integer.parseInt(kvPair[1]); + break; + case repairOnRead: + case readMostRecent: + if (Boolean.valueOf(kvPair[1])) { + nflyFlags.add(nflyKey); + } + break; + default: + throw new IllegalArgumentException(nflyKey + ": Infeasible"); + } + } + return new NflyFSystem(uris, conf, minRepl, nflyFlags); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java index 158b0994332..ca1380a1096 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java @@ -54,7 +54,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.QuotaUsage; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; @@ -186,25 +185,21 @@ public class ViewFileSystem extends FileSystem { fsState = new InodeTree(conf, authority) { @Override - protected - FileSystem getTargetFileSystem(final URI uri) + protected FileSystem getTargetFileSystem(final URI uri) throws URISyntaxException, IOException { return new ChRootedFileSystem(uri, config); } @Override - protected - FileSystem getTargetFileSystem(final INodeDir dir) + protected FileSystem getTargetFileSystem(final INodeDir dir) throws URISyntaxException { return new InternalDirOfViewFs(dir, creationTime, ugi, myUri, config); } @Override - protected - FileSystem getTargetFileSystem(URI[] mergeFsURIList) - throws URISyntaxException, UnsupportedFileSystemException { - throw new UnsupportedFileSystemException("mergefs not implemented"); - // return MergeFs.createMergeFs(mergeFsURIList, config); + protected FileSystem getTargetFileSystem(final String settings, + final URI[] uris) throws URISyntaxException, IOException { + return NflyFSystem.createFileSystem(uris, config, settings); } }; workingDir = this.getHomeDirectory(); @@ -455,8 +450,13 @@ public class ViewFileSystem extends FileSystem { private Path getChrootedPath(InodeTree.ResolveResult res, FileStatus status, Path f) throws IOException { - final String suffix = ((ChRootedFileSystem)res.targetFileSystem) - .stripOutRoot(status.getPath()); + final String suffix; + if (res.targetFileSystem instanceof ChRootedFileSystem) { + suffix = ((ChRootedFileSystem)res.targetFileSystem) + .stripOutRoot(status.getPath()); + } else { // nfly + suffix = ((NflyFSystem.NflyStatus)status).stripRoot(); + } return this.makeQualified( suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix)); } @@ -501,10 +501,15 @@ public class ViewFileSystem extends FileSystem { verifyRenameStrategy(srcUri, dstUri, resSrc.targetFileSystem == resDst.targetFileSystem, renameStrategy); - ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem; - ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem; - return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath), - dstFS.fullPath(resDst.remainingPath)); + if (resSrc.targetFileSystem instanceof ChRootedFileSystem && + resDst.targetFileSystem instanceof ChRootedFileSystem) { + ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem; + ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem; + return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath), + dstFS.fullPath(resDst.remainingPath)); + } else { + return resSrc.targetFileSystem.rename(resSrc.remainingPath, resDst.remainingPath); + } } static void verifyRenameStrategy(URI srcUri, URI dstUri, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java index 364485fe29f..6a89f27fd98 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java @@ -212,8 +212,7 @@ public class ViewFs extends AbstractFileSystem { fsState = new InodeTree(conf, authority) { @Override - protected - AbstractFileSystem getTargetFileSystem(final URI uri) + protected AbstractFileSystem getTargetFileSystem(final URI uri) throws URISyntaxException, UnsupportedFileSystemException { String pathString = uri.getPath(); if (pathString.isEmpty()) { @@ -225,15 +224,14 @@ public class ViewFs extends AbstractFileSystem { } @Override - protected - AbstractFileSystem getTargetFileSystem( + protected AbstractFileSystem getTargetFileSystem( final INodeDir dir) throws URISyntaxException { return new InternalDirOfViewFs(dir, creationTime, ugi, getUri()); } @Override - protected - AbstractFileSystem getTargetFileSystem(URI[] mergeFsURIList) + protected AbstractFileSystem getTargetFileSystem(final String settings, + final URI[] mergeFsURIList) throws URISyntaxException, UnsupportedFileSystemException { throw new UnsupportedFileSystemException("mergefs not implemented yet"); // return MergeFs.createMergeFs(mergeFsURIList, config); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java index 49437924ea3..808d8b06c35 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java @@ -18,13 +18,25 @@ package org.apache.hadoop.fs.viewfs; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; import org.junit.After; import org.junit.Before; - +import org.junit.Test; /** @@ -37,6 +49,8 @@ import org.junit.Before; */ public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest { + private static final Log LOG = + LogFactory.getLog(TestViewFileSystemLocalFileSystem.class); @Override @Before @@ -47,6 +61,65 @@ public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest { } + @Test + public void testNflyWriteSimple() throws IOException { + LOG.info("Starting testNflyWriteSimple"); + final URI[] testUris = new URI[] { + URI.create(targetTestRoot + "/nfwd1"), + URI.create(targetTestRoot + "/nfwd2") + }; + final String testFileName = "test.txt"; + final Configuration testConf = new Configuration(conf); + final String testString = "Hello Nfly!"; + final Path nflyRoot = new Path("/nflyroot"); + ConfigUtil.addLinkNfly(testConf, nflyRoot.toString(), testUris); + final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf); + + final FSDataOutputStream fsDos = nfly.create( + new Path(nflyRoot, "test.txt")); + try { + fsDos.writeUTF(testString); + } finally { + fsDos.close(); + } + + FileStatus[] statuses = nfly.listStatus(nflyRoot); + + FileSystem lfs = FileSystem.getLocal(testConf); + for (final URI testUri : testUris) { + final Path testFile = new Path(new Path(testUri), testFileName); + assertTrue(testFile + " should exist!", lfs.exists(testFile)); + final FSDataInputStream fsdis = lfs.open(testFile); + try { + assertEquals("Wrong file content", testString, fsdis.readUTF()); + } finally { + fsdis.close(); + } + } + } + + + @Test + public void testNflyInvalidMinReplication() throws Exception { + LOG.info("Starting testNflyInvalidMinReplication"); + final URI[] testUris = new URI[] { + URI.create(targetTestRoot + "/nfwd1"), + URI.create(targetTestRoot + "/nfwd2") + }; + + final Configuration conf = new Configuration(); + ConfigUtil.addLinkNfly(conf, "mt", "/nflyroot", "minReplication=4", + testUris); + try { + FileSystem.get(URI.create("viewfs://mt/"), conf); + fail("Expected bad minReplication exception."); + } catch (IOException ioe) { + assertTrue("No minReplication message", + ioe.getMessage().contains("Minimum replication")); + } + } + + @Override @After public void tearDown() throws Exception { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java index 895ae0cce2e..136837fc801 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java @@ -24,7 +24,6 @@ import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.junit.Test; public class TestViewFsConfig { @@ -43,23 +42,21 @@ public class TestViewFsConfig { new InodeTree(conf, null) { @Override - protected Foo getTargetFileSystem(final URI uri) - throws URISyntaxException, UnsupportedFileSystemException { + protected Foo getTargetFileSystem(final URI uri) { return null; } @Override - protected Foo getTargetFileSystem( - org.apache.hadoop.fs.viewfs.InodeTree.INodeDir dir) - throws URISyntaxException { + protected Foo getTargetFileSystem(final INodeDir dir) { return null; } @Override - protected Foo getTargetFileSystem(URI[] mergeFsURIList) - throws URISyntaxException, UnsupportedFileSystemException { + protected Foo getTargetFileSystem(final String settings, + final URI[] mergeFsURIList) { return null; } + }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java index b8f5379181c..b8bed1df84a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java @@ -17,11 +17,9 @@ */ package org.apache.hadoop.fs.viewfs; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; - import java.io.File; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.util.EnumSet; @@ -31,6 +29,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -45,17 +45,26 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag; import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; + import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestViewFileSystemHdfs.class); + private static MiniDFSCluster cluster; private static Path defaultWorkingDirectory; @@ -190,12 +199,12 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest { //Verify file deletion within EZ DFSTestUtil.verifyDelete(shell, fsTarget, encFile, true); - Assert.assertTrue("ViewFileSystem trash roots should include EZ file trash", + assertTrue("ViewFileSystem trash roots should include EZ file trash", (fsView.getTrashRoots(true).size() == 1)); //Verify deletion of EZ DFSTestUtil.verifyDelete(shell, fsTarget, zone, true); - Assert.assertTrue("ViewFileSystem trash roots should include EZ zone trash", + assertTrue("ViewFileSystem trash roots should include EZ zone trash", (fsView.getTrashRoots(true).size() == 2)); } @@ -239,14 +248,14 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest { viewFs.getFileChecksum(mountDataFilePath); FileChecksum fileChecksumViaTargetFs = fsTarget.getFileChecksum(fsTargetFilePath); - Assert.assertTrue("File checksum not matching!", + assertTrue("File checksum not matching!", fileChecksumViaViewFs.equals(fileChecksumViaTargetFs)); fileChecksumViaViewFs = viewFs.getFileChecksum(mountDataFilePath, fileLength / 2); fileChecksumViaTargetFs = fsTarget.getFileChecksum(fsTargetFilePath, fileLength / 2); - Assert.assertTrue("File checksum not matching!", + assertTrue("File checksum not matching!", fileChecksumViaViewFs.equals(fileChecksumViaTargetFs)); } @@ -269,4 +278,130 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest { e); } } + + @Test + public void testNflyClosestRepair() throws Exception { + testNflyRepair(NflyFSystem.NflyKey.repairOnRead); + } + + @Test + public void testNflyMostRecentRepair() throws Exception { + testNflyRepair(NflyFSystem.NflyKey.readMostRecent); + } + + private void testNflyRepair(NflyFSystem.NflyKey repairKey) + throws Exception { + LOG.info("Starting testNflyWriteSimpleFailover"); + final URI uri1 = targetTestRoot.toUri(); + final URI uri2 = targetTestRoot2.toUri(); + final URI[] testUris = new URI[] { + new URI(uri1.getScheme(), uri1.getAuthority(), "/", null, null), + new URI(uri2.getScheme(), uri2.getAuthority(), "/", null, null) + }; + + final Configuration testConf = new Configuration(conf); + testConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + + final String testString = "Hello Nfly!"; + final Path nflyRoot = new Path("/nflyroot"); + + ConfigUtil.addLinkNfly(testConf, + Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, + nflyRoot.toString(), + "minReplication=2," + repairKey + "=true", testUris); + + final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf); + // wd = /nflyroot/user/ + nfly.setWorkingDirectory(new Path(nflyRoot + + nfly.getWorkingDirectory().toUri().getPath())); + + // 1. test mkdirs + final Path testDir = new Path("testdir1/sub1/sub3"); + final Path testDir_tmp = new Path("testdir1/sub1/sub3_temp"); + assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir)); + + // Test renames + assertTrue(nfly.rename(testDir, testDir_tmp)); + assertTrue(nfly.rename(testDir_tmp, testDir)); + + for (final URI testUri : testUris) { + final FileSystem fs = FileSystem.get(testUri, testConf); + assertTrue(testDir + " should exist!", fs.exists(testDir)); + } + + // 2. test write + final Path testFile = new Path("test.txt"); + final FSDataOutputStream fsDos = nfly.create(testFile); + try { + fsDos.writeUTF(testString); + } finally { + fsDos.close(); + } + + for (final URI testUri : testUris) { + final FileSystem fs = FileSystem.get(testUri, testConf); + final FSDataInputStream fsdis = fs.open(testFile); + try { + assertEquals("Wrong file content", testString, fsdis.readUTF()); + } finally { + fsdis.close(); + } + } + + // 3. test reads when one unavailable + // + // bring one NN down and read through nfly should still work + // + for (int i = 0; i < cluster.getNumNameNodes(); i++) { + cluster.shutdownNameNode(i); + FSDataInputStream fsDis = null; + try { + fsDis = nfly.open(testFile); + assertEquals("Wrong file content", testString, fsDis.readUTF()); + } finally { + IOUtils.cleanupWithLogger(LOG, fsDis); + cluster.restartNameNode(i); + } + } + + // both nodes are up again, test repair + final FileSystem fs1 = FileSystem.get(testUris[0], conf); + assertTrue(fs1.delete(testFile, false)); + assertFalse(fs1.exists(testFile)); + FSDataInputStream fsDis = null; + try { + fsDis = nfly.open(testFile); + assertEquals("Wrong file content", testString, fsDis.readUTF()); + assertTrue(fs1.exists(testFile)); + } finally { + IOUtils.cleanupWithLogger(LOG, fsDis); + } + + // test most recent repair + if (repairKey == NflyFSystem.NflyKey.readMostRecent) { + final FileSystem fs2 = FileSystem.get(testUris[0], conf); + final long expectedMtime = fs2.getFileStatus(testFile) + .getModificationTime(); + + for (final URI testUri : testUris) { + final FileSystem fs = FileSystem.get(testUri, conf); + fs.setTimes(testFile, 1L, 1L); + assertEquals(testUri + "Set mtime failed!", 1L, + fs.getFileStatus(testFile).getModificationTime()); + assertEquals("nfly file status wrong", expectedMtime, + nfly.getFileStatus(testFile).getModificationTime()); + FSDataInputStream fsDis2 = null; + try { + fsDis2 = nfly.open(testFile); + assertEquals("Wrong file content", testString, fsDis2.readUTF()); + // repair is done, now trying via normal fs + // + assertEquals("Repair most recent failed!", expectedMtime, + fs.getFileStatus(testFile).getModificationTime()); + } finally { + IOUtils.cleanupWithLogger(LOG, fsDis2); + } + } + } + } }