From e1a99802fc5c2f2ad3ceb000378db1c0133069b3 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Fri, 17 Mar 2017 17:11:02 -0700 Subject: [PATCH] HADOOP-14169. Implement listStatusIterator, listLocatedStatus for ViewFs. Contributed by Erik Krogen. --- .../apache/hadoop/fs/viewfs/ChRootedFs.java | 14 +++ .../org/apache/hadoop/fs/viewfs/ViewFs.java | 77 ++++++++++++---- .../hadoop/fs/viewfs/ViewFsBaseTest.java | 92 +++++++++++++++++-- 3 files changed, 156 insertions(+), 27 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java index f07e07f199d..b7e47c21f13 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java @@ -37,8 +37,10 @@ import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; @@ -236,6 +238,18 @@ class ChRootedFs extends AbstractFileSystem { return myFs.listStatus(fullPath(f)); } + @Override + public RemoteIterator listStatusIterator(final Path f) + throws IOException, UnresolvedLinkException { + return myFs.listStatusIterator(fullPath(f)); + } + + @Override + public RemoteIterator listLocatedStatus(final Path f) + throws IOException, UnresolvedLinkException { + return myFs.listLocatedStatus(fullPath(f)); + } + @Override public void mkdir(final Path dir, final FsPermission permission, final boolean createParent) throws IOException, UnresolvedLinkException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java index d5f8e81f26d..1a54a81a0f1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; @@ -388,26 +389,32 @@ public class ViewFs extends AbstractFileSystem { if (res.isInternalDir()) { return fsIter; } - - return new RemoteIterator() { - final RemoteIterator myIter; - final ChRootedFs targetFs; - { // Init - myIter = fsIter; - targetFs = (ChRootedFs) res.targetFileSystem; - } - + + return new WrappingRemoteIterator(res, fsIter, f) { @Override - public boolean hasNext() throws IOException { - return myIter.hasNext(); + public FileStatus getViewFsFileStatus(FileStatus stat, Path newPath) { + return new ViewFsFileStatus(stat, newPath); } - + }; + } + + @Override + public RemoteIterator listLocatedStatus(final Path f) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + final InodeTree.ResolveResult res = + fsState.resolve(getUriPath(f), true); + final RemoteIterator fsIter = + res.targetFileSystem.listLocatedStatus(res.remainingPath); + if (res.isInternalDir()) { + return fsIter; + } + + return new WrappingRemoteIterator(res, fsIter, f) { @Override - public FileStatus next() throws IOException { - FileStatus status = myIter.next(); - String suffix = targetFs.stripOutRoot(status.getPath()); - return new ViewFsFileStatus(status, makeQualified( - suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix))); + public LocatedFileStatus getViewFsFileStatus(LocatedFileStatus stat, + Path newPath) { + return new ViewFsLocatedFileStatus(stat, newPath); } }; } @@ -773,6 +780,42 @@ public class ViewFs extends AbstractFileSystem { return res.targetFileSystem.getStoragePolicy(res.remainingPath); } + /** + * Helper class to perform some transformation on results returned + * from a RemoteIterator. + */ + private abstract class WrappingRemoteIterator + implements RemoteIterator { + private final String resolvedPath; + private final ChRootedFs targetFs; + private final RemoteIterator innerIter; + private final Path originalPath; + + WrappingRemoteIterator(InodeTree.ResolveResult res, + RemoteIterator innerIter, Path originalPath) { + this.resolvedPath = res.resolvedPath; + this.targetFs = (ChRootedFs)res.targetFileSystem; + this.innerIter = innerIter; + this.originalPath = originalPath; + } + + @Override + public boolean hasNext() throws IOException { + return innerIter.hasNext(); + } + + @Override + public T next() throws IOException { + T status = innerIter.next(); + String suffix = targetFs.stripOutRoot(status.getPath()); + Path newPath = makeQualified(suffix.length() == 0 ? originalPath + : new Path(resolvedPath, suffix)); + return getViewFsFileStatus(status, newPath); + } + + protected abstract T getViewFsFileStatus(T status, Path newPath); + } + /* * An instance of this class represents an internal dir of the viewFs * ie internal dir of the mount table. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java index 16d26442d14..8f6df32e777 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java @@ -25,6 +25,13 @@ import static org.apache.hadoop.fs.FileContextTestHelper.isFile; import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.FileNotFoundException; import java.io.IOException; @@ -32,13 +39,16 @@ import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContextTestHelper; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.FileContextTestHelper.fileType; import org.apache.hadoop.fs.FileStatus; @@ -56,7 +66,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; /** @@ -319,6 +328,16 @@ abstract public class ViewFsBaseTest { } } Assert.assertTrue(dirFooPresent); + RemoteIterator dirLocatedContents = + fcView.listLocatedStatus(new Path("/targetRoot/")); + dirFooPresent = false; + while (dirLocatedContents.hasNext()) { + FileStatus fileStatus = dirLocatedContents.next(); + if (fileStatus.getPath().getName().equals("dirFoo")) { + dirFooPresent = true; + } + } + Assert.assertTrue(dirFooPresent); } // rename across mount points that point to same target also fail @@ -450,24 +469,23 @@ abstract public class ViewFsBaseTest { } @Test - public void testGetFileChecksum() throws AccessControlException - , UnresolvedLinkException, IOException { - AbstractFileSystem mockAFS = Mockito.mock(AbstractFileSystem.class); + public void testGetFileChecksum() throws AccessControlException, + UnresolvedLinkException, IOException { + AbstractFileSystem mockAFS = mock(AbstractFileSystem.class); InodeTree.ResolveResult res = new InodeTree.ResolveResult(null, mockAFS , null, new Path("someFile")); @SuppressWarnings("unchecked") - InodeTree fsState = Mockito.mock(InodeTree.class); - Mockito.when(fsState.resolve(Mockito.anyString() - , Mockito.anyBoolean())).thenReturn(res); - ViewFs vfs = Mockito.mock(ViewFs.class); + InodeTree fsState = mock(InodeTree.class); + when(fsState.resolve(anyString(), anyBoolean())).thenReturn(res); + ViewFs vfs = mock(ViewFs.class); vfs.fsState = fsState; - Mockito.when(vfs.getFileChecksum(new Path("/tmp/someFile"))) + when(vfs.getFileChecksum(new Path("/tmp/someFile"))) .thenCallRealMethod(); vfs.getFileChecksum(new Path("/tmp/someFile")); - Mockito.verify(mockAFS).getFileChecksum(new Path("someFile")); + verify(mockAFS).getFileChecksum(new Path("someFile")); } @Test(expected=FileNotFoundException.class) @@ -820,4 +838,58 @@ abstract public class ViewFsBaseTest { } }); } + + // Confirm that listLocatedStatus is delegated properly to the underlying + // AbstractFileSystem to allow for optimizations + @Test + public void testListLocatedStatus() throws IOException { + final Path mockTarget = new Path("mockfs://listLocatedStatus/foo"); + final Path mountPoint = new Path("/fooMount"); + final Configuration newConf = new Configuration(); + newConf.setClass("fs.AbstractFileSystem.mockfs.impl", MockFs.class, + AbstractFileSystem.class); + ConfigUtil.addLink(newConf, mountPoint.toString(), mockTarget.toUri()); + FileContext.getFileContext(URI.create("viewfs:///"), newConf) + .listLocatedStatus(mountPoint); + AbstractFileSystem mockFs = MockFs.getMockFs(mockTarget.toUri()); + verify(mockFs).listLocatedStatus(new Path(mockTarget.toUri().getPath())); + verify(mockFs, never()).listStatus(any(Path.class)); + verify(mockFs, never()).listStatusIterator(any(Path.class)); + } + + // Confirm that listStatus is delegated properly to the underlying + // AbstractFileSystem's listStatusIterator to allow for optimizations + @Test + public void testListStatusIterator() throws IOException { + final Path mockTarget = new Path("mockfs://listStatusIterator/foo"); + final Path mountPoint = new Path("/fooMount"); + final Configuration newConf = new Configuration(); + newConf.setClass("fs.AbstractFileSystem.mockfs.impl", MockFs.class, + AbstractFileSystem.class); + ConfigUtil.addLink(newConf, mountPoint.toString(), mockTarget.toUri()); + FileContext.getFileContext(URI.create("viewfs:///"), newConf) + .listStatus(mountPoint); + AbstractFileSystem mockFs = MockFs.getMockFs(mockTarget.toUri()); + verify(mockFs).listStatusIterator(new Path(mockTarget.toUri().getPath())); + verify(mockFs, never()).listStatus(any(Path.class)); + } + + static class MockFs extends ChRootedFs { + private static Map fsCache = new HashMap<>(); + MockFs(URI uri, Configuration conf) throws URISyntaxException { + super(getMockFs(uri), new Path("/")); + } + static AbstractFileSystem getMockFs(URI uri) { + AbstractFileSystem mockFs = fsCache.get(uri.getAuthority()); + if (mockFs == null) { + mockFs = mock(AbstractFileSystem.class); + when(mockFs.getUri()).thenReturn(uri); + when(mockFs.getUriDefaultPort()).thenReturn(1); + when(mockFs.getUriPath(any(Path.class))).thenCallRealMethod(); + when(mockFs.isValidName(anyString())).thenReturn(true); + fsCache.put(uri.getAuthority(), mockFs); + } + return mockFs; + } + } }