HADOOP-15565. Add an inner FS cache to ViewFileSystem, separate from the global cache, to avoid file system leaks. Contributed by Jinglun.

This commit is contained in:
Erik Krogen 2019-09-06 10:22:20 -07:00
parent e7d44e48f7
commit c92a3e94d8
10 changed files with 285 additions and 30 deletions

View File

@ -212,6 +212,11 @@ static void removeFileSystemForTesting(URI uri, Configuration conf,
CACHE.map.remove(new Cache.Key(uri, conf), fs);
}
@VisibleForTesting
static int cacheSize() {
return CACHE.map.size();
}
/**
* Get a FileSystem instance based on the uri, the passed in
* configuration and the user.

View File

@ -98,13 +98,12 @@ protected Path fullPath(final Path path) {
/**
* Constructor
* @param uri base file system
* @param conf configuration
* @param fs base file system
* @param uri base uri
* @throws IOException
*/
public ChRootedFileSystem(final URI uri, Configuration conf)
throws IOException {
super(FileSystem.get(uri, conf));
ChRootedFileSystem(final FileSystem fs, URI uri) throws IOException {
super(fs);
String pathString = uri.getPath();
if (pathString.isEmpty()) {
pathString = "/";
@ -115,7 +114,18 @@ public ChRootedFileSystem(final URI uri, Configuration conf)
workingDir = getHomeDirectory();
// We don't use the wd of the myFs
}
/**
* Constructor.
* @param uri base file system
* @param conf configuration
* @throws IOException
*/
public ChRootedFileSystem(final URI uri, Configuration conf)
throws IOException {
this(FileSystem.get(uri, conf), uri);
}
/**
* Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.

View File

@ -78,4 +78,11 @@ public interface Constants {
FsPermission PERMISSION_555 = new FsPermission((short) 0555);
String CONFIG_VIEWFS_RENAME_STRATEGY = "fs.viewfs.rename.strategy";
/**
* Enable ViewFileSystem to cache all children filesystems in inner cache.
*/
String CONFIG_VIEWFS_ENABLE_INNER_CACHE = "fs.viewfs.enable.inner.cache";
boolean CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT = true;
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.fs.viewfs;
import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -26,10 +28,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Map.Entry;
@ -89,6 +94,72 @@ static AccessControlException readOnlyMountTable(final String operation,
return readOnlyMountTable(operation, p.toString());
}
/**
* Caching children filesystems. HADOOP-15565.
*/
static class InnerCache {
private Map<Key, FileSystem> map = new HashMap<>();
FileSystem get(URI uri, Configuration config) throws IOException {
Key key = new Key(uri);
if (map.get(key) == null) {
FileSystem fs = FileSystem.newInstance(uri, config);
map.put(key, fs);
return fs;
} else {
return map.get(key);
}
}
void closeAll() {
for (FileSystem fs : map.values()) {
try {
fs.close();
} catch (IOException e) {
LOG.info("Fail closing ViewFileSystem's child filesystem " + fs, e);
}
}
}
InnerCache unmodifiableCache() {
map = Collections.unmodifiableMap(map);
return this;
}
/**
* All the cached instances share the same UGI so there is no need to have a
* URI in the Key. Make the Key simple with just the scheme and authority.
*/
private static class Key {
private final String scheme;
private final String authority;
Key(URI uri) {
scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase();
authority =
uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase();
}
@Override
public int hashCode() {
return Objects.hash(scheme, authority);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj != null && obj instanceof Key) {
Key that = (Key) obj;
return this.scheme.equals(that.scheme) && this.authority
.equals(that.authority);
}
return false;
}
}
}
/**
* MountPoint representation built from the configuration.
*/
@ -125,6 +196,8 @@ public URI[] getTargetFileSystemURIs() {
Configuration config;
InodeTree<FileSystem> fsState; // the fs state; ie the mount table
Path homeDir = null;
private boolean enableInnerCache = false;
private InnerCache cache;
// Default to rename within same mountpoint
private RenameStrategy renameStrategy = RenameStrategy.SAME_MOUNTPOINT;
/**
@ -178,6 +251,9 @@ public void initialize(final URI theUri, final Configuration conf)
super.initialize(theUri, conf);
setConf(conf);
config = conf;
enableInnerCache = config.getBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE,
CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT);
final InnerCache innerCache = new InnerCache();
// Now build client side view (i.e. client side mount table) from config.
final String authority = theUri.getAuthority();
try {
@ -187,7 +263,13 @@ public void initialize(final URI theUri, final Configuration conf)
@Override
protected FileSystem getTargetFileSystem(final URI uri)
throws URISyntaxException, IOException {
return new ChRootedFileSystem(uri, config);
FileSystem fs;
if (enableInnerCache) {
fs = innerCache.get(uri, config);
} else {
fs = FileSystem.get(uri, config);
}
return new ChRootedFileSystem(fs, uri);
}
@Override
@ -210,6 +292,12 @@ protected FileSystem getTargetFileSystem(final String settings,
throw new IOException("URISyntax exception: " + theUri);
}
if (enableInnerCache) {
// All fs instances are created and cached on startup. The cache is
// readonly after the initialize() so the concurrent access of the cache
// is safe.
cache = innerCache.unmodifiableCache();
}
}
/**
@ -1297,4 +1385,12 @@ enum RenameStrategy {
SAME_MOUNTPOINT, SAME_TARGET_URI_ACROSS_MOUNTPOINT,
SAME_FILESYSTEM_ACROSS_MOUNTPOINT
}
@Override
public void close() throws IOException {
super.close();
if (enableInnerCache && cache != null) {
cache.closeAll();
}
}
}

View File

@ -1493,4 +1493,10 @@ public void testReadSymlinkWithAFileAsInput() throws IOException {
file.delete();
}
/**
* The size of FileSystem cache.
*/
public static int getCacheSize() {
return FileSystem.cacheSize();
}
}

View File

@ -22,6 +22,7 @@
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -32,7 +33,6 @@
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.viewfs.ChRootedFileSystem;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -396,7 +396,7 @@ public void testAclMethodsPathTranslation() throws IOException {
}
@Test
public void testListLocatedFileStatus() throws IOException {
public void testListLocatedFileStatus() throws Exception {
final Path mockMount = new Path("mockfs://foo/user");
final Path mockPath = new Path("/usermock");
final Configuration conf = new Configuration();
@ -404,17 +404,35 @@ public void testListLocatedFileStatus() throws IOException {
ConfigUtil.addLink(conf, mockPath.toString(), mockMount.toUri());
FileSystem vfs = FileSystem.get(URI.create("viewfs:///"), conf);
vfs.listLocatedStatus(mockPath);
final FileSystem mockFs = ((MockFileSystem)mockMount.getFileSystem(conf))
.getRawFileSystem();
final FileSystem mockFs =
((MockFileSystem) getChildFileSystem((ViewFileSystem) vfs,
new URI("mockfs://foo/"))).getRawFileSystem();
verify(mockFs).listLocatedStatus(new Path(mockMount.toUri().getPath()));
}
static FileSystem getChildFileSystem(ViewFileSystem viewFs, URI uri) {
for (FileSystem fs : viewFs.getChildFileSystems()) {
if (Objects.equals(fs.getUri().getScheme(), uri.getScheme()) && Objects
.equals(fs.getUri().getAuthority(), uri.getAuthority())) {
return fs;
}
}
return null;
}
static class MockFileSystem extends FilterFileSystem {
private URI uri;
MockFileSystem() {
super(mock(FileSystem.class));
}
@Override
public void initialize(URI name, Configuration conf) throws IOException {}
public URI getUri() {
return uri;
}
@Override
public void initialize(URI name, Configuration conf) throws IOException {
uri = name;
}
}
@Test(timeout = 30000)

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@ -31,6 +32,8 @@
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.viewfs.TestChRootedFileSystem.MockFileSystem;
import org.junit.*;
import static org.apache.hadoop.fs.viewfs.TestChRootedFileSystem.getChildFileSystem;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@ -46,12 +49,16 @@ public class TestViewFileSystemDelegation { //extends ViewFileSystemTestSetup {
@BeforeClass
public static void setup() throws Exception {
conf = ViewFileSystemTestSetup.createConfig();
fs1 = setupFileSystem(new URI("fs1:/"), FakeFileSystem.class);
fs2 = setupFileSystem(new URI("fs2:/"), FakeFileSystem.class);
setupFileSystem(new URI("fs1:/"), FakeFileSystem.class);
setupFileSystem(new URI("fs2:/"), FakeFileSystem.class);
viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
fs1 = (FakeFileSystem) getChildFileSystem((ViewFileSystem) viewFs,
new URI("fs1:/"));
fs2 = (FakeFileSystem) getChildFileSystem((ViewFileSystem) viewFs,
new URI("fs2:/"));
}
static FakeFileSystem setupFileSystem(URI uri, Class clazz)
static void setupFileSystem(URI uri, Class clazz)
throws Exception {
String scheme = uri.getScheme();
conf.set("fs."+scheme+".impl", clazz.getName());
@ -59,22 +66,21 @@ static FakeFileSystem setupFileSystem(URI uri, Class clazz)
assertEquals(uri, fs.getUri());
Path targetPath = new FileSystemTestHelper().getAbsoluteTestRootPath(fs);
ConfigUtil.addLink(conf, "/mounts/"+scheme, targetPath.toUri());
return fs;
}
private static FileSystem setupMockFileSystem(Configuration conf, URI uri)
private static void setupMockFileSystem(Configuration config, URI uri)
throws Exception {
String scheme = uri.getScheme();
conf.set("fs." + scheme + ".impl", MockFileSystem.class.getName());
FileSystem fs = FileSystem.get(uri, conf);
ConfigUtil.addLink(conf, "/mounts/" + scheme, uri);
return ((MockFileSystem)fs).getRawFileSystem();
config.set("fs." + scheme + ".impl", MockFileSystem.class.getName());
ConfigUtil.addLink(config, "/mounts/" + scheme, uri);
}
@Test
public void testSanity() {
assertEquals("fs1:/", fs1.getUri().toString());
assertEquals("fs2:/", fs2.getUri().toString());
public void testSanity() throws URISyntaxException {
assertEquals(new URI("fs1:/").getScheme(), fs1.getUri().getScheme());
assertEquals(new URI("fs1:/").getAuthority(), fs1.getUri().getAuthority());
assertEquals(new URI("fs2:/").getScheme(), fs2.getUri().getScheme());
assertEquals(new URI("fs2:/").getAuthority(), fs2.getUri().getAuthority());
}
@Test
@ -91,9 +97,15 @@ public void testVerifyChecksum() throws Exception {
@Test
public void testAclMethods() throws Exception {
Configuration conf = ViewFileSystemTestSetup.createConfig();
FileSystem mockFs1 = setupMockFileSystem(conf, new URI("mockfs1:/"));
FileSystem mockFs2 = setupMockFileSystem(conf, new URI("mockfs2:/"));
setupMockFileSystem(conf, new URI("mockfs1:/"));
setupMockFileSystem(conf, new URI("mockfs2:/"));
FileSystem viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
FileSystem mockFs1 =
((MockFileSystem) getChildFileSystem((ViewFileSystem) viewFs,
new URI("mockfs1:/"))).getRawFileSystem();
FileSystem mockFs2 =
((MockFileSystem) getChildFileSystem((ViewFileSystem) viewFs,
new URI("mockfs2:/"))).getRawFileSystem();
Path viewFsPath1 = new Path("/mounts/mockfs1/a/b/c");
Path mockFsPath1 = new Path("/a/b/c");

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.viewfs;
import static org.junit.Assert.*;
import static org.apache.hadoop.fs.viewfs.TestChRootedFileSystem.getChildFileSystem;
import java.io.IOException;
import java.net.URI;
@ -54,12 +55,16 @@ public class TestViewFileSystemDelegationTokenSupport {
@BeforeClass
public static void setup() throws Exception {
conf = ViewFileSystemTestSetup.createConfig();
fs1 = setupFileSystem(new URI("fs1:///"), FakeFileSystem.class);
fs2 = setupFileSystem(new URI("fs2:///"), FakeFileSystem.class);
setupFileSystem(new URI("fs1:///"), FakeFileSystem.class);
setupFileSystem(new URI("fs2:///"), FakeFileSystem.class);
viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
fs1 = (FakeFileSystem) getChildFileSystem((ViewFileSystem) viewFs,
new URI("fs1:///"));
fs2 = (FakeFileSystem) getChildFileSystem((ViewFileSystem) viewFs,
new URI("fs2:///"));
}
static FakeFileSystem setupFileSystem(URI uri, Class<? extends FileSystem> clazz)
static void setupFileSystem(URI uri, Class<? extends FileSystem> clazz)
throws Exception {
String scheme = uri.getScheme();
conf.set("fs."+scheme+".impl", clazz.getName());
@ -67,7 +72,6 @@ static FakeFileSystem setupFileSystem(URI uri, Class<? extends FileSystem> clazz
// mount each fs twice, will later ensure 1 token/fs
ConfigUtil.addLink(conf, "/mounts/"+scheme+"-one", fs.getUri());
ConfigUtil.addLink(conf, "/mounts/"+scheme+"-two", fs.getUri());
return fs;
}
/**

View File

@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@ -36,9 +37,11 @@
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.TestFileUtil;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -64,6 +67,7 @@
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.*;
@ -1272,4 +1276,96 @@ public void testLinkTarget() throws Exception {
containsString("File does not exist:"));
}
}
@Test
public void testViewFileSystemInnerCache() throws Exception {
ViewFileSystem.InnerCache cache = new ViewFileSystem.InnerCache();
FileSystem fs = cache.get(fsTarget.getUri(), conf);
// InnerCache caches filesystem.
assertSame(fs, cache.get(fsTarget.getUri(), conf));
// InnerCache and FileSystem.CACHE are independent.
assertNotSame(fs, FileSystem.get(fsTarget.getUri(), conf));
// close InnerCache.
cache.closeAll();
try {
fs.exists(new Path("/"));
if (!(fs instanceof LocalFileSystem)) {
// Ignore LocalFileSystem because it can still be used after close.
fail("Expect Filesystem closed exception");
}
} catch (IOException e) {
assertExceptionContains("Filesystem closed", e);
}
}
@Test
public void testCloseChildrenFileSystem() throws Exception {
final String clusterName = "cluster" + new Random().nextInt();
Configuration config = new Configuration(conf);
ConfigUtil.addLink(config, clusterName, "/user",
new Path(targetTestRoot, "user").toUri());
config.setBoolean("fs.viewfs.impl.disable.cache", false);
URI uri = new URI("viewfs://" + clusterName + "/");
ViewFileSystem viewFs = (ViewFileSystem) FileSystem.get(uri, config);
assertTrue("viewfs should have at least one child fs.",
viewFs.getChildFileSystems().length > 0);
// viewFs is cached in FileSystem.CACHE
assertSame(viewFs, FileSystem.get(uri, config));
// child fs is not cached in FileSystem.CACHE
FileSystem child = viewFs.getChildFileSystems()[0];
assertNotSame(child, FileSystem.get(child.getUri(), config));
viewFs.close();
for (FileSystem childfs : viewFs.getChildFileSystems()) {
try {
childfs.exists(new Path("/"));
if (!(childfs instanceof LocalFileSystem)) {
// Ignore LocalFileSystem because it can still be used after close.
fail("Expect Filesystem closed exception");
}
} catch (IOException e) {
assertExceptionContains("Filesystem closed", e);
}
}
}
@Test
public void testChildrenFileSystemLeak() throws Exception {
final String clusterName = "cluster" + new Random().nextInt();
Configuration config = new Configuration(conf);
ConfigUtil.addLink(config, clusterName, "/user",
new Path(targetTestRoot, "user").toUri());
final int cacheSize = TestFileUtil.getCacheSize();
ViewFileSystem viewFs = (ViewFileSystem) FileSystem
.get(new URI("viewfs://" + clusterName + "/"), config);
assertEquals(cacheSize + 1, TestFileUtil.getCacheSize());
viewFs.close();
assertEquals(cacheSize, TestFileUtil.getCacheSize());
}
@Test
public void testDeleteOnExit() throws Exception {
final String clusterName = "cluster" + new Random().nextInt();
Configuration config = new Configuration(conf);
ConfigUtil.addLink(config, clusterName, "/user",
new Path(targetTestRoot, "user").toUri());
Path testDir = new Path("/user/testDeleteOnExit");
Path realTestPath = new Path(targetTestRoot, "user/testDeleteOnExit");
ViewFileSystem viewFs = (ViewFileSystem) FileSystem
.get(new URI("viewfs://" + clusterName + "/"), config);
viewFs.mkdirs(testDir);
assertTrue(viewFs.exists(testDir));
assertTrue(fsTarget.exists(realTestPath));
viewFs.deleteOnExit(testDir);
viewFs.close();
assertFalse(fsTarget.exists(realTestPath));
}
}

View File

@ -91,6 +91,7 @@ public static void clusterSetupAtBegining() throws IOException,
fileSystemTestHelper.createFile(fHdfs, testFileName);
fileSystemTestHelper.createFile(fHdfs, NOT_IN_MOUNTPOINT_FILENAME);
Configuration conf = ViewFileSystemTestSetup.createConfig();
conf.setInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT + 1);
ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri().toString() +
"/tmp"));
vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf);