HADOOP-18684. S3A filesystem to support binding to to other URI schemes (#5521)
Contributed by Harshit Gupta
This commit is contained in:
parent
937caf7de9
commit
dfb2ca0a64
|
@ -18,14 +18,16 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DelegateToFileSystem;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
|
||||
|
||||
/**
|
||||
* S3A implementation of AbstractFileSystem.
|
||||
|
@ -37,7 +39,8 @@ public class S3A extends DelegateToFileSystem {
|
|||
|
||||
public S3A(URI theUri, Configuration conf)
|
||||
throws IOException, URISyntaxException {
|
||||
super(theUri, new S3AFileSystem(), conf, "s3a", false);
|
||||
super(theUri, new S3AFileSystem(), conf,
|
||||
theUri.getScheme().isEmpty() ? FS_S3A : theUri.getScheme(), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -419,6 +419,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
*/
|
||||
private final Set<Path> deleteOnExit = new TreeSet<>();
|
||||
|
||||
/**
|
||||
* Scheme for the current filesystem.
|
||||
*/
|
||||
private String scheme = FS_S3A;
|
||||
|
||||
/** Add any deprecated keys. */
|
||||
@SuppressWarnings("deprecation")
|
||||
private static void addDeprecatedKeys() {
|
||||
|
@ -642,6 +647,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
vectoredActiveRangeReads = intOption(conf,
|
||||
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
|
||||
vectoredIOContext = populateVectoredIOContext(conf);
|
||||
scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
|
||||
} catch (AmazonClientException e) {
|
||||
// amazon client exception: stop all services then throw the translation
|
||||
cleanupWithLogger(LOG, span);
|
||||
|
@ -1201,7 +1207,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "s3a";
|
||||
return this.scheme;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
public class ITestS3AUrlScheme extends AbstractS3ATestBase{
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
Configuration conf = super.createConfiguration();
|
||||
conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFSScheme() throws IOException, URISyntaxException {
|
||||
FileSystem fs = FileSystem.get(new URI("s3://mybucket/path"),
|
||||
getConfiguration());
|
||||
try {
|
||||
assertEquals("s3", fs.getScheme());
|
||||
Path path = fs.makeQualified(new Path("tmp/path"));
|
||||
assertEquals("s3", path.toUri().getScheme());
|
||||
} finally {
|
||||
fs.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -13,11 +13,34 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs.s3a.fileContext;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.TestFileContext;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* Implementation of TestFileContext for S3a.
|
||||
*/
|
||||
public class ITestS3AFileContext extends TestFileContext{
|
||||
public class ITestS3AFileContext extends TestFileContext {
|
||||
|
||||
@Test
|
||||
public void testScheme()
|
||||
throws URISyntaxException, UnsupportedFileSystemException {
|
||||
Configuration conf = new Configuration();
|
||||
URI uri = new URI("s3://mybucket/path");
|
||||
conf.set("fs.AbstractFileSystem.s3.impl",
|
||||
"org.apache.hadoop.fs.s3a.S3A");
|
||||
FileContext fc = FileContext.getFileContext(uri, conf);
|
||||
assertEquals("s3", fc.getDefaultFileSystem().getUri().getScheme());
|
||||
Path path = fc.makeQualified(new Path("tmp/path"));
|
||||
assertEquals("s3", path.toUri().getScheme());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue