HADOOP-14169. Implement listStatusIterator, listLocatedStatus for ViewFs. Contributed by Erik Krogen.

This commit is contained in:
Erik Krogen 2017-03-17 17:11:02 -07:00 committed by Konstantin V Shvachko
parent 7f8e928400
commit e1a99802fc
3 changed files with 156 additions and 27 deletions

View File

@ -37,8 +37,10 @@ import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
@ -236,6 +238,18 @@ class ChRootedFs extends AbstractFileSystem {
return myFs.listStatus(fullPath(f));
}
@Override
public RemoteIterator<FileStatus> listStatusIterator(final Path f)
throws IOException, UnresolvedLinkException {
return myFs.listStatusIterator(fullPath(f));
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws IOException, UnresolvedLinkException {
return myFs.listLocatedStatus(fullPath(f));
}
@Override
public void mkdir(final Path dir, final FsPermission permission,
final boolean createParent) throws IOException, UnresolvedLinkException {

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
@ -389,25 +390,31 @@ public class ViewFs extends AbstractFileSystem {
return fsIter;
}
return new RemoteIterator<FileStatus>() {
final RemoteIterator<FileStatus> myIter;
final ChRootedFs targetFs;
{ // Init
myIter = fsIter;
targetFs = (ChRootedFs) res.targetFileSystem;
return new WrappingRemoteIterator<FileStatus>(res, fsIter, f) {
@Override
public FileStatus getViewFsFileStatus(FileStatus stat, Path newPath) {
return new ViewFsFileStatus(stat, newPath);
}
};
}
@Override
public boolean hasNext() throws IOException {
return myIter.hasNext();
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
final InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(f), true);
final RemoteIterator<LocatedFileStatus> fsIter =
res.targetFileSystem.listLocatedStatus(res.remainingPath);
if (res.isInternalDir()) {
return fsIter;
}
return new WrappingRemoteIterator<LocatedFileStatus>(res, fsIter, f) {
@Override
public FileStatus next() throws IOException {
FileStatus status = myIter.next();
String suffix = targetFs.stripOutRoot(status.getPath());
return new ViewFsFileStatus(status, makeQualified(
suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix)));
public LocatedFileStatus getViewFsFileStatus(LocatedFileStatus stat,
Path newPath) {
return new ViewFsLocatedFileStatus(stat, newPath);
}
};
}
@ -773,6 +780,42 @@ public class ViewFs extends AbstractFileSystem {
return res.targetFileSystem.getStoragePolicy(res.remainingPath);
}
/**
* Helper class to perform some transformation on results returned
* from a RemoteIterator.
*/
private abstract class WrappingRemoteIterator<T extends FileStatus>
implements RemoteIterator<T> {
private final String resolvedPath;
private final ChRootedFs targetFs;
private final RemoteIterator<T> innerIter;
private final Path originalPath;
WrappingRemoteIterator(InodeTree.ResolveResult<AbstractFileSystem> res,
RemoteIterator<T> innerIter, Path originalPath) {
this.resolvedPath = res.resolvedPath;
this.targetFs = (ChRootedFs)res.targetFileSystem;
this.innerIter = innerIter;
this.originalPath = originalPath;
}
@Override
public boolean hasNext() throws IOException {
return innerIter.hasNext();
}
@Override
public T next() throws IOException {
T status = innerIter.next();
String suffix = targetFs.stripOutRoot(status.getPath());
Path newPath = makeQualified(suffix.length() == 0 ? originalPath
: new Path(resolvedPath, suffix));
return getViewFsFileStatus(status, newPath);
}
protected abstract T getViewFsFileStatus(T status, Path newPath);
}
/*
* An instance of this class represents an internal dir of the viewFs
* ie internal dir of the mount table.

View File

@ -25,6 +25,13 @@ import static org.apache.hadoop.fs.FileContextTestHelper.isFile;
import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -32,13 +39,16 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestHelper;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.FileContextTestHelper.fileType;
import org.apache.hadoop.fs.FileStatus;
@ -56,7 +66,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
@ -319,6 +328,16 @@ abstract public class ViewFsBaseTest {
}
}
Assert.assertTrue(dirFooPresent);
RemoteIterator<LocatedFileStatus> dirLocatedContents =
fcView.listLocatedStatus(new Path("/targetRoot/"));
dirFooPresent = false;
while (dirLocatedContents.hasNext()) {
FileStatus fileStatus = dirLocatedContents.next();
if (fileStatus.getPath().getName().equals("dirFoo")) {
dirFooPresent = true;
}
}
Assert.assertTrue(dirFooPresent);
}
// rename across mount points that point to same target also fail
@ -450,24 +469,23 @@ abstract public class ViewFsBaseTest {
}
@Test
public void testGetFileChecksum() throws AccessControlException
, UnresolvedLinkException, IOException {
AbstractFileSystem mockAFS = Mockito.mock(AbstractFileSystem.class);
public void testGetFileChecksum() throws AccessControlException,
UnresolvedLinkException, IOException {
AbstractFileSystem mockAFS = mock(AbstractFileSystem.class);
InodeTree.ResolveResult<AbstractFileSystem> res =
new InodeTree.ResolveResult<AbstractFileSystem>(null, mockAFS , null,
new Path("someFile"));
@SuppressWarnings("unchecked")
InodeTree<AbstractFileSystem> fsState = Mockito.mock(InodeTree.class);
Mockito.when(fsState.resolve(Mockito.anyString()
, Mockito.anyBoolean())).thenReturn(res);
ViewFs vfs = Mockito.mock(ViewFs.class);
InodeTree<AbstractFileSystem> fsState = mock(InodeTree.class);
when(fsState.resolve(anyString(), anyBoolean())).thenReturn(res);
ViewFs vfs = mock(ViewFs.class);
vfs.fsState = fsState;
Mockito.when(vfs.getFileChecksum(new Path("/tmp/someFile")))
when(vfs.getFileChecksum(new Path("/tmp/someFile")))
.thenCallRealMethod();
vfs.getFileChecksum(new Path("/tmp/someFile"));
Mockito.verify(mockAFS).getFileChecksum(new Path("someFile"));
verify(mockAFS).getFileChecksum(new Path("someFile"));
}
@Test(expected=FileNotFoundException.class)
@ -820,4 +838,58 @@ abstract public class ViewFsBaseTest {
}
});
}
// Confirm that listLocatedStatus is delegated properly to the underlying
// AbstractFileSystem to allow for optimizations
@Test
public void testListLocatedStatus() throws IOException {
final Path mockTarget = new Path("mockfs://listLocatedStatus/foo");
final Path mountPoint = new Path("/fooMount");
final Configuration newConf = new Configuration();
newConf.setClass("fs.AbstractFileSystem.mockfs.impl", MockFs.class,
AbstractFileSystem.class);
ConfigUtil.addLink(newConf, mountPoint.toString(), mockTarget.toUri());
FileContext.getFileContext(URI.create("viewfs:///"), newConf)
.listLocatedStatus(mountPoint);
AbstractFileSystem mockFs = MockFs.getMockFs(mockTarget.toUri());
verify(mockFs).listLocatedStatus(new Path(mockTarget.toUri().getPath()));
verify(mockFs, never()).listStatus(any(Path.class));
verify(mockFs, never()).listStatusIterator(any(Path.class));
}
// Confirm that listStatus is delegated properly to the underlying
// AbstractFileSystem's listStatusIterator to allow for optimizations
@Test
public void testListStatusIterator() throws IOException {
final Path mockTarget = new Path("mockfs://listStatusIterator/foo");
final Path mountPoint = new Path("/fooMount");
final Configuration newConf = new Configuration();
newConf.setClass("fs.AbstractFileSystem.mockfs.impl", MockFs.class,
AbstractFileSystem.class);
ConfigUtil.addLink(newConf, mountPoint.toString(), mockTarget.toUri());
FileContext.getFileContext(URI.create("viewfs:///"), newConf)
.listStatus(mountPoint);
AbstractFileSystem mockFs = MockFs.getMockFs(mockTarget.toUri());
verify(mockFs).listStatusIterator(new Path(mockTarget.toUri().getPath()));
verify(mockFs, never()).listStatus(any(Path.class));
}
static class MockFs extends ChRootedFs {
private static Map<String, AbstractFileSystem> fsCache = new HashMap<>();
MockFs(URI uri, Configuration conf) throws URISyntaxException {
super(getMockFs(uri), new Path("/"));
}
static AbstractFileSystem getMockFs(URI uri) {
AbstractFileSystem mockFs = fsCache.get(uri.getAuthority());
if (mockFs == null) {
mockFs = mock(AbstractFileSystem.class);
when(mockFs.getUri()).thenReturn(uri);
when(mockFs.getUriDefaultPort()).thenReturn(1);
when(mockFs.getUriPath(any(Path.class))).thenCallRealMethod();
when(mockFs.isValidName(anyString())).thenReturn(true);
fsCache.put(uri.getAuthority(), mockFs);
}
return mockFs;
}
}
}