HADOOP-7549. Use JDK ServiceLoader mechanism to find FileSystem implementations. (tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1329994 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f4711ef3ab
commit
706e861a85
|
@ -269,6 +269,8 @@ Release 2.0.0 - UNRELEASED
|
|||
|
||||
HADOOP-8152. Expand public APIs for security library classes. (atm via eli)
|
||||
|
||||
HADOOP-7549. Use JDK ServiceLoader mechanism to find FileSystem implementations. (tucu)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
import java.util.Stack;
|
||||
import java.util.TreeSet;
|
||||
|
@ -184,6 +185,17 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
statistics = getStatistics(name.getScheme(), getClass());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
* This implementation throws an <code>UnsupportedOperationException</code>.
|
||||
*
|
||||
* @return the protocol scheme for the FileSystem.
|
||||
*/
|
||||
public String getScheme() {
|
||||
throw new UnsupportedOperationException("Not implemented by the FileSystem implementation");
|
||||
}
|
||||
|
||||
/** Returns a URI whose scheme and authority identify this FileSystem.*/
|
||||
public abstract URI getUri();
|
||||
|
||||
|
@ -2078,9 +2090,45 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
) throws IOException {
|
||||
}
|
||||
|
||||
// making it volatile to be able to do a double checked locking
|
||||
private volatile static boolean FILE_SYSTEMS_LOADED = false;
|
||||
|
||||
private static final Map<String, Class<? extends FileSystem>>
|
||||
SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
|
||||
|
||||
private static void loadFileSystems() {
|
||||
synchronized (FileSystem.class) {
|
||||
if (!FILE_SYSTEMS_LOADED) {
|
||||
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
|
||||
for (FileSystem fs : serviceLoader) {
|
||||
SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
|
||||
}
|
||||
FILE_SYSTEMS_LOADED = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Class<? extends FileSystem> getFileSystemClass(String scheme,
|
||||
Configuration conf) throws IOException {
|
||||
if (!FILE_SYSTEMS_LOADED) {
|
||||
loadFileSystems();
|
||||
}
|
||||
Class<? extends FileSystem> clazz = null;
|
||||
if (conf != null) {
|
||||
clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
|
||||
}
|
||||
if (clazz == null) {
|
||||
clazz = SERVICE_FILE_SYSTEMS.get(scheme);
|
||||
}
|
||||
if (clazz == null) {
|
||||
throw new IOException("No FileSystem for scheme: " + scheme);
|
||||
}
|
||||
return clazz;
|
||||
}
|
||||
|
||||
private static FileSystem createFileSystem(URI uri, Configuration conf
|
||||
) throws IOException {
|
||||
Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
|
||||
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
|
||||
if (clazz == null) {
|
||||
throw new IOException("No FileSystem for scheme: " + uri.getScheme());
|
||||
}
|
||||
|
|
|
@ -71,7 +71,18 @@ public class HarFileSystem extends FilterFileSystem {
|
|||
*/
|
||||
public HarFileSystem() {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>har</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "har";
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor to create a HarFileSystem with an
|
||||
* underlying filesystem.
|
||||
|
|
|
@ -39,7 +39,18 @@ public class LocalFileSystem extends ChecksumFileSystem {
|
|||
public LocalFileSystem() {
|
||||
this(new RawLocalFileSystem());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>file</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "file";
|
||||
}
|
||||
|
||||
public FileSystem getRaw() {
|
||||
return getRawFileSystem();
|
||||
}
|
||||
|
|
|
@ -59,6 +59,17 @@ public class FTPFileSystem extends FileSystem {
|
|||
|
||||
private URI uri;
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>ftp</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "ftp";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration conf) throws IOException { // get
|
||||
super.initialize(uri, conf);
|
||||
|
|
|
@ -57,6 +57,17 @@ public class KosmosFileSystem extends FileSystem {
|
|||
this.kfsImpl = fsimpl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>kfs</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "kfs";
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
|
|
|
@ -67,6 +67,17 @@ public class S3FileSystem extends FileSystem {
|
|||
this.store = store;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>s3</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "s3";
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
|
|
|
@ -251,7 +251,18 @@ public class NativeS3FileSystem extends FileSystem {
|
|||
public NativeS3FileSystem(NativeFileSystemStore store) {
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>s3n</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "s3n";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration conf) throws IOException {
|
||||
super.initialize(uri, conf);
|
||||
|
|
|
@ -149,6 +149,17 @@ public class ViewFileSystem extends FileSystem {
|
|||
creationTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>viewfs</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "viewfs";
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after a new FileSystem instance is constructed.
|
||||
* @param theUri a uri whose authority section names the host, port, etc. for
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.hadoop.fs.LocalFileSystem
|
||||
org.apache.hadoop.fs.viewfs.ViewFileSystem
|
||||
org.apache.hadoop.fs.s3.S3FileSystem
|
||||
org.apache.hadoop.fs.s3native.NativeS3FileSystem
|
||||
org.apache.hadoop.fs.kfs.KosmosFileSystem
|
||||
org.apache.hadoop.fs.ftp.FTPFileSystem
|
||||
org.apache.hadoop.fs.HarFileSystem
|
|
@ -352,25 +352,6 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.file.impl</name>
|
||||
<value>org.apache.hadoop.fs.LocalFileSystem</value>
|
||||
<description>The FileSystem for file: uris.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.hdfs.impl</name>
|
||||
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
|
||||
<description>The FileSystem for hdfs: uris.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.viewfs.impl</name>
|
||||
<value>org.apache.hadoop.fs.viewfs.ViewFileSystem</value>
|
||||
<description>The FileSystem for view file system for viewfs: uris
|
||||
(ie client side mount table:).</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.AbstractFileSystem.file.impl</name>
|
||||
<value>org.apache.hadoop.fs.local.LocalFs</value>
|
||||
|
@ -391,45 +372,6 @@
|
|||
(ie client side mount table:).</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3.impl</name>
|
||||
<value>org.apache.hadoop.fs.s3.S3FileSystem</value>
|
||||
<description>The FileSystem for s3: uris.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3n.impl</name>
|
||||
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
|
||||
<description>The FileSystem for s3n: (Native S3) uris.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.kfs.impl</name>
|
||||
<value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value>
|
||||
<description>The FileSystem for kfs: uris.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.hftp.impl</name>
|
||||
<value>org.apache.hadoop.hdfs.HftpFileSystem</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.hsftp.impl</name>
|
||||
<value>org.apache.hadoop.hdfs.HsftpFileSystem</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.webhdfs.impl</name>
|
||||
<value>org.apache.hadoop.hdfs.web.WebHdfsFileSystem</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.ftp.impl</name>
|
||||
<value>org.apache.hadoop.fs.ftp.FTPFileSystem</value>
|
||||
<description>The FileSystem for ftp: uris.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.ftp.host</name>
|
||||
<value>0.0.0.0</value>
|
||||
|
@ -444,18 +386,6 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.har.impl</name>
|
||||
<value>org.apache.hadoop.fs.HarFileSystem</value>
|
||||
<description>The filesystem for Hadoop archives. </description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.har.impl.disable.cache</name>
|
||||
<value>true</value>
|
||||
<description>Don't cache 'har' filesystem instances.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.df.interval</name>
|
||||
<value>60000</value>
|
||||
|
|
|
@ -43,7 +43,7 @@ public class TestFileSystemCaching {
|
|||
@Test
|
||||
public void testCacheEnabled() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.cachedfile.impl", conf.get("fs.file.impl"));
|
||||
conf.set("fs.cachedfile.impl", FileSystem.getFileSystemClass("file", null).getName());
|
||||
FileSystem fs1 = FileSystem.get(new URI("cachedfile://a"), conf);
|
||||
FileSystem fs2 = FileSystem.get(new URI("cachedfile://a"), conf);
|
||||
assertSame(fs1, fs2);
|
||||
|
@ -84,7 +84,7 @@ public class TestFileSystemCaching {
|
|||
// wait for InitializeForeverFileSystem to start initialization
|
||||
InitializeForeverFileSystem.sem.acquire();
|
||||
|
||||
conf.set("fs.cachedfile.impl", conf.get("fs.file.impl"));
|
||||
conf.set("fs.cachedfile.impl", FileSystem.getFileSystemClass("file", null).getName());
|
||||
FileSystem.get(new URI("cachedfile://a"), conf);
|
||||
t.interrupt();
|
||||
t.join();
|
||||
|
@ -93,7 +93,7 @@ public class TestFileSystemCaching {
|
|||
@Test
|
||||
public void testCacheDisabled() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.uncachedfile.impl", conf.get("fs.file.impl"));
|
||||
conf.set("fs.uncachedfile.impl", FileSystem.getFileSystemClass("file", null).getName());
|
||||
conf.setBoolean("fs.uncachedfile.impl.disable.cache", true);
|
||||
FileSystem fs1 = FileSystem.get(new URI("uncachedfile://a"), conf);
|
||||
FileSystem fs2 = FileSystem.get(new URI("uncachedfile://a"), conf);
|
||||
|
@ -104,7 +104,7 @@ public class TestFileSystemCaching {
|
|||
@Test
|
||||
public <T extends TokenIdentifier> void testCacheForUgi() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.set("fs.cachedfile.impl", conf.get("fs.file.impl"));
|
||||
conf.set("fs.cachedfile.impl", FileSystem.getFileSystemClass("file", null).getName());
|
||||
UserGroupInformation ugiA = UserGroupInformation.createRemoteUser("foo");
|
||||
UserGroupInformation ugiB = UserGroupInformation.createRemoteUser("bar");
|
||||
FileSystem fsA = ugiA.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
|
@ -156,7 +156,7 @@ public class TestFileSystemCaching {
|
|||
@Test
|
||||
public void testUserFS() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.set("fs.cachedfile.impl", conf.get("fs.file.impl"));
|
||||
conf.set("fs.cachedfile.impl", FileSystem.getFileSystemClass("file", null).getName());
|
||||
FileSystem fsU1 = FileSystem.get(new URI("cachedfile://a"), conf, "bar");
|
||||
FileSystem fsU2 = FileSystem.get(new URI("cachedfile://a"), conf, "foo");
|
||||
|
||||
|
@ -166,7 +166,7 @@ public class TestFileSystemCaching {
|
|||
@Test
|
||||
public void testFsUniqueness() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.set("fs.cachedfile.impl", conf.get("fs.file.impl"));
|
||||
conf.set("fs.cachedfile.impl", FileSystem.getFileSystemClass("file", null).getName());
|
||||
// multiple invocations of FileSystem.get return the same object.
|
||||
FileSystem fs1 = FileSystem.get(conf);
|
||||
FileSystem fs2 = FileSystem.get(conf);
|
||||
|
@ -183,7 +183,7 @@ public class TestFileSystemCaching {
|
|||
@Test
|
||||
public void testCloseAllForUGI() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.set("fs.cachedfile.impl", conf.get("fs.file.impl"));
|
||||
conf.set("fs.cachedfile.impl", FileSystem.getFileSystemClass("file", null).getName());
|
||||
UserGroupInformation ugiA = UserGroupInformation.createRemoteUser("foo");
|
||||
FileSystem fsA = ugiA.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
public FileSystem run() throws Exception {
|
||||
|
|
|
@ -165,7 +165,10 @@ public class TestFilterFileSystem {
|
|||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
public String getScheme() {
|
||||
return "dontcheck";
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -89,6 +89,17 @@ public class DistributedFileSystem extends FileSystem {
|
|||
public DistributedFileSystem() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>hdfs</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "hdfs";
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public DistributedFileSystem(InetSocketAddress namenode,
|
||||
Configuration conf) throws IOException {
|
||||
|
|
|
@ -154,6 +154,17 @@ public class HftpFileSystem extends FileSystem
|
|||
return SecurityUtil.buildTokenService(nnSecureUri).toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>hftp</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "hftp";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(final URI name, final Configuration conf)
|
||||
throws IOException {
|
||||
|
|
|
@ -58,6 +58,17 @@ public class HsftpFileSystem extends HftpFileSystem {
|
|||
private static final long MM_SECONDS_PER_DAY = 1000 * 60 * 60 * 24;
|
||||
private volatile int ExpWarnDays = 0;
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>hsftp</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "hsftp";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(URI name, Configuration conf) throws IOException {
|
||||
super.initialize(name, conf);
|
||||
|
|
|
@ -155,6 +155,17 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
*
|
||||
* @return <code>webhdfs</code>
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "webhdfs";
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void initialize(URI uri, Configuration conf
|
||||
) throws IOException {
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.hadoop.hdfs.DistributedFileSystem
|
||||
org.apache.hadoop.hdfs.HftpFileSystem
|
||||
org.apache.hadoop.hdfs.HsftpFileSystem
|
||||
org.apache.hadoop.hdfs.web.WebHdfsFileSystem
|
Loading…
Reference in New Issue