diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 493e94e7bdd..c5a65d36c4a 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -278,6 +278,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 1508675d702..cf97c35ba45 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -53,7 +53,8 @@ public final class Constants {
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
// connect to s3 over ssl?
- public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
+ public static final String SECURE_CONNECTIONS =
+ "fs.s3a.connection.ssl.enabled";
public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
//use a custom endpoint?
@@ -75,7 +76,8 @@ public final class Constants {
public static final int DEFAULT_MAX_ERROR_RETRIES = 20;
// seconds until we give up trying to establish a connection to s3
- public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout";
+ public static final String ESTABLISH_TIMEOUT =
+ "fs.s3a.connection.establish.timeout";
public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000;
// seconds until we give up on a connection to s3
@@ -116,11 +118,13 @@ public final class Constants {
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
// minimum size in bytes before we start a multipart uploads or copy
- public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
+ public static final String MIN_MULTIPART_THRESHOLD =
+ "fs.s3a.multipart.threshold";
public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
//enable multiobject-delete calls?
- public static final String ENABLE_MULTI_DELETE = "fs.s3a.multiobjectdelete.enable";
+ public static final String ENABLE_MULTI_DELETE =
+ "fs.s3a.multiobjectdelete.enable";
// comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
@@ -139,11 +143,13 @@ public final class Constants {
public static final String DEFAULT_CANNED_ACL = "";
// should we try to purge old multipart uploads when starting up
- public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
+ public static final String PURGE_EXISTING_MULTIPART =
+ "fs.s3a.multipart.purge";
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
// purge any multipart uploads older than this number of seconds
- public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
+ public static final String PURGE_EXISTING_MULTIPART_AGE =
+ "fs.s3a.multipart.purge.age";
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
// s3 server-side encryption
@@ -203,4 +209,15 @@ public final class Constants {
*/
@InterfaceStability.Unstable
public static final String INPUT_FADV_RANDOM = "random";
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static final String S3_CLIENT_FACTORY_IMPL =
+ "fs.s3a.s3.client.factory.impl";
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static final Class<? extends S3ClientFactory>
+ DEFAULT_S3_CLIENT_FACTORY_IMPL =
+ S3ClientFactory.DefaultS3ClientFactory.class;
}
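
The two new constants are consumed through Hadoop's standard class-valued configuration lookup. A minimal sketch of that wiring follows (the sketch class name is illustrative; MockS3ClientFactory is the test factory added later in this patch, and since the S3ClientFactory interface is package-private the code must live in org.apache.hadoop.fs.s3a):

package org.apache.hadoop.fs.s3a;

import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;

import org.apache.hadoop.conf.Configuration;

/** Illustrative sketch: selecting a non-default S3 client factory. */
final class S3ClientFactoryConfigSketch {
  static Configuration withMockFactory() {
    Configuration conf = new Configuration();
    // S3AFileSystem.initialize() resolves this key with conf.getClass(),
    // falling back to DEFAULT_S3_CLIENT_FACTORY_IMPL when it is unset.
    conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
        S3ClientFactory.class);
    return conf;
  }
}
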
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 5509d369d9d..c25d0fbf901 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
@@ -71,7 +71,7 @@ public class S3AFastOutputStream extends OutputStream {
private static final Logger LOG = S3AFileSystem.LOG;
private final String key;
private final String bucket;
- private final AmazonS3Client client;
+ private final AmazonS3 client;
private final int partSize;
private final int multiPartThreshold;
private final S3AFileSystem fs;
@@ -102,7 +102,7 @@ public class S3AFastOutputStream extends OutputStream {
* @param threadPoolExecutor thread factory
* @throws IOException on any problem
*/
- public S3AFastOutputStream(AmazonS3Client client,
+ public S3AFastOutputStream(AmazonS3 client,
S3AFileSystem fs,
String bucket,
String key,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e4d4e3d6c36..f238362eb35 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -35,11 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@@ -58,7 +54,6 @@ import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.event.ProgressEvent;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -79,7 +74,7 @@ import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
@@ -111,7 +106,7 @@ public class S3AFileSystem extends FileSystem {
public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
private URI uri;
private Path workingDir;
- private AmazonS3Client s3;
+ private AmazonS3 s3;
private String bucket;
private int maxKeys;
private Listing listing;
@@ -150,37 +145,11 @@ public class S3AFileSystem extends FileSystem {
bucket = name.getHost();
- AWSCredentialsProvider credentials =
- createAWSCredentialProviderSet(name, conf, uri);
-
- ClientConfiguration awsConf = new ClientConfiguration();
- awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
- DEFAULT_MAXIMUM_CONNECTIONS, 1));
- boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
- DEFAULT_SECURE_CONNECTIONS);
- awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
- awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
- DEFAULT_MAX_ERROR_RETRIES, 0));
- awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
- DEFAULT_ESTABLISH_TIMEOUT, 0));
- awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
- DEFAULT_SOCKET_TIMEOUT, 0));
- int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
- DEFAULT_SOCKET_SEND_BUFFER, 2048);
- int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
- DEFAULT_SOCKET_RECV_BUFFER, 2048);
- awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
- String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
- if (!signerOverride.isEmpty()) {
- LOG.debug("Signer override = {}", signerOverride);
- awsConf.setSignerOverride(signerOverride);
- }
-
- initProxySupport(conf, awsConf, secureConnections);
-
- initUserAgent(conf, awsConf);
-
- initAmazonS3Client(conf, credentials, awsConf);
+ Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
+ S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
+ S3ClientFactory.class);
+ s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
+ .createS3Client(name, uri);
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
listing = new Listing(this);
@@ -276,50 +245,6 @@ public class S3AFileSystem extends FileSystem {
}
}
- void initProxySupport(Configuration conf, ClientConfiguration awsConf,
- boolean secureConnections) throws IllegalArgumentException {
- String proxyHost = conf.getTrimmed(PROXY_HOST, "");
- int proxyPort = conf.getInt(PROXY_PORT, -1);
- if (!proxyHost.isEmpty()) {
- awsConf.setProxyHost(proxyHost);
- if (proxyPort >= 0) {
- awsConf.setProxyPort(proxyPort);
- } else {
- if (secureConnections) {
- LOG.warn("Proxy host set without port. Using HTTPS default 443");
- awsConf.setProxyPort(443);
- } else {
- LOG.warn("Proxy host set without port. Using HTTP default 80");
- awsConf.setProxyPort(80);
- }
- }
- String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
- String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
- if ((proxyUsername == null) != (proxyPassword == null)) {
- String msg = "Proxy error: " + PROXY_USERNAME + " or " +
- PROXY_PASSWORD + " set without the other.";
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- awsConf.setProxyUsername(proxyUsername);
- awsConf.setProxyPassword(proxyPassword);
- awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
- awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
- "domain {} as workstation {}", awsConf.getProxyHost(),
- awsConf.getProxyPort(),
- String.valueOf(awsConf.getProxyUsername()),
- awsConf.getProxyPassword(), awsConf.getProxyDomain(),
- awsConf.getProxyWorkstation());
- }
- } else if (proxyPort >= 0) {
- String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- }
-
/**
* Get S3A Instrumentation. For test purposes.
* @return this instance's instrumentation.
@@ -328,53 +253,9 @@ public class S3AFileSystem extends FileSystem {
return instrumentation;
}
- /**
- * Initializes the User-Agent header to send in HTTP requests to the S3
- * back-end. We always include the Hadoop version number. The user also may
- * set an optional custom prefix to put in front of the Hadoop version number.
- * The AWS SDK interally appends its own information, which seems to include
- * the AWS SDK version, OS and JVM version.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- */
- private void initUserAgent(Configuration conf, ClientConfiguration awsConf) {
- String userAgent = "Hadoop " + VersionInfo.getVersion();
- String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
- if (!userAgentPrefix.isEmpty()) {
- userAgent = userAgentPrefix + ", " + userAgent;
- }
- LOG.debug("Using User-Agent: {}", userAgent);
- awsConf.setUserAgent(userAgent);
- }
-
- private void initAmazonS3Client(Configuration conf,
- AWSCredentialsProvider credentials, ClientConfiguration awsConf)
- throws IllegalArgumentException {
- s3 = new AmazonS3Client(credentials, awsConf);
- String endPoint = conf.getTrimmed(ENDPOINT, "");
- if (!endPoint.isEmpty()) {
- try {
- s3.setEndpoint(endPoint);
- } catch (IllegalArgumentException e) {
- String msg = "Incorrect endpoint: " + e.getMessage();
- LOG.error(msg);
- throw new IllegalArgumentException(msg, e);
- }
- }
- enablePathStyleAccessIfRequired(conf);
- }
-
- private void enablePathStyleAccessIfRequired(Configuration conf) {
- final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
- if (pathStyleAccess) {
- LOG.debug("Enabling path style access!");
- s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
- }
- }
-
private void initTransferManager() {
- TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+ TransferManagerConfiguration transferConfiguration =
+ new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
transferConfiguration.setMultipartCopyPartSize(partSize);
@@ -445,7 +326,7 @@ public class S3AFileSystem extends FileSystem {
* @return AmazonS3Client
*/
@VisibleForTesting
- AmazonS3Client getAmazonS3Client() {
+ AmazonS3 getAmazonS3Client() {
return s3;
}
@@ -469,10 +350,6 @@ public class S3AFileSystem extends FileSystem {
this.inputPolicy = inputPolicy;
}
- public S3AFileSystem() {
- super();
- }
-
/**
* Turns a path (relative or otherwise) into an S3 key.
*
@@ -791,8 +668,10 @@ public class S3AFileSystem extends FileSystem {
while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
- keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
- String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
+ keysToDelete.add(
+ new DeleteObjectsRequest.KeyVersion(summary.getKey()));
+ String newDstKey =
+ dstKey + summary.getKey().substring(srcKey.length());
copyFile(summary.getKey(), newDstKey, summary.getSize());
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
@@ -1397,7 +1276,8 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("Found file (with /): fake directory");
return new S3AFileStatus(true, true, path);
} else {
- LOG.warn("Found file (with /): real file? should not happen: {}", key);
+ LOG.warn("Found file (with /): real file? should not happen: {}",
+ key);
return new S3AFileStatus(meta.getContentLength(),
dateToLong(meta.getLastModified()),
@@ -1994,42 +1874,4 @@ public class S3AFileSystem extends FileSystem {
getFileBlockLocations(status, 0, status.getLen())
: null);
}
-
- /**
- * Get a integer option >= the minimum allowed value.
- * @param conf configuration
- * @param key key to look up
- * @param defVal default value
- * @param min minimum value
- * @return the value
- * @throws IllegalArgumentException if the value is below the minimum
- */
- static int intOption(Configuration conf, String key, int defVal, int min) {
- int v = conf.getInt(key, defVal);
- Preconditions.checkArgument(v >= min,
- String.format("Value of %s: %d is below the minimum value %d",
- key, v, min));
- return v;
- }
-
- /**
- * Get a long option >= the minimum allowed value.
- * @param conf configuration
- * @param key key to look up
- * @param defVal default value
- * @param min minimum value
- * @return the value
- * @throws IllegalArgumentException if the value is below the minimum
- */
- static long longOption(Configuration conf,
- String key,
- long defVal,
- long min) {
- long v = conf.getLong(key, defVal);
- Preconditions.checkArgument(v >= min,
- String.format("Value of %s: %d is below the minimum value %d",
- key, v, min));
- return v;
- }
-
}
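
With the inline construction removed, initialize() obtains its client through a class lookup plus ReflectionUtils. A minimal sketch of that pattern in isolation (the wrapper class and method names are illustrative, not part of the patch):

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.net.URI;

import com.amazonaws.services.s3.AmazonS3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

import static org.apache.hadoop.fs.s3a.Constants.*;

/** Illustrative sketch of the factory lookup done in initialize(). */
final class S3ClientLookupSketch {
  static AmazonS3 newClient(URI name, URI uri, Configuration conf)
      throws IOException {
    Class<? extends S3ClientFactory> factoryClass = conf.getClass(
        S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
        S3ClientFactory.class);
    // ReflectionUtils.newInstance() also calls setConf() on Configurable
    // instances, so DefaultS3ClientFactory sees the same Configuration.
    return ReflectionUtils.newInstance(factoryClass, conf)
        .createS3Client(name, uri);
  }
}
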
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index ccb97269aba..dd6cdd72174 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.google.common.base.Preconditions;
@@ -71,7 +71,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
private volatile boolean closed;
private S3ObjectInputStream wrappedStream;
private final FileSystem.Statistics stats;
- private final AmazonS3Client client;
+ private final AmazonS3 client;
private final String bucket;
private final String key;
private final long contentLength;
@@ -101,7 +101,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
public S3AInputStream(String bucket,
String key,
long contentLength,
- AmazonS3Client client,
+ AmazonS3 client,
FileSystem.Statistics stats,
S3AInstrumentation instrumentation,
long readahead,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 699676dc2d1..a5e8e7a1b6a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -25,6 +25,9 @@ import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import com.google.common.base.Preconditions;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -403,4 +406,41 @@ public final class S3AUtils {
builder.append("size=").append(summary.getSize());
return builder.toString();
}
+
+ /**
+ * Get an integer option >= the minimum allowed value.
+ * @param conf configuration
+ * @param key key to look up
+ * @param defVal default value
+ * @param min minimum value
+ * @return the value
+ * @throws IllegalArgumentException if the value is below the minimum
+ */
+ static int intOption(Configuration conf, String key, int defVal, int min) {
+ int v = conf.getInt(key, defVal);
+ Preconditions.checkArgument(v >= min,
+ String.format("Value of %s: %d is below the minimum value %d",
+ key, v, min));
+ return v;
+ }
+
+ /**
+ * Get a long option >= the minimum allowed value.
+ * @param conf configuration
+ * @param key key to look up
+ * @param defVal default value
+ * @param min minimum value
+ * @return the value
+ * @throws IllegalArgumentException if the value is below the minimum
+ */
+ static long longOption(Configuration conf,
+ String key,
+ long defVal,
+ long min) {
+ long v = conf.getLong(key, defVal);
+ Preconditions.checkArgument(v >= min,
+ String.format("Value of %s: %d is below the minimum value %d",
+ key, v, min));
+ return v;
+ }
}
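
The relocated helpers keep their existing contract: the configured value is returned when it meets the minimum, otherwise Preconditions rejects it with an IllegalArgumentException. A small usage sketch mirroring the call sites in DefaultS3ClientFactory (the wrapper class name is illustrative):

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;

/** Illustrative sketch of the moved option helpers. */
final class OptionHelperSketch {
  static int maxConnections(Configuration conf) {
    // Returns fs.s3a.connection.maximum, or the default when unset;
    // throws IllegalArgumentException if the configured value is below 1.
    return intOption(conf, MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS, 1);
  }
}
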
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
new file mode 100644
index 00000000000..0a4dd027db1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.VersionInfo;
+
+import org.slf4j.Logger;
+
+/**
+ * Factory for creation of S3 client instances to be used by {@link S3AFileSystem}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+interface S3ClientFactory {
+
+ /**
+ * Creates a new {@link AmazonS3} client. This method accepts the S3A file
+ * system URI both in raw input form and validated form as separate arguments,
+ * because both values may be useful in logging.
+ *
+ * @param name raw input S3A file system URI
+ * @param uri validated form of S3A file system URI
+ * @return S3 client
+ * @throws IOException IO problem
+ */
+ AmazonS3 createS3Client(URI name, URI uri) throws IOException;
+
+ /**
+ * The default factory implementation, which calls the AWS SDK to configure
+ * and create an {@link AmazonS3Client} that communicates with the S3 service.
+ */
+ static class DefaultS3ClientFactory extends Configured
+ implements S3ClientFactory {
+
+ private static final Logger LOG = S3AFileSystem.LOG;
+
+ @Override
+ public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
+ Configuration conf = getConf();
+ AWSCredentialsProvider credentials =
+ createAWSCredentialProviderSet(name, conf, uri);
+ ClientConfiguration awsConf = new ClientConfiguration();
+ initConnectionSettings(conf, awsConf);
+ initProxySupport(conf, awsConf);
+ initUserAgent(conf, awsConf);
+ return createAmazonS3Client(conf, credentials, awsConf);
+ }
+
+ /**
+ * Initializes all AWS SDK settings related to connection management.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ */
+ private static void initConnectionSettings(Configuration conf,
+ ClientConfiguration awsConf) {
+ awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+ DEFAULT_MAXIMUM_CONNECTIONS, 1));
+ boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+ DEFAULT_SECURE_CONNECTIONS);
+ awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+ awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+ DEFAULT_MAX_ERROR_RETRIES, 0));
+ awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+ DEFAULT_ESTABLISH_TIMEOUT, 0));
+ awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+ DEFAULT_SOCKET_TIMEOUT, 0));
+ int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+ DEFAULT_SOCKET_SEND_BUFFER, 2048);
+ int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+ DEFAULT_SOCKET_RECV_BUFFER, 2048);
+ awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
+ String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+ if (!signerOverride.isEmpty()) {
+ LOG.debug("Signer override = {}", signerOverride);
+ awsConf.setSignerOverride(signerOverride);
+ }
+ }
+
+ /**
+ * Initializes AWS SDK proxy support if configured.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ * @throws IllegalArgumentException if misconfigured
+ */
+ private static void initProxySupport(Configuration conf,
+ ClientConfiguration awsConf) throws IllegalArgumentException {
+ String proxyHost = conf.getTrimmed(PROXY_HOST, "");
+ int proxyPort = conf.getInt(PROXY_PORT, -1);
+ if (!proxyHost.isEmpty()) {
+ awsConf.setProxyHost(proxyHost);
+ if (proxyPort >= 0) {
+ awsConf.setProxyPort(proxyPort);
+ } else {
+ if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
+ LOG.warn("Proxy host set without port. Using HTTPS default 443");
+ awsConf.setProxyPort(443);
+ } else {
+ LOG.warn("Proxy host set without port. Using HTTP default 80");
+ awsConf.setProxyPort(80);
+ }
+ }
+ String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+ String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+ if ((proxyUsername == null) != (proxyPassword == null)) {
+ String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+ PROXY_PASSWORD + " set without the other.";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ awsConf.setProxyUsername(proxyUsername);
+ awsConf.setProxyPassword(proxyPassword);
+ awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+ awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+ "domain {} as workstation {}", awsConf.getProxyHost(),
+ awsConf.getProxyPort(),
+ String.valueOf(awsConf.getProxyUsername()),
+ awsConf.getProxyPassword(), awsConf.getProxyDomain(),
+ awsConf.getProxyWorkstation());
+ }
+ } else if (proxyPort >= 0) {
+ String msg =
+ "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ /**
+ * Initializes the User-Agent header to send in HTTP requests to the S3
+ * back-end. We always include the Hadoop version number. The user also
+ * may set an optional custom prefix to put in front of the Hadoop version
+ * number. The AWS SDK internally appends its own information, which seems
+ * to include the AWS SDK version, OS and JVM version.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ */
+ private static void initUserAgent(Configuration conf,
+ ClientConfiguration awsConf) {
+ String userAgent = "Hadoop " + VersionInfo.getVersion();
+ String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
+ if (!userAgentPrefix.isEmpty()) {
+ userAgent = userAgentPrefix + ", " + userAgent;
+ }
+ LOG.debug("Using User-Agent: {}", userAgent);
+ awsConf.setUserAgent(userAgent);
+ }
+
+ /**
+ * Creates an {@link AmazonS3Client} from the established configuration.
+ *
+ * @param conf Hadoop configuration
+ * @param credentials AWS credentials
+ * @param awsConf AWS SDK configuration
+ * @return S3 client
+ * @throws IllegalArgumentException if misconfigured
+ */
+ private static AmazonS3 createAmazonS3Client(Configuration conf,
+ AWSCredentialsProvider credentials, ClientConfiguration awsConf)
+ throws IllegalArgumentException {
+ AmazonS3 s3 = new AmazonS3Client(credentials, awsConf);
+ String endPoint = conf.getTrimmed(ENDPOINT, "");
+ if (!endPoint.isEmpty()) {
+ try {
+ s3.setEndpoint(endPoint);
+ } catch (IllegalArgumentException e) {
+ String msg = "Incorrect endpoint: " + e.getMessage();
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg, e);
+ }
+ }
+ enablePathStyleAccessIfRequired(s3, conf);
+ return s3;
+ }
+
+ /**
+ * Enables path-style access to S3 buckets if configured. By default, the
+ * behavior is to use virtual hosted-style access with URIs of the form
+ * http://bucketname.s3.amazonaws.com. Enabling path-style access and a
+ * region-specific endpoint switches the behavior to use URIs of the form
+ * http://s3-eu-west-1.amazonaws.com/bucketname.
+ *
+ * @param s3 S3 client
+ * @param conf Hadoop configuration
+ */
+ private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
+ Configuration conf) {
+ final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
+ if (pathStyleAccess) {
+ LOG.debug("Enabling path style access!");
+ s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
+ }
+ }
+ }
+}
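
Because client construction now sits behind this interface, an alternative factory only has to implement createS3Client(). Below is a hedged sketch of a factory aimed at an S3-compatible local endpoint; the credentials and endpoint are placeholders, and the class must live in org.apache.hadoop.fs.s3a because the interface is package-private:

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.net.URI;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import org.apache.hadoop.conf.Configured;

/** Illustrative sketch: a factory for a local S3-compatible service. */
public class LocalEndpointS3ClientFactory extends Configured
    implements S3ClientFactory {

  @Override
  public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
    // Placeholder credentials; a real factory would read them from
    // configuration, as DefaultS3ClientFactory does.
    AmazonS3 s3 = new AmazonS3Client(
        new BasicAWSCredentials("accessKey", "secretKey"));
    s3.setEndpoint("http://localhost:9000");  // illustrative endpoint
    return s3;
  }
}
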
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java
index bc8c2e63e9d..97ece37012d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java
@@ -132,6 +132,9 @@ public final class S3xLoginHelper {
*
* This strips out login information.
*
+ * @param uri the URI to canonicalize
+ * @param defaultPort default port to use in canonicalized URI if the input
+ * URI has no port and this value is greater than 0
* @return a new, canonicalized URI.
*/
public static URI canonicalizeUri(URI uri, int defaultPort) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
new file mode 100644
index 00000000000..6734947af96
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Abstract base class for S3A unit tests using a mock S3 client.
+ */
+public abstract class AbstractS3AMockTest {
+
+ protected static final String BUCKET = "mock-bucket";
+ protected static final AmazonServiceException NOT_FOUND;
+ static {
+ NOT_FOUND = new AmazonServiceException("Not Found");
+ NOT_FOUND.setStatusCode(404);
+ }
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ protected S3AFileSystem fs;
+ protected AmazonS3 s3;
+
+ @Before
+ public void setup() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
+ S3ClientFactory.class);
+ fs = new S3AFileSystem();
+ URI uri = URI.create(FS_S3A + "://" + BUCKET);
+ fs.initialize(uri, conf);
+ s3 = fs.getAmazonS3Client();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index 4e9933902c9..fca8e491da6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.ClientConfiguration;
-import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.commons.lang.StringUtils;
@@ -96,7 +96,7 @@ public class ITestS3AConfiguration {
} else {
conf.set(Constants.ENDPOINT, endpoint);
fs = S3ATestUtils.createTestFileSystem(conf);
- AmazonS3Client s3 = fs.getAmazonS3Client();
+ AmazonS3 s3 = fs.getAmazonS3Client();
String endPointRegion = "";
// Differentiate handling of "s3-" and "s3." based endpoint identifiers
String[] endpointParts = StringUtils.split(endpoint, '.');
@@ -364,7 +364,7 @@ public class ITestS3AConfiguration {
try {
fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs);
- AmazonS3Client s3 = fs.getAmazonS3Client();
+ AmazonS3 s3 = fs.getAmazonS3Client();
assertNotNull(s3);
S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class,
"clientOptions");
@@ -388,7 +388,7 @@ public class ITestS3AConfiguration {
conf = new Configuration();
fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs);
- AmazonS3Client s3 = fs.getAmazonS3Client();
+ AmazonS3 s3 = fs.getAmazonS3Client();
assertNotNull(s3);
ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
"clientConfiguration");
@@ -401,7 +401,7 @@ public class ITestS3AConfiguration {
conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs);
- AmazonS3Client s3 = fs.getAmazonS3Client();
+ AmazonS3 s3 = fs.getAmazonS3Client();
assertNotNull(s3);
ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
"clientConfiguration");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
new file mode 100644
index 00000000000..41f04ee728b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.mockito.Mockito.*;
+
+import java.net.URI;
+
+import com.amazonaws.services.s3.AmazonS3;
+
+/**
+ * An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3}
+ * interface suitable for unit testing.
+ */
+public class MockS3ClientFactory implements S3ClientFactory {
+
+ @Override
+ public AmazonS3 createS3Client(URI name, URI uri) {
+ String bucket = name.getHost();
+ AmazonS3 s3 = mock(AmazonS3.class);
+ when(s3.doesBucketExist(bucket)).thenReturn(true);
+ return s3;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
new file mode 100644
index 00000000000..f9e9c6bc74a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.FileNotFoundException;
+import java.util.Collections;
+import java.util.Date;
+
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+/**
+ * S3A tests for getFileStatus using mock S3 client.
+ */
+public class TestS3AGetFileStatus extends AbstractS3AMockTest {
+
+ @Test
+ public void testFile() throws Exception {
+ Path path = new Path("/file");
+ String key = path.toUri().getPath().substring(1);
+ ObjectMetadata meta = new ObjectMetadata();
+ meta.setContentLength(1L);
+ meta.setLastModified(new Date(2L));
+ when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta);
+ FileStatus stat = fs.getFileStatus(path);
+ assertNotNull(stat);
+ assertEquals(fs.makeQualified(path), stat.getPath());
+ assertTrue(stat.isFile());
+ assertEquals(meta.getContentLength(), stat.getLen());
+ assertEquals(meta.getLastModified().getTime(), stat.getModificationTime());
+ }
+
+ @Test
+ public void testFakeDirectory() throws Exception {
+ Path path = new Path("/dir");
+ String key = path.toUri().getPath().substring(1);
+ when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+ ObjectMetadata meta = new ObjectMetadata();
+ meta.setContentLength(0L);
+ when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta);
+ FileStatus stat = fs.getFileStatus(path);
+ assertNotNull(stat);
+ assertEquals(fs.makeQualified(path), stat.getPath());
+ assertTrue(stat.isDirectory());
+ }
+
+ @Test
+ public void testImplicitDirectory() throws Exception {
+ Path path = new Path("/dir");
+ String key = path.toUri().getPath().substring(1);
+ when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+ when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+ ObjectListing objects = mock(ObjectListing.class);
+ when(objects.getCommonPrefixes()).thenReturn(
+ Collections.singletonList("dir/"));
+ when(objects.getObjectSummaries()).thenReturn(
+ Collections.<S3ObjectSummary>emptyList());
+ when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+ FileStatus stat = fs.getFileStatus(path);
+ assertNotNull(stat);
+ assertEquals(fs.makeQualified(path), stat.getPath());
+ assertTrue(stat.isDirectory());
+ }
+
+ @Test
+ public void testRoot() throws Exception {
+ Path path = new Path("/");
+ String key = path.toUri().getPath().substring(1);
+ when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+ when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+ ObjectListing objects = mock(ObjectListing.class);
+ when(objects.getCommonPrefixes()).thenReturn(
+ Collections.<String>emptyList());
+ when(objects.getObjectSummaries()).thenReturn(
+ Collections.<S3ObjectSummary>emptyList());
+ when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+ FileStatus stat = fs.getFileStatus(path);
+ assertNotNull(stat);
+ assertEquals(fs.makeQualified(path), stat.getPath());
+ assertTrue(stat.isDirectory());
+ assertTrue(stat.getPath().isRoot());
+ }
+
+ @Test
+ public void testNotFound() throws Exception {
+ Path path = new Path("/dir");
+ String key = path.toUri().getPath().substring(1);
+ when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+ when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+ ObjectListing objects = mock(ObjectListing.class);
+ when(objects.getCommonPrefixes()).thenReturn(
+ Collections.<String>emptyList());
+ when(objects.getObjectSummaries()).thenReturn(
+ Collections.<S3ObjectSummary>emptyList());
+ when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+ exception.expect(FileNotFoundException.class);
+ fs.getFileStatus(path);
+ }
+}