HDFS-15387. FSUsage#DF should consider ViewFSOverloadScheme in processPath. Contributed by Uma Maheswara Rao G.

This commit is contained in:
Uma Maheswara Rao G 2020-06-12 14:32:19 -07:00 committed by GitHub
parent e154084770
commit 785b1def95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 188 additions and 2 deletions

View File

@ -128,7 +128,8 @@ class FsUsage extends FsCommand {
@Override
protected void processPath(PathData item) throws IOException {
if (ViewFileSystemUtil.isViewFileSystem(item.fs)) {
if (ViewFileSystemUtil.isViewFileSystem(item.fs)
|| ViewFileSystemUtil.isViewFileSystemOverloadScheme(item.fs)) {
ViewFileSystem viewFileSystem = (ViewFileSystem) item.fs;
Map<ViewFileSystem.MountPoint, FsStatus> fsStatusMap =
ViewFileSystemUtil.getStatus(viewFileSystem, item.path);

View File

@ -51,6 +51,17 @@ public final class ViewFileSystemUtil {
return fileSystem.getScheme().equals(FsConstants.VIEWFS_SCHEME);
}
/**
* Check if the FileSystem is a ViewFileSystemOverloadScheme.
*
* @param fileSystem
* @return true if the fileSystem is ViewFileSystemOverloadScheme
*/
public static boolean isViewFileSystemOverloadScheme(
final FileSystem fileSystem) {
return fileSystem instanceof ViewFileSystemOverloadScheme;
}
/**
* Get FsStatus for all ViewFsMountPoints matching path for the given
* ViewFileSystem.
@ -93,7 +104,8 @@ public final class ViewFileSystemUtil {
*/
public static Map<MountPoint, FsStatus> getStatus(
FileSystem fileSystem, Path path) throws IOException {
if (!isViewFileSystem(fileSystem)) {
if (!(isViewFileSystem(fileSystem)
|| isViewFileSystemOverloadScheme(fileSystem))) {
throw new UnsupportedFileSystemException("FileSystem '"
+ fileSystem.getUri() + "'is not a ViewFileSystem.");
}

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme;
import org.apache.hadoop.fs.viewfs.ViewFsTestSetup;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
* Tests HDFS commands with ViewFileSystemOverloadScheme with configured mount
* links.
*/
public class TestViewFileSystemOverloadSchemeWithFSCommands {
private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
private static final String HDFS_SCHEME = "hdfs";
private Configuration conf = null;
private MiniDFSCluster cluster = null;
private URI defaultFSURI;
private File localTargetDir;
private static final String TEST_ROOT_DIR = PathUtils
.getTestDirName(TestViewFileSystemOverloadSchemeWithFSCommands.class);
private static final String HDFS_USER_FOLDER = "/HDFSUser";
private static final String LOCAL_FOLDER = "/local";
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
private final ByteArrayOutputStream err = new ByteArrayOutputStream();
private static final PrintStream OLD_OUT = System.out;
private static final PrintStream OLD_ERR = System.err;
/**
* Sets up the configurations and starts the MiniDFSCluster.
*/
@Before
public void startCluster() throws IOException {
conf = new Configuration();
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
ViewFileSystemOverloadScheme.class.getName());
conf.set(String.format(
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
HDFS_SCHEME), DistributedFileSystem.class.getName());
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitClusterUp();
defaultFSURI =
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
localTargetDir = new File(TEST_ROOT_DIR, "/root/");
Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme.
}
@After
public void tearDown() throws IOException {
try {
System.out.flush();
System.err.flush();
} finally {
System.setOut(OLD_OUT);
System.setErr(OLD_ERR);
}
if (cluster != null) {
FileSystem.closeAll();
cluster.shutdown();
}
resetStream();
}
private void redirectStream() {
System.setOut(new PrintStream(out));
System.setErr(new PrintStream(err));
}
private void resetStream() {
out.reset();
err.reset();
}
private static void scanIntoList(final ByteArrayOutputStream baos,
final List<String> list) {
final Scanner scanner = new Scanner(baos.toString());
while (scanner.hasNextLine()) {
list.add(scanner.nextLine());
}
scanner.close();
}
/**
* Adds the given mount links to config. sources contains mount link src and
* the respective index location in targets contains the target uri.
*/
void addMountLinks(String mountTable, String[] sources, String[] targets,
Configuration config) throws IOException, URISyntaxException {
ViewFsTestSetup.addMountLinksToConf(mountTable, sources, targets, config);
}
/**
* Tests DF with ViewFSOverloadScheme.
*/
@Test
public void testDFWithViewFsOverloadScheme() throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
List<String> mounts = Lists.newArrayList();
mounts.add(HDFS_USER_FOLDER);
mounts.add(LOCAL_FOLDER);
addMountLinks(defaultFSURI.getAuthority(),
mounts.toArray(new String[mounts.size()]),
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString() },
conf);
FsShell fsShell = new FsShell(conf);
try {
redirectStream();
int ret =
ToolRunner.run(fsShell, new String[] {"-fs", defaultFSURI.toString(),
"-df", "-h", defaultFSURI.toString() + "/" });
assertEquals(0, ret);
final List<String> errList = Lists.newArrayList();
scanIntoList(out, errList);
assertEquals(3, errList.size());
for (int i = 1; i < errList.size(); i++) {
String[] lineSplits = errList.get(i).split("\\s+");
String mount = lineSplits[lineSplits.length - 1];
mounts.remove(mount);
}
String msg =
"DF was not calculated on all mounts. The left out mounts are: "
+ mounts;
assertEquals(msg, 0, mounts.size());
} finally {
fsShell.close();
}
}
}