From 1d035c8149c9fba4a78c128e0a5c73a68d14a7fb Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Fri, 17 Mar 2017 17:11:02 -0700 Subject: [PATCH] HADOOP-14169. Implement listStatusIterator, listLocatedStatus for ViewFs. Contributed by Erik Krogen. (cherry picked from commit e1a99802fc5c2f2ad3ceb000378db1c0133069b3) Conflicts: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java --- .../hadoop-common/CHANGES.txt | 3 + .../apache/hadoop/fs/viewfs/ChRootedFs.java | 14 +++ .../org/apache/hadoop/fs/viewfs/ViewFs.java | 80 ++++++++++++---- .../hadoop/fs/viewfs/ViewFsBaseTest.java | 93 +++++++++++++++++-- 4 files changed, 161 insertions(+), 29 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index f73b560bc8d..204d3aad1db 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -34,6 +34,9 @@ Release 2.7.4 - UNRELEASED HADOOP-14138. Remove S3A ref from META-INF service discovery, rely on existing core-default entry (Steve Loughran via jlowe) + HADOOP-14169. Implement listStatusIterator, listLocatedStatus for ViewFs. + (Erik Krogen via shv) + BUG FIXES HADOOP-13362. DefaultMetricsSystem leaks the source name when a source diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java index 68e756a81b5..c008e359e8a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFs.java @@ -35,8 +35,10 @@ import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; @@ -234,6 +236,18 @@ class ChRootedFs extends AbstractFileSystem { return myFs.listStatus(fullPath(f)); } + @Override + public RemoteIterator listStatusIterator(final Path f) + throws IOException, UnresolvedLinkException { + return myFs.listStatusIterator(fullPath(f)); + } + + @Override + public RemoteIterator listLocatedStatus(final Path f) + throws IOException, UnresolvedLinkException { + return myFs.listLocatedStatus(fullPath(f)); + } + @Override public void mkdir(final Path dir, final FsPermission permission, final boolean createParent) throws IOException, UnresolvedLinkException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java index 35c11482558..ec73fde5783 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; @@ -387,26 +388,32 @@ public class ViewFs extends AbstractFileSystem { if (res.isInternalDir()) { return fsIter; } - - return new RemoteIterator() { - final RemoteIterator myIter; - final ChRootedFs targetFs; - { // Init - myIter = fsIter; - targetFs = (ChRootedFs) res.targetFileSystem; - } - + + return new WrappingRemoteIterator(res, fsIter, f) { @Override - public boolean hasNext() throws IOException { - return myIter.hasNext(); + public FileStatus getViewFsFileStatus(FileStatus stat, Path newPath) { + return new ViewFsFileStatus(stat, newPath); } - + }; + } + + @Override + public RemoteIterator listLocatedStatus(final Path f) + throws AccessControlException, FileNotFoundException, + UnresolvedLinkException, IOException { + final InodeTree.ResolveResult res = + fsState.resolve(getUriPath(f), true); + final RemoteIterator fsIter = + res.targetFileSystem.listLocatedStatus(res.remainingPath); + if (res.isInternalDir()) { + return fsIter; + } + + return new WrappingRemoteIterator(res, fsIter, f) { @Override - public FileStatus next() throws IOException { - FileStatus status = myIter.next(); - String suffix = targetFs.stripOutRoot(status.getPath()); - return new ViewFsFileStatus(status, makeQualified( - suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix))); + public LocatedFileStatus getViewFsFileStatus(LocatedFileStatus stat, + Path newPath) { + return new ViewFsLocatedFileStatus(stat, newPath); } }; } @@ -716,8 +723,43 @@ public class ViewFs extends AbstractFileSystem { fsState.resolve(getUriPath(path), true); res.targetFileSystem.removeXAttr(res.remainingPath, name); } - - + + /** + * Helper class to perform some transformation on results returned + * from a RemoteIterator. + */ + private abstract class WrappingRemoteIterator + implements RemoteIterator { + private final String resolvedPath; + private final ChRootedFs targetFs; + private final RemoteIterator innerIter; + private final Path originalPath; + + WrappingRemoteIterator(InodeTree.ResolveResult res, + RemoteIterator innerIter, Path originalPath) { + this.resolvedPath = res.resolvedPath; + this.targetFs = (ChRootedFs)res.targetFileSystem; + this.innerIter = innerIter; + this.originalPath = originalPath; + } + + @Override + public boolean hasNext() throws IOException { + return innerIter.hasNext(); + } + + @Override + public T next() throws IOException { + T status = innerIter.next(); + String suffix = targetFs.stripOutRoot(status.getPath()); + Path newPath = makeQualified(suffix.length() == 0 ? originalPath + : new Path(resolvedPath, suffix)); + return getViewFsFileStatus(status, newPath); + } + + protected abstract T getViewFsFileStatus(T status, Path newPath); + } + /* * An instance of this class represents an internal dir of the viewFs * ie internal dir of the mount table. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java index 035b280249d..3fd25679f59 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java @@ -25,18 +25,29 @@ import static org.apache.hadoop.fs.FileContextTestHelper.isFile; import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContextTestHelper; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.FileContextTestHelper.fileType; import org.apache.hadoop.fs.FileStatus; @@ -54,7 +65,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; /** @@ -317,6 +327,16 @@ public class ViewFsBaseTest { } } Assert.assertTrue(dirFooPresent); + RemoteIterator dirLocatedContents = + fcView.listLocatedStatus(new Path("/targetRoot/")); + dirFooPresent = false; + while (dirLocatedContents.hasNext()) { + FileStatus fileStatus = dirLocatedContents.next(); + if (fileStatus.getPath().getName().equals("dirFoo")) { + dirFooPresent = true; + } + } + Assert.assertTrue(dirFooPresent); } // rename across mount points that point to same target also fail @@ -448,24 +468,23 @@ public class ViewFsBaseTest { } @Test - public void testGetFileChecksum() throws AccessControlException - , UnresolvedLinkException, IOException { - AbstractFileSystem mockAFS = Mockito.mock(AbstractFileSystem.class); + public void testGetFileChecksum() throws AccessControlException, + UnresolvedLinkException, IOException { + AbstractFileSystem mockAFS = mock(AbstractFileSystem.class); InodeTree.ResolveResult res = new InodeTree.ResolveResult(null, mockAFS , null, new Path("someFile")); @SuppressWarnings("unchecked") - InodeTree fsState = Mockito.mock(InodeTree.class); - Mockito.when(fsState.resolve(Mockito.anyString() - , Mockito.anyBoolean())).thenReturn(res); - ViewFs vfs = Mockito.mock(ViewFs.class); + InodeTree fsState = mock(InodeTree.class); + when(fsState.resolve(anyString(), anyBoolean())).thenReturn(res); + ViewFs vfs = mock(ViewFs.class); vfs.fsState = fsState; - Mockito.when(vfs.getFileChecksum(new Path("/tmp/someFile"))) + when(vfs.getFileChecksum(new Path("/tmp/someFile"))) .thenCallRealMethod(); vfs.getFileChecksum(new Path("/tmp/someFile")); - Mockito.verify(mockAFS).getFileChecksum(new Path("someFile")); + verify(mockAFS).getFileChecksum(new Path("someFile")); } @Test(expected=FileNotFoundException.class) @@ -777,4 +796,58 @@ public class ViewFsBaseTest { public void testInternalRemoveXAttr() throws IOException { fcView.removeXAttr(new Path("/internalDir"), "xattrName"); } + + // Confirm that listLocatedStatus is delegated properly to the underlying + // AbstractFileSystem to allow for optimizations + @Test + public void testListLocatedStatus() throws IOException { + final Path mockTarget = new Path("mockfs://listLocatedStatus/foo"); + final Path mountPoint = new Path("/fooMount"); + final Configuration newConf = new Configuration(); + newConf.setClass("fs.AbstractFileSystem.mockfs.impl", MockFs.class, + AbstractFileSystem.class); + ConfigUtil.addLink(newConf, mountPoint.toString(), mockTarget.toUri()); + FileContext.getFileContext(URI.create("viewfs:///"), newConf) + .listLocatedStatus(mountPoint); + AbstractFileSystem mockFs = MockFs.getMockFs(mockTarget.toUri()); + verify(mockFs).listLocatedStatus(new Path(mockTarget.toUri().getPath())); + verify(mockFs, never()).listStatus(any(Path.class)); + verify(mockFs, never()).listStatusIterator(any(Path.class)); + } + + // Confirm that listStatus is delegated properly to the underlying + // AbstractFileSystem's listStatusIterator to allow for optimizations + @Test + public void testListStatusIterator() throws IOException { + final Path mockTarget = new Path("mockfs://listStatusIterator/foo"); + final Path mountPoint = new Path("/fooMount"); + final Configuration newConf = new Configuration(); + newConf.setClass("fs.AbstractFileSystem.mockfs.impl", MockFs.class, + AbstractFileSystem.class); + ConfigUtil.addLink(newConf, mountPoint.toString(), mockTarget.toUri()); + FileContext.getFileContext(URI.create("viewfs:///"), newConf) + .listStatus(mountPoint); + AbstractFileSystem mockFs = MockFs.getMockFs(mockTarget.toUri()); + verify(mockFs).listStatusIterator(new Path(mockTarget.toUri().getPath())); + verify(mockFs, never()).listStatus(any(Path.class)); + } + + static class MockFs extends ChRootedFs { + private static Map fsCache = new HashMap<>(); + MockFs(URI uri, Configuration conf) throws URISyntaxException { + super(getMockFs(uri), new Path("/")); + } + static AbstractFileSystem getMockFs(URI uri) { + AbstractFileSystem mockFs = fsCache.get(uri.getAuthority()); + if (mockFs == null) { + mockFs = mock(AbstractFileSystem.class); + when(mockFs.getUri()).thenReturn(uri); + when(mockFs.getUriDefaultPort()).thenReturn(1); + when(mockFs.getUriPath(any(Path.class))).thenCallRealMethod(); + when(mockFs.isValidName(anyString())).thenReturn(true); + fsCache.put(uri.getAuthority(), mockFs); + } + return mockFs; + } + } }