HDFS-15387. FSUsage#DF should consider ViewFSOverloadScheme in processPath. Contributed by Uma Maheswara Rao G.
(cherry picked from commit785b1def95
) (cherry picked from commit120ee793fc
)
This commit is contained in:
parent
f7e590aaff
commit
100c13967e
|
@ -128,7 +128,8 @@ class FsUsage extends FsCommand {
|
|||
|
||||
@Override
|
||||
protected void processPath(PathData item) throws IOException {
|
||||
if (ViewFileSystemUtil.isViewFileSystem(item.fs)) {
|
||||
if (ViewFileSystemUtil.isViewFileSystem(item.fs)
|
||||
|| ViewFileSystemUtil.isViewFileSystemOverloadScheme(item.fs)) {
|
||||
ViewFileSystem viewFileSystem = (ViewFileSystem) item.fs;
|
||||
Map<ViewFileSystem.MountPoint, FsStatus> fsStatusMap =
|
||||
ViewFileSystemUtil.getStatus(viewFileSystem, item.path);
|
||||
|
|
|
@ -51,6 +51,17 @@ public final class ViewFileSystemUtil {
|
|||
return fileSystem.getScheme().equals(FsConstants.VIEWFS_SCHEME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the FileSystem is a ViewFileSystemOverloadScheme.
|
||||
*
|
||||
* @param fileSystem
|
||||
* @return true if the fileSystem is ViewFileSystemOverloadScheme
|
||||
*/
|
||||
public static boolean isViewFileSystemOverloadScheme(
|
||||
final FileSystem fileSystem) {
|
||||
return fileSystem instanceof ViewFileSystemOverloadScheme;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get FsStatus for all ViewFsMountPoints matching path for the given
|
||||
* ViewFileSystem.
|
||||
|
@ -93,7 +104,8 @@ public final class ViewFileSystemUtil {
|
|||
*/
|
||||
public static Map<MountPoint, FsStatus> getStatus(
|
||||
FileSystem fileSystem, Path path) throws IOException {
|
||||
if (!isViewFileSystem(fileSystem)) {
|
||||
if (!(isViewFileSystem(fileSystem)
|
||||
|| isViewFileSystemOverloadScheme(fileSystem))) {
|
||||
throw new UnsupportedFileSystemException("FileSystem '"
|
||||
+ fileSystem.getUri() + "'is not a ViewFileSystem.");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,173 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.tools;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Scanner;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsConstants;
|
||||
import org.apache.hadoop.fs.FsShell;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme;
|
||||
import org.apache.hadoop.fs.viewfs.ViewFsTestSetup;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Tests HDFS commands with ViewFileSystemOverloadScheme with configured mount
|
||||
* links.
|
||||
*/
|
||||
public class TestViewFileSystemOverloadSchemeWithFSCommands {
|
||||
private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
|
||||
private static final String HDFS_SCHEME = "hdfs";
|
||||
private Configuration conf = null;
|
||||
private MiniDFSCluster cluster = null;
|
||||
private URI defaultFSURI;
|
||||
private File localTargetDir;
|
||||
private static final String TEST_ROOT_DIR = PathUtils
|
||||
.getTestDirName(TestViewFileSystemOverloadSchemeWithFSCommands.class);
|
||||
private static final String HDFS_USER_FOLDER = "/HDFSUser";
|
||||
private static final String LOCAL_FOLDER = "/local";
|
||||
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
private final ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
private static final PrintStream OLD_OUT = System.out;
|
||||
private static final PrintStream OLD_ERR = System.err;
|
||||
|
||||
/**
|
||||
* Sets up the configurations and starts the MiniDFSCluster.
|
||||
*/
|
||||
@Before
|
||||
public void startCluster() throws IOException {
|
||||
conf = new Configuration();
|
||||
conf.setInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
|
||||
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
|
||||
ViewFileSystemOverloadScheme.class.getName());
|
||||
conf.set(String.format(
|
||||
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
|
||||
HDFS_SCHEME), DistributedFileSystem.class.getName());
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||
cluster.waitClusterUp();
|
||||
defaultFSURI =
|
||||
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
|
||||
localTargetDir = new File(TEST_ROOT_DIR, "/root/");
|
||||
Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme.
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
try {
|
||||
System.out.flush();
|
||||
System.err.flush();
|
||||
} finally {
|
||||
System.setOut(OLD_OUT);
|
||||
System.setErr(OLD_ERR);
|
||||
}
|
||||
if (cluster != null) {
|
||||
FileSystem.closeAll();
|
||||
cluster.shutdown();
|
||||
}
|
||||
resetStream();
|
||||
}
|
||||
|
||||
private void redirectStream() {
|
||||
System.setOut(new PrintStream(out));
|
||||
System.setErr(new PrintStream(err));
|
||||
}
|
||||
|
||||
private void resetStream() {
|
||||
out.reset();
|
||||
err.reset();
|
||||
}
|
||||
|
||||
private static void scanIntoList(final ByteArrayOutputStream baos,
|
||||
final List<String> list) {
|
||||
final Scanner scanner = new Scanner(baos.toString());
|
||||
while (scanner.hasNextLine()) {
|
||||
list.add(scanner.nextLine());
|
||||
}
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the given mount links to config. sources contains mount link src and
|
||||
* the respective index location in targets contains the target uri.
|
||||
*/
|
||||
void addMountLinks(String mountTable, String[] sources, String[] targets,
|
||||
Configuration config) throws IOException, URISyntaxException {
|
||||
ViewFsTestSetup.addMountLinksToConf(mountTable, sources, targets, config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests DF with ViewFSOverloadScheme.
|
||||
*/
|
||||
@Test
|
||||
public void testDFWithViewFsOverloadScheme() throws Exception {
|
||||
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
|
||||
List<String> mounts = Lists.newArrayList();
|
||||
mounts.add(HDFS_USER_FOLDER);
|
||||
mounts.add(LOCAL_FOLDER);
|
||||
addMountLinks(defaultFSURI.getAuthority(),
|
||||
mounts.toArray(new String[mounts.size()]),
|
||||
new String[] {hdfsTargetPath.toUri().toString(),
|
||||
localTargetDir.toURI().toString() },
|
||||
conf);
|
||||
FsShell fsShell = new FsShell(conf);
|
||||
try {
|
||||
redirectStream();
|
||||
int ret =
|
||||
ToolRunner.run(fsShell, new String[] {"-fs", defaultFSURI.toString(),
|
||||
"-df", "-h", defaultFSURI.toString() + "/" });
|
||||
assertEquals(0, ret);
|
||||
final List<String> errList = Lists.newArrayList();
|
||||
scanIntoList(out, errList);
|
||||
assertEquals(3, errList.size());
|
||||
for (int i = 1; i < errList.size(); i++) {
|
||||
String[] lineSplits = errList.get(i).split("\\s+");
|
||||
String mount = lineSplits[lineSplits.length - 1];
|
||||
mounts.remove(mount);
|
||||
}
|
||||
String msg =
|
||||
"DF was not calculated on all mounts. The left out mounts are: "
|
||||
+ mounts;
|
||||
assertEquals(msg, 0, mounts.size());
|
||||
} finally {
|
||||
fsShell.close();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue