HDFS-15322. Make NflyFS to work when ViewFsOverloadScheme's scheme and target uris schemes are same. Contributed by Uma Maheswara Rao G.
This commit is contained in:
parent
52b21de1d8
commit
4734c77b4b
|
@ -135,6 +135,17 @@ public class ConfigUtil {
|
|||
addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add nfly link to configuration for the given mount table.
|
||||
*/
|
||||
public static void addLinkNfly(Configuration conf, String mountTableName,
|
||||
String src, String settings, final String targets) {
|
||||
conf.set(
|
||||
getConfigViewFsPrefix(mountTableName) + "."
|
||||
+ Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
|
||||
targets);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param conf
|
||||
|
@ -149,9 +160,7 @@ public class ConfigUtil {
|
|||
settings = settings == null
|
||||
? "minReplication=2,repairOnRead=true"
|
||||
: settings;
|
||||
|
||||
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
|
||||
Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
|
||||
addLinkNfly(conf, mountTableName, src, settings,
|
||||
StringUtils.uriToString(targets));
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.viewfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
/**
|
||||
* File system instance getter.
|
||||
*/
|
||||
@Private
|
||||
class FsGetter {
|
||||
|
||||
/**
|
||||
* Gets new file system instance of given uri.
|
||||
*/
|
||||
public FileSystem getNewInstance(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
return FileSystem.newInstance(uri, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets file system instance of given uri.
|
||||
*/
|
||||
public FileSystem get(URI uri, Configuration conf) throws IOException {
|
||||
return FileSystem.get(uri, conf);
|
||||
}
|
||||
}
|
|
@ -59,8 +59,7 @@ public class HCFSMountTableConfigLoader implements MountTableConfigLoader {
|
|||
throws IOException {
|
||||
this.mountTable = new Path(mountTableConfigPath);
|
||||
String scheme = mountTable.toUri().getScheme();
|
||||
ViewFileSystem.FsGetter fsGetter =
|
||||
new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
|
||||
FsGetter fsGetter = new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
|
||||
try (FileSystem fs = fsGetter.getNewInstance(mountTable.toUri(), conf)) {
|
||||
RemoteIterator<LocatedFileStatus> listFiles =
|
||||
fs.listFiles(mountTable, false);
|
||||
|
|
|
@ -212,6 +212,21 @@ final class NflyFSystem extends FileSystem {
|
|||
*/
|
||||
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
    EnumSet<NflyKey> nflyFlags) throws IOException {
  // No explicit FsGetter: passing null makes the main constructor build
  // NflyNodes directly from the uris (see the fsGetter != null branch there).
  this(uris, conf, minReplication, nflyFlags, null);
}
|
||||
|
||||
/**
|
||||
* Creates a new Nfly instance.
|
||||
*
|
||||
* @param uris the list of uris in the mount point
|
||||
* @param conf configuration object
|
||||
* @param minReplication minimum copies to commit a write op
|
||||
* @param nflyFlags modes such as readMostRecent
|
||||
* @param fsGetter to get the file system instance with the given uri
|
||||
* @throws IOException
|
||||
*/
|
||||
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
|
||||
EnumSet<NflyKey> nflyFlags, FsGetter fsGetter) throws IOException {
|
||||
if (uris.length < minReplication) {
|
||||
throw new IOException(minReplication + " < " + uris.length
|
||||
+ ": Minimum replication < #destinations");
|
||||
|
@ -238,8 +253,14 @@ final class NflyFSystem extends FileSystem {
|
|||
nodes = new NflyNode[uris.length];
|
||||
final Iterator<String> rackIter = rackStrings.iterator();
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
|
||||
conf);
|
||||
if (fsGetter != null) {
|
||||
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(),
|
||||
new ChRootedFileSystem(fsGetter.getNewInstance(uris[i], conf),
|
||||
uris[i]));
|
||||
} else {
|
||||
nodes[i] =
|
||||
new NflyNode(hostStrings.get(i), rackIter.next(), uris[i], conf);
|
||||
}
|
||||
}
|
||||
// sort all the uri's by distance from myNode, the local file system will
|
||||
// automatically be the first one.
|
||||
|
@ -921,7 +942,7 @@ final class NflyFSystem extends FileSystem {
|
|||
* @throws IOException
|
||||
*/
|
||||
static FileSystem createFileSystem(URI[] uris, Configuration conf,
|
||||
String settings) throws IOException {
|
||||
String settings, FsGetter fsGetter) throws IOException {
|
||||
// assert settings != null
|
||||
int minRepl = DEFAULT_MIN_REPLICATION;
|
||||
EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
|
||||
|
@ -946,6 +967,6 @@ final class NflyFSystem extends FileSystem {
|
|||
throw new IllegalArgumentException(nflyKey + ": Infeasible");
|
||||
}
|
||||
}
|
||||
return new NflyFSystem(uris, conf, minRepl, nflyFlags);
|
||||
return new NflyFSystem(uris, conf, minRepl, nflyFlags, fsGetter);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,27 +96,6 @@ public class ViewFileSystem extends FileSystem {
|
|||
return readOnlyMountTable(operation, p.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* File system instance getter.
|
||||
*/
|
||||
static class FsGetter {
|
||||
|
||||
/**
|
||||
* Gets new file system instance of given uri.
|
||||
*/
|
||||
public FileSystem getNewInstance(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
return FileSystem.newInstance(uri, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets file system instance of given uri.
|
||||
*/
|
||||
public FileSystem get(URI uri, Configuration conf) throws IOException {
|
||||
return FileSystem.get(uri, conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets file system creator instance.
|
||||
*/
|
||||
|
@ -316,7 +295,8 @@ public class ViewFileSystem extends FileSystem {
|
|||
@Override
|
||||
protected FileSystem getTargetFileSystem(final String settings,
|
||||
final URI[] uris) throws URISyntaxException, IOException {
|
||||
return NflyFSystem.createFileSystem(uris, config, settings);
|
||||
return NflyFSystem.createFileSystem(uris, config, settings,
|
||||
fsGetter);
|
||||
}
|
||||
};
|
||||
workingDir = this.getHomeDirectory();
|
||||
|
|
|
@ -50,7 +50,6 @@ import org.apache.hadoop.fs.permission.AclStatus;
|
|||
import org.apache.hadoop.fs.permission.AclUtil;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.viewfs.ViewFileSystem.FsGetter;
|
||||
import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme.ChildFsGetter;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.junit.Assert;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -146,7 +147,8 @@ public class ViewFsTestSetup {
|
|||
throws IOException, URISyntaxException {
|
||||
ChildFsGetter cfs = new ViewFileSystemOverloadScheme.ChildFsGetter(
|
||||
mountTableConfPath.toUri().getScheme());
|
||||
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(), conf)) {
|
||||
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(),
|
||||
conf)) {
|
||||
try (FSDataOutputStream out = fs.create(mountTableConfPath)) {
|
||||
String prefix =
|
||||
new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(".")
|
||||
|
@ -158,17 +160,23 @@ public class ViewFsTestSetup {
|
|||
for (int i = 0; i < sources.length; i++) {
|
||||
String src = sources[i];
|
||||
String target = targets[i];
|
||||
boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY);
|
||||
out.writeBytes("<property><name>");
|
||||
if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
|
||||
if (isNfly) {
|
||||
String[] srcParts = src.split("[.]");
|
||||
Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length);
|
||||
String actualSrc = srcParts[srcParts.length - 1];
|
||||
String params = srcParts[srcParts.length - 2];
|
||||
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_NFLY + "."
|
||||
+ params + "." + actualSrc);
|
||||
} else if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
|
||||
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_FALLBACK);
|
||||
out.writeBytes("</name>");
|
||||
} else if (Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH.equals(src)) {
|
||||
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH);
|
||||
out.writeBytes("</name>");
|
||||
} else {
|
||||
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK + "." + src);
|
||||
out.writeBytes("</name>");
|
||||
}
|
||||
out.writeBytes("</name>");
|
||||
out.writeBytes("<value>");
|
||||
out.writeBytes(target);
|
||||
out.writeBytes("</value></property>");
|
||||
|
@ -191,7 +199,15 @@ public class ViewFsTestSetup {
|
|||
String target = targets[i];
|
||||
String mountTableName = mountTable == null ?
|
||||
Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE : mountTable;
|
||||
if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
|
||||
boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY);
|
||||
if (isNfly) {
|
||||
String[] srcParts = src.split("[.]");
|
||||
Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length);
|
||||
String actualSrc = srcParts[srcParts.length - 1];
|
||||
String params = srcParts[srcParts.length - 2];
|
||||
ConfigUtil.addLinkNfly(config, mountTableName, actualSrc, params,
|
||||
target);
|
||||
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
|
||||
ConfigUtil.addLinkFallback(config, mountTableName, new URI(target));
|
||||
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH)) {
|
||||
ConfigUtil.addLinkMergeSlash(config, mountTableName, new URI(target));
|
||||
|
|
|
@ -17,6 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs.viewfs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
@ -24,6 +28,9 @@ import java.net.URISyntaxException;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsConstants;
|
||||
|
@ -44,6 +51,7 @@ import org.junit.Test;
|
|||
* Tests ViewFileSystemOverloadScheme with configured mount links.
|
||||
*/
|
||||
public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
|
||||
private static final String TEST_STRING = "Hello ViewFSOverloadedScheme!";
|
||||
private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
|
||||
private static final String HDFS_SCHEME = "hdfs";
|
||||
private Configuration conf = null;
|
||||
|
@ -63,6 +71,8 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
|
|||
conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
|
||||
true);
|
||||
conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
|
||||
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
|
||||
ViewFileSystemOverloadScheme.class.getName());
|
||||
conf.set(String.format(
|
||||
|
@ -438,6 +448,117 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the rename with nfly mount link.
|
||||
*/
|
||||
@Test(timeout = 3000)
|
||||
public void testNflyRename() throws Exception {
|
||||
final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||
final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
|
||||
final URI uri1 = hdfsTargetPath1.toUri();
|
||||
final URI uri2 = hdfsTargetPath2.toUri();
|
||||
final Path nflyRoot = new Path("/nflyroot");
|
||||
|
||||
final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
|
||||
+ ".minReplication=2." + nflyRoot.toString();
|
||||
addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
|
||||
new String[] {uri1.toString() + "," + uri2.toString() }, conf);
|
||||
final FileSystem nfly = FileSystem.get(defaultFSURI, conf);
|
||||
|
||||
final Path testDir = new Path("/nflyroot/testdir1/sub1/sub3");
|
||||
final Path testDirTmp = new Path("/nflyroot/testdir1/sub1/sub3_temp");
|
||||
assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir));
|
||||
|
||||
// Test renames
|
||||
assertTrue(nfly.rename(testDir, testDirTmp));
|
||||
assertTrue(nfly.rename(testDirTmp, testDir));
|
||||
|
||||
final URI[] testUris = new URI[] {uri1, uri2 };
|
||||
for (final URI testUri : testUris) {
|
||||
final FileSystem fs = FileSystem.get(testUri, conf);
|
||||
assertTrue(testDir + " should exist!", fs.exists(testDir));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the write and read contents with nfly mount link.
|
||||
*/
|
||||
@Test(timeout = 3000)
|
||||
public void testNflyWriteRead() throws Exception {
|
||||
final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||
final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
|
||||
final URI uri1 = hdfsTargetPath1.toUri();
|
||||
final URI uri2 = hdfsTargetPath2.toUri();
|
||||
final Path nflyRoot = new Path("/nflyroot");
|
||||
final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
|
||||
+ ".minReplication=2." + nflyRoot.toString();
|
||||
addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
|
||||
new String[] {uri1.toString() + "," + uri2.toString() }, conf);
|
||||
final FileSystem nfly = FileSystem.get(defaultFSURI, conf);
|
||||
final Path testFile = new Path("/nflyroot/test.txt");
|
||||
writeString(nfly, TEST_STRING, testFile);
|
||||
final URI[] testUris = new URI[] {uri1, uri2 };
|
||||
for (final URI testUri : testUris) {
|
||||
try (FileSystem fs = FileSystem.get(testUri, conf)) {
|
||||
readString(fs, testFile, TEST_STRING, testUri);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * 1. Writes contents with nfly link having two target uris. 2. Deletes one
 * target file. 3. Tests the read works with repairOnRead flag. 4. Tests that
 * previously deleted file fully recovered and exists.
 */
@Test(timeout = 3000)
public void testNflyRepair() throws Exception {
  final NflyFSystem.NflyKey repairKey = NflyFSystem.NflyKey.repairOnRead;
  final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
  final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
  final URI uri1 = hdfsTargetPath1.toUri();
  final URI uri2 = hdfsTargetPath2.toUri();
  final Path nflyRoot = new Path("/nflyroot");
  // Mount link with minReplication=2 and repairOnRead=true, so a read
  // through the nfly view should restore a missing target copy.
  final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
      + ".minReplication=2," + repairKey + "=true." + nflyRoot.toString();
  addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
      new String[] {uri1.toString() + "," + uri2.toString() }, conf);
  try (FileSystem nfly = FileSystem.get(defaultFSURI, conf)) {
    // write contents to nfly
    final Path testFilePath = new Path("/nflyroot/test.txt");
    writeString(nfly, TEST_STRING, testFilePath);

    final URI[] testUris = new URI[] {uri1, uri2 };
    // Remove the file from one target, then verify repairOnRead restores it.
    FsGetter getter = new ViewFileSystemOverloadScheme.ChildFsGetter("hdfs");
    try (FileSystem fs1 = getter.getNewInstance(testUris[0], conf)) {
      // Delete a file from one target URI
      String testFile = "/test.txt";
      assertTrue(
          fs1.delete(new Path(testUris[0].toString() + testFile), false));
      assertFalse(fs1.exists(new Path(testUris[0].toString() + testFile)));

      // Verify read success.
      readString(nfly, testFilePath, TEST_STRING, testUris[0]);
      // Verify file recovered.
      assertTrue(fs1.exists(new Path(testUris[0].toString() + testFile)));
    }
  }
}
|
||||
|
||||
/**
 * Writes the given string to the file via {@code writeUTF}, closing the
 * output stream when done.
 */
private void writeString(final FileSystem nfly, final String testString,
    final Path testFile) throws IOException {
  try (FSDataOutputStream fsDos = nfly.create(testFile)) {
    fsDos.writeUTF(testString);
  }
}
|
||||
|
||||
private void readString(final FileSystem nfly, final Path testFile,
|
||||
final String testString, final URI testUri) throws IOException {
|
||||
try (FSDataInputStream fsDis = nfly.open(testFile)) {
|
||||
assertEquals("Wrong file content", testString, fsDis.readUTF());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return configuration.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue