From e28930a38b3868bc701aeef229831292320b4a3a Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Tue, 6 Sep 2016 09:36:21 -0700 Subject: [PATCH] HADOOP-13447. Refactor S3AFileSystem to support introduction of separate metadata repository and tests. Contributed by Chris Nauroth. (cherry picked from commit d152557cf7f4d2288524c222fcbaf152bdc038b0) --- hadoop-tools/hadoop-aws/pom.xml | 5 + .../org/apache/hadoop/fs/s3a/Constants.java | 29 ++- .../hadoop/fs/s3a/S3AFastOutputStream.java | 6 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 192 ++------------- .../apache/hadoop/fs/s3a/S3AInputStream.java | 6 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 40 +++ .../apache/hadoop/fs/s3a/S3ClientFactory.java | 232 ++++++++++++++++++ .../hadoop/fs/s3native/S3xLoginHelper.java | 3 + .../hadoop/fs/s3a/AbstractS3AMockTest.java | 70 ++++++ .../hadoop/fs/s3a/ITestS3AConfiguration.java | 10 +- .../hadoop/fs/s3a/MockS3ClientFactory.java | 40 +++ .../hadoop/fs/s3a/TestS3AGetFileStatus.java | 126 ++++++++++ 12 files changed, 567 insertions(+), 192 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 493e94e7bdd..c5a65d36c4a 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -278,6 +278,11 @@ junit test + + org.mockito + mockito-all + test + org.apache.hadoop hadoop-mapreduce-client-jobclient diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 1508675d702..cf97c35ba45 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -53,7 +53,8 @@ public final class Constants { public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15; // connect to s3 over ssl? - public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled"; + public static final String SECURE_CONNECTIONS = + "fs.s3a.connection.ssl.enabled"; public static final boolean DEFAULT_SECURE_CONNECTIONS = true; //use a custom endpoint? @@ -75,7 +76,8 @@ public final class Constants { public static final int DEFAULT_MAX_ERROR_RETRIES = 20; // seconds until we give up trying to establish a connection to s3 - public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout"; + public static final String ESTABLISH_TIMEOUT = + "fs.s3a.connection.establish.timeout"; public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000; // seconds until we give up on a connection to s3 @@ -116,11 +118,13 @@ public final class Constants { public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB // minimum size in bytes before we start a multipart uploads or copy - public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold"; + public static final String MIN_MULTIPART_THRESHOLD = + "fs.s3a.multipart.threshold"; public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE; //enable multiobject-delete calls? - public static final String ENABLE_MULTI_DELETE = "fs.s3a.multiobjectdelete.enable"; + public static final String ENABLE_MULTI_DELETE = + "fs.s3a.multiobjectdelete.enable"; // comma separated list of directories public static final String BUFFER_DIR = "fs.s3a.buffer.dir"; @@ -139,11 +143,13 @@ public final class Constants { public static final String DEFAULT_CANNED_ACL = ""; // should we try to purge old multipart uploads when starting up - public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge"; + public static final String PURGE_EXISTING_MULTIPART = + "fs.s3a.multipart.purge"; public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false; // purge any multipart uploads older than this number of seconds - public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age"; + public static final String PURGE_EXISTING_MULTIPART_AGE = + "fs.s3a.multipart.purge.age"; public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400; // s3 server-side encryption @@ -203,4 +209,15 @@ public final class Constants { */ @InterfaceStability.Unstable public static final String INPUT_FADV_RANDOM = "random"; + + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static final String S3_CLIENT_FACTORY_IMPL = + "fs.s3a.s3.client.factory.impl"; + + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static final Class + DEFAULT_S3_CLIENT_FACTORY_IMPL = + S3ClientFactory.DefaultS3ClientFactory.class; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 5509d369d9d..c25d0fbf901 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -21,7 +21,7 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressListener; -import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; @@ -71,7 +71,7 @@ public class S3AFastOutputStream extends OutputStream { private static final Logger LOG = S3AFileSystem.LOG; private final String key; private final String bucket; - private final AmazonS3Client client; + private final AmazonS3 client; private final int partSize; private final int multiPartThreshold; private final S3AFileSystem fs; @@ -102,7 +102,7 @@ public class S3AFastOutputStream extends OutputStream { * @param threadPoolExecutor thread factory * @throws IOException on any problem */ - public S3AFastOutputStream(AmazonS3Client client, + public S3AFastOutputStream(AmazonS3 client, S3AFileSystem fs, String bucket, String key, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index e4d4e3d6c36..f238362eb35 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -35,11 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.S3ClientOptions; +import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.DeleteObjectsRequest; @@ -58,7 +54,6 @@ import com.amazonaws.services.s3.transfer.Upload; import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressEvent; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -79,7 +74,7 @@ import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.util.ReflectionUtils; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL; @@ -111,7 +106,7 @@ public class S3AFileSystem extends FileSystem { public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; private URI uri; private Path workingDir; - private AmazonS3Client s3; + private AmazonS3 s3; private String bucket; private int maxKeys; private Listing listing; @@ -150,37 +145,11 @@ public class S3AFileSystem extends FileSystem { bucket = name.getHost(); - AWSCredentialsProvider credentials = - createAWSCredentialProviderSet(name, conf, uri); - - ClientConfiguration awsConf = new ClientConfiguration(); - awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS, 1)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, - DEFAULT_SECURE_CONNECTIONS); - awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES, 0)); - awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT, 0)); - awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT, 0)); - int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, - DEFAULT_SOCKET_SEND_BUFFER, 2048); - int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, - DEFAULT_SOCKET_RECV_BUFFER, 2048); - awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); - String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); - if (!signerOverride.isEmpty()) { - LOG.debug("Signer override = {}", signerOverride); - awsConf.setSignerOverride(signerOverride); - } - - initProxySupport(conf, awsConf, secureConnections); - - initUserAgent(conf, awsConf); - - initAmazonS3Client(conf, credentials, awsConf); + Class s3ClientFactoryClass = conf.getClass( + S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, + S3ClientFactory.class); + s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) + .createS3Client(name, uri); maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); listing = new Listing(this); @@ -276,50 +245,6 @@ public class S3AFileSystem extends FileSystem { } } - void initProxySupport(Configuration conf, ClientConfiguration awsConf, - boolean secureConnections) throws IllegalArgumentException { - String proxyHost = conf.getTrimmed(PROXY_HOST, ""); - int proxyPort = conf.getInt(PROXY_PORT, -1); - if (!proxyHost.isEmpty()) { - awsConf.setProxyHost(proxyHost); - if (proxyPort >= 0) { - awsConf.setProxyPort(proxyPort); - } else { - if (secureConnections) { - LOG.warn("Proxy host set without port. Using HTTPS default 443"); - awsConf.setProxyPort(443); - } else { - LOG.warn("Proxy host set without port. Using HTTP default 80"); - awsConf.setProxyPort(80); - } - } - String proxyUsername = conf.getTrimmed(PROXY_USERNAME); - String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); - if ((proxyUsername == null) != (proxyPassword == null)) { - String msg = "Proxy error: " + PROXY_USERNAME + " or " + - PROXY_PASSWORD + " set without the other."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - awsConf.setProxyUsername(proxyUsername); - awsConf.setProxyPassword(proxyPassword); - awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); - awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); - if (LOG.isDebugEnabled()) { - LOG.debug("Using proxy server {}:{} as user {} with password {} on " + - "domain {} as workstation {}", awsConf.getProxyHost(), - awsConf.getProxyPort(), - String.valueOf(awsConf.getProxyUsername()), - awsConf.getProxyPassword(), awsConf.getProxyDomain(), - awsConf.getProxyWorkstation()); - } - } else if (proxyPort >= 0) { - String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - } - /** * Get S3A Instrumentation. For test purposes. * @return this instance's instrumentation. @@ -328,53 +253,9 @@ public class S3AFileSystem extends FileSystem { return instrumentation; } - /** - * Initializes the User-Agent header to send in HTTP requests to the S3 - * back-end. We always include the Hadoop version number. The user also may - * set an optional custom prefix to put in front of the Hadoop version number. - * The AWS SDK interally appends its own information, which seems to include - * the AWS SDK version, OS and JVM version. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private void initUserAgent(Configuration conf, ClientConfiguration awsConf) { - String userAgent = "Hadoop " + VersionInfo.getVersion(); - String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); - if (!userAgentPrefix.isEmpty()) { - userAgent = userAgentPrefix + ", " + userAgent; - } - LOG.debug("Using User-Agent: {}", userAgent); - awsConf.setUserAgent(userAgent); - } - - private void initAmazonS3Client(Configuration conf, - AWSCredentialsProvider credentials, ClientConfiguration awsConf) - throws IllegalArgumentException { - s3 = new AmazonS3Client(credentials, awsConf); - String endPoint = conf.getTrimmed(ENDPOINT, ""); - if (!endPoint.isEmpty()) { - try { - s3.setEndpoint(endPoint); - } catch (IllegalArgumentException e) { - String msg = "Incorrect endpoint: " + e.getMessage(); - LOG.error(msg); - throw new IllegalArgumentException(msg, e); - } - } - enablePathStyleAccessIfRequired(conf); - } - - private void enablePathStyleAccessIfRequired(Configuration conf) { - final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false); - if (pathStyleAccess) { - LOG.debug("Enabling path style access!"); - s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true)); - } - } - private void initTransferManager() { - TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + TransferManagerConfiguration transferConfiguration = + new TransferManagerConfiguration(); transferConfiguration.setMinimumUploadPartSize(partSize); transferConfiguration.setMultipartUploadThreshold(multiPartThreshold); transferConfiguration.setMultipartCopyPartSize(partSize); @@ -445,7 +326,7 @@ public class S3AFileSystem extends FileSystem { * @return AmazonS3Client */ @VisibleForTesting - AmazonS3Client getAmazonS3Client() { + AmazonS3 getAmazonS3Client() { return s3; } @@ -469,10 +350,6 @@ public class S3AFileSystem extends FileSystem { this.inputPolicy = inputPolicy; } - public S3AFileSystem() { - super(); - } - /** * Turns a path (relative or otherwise) into an S3 key. * @@ -791,8 +668,10 @@ public class S3AFileSystem extends FileSystem { while (true) { for (S3ObjectSummary summary : objects.getObjectSummaries()) { - keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); - String newDstKey = dstKey + summary.getKey().substring(srcKey.length()); + keysToDelete.add( + new DeleteObjectsRequest.KeyVersion(summary.getKey())); + String newDstKey = + dstKey + summary.getKey().substring(srcKey.length()); copyFile(summary.getKey(), newDstKey, summary.getSize()); if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { @@ -1397,7 +1276,8 @@ public class S3AFileSystem extends FileSystem { LOG.debug("Found file (with /): fake directory"); return new S3AFileStatus(true, true, path); } else { - LOG.warn("Found file (with /): real file? should not happen: {}", key); + LOG.warn("Found file (with /): real file? should not happen: {}", + key); return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()), @@ -1994,42 +1874,4 @@ public class S3AFileSystem extends FileSystem { getFileBlockLocations(status, 0, status.getLen()) : null); } - - /** - * Get a integer option >= the minimum allowed value. - * @param conf configuration - * @param key key to look up - * @param defVal default value - * @param min minimum value - * @return the value - * @throws IllegalArgumentException if the value is below the minimum - */ - static int intOption(Configuration conf, String key, int defVal, int min) { - int v = conf.getInt(key, defVal); - Preconditions.checkArgument(v >= min, - String.format("Value of %s: %d is below the minimum value %d", - key, v, min)); - return v; - } - - /** - * Get a long option >= the minimum allowed value. - * @param conf configuration - * @param key key to look up - * @param defVal default value - * @param min minimum value - * @return the value - * @throws IllegalArgumentException if the value is below the minimum - */ - static long longOption(Configuration conf, - String key, - long defVal, - long min) { - long v = conf.getLong(key, defVal); - Preconditions.checkArgument(v >= min, - String.format("Value of %s: %d is below the minimum value %d", - key, v, min)); - return v; - } - } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index ccb97269aba..dd6cdd72174 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -19,7 +19,7 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.google.common.base.Preconditions; @@ -71,7 +71,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { private volatile boolean closed; private S3ObjectInputStream wrappedStream; private final FileSystem.Statistics stats; - private final AmazonS3Client client; + private final AmazonS3 client; private final String bucket; private final String key; private final long contentLength; @@ -101,7 +101,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { public S3AInputStream(String bucket, String key, long contentLength, - AmazonS3Client client, + AmazonS3 client, FileSystem.Statistics stats, S3AInstrumentation instrumentation, long readahead, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 699676dc2d1..a5e8e7a1b6a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -25,6 +25,9 @@ import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.S3ObjectSummary; + +import com.google.common.base.Preconditions; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -403,4 +406,41 @@ public final class S3AUtils { builder.append("size=").append(summary.getSize()); return builder.toString(); } + + /** + * Get a integer option >= the minimum allowed value. + * @param conf configuration + * @param key key to look up + * @param defVal default value + * @param min minimum value + * @return the value + * @throws IllegalArgumentException if the value is below the minimum + */ + static int intOption(Configuration conf, String key, int defVal, int min) { + int v = conf.getInt(key, defVal); + Preconditions.checkArgument(v >= min, + String.format("Value of %s: %d is below the minimum value %d", + key, v, min)); + return v; + } + + /** + * Get a long option >= the minimum allowed value. + * @param conf configuration + * @param key key to look up + * @param defVal default value + * @param min minimum value + * @return the value + * @throws IllegalArgumentException if the value is below the minimum + */ + static long longOption(Configuration conf, + String key, + long defVal, + long min) { + long v = conf.getLong(key, defVal); + Preconditions.checkArgument(v >= min, + String.format("Value of %s: %d is below the minimum value %d", + key, v, min)); + return v; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java new file mode 100644 index 00000000000..0a4dd027db1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; + +import java.io.IOException; +import java.net.URI; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.VersionInfo; + +import org.slf4j.Logger; + +/** + * Factory for creation of S3 client instances to be used by {@link S3Store}. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +interface S3ClientFactory { + + /** + * Creates a new {@link AmazonS3} client. This method accepts the S3A file + * system URI both in raw input form and validated form as separate arguments, + * because both values may be useful in logging. + * + * @param name raw input S3A file system URI + * @param uri validated form of S3A file system URI + * @return S3 client + * @throws IOException IO problem + */ + AmazonS3 createS3Client(URI name, URI uri) throws IOException; + + /** + * The default factory implementation, which calls the AWS SDK to configure + * and create an {@link AmazonS3Client} that communicates with the S3 service. + */ + static class DefaultS3ClientFactory extends Configured + implements S3ClientFactory { + + private static final Logger LOG = S3AFileSystem.LOG; + + @Override + public AmazonS3 createS3Client(URI name, URI uri) throws IOException { + Configuration conf = getConf(); + AWSCredentialsProvider credentials = + createAWSCredentialProviderSet(name, conf, uri); + ClientConfiguration awsConf = new ClientConfiguration(); + initConnectionSettings(conf, awsConf); + initProxySupport(conf, awsConf); + initUserAgent(conf, awsConf); + return createAmazonS3Client(conf, credentials, awsConf); + } + + /** + * Initializes all AWS SDK settings related to connection management. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + */ + private static void initConnectionSettings(Configuration conf, + ClientConfiguration awsConf) { + awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS, 1)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, + DEFAULT_SECURE_CONNECTIONS); + awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES, 0)); + awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT, 0)); + awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT, 0)); + int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, + DEFAULT_SOCKET_SEND_BUFFER, 2048); + int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, + DEFAULT_SOCKET_RECV_BUFFER, 2048); + awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); + String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); + if (!signerOverride.isEmpty()) { + LOG.debug("Signer override = {}", signerOverride); + awsConf.setSignerOverride(signerOverride); + } + } + + /** + * Initializes AWS SDK proxy support if configured. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + * @throws IllegalArgumentException if misconfigured + */ + private static void initProxySupport(Configuration conf, + ClientConfiguration awsConf) throws IllegalArgumentException { + String proxyHost = conf.getTrimmed(PROXY_HOST, ""); + int proxyPort = conf.getInt(PROXY_PORT, -1); + if (!proxyHost.isEmpty()) { + awsConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + awsConf.setProxyPort(proxyPort); + } else { + if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + awsConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. Using HTTP default 80"); + awsConf.setProxyPort(80); + } + } + String proxyUsername = conf.getTrimmed(PROXY_USERNAME); + String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME + " or " + + PROXY_PASSWORD + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + awsConf.setProxyUsername(proxyUsername); + awsConf.setProxyPassword(proxyPassword); + awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); + awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); + if (LOG.isDebugEnabled()) { + LOG.debug("Using proxy server {}:{} as user {} with password {} on " + + "domain {} as workstation {}", awsConf.getProxyHost(), + awsConf.getProxyPort(), + String.valueOf(awsConf.getProxyUsername()), + awsConf.getProxyPassword(), awsConf.getProxyDomain(), + awsConf.getProxyWorkstation()); + } + } else if (proxyPort >= 0) { + String msg = + "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + } + + /** + * Initializes the User-Agent header to send in HTTP requests to the S3 + * back-end. We always include the Hadoop version number. The user also + * may set an optional custom prefix to put in front of the Hadoop version + * number. The AWS SDK interally appends its own information, which seems + * to include the AWS SDK version, OS and JVM version. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + */ + private static void initUserAgent(Configuration conf, + ClientConfiguration awsConf) { + String userAgent = "Hadoop " + VersionInfo.getVersion(); + String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); + if (!userAgentPrefix.isEmpty()) { + userAgent = userAgentPrefix + ", " + userAgent; + } + LOG.debug("Using User-Agent: {}", userAgent); + awsConf.setUserAgent(userAgent); + } + + /** + * Creates an {@link AmazonS3Client} from the established configuration. + * + * @param conf Hadoop configuration + * @param credentials AWS credentials + * @param awsConf AWS SDK configuration + * @return S3 client + * @throws IllegalArgumentException if misconfigured + */ + private static AmazonS3 createAmazonS3Client(Configuration conf, + AWSCredentialsProvider credentials, ClientConfiguration awsConf) + throws IllegalArgumentException { + AmazonS3 s3 = new AmazonS3Client(credentials, awsConf); + String endPoint = conf.getTrimmed(ENDPOINT, ""); + if (!endPoint.isEmpty()) { + try { + s3.setEndpoint(endPoint); + } catch (IllegalArgumentException e) { + String msg = "Incorrect endpoint: " + e.getMessage(); + LOG.error(msg); + throw new IllegalArgumentException(msg, e); + } + } + enablePathStyleAccessIfRequired(s3, conf); + return s3; + } + + /** + * Enables path-style access to S3 buckets if configured. By default, the + * behavior is to use virtual hosted-style access with URIs of the form + * http://bucketname.s3.amazonaws.com. Enabling path-style access and a + * region-specific endpoint switches the behavior to use URIs of the form + * http://s3-eu-west-1.amazonaws.com/bucketname. + * + * @param s3 S3 client + * @param conf Hadoop configuration + */ + private static void enablePathStyleAccessIfRequired(AmazonS3 s3, + Configuration conf) { + final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false); + if (pathStyleAccess) { + LOG.debug("Enabling path style access!"); + s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true)); + } + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java index bc8c2e63e9d..97ece37012d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java @@ -132,6 +132,9 @@ public final class S3xLoginHelper { * * This strips out login information. * + * @param uri the URI to canonicalize + * @param defaultPort default port to use in canonicalized URI if the input + * URI has no port and this value is greater than 0 * @return a new, canonicalized URI. */ public static URI canonicalizeUri(URI uri, int defaultPort) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java new file mode 100644 index 00000000000..6734947af96 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; + +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +/** + * Abstract base class for S3A unit tests using a mock S3 client. + */ +public abstract class AbstractS3AMockTest { + + protected static final String BUCKET = "mock-bucket"; + protected static final AmazonServiceException NOT_FOUND; + static { + NOT_FOUND = new AmazonServiceException("Not Found"); + NOT_FOUND.setStatusCode(404); + } + + @Rule + public ExpectedException exception = ExpectedException.none(); + + protected S3AFileSystem fs; + protected AmazonS3 s3; + + @Before + public void setup() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class, + S3ClientFactory.class); + fs = new S3AFileSystem(); + URI uri = URI.create(FS_S3A + "://" + BUCKET); + fs.initialize(uri, conf); + s3 = fs.getAmazonS3Client(); + } + + @After + public void teardown() throws Exception { + if (fs != null) { + fs.close(); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java index 4e9933902c9..fca8e491da6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java @@ -19,7 +19,7 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.ClientConfiguration; -import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.S3ClientOptions; import org.apache.commons.lang.StringUtils; @@ -96,7 +96,7 @@ public class ITestS3AConfiguration { } else { conf.set(Constants.ENDPOINT, endpoint); fs = S3ATestUtils.createTestFileSystem(conf); - AmazonS3Client s3 = fs.getAmazonS3Client(); + AmazonS3 s3 = fs.getAmazonS3Client(); String endPointRegion = ""; // Differentiate handling of "s3-" and "s3." based endpoint identifiers String[] endpointParts = StringUtils.split(endpoint, '.'); @@ -364,7 +364,7 @@ public class ITestS3AConfiguration { try { fs = S3ATestUtils.createTestFileSystem(conf); assertNotNull(fs); - AmazonS3Client s3 = fs.getAmazonS3Client(); + AmazonS3 s3 = fs.getAmazonS3Client(); assertNotNull(s3); S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class, "clientOptions"); @@ -388,7 +388,7 @@ public class ITestS3AConfiguration { conf = new Configuration(); fs = S3ATestUtils.createTestFileSystem(conf); assertNotNull(fs); - AmazonS3Client s3 = fs.getAmazonS3Client(); + AmazonS3 s3 = fs.getAmazonS3Client(); assertNotNull(s3); ClientConfiguration awsConf = getField(s3, ClientConfiguration.class, "clientConfiguration"); @@ -401,7 +401,7 @@ public class ITestS3AConfiguration { conf.set(Constants.USER_AGENT_PREFIX, "MyApp"); fs = S3ATestUtils.createTestFileSystem(conf); assertNotNull(fs); - AmazonS3Client s3 = fs.getAmazonS3Client(); + AmazonS3 s3 = fs.getAmazonS3Client(); assertNotNull(s3); ClientConfiguration awsConf = getField(s3, ClientConfiguration.class, "clientConfiguration"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java new file mode 100644 index 00000000000..41f04ee728b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.mockito.Mockito.*; + +import java.net.URI; + +import com.amazonaws.services.s3.AmazonS3; + +/** + * An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3} + * interface suitable for unit testing. + */ +public class MockS3ClientFactory implements S3ClientFactory { + + @Override + public AmazonS3 createS3Client(URI name, URI uri) { + String bucket = name.getHost(); + AmazonS3 s3 = mock(AmazonS3.class); + when(s3.doesBucketExist(bucket)).thenReturn(true); + return s3; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java new file mode 100644 index 00000000000..f9e9c6bc74a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.io.FileNotFoundException; +import java.util.Collections; +import java.util.Date; + +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3ObjectSummary; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import org.junit.Test; + +/** + * S3A tests for getFileStatus using mock S3 client. + */ +public class TestS3AGetFileStatus extends AbstractS3AMockTest { + + @Test + public void testFile() throws Exception { + Path path = new Path("/file"); + String key = path.toUri().getPath().substring(1); + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(1L); + meta.setLastModified(new Date(2L)); + when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta); + FileStatus stat = fs.getFileStatus(path); + assertNotNull(stat); + assertEquals(fs.makeQualified(path), stat.getPath()); + assertTrue(stat.isFile()); + assertEquals(meta.getContentLength(), stat.getLen()); + assertEquals(meta.getLastModified().getTime(), stat.getModificationTime()); + } + + @Test + public void testFakeDirectory() throws Exception { + Path path = new Path("/dir"); + String key = path.toUri().getPath().substring(1); + when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND); + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(0L); + when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta); + FileStatus stat = fs.getFileStatus(path); + assertNotNull(stat); + assertEquals(fs.makeQualified(path), stat.getPath()); + assertTrue(stat.isDirectory()); + } + + @Test + public void testImplicitDirectory() throws Exception { + Path path = new Path("/dir"); + String key = path.toUri().getPath().substring(1); + when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND); + ObjectListing objects = mock(ObjectListing.class); + when(objects.getCommonPrefixes()).thenReturn( + Collections.singletonList("dir/")); + when(objects.getObjectSummaries()).thenReturn( + Collections.emptyList()); + when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects); + FileStatus stat = fs.getFileStatus(path); + assertNotNull(stat); + assertEquals(fs.makeQualified(path), stat.getPath()); + assertTrue(stat.isDirectory()); + } + + @Test + public void testRoot() throws Exception { + Path path = new Path("/"); + String key = path.toUri().getPath().substring(1); + when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND); + ObjectListing objects = mock(ObjectListing.class); + when(objects.getCommonPrefixes()).thenReturn( + Collections.emptyList()); + when(objects.getObjectSummaries()).thenReturn( + Collections.emptyList()); + when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects); + FileStatus stat = fs.getFileStatus(path); + assertNotNull(stat); + assertEquals(fs.makeQualified(path), stat.getPath()); + assertTrue(stat.isDirectory()); + assertTrue(stat.getPath().isRoot()); + } + + @Test + public void testNotFound() throws Exception { + Path path = new Path("/dir"); + String key = path.toUri().getPath().substring(1); + when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND); + ObjectListing objects = mock(ObjectListing.class); + when(objects.getCommonPrefixes()).thenReturn( + Collections.emptyList()); + when(objects.getObjectSummaries()).thenReturn( + Collections.emptyList()); + when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects); + exception.expect(FileNotFoundException.class); + fs.getFileStatus(path); + } +}