HADOOP-13130. s3a failures can surface as RTEs, not IOEs. (Steve Loughran)

This commit is contained in:
Steve Loughran 2016-05-21 14:19:56 +01:00
parent 5b907a17ed
commit f2aef95bd3
17 changed files with 1120 additions and 251 deletions

View File

@ -29,4 +29,8 @@ public class InvalidRequestException extends IOException {
public InvalidRequestException(String str) { public InvalidRequestException(String str) {
super(str); super(str);
} }
public InvalidRequestException(String message, Throwable cause) {
super(message, cause);
}
} }

View File

@ -24,4 +24,14 @@ public class PathAccessDeniedException extends PathIOException {
public PathAccessDeniedException(String path) { public PathAccessDeniedException(String path) {
super(path, "Permission denied"); super(path, "Permission denied");
} }
public PathAccessDeniedException(String path, Throwable cause) {
super(path, cause);
}
public PathAccessDeniedException(String path,
String error,
Throwable cause) {
super(path, error, cause);
}
} }

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
/** /**
* Exception corresponding to Permission denied - ENOENT * Exception corresponding to path not found: ENOENT/ENOFILE
*/ */
public class PathNotFoundException extends PathIOException { public class PathNotFoundException extends PathIOException {
static final long serialVersionUID = 0L; static final long serialVersionUID = 0L;
@ -26,4 +26,18 @@ public class PathNotFoundException extends PathIOException {
public PathNotFoundException(String path) { public PathNotFoundException(String path) {
super(path, "No such file or directory"); super(path, "No such file or directory");
} }
public PathNotFoundException(String path, Throwable cause) {
super(path, cause);
}
public PathNotFoundException(String path, String error) {
super(path, error);
}
public PathNotFoundException(String path,
String error,
Throwable cause) {
super(path, error, cause);
}
} }

View File

@ -26,4 +26,18 @@ public class PathPermissionException extends PathIOException {
public PathPermissionException(String path) { public PathPermissionException(String path) {
super(path, "Operation not permitted"); super(path, "Operation not permitted");
} }
public PathPermissionException(String path, Throwable cause) {
super(path, cause);
}
public PathPermissionException(String path, String error) {
super(path, error);
}
public PathPermissionException(String path,
String error,
Throwable cause) {
super(path, error, cause);
}
} }

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.google.common.base.Preconditions;
import java.io.IOException;
/**
* IOException equivalent of an {@link AmazonClientException}.
*/
public class AWSClientIOException extends IOException {
private final String operation;
public AWSClientIOException(String operation,
AmazonClientException cause) {
super(cause);
Preconditions.checkArgument(operation != null, "Null 'operation' argument");
Preconditions.checkArgument(cause != null, "Null 'cause' argument");
this.operation = operation;
}
public AmazonClientException getCause() {
return (AmazonClientException) super.getCause();
}
@Override
public String getMessage() {
return operation + ": " + getCause().getMessage();
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.util.Map;
/**
* Wrap a {@link AmazonS3Exception} as an IOE, relaying all
* getters.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AWSS3IOException extends AWSServiceIOException {
/**
* Instantiate.
* @param operation operation which triggered this
* @param cause the underlying cause
*/
public AWSS3IOException(String operation,
AmazonS3Exception cause) {
super(operation, cause);
}
public AmazonS3Exception getCause() {
return (AmazonS3Exception) super.getCause();
}
public String getErrorResponseXml() {
return getCause().getErrorResponseXml();
}
public Map<String, String> getAdditionalDetails() {
return getCause().getAdditionalDetails();
}
public String getExtendedRequestId() {
return getCause().getExtendedRequestId();
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A specific exception from AWS operations.
* The exception must always be created with an {@link AmazonServiceException}.
* The attributes of this exception can all be directly accessed.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AWSServiceIOException extends AWSClientIOException {
/**
* Instantiate.
* @param operation operation which triggered this
* @param cause the underlying cause
*/
public AWSServiceIOException(String operation,
AmazonServiceException cause) {
super(operation, cause);
}
public AmazonServiceException getCause() {
return (AmazonServiceException) super.getCause();
}
public String getRequestId() {
return getCause().getRequestId();
}
public String getServiceName() {
return getCause().getServiceName();
}
public String getErrorCode() {
return getCause().getErrorCode();
}
public int getStatusCode() {
return getCause().getStatusCode();
}
public String getRawResponseContent() {
return getCause().getRawResponseContent();
}
public boolean isRetryable() {
return getCause().isRetryable();
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
@ -54,6 +53,7 @@
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/** /**
* Upload files/parts asap directly from a memory buffer (instead of buffering * Upload files/parts asap directly from a memory buffer (instead of buffering
@ -152,11 +152,9 @@ public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
this.multiPartUpload = null; this.multiPartUpload = null;
this.progressListener = new ProgressableListener(progress); this.progressListener = new ProgressableListener(progress);
if (LOG.isDebugEnabled()){
LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'", LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
bucket, key); bucket, key);
} }
}
/** /**
* Writes a byte to the memory buffer. If this causes the buffer to reach * Writes a byte to the memory buffer. If this causes the buffer to reach
@ -210,15 +208,11 @@ private synchronized void uploadBuffer() throws IOException {
requires multiple parts! */ requires multiple parts! */
final byte[] allBytes = buffer.toByteArray(); final byte[] allBytes = buffer.toByteArray();
buffer = null; //earlier gc? buffer = null; //earlier gc?
if (LOG.isDebugEnabled()) {
LOG.debug("Total length of initial buffer: {}", allBytes.length); LOG.debug("Total length of initial buffer: {}", allBytes.length);
}
int processedPos = 0; int processedPos = 0;
while ((multiPartThreshold - processedPos) >= partSize) { while ((multiPartThreshold - processedPos) >= partSize) {
if (LOG.isDebugEnabled()) {
LOG.debug("Initial buffer: processing from byte {} to byte {}", LOG.debug("Initial buffer: processing from byte {} to byte {}",
processedPos, (processedPos + partSize - 1)); processedPos, (processedPos + partSize - 1));
}
multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes, multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
processedPos, partSize), partSize); processedPos, partSize), partSize);
processedPos += partSize; processedPos += partSize;
@ -235,7 +229,13 @@ private synchronized void uploadBuffer() throws IOException {
} }
} }
/**
* Close the stream. This will not return until the upload is complete
* or the attempt to perform the upload has failed.
* Exceptions raised in this method are indicative that the write has
* failed and data is at risk of being lost.
* @throws IOException on any failure.
*/
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (closed) { if (closed) {
@ -258,9 +258,7 @@ public synchronized void close() throws IOException {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
// This will delete unnecessary fake parent directories // This will delete unnecessary fake parent directories
fs.finishedWrite(key); fs.finishedWrite(key);
if (LOG.isDebugEnabled()) {
LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key); LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
}
} finally { } finally {
buffer = null; buffer = null;
super.close(); super.close();
@ -283,20 +281,14 @@ private MultiPartUpload initiateMultiPartUpload() throws IOException {
try { try {
return new MultiPartUpload( return new MultiPartUpload(
client.initiateMultipartUpload(initiateMPURequest).getUploadId()); client.initiateMultipartUpload(initiateMPURequest).getUploadId());
} catch (AmazonServiceException ase) {
throw new IOException("Unable to initiate MultiPartUpload (server side)" +
": " + ase, ase);
} catch (AmazonClientException ace) { } catch (AmazonClientException ace) {
throw new IOException("Unable to initiate MultiPartUpload (client side)" + throw translateException("initiate MultiPartUpload", key, ace);
": " + ace, ace);
} }
} }
private void putObject() throws IOException { private void putObject() throws IOException {
if (LOG.isDebugEnabled()) { LOG.debug("Executing regular upload for bucket '{}' key '{}'",
LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket, bucket, key);
key);
}
final ObjectMetadata om = createDefaultMetadata(); final ObjectMetadata om = createDefaultMetadata();
om.setContentLength(buffer.size()); om.setContentLength(buffer.size());
final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
@ -317,10 +309,11 @@ public PutObjectResult call() throws Exception {
LOG.warn("Interrupted object upload:" + ie, ie); LOG.warn("Interrupted object upload:" + ie, ie);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} catch (ExecutionException ee) { } catch (ExecutionException ee) {
throw new IOException("Regular upload failed", ee.getCause()); throw extractException("regular upload", key, ee);
} }
} }
private class MultiPartUpload { private class MultiPartUpload {
private final String uploadId; private final String uploadId;
private final List<ListenableFuture<PartETag>> partETagsFutures; private final List<ListenableFuture<PartETag>> partETagsFutures;
@ -328,13 +321,11 @@ private class MultiPartUpload {
public MultiPartUpload(String uploadId) { public MultiPartUpload(String uploadId) {
this.uploadId = uploadId; this.uploadId = uploadId;
this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>(); this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
if (LOG.isDebugEnabled()) {
LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " + LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
"id '{}'", bucket, key, uploadId); "id '{}'", bucket, key, uploadId);
} }
}
public void uploadPartAsync(ByteArrayInputStream inputStream, private void uploadPartAsync(ByteArrayInputStream inputStream,
int partSize) { int partSize) {
final int currentPartNumber = partETagsFutures.size() + 1; final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request = final UploadPartRequest request =
@ -346,22 +337,21 @@ public void uploadPartAsync(ByteArrayInputStream inputStream,
executorService.submit(new Callable<PartETag>() { executorService.submit(new Callable<PartETag>() {
@Override @Override
public PartETag call() throws Exception { public PartETag call() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Uploading part {} for id '{}'", currentPartNumber, LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
uploadId); uploadId);
}
return client.uploadPart(request).getPartETag(); return client.uploadPart(request).getPartETag();
} }
}); });
partETagsFutures.add(partETagFuture); partETagsFutures.add(partETagFuture);
} }
public List<PartETag> waitForAllPartUploads() throws IOException { private List<PartETag> waitForAllPartUploads() throws IOException {
try { try {
return Futures.allAsList(partETagsFutures).get(); return Futures.allAsList(partETagsFutures).get();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.warn("Interrupted partUpload:" + ie, ie); LOG.warn("Interrupted partUpload:" + ie, ie);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return null;
} catch (ExecutionException ee) { } catch (ExecutionException ee) {
//there is no way of recovering so abort //there is no way of recovering so abort
//cancel all partUploads //cancel all partUploads
@ -370,22 +360,23 @@ public List<PartETag> waitForAllPartUploads() throws IOException {
} }
//abort multipartupload //abort multipartupload
this.abort(); this.abort();
throw new IOException("Part upload failed in multi-part upload with " + throw extractException("Multi-part upload with id '" + uploadId + "'",
"id '" +uploadId + "':" + ee, ee); key, ee);
} }
//should not happen?
return null;
} }
public void complete(List<PartETag> partETags) { private void complete(List<PartETag> partETags) throws IOException {
if (LOG.isDebugEnabled()) { try {
LOG.debug("Completing multi-part upload for key '{}', id '{}'", key, LOG.debug("Completing multi-part upload for key '{}', id '{}'",
uploadId); key, uploadId);
client.completeMultipartUpload(
new CompleteMultipartUploadRequest(bucket,
key,
uploadId,
partETags));
} catch (AmazonClientException e) {
throw translateException("Completing multi-part upload", key, e);
} }
final CompleteMultipartUploadRequest completeRequest =
new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
client.completeMultipartUpload(completeRequest);
} }
public void abort() { public void abort() {

View File

@ -43,6 +43,7 @@
import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions; import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@ -78,6 +79,7 @@
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -181,6 +183,7 @@ public Thread newThread(Runnable r) {
public void initialize(URI name, Configuration conf) throws IOException { public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf); super.initialize(name, conf);
setConf(conf); setConf(conf);
try {
instrumentation = new S3AInstrumentation(name); instrumentation = new S3AInstrumentation(name);
uri = URI.create(name.getScheme() + "://" + name.getAuthority()); uri = URI.create(name.getScheme() + "://" + name.getAuthority());
@ -189,7 +192,8 @@ public void initialize(URI name, Configuration conf) throws IOException {
bucket = name.getHost(); bucket = name.getHost();
AWSCredentialsProvider credentials = getAWSCredentialsProvider(name, conf); AWSCredentialsProvider credentials =
getAWSCredentialsProvider(name, conf);
ClientConfiguration awsConf = new ClientConfiguration(); ClientConfiguration awsConf = new ClientConfiguration();
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
@ -260,16 +264,43 @@ public void initialize(URI name, Configuration conf) throws IOException {
initCannedAcls(conf); initCannedAcls(conf);
if (!s3.doesBucketExist(bucket)) { verifyBucketExists();
throw new FileNotFoundException("Bucket " + bucket + " does not exist");
}
initMultipartUploads(conf); initMultipartUploads(conf);
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
} }
/**
* Verify that the bucket exists. This does not check permissions,
* not even read access.
* @throws FileNotFoundException the bucket is absent
* @throws IOException any other problem talking to S3
*/
protected void verifyBucketExists()
throws FileNotFoundException, IOException {
try {
if (!s3.doesBucketExist(bucket)) {
throw new FileNotFoundException("Bucket " + bucket + " does not exist");
}
} catch (AmazonS3Exception e) {
// this is a sign of a serious startup problem so do dump everything
LOG.warn(stringify(e), e);
throw translateException("doesBucketExist", bucket, e);
} catch (AmazonServiceException e) {
// this is a sign of a serious startup problem so do dump everything
LOG.warn(stringify(e), e);
throw translateException("doesBucketExist", bucket, e);
} catch (AmazonClientException e) {
throw translateException("doesBucketExist", bucket, e);
}
}
void initProxySupport(Configuration conf, ClientConfiguration awsConf, void initProxySupport(Configuration conf, ClientConfiguration awsConf,
boolean secureConnections) throws IllegalArgumentException { boolean secureConnections) throws IllegalArgumentException {
String proxyHost = conf.getTrimmed(PROXY_HOST, ""); String proxyHost = conf.getTrimmed(PROXY_HOST, "");
@ -379,7 +410,7 @@ private void initCannedAcls(Configuration conf) {
} }
} }
private void initMultipartUploads(Configuration conf) { private void initMultipartUploads(Configuration conf) throws IOException {
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
DEFAULT_PURGE_EXISTING_MULTIPART); DEFAULT_PURGE_EXISTING_MULTIPART);
long purgeExistingMultipartAge = longOption(conf, long purgeExistingMultipartAge = longOption(conf,
@ -394,10 +425,10 @@ private void initMultipartUploads(Configuration conf) {
} catch (AmazonServiceException e) { } catch (AmazonServiceException e) {
if (e.getStatusCode() == 403) { if (e.getStatusCode() == 403) {
instrumentation.errorIgnored(); instrumentation.errorIgnored();
LOG.debug("Failed to abort multipart uploads against {}," + LOG.debug("Failed to purging multipart uploads against {}," +
" FS may be read only", bucket, e); " FS may be read only", bucket, e);
} else { } else {
throw e; throw translateException("purging multipart uploads", bucket, e);
} }
} }
} }
@ -639,10 +670,28 @@ public FSDataOutputStream append(Path f, int bufferSize,
* *
* @param src path to be renamed * @param src path to be renamed
* @param dst new path after rename * @param dst new path after rename
* @throws IOException on failure * @throws IOException on IO failure
* @return true if rename is successful * @return true if rename is successful
*/ */
public boolean rename(Path src, Path dst) throws IOException { public boolean rename(Path src, Path dst) throws IOException {
try {
return innerRename(src, dst);
} catch (AmazonClientException e) {
throw translateException("rename(" + src +", " + dst + ")", src, e);
}
}
/**
* The inner rename operation. See {@link #rename(Path, Path)} for
* the description of the operation.
* @param src path to be renamed
* @param dst new path after rename
* @return true if rename is successful
* @throws IOException on IO failure.
* @throws AmazonClientException on failures inside the AWS SDK
*/
private boolean innerRename(Path src, Path dst) throws IOException,
AmazonClientException {
LOG.debug("Rename path {} to {}", src, dst); LOG.debug("Rename path {} to {}", src, dst);
String srcKey = pathToKey(src); String srcKey = pathToKey(src);
@ -785,7 +834,7 @@ public boolean rename(Path src, Path dst) throws IOException {
* when set to true * when set to true
*/ */
private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete, private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean clearKeys) { boolean clearKeys) throws AmazonClientException {
if (enableMultiObjectsDelete) { if (enableMultiObjectsDelete) {
DeleteObjectsRequest deleteRequest DeleteObjectsRequest deleteRequest
= new DeleteObjectsRequest(bucket).withKeys(keysToDelete); = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
@ -808,7 +857,9 @@ private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
} }
} }
/** Delete a file. /**
* Delete a Path. This operation is at least {@code O(files)}, with
* added overheads to enumerate the path. It is also not atomic.
* *
* @param f the path to delete. * @param f the path to delete.
* @param recursive if path is a directory and set to * @param recursive if path is a directory and set to
@ -818,6 +869,26 @@ private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
* @throws IOException due to inability to delete a directory or file. * @throws IOException due to inability to delete a directory or file.
*/ */
public boolean delete(Path f, boolean recursive) throws IOException { public boolean delete(Path f, boolean recursive) throws IOException {
try {
return innerDelete(f, recursive);
} catch (AmazonClientException e) {
throw translateException("delete", f, e);
}
}
/**
* Delete a path. See {@link #delete(Path, boolean)}.
*
* @param f the path to delete.
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if delete is successful else false.
* @throws IOException due to inability to delete a directory or file.
* @throws AmazonClientException on failures inside the AWS SDK
*/
private boolean innerDelete(Path f, boolean recursive) throws IOException,
AmazonClientException {
LOG.debug("Delete path {} - recursive {}", f , recursive); LOG.debug("Delete path {} - recursive {}", f , recursive);
S3AFileStatus status; S3AFileStatus status;
try { try {
@ -898,7 +969,8 @@ public boolean delete(Path f, boolean recursive) throws IOException {
return true; return true;
} }
private void createFakeDirectoryIfNecessary(Path f) throws IOException { private void createFakeDirectoryIfNecessary(Path f)
throws IOException, AmazonClientException {
String key = pathToKey(f); String key = pathToKey(f);
if (!key.isEmpty() && !exists(f)) { if (!key.isEmpty() && !exists(f)) {
LOG.debug("Creating new fake directory at {}", f); LOG.debug("Creating new fake directory at {}", f);
@ -917,6 +989,25 @@ private void createFakeDirectoryIfNecessary(Path f) throws IOException {
*/ */
public FileStatus[] listStatus(Path f) throws FileNotFoundException, public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException { IOException {
try {
return innerListStatus(f);
} catch (AmazonClientException e) {
throw translateException("listStatus", f, e);
}
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f given path
* @return the statuses of the files/directories in the given patch
* @throws FileNotFoundException when the path does not exist;
* @throws IOException due to an IO problem.
* @throws AmazonClientException on failures inside the AWS SDK
*/
public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
IOException, AmazonClientException {
String key = pathToKey(f); String key = pathToKey(f);
LOG.debug("List status for path: {}", f); LOG.debug("List status for path: {}", f);
@ -1008,15 +1099,42 @@ public Path getWorkingDirectory() {
} }
/** /**
* Make the given file and all non-existent parents into *
* directories. Has the semantics of Unix 'mkdir -p'. * Make the given path and all non-existent parents into
* directories. Has the semantics of Unix @{code 'mkdir -p'}.
* Existence of the directory hierarchy is not an error. * Existence of the directory hierarchy is not an error.
* @param f path to create * @param path path to create
* @param permission to apply to f * @param permission to apply to f
* @return true if a directory was created
* @throws FileAlreadyExistsException there is a file at the path specified
* @throws IOException other IO problems
*/ */
// TODO: If we have created an empty file at /foo/bar and we then call // TODO: If we have created an empty file at /foo/bar and we then call
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
public boolean mkdirs(Path f, FsPermission permission) throws IOException { public boolean mkdirs(Path path, FsPermission permission) throws IOException,
FileAlreadyExistsException {
try {
return innerMkdirs(path, permission);
} catch (AmazonClientException e) {
throw translateException("innerMkdirs", path, e);
}
}
/**
*
* Make the given path and all non-existent parents into
* directories.
* See {@link #mkdirs(Path, FsPermission)}
* @param f path to create
* @param permission to apply to f
* @return true if a directory was created
* @throws FileAlreadyExistsException there is a file at the path specified
* @throws IOException other IO problems
* @throws AmazonClientException on failures inside the AWS SDK
*/
// TODO: If we have created an empty file at /foo/bar and we then call
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
private boolean innerMkdirs(Path f, FsPermission permission)
throws IOException, FileAlreadyExistsException, AmazonClientException {
LOG.debug("Making directory: {}", f); LOG.debug("Making directory: {}", f);
try { try {
@ -1054,7 +1172,7 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
* @param f The path we want information from * @param f The path we want information from
* @return a FileStatus object * @return a FileStatus object
* @throws java.io.FileNotFoundException when the path does not exist; * @throws java.io.FileNotFoundException when the path does not exist;
* IOException see specific implementation * @throws IOException on other problems.
*/ */
public S3AFileStatus getFileStatus(Path f) throws IOException { public S3AFileStatus getFileStatus(Path f) throws IOException {
String key = pathToKey(f); String key = pathToKey(f);
@ -1078,12 +1196,10 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
} }
} catch (AmazonServiceException e) { } catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) { if (e.getStatusCode() != 404) {
printAmazonServiceException(f.toString(), e); throw translateException("getFileStatus", f, e);
throw e;
} }
} catch (AmazonClientException e) { } catch (AmazonClientException e) {
printAmazonClientException(f.toString(), e); throw translateException("getFileStatus", f, e);
throw e;
} }
// Necessary? // Necessary?
@ -1106,12 +1222,10 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
} }
} catch (AmazonServiceException e) { } catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) { if (e.getStatusCode() != 404) {
printAmazonServiceException(newKey, e); throw translateException("getFileStatus", newKey, e);
throw e;
} }
} catch (AmazonClientException e) { } catch (AmazonClientException e) {
printAmazonClientException(newKey, e); throw translateException("getFileStatus", newKey, e);
throw e;
} }
} }
} }
@ -1152,12 +1266,10 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
} }
} catch (AmazonServiceException e) { } catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) { if (e.getStatusCode() != 404) {
printAmazonServiceException(key, e); throw translateException("getFileStatus", key, e);
throw e;
} }
} catch (AmazonClientException e) { } catch (AmazonClientException e) {
printAmazonClientException(key, e); throw translateException("getFileStatus", key, e);
throw e;
} }
LOG.debug("Not Found: {}", f); LOG.debug("Not Found: {}", f);
@ -1176,10 +1288,42 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
* @param overwrite whether to overwrite an existing file * @param overwrite whether to overwrite an existing file
* @param src path * @param src path
* @param dst path * @param dst path
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false
* @throws AmazonClientException failure in the AWS SDK
*/ */
@Override @Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException { Path dst) throws IOException {
try {
innerCopyFromLocalFile(delSrc, overwrite, src, dst);
} catch (AmazonClientException e) {
throw translateException("copyFromLocalFile(" + src + ", " + dst + ")",
src, e);
}
}
/**
* The src file is on the local disk. Add it to FS at
* the given dst name.
*
* This version doesn't need to create a temporary file to calculate the md5.
* Sadly this doesn't seem to be used by the shell cp :(
*
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param src path
* @param dst path
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false
* @throws AmazonClientException failure in the AWS SDK
*/
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException, FileAlreadyExistsException, AmazonClientException {
String key = pathToKey(dst); String key = pathToKey(dst);
if (!overwrite && exists(dst)) { if (!overwrite && exists(dst)) {
@ -1229,8 +1373,12 @@ public void progressChanged(ProgressEvent progressEvent) {
} }
} }
/**
* Close the filesystem. This shuts down all transfers.
* @throws IOException IO problem
*/
@Override @Override
public void close() throws IOException { public synchronized void close() throws IOException {
try { try {
super.close(); super.close();
} finally { } finally {
@ -1250,10 +1398,20 @@ public String getCanonicalServiceName() {
return null; return null;
} }
/**
* Copy a single object in the bucket via a COPY operation.
* @param srcKey source object path
* @param dstKey destination object path
* @param size object size
* @throws AmazonClientException on failures inside the AWS SDK
* @throws InterruptedIOException the operation was interrupted
* @throws IOException Other IO problems
*/
private void copyFile(String srcKey, String dstKey, long size) private void copyFile(String srcKey, String dstKey, long size)
throws IOException { throws IOException, InterruptedIOException, AmazonClientException {
LOG.debug("copyFile {} -> {} ", srcKey, dstKey); LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
try {
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom); ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
@ -1286,6 +1444,10 @@ public void progressChanged(ProgressEvent progressEvent) {
throw new InterruptedIOException("Interrupted copying " + srcKey throw new InterruptedIOException("Interrupted copying " + srcKey
+ " to " + dstKey + ", cancelling"); + " to " + dstKey + ", cancelling");
} }
} catch (AmazonClientException e) {
throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")",
srcKey, e);
}
} }
private boolean objectRepresentsDirectory(final String name, final long size) { private boolean objectRepresentsDirectory(final String name, final long size) {
@ -1303,11 +1465,20 @@ private static long dateToLong(final Date date) {
return date.getTime(); return date.getTime();
} }
public void finishedWrite(String key) throws IOException { /**
* Perform post-write actions.
* @param key key written to
*/
public void finishedWrite(String key) {
deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
} }
private void deleteUnnecessaryFakeDirectories(Path f) throws IOException { /**
* Delete mock parent directories which are no longer needed.
* This code swallows IO exceptions encountered
* @param f path
*/
private void deleteUnnecessaryFakeDirectories(Path f) {
while (true) { while (true) {
String key = ""; String key = "";
try { try {
@ -1323,7 +1494,7 @@ private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
s3.deleteObject(bucket, key + "/"); s3.deleteObject(bucket, key + "/");
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
} }
} catch (FileNotFoundException | AmazonServiceException e) { } catch (IOException | AmazonClientException e) {
LOG.debug("While deleting key {} ", key, e); LOG.debug("While deleting key {} ", key, e);
instrumentation.errorIgnored(); instrumentation.errorIgnored();
} }
@ -1446,28 +1617,6 @@ public long getDefaultBlockSize() {
return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE); return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
} }
private void printAmazonServiceException(String target,
AmazonServiceException ase) {
LOG.info("{}: caught an AmazonServiceException {}", target, ase);
LOG.info("This means your request made it to Amazon S3," +
" but was rejected with an error response for some reason.");
LOG.info("Error Message: {}", ase.getMessage());
LOG.info("HTTP Status Code: {}", ase.getStatusCode());
LOG.info("AWS Error Code: {}", ase.getErrorCode());
LOG.info("Error Type: {}", ase.getErrorType());
LOG.info("Request ID: {}", ase.getRequestId());
LOG.info("Class Name: {}", ase.getClass().getName());
LOG.info("Exception", ase);
}
private void printAmazonClientException(String target,
AmazonClientException ace) {
LOG.info("{}: caught an AmazonClientException {}", target, ace);
LOG.info("This means the client encountered " +
"a problem while trying to communicate with S3, " +
"such as not being able to access the network.", ace);
}
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder( final StringBuilder sb = new StringBuilder(

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.model.S3ObjectInputStream;
@ -29,11 +30,14 @@
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/** /**
* The input stream for an S3A object. * The input stream for an S3A object.
* *
@ -112,7 +116,7 @@ public S3AInputStream(String bucket,
* @param reason reason for reopen * @param reason reason for reopen
* @param targetPos target position * @param targetPos target position
* @param length length requested * @param length length requested
* @throws IOException * @throws IOException on any failure to open the object
*/ */
private synchronized void reopen(String reason, long targetPos, long length) private synchronized void reopen(String reason, long targetPos, long length)
throws IOException { throws IOException {
@ -126,6 +130,7 @@ private synchronized void reopen(String reason, long targetPos, long length)
uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos); uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos);
streamStatistics.streamOpened(); streamStatistics.streamOpened();
try {
GetObjectRequest request = new GetObjectRequest(bucket, key) GetObjectRequest request = new GetObjectRequest(bucket, key)
.withRange(targetPos, requestedStreamLen); .withRange(targetPos, requestedStreamLen);
wrappedStream = client.getObject(request).getObjectContent(); wrappedStream = client.getObject(request).getObjectContent();
@ -134,6 +139,9 @@ private synchronized void reopen(String reason, long targetPos, long length)
throw new IOException("Null IO stream from reopen of (" + reason + ") " throw new IOException("Null IO stream from reopen of (" + reason + ") "
+ uri); + uri);
} }
} catch (AmazonClientException e) {
throw translateException("Reopen at position " + targetPos, uri, e);
}
this.pos = targetPos; this.pos = targetPos;
} }
@ -276,10 +284,10 @@ public synchronized int read() throws IOException {
return -1; return -1;
} }
lazySeek(nextReadPos, 1);
int byteRead; int byteRead;
try { try {
lazySeek(nextReadPos, 1);
byteRead = wrappedStream.read(); byteRead = wrappedStream.read();
} catch (EOFException e) { } catch (EOFException e) {
return -1; return -1;
@ -337,11 +345,16 @@ public synchronized int read(byte[] buf, int off, int len)
return -1; return -1;
} }
try {
lazySeek(nextReadPos, len); lazySeek(nextReadPos, len);
streamStatistics.readOperationStarted(nextReadPos, len); } catch (EOFException e) {
// the end of the file has moved
return -1;
}
int bytesRead; int bytesRead;
try { try {
streamStatistics.readOperationStarted(nextReadPos, len);
bytesRead = wrappedStream.read(buf, off, len); bytesRead = wrappedStream.read(buf, off, len);
} catch (EOFException e) { } catch (EOFException e) {
throw e; throw e;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType; import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressListener;
@ -40,11 +41,13 @@
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT; import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT; import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/** /**
* Output stream to save data to S3. * Output stream to save data to S3.
@ -92,13 +95,15 @@ public S3AOutputStream(Configuration conf, TransferManager transfers,
lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a"); lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
} }
backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); backupFile = lDirAlloc.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
closed = false; closed = false;
LOG.debug("OutputStream for key '{}' writing to tempfile: {}", LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
key, backupFile); key, backupFile);
this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); this.backupStream = new BufferedOutputStream(
new FileOutputStream(backupFile));
} }
@Override @Override
@ -123,7 +128,8 @@ public synchronized void close() throws IOException {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm); om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
} }
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile); PutObjectRequest putObjectRequest =
new PutObjectRequest(bucket, key, backupFile);
putObjectRequest.setCannedAcl(cannedACL); putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(om); putObjectRequest.setMetadata(om);
@ -135,18 +141,20 @@ public synchronized void close() throws IOException {
upload.waitForUploadResult(); upload.waitForUploadResult();
long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred(); long delta = upload.getProgress().getBytesTransferred() -
listener.getLastBytesTransferred();
if (statistics != null && delta != 0) { if (statistics != null && delta != 0) {
if (LOG.isDebugEnabled()) { LOG.debug("S3A write delta changed after finished: {} bytes", delta);
LOG.debug("S3A write delta changed after finished: " + delta + " bytes");
}
statistics.incrementBytesWritten(delta); statistics.incrementBytesWritten(delta);
} }
// This will delete unnecessary fake parent directories // This will delete unnecessary fake parent directories
fs.finishedWrite(key); fs.finishedWrite(key);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException(e); throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (AmazonClientException e) {
throw translateException("saving output", key , e);
} finally { } finally {
if (!backupFile.delete()) { if (!backupFile.delete()) {
LOG.warn("Could not delete temporary s3a file: {}", backupFile); LOG.warn("Could not delete temporary s3a file: {}", backupFile);
@ -154,9 +162,7 @@ public synchronized void close() throws IOException {
super.close(); super.close();
closed = true; closed = true;
} }
if (LOG.isDebugEnabled()) { LOG.debug("OutputStream for key '{}' upload complete", key);
LOG.debug("OutputStream for key '" + key + "' upload complete");
}
} }
@Override @Override

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* Utility methods for S3A code.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class S3AUtils {
private S3AUtils() {
}
/**
* Translate an exception raised in an operation into an IOException.
* The specific type of IOException depends on the class of
* {@link AmazonClientException} passed in, and any status codes included
* in the operation. That is: HTTP error codes are examined and can be
* used to build a more specific response.
* @param operation operation
* @param path path operated on (must not be null)
* @param exception amazon exception raised
* @return an IOE which wraps the caught exception.
*/
public static IOException translateException(String operation,
Path path,
AmazonClientException exception) {
return translateException(operation, path.toString(), exception);
}
/**
* Translate an exception raised in an operation into an IOException.
* The specific type of IOException depends on the class of
* {@link AmazonClientException} passed in, and any status codes included
* in the operation. That is: HTTP error codes are examined and can be
* used to build a more specific response.
* @param operation operation
* @param path path operated on (may be null)
* @param exception amazon exception raised
* @return an IOE which wraps the caught exception.
*/
@SuppressWarnings("ThrowableInstanceNeverThrown")
public static IOException translateException(String operation,
String path,
AmazonClientException exception) {
String message = String.format("%s%s: %s",
operation,
path != null ? (" on " + path) : "",
exception);
if (!(exception instanceof AmazonServiceException)) {
return new AWSClientIOException(message, exception);
} else {
IOException ioe;
AmazonServiceException ase = (AmazonServiceException) exception;
// this exception is non-null if the service exception is an s3 one
AmazonS3Exception s3Exception = ase instanceof AmazonS3Exception
? (AmazonS3Exception) ase
: null;
int status = ase.getStatusCode();
switch (status) {
// permissions
case 401:
case 403:
ioe = new AccessDeniedException(path, null, message);
ioe.initCause(ase);
break;
// the object isn't there
case 404:
case 410:
ioe = new FileNotFoundException(message);
ioe.initCause(ase);
break;
// out of range. This may happen if an object is overwritten with
// a shorter one while it is being read.
case 416:
ioe = new EOFException(message);
break;
default:
// no specific exit code. Choose an IOE subclass based on the class
// of the caught exception
ioe = s3Exception != null
? new AWSS3IOException(message, s3Exception)
: new AWSServiceIOException(message, ase);
break;
}
return ioe;
}
}
/**
* Extract an exception from a failed future, and convert to an IOE.
* @param operation operation which failed
* @param path path operated on (may be null)
* @param ee execution exception
* @return an IOE which can be thrown
*/
public static IOException extractException(String operation,
String path,
ExecutionException ee) {
IOException ioe;
Throwable cause = ee.getCause();
if (cause instanceof AmazonClientException) {
ioe = translateException(operation, path, (AmazonClientException) cause);
} else if (cause instanceof IOException) {
ioe = (IOException) cause;
} else {
ioe = new IOException(operation + " failed: " + cause, cause);
}
return ioe;
}
/**
* Get low level details of an amazon exception for logging; multi-line.
* @param e exception
* @return string details
*/
public static String stringify(AmazonServiceException e) {
StringBuilder builder = new StringBuilder(
String.format("%s: %s error %d: %s; %s%s%n",
e.getErrorType(),
e.getServiceName(),
e.getStatusCode(),
e.getErrorCode(),
e.getErrorMessage(),
(e.isRetryable() ? " (retryable)": "")
));
String rawResponseContent = e.getRawResponseContent();
if (rawResponseContent != null) {
builder.append(rawResponseContent);
}
return builder.toString();
}
/**
* Get low level details of an amazon exception for logging; multi-line.
* @param e exception
* @return string details
*/
public static String stringify(AmazonS3Exception e) {
// get the low level details of an exception,
StringBuilder builder = new StringBuilder(
stringify((AmazonServiceException) e));
Map<String, String> details = e.getAdditionalDetails();
if (details != null) {
builder.append('\n');
for (Map.Entry<String, String> d : details.entrySet()) {
builder.append(d.getKey()).append('=')
.append(d.getValue()).append('\n');
}
}
return builder.toString();
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* S3A Filesystem. Except for the exceptions, it should
* all be hidden as implementation details.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -25,6 +25,7 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.Callable;
public class S3ATestUtils { public class S3ATestUtils {
@ -72,4 +73,70 @@ public static FileContext createTestFileContext(Configuration conf) throws
FileContext fc = FileContext.getFileContext(testURI,conf); FileContext fc = FileContext.getFileContext(testURI,conf);
return fc; return fc;
} }
/**
* Repeatedly attempt a callback until timeout or a {@link FailFastException}
* is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
* @param timeout timeout
* @param callback callback to invoke
* @throws FailFastException any fast-failure
* @throws Exception the exception which caused the iterator to fail
*/
public static void eventually(int timeout, Callable<Void> callback)
throws Exception {
Exception lastException;
long endtime = System.currentTimeMillis() + timeout;
do {
try {
callback.call();
return;
} catch (FailFastException e) {
throw e;
} catch (Exception e) {
lastException = e;
}
Thread.sleep(500);
} while (endtime > System.currentTimeMillis());
throw lastException;
}
/**
* The exception to raise so as to exit fast from
* {@link #eventually(int, Callable)}.
*/
public static class FailFastException extends Exception {
public FailFastException() {
}
public FailFastException(String message) {
super(message);
}
public FailFastException(String message, Throwable cause) {
super(message, cause);
}
public FailFastException(Throwable cause) {
super(cause);
}
}
/**
* Verify the class of an exception. If it is not as expected, rethrow it.
* Comparison is on the exact class, not subclass-of inference as
* offered by {@code instanceof}.
* @param clazz the expected exception class
* @param ex the exception caught
* @return the exception, if it is of the expected class
* @throws Exception the exception passed in.
*/
public static Exception verifyExceptionClass(Class clazz,
Exception ex)
throws Exception {
if (!(ex.getClass().equals(clazz))) {
throw ex;
}
return ex;
}
} }

View File

@ -23,8 +23,10 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.file.AccessDeniedException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test; import org.junit.Test;
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentials;
@ -32,7 +34,6 @@
import com.amazonaws.auth.AWSCredentialsProviderChain; import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -49,7 +50,7 @@ public void testBadConfiguration() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(AWS_CREDENTIALS_PROVIDER, "no.such.class"); conf.set(AWS_CREDENTIALS_PROVIDER, "no.such.class");
try { try {
S3ATestUtils.createTestFileSystem(conf); createFailingFS(conf);
} catch (IOException e) { } catch (IOException e) {
if (!(e.getCause() instanceof ClassNotFoundException)) { if (!(e.getCause() instanceof ClassNotFoundException)) {
LOG.error("Unexpected nested cause: {} in {}", e.getCause(), e, e); LOG.error("Unexpected nested cause: {} in {}", e.getCause(), e, e);
@ -58,6 +59,18 @@ public void testBadConfiguration() throws IOException {
} }
} }
/**
* Create a filesystem, expect it to fail by raising an IOException.
* Raises an assertion exception if in fact the FS does get instantiated.
* @param conf configuration
* @throws IOException an expected exception.
*/
private void createFailingFS(Configuration conf) throws IOException {
S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf);
fs.listStatus(new Path("/"));
fail("Expected exception - got " + fs);
}
static class BadCredentialsProvider implements AWSCredentialsProvider { static class BadCredentialsProvider implements AWSCredentialsProvider {
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -79,12 +92,9 @@ public void testBadCredentials() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(AWS_CREDENTIALS_PROVIDER, BadCredentialsProvider.class.getName()); conf.set(AWS_CREDENTIALS_PROVIDER, BadCredentialsProvider.class.getName());
try { try {
S3ATestUtils.createTestFileSystem(conf); createFailingFS(conf);
} catch (AmazonS3Exception e) { } catch (AccessDeniedException e) {
if (e.getStatusCode() != 403) { // expected
LOG.error("Unexpected status code: {}", e.getStatusCode(), e);
throw e;
}
} }
} }

View File

@ -21,11 +21,9 @@
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions; import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.reflect.FieldUtils; import org.apache.commons.lang.reflect.FieldUtils;
import com.amazonaws.AmazonClientException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -43,7 +41,6 @@
import java.io.File; import java.io.File;
import java.net.URI; import java.net.URI;
import java.lang.reflect.Field;
import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.alias.CredentialProvider; import org.apache.hadoop.security.alias.CredentialProvider;
@ -122,7 +119,7 @@ public void testProxyConnection() throws Exception {
try { try {
fs = S3ATestUtils.createTestFileSystem(conf); fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server at " + proxy); fail("Expected a connection error for proxy server at " + proxy);
} catch (AmazonClientException e) { } catch (AWSClientIOException e) {
// expected // expected
} }
} }
@ -153,14 +150,14 @@ public void testAutomaticProxyPortSelection() throws Exception {
try { try {
fs = S3ATestUtils.createTestFileSystem(conf); fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server"); fail("Expected a connection error for proxy server");
} catch (AmazonClientException e) { } catch (AWSClientIOException e) {
// expected // expected
} }
conf.set(Constants.SECURE_CONNECTIONS, "false"); conf.set(Constants.SECURE_CONNECTIONS, "false");
try { try {
fs = S3ATestUtils.createTestFileSystem(conf); fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server"); fail("Expected a connection error for proxy server");
} catch (AmazonClientException e) { } catch (AWSClientIOException e) {
// expected // expected
} }
} }
@ -374,7 +371,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty() throws Ex
clientOptions.isPathStyleAccess()); clientOptions.isPathStyleAccess());
byte[] file = ContractTestUtils.toAsciiByteArray("test file"); byte[] file = ContractTestUtils.toAsciiByteArray("test file");
ContractTestUtils.writeAndRead(fs, new Path("/path/style/access/testFile"), file, file.length, conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true); ContractTestUtils.writeAndRead(fs, new Path("/path/style/access/testFile"), file, file.length, conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true);
} catch (final AmazonS3Exception e) { } catch (final AWSS3IOException e) {
LOG.error("Caught exception: ", e); LOG.error("Caught exception: ", e);
// Catch/pass standard path style access behaviour when live bucket // Catch/pass standard path style access behaviour when live bucket
// isn't in the same region as the s3 client default. See // isn't in the same region as the s3 client default. See

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.nio.file.AccessDeniedException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/**
* Test S3A Failure translation, including a functional test
* generating errors during stream IO.
*/
public class TestS3AFailureHandling extends AbstractFSContractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestS3AFailureHandling.class);
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
@Test
public void testReadFileChanged() throws Throwable {
describe("overwrite a file with a shorter one during a read, seek");
final int fullLength = 8192;
final byte[] fullDataset = dataset(fullLength, 'a', 32);
final int shortLen = 4096;
final byte[] shortDataset = dataset(shortLen, 'A', 32);
final FileSystem fs = getFileSystem();
final Path testpath = path("readFileToChange.txt");
// initial write
writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false);
try(FSDataInputStream instream = fs.open(testpath)) {
instream.seek(fullLength - 16);
assertTrue("no data to read", instream.read() >= 0);
// overwrite
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
// here the file length is less. Probe the file to see if this is true,
// with a spin and wait
eventually(30 *1000, new Callable<Void>() {
@Override
public Void call() throws Exception {
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
return null;
}
});
// here length is shorter. Assuming it has propagated to all replicas,
// the position of the input stream is now beyond the EOF.
// An attempt to seek backwards to a position greater than the
// short length will raise an exception from AWS S3, which must be
// translated into an EOF
instream.seek(shortLen + 1024);
int c = instream.read();
assertIsEOF("read()", c);
byte[] buf = new byte[256];
assertIsEOF("read(buffer)", instream.read(buf));
assertIsEOF("read(offset)",
instream.read(instream.getPos(), buf, 0, buf.length));
// now do a block read fully, again, backwards from the current pos
try {
instream.readFully(shortLen + 512, buf);
fail("Expected readFully to fail");
} catch (EOFException expected) {
LOG.debug("Expected EOF: ", expected);
}
assertIsEOF("read(offset)",
instream.read(shortLen + 510, buf, 0, buf.length));
// seek somewhere useful
instream.seek(shortLen - 256);
// delete the file. Reads must fail
fs.delete(testpath, false);
try {
int r = instream.read();
fail("Expected an exception, got " + r);
} catch (FileNotFoundException e) {
// expected
}
try {
instream.readFully(2048, buf);
fail("Expected readFully to fail");
} catch (FileNotFoundException e) {
// expected
}
}
}
/**
* Assert that a read operation returned an EOF value.
* @param operation specific operation
* @param readResult result
*/
private void assertIsEOF(String operation, int readResult) {
assertEquals("Expected EOF from "+ operation
+ "; got char " + (char) readResult, -1, readResult);
}
@Test
public void test404isNotFound() throws Throwable {
verifyTranslated(FileNotFoundException.class, createS3Exception(404));
}
protected Exception verifyTranslated(Class clazz,
AmazonClientException exception) throws Exception {
return verifyExceptionClass(clazz,
translateException("test", "/", exception));
}
@Test
public void test401isNotPermittedFound() throws Throwable {
verifyTranslated(AccessDeniedException.class,
createS3Exception(401));
}
protected AmazonS3Exception createS3Exception(int code) {
AmazonS3Exception source = new AmazonS3Exception("");
source.setStatusCode(code);
return source;
}
@Test
public void testGenericS3Exception() throws Throwable {
// S3 exception of no known type
AWSS3IOException ex = (AWSS3IOException)verifyTranslated(
AWSS3IOException.class,
createS3Exception(451));
assertEquals(451, ex.getStatusCode());
}
@Test
public void testGenericServiceS3Exception() throws Throwable {
// service exception of no known type
AmazonServiceException ase = new AmazonServiceException("unwind");
ase.setStatusCode(500);
AWSServiceIOException ex = (AWSServiceIOException)verifyTranslated(
AWSServiceIOException.class,
ase);
assertEquals(500, ex.getStatusCode());
}
@Test
public void testGenericClientException() throws Throwable {
// Generic Amazon exception
verifyTranslated(AWSClientIOException.class,
new AmazonClientException(""));
}
}