HDFS-15322. Make NflyFS to work when ViewFsOverloadScheme's scheme and target uris schemes are same. Contributed by Uma Maheswara Rao G.
(cherry picked from commit 4734c77b4b64b7c6432da4cc32881aba85f94ea1) (cherry picked from commit 8e71e85af70c17f2350f794f8bc2475eb1e3acea)
This commit is contained in:
parent
7015589f58
commit
6ae92962d9
@ -135,6 +135,17 @@ public static void addLinkMerge(Configuration conf, final URI[] targets) {
|
||||
addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add nfly link to configuration for the given mount table.
|
||||
*/
|
||||
public static void addLinkNfly(Configuration conf, String mountTableName,
|
||||
String src, String settings, final String targets) {
|
||||
conf.set(
|
||||
getConfigViewFsPrefix(mountTableName) + "."
|
||||
+ Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
|
||||
targets);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param conf
|
||||
@ -149,9 +160,7 @@ public static void addLinkNfly(Configuration conf, String mountTableName,
|
||||
settings = settings == null
|
||||
? "minReplication=2,repairOnRead=true"
|
||||
: settings;
|
||||
|
||||
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
|
||||
Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
|
||||
addLinkNfly(conf, mountTableName, src, settings,
|
||||
StringUtils.uriToString(targets));
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.viewfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
/**
|
||||
* File system instance getter.
|
||||
*/
|
||||
@Private
|
||||
class FsGetter {
|
||||
|
||||
/**
|
||||
* Gets new file system instance of given uri.
|
||||
*/
|
||||
public FileSystem getNewInstance(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
return FileSystem.newInstance(uri, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets file system instance of given uri.
|
||||
*/
|
||||
public FileSystem get(URI uri, Configuration conf) throws IOException {
|
||||
return FileSystem.get(uri, conf);
|
||||
}
|
||||
}
|
@ -59,8 +59,7 @@ public void load(String mountTableConfigPath, Configuration conf)
|
||||
throws IOException {
|
||||
this.mountTable = new Path(mountTableConfigPath);
|
||||
String scheme = mountTable.toUri().getScheme();
|
||||
ViewFileSystem.FsGetter fsGetter =
|
||||
new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
|
||||
FsGetter fsGetter = new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
|
||||
try (FileSystem fs = fsGetter.getNewInstance(mountTable.toUri(), conf)) {
|
||||
RemoteIterator<LocatedFileStatus> listFiles =
|
||||
fs.listFiles(mountTable, false);
|
||||
|
@ -212,6 +212,21 @@ private static String getRack(String rackString) {
|
||||
*/
|
||||
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
|
||||
EnumSet<NflyKey> nflyFlags) throws IOException {
|
||||
this(uris, conf, minReplication, nflyFlags, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new Nfly instance.
|
||||
*
|
||||
* @param uris the list of uris in the mount point
|
||||
* @param conf configuration object
|
||||
* @param minReplication minimum copies to commit a write op
|
||||
* @param nflyFlags modes such readMostRecent
|
||||
* @param fsGetter to get the file system instance with the given uri
|
||||
* @throws IOException
|
||||
*/
|
||||
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
|
||||
EnumSet<NflyKey> nflyFlags, FsGetter fsGetter) throws IOException {
|
||||
if (uris.length < minReplication) {
|
||||
throw new IOException(minReplication + " < " + uris.length
|
||||
+ ": Minimum replication < #destinations");
|
||||
@ -238,8 +253,14 @@ private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
|
||||
nodes = new NflyNode[uris.length];
|
||||
final Iterator<String> rackIter = rackStrings.iterator();
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
|
||||
conf);
|
||||
if (fsGetter != null) {
|
||||
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(),
|
||||
new ChRootedFileSystem(fsGetter.getNewInstance(uris[i], conf),
|
||||
uris[i]));
|
||||
} else {
|
||||
nodes[i] =
|
||||
new NflyNode(hostStrings.get(i), rackIter.next(), uris[i], conf);
|
||||
}
|
||||
}
|
||||
// sort all the uri's by distance from myNode, the local file system will
|
||||
// automatically be the the first one.
|
||||
@ -921,7 +942,7 @@ private static void processThrowable(NflyNode nflyNode, String op,
|
||||
* @throws IOException
|
||||
*/
|
||||
static FileSystem createFileSystem(URI[] uris, Configuration conf,
|
||||
String settings) throws IOException {
|
||||
String settings, FsGetter fsGetter) throws IOException {
|
||||
// assert settings != null
|
||||
int minRepl = DEFAULT_MIN_REPLICATION;
|
||||
EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
|
||||
@ -946,6 +967,6 @@ static FileSystem createFileSystem(URI[] uris, Configuration conf,
|
||||
throw new IllegalArgumentException(nflyKey + ": Infeasible");
|
||||
}
|
||||
}
|
||||
return new NflyFSystem(uris, conf, minRepl, nflyFlags);
|
||||
return new NflyFSystem(uris, conf, minRepl, nflyFlags, fsGetter);
|
||||
}
|
||||
}
|
||||
|
@ -94,27 +94,6 @@ static AccessControlException readOnlyMountTable(final String operation,
|
||||
return readOnlyMountTable(operation, p.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* File system instance getter.
|
||||
*/
|
||||
static class FsGetter {
|
||||
|
||||
/**
|
||||
* Gets new file system instance of given uri.
|
||||
*/
|
||||
public FileSystem getNewInstance(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
return FileSystem.newInstance(uri, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets file system instance of given uri.
|
||||
*/
|
||||
public FileSystem get(URI uri, Configuration conf) throws IOException {
|
||||
return FileSystem.get(uri, conf);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets file system creator instance.
|
||||
*/
|
||||
@ -314,7 +293,8 @@ protected FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
|
||||
@Override
|
||||
protected FileSystem getTargetFileSystem(final String settings,
|
||||
final URI[] uris) throws URISyntaxException, IOException {
|
||||
return NflyFSystem.createFileSystem(uris, config, settings);
|
||||
return NflyFSystem.createFileSystem(uris, config, settings,
|
||||
fsGetter);
|
||||
}
|
||||
};
|
||||
workingDir = this.getHomeDirectory();
|
||||
|
@ -50,7 +50,6 @@
|
||||
import org.apache.hadoop.fs.permission.AclUtil;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.viewfs.ViewFileSystem.FsGetter;
|
||||
import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme.ChildFsGetter;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.junit.Assert;
|
||||
|
||||
|
||||
/**
|
||||
@ -146,7 +147,8 @@ static void addMountLinksToFile(String mountTable, String[] sources,
|
||||
throws IOException, URISyntaxException {
|
||||
ChildFsGetter cfs = new ViewFileSystemOverloadScheme.ChildFsGetter(
|
||||
mountTableConfPath.toUri().getScheme());
|
||||
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(), conf)) {
|
||||
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(),
|
||||
conf)) {
|
||||
try (FSDataOutputStream out = fs.create(mountTableConfPath)) {
|
||||
String prefix =
|
||||
new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(".")
|
||||
@ -158,17 +160,23 @@ static void addMountLinksToFile(String mountTable, String[] sources,
|
||||
for (int i = 0; i < sources.length; i++) {
|
||||
String src = sources[i];
|
||||
String target = targets[i];
|
||||
boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY);
|
||||
out.writeBytes("<property><name>");
|
||||
if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
|
||||
if (isNfly) {
|
||||
String[] srcParts = src.split("[.]");
|
||||
Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length);
|
||||
String actualSrc = srcParts[srcParts.length - 1];
|
||||
String params = srcParts[srcParts.length - 2];
|
||||
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_NFLY + "."
|
||||
+ params + "." + actualSrc);
|
||||
} else if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
|
||||
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_FALLBACK);
|
||||
out.writeBytes("</name>");
|
||||
} else if (Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH.equals(src)) {
|
||||
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH);
|
||||
out.writeBytes("</name>");
|
||||
} else {
|
||||
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK + "." + src);
|
||||
out.writeBytes("</name>");
|
||||
}
|
||||
out.writeBytes("</name>");
|
||||
out.writeBytes("<value>");
|
||||
out.writeBytes(target);
|
||||
out.writeBytes("</value></property>");
|
||||
@ -191,7 +199,15 @@ static void addMountLinksToConf(String mountTable, String[] sources,
|
||||
String target = targets[i];
|
||||
String mountTableName = mountTable == null ?
|
||||
Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE : mountTable;
|
||||
if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
|
||||
boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY);
|
||||
if (isNfly) {
|
||||
String[] srcParts = src.split("[.]");
|
||||
Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length);
|
||||
String actualSrc = srcParts[srcParts.length - 1];
|
||||
String params = srcParts[srcParts.length - 2];
|
||||
ConfigUtil.addLinkNfly(config, mountTableName, actualSrc, params,
|
||||
target);
|
||||
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
|
||||
ConfigUtil.addLinkFallback(config, mountTableName, new URI(target));
|
||||
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH)) {
|
||||
ConfigUtil.addLinkMergeSlash(config, mountTableName, new URI(target));
|
||||
|
@ -17,6 +17,10 @@
|
||||
*/
|
||||
package org.apache.hadoop.fs.viewfs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@ -24,6 +28,9 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsConstants;
|
||||
@ -44,6 +51,7 @@
|
||||
* Tests ViewFileSystemOverloadScheme with configured mount links.
|
||||
*/
|
||||
public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
|
||||
private static final String TEST_STRING = "Hello ViewFSOverloadedScheme!";
|
||||
private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
|
||||
private static final String HDFS_SCHEME = "hdfs";
|
||||
private Configuration conf = null;
|
||||
@ -63,6 +71,8 @@ public void startCluster() throws IOException {
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
|
||||
true);
|
||||
conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
|
||||
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
|
||||
ViewFileSystemOverloadScheme.class.getName());
|
||||
conf.set(String.format(
|
||||
@ -438,6 +448,117 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the rename with nfly mount link.
|
||||
*/
|
||||
@Test(timeout = 3000)
|
||||
public void testNflyRename() throws Exception {
|
||||
final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||
final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
|
||||
final URI uri1 = hdfsTargetPath1.toUri();
|
||||
final URI uri2 = hdfsTargetPath2.toUri();
|
||||
final Path nflyRoot = new Path("/nflyroot");
|
||||
|
||||
final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
|
||||
+ ".minReplication=2." + nflyRoot.toString();
|
||||
addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
|
||||
new String[] {uri1.toString() + "," + uri2.toString() }, conf);
|
||||
final FileSystem nfly = FileSystem.get(defaultFSURI, conf);
|
||||
|
||||
final Path testDir = new Path("/nflyroot/testdir1/sub1/sub3");
|
||||
final Path testDirTmp = new Path("/nflyroot/testdir1/sub1/sub3_temp");
|
||||
assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir));
|
||||
|
||||
// Test renames
|
||||
assertTrue(nfly.rename(testDir, testDirTmp));
|
||||
assertTrue(nfly.rename(testDirTmp, testDir));
|
||||
|
||||
final URI[] testUris = new URI[] {uri1, uri2 };
|
||||
for (final URI testUri : testUris) {
|
||||
final FileSystem fs = FileSystem.get(testUri, conf);
|
||||
assertTrue(testDir + " should exist!", fs.exists(testDir));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the write and read contents with nfly mount link.
|
||||
*/
|
||||
@Test(timeout = 3000)
|
||||
public void testNflyWriteRead() throws Exception {
|
||||
final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||
final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
|
||||
final URI uri1 = hdfsTargetPath1.toUri();
|
||||
final URI uri2 = hdfsTargetPath2.toUri();
|
||||
final Path nflyRoot = new Path("/nflyroot");
|
||||
final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
|
||||
+ ".minReplication=2." + nflyRoot.toString();
|
||||
addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
|
||||
new String[] {uri1.toString() + "," + uri2.toString() }, conf);
|
||||
final FileSystem nfly = FileSystem.get(defaultFSURI, conf);
|
||||
final Path testFile = new Path("/nflyroot/test.txt");
|
||||
writeString(nfly, TEST_STRING, testFile);
|
||||
final URI[] testUris = new URI[] {uri1, uri2 };
|
||||
for (final URI testUri : testUris) {
|
||||
try (FileSystem fs = FileSystem.get(testUri, conf)) {
|
||||
readString(fs, testFile, TEST_STRING, testUri);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. Writes contents with nfly link having two target uris. 2. Deletes one
|
||||
* target file. 3. Tests the read works with repairOnRead flag. 4. Tests that
|
||||
* previously deleted file fully recovered and exists.
|
||||
*/
|
||||
@Test(timeout = 3000)
|
||||
public void testNflyRepair() throws Exception {
|
||||
final NflyFSystem.NflyKey repairKey = NflyFSystem.NflyKey.repairOnRead;
|
||||
final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||
final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
|
||||
final URI uri1 = hdfsTargetPath1.toUri();
|
||||
final URI uri2 = hdfsTargetPath2.toUri();
|
||||
final Path nflyRoot = new Path("/nflyroot");
|
||||
final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
|
||||
+ ".minReplication=2," + repairKey + "=true." + nflyRoot.toString();
|
||||
addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
|
||||
new String[] {uri1.toString() + "," + uri2.toString() }, conf);
|
||||
try (FileSystem nfly = FileSystem.get(defaultFSURI, conf)) {
|
||||
// write contents to nfly
|
||||
final Path testFilePath = new Path("/nflyroot/test.txt");
|
||||
writeString(nfly, TEST_STRING, testFilePath);
|
||||
|
||||
final URI[] testUris = new URI[] {uri1, uri2 };
|
||||
// both nodes are up again, test repair
|
||||
FsGetter getter = new ViewFileSystemOverloadScheme.ChildFsGetter("hdfs");
|
||||
try (FileSystem fs1 = getter.getNewInstance(testUris[0], conf)) {
|
||||
// Delete a file from one target URI
|
||||
String testFile = "/test.txt";
|
||||
assertTrue(
|
||||
fs1.delete(new Path(testUris[0].toString() + testFile), false));
|
||||
assertFalse(fs1.exists(new Path(testUris[0].toString() + testFile)));
|
||||
|
||||
// Verify read success.
|
||||
readString(nfly, testFilePath, TEST_STRING, testUris[0]);
|
||||
// Verify file recovered.
|
||||
assertTrue(fs1.exists(new Path(testUris[0].toString() + testFile)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeString(final FileSystem nfly, final String testString,
|
||||
final Path testFile) throws IOException {
|
||||
try (FSDataOutputStream fsDos = nfly.create(testFile)) {
|
||||
fsDos.writeUTF(testString);
|
||||
}
|
||||
}
|
||||
|
||||
private void readString(final FileSystem nfly, final Path testFile,
|
||||
final String testString, final URI testUri) throws IOException {
|
||||
try (FSDataInputStream fsDis = nfly.open(testFile)) {
|
||||
assertEquals("Wrong file content", testString, fsDis.readUTF());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return configuration.
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user