HADOOP-13447. Refactor S3AFileSystem to support introduction of separate metadata repository and tests. Contributed by Chris Nauroth.
This commit is contained in:
parent
39d1b1d747
commit
d152557cf7
|
@ -295,6 +295,11 @@
|
|||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
|
||||
|
|
|
@ -53,7 +53,8 @@ public final class Constants {
|
|||
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
|
||||
|
||||
// connect to s3 over ssl?
|
||||
public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
|
||||
public static final String SECURE_CONNECTIONS =
|
||||
"fs.s3a.connection.ssl.enabled";
|
||||
public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
|
||||
|
||||
//use a custom endpoint?
|
||||
|
@ -75,7 +76,8 @@ public final class Constants {
|
|||
public static final int DEFAULT_MAX_ERROR_RETRIES = 20;
|
||||
|
||||
// seconds until we give up trying to establish a connection to s3
|
||||
public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout";
|
||||
public static final String ESTABLISH_TIMEOUT =
|
||||
"fs.s3a.connection.establish.timeout";
|
||||
public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000;
|
||||
|
||||
// seconds until we give up on a connection to s3
|
||||
|
@ -111,11 +113,13 @@ public final class Constants {
|
|||
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
|
||||
|
||||
// minimum size in bytes before we start a multipart uploads or copy
|
||||
public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
|
||||
public static final String MIN_MULTIPART_THRESHOLD =
|
||||
"fs.s3a.multipart.threshold";
|
||||
public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
|
||||
|
||||
//enable multiobject-delete calls?
|
||||
public static final String ENABLE_MULTI_DELETE = "fs.s3a.multiobjectdelete.enable";
|
||||
public static final String ENABLE_MULTI_DELETE =
|
||||
"fs.s3a.multiobjectdelete.enable";
|
||||
|
||||
// comma separated list of directories
|
||||
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
|
||||
|
@ -134,11 +138,13 @@ public final class Constants {
|
|||
public static final String DEFAULT_CANNED_ACL = "";
|
||||
|
||||
// should we try to purge old multipart uploads when starting up
|
||||
public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
|
||||
public static final String PURGE_EXISTING_MULTIPART =
|
||||
"fs.s3a.multipart.purge";
|
||||
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
|
||||
|
||||
// purge any multipart uploads older than this number of seconds
|
||||
public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
|
||||
public static final String PURGE_EXISTING_MULTIPART_AGE =
|
||||
"fs.s3a.multipart.purge.age";
|
||||
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
|
||||
|
||||
// s3 server-side encryption
|
||||
|
@ -198,4 +204,15 @@ public final class Constants {
|
|||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final String INPUT_FADV_RANDOM = "random";
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public static final String S3_CLIENT_FACTORY_IMPL =
|
||||
"fs.s3a.s3.client.factory.impl";
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public static final Class<? extends S3ClientFactory>
|
||||
DEFAULT_S3_CLIENT_FACTORY_IMPL =
|
||||
S3ClientFactory.DefaultS3ClientFactory.class;
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.hadoop.fs.s3a;
|
|||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.event.ProgressEvent;
|
||||
import com.amazonaws.event.ProgressListener;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
|
||||
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
|
||||
|
@ -71,7 +71,7 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
private static final Logger LOG = S3AFileSystem.LOG;
|
||||
private final String key;
|
||||
private final String bucket;
|
||||
private final AmazonS3Client client;
|
||||
private final AmazonS3 client;
|
||||
private final int partSize;
|
||||
private final int multiPartThreshold;
|
||||
private final S3AFileSystem fs;
|
||||
|
@ -102,7 +102,7 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
* @param threadPoolExecutor thread factory
|
||||
* @throws IOException on any problem
|
||||
*/
|
||||
public S3AFastOutputStream(AmazonS3Client client,
|
||||
public S3AFastOutputStream(AmazonS3 client,
|
||||
S3AFileSystem fs,
|
||||
String bucket,
|
||||
String key,
|
||||
|
|
|
@ -34,11 +34,7 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.S3ClientOptions;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
|
@ -57,7 +53,6 @@ import com.amazonaws.services.s3.transfer.Upload;
|
|||
import com.amazonaws.event.ProgressListener;
|
||||
import com.amazonaws.event.ProgressEvent;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -78,7 +73,7 @@ import org.apache.hadoop.fs.StorageStatistics;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
|
||||
|
@ -110,7 +105,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
|
||||
private URI uri;
|
||||
private Path workingDir;
|
||||
private AmazonS3Client s3;
|
||||
private AmazonS3 s3;
|
||||
private String bucket;
|
||||
private int maxKeys;
|
||||
private Listing listing;
|
||||
|
@ -147,37 +142,11 @@ public class S3AFileSystem extends FileSystem {
|
|||
|
||||
bucket = name.getHost();
|
||||
|
||||
AWSCredentialsProvider credentials =
|
||||
createAWSCredentialProviderSet(name, conf, uri);
|
||||
|
||||
ClientConfiguration awsConf = new ClientConfiguration();
|
||||
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
|
||||
DEFAULT_MAXIMUM_CONNECTIONS, 1));
|
||||
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
||||
DEFAULT_SECURE_CONNECTIONS);
|
||||
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
||||
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
|
||||
DEFAULT_MAX_ERROR_RETRIES, 0));
|
||||
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
|
||||
DEFAULT_ESTABLISH_TIMEOUT, 0));
|
||||
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
|
||||
DEFAULT_SOCKET_TIMEOUT, 0));
|
||||
int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
|
||||
DEFAULT_SOCKET_SEND_BUFFER, 2048);
|
||||
int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
|
||||
DEFAULT_SOCKET_RECV_BUFFER, 2048);
|
||||
awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
|
||||
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
||||
if (!signerOverride.isEmpty()) {
|
||||
LOG.debug("Signer override = {}", signerOverride);
|
||||
awsConf.setSignerOverride(signerOverride);
|
||||
}
|
||||
|
||||
initProxySupport(conf, awsConf, secureConnections);
|
||||
|
||||
initUserAgent(conf, awsConf);
|
||||
|
||||
initAmazonS3Client(conf, credentials, awsConf);
|
||||
Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
|
||||
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
|
||||
S3ClientFactory.class);
|
||||
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
|
||||
.createS3Client(name, uri);
|
||||
|
||||
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
|
||||
listing = new Listing(this);
|
||||
|
@ -266,50 +235,6 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
void initProxySupport(Configuration conf, ClientConfiguration awsConf,
|
||||
boolean secureConnections) throws IllegalArgumentException {
|
||||
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
|
||||
int proxyPort = conf.getInt(PROXY_PORT, -1);
|
||||
if (!proxyHost.isEmpty()) {
|
||||
awsConf.setProxyHost(proxyHost);
|
||||
if (proxyPort >= 0) {
|
||||
awsConf.setProxyPort(proxyPort);
|
||||
} else {
|
||||
if (secureConnections) {
|
||||
LOG.warn("Proxy host set without port. Using HTTPS default 443");
|
||||
awsConf.setProxyPort(443);
|
||||
} else {
|
||||
LOG.warn("Proxy host set without port. Using HTTP default 80");
|
||||
awsConf.setProxyPort(80);
|
||||
}
|
||||
}
|
||||
String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
|
||||
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
|
||||
if ((proxyUsername == null) != (proxyPassword == null)) {
|
||||
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
|
||||
PROXY_PASSWORD + " set without the other.";
|
||||
LOG.error(msg);
|
||||
throw new IllegalArgumentException(msg);
|
||||
}
|
||||
awsConf.setProxyUsername(proxyUsername);
|
||||
awsConf.setProxyPassword(proxyPassword);
|
||||
awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
|
||||
awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
|
||||
"domain {} as workstation {}", awsConf.getProxyHost(),
|
||||
awsConf.getProxyPort(),
|
||||
String.valueOf(awsConf.getProxyUsername()),
|
||||
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
|
||||
awsConf.getProxyWorkstation());
|
||||
}
|
||||
} else if (proxyPort >= 0) {
|
||||
String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
|
||||
LOG.error(msg);
|
||||
throw new IllegalArgumentException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get S3A Instrumentation. For test purposes.
|
||||
* @return this instance's instrumentation.
|
||||
|
@ -318,53 +243,9 @@ public class S3AFileSystem extends FileSystem {
|
|||
return instrumentation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the User-Agent header to send in HTTP requests to the S3
|
||||
* back-end. We always include the Hadoop version number. The user also may
|
||||
* set an optional custom prefix to put in front of the Hadoop version number.
|
||||
* The AWS SDK interally appends its own information, which seems to include
|
||||
* the AWS SDK version, OS and JVM version.
|
||||
*
|
||||
* @param conf Hadoop configuration
|
||||
* @param awsConf AWS SDK configuration
|
||||
*/
|
||||
private void initUserAgent(Configuration conf, ClientConfiguration awsConf) {
|
||||
String userAgent = "Hadoop " + VersionInfo.getVersion();
|
||||
String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
|
||||
if (!userAgentPrefix.isEmpty()) {
|
||||
userAgent = userAgentPrefix + ", " + userAgent;
|
||||
}
|
||||
LOG.debug("Using User-Agent: {}", userAgent);
|
||||
awsConf.setUserAgent(userAgent);
|
||||
}
|
||||
|
||||
private void initAmazonS3Client(Configuration conf,
|
||||
AWSCredentialsProvider credentials, ClientConfiguration awsConf)
|
||||
throws IllegalArgumentException {
|
||||
s3 = new AmazonS3Client(credentials, awsConf);
|
||||
String endPoint = conf.getTrimmed(ENDPOINT, "");
|
||||
if (!endPoint.isEmpty()) {
|
||||
try {
|
||||
s3.setEndpoint(endPoint);
|
||||
} catch (IllegalArgumentException e) {
|
||||
String msg = "Incorrect endpoint: " + e.getMessage();
|
||||
LOG.error(msg);
|
||||
throw new IllegalArgumentException(msg, e);
|
||||
}
|
||||
}
|
||||
enablePathStyleAccessIfRequired(conf);
|
||||
}
|
||||
|
||||
private void enablePathStyleAccessIfRequired(Configuration conf) {
|
||||
final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
|
||||
if (pathStyleAccess) {
|
||||
LOG.debug("Enabling path style access!");
|
||||
s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
|
||||
}
|
||||
}
|
||||
|
||||
private void initTransferManager() {
|
||||
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
|
||||
TransferManagerConfiguration transferConfiguration =
|
||||
new TransferManagerConfiguration();
|
||||
transferConfiguration.setMinimumUploadPartSize(partSize);
|
||||
transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
|
||||
transferConfiguration.setMultipartCopyPartSize(partSize);
|
||||
|
@ -435,7 +316,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
* @return AmazonS3Client
|
||||
*/
|
||||
@VisibleForTesting
|
||||
AmazonS3Client getAmazonS3Client() {
|
||||
AmazonS3 getAmazonS3Client() {
|
||||
return s3;
|
||||
}
|
||||
|
||||
|
@ -459,10 +340,6 @@ public class S3AFileSystem extends FileSystem {
|
|||
this.inputPolicy = inputPolicy;
|
||||
}
|
||||
|
||||
public S3AFileSystem() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Turns a path (relative or otherwise) into an S3 key.
|
||||
*
|
||||
|
@ -782,8 +659,10 @@ public class S3AFileSystem extends FileSystem {
|
|||
|
||||
while (true) {
|
||||
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
|
||||
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
|
||||
String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
|
||||
keysToDelete.add(
|
||||
new DeleteObjectsRequest.KeyVersion(summary.getKey()));
|
||||
String newDstKey =
|
||||
dstKey + summary.getKey().substring(srcKey.length());
|
||||
copyFile(summary.getKey(), newDstKey, summary.getSize());
|
||||
|
||||
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
|
||||
|
@ -1388,7 +1267,8 @@ public class S3AFileSystem extends FileSystem {
|
|||
LOG.debug("Found file (with /): fake directory");
|
||||
return new S3AFileStatus(true, true, path);
|
||||
} else {
|
||||
LOG.warn("Found file (with /): real file? should not happen: {}", key);
|
||||
LOG.warn("Found file (with /): real file? should not happen: {}",
|
||||
key);
|
||||
|
||||
return new S3AFileStatus(meta.getContentLength(),
|
||||
dateToLong(meta.getLastModified()),
|
||||
|
@ -1985,42 +1865,4 @@ public class S3AFileSystem extends FileSystem {
|
|||
getFileBlockLocations(status, 0, status.getLen())
|
||||
: null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a integer option >= the minimum allowed value.
|
||||
* @param conf configuration
|
||||
* @param key key to look up
|
||||
* @param defVal default value
|
||||
* @param min minimum value
|
||||
* @return the value
|
||||
* @throws IllegalArgumentException if the value is below the minimum
|
||||
*/
|
||||
static int intOption(Configuration conf, String key, int defVal, int min) {
|
||||
int v = conf.getInt(key, defVal);
|
||||
Preconditions.checkArgument(v >= min,
|
||||
String.format("Value of %s: %d is below the minimum value %d",
|
||||
key, v, min));
|
||||
return v;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a long option >= the minimum allowed value.
|
||||
* @param conf configuration
|
||||
* @param key key to look up
|
||||
* @param defVal default value
|
||||
* @param min minimum value
|
||||
* @return the value
|
||||
* @throws IllegalArgumentException if the value is below the minimum
|
||||
*/
|
||||
static long longOption(Configuration conf,
|
||||
String key,
|
||||
long defVal,
|
||||
long min) {
|
||||
long v = conf.getLong(key, defVal);
|
||||
Preconditions.checkArgument(v >= min,
|
||||
String.format("Value of %s: %d is below the minimum value %d",
|
||||
key, v, min));
|
||||
return v;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -71,7 +71,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||
private volatile boolean closed;
|
||||
private S3ObjectInputStream wrappedStream;
|
||||
private final FileSystem.Statistics stats;
|
||||
private final AmazonS3Client client;
|
||||
private final AmazonS3 client;
|
||||
private final String bucket;
|
||||
private final String key;
|
||||
private final long contentLength;
|
||||
|
@ -101,7 +101,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||
public S3AInputStream(String bucket,
|
||||
String key,
|
||||
long contentLength,
|
||||
AmazonS3Client client,
|
||||
AmazonS3 client,
|
||||
FileSystem.Statistics stats,
|
||||
S3AInstrumentation instrumentation,
|
||||
long readahead,
|
||||
|
|
|
@ -25,6 +25,9 @@ import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
|
|||
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -403,4 +406,41 @@ public final class S3AUtils {
|
|||
builder.append("size=").append(summary.getSize());
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a integer option >= the minimum allowed value.
|
||||
* @param conf configuration
|
||||
* @param key key to look up
|
||||
* @param defVal default value
|
||||
* @param min minimum value
|
||||
* @return the value
|
||||
* @throws IllegalArgumentException if the value is below the minimum
|
||||
*/
|
||||
static int intOption(Configuration conf, String key, int defVal, int min) {
|
||||
int v = conf.getInt(key, defVal);
|
||||
Preconditions.checkArgument(v >= min,
|
||||
String.format("Value of %s: %d is below the minimum value %d",
|
||||
key, v, min));
|
||||
return v;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a long option >= the minimum allowed value.
|
||||
* @param conf configuration
|
||||
* @param key key to look up
|
||||
* @param defVal default value
|
||||
* @param min minimum value
|
||||
* @return the value
|
||||
* @throws IllegalArgumentException if the value is below the minimum
|
||||
*/
|
||||
static long longOption(Configuration conf,
|
||||
String key,
|
||||
long defVal,
|
||||
long min) {
|
||||
long v = conf.getLong(key, defVal);
|
||||
Preconditions.checkArgument(v >= min,
|
||||
String.format("Value of %s: %d is below the minimum value %d",
|
||||
key, v, min));
|
||||
return v;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.Protocol;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.S3ClientOptions;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
/**
|
||||
* Factory for creation of S3 client instances to be used by {@link S3Store}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
interface S3ClientFactory {
|
||||
|
||||
/**
|
||||
* Creates a new {@link AmazonS3} client. This method accepts the S3A file
|
||||
* system URI both in raw input form and validated form as separate arguments,
|
||||
* because both values may be useful in logging.
|
||||
*
|
||||
* @param name raw input S3A file system URI
|
||||
* @param uri validated form of S3A file system URI
|
||||
* @return S3 client
|
||||
* @throws IOException IO problem
|
||||
*/
|
||||
AmazonS3 createS3Client(URI name, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* The default factory implementation, which calls the AWS SDK to configure
|
||||
* and create an {@link AmazonS3Client} that communicates with the S3 service.
|
||||
*/
|
||||
static class DefaultS3ClientFactory extends Configured
|
||||
implements S3ClientFactory {
|
||||
|
||||
private static final Logger LOG = S3AFileSystem.LOG;
|
||||
|
||||
@Override
|
||||
public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
|
||||
Configuration conf = getConf();
|
||||
AWSCredentialsProvider credentials =
|
||||
createAWSCredentialProviderSet(name, conf, uri);
|
||||
ClientConfiguration awsConf = new ClientConfiguration();
|
||||
initConnectionSettings(conf, awsConf);
|
||||
initProxySupport(conf, awsConf);
|
||||
initUserAgent(conf, awsConf);
|
||||
return createAmazonS3Client(conf, credentials, awsConf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes all AWS SDK settings related to connection management.
|
||||
*
|
||||
* @param conf Hadoop configuration
|
||||
* @param awsConf AWS SDK configuration
|
||||
*/
|
||||
private static void initConnectionSettings(Configuration conf,
|
||||
ClientConfiguration awsConf) {
|
||||
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
|
||||
DEFAULT_MAXIMUM_CONNECTIONS, 1));
|
||||
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
||||
DEFAULT_SECURE_CONNECTIONS);
|
||||
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
||||
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
|
||||
DEFAULT_MAX_ERROR_RETRIES, 0));
|
||||
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
|
||||
DEFAULT_ESTABLISH_TIMEOUT, 0));
|
||||
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
|
||||
DEFAULT_SOCKET_TIMEOUT, 0));
|
||||
int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
|
||||
DEFAULT_SOCKET_SEND_BUFFER, 2048);
|
||||
int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
|
||||
DEFAULT_SOCKET_RECV_BUFFER, 2048);
|
||||
awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
|
||||
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
||||
if (!signerOverride.isEmpty()) {
|
||||
LOG.debug("Signer override = {}", signerOverride);
|
||||
awsConf.setSignerOverride(signerOverride);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes AWS SDK proxy support if configured.
|
||||
*
|
||||
* @param conf Hadoop configuration
|
||||
* @param awsConf AWS SDK configuration
|
||||
* @throws IllegalArgumentException if misconfigured
|
||||
*/
|
||||
private static void initProxySupport(Configuration conf,
|
||||
ClientConfiguration awsConf) throws IllegalArgumentException {
|
||||
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
|
||||
int proxyPort = conf.getInt(PROXY_PORT, -1);
|
||||
if (!proxyHost.isEmpty()) {
|
||||
awsConf.setProxyHost(proxyHost);
|
||||
if (proxyPort >= 0) {
|
||||
awsConf.setProxyPort(proxyPort);
|
||||
} else {
|
||||
if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
|
||||
LOG.warn("Proxy host set without port. Using HTTPS default 443");
|
||||
awsConf.setProxyPort(443);
|
||||
} else {
|
||||
LOG.warn("Proxy host set without port. Using HTTP default 80");
|
||||
awsConf.setProxyPort(80);
|
||||
}
|
||||
}
|
||||
String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
|
||||
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
|
||||
if ((proxyUsername == null) != (proxyPassword == null)) {
|
||||
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
|
||||
PROXY_PASSWORD + " set without the other.";
|
||||
LOG.error(msg);
|
||||
throw new IllegalArgumentException(msg);
|
||||
}
|
||||
awsConf.setProxyUsername(proxyUsername);
|
||||
awsConf.setProxyPassword(proxyPassword);
|
||||
awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
|
||||
awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
|
||||
"domain {} as workstation {}", awsConf.getProxyHost(),
|
||||
awsConf.getProxyPort(),
|
||||
String.valueOf(awsConf.getProxyUsername()),
|
||||
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
|
||||
awsConf.getProxyWorkstation());
|
||||
}
|
||||
} else if (proxyPort >= 0) {
|
||||
String msg =
|
||||
"Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
|
||||
LOG.error(msg);
|
||||
throw new IllegalArgumentException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the User-Agent header to send in HTTP requests to the S3
|
||||
* back-end. We always include the Hadoop version number. The user also
|
||||
* may set an optional custom prefix to put in front of the Hadoop version
|
||||
* number. The AWS SDK interally appends its own information, which seems
|
||||
* to include the AWS SDK version, OS and JVM version.
|
||||
*
|
||||
* @param conf Hadoop configuration
|
||||
* @param awsConf AWS SDK configuration
|
||||
*/
|
||||
private static void initUserAgent(Configuration conf,
|
||||
ClientConfiguration awsConf) {
|
||||
String userAgent = "Hadoop " + VersionInfo.getVersion();
|
||||
String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
|
||||
if (!userAgentPrefix.isEmpty()) {
|
||||
userAgent = userAgentPrefix + ", " + userAgent;
|
||||
}
|
||||
LOG.debug("Using User-Agent: {}", userAgent);
|
||||
awsConf.setUserAgent(userAgent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an {@link AmazonS3Client} from the established configuration.
|
||||
*
|
||||
* @param conf Hadoop configuration
|
||||
* @param credentials AWS credentials
|
||||
* @param awsConf AWS SDK configuration
|
||||
* @return S3 client
|
||||
* @throws IllegalArgumentException if misconfigured
|
||||
*/
|
||||
private static AmazonS3 createAmazonS3Client(Configuration conf,
|
||||
AWSCredentialsProvider credentials, ClientConfiguration awsConf)
|
||||
throws IllegalArgumentException {
|
||||
AmazonS3 s3 = new AmazonS3Client(credentials, awsConf);
|
||||
String endPoint = conf.getTrimmed(ENDPOINT, "");
|
||||
if (!endPoint.isEmpty()) {
|
||||
try {
|
||||
s3.setEndpoint(endPoint);
|
||||
} catch (IllegalArgumentException e) {
|
||||
String msg = "Incorrect endpoint: " + e.getMessage();
|
||||
LOG.error(msg);
|
||||
throw new IllegalArgumentException(msg, e);
|
||||
}
|
||||
}
|
||||
enablePathStyleAccessIfRequired(s3, conf);
|
||||
return s3;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables path-style access to S3 buckets if configured. By default, the
|
||||
* behavior is to use virtual hosted-style access with URIs of the form
|
||||
* http://bucketname.s3.amazonaws.com. Enabling path-style access and a
|
||||
* region-specific endpoint switches the behavior to use URIs of the form
|
||||
* http://s3-eu-west-1.amazonaws.com/bucketname.
|
||||
*
|
||||
* @param s3 S3 client
|
||||
* @param conf Hadoop configuration
|
||||
*/
|
||||
private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
|
||||
Configuration conf) {
|
||||
final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
|
||||
if (pathStyleAccess) {
|
||||
LOG.debug("Enabling path style access!");
|
||||
s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -132,6 +132,9 @@ public final class S3xLoginHelper {
|
|||
*
|
||||
* This strips out login information.
|
||||
*
|
||||
* @param uri the URI to canonicalize
|
||||
* @param defaultPort default port to use in canonicalized URI if the input
|
||||
* URI has no port and this value is greater than 0
|
||||
* @return a new, canonicalized URI.
|
||||
*/
|
||||
public static URI canonicalizeUri(URI uri, int defaultPort) {
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
/**
|
||||
* Abstract base class for S3A unit tests using a mock S3 client.
|
||||
*/
|
||||
public abstract class AbstractS3AMockTest {
|
||||
|
||||
protected static final String BUCKET = "mock-bucket";
|
||||
protected static final AmazonServiceException NOT_FOUND;
|
||||
static {
|
||||
NOT_FOUND = new AmazonServiceException("Not Found");
|
||||
NOT_FOUND.setStatusCode(404);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
protected S3AFileSystem fs;
|
||||
protected AmazonS3 s3;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
|
||||
S3ClientFactory.class);
|
||||
fs = new S3AFileSystem();
|
||||
URI uri = URI.create(FS_S3A + "://" + BUCKET);
|
||||
fs.initialize(uri, conf);
|
||||
s3 = fs.getAmazonS3Client();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,7 +19,7 @@
|
|||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.S3ClientOptions;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
@ -96,7 +96,7 @@ public class ITestS3AConfiguration {
|
|||
} else {
|
||||
conf.set(Constants.ENDPOINT, endpoint);
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
AmazonS3Client s3 = fs.getAmazonS3Client();
|
||||
AmazonS3 s3 = fs.getAmazonS3Client();
|
||||
String endPointRegion = "";
|
||||
// Differentiate handling of "s3-" and "s3." based endpoint identifiers
|
||||
String[] endpointParts = StringUtils.split(endpoint, '.');
|
||||
|
@ -364,7 +364,7 @@ public class ITestS3AConfiguration {
|
|||
try {
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
assertNotNull(fs);
|
||||
AmazonS3Client s3 = fs.getAmazonS3Client();
|
||||
AmazonS3 s3 = fs.getAmazonS3Client();
|
||||
assertNotNull(s3);
|
||||
S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class,
|
||||
"clientOptions");
|
||||
|
@ -388,7 +388,7 @@ public class ITestS3AConfiguration {
|
|||
conf = new Configuration();
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
assertNotNull(fs);
|
||||
AmazonS3Client s3 = fs.getAmazonS3Client();
|
||||
AmazonS3 s3 = fs.getAmazonS3Client();
|
||||
assertNotNull(s3);
|
||||
ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
|
||||
"clientConfiguration");
|
||||
|
@ -401,7 +401,7 @@ public class ITestS3AConfiguration {
|
|||
conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
assertNotNull(fs);
|
||||
AmazonS3Client s3 = fs.getAmazonS3Client();
|
||||
AmazonS3 s3 = fs.getAmazonS3Client();
|
||||
assertNotNull(s3);
|
||||
ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
|
||||
"clientConfiguration");
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
|
||||
/**
|
||||
* An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3}
|
||||
* interface suitable for unit testing.
|
||||
*/
|
||||
public class MockS3ClientFactory implements S3ClientFactory {
|
||||
|
||||
@Override
|
||||
public AmazonS3 createS3Client(URI name, URI uri) {
|
||||
String bucket = name.getHost();
|
||||
AmazonS3 s3 = mock(AmazonS3.class);
|
||||
when(s3.doesBucketExist(bucket)).thenReturn(true);
|
||||
return s3;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* S3A tests for getFileStatus using mock S3 client.
|
||||
*/
|
||||
public class TestS3AGetFileStatus extends AbstractS3AMockTest {
|
||||
|
||||
@Test
|
||||
public void testFile() throws Exception {
|
||||
Path path = new Path("/file");
|
||||
String key = path.toUri().getPath().substring(1);
|
||||
ObjectMetadata meta = new ObjectMetadata();
|
||||
meta.setContentLength(1L);
|
||||
meta.setLastModified(new Date(2L));
|
||||
when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta);
|
||||
FileStatus stat = fs.getFileStatus(path);
|
||||
assertNotNull(stat);
|
||||
assertEquals(fs.makeQualified(path), stat.getPath());
|
||||
assertTrue(stat.isFile());
|
||||
assertEquals(meta.getContentLength(), stat.getLen());
|
||||
assertEquals(meta.getLastModified().getTime(), stat.getModificationTime());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFakeDirectory() throws Exception {
|
||||
Path path = new Path("/dir");
|
||||
String key = path.toUri().getPath().substring(1);
|
||||
when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
|
||||
ObjectMetadata meta = new ObjectMetadata();
|
||||
meta.setContentLength(0L);
|
||||
when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta);
|
||||
FileStatus stat = fs.getFileStatus(path);
|
||||
assertNotNull(stat);
|
||||
assertEquals(fs.makeQualified(path), stat.getPath());
|
||||
assertTrue(stat.isDirectory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testImplicitDirectory() throws Exception {
|
||||
Path path = new Path("/dir");
|
||||
String key = path.toUri().getPath().substring(1);
|
||||
when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
|
||||
when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
|
||||
ObjectListing objects = mock(ObjectListing.class);
|
||||
when(objects.getCommonPrefixes()).thenReturn(
|
||||
Collections.singletonList("dir/"));
|
||||
when(objects.getObjectSummaries()).thenReturn(
|
||||
Collections.<S3ObjectSummary>emptyList());
|
||||
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
|
||||
FileStatus stat = fs.getFileStatus(path);
|
||||
assertNotNull(stat);
|
||||
assertEquals(fs.makeQualified(path), stat.getPath());
|
||||
assertTrue(stat.isDirectory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRoot() throws Exception {
|
||||
Path path = new Path("/");
|
||||
String key = path.toUri().getPath().substring(1);
|
||||
when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
|
||||
when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
|
||||
ObjectListing objects = mock(ObjectListing.class);
|
||||
when(objects.getCommonPrefixes()).thenReturn(
|
||||
Collections.<String>emptyList());
|
||||
when(objects.getObjectSummaries()).thenReturn(
|
||||
Collections.<S3ObjectSummary>emptyList());
|
||||
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
|
||||
FileStatus stat = fs.getFileStatus(path);
|
||||
assertNotNull(stat);
|
||||
assertEquals(fs.makeQualified(path), stat.getPath());
|
||||
assertTrue(stat.isDirectory());
|
||||
assertTrue(stat.getPath().isRoot());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotFound() throws Exception {
|
||||
Path path = new Path("/dir");
|
||||
String key = path.toUri().getPath().substring(1);
|
||||
when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
|
||||
when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
|
||||
ObjectListing objects = mock(ObjectListing.class);
|
||||
when(objects.getCommonPrefixes()).thenReturn(
|
||||
Collections.<String>emptyList());
|
||||
when(objects.getObjectSummaries()).thenReturn(
|
||||
Collections.<S3ObjectSummary>emptyList());
|
||||
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
|
||||
exception.expect(FileNotFoundException.class);
|
||||
fs.getFileStatus(path);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue