diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java index 437276d5c10..081e08597ed 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java @@ -29,4 +29,8 @@ public class InvalidRequestException extends IOException { public InvalidRequestException(String str) { super(str); } + + public InvalidRequestException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java index 5277507e150..e0379b3971d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java @@ -24,4 +24,14 @@ public class PathAccessDeniedException extends PathIOException { public PathAccessDeniedException(String path) { super(path, "Permission denied"); } -} \ No newline at end of file + + public PathAccessDeniedException(String path, Throwable cause) { + super(path, cause); + } + + public PathAccessDeniedException(String path, + String error, + Throwable cause) { + super(path, error, cause); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java index c5a083837e5..ae3a57f17a4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java @@ -18,12 +18,26 @@ package org.apache.hadoop.fs; /** - * Exception corresponding to Permission denied - ENOENT + * Exception corresponding to path not found: ENOENT/ENOFILE */ public class PathNotFoundException extends PathIOException { static final long serialVersionUID = 0L; /** @param path for the exception */ public PathNotFoundException(String path) { super(path, "No such file or directory"); - } -} \ No newline at end of file + } + + public PathNotFoundException(String path, Throwable cause) { + super(path, cause); + } + + public PathNotFoundException(String path, String error) { + super(path, error); + } + + public PathNotFoundException(String path, + String error, + Throwable cause) { + super(path, error, cause); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java index 483b1de4535..3c3541c8c3d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java @@ -26,4 +26,18 @@ public class PathPermissionException extends PathIOException { public PathPermissionException(String path) { super(path, "Operation not permitted"); } -} \ No newline at end of file + + public PathPermissionException(String path, Throwable cause) { + super(path, cause); + } + + 
public PathPermissionException(String path, String error) { + super(path, error); + } + + public PathPermissionException(String path, + String error, + Throwable cause) { + super(path, error, cause); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java new file mode 100644 index 00000000000..a8c01cbe02b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.google.common.base.Preconditions; + +import java.io.IOException; + +/** + * IOException equivalent of an {@link AmazonClientException}. + */ +public class AWSClientIOException extends IOException { + + private final String operation; + + public AWSClientIOException(String operation, + AmazonClientException cause) { + super(cause); + Preconditions.checkArgument(operation != null, "Null 'operation' argument"); + Preconditions.checkArgument(cause != null, "Null 'cause' argument"); + this.operation = operation; + } + + public AmazonClientException getCause() { + return (AmazonClientException) super.getCause(); + } + + @Override + public String getMessage() { + return operation + ": " + getCause().getMessage(); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSS3IOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSS3IOException.java new file mode 100644 index 00000000000..014d217b6a4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSS3IOException.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.model.AmazonS3Exception; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.Map; + +/** + * Wrap a {@link AmazonS3Exception} as an IOE, relaying all + * getters. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AWSS3IOException extends AWSServiceIOException { + + /** + * Instantiate. + * @param operation operation which triggered this + * @param cause the underlying cause + */ + public AWSS3IOException(String operation, + AmazonS3Exception cause) { + super(operation, cause); + } + + public AmazonS3Exception getCause() { + return (AmazonS3Exception) super.getCause(); + } + + public String getErrorResponseXml() { + return getCause().getErrorResponseXml(); + } + + public Map getAdditionalDetails() { + return getCause().getAdditionalDetails(); + } + + public String getExtendedRequestId() { + return getCause().getExtendedRequestId(); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceIOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceIOException.java new file mode 100644 index 00000000000..a9c2c984020 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceIOException.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonServiceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A specific exception from AWS operations. + * The exception must always be created with an {@link AmazonServiceException}. + * The attributes of this exception can all be directly accessed. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AWSServiceIOException extends AWSClientIOException { + + /** + * Instantiate. 
+ * @param operation operation which triggered this + * @param cause the underlying cause + */ + public AWSServiceIOException(String operation, + AmazonServiceException cause) { + super(operation, cause); + } + + public AmazonServiceException getCause() { + return (AmazonServiceException) super.getCause(); + } + + public String getRequestId() { + return getCause().getRequestId(); + } + + public String getServiceName() { + return getCause().getServiceName(); + } + + public String getErrorCode() { + return getCause().getErrorCode(); + } + + public int getStatusCode() { + return getCause().getStatusCode(); + } + + public String getRawResponseContent() { + return getCause().getRawResponseContent(); + } + + public boolean isRetryable() { + return getCause().isRetryable(); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java index 33535223a12..61a83d4b062 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; -import com.amazonaws.AmazonServiceException; import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressListener; import com.amazonaws.services.s3.AmazonS3Client; @@ -54,6 +53,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; /** * Upload files/parts asap directly from a memory buffer (instead of buffering @@ -152,10 +152,8 @@ public class S3AFastOutputStream extends OutputStream { this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); this.multiPartUpload = null; this.progressListener = new ProgressableListener(progress); - if (LOG.isDebugEnabled()){ - LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'", - bucket, key); - } + LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'", + bucket, key); } /** @@ -210,15 +208,11 @@ public class S3AFastOutputStream extends OutputStream { requires multiple parts! */ final byte[] allBytes = buffer.toByteArray(); buffer = null; //earlier gc? - if (LOG.isDebugEnabled()) { - LOG.debug("Total length of initial buffer: {}", allBytes.length); - } + LOG.debug("Total length of initial buffer: {}", allBytes.length); int processedPos = 0; while ((multiPartThreshold - processedPos) >= partSize) { - if (LOG.isDebugEnabled()) { - LOG.debug("Initial buffer: processing from byte {} to byte {}", - processedPos, (processedPos + partSize - 1)); - } + LOG.debug("Initial buffer: processing from byte {} to byte {}", + processedPos, (processedPos + partSize - 1)); multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes, processedPos, partSize), partSize); processedPos += partSize; @@ -235,7 +229,13 @@ public class S3AFastOutputStream extends OutputStream { } } - + /** + * Close the stream. This will not return until the upload is complete + * or the attempt to perform the upload has failed. + * Exceptions raised in this method are indicative that the write has + * failed and data is at risk of being lost. + * @throws IOException on any failure. 
+ */ @Override public synchronized void close() throws IOException { if (closed) { @@ -258,9 +258,7 @@ public class S3AFastOutputStream extends OutputStream { statistics.incrementWriteOps(1); // This will delete unnecessary fake parent directories fs.finishedWrite(key); - if (LOG.isDebugEnabled()) { - LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key); - } + LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key); } finally { buffer = null; super.close(); @@ -283,20 +281,14 @@ public class S3AFastOutputStream extends OutputStream { try { return new MultiPartUpload( client.initiateMultipartUpload(initiateMPURequest).getUploadId()); - } catch (AmazonServiceException ase) { - throw new IOException("Unable to initiate MultiPartUpload (server side)" + - ": " + ase, ase); } catch (AmazonClientException ace) { - throw new IOException("Unable to initiate MultiPartUpload (client side)" + - ": " + ace, ace); + throw translateException("initiate MultiPartUpload", key, ace); } } private void putObject() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket, - key); - } + LOG.debug("Executing regular upload for bucket '{}' key '{}'", + bucket, key); final ObjectMetadata om = createDefaultMetadata(); om.setContentLength(buffer.size()); final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, @@ -317,10 +309,11 @@ public class S3AFastOutputStream extends OutputStream { LOG.warn("Interrupted object upload:" + ie, ie); Thread.currentThread().interrupt(); } catch (ExecutionException ee) { - throw new IOException("Regular upload failed", ee.getCause()); + throw extractException("regular upload", key, ee); } } + private class MultiPartUpload { private final String uploadId; private final List> partETagsFutures; @@ -328,13 +321,11 @@ public class S3AFastOutputStream extends OutputStream { public MultiPartUpload(String uploadId) { this.uploadId = uploadId; this.partETagsFutures = new ArrayList>(); - if (LOG.isDebugEnabled()) { - LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " + - "id '{}'", bucket, key, uploadId); - } + LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " + + "id '{}'", bucket, key, uploadId); } - public void uploadPartAsync(ByteArrayInputStream inputStream, + private void uploadPartAsync(ByteArrayInputStream inputStream, int partSize) { final int currentPartNumber = partETagsFutures.size() + 1; final UploadPartRequest request = @@ -346,22 +337,21 @@ public class S3AFastOutputStream extends OutputStream { executorService.submit(new Callable() { @Override public PartETag call() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Uploading part {} for id '{}'", currentPartNumber, - uploadId); - } + LOG.debug("Uploading part {} for id '{}'", currentPartNumber, + uploadId); return client.uploadPart(request).getPartETag(); } }); partETagsFutures.add(partETagFuture); } - public List waitForAllPartUploads() throws IOException { + private List waitForAllPartUploads() throws IOException { try { return Futures.allAsList(partETagsFutures).get(); } catch (InterruptedException ie) { LOG.warn("Interrupted partUpload:" + ie, ie); Thread.currentThread().interrupt(); + return null; } catch (ExecutionException ee) { //there is no way of recovering so abort //cancel all partUploads @@ -370,22 +360,23 @@ public class S3AFastOutputStream extends OutputStream { } //abort multipartupload this.abort(); - throw new IOException("Part upload failed in 
multi-part upload with " + - "id '" +uploadId + "':" + ee, ee); + throw extractException("Multi-part upload with id '" + uploadId + "'", + key, ee); } - //should not happen? - return null; } - public void complete(List partETags) { - if (LOG.isDebugEnabled()) { - LOG.debug("Completing multi-part upload for key '{}', id '{}'", key, - uploadId); + private void complete(List partETags) throws IOException { + try { + LOG.debug("Completing multi-part upload for key '{}', id '{}'", + key, uploadId); + client.completeMultipartUpload( + new CompleteMultipartUploadRequest(bucket, + key, + uploadId, + partETags)); + } catch (AmazonClientException e) { + throw translateException("Completing multi-part upload", key, e); } - final CompleteMultipartUploadRequest completeRequest = - new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags); - client.completeMultipartUpload(completeRequest); - } public void abort() { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 6a4c689d4d5..f6086ae0cc6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -43,6 +43,7 @@ import com.amazonaws.auth.AWSCredentialsProviderChain; import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; +import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; @@ -78,6 +79,7 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.VersionInfo; import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,93 +183,122 @@ public class S3AFileSystem extends FileSystem { public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); setConf(conf); - instrumentation = new S3AInstrumentation(name); + try { + instrumentation = new S3AInstrumentation(name); - uri = URI.create(name.getScheme() + "://" + name.getAuthority()); - workingDir = new Path("/user", System.getProperty("user.name")) - .makeQualified(this.uri, this.getWorkingDirectory()); + uri = URI.create(name.getScheme() + "://" + name.getAuthority()); + workingDir = new Path("/user", System.getProperty("user.name")) + .makeQualified(this.uri, this.getWorkingDirectory()); - bucket = name.getHost(); + bucket = name.getHost(); - AWSCredentialsProvider credentials = getAWSCredentialsProvider(name, conf); + AWSCredentialsProvider credentials = + getAWSCredentialsProvider(name, conf); - ClientConfiguration awsConf = new ClientConfiguration(); - awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS, 1)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, - DEFAULT_SECURE_CONNECTIONS); - awsConf.setProtocol(secureConnections ? 
Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES, 0)); - awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT, 0)); - awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT, 0)); - String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); - if (!signerOverride.isEmpty()) { - LOG.debug("Signer override = {}", signerOverride); - awsConf.setSignerOverride(signerOverride); + ClientConfiguration awsConf = new ClientConfiguration(); + awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS, 1)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, + DEFAULT_SECURE_CONNECTIONS); + awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES, 0)); + awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT, 0)); + awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT, 0)); + String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); + if (!signerOverride.isEmpty()) { + LOG.debug("Signer override = {}", signerOverride); + awsConf.setSignerOverride(signerOverride); + } + + initProxySupport(conf, awsConf, secureConnections); + + initUserAgent(conf, awsConf); + + initAmazonS3Client(conf, credentials, awsConf); + + maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); + partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + if (partSize < 5 * 1024 * 1024) { + LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); + partSize = 5 * 1024 * 1024; + } + + multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, + DEFAULT_MIN_MULTIPART_THRESHOLD); + if (multiPartThreshold < 5 * 1024 * 1024) { + LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); + multiPartThreshold = 5 * 1024 * 1024; + } + //check but do not store the block size + longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); + enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); + + readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0); + + int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0); + int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0); + if (maxThreads == 0) { + maxThreads = Runtime.getRuntime().availableProcessors() * 8; + } + if (coreThreads == 0) { + coreThreads = Runtime.getRuntime().availableProcessors() * 8; + } + long keepAliveTime = longOption(conf, KEEPALIVE_TIME, + DEFAULT_KEEPALIVE_TIME, 0); + LinkedBlockingQueue workQueue = + new LinkedBlockingQueue<>(maxThreads * + intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1)); + threadPoolExecutor = new ThreadPoolExecutor( + coreThreads, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + workQueue, + newDaemonThreadFactory("s3a-transfer-shared-")); + threadPoolExecutor.allowCoreThreadTimeOut(true); + + initTransferManager(); + + initCannedAcls(conf); + + verifyBucketExists(); + + initMultipartUploads(conf); + + serverSideEncryptionAlgorithm = + conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM); + } catch (AmazonClientException e) { + throw translateException("initializing ", new Path(name), e); } - initProxySupport(conf, awsConf, secureConnections); + } - initUserAgent(conf, awsConf); - - initAmazonS3Client(conf, credentials, awsConf); - - maxKeys = intOption(conf, MAX_PAGING_KEYS, 
DEFAULT_MAX_PAGING_KEYS, 1); - partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - if (partSize < 5 * 1024 * 1024) { - LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); - partSize = 5 * 1024 * 1024; + /** + * Verify that the bucket exists. This does not check permissions, + * not even read access. + * @throws FileNotFoundException the bucket is absent + * @throws IOException any other problem talking to S3 + */ + protected void verifyBucketExists() + throws FileNotFoundException, IOException { + try { + if (!s3.doesBucketExist(bucket)) { + throw new FileNotFoundException("Bucket " + bucket + " does not exist"); + } + } catch (AmazonS3Exception e) { + // this is a sign of a serious startup problem so do dump everything + LOG.warn(stringify(e), e); + throw translateException("doesBucketExist", bucket, e); + } catch (AmazonServiceException e) { + // this is a sign of a serious startup problem so do dump everything + LOG.warn(stringify(e), e); + throw translateException("doesBucketExist", bucket, e); + } catch (AmazonClientException e) { + throw translateException("doesBucketExist", bucket, e); } - - multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, - DEFAULT_MIN_MULTIPART_THRESHOLD); - if (multiPartThreshold < 5 * 1024 * 1024) { - LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); - multiPartThreshold = 5 * 1024 * 1024; - } - //check but do not store the block size - longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); - enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); - - readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0); - - int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0); - int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0); - if (maxThreads == 0) { - maxThreads = Runtime.getRuntime().availableProcessors() * 8; - } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } - long keepAliveTime = longOption(conf, KEEPALIVE_TIME, - DEFAULT_KEEPALIVE_TIME, 0); - LinkedBlockingQueue workQueue = - new LinkedBlockingQueue<>(maxThreads * - intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1)); - threadPoolExecutor = new ThreadPoolExecutor( - coreThreads, - maxThreads, - keepAliveTime, - TimeUnit.SECONDS, - workQueue, - newDaemonThreadFactory("s3a-transfer-shared-")); - threadPoolExecutor.allowCoreThreadTimeOut(true); - - initTransferManager(); - - initCannedAcls(conf); - - if (!s3.doesBucketExist(bucket)) { - throw new FileNotFoundException("Bucket " + bucket + " does not exist"); - } - - initMultipartUploads(conf); - - serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); - } void initProxySupport(Configuration conf, ClientConfiguration awsConf, @@ -379,7 +410,7 @@ public class S3AFileSystem extends FileSystem { } } - private void initMultipartUploads(Configuration conf) { + private void initMultipartUploads(Configuration conf) throws IOException { boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART); long purgeExistingMultipartAge = longOption(conf, @@ -394,10 +425,10 @@ public class S3AFileSystem extends FileSystem { } catch (AmazonServiceException e) { if (e.getStatusCode() == 403) { instrumentation.errorIgnored(); - LOG.debug("Failed to abort multipart uploads against {}," + + LOG.debug("Failed to purging multipart uploads against {}," + " FS may be read only", bucket, e); } else { - throw e; + throw translateException("purging multipart 
uploads", bucket, e); } } } @@ -639,10 +670,28 @@ public class S3AFileSystem extends FileSystem { * * @param src path to be renamed * @param dst new path after rename - * @throws IOException on failure + * @throws IOException on IO failure * @return true if rename is successful */ public boolean rename(Path src, Path dst) throws IOException { + try { + return innerRename(src, dst); + } catch (AmazonClientException e) { + throw translateException("rename(" + src +", " + dst + ")", src, e); + } + } + + /** + * The inner rename operation. See {@link #rename(Path, Path)} for + * the description of the operation. + * @param src path to be renamed + * @param dst new path after rename + * @return true if rename is successful + * @throws IOException on IO failure. + * @throws AmazonClientException on failures inside the AWS SDK + */ + private boolean innerRename(Path src, Path dst) throws IOException, + AmazonClientException { LOG.debug("Rename path {} to {}", src, dst); String srcKey = pathToKey(src); @@ -785,7 +834,7 @@ public class S3AFileSystem extends FileSystem { * when set to true */ private void removeKeys(List keysToDelete, - boolean clearKeys) { + boolean clearKeys) throws AmazonClientException { if (enableMultiObjectsDelete) { DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete); @@ -808,7 +857,9 @@ public class S3AFileSystem extends FileSystem { } } - /** Delete a file. + /** + * Delete a Path. This operation is at least {@code O(files)}, with + * added overheads to enumerate the path. It is also not atomic. * * @param f the path to delete. * @param recursive if path is a directory and set to @@ -818,6 +869,26 @@ public class S3AFileSystem extends FileSystem { * @throws IOException due to inability to delete a directory or file. */ public boolean delete(Path f, boolean recursive) throws IOException { + try { + return innerDelete(f, recursive); + } catch (AmazonClientException e) { + throw translateException("delete", f, e); + } + } + + /** + * Delete a path. See {@link #delete(Path, boolean)}. + * + * @param f the path to delete. + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. In + * case of a file the recursive can be set to either true or false. + * @return true if delete is successful else false. + * @throws IOException due to inability to delete a directory or file. + * @throws AmazonClientException on failures inside the AWS SDK + */ + private boolean innerDelete(Path f, boolean recursive) throws IOException, + AmazonClientException { LOG.debug("Delete path {} - recursive {}", f , recursive); S3AFileStatus status; try { @@ -898,7 +969,8 @@ public class S3AFileSystem extends FileSystem { return true; } - private void createFakeDirectoryIfNecessary(Path f) throws IOException { + private void createFakeDirectoryIfNecessary(Path f) + throws IOException, AmazonClientException { String key = pathToKey(f); if (!key.isEmpty() && !exists(f)) { LOG.debug("Creating new fake directory at {}", f); @@ -917,6 +989,25 @@ public class S3AFileSystem extends FileSystem { */ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + try { + return innerListStatus(f); + } catch (AmazonClientException e) { + throw translateException("listStatus", f, e); + } + } + + /** + * List the statuses of the files/directories in the given path if the path is + * a directory. 
+ * + * @param f given path + * @return the statuses of the files/directories in the given patch + * @throws FileNotFoundException when the path does not exist; + * @throws IOException due to an IO problem. + * @throws AmazonClientException on failures inside the AWS SDK + */ + public FileStatus[] innerListStatus(Path f) throws FileNotFoundException, + IOException, AmazonClientException { String key = pathToKey(f); LOG.debug("List status for path: {}", f); @@ -1008,15 +1099,42 @@ public class S3AFileSystem extends FileSystem { } /** - * Make the given file and all non-existent parents into - * directories. Has the semantics of Unix 'mkdir -p'. + * + * Make the given path and all non-existent parents into + * directories. Has the semantics of Unix @{code 'mkdir -p'}. * Existence of the directory hierarchy is not an error. - * @param f path to create + * @param path path to create * @param permission to apply to f + * @return true if a directory was created + * @throws FileAlreadyExistsException there is a file at the path specified + * @throws IOException other IO problems */ // TODO: If we have created an empty file at /foo/bar and we then call // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? - public boolean mkdirs(Path f, FsPermission permission) throws IOException { + public boolean mkdirs(Path path, FsPermission permission) throws IOException, + FileAlreadyExistsException { + try { + return innerMkdirs(path, permission); + } catch (AmazonClientException e) { + throw translateException("innerMkdirs", path, e); + } + } + /** + * + * Make the given path and all non-existent parents into + * directories. + * See {@link #mkdirs(Path, FsPermission)} + * @param f path to create + * @param permission to apply to f + * @return true if a directory was created + * @throws FileAlreadyExistsException there is a file at the path specified + * @throws IOException other IO problems + * @throws AmazonClientException on failures inside the AWS SDK + */ + // TODO: If we have created an empty file at /foo/bar and we then call + // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? + private boolean innerMkdirs(Path f, FsPermission permission) + throws IOException, FileAlreadyExistsException, AmazonClientException { LOG.debug("Making directory: {}", f); try { @@ -1054,7 +1172,7 @@ public class S3AFileSystem extends FileSystem { * @param f The path we want information from * @return a FileStatus object * @throws java.io.FileNotFoundException when the path does not exist; - * IOException see specific implementation + * @throws IOException on other problems. */ public S3AFileStatus getFileStatus(Path f) throws IOException { String key = pathToKey(f); @@ -1078,12 +1196,10 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(f.toString(), e); - throw e; + throw translateException("getFileStatus", f, e); } } catch (AmazonClientException e) { - printAmazonClientException(f.toString(), e); - throw e; + throw translateException("getFileStatus", f, e); } // Necessary? 
@@ -1106,12 +1222,10 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(newKey, e); - throw e; + throw translateException("getFileStatus", newKey, e); } } catch (AmazonClientException e) { - printAmazonClientException(newKey, e); - throw e; + throw translateException("getFileStatus", newKey, e); } } } @@ -1152,12 +1266,10 @@ public class S3AFileSystem extends FileSystem { } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - printAmazonServiceException(key, e); - throw e; + throw translateException("getFileStatus", key, e); } } catch (AmazonClientException e) { - printAmazonClientException(key, e); - throw e; + throw translateException("getFileStatus", key, e); } LOG.debug("Not Found: {}", f); @@ -1176,10 +1288,42 @@ public class S3AFileSystem extends FileSystem { * @param overwrite whether to overwrite an existing file * @param src path * @param dst path + * @throws IOException IO problem + * @throws FileAlreadyExistsException the destination file exists and + * overwrite==false + * @throws AmazonClientException failure in the AWS SDK */ @Override public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException { + try { + innerCopyFromLocalFile(delSrc, overwrite, src, dst); + } catch (AmazonClientException e) { + throw translateException("copyFromLocalFile(" + src + ", " + dst + ")", + src, e); + } + } + + /** + * The src file is on the local disk. Add it to FS at + * the given dst name. + * + * This version doesn't need to create a temporary file to calculate the md5. + * Sadly this doesn't seem to be used by the shell cp :( + * + * delSrc indicates if the source should be removed + * @param delSrc whether to delete the src + * @param overwrite whether to overwrite an existing file + * @param src path + * @param dst path + * @throws IOException IO problem + * @throws FileAlreadyExistsException the destination file exists and + * overwrite==false + * @throws AmazonClientException failure in the AWS SDK + */ + private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, + Path src, Path dst) + throws IOException, FileAlreadyExistsException, AmazonClientException { String key = pathToKey(dst); if (!overwrite && exists(dst)) { @@ -1229,8 +1373,12 @@ public class S3AFileSystem extends FileSystem { } } + /** + * Close the filesystem. This shuts down all transfers. + * @throws IOException IO problem + */ @Override - public void close() throws IOException { + public synchronized void close() throws IOException { try { super.close(); } finally { @@ -1242,49 +1390,63 @@ public class S3AFileSystem extends FileSystem { } /** - * Override getCanonicalServiceName because we don't support token in S3A. - */ + * Override getCanonicalServiceName because we don't support token in S3A. + */ @Override public String getCanonicalServiceName() { // Does not support Token return null; } + /** + * Copy a single object in the bucket via a COPY operation. 
+ * @param srcKey source object path + * @param dstKey destination object path + * @param size object size + * @throws AmazonClientException on failures inside the AWS SDK + * @throws InterruptedIOException the operation was interrupted + * @throws IOException Other IO problems + */ private void copyFile(String srcKey, String dstKey, long size) - throws IOException { + throws IOException, InterruptedIOException, AmazonClientException { LOG.debug("copyFile {} -> {} ", srcKey, dstKey); - ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); - ObjectMetadata dstom = cloneObjectMetadata(srcom); - if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { - dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); - } - CopyObjectRequest copyObjectRequest = - new CopyObjectRequest(bucket, srcKey, bucket, dstKey); - copyObjectRequest.setCannedAccessControlList(cannedACL); - copyObjectRequest.setNewObjectMetadata(dstom); - - ProgressListener progressListener = new ProgressListener() { - public void progressChanged(ProgressEvent progressEvent) { - switch (progressEvent.getEventType()) { - case TRANSFER_PART_COMPLETED_EVENT: - statistics.incrementWriteOps(1); - break; - default: - break; - } - } - }; - - Copy copy = transfers.copy(copyObjectRequest); - copy.addProgressListener(progressListener); try { - copy.waitForCopyResult(); - statistics.incrementWriteOps(1); - instrumentation.filesCopied(1, size); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted copying " + srcKey - + " to " + dstKey + ", cancelling"); + ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); + ObjectMetadata dstom = cloneObjectMetadata(srcom); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); + } + CopyObjectRequest copyObjectRequest = + new CopyObjectRequest(bucket, srcKey, bucket, dstKey); + copyObjectRequest.setCannedAccessControlList(cannedACL); + copyObjectRequest.setNewObjectMetadata(dstom); + + ProgressListener progressListener = new ProgressListener() { + public void progressChanged(ProgressEvent progressEvent) { + switch (progressEvent.getEventType()) { + case TRANSFER_PART_COMPLETED_EVENT: + statistics.incrementWriteOps(1); + break; + default: + break; + } + } + }; + + Copy copy = transfers.copy(copyObjectRequest); + copy.addProgressListener(progressListener); + try { + copy.waitForCopyResult(); + statistics.incrementWriteOps(1); + instrumentation.filesCopied(1, size); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted copying " + srcKey + + " to " + dstKey + ", cancelling"); + } + } catch (AmazonClientException e) { + throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")", + srcKey, e); } } @@ -1303,11 +1465,20 @@ public class S3AFileSystem extends FileSystem { return date.getTime(); } - public void finishedWrite(String key) throws IOException { + /** + * Perform post-write actions. + * @param key key written to + */ + public void finishedWrite(String key) { deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); } - private void deleteUnnecessaryFakeDirectories(Path f) throws IOException { + /** + * Delete mock parent directories which are no longer needed. 
+ * This code swallows IO exceptions encountered + * @param f path + */ + private void deleteUnnecessaryFakeDirectories(Path f) { while (true) { String key = ""; try { @@ -1323,7 +1494,7 @@ public class S3AFileSystem extends FileSystem { s3.deleteObject(bucket, key + "/"); statistics.incrementWriteOps(1); } - } catch (FileNotFoundException | AmazonServiceException e) { + } catch (IOException | AmazonClientException e) { LOG.debug("While deleting key {} ", key, e); instrumentation.errorIgnored(); } @@ -1446,28 +1617,6 @@ public class S3AFileSystem extends FileSystem { return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE); } - private void printAmazonServiceException(String target, - AmazonServiceException ase) { - LOG.info("{}: caught an AmazonServiceException {}", target, ase); - LOG.info("This means your request made it to Amazon S3," + - " but was rejected with an error response for some reason."); - LOG.info("Error Message: {}", ase.getMessage()); - LOG.info("HTTP Status Code: {}", ase.getStatusCode()); - LOG.info("AWS Error Code: {}", ase.getErrorCode()); - LOG.info("Error Type: {}", ase.getErrorType()); - LOG.info("Request ID: {}", ase.getRequestId()); - LOG.info("Class Name: {}", ase.getClass().getName()); - LOG.info("Exception", ase); - } - - private void printAmazonClientException(String target, - AmazonClientException ace) { - LOG.info("{}: caught an AmazonClientException {}", target, ace); - LOG.info("This means the client encountered " + - "a problem while trying to communicate with S3, " + - "such as not being able to access the network.", ace); - } - @Override public String toString() { final StringBuilder sb = new StringBuilder( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 27557f82a83..7b5b7b3a017 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a; +import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3ObjectInputStream; @@ -29,11 +30,14 @@ import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; + import org.slf4j.Logger; import java.io.EOFException; import java.io.IOException; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; + /** * The input stream for an S3A object. 
* @@ -112,7 +116,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { * @param reason reason for reopen * @param targetPos target position * @param length length requested - * @throws IOException + * @throws IOException on any failure to open the object */ private synchronized void reopen(String reason, long targetPos, long length) throws IOException { @@ -126,13 +130,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos); streamStatistics.streamOpened(); - GetObjectRequest request = new GetObjectRequest(bucket, key) - .withRange(targetPos, requestedStreamLen); - wrappedStream = client.getObject(request).getObjectContent(); + try { + GetObjectRequest request = new GetObjectRequest(bucket, key) + .withRange(targetPos, requestedStreamLen); + wrappedStream = client.getObject(request).getObjectContent(); - if (wrappedStream == null) { - throw new IOException("Null IO stream from reopen of (" + reason + ") " - + uri); + if (wrappedStream == null) { + throw new IOException("Null IO stream from reopen of (" + reason + ") " + + uri); + } + } catch (AmazonClientException e) { + throw translateException("Reopen at position " + targetPos, uri, e); } this.pos = targetPos; @@ -276,10 +284,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { return -1; } - lazySeek(nextReadPos, 1); int byteRead; try { + lazySeek(nextReadPos, 1); byteRead = wrappedStream.read(); } catch (EOFException e) { return -1; @@ -337,11 +345,16 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { return -1; } - lazySeek(nextReadPos, len); - streamStatistics.readOperationStarted(nextReadPos, len); + try { + lazySeek(nextReadPos, len); + } catch (EOFException e) { + // the end of the file has moved + return -1; + } int bytesRead; try { + streamStatistics.readOperationStarted(nextReadPos, len); bytesRead = wrappedStream.read(buf, off, len); } catch (EOFException e) { throw e; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index f9ff701a9b9..593e9e8da49 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a; +import com.amazonaws.AmazonClientException; import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEventType; import com.amazonaws.event.ProgressListener; @@ -40,11 +41,13 @@ import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT; import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT; import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; /** * Output stream to save data to S3. 
@@ -92,13 +95,15 @@ public class S3AOutputStream extends OutputStream { lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a"); } - backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); + backupFile = lDirAlloc.createTmpFileForWrite("output-", + LocalDirAllocator.SIZE_UNKNOWN, conf); closed = false; LOG.debug("OutputStream for key '{}' writing to tempfile: {}", key, backupFile); - this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); + this.backupStream = new BufferedOutputStream( + new FileOutputStream(backupFile)); } @Override @@ -123,7 +128,8 @@ public class S3AOutputStream extends OutputStream { if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { om.setSSEAlgorithm(serverSideEncryptionAlgorithm); } - PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile); + PutObjectRequest putObjectRequest = + new PutObjectRequest(bucket, key, backupFile); putObjectRequest.setCannedAcl(cannedACL); putObjectRequest.setMetadata(om); @@ -135,18 +141,20 @@ public class S3AOutputStream extends OutputStream { upload.waitForUploadResult(); - long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred(); + long delta = upload.getProgress().getBytesTransferred() - + listener.getLastBytesTransferred(); if (statistics != null && delta != 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("S3A write delta changed after finished: " + delta + " bytes"); - } + LOG.debug("S3A write delta changed after finished: {} bytes", delta); statistics.incrementBytesWritten(delta); } // This will delete unnecessary fake parent directories fs.finishedWrite(key); } catch (InterruptedException e) { - throw new IOException(e); + throw (InterruptedIOException) new InterruptedIOException(e.toString()) + .initCause(e); + } catch (AmazonClientException e) { + throw translateException("saving output", key , e); } finally { if (!backupFile.delete()) { LOG.warn("Could not delete temporary s3a file: {}", backupFile); @@ -154,9 +162,7 @@ public class S3AOutputStream extends OutputStream { super.close(); closed = true; } - if (LOG.isDebugEnabled()) { - LOG.debug("OutputStream for key '" + key + "' upload complete"); - } + LOG.debug("OutputStream for key '{}' upload complete", key); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java new file mode 100644 index 00000000000..12d14e27f54 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.AccessDeniedException; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +/** + * Utility methods for S3A code. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class S3AUtils { + + private S3AUtils() { + } + + /** + * Translate an exception raised in an operation into an IOException. + * The specific type of IOException depends on the class of + * {@link AmazonClientException} passed in, and any status codes included + * in the operation. That is: HTTP error codes are examined and can be + * used to build a more specific response. + * @param operation operation + * @param path path operated on (must not be null) + * @param exception amazon exception raised + * @return an IOE which wraps the caught exception. + */ + public static IOException translateException(String operation, + Path path, + AmazonClientException exception) { + return translateException(operation, path.toString(), exception); + } + + /** + * Translate an exception raised in an operation into an IOException. + * The specific type of IOException depends on the class of + * {@link AmazonClientException} passed in, and any status codes included + * in the operation. That is: HTTP error codes are examined and can be + * used to build a more specific response. + * @param operation operation + * @param path path operated on (may be null) + * @param exception amazon exception raised + * @return an IOE which wraps the caught exception. + */ + @SuppressWarnings("ThrowableInstanceNeverThrown") + public static IOException translateException(String operation, + String path, + AmazonClientException exception) { + String message = String.format("%s%s: %s", + operation, + path != null ? (" on " + path) : "", + exception); + if (!(exception instanceof AmazonServiceException)) { + return new AWSClientIOException(message, exception); + } else { + + IOException ioe; + AmazonServiceException ase = (AmazonServiceException) exception; + // this exception is non-null if the service exception is an s3 one + AmazonS3Exception s3Exception = ase instanceof AmazonS3Exception + ? (AmazonS3Exception) ase + : null; + int status = ase.getStatusCode(); + switch (status) { + + // permissions + case 401: + case 403: + ioe = new AccessDeniedException(path, null, message); + ioe.initCause(ase); + break; + + // the object isn't there + case 404: + case 410: + ioe = new FileNotFoundException(message); + ioe.initCause(ase); + break; + + // out of range. This may happen if an object is overwritten with + // a shorter one while it is being read. + case 416: + ioe = new EOFException(message); + break; + + default: + // no specific exit code. Choose an IOE subclass based on the class + // of the caught exception + ioe = s3Exception != null + ? new AWSS3IOException(message, s3Exception) + : new AWSServiceIOException(message, ase); + break; + } + return ioe; + } + } + + /** + * Extract an exception from a failed future, and convert to an IOE. 
+ * @param operation operation which failed + * @param path path operated on (may be null) + * @param ee execution exception + * @return an IOE which can be thrown + */ + public static IOException extractException(String operation, + String path, + ExecutionException ee) { + IOException ioe; + Throwable cause = ee.getCause(); + if (cause instanceof AmazonClientException) { + ioe = translateException(operation, path, (AmazonClientException) cause); + } else if (cause instanceof IOException) { + ioe = (IOException) cause; + } else { + ioe = new IOException(operation + " failed: " + cause, cause); + } + return ioe; + } + + /** + * Get low level details of an amazon exception for logging; multi-line. + * @param e exception + * @return string details + */ + public static String stringify(AmazonServiceException e) { + StringBuilder builder = new StringBuilder( + String.format("%s: %s error %d: %s; %s%s%n", + e.getErrorType(), + e.getServiceName(), + e.getStatusCode(), + e.getErrorCode(), + e.getErrorMessage(), + (e.isRetryable() ? " (retryable)": "") + )); + String rawResponseContent = e.getRawResponseContent(); + if (rawResponseContent != null) { + builder.append(rawResponseContent); + } + return builder.toString(); + } + + /** + * Get low level details of an amazon exception for logging; multi-line. + * @param e exception + * @return string details + */ + public static String stringify(AmazonS3Exception e) { + // get the low level details of an exception, + StringBuilder builder = new StringBuilder( + stringify((AmazonServiceException) e)); + Map details = e.getAdditionalDetails(); + if (details != null) { + builder.append('\n'); + for (Map.Entry d : details.entrySet()) { + builder.append(d.getKey()).append('=') + .append(d.getValue()).append('\n'); + } + } + return builder.toString(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/package-info.java new file mode 100644 index 00000000000..a01af5ccdc4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * S3A Filesystem. Except for the exceptions, it should + * all be hidden as implementation details. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 2308dd48cb0..44bdc023e88 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -25,6 +25,7 @@ import org.junit.internal.AssumptionViolatedException; import java.io.IOException; import java.net.URI; +import java.util.concurrent.Callable; public class S3ATestUtils { @@ -72,4 +73,70 @@ public class S3ATestUtils { FileContext fc = FileContext.getFileContext(testURI,conf); return fc; } + + /** + * Repeatedly attempt a callback until timeout or a {@link FailFastException} + * is raised. This is modeled on ScalaTests {@code eventually(Closure)} code. + * @param timeout timeout + * @param callback callback to invoke + * @throws FailFastException any fast-failure + * @throws Exception the exception which caused the iterator to fail + */ + public static void eventually(int timeout, Callable callback) + throws Exception { + Exception lastException; + long endtime = System.currentTimeMillis() + timeout; + do { + try { + callback.call(); + return; + } catch (FailFastException e) { + throw e; + } catch (Exception e) { + lastException = e; + } + Thread.sleep(500); + } while (endtime > System.currentTimeMillis()); + throw lastException; + } + + /** + * The exception to raise so as to exit fast from + * {@link #eventually(int, Callable)}. + */ + public static class FailFastException extends Exception { + public FailFastException() { + } + + public FailFastException(String message) { + super(message); + } + + public FailFastException(String message, Throwable cause) { + super(message, cause); + } + + public FailFastException(Throwable cause) { + super(cause); + } + } + + /** + * Verify the class of an exception. If it is not as expected, rethrow it. + * Comparison is on the exact class, not subclass-of inference as + * offered by {@code instanceof}. + * @param clazz the expected exception class + * @param ex the exception caught + * @return the exception, if it is of the expected class + * @throws Exception the exception passed in. 
+ */ + public static Exception verifyExceptionClass(Class clazz, + Exception ex) + throws Exception { + if (!(ex.getClass().equals(clazz))) { + throw ex; + } + return ex; + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java index b20a768f287..1a11a45a197 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java @@ -23,8 +23,10 @@ import static org.junit.Assert.*; import java.io.IOException; import java.net.URI; +import java.nio.file.AccessDeniedException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.junit.Test; import com.amazonaws.auth.AWSCredentials; @@ -32,7 +34,6 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProviderChain; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.InstanceProfileCredentialsProvider; -import com.amazonaws.services.s3.model.AmazonS3Exception; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ public class TestS3AAWSCredentialsProvider { Configuration conf = new Configuration(); conf.set(AWS_CREDENTIALS_PROVIDER, "no.such.class"); try { - S3ATestUtils.createTestFileSystem(conf); + createFailingFS(conf); } catch (IOException e) { if (!(e.getCause() instanceof ClassNotFoundException)) { LOG.error("Unexpected nested cause: {} in {}", e.getCause(), e, e); @@ -58,6 +59,18 @@ public class TestS3AAWSCredentialsProvider { } } + /** + * Create a filesystem, expect it to fail by raising an IOException. + * Raises an assertion exception if in fact the FS does get instantiated. + * @param conf configuration + * @throws IOException an expected exception. 
+ */ + private void createFailingFS(Configuration conf) throws IOException { + S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf); + fs.listStatus(new Path("/")); + fail("Expected exception - got " + fs); + } + static class BadCredentialsProvider implements AWSCredentialsProvider { @SuppressWarnings("unused") @@ -79,12 +92,9 @@ public class TestS3AAWSCredentialsProvider { Configuration conf = new Configuration(); conf.set(AWS_CREDENTIALS_PROVIDER, BadCredentialsProvider.class.getName()); try { - S3ATestUtils.createTestFileSystem(conf); - } catch (AmazonS3Exception e) { - if (e.getStatusCode() != 403) { - LOG.error("Unexpected status code: {}", e.getStatusCode(), e); - throw e; - } + createFailingFS(conf); + } catch (AccessDeniedException e) { + // expected } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java index e5bcc799bc7..874c8390507 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java @@ -21,11 +21,9 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; -import com.amazonaws.services.s3.model.AmazonS3Exception; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.reflect.FieldUtils; -import com.amazonaws.AmazonClientException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; @@ -43,7 +41,6 @@ import static org.junit.Assert.fail; import java.io.File; import java.net.URI; -import java.lang.reflect.Field; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.security.alias.CredentialProvider; @@ -122,7 +119,7 @@ public class TestS3AConfiguration { try { fs = S3ATestUtils.createTestFileSystem(conf); fail("Expected a connection error for proxy server at " + proxy); - } catch (AmazonClientException e) { + } catch (AWSClientIOException e) { // expected } } @@ -153,14 +150,14 @@ public class TestS3AConfiguration { try { fs = S3ATestUtils.createTestFileSystem(conf); fail("Expected a connection error for proxy server"); - } catch (AmazonClientException e) { + } catch (AWSClientIOException e) { // expected } conf.set(Constants.SECURE_CONNECTIONS, "false"); try { fs = S3ATestUtils.createTestFileSystem(conf); fail("Expected a connection error for proxy server"); - } catch (AmazonClientException e) { + } catch (AWSClientIOException e) { // expected } } @@ -374,7 +371,7 @@ public class TestS3AConfiguration { clientOptions.isPathStyleAccess()); byte[] file = ContractTestUtils.toAsciiByteArray("test file"); ContractTestUtils.writeAndRead(fs, new Path("/path/style/access/testFile"), file, file.length, conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true); - } catch (final AmazonS3Exception e) { + } catch (final AWSS3IOException e) { LOG.error("Caught exception: ", e); // Catch/pass standard path style access behaviour when live bucket // isn't in the same region as the s3 client default. 
See diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFailureHandling.java new file mode 100644 index 00000000000..58ac4969a0e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFailureHandling.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.nio.file.AccessDeniedException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; + +/** + * Test S3A Failure translation, including a functional test + * generating errors during stream IO. + */ +public class TestS3AFailureHandling extends AbstractFSContractTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(TestS3AFailureHandling.class); + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Test + public void testReadFileChanged() throws Throwable { + describe("overwrite a file with a shorter one during a read, seek"); + final int fullLength = 8192; + final byte[] fullDataset = dataset(fullLength, 'a', 32); + final int shortLen = 4096; + final byte[] shortDataset = dataset(shortLen, 'A', 32); + final FileSystem fs = getFileSystem(); + final Path testpath = path("readFileToChange.txt"); + // initial write + writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false); + try(FSDataInputStream instream = fs.open(testpath)) { + instream.seek(fullLength - 16); + assertTrue("no data to read", instream.read() >= 0); + // overwrite + writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true); + // here the file length is less. 
Probe the file to see if this is true, + // with a spin and wait + eventually(30 * 1000, new Callable<Void>() { + @Override + public Void call() throws Exception { + assertEquals(shortLen, fs.getFileStatus(testpath).getLen()); + return null; + } + }); + // here length is shorter. Assuming it has propagated to all replicas, + // the position of the input stream is now beyond the EOF. + // An attempt to seek backwards to a position greater than the + // short length will raise an exception from AWS S3, which must be + // translated into an EOF + + instream.seek(shortLen + 1024); + int c = instream.read(); + assertIsEOF("read()", c); + + byte[] buf = new byte[256]; + + assertIsEOF("read(buffer)", instream.read(buf)); + assertIsEOF("read(offset)", + instream.read(instream.getPos(), buf, 0, buf.length)); + + // now do a block read fully, again, backwards from the current pos + try { + instream.readFully(shortLen + 512, buf); + fail("Expected readFully to fail"); + } catch (EOFException expected) { + LOG.debug("Expected EOF: ", expected); + } + + assertIsEOF("read(offset)", + instream.read(shortLen + 510, buf, 0, buf.length)); + + // seek somewhere useful + instream.seek(shortLen - 256); + + // delete the file. Reads must fail + fs.delete(testpath, false); + + try { + int r = instream.read(); + fail("Expected an exception, got " + r); + } catch (FileNotFoundException e) { + // expected + } + + try { + instream.readFully(2048, buf); + fail("Expected readFully to fail"); + } catch (FileNotFoundException e) { + // expected + } + + } + } + + /** + * Assert that a read operation returned an EOF value. + * @param operation specific operation + * @param readResult result + */ + private void assertIsEOF(String operation, int readResult) { + assertEquals("Expected EOF from " + operation + + "; got char " + (char) readResult, -1, readResult); + } + + @Test + public void test404isNotFound() throws Throwable { + verifyTranslated(FileNotFoundException.class, createS3Exception(404)); + } + + protected Exception verifyTranslated(Class clazz, + AmazonClientException exception) throws Exception { + return verifyExceptionClass(clazz, + translateException("test", "/", exception)); + } + + @Test + public void test401isNotPermittedFound() throws Throwable { + verifyTranslated(AccessDeniedException.class, + createS3Exception(401)); + } + + protected AmazonS3Exception createS3Exception(int code) { + AmazonS3Exception source = new AmazonS3Exception(""); + source.setStatusCode(code); + return source; + } + + @Test + public void testGenericS3Exception() throws Throwable { + // S3 exception of no known type + AWSS3IOException ex = (AWSS3IOException) verifyTranslated( + AWSS3IOException.class, + createS3Exception(451)); + assertEquals(451, ex.getStatusCode()); + } + + @Test + public void testGenericServiceS3Exception() throws Throwable { + // service exception of no known type + AmazonServiceException ase = new AmazonServiceException("unwind"); + ase.setStatusCode(500); + AWSServiceIOException ex = (AWSServiceIOException) verifyTranslated( + AWSServiceIOException.class, + ase); + assertEquals(500, ex.getStatusCode()); + } + + @Test + public void testGenericClientException() throws Throwable { + // Generic Amazon exception + verifyTranslated(AWSClientIOException.class, + new AmazonClientException("")); + } + +}
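Reviewer note, not part of the patch: a minimal sketch of how the helpers introduced above might be exercised from another test, assuming the patch is applied as-is. The class name ExampleUsageSketch, the awaitDeleted/demoTranslation methods, the bucket path and the 30-second timeout are illustrative assumptions; eventually(), translateException() and the 404-to-FileNotFoundException mapping come from the patch itself.

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.eventually;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;

// Hypothetical helper class; names and values here are assumptions for illustration.
public class ExampleUsageSketch {

  // Spin for up to 30 seconds until a delete becomes visible, using the
  // eventually() helper added to S3ATestUtils in this patch.
  public static void awaitDeleted(final FileSystem fs, final Path path)
      throws Exception {
    eventually(30 * 1000, new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        if (fs.exists(path)) {
          throw new Exception("Still present: " + path);
        }
        return null;
      }
    });
  }

  // Demonstrate the mapping verified by test404isNotFound(): a raw AWS 404
  // is translated into a java.io.FileNotFoundException for callers.
  public static void demoTranslation() {
    AmazonS3Exception notFound = new AmazonS3Exception("no such key");
    notFound.setStatusCode(404);
    IOException ioe = translateException("GET", "/example", notFound);
    if (!(ioe instanceof FileNotFoundException)) {
      throw new AssertionError("Expected FileNotFoundException, got " + ioe);
    }
  }
}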