HADOOP-13130. s3a failures can surface as RTEs, not IOEs. (Steve Loughran)

This commit is contained in:
Steve Loughran 2016-05-21 16:39:25 +01:00
parent 500e946729
commit 39ec1515a2
17 changed files with 1113 additions and 243 deletions

View File

@ -29,4 +29,8 @@ public class InvalidRequestException extends IOException {
public InvalidRequestException(String str) {
super(str);
}
public InvalidRequestException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -24,4 +24,14 @@ public class PathAccessDeniedException extends PathIOException {
public PathAccessDeniedException(String path) {
super(path, "Permission denied");
}
public PathAccessDeniedException(String path, Throwable cause) {
super(path, cause);
}
public PathAccessDeniedException(String path,
String error,
Throwable cause) {
super(path, error, cause);
}
}

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.fs;
/**
* Exception corresponding to Permission denied - ENOENT
* Exception corresponding to path not found: ENOENT/ENOFILE
*/
public class PathNotFoundException extends PathIOException {
static final long serialVersionUID = 0L;
@ -26,4 +26,18 @@ public class PathNotFoundException extends PathIOException {
public PathNotFoundException(String path) {
super(path, "No such file or directory");
}
public PathNotFoundException(String path, Throwable cause) {
super(path, cause);
}
public PathNotFoundException(String path, String error) {
super(path, error);
}
public PathNotFoundException(String path,
String error,
Throwable cause) {
super(path, error, cause);
}
}

View File

@ -26,4 +26,18 @@ public class PathPermissionException extends PathIOException {
public PathPermissionException(String path) {
super(path, "Operation not permitted");
}
public PathPermissionException(String path, Throwable cause) {
super(path, cause);
}
public PathPermissionException(String path, String error) {
super(path, error);
}
public PathPermissionException(String path,
String error,
Throwable cause) {
super(path, error, cause);
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.google.common.base.Preconditions;
import java.io.IOException;
/**
* IOException equivalent of an {@link AmazonClientException}.
*/
public class AWSClientIOException extends IOException {
private final String operation;
public AWSClientIOException(String operation,
AmazonClientException cause) {
super(cause);
Preconditions.checkArgument(operation != null, "Null 'operation' argument");
Preconditions.checkArgument(cause != null, "Null 'cause' argument");
this.operation = operation;
}
public AmazonClientException getCause() {
return (AmazonClientException) super.getCause();
}
@Override
public String getMessage() {
return operation + ": " + getCause().getMessage();
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.Map;
/**
* Wrap a {@link AmazonS3Exception} as an IOE, relaying all
* getters.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AWSS3IOException extends AWSServiceIOException {
/**
* Instantiate.
* @param operation operation which triggered this
* @param cause the underlying cause
*/
public AWSS3IOException(String operation,
AmazonS3Exception cause) {
super(operation, cause);
}
public AmazonS3Exception getCause() {
return (AmazonS3Exception) super.getCause();
}
public String getErrorResponseXml() {
return getCause().getErrorResponseXml();
}
public Map<String, String> getAdditionalDetails() {
return getCause().getAdditionalDetails();
}
public String getExtendedRequestId() {
return getCause().getExtendedRequestId();
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A specific exception from AWS operations.
* The exception must always be created with an {@link AmazonServiceException}.
* The attributes of this exception can all be directly accessed.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AWSServiceIOException extends AWSClientIOException {
/**
* Instantiate.
* @param operation operation which triggered this
* @param cause the underlying cause
*/
public AWSServiceIOException(String operation,
AmazonServiceException cause) {
super(operation, cause);
}
public AmazonServiceException getCause() {
return (AmazonServiceException) super.getCause();
}
public String getRequestId() {
return getCause().getRequestId();
}
public String getServiceName() {
return getCause().getServiceName();
}
public String getErrorCode() {
return getCause().getErrorCode();
}
public int getStatusCode() {
return getCause().getStatusCode();
}
public String getRawResponseContent() {
return getCause().getRawResponseContent();
}
public boolean isRetryable() {
return getCause().isRetryable();
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3Client;
@ -54,6 +53,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/**
* Upload files/parts asap directly from a memory buffer (instead of buffering
@ -152,10 +152,8 @@ public class S3AFastOutputStream extends OutputStream {
this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
this.multiPartUpload = null;
this.progressListener = new ProgressableListener(progress);
if (LOG.isDebugEnabled()){
LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
bucket, key);
}
LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
bucket, key);
}
/**
@ -210,15 +208,11 @@ public class S3AFastOutputStream extends OutputStream {
requires multiple parts! */
final byte[] allBytes = buffer.toByteArray();
buffer = null; //earlier gc?
if (LOG.isDebugEnabled()) {
LOG.debug("Total length of initial buffer: {}", allBytes.length);
}
LOG.debug("Total length of initial buffer: {}", allBytes.length);
int processedPos = 0;
while ((multiPartThreshold - processedPos) >= partSize) {
if (LOG.isDebugEnabled()) {
LOG.debug("Initial buffer: processing from byte {} to byte {}",
processedPos, (processedPos + partSize - 1));
}
LOG.debug("Initial buffer: processing from byte {} to byte {}",
processedPos, (processedPos + partSize - 1));
multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
processedPos, partSize), partSize);
processedPos += partSize;
@ -235,7 +229,13 @@ public class S3AFastOutputStream extends OutputStream {
}
}
/**
* Close the stream. This will not return until the upload is complete
* or the attempt to perform the upload has failed.
* Exceptions raised in this method are indicative that the write has
* failed and data is at risk of being lost.
* @throws IOException on any failure.
*/
@Override
public synchronized void close() throws IOException {
if (closed) {
@ -258,9 +258,7 @@ public class S3AFastOutputStream extends OutputStream {
statistics.incrementWriteOps(1);
// This will delete unnecessary fake parent directories
fs.finishedWrite(key);
if (LOG.isDebugEnabled()) {
LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
}
LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
} finally {
buffer = null;
super.close();
@ -283,20 +281,14 @@ public class S3AFastOutputStream extends OutputStream {
try {
return new MultiPartUpload(
client.initiateMultipartUpload(initiateMPURequest).getUploadId());
} catch (AmazonServiceException ase) {
throw new IOException("Unable to initiate MultiPartUpload (server side)" +
": " + ase, ase);
} catch (AmazonClientException ace) {
throw new IOException("Unable to initiate MultiPartUpload (client side)" +
": " + ace, ace);
throw translateException("initiate MultiPartUpload", key, ace);
}
}
private void putObject() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket,
key);
}
LOG.debug("Executing regular upload for bucket '{}' key '{}'",
bucket, key);
final ObjectMetadata om = createDefaultMetadata();
om.setContentLength(buffer.size());
final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
@ -317,10 +309,11 @@ public class S3AFastOutputStream extends OutputStream {
LOG.warn("Interrupted object upload:" + ie, ie);
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
throw new IOException("Regular upload failed", ee.getCause());
throw extractException("regular upload", key, ee);
}
}
private class MultiPartUpload {
private final String uploadId;
private final List<ListenableFuture<PartETag>> partETagsFutures;
@ -328,13 +321,11 @@ public class S3AFastOutputStream extends OutputStream {
public MultiPartUpload(String uploadId) {
this.uploadId = uploadId;
this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
if (LOG.isDebugEnabled()) {
LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
"id '{}'", bucket, key, uploadId);
}
LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
"id '{}'", bucket, key, uploadId);
}
public void uploadPartAsync(ByteArrayInputStream inputStream,
private void uploadPartAsync(ByteArrayInputStream inputStream,
int partSize) {
final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request =
@ -346,22 +337,21 @@ public class S3AFastOutputStream extends OutputStream {
executorService.submit(new Callable<PartETag>() {
@Override
public PartETag call() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
uploadId);
}
LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
uploadId);
return client.uploadPart(request).getPartETag();
}
});
partETagsFutures.add(partETagFuture);
}
public List<PartETag> waitForAllPartUploads() throws IOException {
private List<PartETag> waitForAllPartUploads() throws IOException {
try {
return Futures.allAsList(partETagsFutures).get();
} catch (InterruptedException ie) {
LOG.warn("Interrupted partUpload:" + ie, ie);
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException ee) {
//there is no way of recovering so abort
//cancel all partUploads
@ -370,22 +360,23 @@ public class S3AFastOutputStream extends OutputStream {
}
//abort multipartupload
this.abort();
throw new IOException("Part upload failed in multi-part upload with " +
"id '" +uploadId + "':" + ee, ee);
throw extractException("Multi-part upload with id '" + uploadId + "'",
key, ee);
}
//should not happen?
return null;
}
public void complete(List<PartETag> partETags) {
if (LOG.isDebugEnabled()) {
LOG.debug("Completing multi-part upload for key '{}', id '{}'", key,
uploadId);
private void complete(List<PartETag> partETags) throws IOException {
try {
LOG.debug("Completing multi-part upload for key '{}', id '{}'",
key, uploadId);
client.completeMultipartUpload(
new CompleteMultipartUploadRequest(bucket,
key,
uploadId,
partETags));
} catch (AmazonClientException e) {
throw translateException("Completing multi-part upload", key, e);
}
final CompleteMultipartUploadRequest completeRequest =
new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
client.completeMultipartUpload(completeRequest);
}
public void abort() {

View File

@ -40,6 +40,7 @@ import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@ -75,6 +76,7 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.VersionInfo;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -126,85 +128,115 @@ public class S3AFileSystem extends FileSystem {
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
setConf(conf);
instrumentation = new S3AInstrumentation(name);
try {
instrumentation = new S3AInstrumentation(name);
uri = URI.create(name.getScheme() + "://" + name.getAuthority());
workingDir = new Path("/user", System.getProperty("user.name"))
.makeQualified(this.uri, this.getWorkingDirectory());
uri = URI.create(name.getScheme() + "://" + name.getAuthority());
workingDir = new Path("/user", System.getProperty("user.name"))
.makeQualified(this.uri, this.getWorkingDirectory());
bucket = name.getHost();
bucket = name.getHost();
AWSCredentialsProvider credentials = getAWSCredentialsProvider(name, conf);
AWSCredentialsProvider credentials =
getAWSCredentialsProvider(name, conf);
ClientConfiguration awsConf = new ClientConfiguration();
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS);
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT, 0));
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT, 0));
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
if (!signerOverride.isEmpty()) {
LOG.debug("Signer override = {}", signerOverride);
awsConf.setSignerOverride(signerOverride);
ClientConfiguration awsConf = new ClientConfiguration();
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS);
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT, 0));
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT, 0));
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
if (!signerOverride.isEmpty()) {
LOG.debug("Signer override = {}", signerOverride);
awsConf.setSignerOverride(signerOverride);
}
initProxySupport(conf, awsConf, secureConnections);
initUserAgent(conf, awsConf);
initAmazonS3Client(conf, credentials, awsConf);
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
if (partSize < 5 * 1024 * 1024) {
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
partSize = 5 * 1024 * 1024;
}
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
if (multiPartThreshold < 5 * 1024 * 1024) {
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
multiPartThreshold = 5 * 1024 * 1024;
}
//check but do not store the block size
longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
if (maxThreads < 2) {
LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
maxThreads = 2;
}
int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
if (totalTasks < 1) {
LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
totalTasks = 1;
}
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
"s3a-transfer-shared");
initTransferManager();
initCannedAcls(conf);
verifyBucketExists();
initMultipartUploads(conf);
serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
initProxySupport(conf, awsConf, secureConnections);
}
initUserAgent(conf, awsConf);
initAmazonS3Client(conf, credentials, awsConf);
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
if (partSize < 5 * 1024 * 1024) {
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
partSize = 5 * 1024 * 1024;
/**
* Verify that the bucket exists. This does not check permissions,
* not even read access.
* @throws FileNotFoundException the bucket is absent
* @throws IOException any other problem talking to S3
*/
protected void verifyBucketExists()
throws FileNotFoundException, IOException {
try {
if (!s3.doesBucketExist(bucket)) {
throw new FileNotFoundException("Bucket " + bucket + " does not exist");
}
} catch (AmazonS3Exception e) {
// this is a sign of a serious startup problem so do dump everything
LOG.warn(stringify(e), e);
throw translateException("doesBucketExist", bucket, e);
} catch (AmazonServiceException e) {
// this is a sign of a serious startup problem so do dump everything
LOG.warn(stringify(e), e);
throw translateException("doesBucketExist", bucket, e);
} catch (AmazonClientException e) {
throw translateException("doesBucketExist", bucket, e);
}
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
if (multiPartThreshold < 5 * 1024 * 1024) {
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
multiPartThreshold = 5 * 1024 * 1024;
}
//check but do not store the block size
longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
if (maxThreads < 2) {
LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
maxThreads = 2;
}
int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
if (totalTasks < 1) {
LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
totalTasks = 1;
}
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
"s3a-transfer-shared");
initTransferManager();
initCannedAcls(conf);
if (!s3.doesBucketExist(bucket)) {
throw new FileNotFoundException("Bucket " + bucket + " does not exist");
}
initMultipartUploads(conf);
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
}
void initProxySupport(Configuration conf, ClientConfiguration awsConf,
@ -316,7 +348,7 @@ public class S3AFileSystem extends FileSystem {
}
}
private void initMultipartUploads(Configuration conf) {
private void initMultipartUploads(Configuration conf) throws IOException {
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
DEFAULT_PURGE_EXISTING_MULTIPART);
long purgeExistingMultipartAge = longOption(conf,
@ -331,10 +363,10 @@ public class S3AFileSystem extends FileSystem {
} catch (AmazonServiceException e) {
if (e.getStatusCode() == 403) {
instrumentation.errorIgnored();
LOG.debug("Failed to abort multipart uploads against {}," +
LOG.debug("Failed to purging multipart uploads against {}," +
" FS may be read only", bucket, e);
} else {
throw e;
throw translateException("purging multipart uploads", bucket, e);
}
}
}
@ -576,10 +608,28 @@ public class S3AFileSystem extends FileSystem {
*
* @param src path to be renamed
* @param dst new path after rename
* @throws IOException on failure
* @throws IOException on IO failure
* @return true if rename is successful
*/
public boolean rename(Path src, Path dst) throws IOException {
try {
return innerRename(src, dst);
} catch (AmazonClientException e) {
throw translateException("rename(" + src +", " + dst + ")", src, e);
}
}
/**
* The inner rename operation. See {@link #rename(Path, Path)} for
* the description of the operation.
* @param src path to be renamed
* @param dst new path after rename
* @return true if rename is successful
* @throws IOException on IO failure.
* @throws AmazonClientException on failures inside the AWS SDK
*/
private boolean innerRename(Path src, Path dst) throws IOException,
AmazonClientException {
LOG.debug("Rename path {} to {}", src, dst);
String srcKey = pathToKey(src);
@ -722,7 +772,7 @@ public class S3AFileSystem extends FileSystem {
* when set to true
*/
private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean clearKeys) {
boolean clearKeys) throws AmazonClientException {
if (enableMultiObjectsDelete) {
DeleteObjectsRequest deleteRequest
= new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
@ -745,7 +795,9 @@ public class S3AFileSystem extends FileSystem {
}
}
/** Delete a file.
/**
* Delete a Path. This operation is at least {@code O(files)}, with
* added overheads to enumerate the path. It is also not atomic.
*
* @param f the path to delete.
* @param recursive if path is a directory and set to
@ -755,6 +807,26 @@ public class S3AFileSystem extends FileSystem {
* @throws IOException due to inability to delete a directory or file.
*/
public boolean delete(Path f, boolean recursive) throws IOException {
try {
return innerDelete(f, recursive);
} catch (AmazonClientException e) {
throw translateException("delete", f, e);
}
}
/**
* Delete a path. See {@link #delete(Path, boolean)}.
*
* @param f the path to delete.
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if delete is successful else false.
* @throws IOException due to inability to delete a directory or file.
* @throws AmazonClientException on failures inside the AWS SDK
*/
private boolean innerDelete(Path f, boolean recursive) throws IOException,
AmazonClientException {
LOG.debug("Delete path {} - recursive {}", f , recursive);
S3AFileStatus status;
try {
@ -835,7 +907,8 @@ public class S3AFileSystem extends FileSystem {
return true;
}
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
private void createFakeDirectoryIfNecessary(Path f)
throws IOException, AmazonClientException {
String key = pathToKey(f);
if (!key.isEmpty() && !exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
@ -854,6 +927,25 @@ public class S3AFileSystem extends FileSystem {
*/
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
try {
return innerListStatus(f);
} catch (AmazonClientException e) {
throw translateException("listStatus", f, e);
}
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f given path
* @return the statuses of the files/directories in the given patch
* @throws FileNotFoundException when the path does not exist;
* @throws IOException due to an IO problem.
* @throws AmazonClientException on failures inside the AWS SDK
*/
public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
IOException, AmazonClientException {
String key = pathToKey(f);
LOG.debug("List status for path: {}", f);
@ -945,15 +1037,42 @@ public class S3AFileSystem extends FileSystem {
}
/**
* Make the given file and all non-existent parents into
* directories. Has the semantics of Unix 'mkdir -p'.
*
* Make the given path and all non-existent parents into
* directories. Has the semantics of Unix @{code 'mkdir -p'}.
* Existence of the directory hierarchy is not an error.
* @param f path to create
* @param path path to create
* @param permission to apply to f
* @return true if a directory was created
* @throws FileAlreadyExistsException there is a file at the path specified
* @throws IOException other IO problems
*/
// TODO: If we have created an empty file at /foo/bar and we then call
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
public boolean mkdirs(Path path, FsPermission permission) throws IOException,
FileAlreadyExistsException {
try {
return innerMkdirs(path, permission);
} catch (AmazonClientException e) {
throw translateException("innerMkdirs", path, e);
}
}
/**
*
* Make the given path and all non-existent parents into
* directories.
* See {@link #mkdirs(Path, FsPermission)}
* @param f path to create
* @param permission to apply to f
* @return true if a directory was created
* @throws FileAlreadyExistsException there is a file at the path specified
* @throws IOException other IO problems
* @throws AmazonClientException on failures inside the AWS SDK
*/
// TODO: If we have created an empty file at /foo/bar and we then call
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
private boolean innerMkdirs(Path f, FsPermission permission)
throws IOException, FileAlreadyExistsException, AmazonClientException {
LOG.debug("Making directory: {}", f);
try {
@ -991,7 +1110,7 @@ public class S3AFileSystem extends FileSystem {
* @param f The path we want information from
* @return a FileStatus object
* @throws java.io.FileNotFoundException when the path does not exist;
* IOException see specific implementation
* @throws IOException on other problems.
*/
public S3AFileStatus getFileStatus(Path f) throws IOException {
String key = pathToKey(f);
@ -1015,12 +1134,10 @@ public class S3AFileSystem extends FileSystem {
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
printAmazonServiceException(f.toString(), e);
throw e;
throw translateException("getFileStatus", f, e);
}
} catch (AmazonClientException e) {
printAmazonClientException(f.toString(), e);
throw e;
throw translateException("getFileStatus", f, e);
}
// Necessary?
@ -1043,12 +1160,10 @@ public class S3AFileSystem extends FileSystem {
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
printAmazonServiceException(newKey, e);
throw e;
throw translateException("getFileStatus", newKey, e);
}
} catch (AmazonClientException e) {
printAmazonClientException(newKey, e);
throw e;
throw translateException("getFileStatus", newKey, e);
}
}
}
@ -1089,12 +1204,10 @@ public class S3AFileSystem extends FileSystem {
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
printAmazonServiceException(key, e);
throw e;
throw translateException("getFileStatus", key, e);
}
} catch (AmazonClientException e) {
printAmazonClientException(key, e);
throw e;
throw translateException("getFileStatus", key, e);
}
LOG.debug("Not Found: {}", f);
@ -1113,10 +1226,42 @@ public class S3AFileSystem extends FileSystem {
* @param overwrite whether to overwrite an existing file
* @param src path
* @param dst path
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false
* @throws AmazonClientException failure in the AWS SDK
*/
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException {
try {
innerCopyFromLocalFile(delSrc, overwrite, src, dst);
} catch (AmazonClientException e) {
throw translateException("copyFromLocalFile(" + src + ", " + dst + ")",
src, e);
}
}
/**
* The src file is on the local disk. Add it to FS at
* the given dst name.
*
* This version doesn't need to create a temporary file to calculate the md5.
* Sadly this doesn't seem to be used by the shell cp :(
*
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param src path
* @param dst path
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false
* @throws AmazonClientException failure in the AWS SDK
*/
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException, FileAlreadyExistsException, AmazonClientException {
String key = pathToKey(dst);
if (!overwrite && exists(dst)) {
@ -1166,8 +1311,12 @@ public class S3AFileSystem extends FileSystem {
}
}
/**
* Close the filesystem. This shuts down all transfers.
* @throws IOException IO problem
*/
@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
try {
super.close();
} finally {
@ -1179,49 +1328,63 @@ public class S3AFileSystem extends FileSystem {
}
/**
* Override getCanonicalServiceName because we don't support token in S3A.
*/
* Override getCanonicalServiceName because we don't support token in S3A.
*/
@Override
public String getCanonicalServiceName() {
// Does not support Token
return null;
}
/**
* Copy a single object in the bucket via a COPY operation.
* @param srcKey source object path
* @param dstKey destination object path
* @param size object size
* @throws AmazonClientException on failures inside the AWS SDK
* @throws InterruptedIOException the operation was interrupted
* @throws IOException Other IO problems
*/
private void copyFile(String srcKey, String dstKey, long size)
throws IOException {
throws IOException, InterruptedIOException, AmazonClientException {
LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
ProgressListener progressListener = new ProgressListener() {
public void progressChanged(ProgressEvent progressEvent) {
switch (progressEvent.getEventType()) {
case TRANSFER_PART_COMPLETED_EVENT:
statistics.incrementWriteOps(1);
break;
default:
break;
}
}
};
Copy copy = transfers.copy(copyObjectRequest);
copy.addProgressListener(progressListener);
try {
copy.waitForCopyResult();
statistics.incrementWriteOps(1);
instrumentation.filesCopied(1, size);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted copying " + srcKey
+ " to " + dstKey + ", cancelling");
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
ProgressListener progressListener = new ProgressListener() {
public void progressChanged(ProgressEvent progressEvent) {
switch (progressEvent.getEventType()) {
case TRANSFER_PART_COMPLETED_EVENT:
statistics.incrementWriteOps(1);
break;
default:
break;
}
}
};
Copy copy = transfers.copy(copyObjectRequest);
copy.addProgressListener(progressListener);
try {
copy.waitForCopyResult();
statistics.incrementWriteOps(1);
instrumentation.filesCopied(1, size);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted copying " + srcKey
+ " to " + dstKey + ", cancelling");
}
} catch (AmazonClientException e) {
throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")",
srcKey, e);
}
}
@ -1240,11 +1403,20 @@ public class S3AFileSystem extends FileSystem {
return date.getTime();
}
public void finishedWrite(String key) throws IOException {
/**
* Perform post-write actions.
* @param key key written to
*/
public void finishedWrite(String key) {
deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
}
private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
/**
* Delete mock parent directories which are no longer needed.
* This code swallows IO exceptions encountered
* @param f path
*/
private void deleteUnnecessaryFakeDirectories(Path f) {
while (true) {
String key = "";
try {
@ -1260,7 +1432,7 @@ public class S3AFileSystem extends FileSystem {
s3.deleteObject(bucket, key + "/");
statistics.incrementWriteOps(1);
}
} catch (FileNotFoundException | AmazonServiceException e) {
} catch (IOException | AmazonClientException e) {
LOG.debug("While deleting key {} ", key, e);
instrumentation.errorIgnored();
}
@ -1383,28 +1555,6 @@ public class S3AFileSystem extends FileSystem {
return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
}
private void printAmazonServiceException(String target,
AmazonServiceException ase) {
LOG.info("{}: caught an AmazonServiceException {}", target, ase);
LOG.info("This means your request made it to Amazon S3," +
" but was rejected with an error response for some reason.");
LOG.info("Error Message: {}", ase.getMessage());
LOG.info("HTTP Status Code: {}", ase.getStatusCode());
LOG.info("AWS Error Code: {}", ase.getErrorCode());
LOG.info("Error Type: {}", ase.getErrorType());
LOG.info("Request ID: {}", ase.getRequestId());
LOG.info("Class Name: {}", ase.getClass().getName());
LOG.info("Exception", ase);
}
private void printAmazonClientException(String target,
AmazonClientException ace) {
LOG.info("{}: caught an AmazonClientException {}", target, ace);
LOG.info("This means the client encountered " +
"a problem while trying to communicate with S3, " +
"such as not being able to access the network.", ace);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
@ -29,11 +30,14 @@ import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import java.io.EOFException;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/**
* The input stream for an S3A object.
*
@ -112,7 +116,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @param reason reason for reopen
* @param targetPos target position
* @param length length requested
* @throws IOException
* @throws IOException on any failure to open the object
*/
private synchronized void reopen(String reason, long targetPos, long length)
throws IOException {
@ -126,13 +130,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos);
streamStatistics.streamOpened();
GetObjectRequest request = new GetObjectRequest(bucket, key)
.withRange(targetPos, requestedStreamLen);
wrappedStream = client.getObject(request).getObjectContent();
try {
GetObjectRequest request = new GetObjectRequest(bucket, key)
.withRange(targetPos, requestedStreamLen);
wrappedStream = client.getObject(request).getObjectContent();
if (wrappedStream == null) {
throw new IOException("Null IO stream from reopen of (" + reason + ") "
+ uri);
if (wrappedStream == null) {
throw new IOException("Null IO stream from reopen of (" + reason + ") "
+ uri);
}
} catch (AmazonClientException e) {
throw translateException("Reopen at position " + targetPos, uri, e);
}
this.pos = targetPos;
@ -276,10 +284,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
return -1;
}
lazySeek(nextReadPos, 1);
int byteRead;
try {
lazySeek(nextReadPos, 1);
byteRead = wrappedStream.read();
} catch (EOFException e) {
return -1;
@ -337,11 +345,16 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
return -1;
}
lazySeek(nextReadPos, len);
streamStatistics.readOperationStarted(nextReadPos, len);
try {
lazySeek(nextReadPos, len);
} catch (EOFException e) {
// the end of the file has moved
return -1;
}
int bytesRead;
try {
streamStatistics.readOperationStarted(nextReadPos, len);
bytesRead = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
throw e;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
@ -40,11 +41,13 @@ import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/**
* Output stream to save data to S3.
@ -92,13 +95,15 @@ public class S3AOutputStream extends OutputStream {
lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
}
backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
backupFile = lDirAlloc.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
closed = false;
LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
key, backupFile);
this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
this.backupStream = new BufferedOutputStream(
new FileOutputStream(backupFile));
}
@Override
@ -123,7 +128,8 @@ public class S3AOutputStream extends OutputStream {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile);
PutObjectRequest putObjectRequest =
new PutObjectRequest(bucket, key, backupFile);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(om);
@ -135,18 +141,20 @@ public class S3AOutputStream extends OutputStream {
upload.waitForUploadResult();
long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred();
long delta = upload.getProgress().getBytesTransferred() -
listener.getLastBytesTransferred();
if (statistics != null && delta != 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("S3A write delta changed after finished: " + delta + " bytes");
}
LOG.debug("S3A write delta changed after finished: {} bytes", delta);
statistics.incrementBytesWritten(delta);
}
// This will delete unnecessary fake parent directories
fs.finishedWrite(key);
} catch (InterruptedException e) {
throw new IOException(e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (AmazonClientException e) {
throw translateException("saving output", key , e);
} finally {
if (!backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
@ -154,9 +162,7 @@ public class S3AOutputStream extends OutputStream {
super.close();
closed = true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("OutputStream for key '" + key + "' upload complete");
}
LOG.debug("OutputStream for key '{}' upload complete", key);
}
@Override

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* Utility methods for S3A code.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class S3AUtils {
private S3AUtils() {
}
/**
* Translate an exception raised in an operation into an IOException.
* The specific type of IOException depends on the class of
* {@link AmazonClientException} passed in, and any status codes included
* in the operation. That is: HTTP error codes are examined and can be
* used to build a more specific response.
* @param operation operation
* @param path path operated on (must not be null)
* @param exception amazon exception raised
* @return an IOE which wraps the caught exception.
*/
public static IOException translateException(String operation,
Path path,
AmazonClientException exception) {
return translateException(operation, path.toString(), exception);
}
/**
* Translate an exception raised in an operation into an IOException.
* The specific type of IOException depends on the class of
* {@link AmazonClientException} passed in, and any status codes included
* in the operation. That is: HTTP error codes are examined and can be
* used to build a more specific response.
* @param operation operation
* @param path path operated on (may be null)
* @param exception amazon exception raised
* @return an IOE which wraps the caught exception.
*/
@SuppressWarnings("ThrowableInstanceNeverThrown")
public static IOException translateException(String operation,
String path,
AmazonClientException exception) {
String message = String.format("%s%s: %s",
operation,
path != null ? (" on " + path) : "",
exception);
if (!(exception instanceof AmazonServiceException)) {
return new AWSClientIOException(message, exception);
} else {
IOException ioe;
AmazonServiceException ase = (AmazonServiceException) exception;
// this exception is non-null if the service exception is an s3 one
AmazonS3Exception s3Exception = ase instanceof AmazonS3Exception
? (AmazonS3Exception) ase
: null;
int status = ase.getStatusCode();
switch (status) {
// permissions
case 401:
case 403:
ioe = new AccessDeniedException(path, null, message);
ioe.initCause(ase);
break;
// the object isn't there
case 404:
case 410:
ioe = new FileNotFoundException(message);
ioe.initCause(ase);
break;
// out of range. This may happen if an object is overwritten with
// a shorter one while it is being read.
case 416:
ioe = new EOFException(message);
break;
default:
// no specific exit code. Choose an IOE subclass based on the class
// of the caught exception
ioe = s3Exception != null
? new AWSS3IOException(message, s3Exception)
: new AWSServiceIOException(message, ase);
break;
}
return ioe;
}
}
/**
* Extract an exception from a failed future, and convert to an IOE.
* @param operation operation which failed
* @param path path operated on (may be null)
* @param ee execution exception
* @return an IOE which can be thrown
*/
public static IOException extractException(String operation,
String path,
ExecutionException ee) {
IOException ioe;
Throwable cause = ee.getCause();
if (cause instanceof AmazonClientException) {
ioe = translateException(operation, path, (AmazonClientException) cause);
} else if (cause instanceof IOException) {
ioe = (IOException) cause;
} else {
ioe = new IOException(operation + " failed: " + cause, cause);
}
return ioe;
}
/**
* Get low level details of an amazon exception for logging; multi-line.
* @param e exception
* @return string details
*/
public static String stringify(AmazonServiceException e) {
StringBuilder builder = new StringBuilder(
String.format("%s: %s error %d: %s; %s%s%n",
e.getErrorType(),
e.getServiceName(),
e.getStatusCode(),
e.getErrorCode(),
e.getErrorMessage(),
(e.isRetryable() ? " (retryable)": "")
));
String rawResponseContent = e.getRawResponseContent();
if (rawResponseContent != null) {
builder.append(rawResponseContent);
}
return builder.toString();
}
/**
* Get low level details of an amazon exception for logging; multi-line.
* @param e exception
* @return string details
*/
public static String stringify(AmazonS3Exception e) {
// get the low level details of an exception,
StringBuilder builder = new StringBuilder(
stringify((AmazonServiceException) e));
Map<String, String> details = e.getAdditionalDetails();
if (details != null) {
builder.append('\n');
for (Map.Entry<String, String> d : details.entrySet()) {
builder.append(d.getKey()).append('=')
.append(d.getValue()).append('\n');
}
}
return builder.toString();
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* S3A Filesystem. Except for the exceptions, it should
* all be hidden as implementation details.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -25,6 +25,7 @@ import org.junit.internal.AssumptionViolatedException;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
public class S3ATestUtils {
@ -72,4 +73,70 @@ public class S3ATestUtils {
FileContext fc = FileContext.getFileContext(testURI,conf);
return fc;
}
/**
* Repeatedly attempt a callback until timeout or a {@link FailFastException}
* is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
* @param timeout timeout
* @param callback callback to invoke
* @throws FailFastException any fast-failure
* @throws Exception the exception which caused the iterator to fail
*/
public static void eventually(int timeout, Callable<Void> callback)
throws Exception {
Exception lastException;
long endtime = System.currentTimeMillis() + timeout;
do {
try {
callback.call();
return;
} catch (FailFastException e) {
throw e;
} catch (Exception e) {
lastException = e;
}
Thread.sleep(500);
} while (endtime > System.currentTimeMillis());
throw lastException;
}
/**
* The exception to raise so as to exit fast from
* {@link #eventually(int, Callable)}.
*/
public static class FailFastException extends Exception {
public FailFastException() {
}
public FailFastException(String message) {
super(message);
}
public FailFastException(String message, Throwable cause) {
super(message, cause);
}
public FailFastException(Throwable cause) {
super(cause);
}
}
/**
* Verify the class of an exception. If it is not as expected, rethrow it.
* Comparison is on the exact class, not subclass-of inference as
* offered by {@code instanceof}.
* @param clazz the expected exception class
* @param ex the exception caught
* @return the exception, if it is of the expected class
* @throws Exception the exception passed in.
*/
public static Exception verifyExceptionClass(Class clazz,
Exception ex)
throws Exception {
if (!(ex.getClass().equals(clazz))) {
throw ex;
}
return ex;
}
}

View File

@ -23,8 +23,10 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import com.amazonaws.auth.AWSCredentials;
@ -32,7 +34,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +50,7 @@ public class TestS3AAWSCredentialsProvider {
Configuration conf = new Configuration();
conf.set(AWS_CREDENTIALS_PROVIDER, "no.such.class");
try {
S3ATestUtils.createTestFileSystem(conf);
createFailingFS(conf);
} catch (IOException e) {
if (!(e.getCause() instanceof ClassNotFoundException)) {
LOG.error("Unexpected nested cause: {} in {}", e.getCause(), e, e);
@ -58,6 +59,18 @@ public class TestS3AAWSCredentialsProvider {
}
}
/**
* Create a filesystem, expect it to fail by raising an IOException.
* Raises an assertion exception if in fact the FS does get instantiated.
* @param conf configuration
* @throws IOException an expected exception.
*/
private void createFailingFS(Configuration conf) throws IOException {
S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf);
fs.listStatus(new Path("/"));
fail("Expected exception - got " + fs);
}
static class BadCredentialsProvider implements AWSCredentialsProvider {
@SuppressWarnings("unused")
@ -79,12 +92,9 @@ public class TestS3AAWSCredentialsProvider {
Configuration conf = new Configuration();
conf.set(AWS_CREDENTIALS_PROVIDER, BadCredentialsProvider.class.getName());
try {
S3ATestUtils.createTestFileSystem(conf);
} catch (AmazonS3Exception e) {
if (e.getStatusCode() != 403) {
LOG.error("Unexpected status code: {}", e.getStatusCode(), e);
throw e;
}
createFailingFS(conf);
} catch (AccessDeniedException e) {
// expected
}
}

View File

@ -21,11 +21,9 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.reflect.FieldUtils;
import com.amazonaws.AmazonClientException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -43,7 +41,6 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.net.URI;
import java.lang.reflect.Field;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.alias.CredentialProvider;
@ -122,7 +119,7 @@ public class TestS3AConfiguration {
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server at " + proxy);
} catch (AmazonClientException e) {
} catch (AWSClientIOException e) {
// expected
}
}
@ -155,14 +152,14 @@ public class TestS3AConfiguration {
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (AmazonClientException e) {
} catch (AWSClientIOException e) {
// expected
}
conf.set(Constants.SECURE_CONNECTIONS, "false");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (AmazonClientException e) {
} catch (AWSClientIOException e) {
// expected
}
}
@ -376,7 +373,7 @@ public class TestS3AConfiguration {
clientOptions.isPathStyleAccess());
byte[] file = ContractTestUtils.toAsciiByteArray("test file");
ContractTestUtils.writeAndRead(fs, new Path("/path/style/access/testFile"), file, file.length, conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true);
} catch (final AmazonS3Exception e) {
} catch (final AWSS3IOException e) {
LOG.error("Caught exception: ", e);
// Catch/pass standard path style access behaviour when live bucket
// isn't in the same region as the s3 client default. See

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.nio.file.AccessDeniedException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/**
* Test S3A Failure translation, including a functional test
* generating errors during stream IO.
*/
public class TestS3AFailureHandling extends AbstractFSContractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestS3AFailureHandling.class);
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Test
public void testReadFileChanged() throws Throwable {
describe("overwrite a file with a shorter one during a read, seek");
final int fullLength = 8192;
final byte[] fullDataset = dataset(fullLength, 'a', 32);
final int shortLen = 4096;
final byte[] shortDataset = dataset(shortLen, 'A', 32);
final FileSystem fs = getFileSystem();
final Path testpath = path("readFileToChange.txt");
// initial write
writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false);
try(FSDataInputStream instream = fs.open(testpath)) {
instream.seek(fullLength - 16);
assertTrue("no data to read", instream.read() >= 0);
// overwrite
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
// here the file length is less. Probe the file to see if this is true,
// with a spin and wait
eventually(30 *1000, new Callable<Void>() {
@Override
public Void call() throws Exception {
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
return null;
}
});
// here length is shorter. Assuming it has propagated to all replicas,
// the position of the input stream is now beyond the EOF.
// An attempt to seek backwards to a position greater than the
// short length will raise an exception from AWS S3, which must be
// translated into an EOF
instream.seek(shortLen + 1024);
int c = instream.read();
assertIsEOF("read()", c);
byte[] buf = new byte[256];
assertIsEOF("read(buffer)", instream.read(buf));
assertIsEOF("read(offset)",
instream.read(instream.getPos(), buf, 0, buf.length));
// now do a block read fully, again, backwards from the current pos
try {
instream.readFully(shortLen + 512, buf);
fail("Expected readFully to fail");
} catch (EOFException expected) {
LOG.debug("Expected EOF: ", expected);
}
assertIsEOF("read(offset)",
instream.read(shortLen + 510, buf, 0, buf.length));
// seek somewhere useful
instream.seek(shortLen - 256);
// delete the file. Reads must fail
fs.delete(testpath, false);
try {
int r = instream.read();
fail("Expected an exception, got " + r);
} catch (FileNotFoundException e) {
// expected
}
try {
instream.readFully(2048, buf);
fail("Expected readFully to fail");
} catch (FileNotFoundException e) {
// expected
}
}
}
/**
* Assert that a read operation returned an EOF value.
* @param operation specific operation
* @param readResult result
*/
private void assertIsEOF(String operation, int readResult) {
assertEquals("Expected EOF from "+ operation
+ "; got char " + (char) readResult, -1, readResult);
}
@Test
public void test404isNotFound() throws Throwable {
verifyTranslated(FileNotFoundException.class, createS3Exception(404));
}
protected Exception verifyTranslated(Class clazz,
AmazonClientException exception) throws Exception {
return verifyExceptionClass(clazz,
translateException("test", "/", exception));
}
@Test
public void test401isNotPermittedFound() throws Throwable {
verifyTranslated(AccessDeniedException.class,
createS3Exception(401));
}
protected AmazonS3Exception createS3Exception(int code) {
AmazonS3Exception source = new AmazonS3Exception("");
source.setStatusCode(code);
return source;
}
@Test
public void testGenericS3Exception() throws Throwable {
// S3 exception of no known type
AWSS3IOException ex = (AWSS3IOException)verifyTranslated(
AWSS3IOException.class,
createS3Exception(451));
assertEquals(451, ex.getStatusCode());
}
@Test
public void testGenericServiceS3Exception() throws Throwable {
// service exception of no known type
AmazonServiceException ase = new AmazonServiceException("unwind");
ase.setStatusCode(500);
AWSServiceIOException ex = (AWSServiceIOException)verifyTranslated(
AWSServiceIOException.class,
ase);
assertEquals(500, ex.getStatusCode());
}
@Test
public void testGenericClientException() throws Throwable {
// Generic Amazon exception
verifyTranslated(AWSClientIOException.class,
new AmazonClientException(""));
}
}