HADOOP-11812. Implement listLocatedStatus for ViewFileSystem to speed up split calculation (gera)

This commit is contained in:
Gera Shegalov 2015-04-21 11:57:42 -07:00
parent 725eb52ddc
commit 6d2cf9fbbd
8 changed files with 329 additions and 49 deletions

View File

@ -504,6 +504,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-11785. Reduce the number of listStatus operation in distcp HADOOP-11785. Reduce the number of listStatus operation in distcp
buildListing (Zoran Dimitrijevic via Colin P. McCabe) buildListing (Zoran Dimitrijevic via Colin P. McCabe)
HADOOP-11812. Implement listLocatedStatus for ViewFileSystem to speed up
split calculation (gera)
BUG FIXES BUG FIXES
HADOOP-10027. *Compressor_deflateBytesDirect passes instance instead of HADOOP-10027. *Compressor_deflateBytesDirect passes instance instead of

View File

@ -32,6 +32,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
public class LocatedFileStatus extends FileStatus { public class LocatedFileStatus extends FileStatus {
private BlockLocation[] locations; private BlockLocation[] locations;
public LocatedFileStatus() {
super();
}
/** /**
* Constructor * Constructor
* @param stat a file status * @param stat a file status
@ -43,7 +48,7 @@ public class LocatedFileStatus extends FileStatus {
stat.getBlockSize(), stat.getModificationTime(), stat.getBlockSize(), stat.getModificationTime(),
stat.getAccessTime(), stat.getPermission(), stat.getOwner(), stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
stat.getGroup(), null, stat.getPath(), locations); stat.getGroup(), null, stat.getPath(), locations);
if (isSymlink()) { if (stat.isSymlink()) {
setSymlink(stat.getSymlink()); setSymlink(stat.getSymlink());
} }
} }

View File

@ -37,8 +37,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
@ -241,6 +243,12 @@ class ChRootedFileSystem extends FilterFileSystem {
return super.listStatus(fullPath(f)); return super.listStatus(fullPath(f));
} }
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
return super.listLocatedStatus(fullPath(f));
}
@Override @Override
public boolean mkdirs(final Path f, final FsPermission permission) public boolean mkdirs(final Path f, final FsPermission permission)
throws IOException { throws IOException {

View File

@ -45,7 +45,10 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
@ -115,8 +118,7 @@ public class ViewFileSystem extends FileSystem {
*/ */
private String getUriPath(final Path p) { private String getUriPath(final Path p) {
checkPath(p); checkPath(p);
String s = makeAbsolute(p).toUri().getPath(); return makeAbsolute(p).toUri().getPath();
return s;
} }
private Path makeAbsolute(final Path f) { private Path makeAbsolute(final Path f) {
@ -340,24 +342,42 @@ public class ViewFileSystem extends FileSystem {
return res.targetFileSystem.getFileChecksum(res.remainingPath); return res.targetFileSystem.getFileChecksum(res.remainingPath);
} }
@Override
public FileStatus getFileStatus(final Path f) throws AccessControlException,
FileNotFoundException, IOException {
InodeTree.ResolveResult<FileSystem> res =
fsState.resolve(getUriPath(f), true);
private static FileStatus fixFileStatus(FileStatus orig,
Path qualified) throws IOException {
// FileStatus#getPath is a fully qualified path relative to the root of // FileStatus#getPath is a fully qualified path relative to the root of
// target file system. // target file system.
// We need to change it to viewfs URI - relative to root of mount table. // We need to change it to viewfs URI - relative to root of mount table.
// The implementors of RawLocalFileSystem were trying to be very smart. // The implementors of RawLocalFileSystem were trying to be very smart.
// They implement FileStatus#getOwener lazily -- the object // They implement FileStatus#getOwner lazily -- the object
// returned is really a RawLocalFileSystem that expect the // returned is really a RawLocalFileSystem that expect the
// FileStatus#getPath to be unchanged so that it can get owner when needed. // FileStatus#getPath to be unchanged so that it can get owner when needed.
// Hence we need to interpose a new ViewFileSystemFileStatus that // Hence we need to interpose a new ViewFileSystemFileStatus that
// works around. // works around.
if ("file".equals(orig.getPath().toUri().getScheme())) {
orig = wrapLocalFileStatus(orig, qualified);
}
orig.setPath(qualified);
return orig;
}
private static FileStatus wrapLocalFileStatus(FileStatus orig,
Path qualified) {
return orig instanceof LocatedFileStatus
? new ViewFsLocatedFileStatus((LocatedFileStatus)orig, qualified)
: new ViewFsFileStatus(orig, qualified);
}
@Override
public FileStatus getFileStatus(final Path f) throws AccessControlException,
FileNotFoundException, IOException {
InodeTree.ResolveResult<FileSystem> res =
fsState.resolve(getUriPath(f), true);
FileStatus status = res.targetFileSystem.getFileStatus(res.remainingPath); FileStatus status = res.targetFileSystem.getFileStatus(res.remainingPath);
return new ViewFsFileStatus(status, this.makeQualified(f)); return fixFileStatus(status, this.makeQualified(f));
} }
@Override @Override
@ -378,18 +398,50 @@ public class ViewFileSystem extends FileSystem {
if (!res.isInternalDir()) { if (!res.isInternalDir()) {
// We need to change the name in the FileStatus as described in // We need to change the name in the FileStatus as described in
// {@link #getFileStatus } // {@link #getFileStatus }
ChRootedFileSystem targetFs;
targetFs = (ChRootedFileSystem) res.targetFileSystem;
int i = 0; int i = 0;
for (FileStatus status : statusLst) { for (FileStatus status : statusLst) {
String suffix = targetFs.stripOutRoot(status.getPath()); statusLst[i++] = fixFileStatus(status,
statusLst[i++] = new ViewFsFileStatus(status, this.makeQualified( getChrootedPath(res, status, f));
suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix)));
} }
} }
return statusLst; return statusLst;
} }
@Override
public RemoteIterator<LocatedFileStatus>listLocatedStatus(final Path f,
final PathFilter filter) throws FileNotFoundException, IOException {
final InodeTree.ResolveResult<FileSystem> res = fsState
.resolve(getUriPath(f), true);
final RemoteIterator<LocatedFileStatus> statusIter = res.targetFileSystem
.listLocatedStatus(res.remainingPath);
if (res.isInternalDir()) {
return statusIter;
}
return new RemoteIterator<LocatedFileStatus>() {
@Override
public boolean hasNext() throws IOException {
return statusIter.hasNext();
}
@Override
public LocatedFileStatus next() throws IOException {
final LocatedFileStatus status = statusIter.next();
return (LocatedFileStatus)fixFileStatus(status,
getChrootedPath(res, status, f));
}
};
}
private Path getChrootedPath(InodeTree.ResolveResult<FileSystem> res,
FileStatus status, Path f) throws IOException {
final String suffix = ((ChRootedFileSystem)res.targetFileSystem)
.stripOutRoot(status.getPath());
return this.makeQualified(
suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix));
}
@Override @Override
public boolean mkdirs(final Path dir, final FsPermission permission) public boolean mkdirs(final Path dir, final FsPermission permission)
throws IOException { throws IOException {

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import java.io.IOException;
class ViewFsLocatedFileStatus extends LocatedFileStatus {
final LocatedFileStatus myFs;
Path modifiedPath;
ViewFsLocatedFileStatus(LocatedFileStatus locatedFileStatus, Path path) {
myFs = locatedFileStatus;
modifiedPath = path;
}
@Override
public long getLen() {
return myFs.getLen();
}
@Override
public boolean isFile() {
return myFs.isFile();
}
@Override
public boolean isDirectory() {
return myFs.isDirectory();
}
@Override
@SuppressWarnings("deprecation")
public boolean isDir() {
return myFs.isDirectory();
}
@Override
public boolean isSymlink() {
return myFs.isSymlink();
}
@Override
public long getBlockSize() {
return myFs.getBlockSize();
}
@Override
public short getReplication() {
return myFs.getReplication();
}
@Override
public long getModificationTime() {
return myFs.getModificationTime();
}
@Override
public long getAccessTime() {
return myFs.getAccessTime();
}
@Override
public FsPermission getPermission() {
return myFs.getPermission();
}
@Override
public String getOwner() {
return myFs.getOwner();
}
@Override
public String getGroup() {
return myFs.getGroup();
}
@Override
public Path getPath() {
return modifiedPath;
}
@Override
public void setPath(final Path p) {
modifiedPath = p;
}
@Override
public Path getSymlink() throws IOException {
return myFs.getSymlink();
}
@Override
public void setSymlink(Path p) {
myFs.setSymlink(p);
}
@Override
public BlockLocation[] getBlockLocations() {
return myFs.getBlockLocations();
}
@Override
public int compareTo(Object o) {
return super.compareTo(o);
}
@Override
public boolean equals(Object o) {
return super.equals(o);
}
@Override
public int hashCode() {
return super.hashCode();
}
}

View File

@ -395,6 +395,20 @@ public class TestChRootedFileSystem {
verify(mockFs).getAclStatus(rawPath); verify(mockFs).getAclStatus(rawPath);
} }
@Test
public void testListLocatedFileStatus() throws IOException {
final Path mockMount = new Path("mockfs://foo/user");
final Path mockPath = new Path("/usermock");
final Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
ConfigUtil.addLink(conf, mockPath.toString(), mockMount.toUri());
FileSystem vfs = FileSystem.get(URI.create("viewfs:///"), conf);
vfs.listLocatedStatus(mockPath);
final FileSystem mockFs = ((MockFileSystem)mockMount.getFileSystem(conf))
.getRawFileSystem();
verify(mockFs).listLocatedStatus(new Path(mockMount.toUri().getPath()));
}
static class MockFileSystem extends FilterFileSystem { static class MockFileSystem extends FilterFileSystem {
MockFileSystem() { MockFileSystem() {
super(mock(FileSystem.class)); super(mock(FileSystem.class));

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.fs.viewfs;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -36,9 +37,11 @@ import static org.junit.Assert.assertFalse;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.AclUtil; import org.apache.hadoop.fs.permission.AclUtil;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.viewfs.ConfigUtil; import org.apache.hadoop.fs.viewfs.ConfigUtil;
@ -204,6 +207,15 @@ public class ViewFileSystemBaseTest {
fsView.makeQualified(new Path("/foo/bar"))); fsView.makeQualified(new Path("/foo/bar")));
} }
@Test
public void testLocatedOperationsThroughMountLinks() throws IOException {
testOperationsThroughMountLinksInternal(true);
}
@Test
public void testOperationsThroughMountLinks() throws IOException {
testOperationsThroughMountLinksInternal(false);
}
/** /**
* Test modify operations (create, mkdir, delete, etc) * Test modify operations (create, mkdir, delete, etc)
@ -214,8 +226,8 @@ public class ViewFileSystemBaseTest {
* Verify the operation via mountfs (ie fSys) and *also* via the * Verify the operation via mountfs (ie fSys) and *also* via the
* target file system (ie fSysLocal) that the mount link points-to. * target file system (ie fSysLocal) that the mount link points-to.
*/ */
@Test private void testOperationsThroughMountLinksInternal(boolean located)
public void testOperationsThroughMountLinks() throws IOException { throws IOException {
// Create file // Create file
fileSystemTestHelper.createFile(fsView, "/user/foo"); fileSystemTestHelper.createFile(fsView, "/user/foo");
Assert.assertTrue("Created file should be type file", Assert.assertTrue("Created file should be type file",
@ -329,7 +341,8 @@ public class ViewFileSystemBaseTest {
fsView.mkdirs(new Path("/targetRoot/dirFoo")); fsView.mkdirs(new Path("/targetRoot/dirFoo"));
Assert.assertTrue(fsView.exists(new Path("/targetRoot/dirFoo"))); Assert.assertTrue(fsView.exists(new Path("/targetRoot/dirFoo")));
boolean dirFooPresent = false; boolean dirFooPresent = false;
for (FileStatus fileStatus : fsView.listStatus(new Path("/targetRoot/"))) { for (FileStatus fileStatus :
listStatusInternal(located, new Path("/targetRoot/"))) {
if (fileStatus.getPath().getName().equals("dirFoo")) { if (fileStatus.getPath().getName().equals("dirFoo")) {
dirFooPresent = true; dirFooPresent = true;
} }
@ -395,6 +408,10 @@ public class ViewFileSystemBaseTest {
} }
} }
@Test
public void testLocatedListOnInternalDirsOfMountTable() throws IOException {
testListOnInternalDirsOfMountTableInternal(true);
}
/** /**
@ -406,15 +423,20 @@ public class ViewFileSystemBaseTest {
// test list on internal dirs of mount table // test list on internal dirs of mount table
@Test @Test
public void testListOnInternalDirsOfMountTable() throws IOException { public void testListOnInternalDirsOfMountTable() throws IOException {
testListOnInternalDirsOfMountTableInternal(false);
}
private void testListOnInternalDirsOfMountTableInternal(boolean located)
throws IOException {
// list on Slash // list on Slash
FileStatus[] dirPaths = fsView.listStatus(new Path("/")); FileStatus[] dirPaths = listStatusInternal(located, new Path("/"));
FileStatus fs; FileStatus fs;
verifyRootChildren(dirPaths); verifyRootChildren(dirPaths);
// list on internal dir // list on internal dir
dirPaths = fsView.listStatus(new Path("/internalDir")); dirPaths = listStatusInternal(located, new Path("/internalDir"));
Assert.assertEquals(2, dirPaths.length); Assert.assertEquals(2, dirPaths.length);
fs = fileSystemTestHelper.containsPath(fsView, "/internalDir/internalDir2", dirPaths); fs = fileSystemTestHelper.containsPath(fsView, "/internalDir/internalDir2", dirPaths);
@ -452,13 +474,26 @@ public class ViewFileSystemBaseTest {
@Test @Test
public void testListOnMountTargetDirs() throws IOException { public void testListOnMountTargetDirs() throws IOException {
FileStatus[] dirPaths = fsView.listStatus(new Path("/data")); testListOnMountTargetDirsInternal(false);
}
@Test
public void testLocatedListOnMountTargetDirs() throws IOException {
testListOnMountTargetDirsInternal(true);
}
private void testListOnMountTargetDirsInternal(boolean located)
throws IOException {
final Path dataPath = new Path("/data");
FileStatus[] dirPaths = listStatusInternal(located, dataPath);
FileStatus fs; FileStatus fs;
Assert.assertEquals(0, dirPaths.length); Assert.assertEquals(0, dirPaths.length);
// add a file // add a file
long len = fileSystemTestHelper.createFile(fsView, "/data/foo"); long len = fileSystemTestHelper.createFile(fsView, "/data/foo");
dirPaths = fsView.listStatus(new Path("/data")); dirPaths = listStatusInternal(located, dataPath);
Assert.assertEquals(1, dirPaths.length); Assert.assertEquals(1, dirPaths.length);
fs = fileSystemTestHelper.containsPath(fsView, "/data/foo", dirPaths); fs = fileSystemTestHelper.containsPath(fsView, "/data/foo", dirPaths);
Assert.assertNotNull(fs); Assert.assertNotNull(fs);
@ -467,7 +502,7 @@ public class ViewFileSystemBaseTest {
// add a dir // add a dir
fsView.mkdirs(fileSystemTestHelper.getTestRootPath(fsView, "/data/dirX")); fsView.mkdirs(fileSystemTestHelper.getTestRootPath(fsView, "/data/dirX"));
dirPaths = fsView.listStatus(new Path("/data")); dirPaths = listStatusInternal(located, dataPath);
Assert.assertEquals(2, dirPaths.length); Assert.assertEquals(2, dirPaths.length);
fs = fileSystemTestHelper.containsPath(fsView, "/data/foo", dirPaths); fs = fileSystemTestHelper.containsPath(fsView, "/data/foo", dirPaths);
Assert.assertNotNull(fs); Assert.assertNotNull(fs);
@ -477,6 +512,22 @@ public class ViewFileSystemBaseTest {
Assert.assertTrue("Created dir should appear as a dir", fs.isDirectory()); Assert.assertTrue("Created dir should appear as a dir", fs.isDirectory());
} }
private FileStatus[] listStatusInternal(boolean located, Path dataPath) throws IOException {
FileStatus[] dirPaths = new FileStatus[0];
if (located) {
RemoteIterator<LocatedFileStatus> statIter =
fsView.listLocatedStatus(dataPath);
ArrayList<LocatedFileStatus> tmp = new ArrayList<LocatedFileStatus>(10);
while (statIter.hasNext()) {
tmp.add(statIter.next());
}
dirPaths = tmp.toArray(dirPaths);
} else {
dirPaths = fsView.listStatus(dataPath);
}
return dirPaths;
}
@Test @Test
public void testFileStatusOnMountLink() throws IOException { public void testFileStatusOnMountLink() throws IOException {
Assert.assertTrue(fsView.getFileStatus(new Path("/")).isDirectory()); Assert.assertTrue(fsView.getFileStatus(new Path("/")).isDirectory());
@ -697,6 +748,16 @@ public class ViewFileSystemBaseTest {
@Test @Test
public void testRootReadableExecutable() throws IOException { public void testRootReadableExecutable() throws IOException {
testRootReadableExecutableInternal(false);
}
@Test
public void testLocatedRootReadableExecutable() throws IOException {
testRootReadableExecutableInternal(true);
}
private void testRootReadableExecutableInternal(boolean located)
throws IOException {
// verify executable permission on root: cd / // verify executable permission on root: cd /
// //
Assert.assertFalse("In root before cd", Assert.assertFalse("In root before cd",
@ -707,7 +768,8 @@ public class ViewFileSystemBaseTest {
// verify readable // verify readable
// //
verifyRootChildren(fsView.listStatus(fsView.getWorkingDirectory())); verifyRootChildren(listStatusInternal(located,
fsView.getWorkingDirectory()));
// verify permissions // verify permissions
// //