HADOOP-14169. Implement listStatusIterator, listLocatedStatus for ViewFs. Contributed by Erik Krogen.

(cherry picked from commit e1a99802fc)

Conflicts:
	hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
	hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsBaseTest.java
This commit is contained in:
Erik Krogen 2017-03-17 17:11:02 -07:00 committed by Konstantin V Shvachko
parent 830a602375
commit 1d035c8149
4 changed files with 161 additions and 29 deletions

View File

@ -34,6 +34,9 @@ Release 2.7.4 - UNRELEASED
HADOOP-14138. Remove S3A ref from META-INF service discovery, rely on
existing core-default entry (Steve Loughran via jlowe)
HADOOP-14169. Implement listStatusIterator, listLocatedStatus for ViewFs.
(Erik Krogen via shv)
BUG FIXES
HADOOP-13362. DefaultMetricsSystem leaks the source name when a source

View File

@ -35,8 +35,10 @@ import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
@ -234,6 +236,18 @@ class ChRootedFs extends AbstractFileSystem {
return myFs.listStatus(fullPath(f));
}
@Override
public RemoteIterator<FileStatus> listStatusIterator(final Path f)
throws IOException, UnresolvedLinkException {
return myFs.listStatusIterator(fullPath(f));
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws IOException, UnresolvedLinkException {
return myFs.listLocatedStatus(fullPath(f));
}
@Override
public void mkdir(final Path dir, final FsPermission permission,
final boolean createParent) throws IOException, UnresolvedLinkException {

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
@ -388,25 +389,31 @@ public class ViewFs extends AbstractFileSystem {
return fsIter;
}
return new RemoteIterator<FileStatus>() {
final RemoteIterator<FileStatus> myIter;
final ChRootedFs targetFs;
{ // Init
myIter = fsIter;
targetFs = (ChRootedFs) res.targetFileSystem;
return new WrappingRemoteIterator<FileStatus>(res, fsIter, f) {
@Override
public FileStatus getViewFsFileStatus(FileStatus stat, Path newPath) {
return new ViewFsFileStatus(stat, newPath);
}
};
}
@Override
public boolean hasNext() throws IOException {
return myIter.hasNext();
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
final InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(f), true);
final RemoteIterator<LocatedFileStatus> fsIter =
res.targetFileSystem.listLocatedStatus(res.remainingPath);
if (res.isInternalDir()) {
return fsIter;
}
return new WrappingRemoteIterator<LocatedFileStatus>(res, fsIter, f) {
@Override
public FileStatus next() throws IOException {
FileStatus status = myIter.next();
String suffix = targetFs.stripOutRoot(status.getPath());
return new ViewFsFileStatus(status, makeQualified(
suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix)));
public LocatedFileStatus getViewFsFileStatus(LocatedFileStatus stat,
Path newPath) {
return new ViewFsLocatedFileStatus(stat, newPath);
}
};
}
@ -717,6 +724,41 @@ public class ViewFs extends AbstractFileSystem {
res.targetFileSystem.removeXAttr(res.remainingPath, name);
}
/**
* Helper class to perform some transformation on results returned
* from a RemoteIterator.
*/
private abstract class WrappingRemoteIterator<T extends FileStatus>
implements RemoteIterator<T> {
private final String resolvedPath;
private final ChRootedFs targetFs;
private final RemoteIterator<T> innerIter;
private final Path originalPath;
WrappingRemoteIterator(InodeTree.ResolveResult<AbstractFileSystem> res,
RemoteIterator<T> innerIter, Path originalPath) {
this.resolvedPath = res.resolvedPath;
this.targetFs = (ChRootedFs)res.targetFileSystem;
this.innerIter = innerIter;
this.originalPath = originalPath;
}
@Override
public boolean hasNext() throws IOException {
return innerIter.hasNext();
}
@Override
public T next() throws IOException {
T status = innerIter.next();
String suffix = targetFs.stripOutRoot(status.getPath());
Path newPath = makeQualified(suffix.length() == 0 ? originalPath
: new Path(resolvedPath, suffix));
return getViewFsFileStatus(status, newPath);
}
protected abstract T getViewFsFileStatus(T status, Path newPath);
}
/*
* An instance of this class represents an internal dir of the viewFs

View File

@ -25,18 +25,29 @@ import static org.apache.hadoop.fs.FileContextTestHelper.isFile;
import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestHelper;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.FileContextTestHelper.fileType;
import org.apache.hadoop.fs.FileStatus;
@ -54,7 +65,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
@ -317,6 +327,16 @@ public class ViewFsBaseTest {
}
}
Assert.assertTrue(dirFooPresent);
RemoteIterator<LocatedFileStatus> dirLocatedContents =
fcView.listLocatedStatus(new Path("/targetRoot/"));
dirFooPresent = false;
while (dirLocatedContents.hasNext()) {
FileStatus fileStatus = dirLocatedContents.next();
if (fileStatus.getPath().getName().equals("dirFoo")) {
dirFooPresent = true;
}
}
Assert.assertTrue(dirFooPresent);
}
// rename across mount points that point to same target also fail
@ -448,24 +468,23 @@ public class ViewFsBaseTest {
}
@Test
public void testGetFileChecksum() throws AccessControlException
, UnresolvedLinkException, IOException {
AbstractFileSystem mockAFS = Mockito.mock(AbstractFileSystem.class);
public void testGetFileChecksum() throws AccessControlException,
UnresolvedLinkException, IOException {
AbstractFileSystem mockAFS = mock(AbstractFileSystem.class);
InodeTree.ResolveResult<AbstractFileSystem> res =
new InodeTree.ResolveResult<AbstractFileSystem>(null, mockAFS , null,
new Path("someFile"));
@SuppressWarnings("unchecked")
InodeTree<AbstractFileSystem> fsState = Mockito.mock(InodeTree.class);
Mockito.when(fsState.resolve(Mockito.anyString()
, Mockito.anyBoolean())).thenReturn(res);
ViewFs vfs = Mockito.mock(ViewFs.class);
InodeTree<AbstractFileSystem> fsState = mock(InodeTree.class);
when(fsState.resolve(anyString(), anyBoolean())).thenReturn(res);
ViewFs vfs = mock(ViewFs.class);
vfs.fsState = fsState;
Mockito.when(vfs.getFileChecksum(new Path("/tmp/someFile")))
when(vfs.getFileChecksum(new Path("/tmp/someFile")))
.thenCallRealMethod();
vfs.getFileChecksum(new Path("/tmp/someFile"));
Mockito.verify(mockAFS).getFileChecksum(new Path("someFile"));
verify(mockAFS).getFileChecksum(new Path("someFile"));
}
@Test(expected=FileNotFoundException.class)
@ -777,4 +796,58 @@ public class ViewFsBaseTest {
public void testInternalRemoveXAttr() throws IOException {
fcView.removeXAttr(new Path("/internalDir"), "xattrName");
}
// Confirm that listLocatedStatus is delegated properly to the underlying
// AbstractFileSystem to allow for optimizations
@Test
public void testListLocatedStatus() throws IOException {
final Path mockTarget = new Path("mockfs://listLocatedStatus/foo");
final Path mountPoint = new Path("/fooMount");
final Configuration newConf = new Configuration();
newConf.setClass("fs.AbstractFileSystem.mockfs.impl", MockFs.class,
AbstractFileSystem.class);
ConfigUtil.addLink(newConf, mountPoint.toString(), mockTarget.toUri());
FileContext.getFileContext(URI.create("viewfs:///"), newConf)
.listLocatedStatus(mountPoint);
AbstractFileSystem mockFs = MockFs.getMockFs(mockTarget.toUri());
verify(mockFs).listLocatedStatus(new Path(mockTarget.toUri().getPath()));
verify(mockFs, never()).listStatus(any(Path.class));
verify(mockFs, never()).listStatusIterator(any(Path.class));
}
// Confirm that listStatus is delegated properly to the underlying
// AbstractFileSystem's listStatusIterator to allow for optimizations
@Test
public void testListStatusIterator() throws IOException {
final Path mockTarget = new Path("mockfs://listStatusIterator/foo");
final Path mountPoint = new Path("/fooMount");
final Configuration newConf = new Configuration();
newConf.setClass("fs.AbstractFileSystem.mockfs.impl", MockFs.class,
AbstractFileSystem.class);
ConfigUtil.addLink(newConf, mountPoint.toString(), mockTarget.toUri());
FileContext.getFileContext(URI.create("viewfs:///"), newConf)
.listStatus(mountPoint);
AbstractFileSystem mockFs = MockFs.getMockFs(mockTarget.toUri());
verify(mockFs).listStatusIterator(new Path(mockTarget.toUri().getPath()));
verify(mockFs, never()).listStatus(any(Path.class));
}
static class MockFs extends ChRootedFs {
private static Map<String, AbstractFileSystem> fsCache = new HashMap<>();
MockFs(URI uri, Configuration conf) throws URISyntaxException {
super(getMockFs(uri), new Path("/"));
}
static AbstractFileSystem getMockFs(URI uri) {
AbstractFileSystem mockFs = fsCache.get(uri.getAuthority());
if (mockFs == null) {
mockFs = mock(AbstractFileSystem.class);
when(mockFs.getUri()).thenReturn(uri);
when(mockFs.getUriDefaultPort()).thenReturn(1);
when(mockFs.getUriPath(any(Path.class))).thenCallRealMethod();
when(mockFs.isValidName(anyString())).thenReturn(true);
fsCache.put(uri.getAuthority(), mockFs);
}
return mockFs;
}
}
}