HADOOP-13447. Refactor S3AFileSystem to support introduction of separate metadata repository and tests. Contributed by Chris Nauroth.

(cherry picked from commit d152557cf7)
This commit is contained in:
Chris Nauroth 2016-09-06 09:36:21 -07:00
parent 614f9a62c4
commit e28930a38b
12 changed files with 567 additions and 192 deletions

View File

@ -278,6 +278,11 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId>

View File

@ -53,7 +53,8 @@ private Constants() {
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15; public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
// connect to s3 over ssl? // connect to s3 over ssl?
public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled"; public static final String SECURE_CONNECTIONS =
"fs.s3a.connection.ssl.enabled";
public static final boolean DEFAULT_SECURE_CONNECTIONS = true; public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
//use a custom endpoint? //use a custom endpoint?
@ -75,7 +76,8 @@ private Constants() {
public static final int DEFAULT_MAX_ERROR_RETRIES = 20; public static final int DEFAULT_MAX_ERROR_RETRIES = 20;
// seconds until we give up trying to establish a connection to s3 // seconds until we give up trying to establish a connection to s3
public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout"; public static final String ESTABLISH_TIMEOUT =
"fs.s3a.connection.establish.timeout";
public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000; public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000;
// seconds until we give up on a connection to s3 // seconds until we give up on a connection to s3
@ -116,11 +118,13 @@ private Constants() {
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
// minimum size in bytes before we start a multipart uploads or copy // minimum size in bytes before we start a multipart uploads or copy
public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold"; public static final String MIN_MULTIPART_THRESHOLD =
"fs.s3a.multipart.threshold";
public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE; public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
//enable multiobject-delete calls? //enable multiobject-delete calls?
public static final String ENABLE_MULTI_DELETE = "fs.s3a.multiobjectdelete.enable"; public static final String ENABLE_MULTI_DELETE =
"fs.s3a.multiobjectdelete.enable";
// comma separated list of directories // comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir"; public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
@ -139,11 +143,13 @@ private Constants() {
public static final String DEFAULT_CANNED_ACL = ""; public static final String DEFAULT_CANNED_ACL = "";
// should we try to purge old multipart uploads when starting up // should we try to purge old multipart uploads when starting up
public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge"; public static final String PURGE_EXISTING_MULTIPART =
"fs.s3a.multipart.purge";
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false; public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
// purge any multipart uploads older than this number of seconds // purge any multipart uploads older than this number of seconds
public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age"; public static final String PURGE_EXISTING_MULTIPART_AGE =
"fs.s3a.multipart.purge.age";
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400; public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
// s3 server-side encryption // s3 server-side encryption
@ -203,4 +209,15 @@ private Constants() {
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
public static final String INPUT_FADV_RANDOM = "random"; public static final String INPUT_FADV_RANDOM = "random";
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static final String S3_CLIENT_FACTORY_IMPL =
"fs.s3a.s3.client.factory.impl";
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static final Class<? extends S3ClientFactory>
DEFAULT_S3_CLIENT_FACTORY_IMPL =
S3ClientFactory.DefaultS3ClientFactory.class;
} }

View File

@ -21,7 +21,7 @@
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
@ -71,7 +71,7 @@ public class S3AFastOutputStream extends OutputStream {
private static final Logger LOG = S3AFileSystem.LOG; private static final Logger LOG = S3AFileSystem.LOG;
private final String key; private final String key;
private final String bucket; private final String bucket;
private final AmazonS3Client client; private final AmazonS3 client;
private final int partSize; private final int partSize;
private final int multiPartThreshold; private final int multiPartThreshold;
private final S3AFileSystem fs; private final S3AFileSystem fs;
@ -102,7 +102,7 @@ public class S3AFastOutputStream extends OutputStream {
* @param threadPoolExecutor thread factory * @param threadPoolExecutor thread factory
* @throws IOException on any problem * @throws IOException on any problem
*/ */
public S3AFastOutputStream(AmazonS3Client client, public S3AFastOutputStream(AmazonS3 client,
S3AFileSystem fs, S3AFileSystem fs,
String bucket, String bucket,
String key, String key,

View File

@ -35,11 +35,7 @@
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@ -58,7 +54,6 @@
import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressListener;
import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEvent;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -79,7 +74,7 @@
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL; import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
@ -111,7 +106,7 @@ public class S3AFileSystem extends FileSystem {
public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
private URI uri; private URI uri;
private Path workingDir; private Path workingDir;
private AmazonS3Client s3; private AmazonS3 s3;
private String bucket; private String bucket;
private int maxKeys; private int maxKeys;
private Listing listing; private Listing listing;
@ -150,37 +145,11 @@ public void initialize(URI name, Configuration conf) throws IOException {
bucket = name.getHost(); bucket = name.getHost();
AWSCredentialsProvider credentials = Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
createAWSCredentialProviderSet(name, conf, uri); S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
ClientConfiguration awsConf = new ClientConfiguration(); s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, .createS3Client(name, uri);
DEFAULT_MAXIMUM_CONNECTIONS, 1));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS);
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT, 0));
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT, 0));
int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
DEFAULT_SOCKET_SEND_BUFFER, 2048);
int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
DEFAULT_SOCKET_RECV_BUFFER, 2048);
awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
if (!signerOverride.isEmpty()) {
LOG.debug("Signer override = {}", signerOverride);
awsConf.setSignerOverride(signerOverride);
}
initProxySupport(conf, awsConf, secureConnections);
initUserAgent(conf, awsConf);
initAmazonS3Client(conf, credentials, awsConf);
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
listing = new Listing(this); listing = new Listing(this);
@ -276,50 +245,6 @@ protected void verifyBucketExists()
} }
} }
void initProxySupport(Configuration conf, ClientConfiguration awsConf,
boolean secureConnections) throws IllegalArgumentException {
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
int proxyPort = conf.getInt(PROXY_PORT, -1);
if (!proxyHost.isEmpty()) {
awsConf.setProxyHost(proxyHost);
if (proxyPort >= 0) {
awsConf.setProxyPort(proxyPort);
} else {
if (secureConnections) {
LOG.warn("Proxy host set without port. Using HTTPS default 443");
awsConf.setProxyPort(443);
} else {
LOG.warn("Proxy host set without port. Using HTTP default 80");
awsConf.setProxyPort(80);
}
}
String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
if ((proxyUsername == null) != (proxyPassword == null)) {
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
PROXY_PASSWORD + " set without the other.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
awsConf.setProxyUsername(proxyUsername);
awsConf.setProxyPassword(proxyPassword);
awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
if (LOG.isDebugEnabled()) {
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
"domain {} as workstation {}", awsConf.getProxyHost(),
awsConf.getProxyPort(),
String.valueOf(awsConf.getProxyUsername()),
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
awsConf.getProxyWorkstation());
}
} else if (proxyPort >= 0) {
String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
}
/** /**
* Get S3A Instrumentation. For test purposes. * Get S3A Instrumentation. For test purposes.
* @return this instance's instrumentation. * @return this instance's instrumentation.
@ -328,53 +253,9 @@ public S3AInstrumentation getInstrumentation() {
return instrumentation; return instrumentation;
} }
/**
* Initializes the User-Agent header to send in HTTP requests to the S3
* back-end. We always include the Hadoop version number. The user also may
* set an optional custom prefix to put in front of the Hadoop version number.
* The AWS SDK interally appends its own information, which seems to include
* the AWS SDK version, OS and JVM version.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration
*/
private void initUserAgent(Configuration conf, ClientConfiguration awsConf) {
String userAgent = "Hadoop " + VersionInfo.getVersion();
String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
if (!userAgentPrefix.isEmpty()) {
userAgent = userAgentPrefix + ", " + userAgent;
}
LOG.debug("Using User-Agent: {}", userAgent);
awsConf.setUserAgent(userAgent);
}
private void initAmazonS3Client(Configuration conf,
AWSCredentialsProvider credentials, ClientConfiguration awsConf)
throws IllegalArgumentException {
s3 = new AmazonS3Client(credentials, awsConf);
String endPoint = conf.getTrimmed(ENDPOINT, "");
if (!endPoint.isEmpty()) {
try {
s3.setEndpoint(endPoint);
} catch (IllegalArgumentException e) {
String msg = "Incorrect endpoint: " + e.getMessage();
LOG.error(msg);
throw new IllegalArgumentException(msg, e);
}
}
enablePathStyleAccessIfRequired(conf);
}
private void enablePathStyleAccessIfRequired(Configuration conf) {
final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
if (pathStyleAccess) {
LOG.debug("Enabling path style access!");
s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
}
}
private void initTransferManager() { private void initTransferManager() {
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); TransferManagerConfiguration transferConfiguration =
new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize); transferConfiguration.setMinimumUploadPartSize(partSize);
transferConfiguration.setMultipartUploadThreshold(multiPartThreshold); transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
transferConfiguration.setMultipartCopyPartSize(partSize); transferConfiguration.setMultipartCopyPartSize(partSize);
@ -445,7 +326,7 @@ public int getDefaultPort() {
* @return AmazonS3Client * @return AmazonS3Client
*/ */
@VisibleForTesting @VisibleForTesting
AmazonS3Client getAmazonS3Client() { AmazonS3 getAmazonS3Client() {
return s3; return s3;
} }
@ -469,10 +350,6 @@ public void setInputPolicy(S3AInputPolicy inputPolicy) {
this.inputPolicy = inputPolicy; this.inputPolicy = inputPolicy;
} }
public S3AFileSystem() {
super();
}
/** /**
* Turns a path (relative or otherwise) into an S3 key. * Turns a path (relative or otherwise) into an S3 key.
* *
@ -791,8 +668,10 @@ private boolean innerRename(Path src, Path dst) throws IOException,
while (true) { while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) { for (S3ObjectSummary summary : objects.getObjectSummaries()) {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); keysToDelete.add(
String newDstKey = dstKey + summary.getKey().substring(srcKey.length()); new DeleteObjectsRequest.KeyVersion(summary.getKey()));
String newDstKey =
dstKey + summary.getKey().substring(srcKey.length());
copyFile(summary.getKey(), newDstKey, summary.getSize()); copyFile(summary.getKey(), newDstKey, summary.getSize());
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
@ -1397,7 +1276,8 @@ public S3AFileStatus getFileStatus(final Path f) throws IOException {
LOG.debug("Found file (with /): fake directory"); LOG.debug("Found file (with /): fake directory");
return new S3AFileStatus(true, true, path); return new S3AFileStatus(true, true, path);
} else { } else {
LOG.warn("Found file (with /): real file? should not happen: {}", key); LOG.warn("Found file (with /): real file? should not happen: {}",
key);
return new S3AFileStatus(meta.getContentLength(), return new S3AFileStatus(meta.getContentLength(),
dateToLong(meta.getLastModified()), dateToLong(meta.getLastModified()),
@ -1994,42 +1874,4 @@ LocatedFileStatus toLocatedFileStatus(FileStatus status)
getFileBlockLocations(status, 0, status.getLen()) getFileBlockLocations(status, 0, status.getLen())
: null); : null);
} }
/**
* Get a integer option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
* @param defVal default value
* @param min minimum value
* @return the value
* @throws IllegalArgumentException if the value is below the minimum
*/
static int intOption(Configuration conf, String key, int defVal, int min) {
int v = conf.getInt(key, defVal);
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
return v;
}
/**
* Get a long option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
* @param defVal default value
* @param min minimum value
* @return the value
* @throws IllegalArgumentException if the value is below the minimum
*/
static long longOption(Configuration conf,
String key,
long defVal,
long min) {
long v = conf.getLong(key, defVal);
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
return v;
}
} }

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -71,7 +71,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
private volatile boolean closed; private volatile boolean closed;
private S3ObjectInputStream wrappedStream; private S3ObjectInputStream wrappedStream;
private final FileSystem.Statistics stats; private final FileSystem.Statistics stats;
private final AmazonS3Client client; private final AmazonS3 client;
private final String bucket; private final String bucket;
private final String key; private final String key;
private final long contentLength; private final long contentLength;
@ -101,7 +101,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
public S3AInputStream(String bucket, public S3AInputStream(String bucket,
String key, String key,
long contentLength, long contentLength,
AmazonS3Client client, AmazonS3 client,
FileSystem.Statistics stats, FileSystem.Statistics stats,
S3AInstrumentation instrumentation, S3AInstrumentation instrumentation,
long readahead, long readahead,

View File

@ -25,6 +25,9 @@
import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -403,4 +406,41 @@ public static String stringify(S3ObjectSummary summary) {
builder.append("size=").append(summary.getSize()); builder.append("size=").append(summary.getSize());
return builder.toString(); return builder.toString();
} }
/**
* Get a integer option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
* @param defVal default value
* @param min minimum value
* @return the value
* @throws IllegalArgumentException if the value is below the minimum
*/
static int intOption(Configuration conf, String key, int defVal, int min) {
int v = conf.getInt(key, defVal);
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
return v;
}
/**
* Get a long option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
* @param defVal default value
* @param min minimum value
* @return the value
* @throws IllegalArgumentException if the value is below the minimum
*/
static long longOption(Configuration conf,
String key,
long defVal,
long min) {
long v = conf.getLong(key, defVal);
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
return v;
}
} }

View File

@ -0,0 +1,232 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import java.io.IOException;
import java.net.URI;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
/**
* Factory for creation of S3 client instances to be used by {@link S3Store}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
interface S3ClientFactory {
/**
* Creates a new {@link AmazonS3} client. This method accepts the S3A file
* system URI both in raw input form and validated form as separate arguments,
* because both values may be useful in logging.
*
* @param name raw input S3A file system URI
* @param uri validated form of S3A file system URI
* @return S3 client
* @throws IOException IO problem
*/
AmazonS3 createS3Client(URI name, URI uri) throws IOException;
/**
* The default factory implementation, which calls the AWS SDK to configure
* and create an {@link AmazonS3Client} that communicates with the S3 service.
*/
static class DefaultS3ClientFactory extends Configured
implements S3ClientFactory {
private static final Logger LOG = S3AFileSystem.LOG;
@Override
public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
Configuration conf = getConf();
AWSCredentialsProvider credentials =
createAWSCredentialProviderSet(name, conf, uri);
ClientConfiguration awsConf = new ClientConfiguration();
initConnectionSettings(conf, awsConf);
initProxySupport(conf, awsConf);
initUserAgent(conf, awsConf);
return createAmazonS3Client(conf, credentials, awsConf);
}
/**
* Initializes all AWS SDK settings related to connection management.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration
*/
private static void initConnectionSettings(Configuration conf,
ClientConfiguration awsConf) {
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS);
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT, 0));
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT, 0));
int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
DEFAULT_SOCKET_SEND_BUFFER, 2048);
int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
DEFAULT_SOCKET_RECV_BUFFER, 2048);
awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
if (!signerOverride.isEmpty()) {
LOG.debug("Signer override = {}", signerOverride);
awsConf.setSignerOverride(signerOverride);
}
}
/**
* Initializes AWS SDK proxy support if configured.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration
* @throws IllegalArgumentException if misconfigured
*/
private static void initProxySupport(Configuration conf,
ClientConfiguration awsConf) throws IllegalArgumentException {
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
int proxyPort = conf.getInt(PROXY_PORT, -1);
if (!proxyHost.isEmpty()) {
awsConf.setProxyHost(proxyHost);
if (proxyPort >= 0) {
awsConf.setProxyPort(proxyPort);
} else {
if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
LOG.warn("Proxy host set without port. Using HTTPS default 443");
awsConf.setProxyPort(443);
} else {
LOG.warn("Proxy host set without port. Using HTTP default 80");
awsConf.setProxyPort(80);
}
}
String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
if ((proxyUsername == null) != (proxyPassword == null)) {
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
PROXY_PASSWORD + " set without the other.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
awsConf.setProxyUsername(proxyUsername);
awsConf.setProxyPassword(proxyPassword);
awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
if (LOG.isDebugEnabled()) {
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
"domain {} as workstation {}", awsConf.getProxyHost(),
awsConf.getProxyPort(),
String.valueOf(awsConf.getProxyUsername()),
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
awsConf.getProxyWorkstation());
}
} else if (proxyPort >= 0) {
String msg =
"Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
}
/**
* Initializes the User-Agent header to send in HTTP requests to the S3
* back-end. We always include the Hadoop version number. The user also
* may set an optional custom prefix to put in front of the Hadoop version
* number. The AWS SDK interally appends its own information, which seems
* to include the AWS SDK version, OS and JVM version.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration
*/
private static void initUserAgent(Configuration conf,
ClientConfiguration awsConf) {
String userAgent = "Hadoop " + VersionInfo.getVersion();
String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
if (!userAgentPrefix.isEmpty()) {
userAgent = userAgentPrefix + ", " + userAgent;
}
LOG.debug("Using User-Agent: {}", userAgent);
awsConf.setUserAgent(userAgent);
}
/**
* Creates an {@link AmazonS3Client} from the established configuration.
*
* @param conf Hadoop configuration
* @param credentials AWS credentials
* @param awsConf AWS SDK configuration
* @return S3 client
* @throws IllegalArgumentException if misconfigured
*/
private static AmazonS3 createAmazonS3Client(Configuration conf,
AWSCredentialsProvider credentials, ClientConfiguration awsConf)
throws IllegalArgumentException {
AmazonS3 s3 = new AmazonS3Client(credentials, awsConf);
String endPoint = conf.getTrimmed(ENDPOINT, "");
if (!endPoint.isEmpty()) {
try {
s3.setEndpoint(endPoint);
} catch (IllegalArgumentException e) {
String msg = "Incorrect endpoint: " + e.getMessage();
LOG.error(msg);
throw new IllegalArgumentException(msg, e);
}
}
enablePathStyleAccessIfRequired(s3, conf);
return s3;
}
/**
* Enables path-style access to S3 buckets if configured. By default, the
* behavior is to use virtual hosted-style access with URIs of the form
* http://bucketname.s3.amazonaws.com. Enabling path-style access and a
* region-specific endpoint switches the behavior to use URIs of the form
* http://s3-eu-west-1.amazonaws.com/bucketname.
*
* @param s3 S3 client
* @param conf Hadoop configuration
*/
private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
Configuration conf) {
final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
if (pathStyleAccess) {
LOG.debug("Enabling path style access!");
s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
}
}
}
}

View File

@ -132,6 +132,9 @@ public static Login extractLoginDetails(URI name) {
* *
* This strips out login information. * This strips out login information.
* *
* @param uri the URI to canonicalize
* @param defaultPort default port to use in canonicalized URI if the input
* URI has no port and this value is greater than 0
* @return a new, canonicalized URI. * @return a new, canonicalized URI.
*/ */
public static URI canonicalizeUri(URI uri, int defaultPort) { public static URI canonicalizeUri(URI uri, int defaultPort) {

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.fs.s3a.Constants.*;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
/**
* Abstract base class for S3A unit tests using a mock S3 client.
*/
public abstract class AbstractS3AMockTest {
protected static final String BUCKET = "mock-bucket";
protected static final AmazonServiceException NOT_FOUND;
static {
NOT_FOUND = new AmazonServiceException("Not Found");
NOT_FOUND.setStatusCode(404);
}
@Rule
public ExpectedException exception = ExpectedException.none();
protected S3AFileSystem fs;
protected AmazonS3 s3;
@Before
public void setup() throws Exception {
Configuration conf = new Configuration();
conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
S3ClientFactory.class);
fs = new S3AFileSystem();
URI uri = URI.create(FS_S3A + "://" + BUCKET);
fs.initialize(uri, conf);
s3 = fs.getAmazonS3Client();
}
@After
public void teardown() throws Exception {
if (fs != null) {
fs.close();
}
}
}

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.S3ClientOptions; import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -96,7 +96,7 @@ public void testEndpoint() throws Exception {
} else { } else {
conf.set(Constants.ENDPOINT, endpoint); conf.set(Constants.ENDPOINT, endpoint);
fs = S3ATestUtils.createTestFileSystem(conf); fs = S3ATestUtils.createTestFileSystem(conf);
AmazonS3Client s3 = fs.getAmazonS3Client(); AmazonS3 s3 = fs.getAmazonS3Client();
String endPointRegion = ""; String endPointRegion = "";
// Differentiate handling of "s3-" and "s3." based endpoint identifiers // Differentiate handling of "s3-" and "s3." based endpoint identifiers
String[] endpointParts = StringUtils.split(endpoint, '.'); String[] endpointParts = StringUtils.split(endpoint, '.');
@ -364,7 +364,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
try { try {
fs = S3ATestUtils.createTestFileSystem(conf); fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs); assertNotNull(fs);
AmazonS3Client s3 = fs.getAmazonS3Client(); AmazonS3 s3 = fs.getAmazonS3Client();
assertNotNull(s3); assertNotNull(s3);
S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class, S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class,
"clientOptions"); "clientOptions");
@ -388,7 +388,7 @@ public void testDefaultUserAgent() throws Exception {
conf = new Configuration(); conf = new Configuration();
fs = S3ATestUtils.createTestFileSystem(conf); fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs); assertNotNull(fs);
AmazonS3Client s3 = fs.getAmazonS3Client(); AmazonS3 s3 = fs.getAmazonS3Client();
assertNotNull(s3); assertNotNull(s3);
ClientConfiguration awsConf = getField(s3, ClientConfiguration.class, ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
"clientConfiguration"); "clientConfiguration");
@ -401,7 +401,7 @@ public void testCustomUserAgent() throws Exception {
conf.set(Constants.USER_AGENT_PREFIX, "MyApp"); conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
fs = S3ATestUtils.createTestFileSystem(conf); fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs); assertNotNull(fs);
AmazonS3Client s3 = fs.getAmazonS3Client(); AmazonS3 s3 = fs.getAmazonS3Client();
assertNotNull(s3); assertNotNull(s3);
ClientConfiguration awsConf = getField(s3, ClientConfiguration.class, ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
"clientConfiguration"); "clientConfiguration");

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.mockito.Mockito.*;
import java.net.URI;
import com.amazonaws.services.s3.AmazonS3;
/**
* An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3}
* interface suitable for unit testing.
*/
public class MockS3ClientFactory implements S3ClientFactory {
@Override
public AmazonS3 createS3Client(URI name, URI uri) {
String bucket = name.getHost();
AmazonS3 s3 = mock(AmazonS3.class);
when(s3.doesBucketExist(bucket)).thenReturn(true);
return s3;
}
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.Date;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
/**
* S3A tests for getFileStatus using mock S3 client.
*/
public class TestS3AGetFileStatus extends AbstractS3AMockTest {
@Test
public void testFile() throws Exception {
Path path = new Path("/file");
String key = path.toUri().getPath().substring(1);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(1L);
meta.setLastModified(new Date(2L));
when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta);
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
assertTrue(stat.isFile());
assertEquals(meta.getContentLength(), stat.getLen());
assertEquals(meta.getLastModified().getTime(), stat.getModificationTime());
}
@Test
public void testFakeDirectory() throws Exception {
Path path = new Path("/dir");
String key = path.toUri().getPath().substring(1);
when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(0L);
when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta);
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
assertTrue(stat.isDirectory());
}
@Test
public void testImplicitDirectory() throws Exception {
Path path = new Path("/dir");
String key = path.toUri().getPath().substring(1);
when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
ObjectListing objects = mock(ObjectListing.class);
when(objects.getCommonPrefixes()).thenReturn(
Collections.singletonList("dir/"));
when(objects.getObjectSummaries()).thenReturn(
Collections.<S3ObjectSummary>emptyList());
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
assertTrue(stat.isDirectory());
}
@Test
public void testRoot() throws Exception {
Path path = new Path("/");
String key = path.toUri().getPath().substring(1);
when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
ObjectListing objects = mock(ObjectListing.class);
when(objects.getCommonPrefixes()).thenReturn(
Collections.<String>emptyList());
when(objects.getObjectSummaries()).thenReturn(
Collections.<S3ObjectSummary>emptyList());
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
assertTrue(stat.isDirectory());
assertTrue(stat.getPath().isRoot());
}
@Test
public void testNotFound() throws Exception {
Path path = new Path("/dir");
String key = path.toUri().getPath().substring(1);
when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
ObjectListing objects = mock(ObjectListing.class);
when(objects.getCommonPrefixes()).thenReturn(
Collections.<String>emptyList());
when(objects.getObjectSummaries()).thenReturn(
Collections.<S3ObjectSummary>emptyList());
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
exception.expect(FileNotFoundException.class);
fs.getFileStatus(path);
}
}