HADOOP-13130. s3a failures can surface as RTEs, not IOEs. (Steve Loughran)
This commit is contained in:
parent
e7b56cf93c
commit
15f997cca6
|
@ -29,4 +29,8 @@ public class InvalidRequestException extends IOException {
|
|||
/**
 * Instantiate from an error message.
 * @param str exception message
 */
public InvalidRequestException(String str) {
  super(str);
}

/**
 * Instantiate from an error message and a cause.
 * @param message exception message
 * @param cause underlying cause
 */
public InvalidRequestException(String message, Throwable cause) {
  super(message, cause);
}
|
||||
}
|
||||
|
|
|
@ -24,4 +24,14 @@ public class PathAccessDeniedException extends PathIOException {
|
|||
/**
 * Instantiate with the default error text "Permission denied".
 * @param path path to which access was denied
 */
public PathAccessDeniedException(String path) {
  super(path, "Permission denied");
}

/**
 * Instantiate from a path and a cause.
 * @param path path to which access was denied
 * @param cause underlying cause
 */
public PathAccessDeniedException(String path, Throwable cause) {
  super(path, cause);
}

/**
 * Instantiate from a path, custom error text and a cause.
 * @param path path to which access was denied
 * @param error custom error text
 * @param cause underlying cause
 */
public PathAccessDeniedException(String path,
    String error,
    Throwable cause) {
  super(path, error, cause);
}
|
||||
}
|
|
@ -18,7 +18,7 @@
|
|||
package org.apache.hadoop.fs;
|
||||
|
||||
/**
|
||||
* Exception corresponding to Permission denied - ENOENT
|
||||
* Exception corresponding to path not found: ENOENT/ENOFILE
|
||||
*/
|
||||
public class PathNotFoundException extends PathIOException {
|
||||
static final long serialVersionUID = 0L;
|
||||
|
@ -26,4 +26,18 @@ public class PathNotFoundException extends PathIOException {
|
|||
/**
 * Instantiate with the default error text "No such file or directory".
 * @param path the missing path
 */
public PathNotFoundException(String path) {
  super(path, "No such file or directory");
}

/**
 * Instantiate from a path and a cause.
 * @param path the missing path
 * @param cause underlying cause
 */
public PathNotFoundException(String path, Throwable cause) {
  super(path, cause);
}

/**
 * Instantiate from a path and custom error text.
 * @param path the missing path
 * @param error custom error text
 */
public PathNotFoundException(String path, String error) {
  super(path, error);
}

/**
 * Instantiate from a path, custom error text and a cause.
 * @param path the missing path
 * @param error custom error text
 * @param cause underlying cause
 */
public PathNotFoundException(String path,
    String error,
    Throwable cause) {
  super(path, error, cause);
}
|
||||
}
|
|
@ -26,4 +26,18 @@ public class PathPermissionException extends PathIOException {
|
|||
/**
 * Instantiate with the default error text "Operation not permitted".
 * @param path path on which the operation was not permitted
 */
public PathPermissionException(String path) {
  super(path, "Operation not permitted");
}

/**
 * Instantiate from a path and a cause.
 * @param path path on which the operation was not permitted
 * @param cause underlying cause
 */
public PathPermissionException(String path, Throwable cause) {
  super(path, cause);
}

/**
 * Instantiate from a path and custom error text.
 * @param path path on which the operation was not permitted
 * @param error custom error text
 */
public PathPermissionException(String path, String error) {
  super(path, error);
}

/**
 * Instantiate from a path, custom error text and a cause.
 * @param path path on which the operation was not permitted
 * @param error custom error text
 * @param cause underlying cause
 */
public PathPermissionException(String path,
    String error,
    Throwable cause) {
  super(path, error, cause);
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* IOException equivalent of an {@link AmazonClientException}.
|
||||
*/
|
||||
public class AWSClientIOException extends IOException {
|
||||
|
||||
private final String operation;
|
||||
|
||||
public AWSClientIOException(String operation,
|
||||
AmazonClientException cause) {
|
||||
super(cause);
|
||||
Preconditions.checkArgument(operation != null, "Null 'operation' argument");
|
||||
Preconditions.checkArgument(cause != null, "Null 'cause' argument");
|
||||
this.operation = operation;
|
||||
}
|
||||
|
||||
public AmazonClientException getCause() {
|
||||
return (AmazonClientException) super.getCause();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMessage() {
|
||||
return operation + ": " + getCause().getMessage();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Wrap a {@link AmazonS3Exception} as an IOE, relaying all
|
||||
* getters.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class AWSS3IOException extends AWSServiceIOException {
|
||||
|
||||
/**
|
||||
* Instantiate.
|
||||
* @param operation operation which triggered this
|
||||
* @param cause the underlying cause
|
||||
*/
|
||||
public AWSS3IOException(String operation,
|
||||
AmazonS3Exception cause) {
|
||||
super(operation, cause);
|
||||
}
|
||||
|
||||
public AmazonS3Exception getCause() {
|
||||
return (AmazonS3Exception) super.getCause();
|
||||
}
|
||||
|
||||
public String getErrorResponseXml() {
|
||||
return getCause().getErrorResponseXml();
|
||||
}
|
||||
|
||||
public Map<String, String> getAdditionalDetails() {
|
||||
return getCause().getAdditionalDetails();
|
||||
}
|
||||
|
||||
public String getExtendedRequestId() {
|
||||
return getCause().getExtendedRequestId();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* A specific exception from AWS operations.
|
||||
* The exception must always be created with an {@link AmazonServiceException}.
|
||||
* The attributes of this exception can all be directly accessed.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class AWSServiceIOException extends AWSClientIOException {
|
||||
|
||||
/**
|
||||
* Instantiate.
|
||||
* @param operation operation which triggered this
|
||||
* @param cause the underlying cause
|
||||
*/
|
||||
public AWSServiceIOException(String operation,
|
||||
AmazonServiceException cause) {
|
||||
super(operation, cause);
|
||||
}
|
||||
|
||||
public AmazonServiceException getCause() {
|
||||
return (AmazonServiceException) super.getCause();
|
||||
}
|
||||
|
||||
public String getRequestId() {
|
||||
return getCause().getRequestId();
|
||||
}
|
||||
|
||||
public String getServiceName() {
|
||||
return getCause().getServiceName();
|
||||
}
|
||||
|
||||
public String getErrorCode() {
|
||||
return getCause().getErrorCode();
|
||||
}
|
||||
|
||||
public int getStatusCode() {
|
||||
return getCause().getStatusCode();
|
||||
}
|
||||
|
||||
public String getRawResponseContent() {
|
||||
return getCause().getRawResponseContent();
|
||||
}
|
||||
|
||||
public boolean isRetryable() {
|
||||
return getCause().isRetryable();
|
||||
}
|
||||
|
||||
}
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.event.ProgressEvent;
|
||||
import com.amazonaws.event.ProgressListener;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
|
@ -54,6 +53,7 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
||||
/**
|
||||
* Upload files/parts asap directly from a memory buffer (instead of buffering
|
||||
|
@ -152,10 +152,8 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
|
||||
this.multiPartUpload = null;
|
||||
this.progressListener = new ProgressableListener(progress);
|
||||
if (LOG.isDebugEnabled()){
|
||||
LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
|
||||
bucket, key);
|
||||
}
|
||||
LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
|
||||
bucket, key);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -210,15 +208,11 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
requires multiple parts! */
|
||||
final byte[] allBytes = buffer.toByteArray();
|
||||
buffer = null; //earlier gc?
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Total length of initial buffer: {}", allBytes.length);
|
||||
}
|
||||
LOG.debug("Total length of initial buffer: {}", allBytes.length);
|
||||
int processedPos = 0;
|
||||
while ((multiPartThreshold - processedPos) >= partSize) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Initial buffer: processing from byte {} to byte {}",
|
||||
processedPos, (processedPos + partSize - 1));
|
||||
}
|
||||
LOG.debug("Initial buffer: processing from byte {} to byte {}",
|
||||
processedPos, (processedPos + partSize - 1));
|
||||
multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
|
||||
processedPos, partSize), partSize);
|
||||
processedPos += partSize;
|
||||
|
@ -235,7 +229,13 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Close the stream. This will not return until the upload is complete
|
||||
* or the attempt to perform the upload has failed.
|
||||
* Exceptions raised in this method are indicative that the write has
|
||||
* failed and data is at risk of being lost.
|
||||
* @throws IOException on any failure.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (closed) {
|
||||
|
@ -258,9 +258,7 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
statistics.incrementWriteOps(1);
|
||||
// This will delete unnecessary fake parent directories
|
||||
fs.finishedWrite(key);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
|
||||
}
|
||||
LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
|
||||
} finally {
|
||||
buffer = null;
|
||||
super.close();
|
||||
|
@ -283,20 +281,14 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
try {
|
||||
return new MultiPartUpload(
|
||||
client.initiateMultipartUpload(initiateMPURequest).getUploadId());
|
||||
} catch (AmazonServiceException ase) {
|
||||
throw new IOException("Unable to initiate MultiPartUpload (server side)" +
|
||||
": " + ase, ase);
|
||||
} catch (AmazonClientException ace) {
|
||||
throw new IOException("Unable to initiate MultiPartUpload (client side)" +
|
||||
": " + ace, ace);
|
||||
throw translateException("initiate MultiPartUpload", key, ace);
|
||||
}
|
||||
}
|
||||
|
||||
private void putObject() throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket,
|
||||
key);
|
||||
}
|
||||
LOG.debug("Executing regular upload for bucket '{}' key '{}'",
|
||||
bucket, key);
|
||||
final ObjectMetadata om = createDefaultMetadata();
|
||||
om.setContentLength(buffer.size());
|
||||
final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
|
||||
|
@ -317,10 +309,11 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
LOG.warn("Interrupted object upload:" + ie, ie);
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ExecutionException ee) {
|
||||
throw new IOException("Regular upload failed", ee.getCause());
|
||||
throw extractException("regular upload", key, ee);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class MultiPartUpload {
|
||||
private final String uploadId;
|
||||
private final List<ListenableFuture<PartETag>> partETagsFutures;
|
||||
|
@ -328,13 +321,11 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
public MultiPartUpload(String uploadId) {
|
||||
this.uploadId = uploadId;
|
||||
this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
|
||||
"id '{}'", bucket, key, uploadId);
|
||||
}
|
||||
LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
|
||||
"id '{}'", bucket, key, uploadId);
|
||||
}
|
||||
|
||||
public void uploadPartAsync(ByteArrayInputStream inputStream,
|
||||
private void uploadPartAsync(ByteArrayInputStream inputStream,
|
||||
int partSize) {
|
||||
final int currentPartNumber = partETagsFutures.size() + 1;
|
||||
final UploadPartRequest request =
|
||||
|
@ -346,22 +337,21 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
executorService.submit(new Callable<PartETag>() {
|
||||
@Override
|
||||
public PartETag call() throws Exception {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
|
||||
uploadId);
|
||||
}
|
||||
LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
|
||||
uploadId);
|
||||
return client.uploadPart(request).getPartETag();
|
||||
}
|
||||
});
|
||||
partETagsFutures.add(partETagFuture);
|
||||
}
|
||||
|
||||
public List<PartETag> waitForAllPartUploads() throws IOException {
|
||||
private List<PartETag> waitForAllPartUploads() throws IOException {
|
||||
try {
|
||||
return Futures.allAsList(partETagsFutures).get();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Interrupted partUpload:" + ie, ie);
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
} catch (ExecutionException ee) {
|
||||
//there is no way of recovering so abort
|
||||
//cancel all partUploads
|
||||
|
@ -370,22 +360,23 @@ public class S3AFastOutputStream extends OutputStream {
|
|||
}
|
||||
//abort multipartupload
|
||||
this.abort();
|
||||
throw new IOException("Part upload failed in multi-part upload with " +
|
||||
"id '" +uploadId + "':" + ee, ee);
|
||||
throw extractException("Multi-part upload with id '" + uploadId + "'",
|
||||
key, ee);
|
||||
}
|
||||
//should not happen?
|
||||
return null;
|
||||
}
|
||||
|
||||
public void complete(List<PartETag> partETags) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Completing multi-part upload for key '{}', id '{}'", key,
|
||||
uploadId);
|
||||
private void complete(List<PartETag> partETags) throws IOException {
|
||||
try {
|
||||
LOG.debug("Completing multi-part upload for key '{}', id '{}'",
|
||||
key, uploadId);
|
||||
client.completeMultipartUpload(
|
||||
new CompleteMultipartUploadRequest(bucket,
|
||||
key,
|
||||
uploadId,
|
||||
partETags));
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("Completing multi-part upload", key, e);
|
||||
}
|
||||
final CompleteMultipartUploadRequest completeRequest =
|
||||
new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
|
||||
client.completeMultipartUpload(completeRequest);
|
||||
|
||||
}
|
||||
|
||||
public void abort() {
|
||||
|
|
|
@ -43,6 +43,7 @@ import com.amazonaws.auth.AWSCredentialsProviderChain;
|
|||
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.S3ClientOptions;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
|
@ -78,6 +79,7 @@ import org.apache.hadoop.util.Progressable;
|
|||
import org.apache.hadoop.util.VersionInfo;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -181,93 +183,122 @@ public class S3AFileSystem extends FileSystem {
|
|||
public void initialize(URI name, Configuration conf) throws IOException {
|
||||
super.initialize(name, conf);
|
||||
setConf(conf);
|
||||
instrumentation = new S3AInstrumentation(name);
|
||||
try {
|
||||
instrumentation = new S3AInstrumentation(name);
|
||||
|
||||
uri = URI.create(name.getScheme() + "://" + name.getAuthority());
|
||||
workingDir = new Path("/user", System.getProperty("user.name"))
|
||||
.makeQualified(this.uri, this.getWorkingDirectory());
|
||||
uri = URI.create(name.getScheme() + "://" + name.getAuthority());
|
||||
workingDir = new Path("/user", System.getProperty("user.name"))
|
||||
.makeQualified(this.uri, this.getWorkingDirectory());
|
||||
|
||||
bucket = name.getHost();
|
||||
bucket = name.getHost();
|
||||
|
||||
AWSCredentialsProvider credentials = getAWSCredentialsProvider(name, conf);
|
||||
AWSCredentialsProvider credentials =
|
||||
getAWSCredentialsProvider(name, conf);
|
||||
|
||||
ClientConfiguration awsConf = new ClientConfiguration();
|
||||
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
|
||||
DEFAULT_MAXIMUM_CONNECTIONS, 1));
|
||||
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
||||
DEFAULT_SECURE_CONNECTIONS);
|
||||
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
||||
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
|
||||
DEFAULT_MAX_ERROR_RETRIES, 0));
|
||||
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
|
||||
DEFAULT_ESTABLISH_TIMEOUT, 0));
|
||||
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
|
||||
DEFAULT_SOCKET_TIMEOUT, 0));
|
||||
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
||||
if (!signerOverride.isEmpty()) {
|
||||
LOG.debug("Signer override = {}", signerOverride);
|
||||
awsConf.setSignerOverride(signerOverride);
|
||||
ClientConfiguration awsConf = new ClientConfiguration();
|
||||
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
|
||||
DEFAULT_MAXIMUM_CONNECTIONS, 1));
|
||||
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
||||
DEFAULT_SECURE_CONNECTIONS);
|
||||
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
||||
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
|
||||
DEFAULT_MAX_ERROR_RETRIES, 0));
|
||||
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
|
||||
DEFAULT_ESTABLISH_TIMEOUT, 0));
|
||||
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
|
||||
DEFAULT_SOCKET_TIMEOUT, 0));
|
||||
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
||||
if (!signerOverride.isEmpty()) {
|
||||
LOG.debug("Signer override = {}", signerOverride);
|
||||
awsConf.setSignerOverride(signerOverride);
|
||||
}
|
||||
|
||||
initProxySupport(conf, awsConf, secureConnections);
|
||||
|
||||
initUserAgent(conf, awsConf);
|
||||
|
||||
initAmazonS3Client(conf, credentials, awsConf);
|
||||
|
||||
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
|
||||
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
||||
if (partSize < 5 * 1024 * 1024) {
|
||||
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
|
||||
partSize = 5 * 1024 * 1024;
|
||||
}
|
||||
|
||||
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
|
||||
DEFAULT_MIN_MULTIPART_THRESHOLD);
|
||||
if (multiPartThreshold < 5 * 1024 * 1024) {
|
||||
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
|
||||
multiPartThreshold = 5 * 1024 * 1024;
|
||||
}
|
||||
//check but do not store the block size
|
||||
longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
|
||||
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
|
||||
|
||||
readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
|
||||
|
||||
int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0);
|
||||
int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0);
|
||||
if (maxThreads == 0) {
|
||||
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
if (coreThreads == 0) {
|
||||
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
|
||||
DEFAULT_KEEPALIVE_TIME, 0);
|
||||
LinkedBlockingQueue<Runnable> workQueue =
|
||||
new LinkedBlockingQueue<>(maxThreads *
|
||||
intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1));
|
||||
threadPoolExecutor = new ThreadPoolExecutor(
|
||||
coreThreads,
|
||||
maxThreads,
|
||||
keepAliveTime,
|
||||
TimeUnit.SECONDS,
|
||||
workQueue,
|
||||
newDaemonThreadFactory("s3a-transfer-shared-"));
|
||||
threadPoolExecutor.allowCoreThreadTimeOut(true);
|
||||
|
||||
initTransferManager();
|
||||
|
||||
initCannedAcls(conf);
|
||||
|
||||
verifyBucketExists();
|
||||
|
||||
initMultipartUploads(conf);
|
||||
|
||||
serverSideEncryptionAlgorithm =
|
||||
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("initializing ", new Path(name), e);
|
||||
}
|
||||
|
||||
initProxySupport(conf, awsConf, secureConnections);
|
||||
}
|
||||
|
||||
initUserAgent(conf, awsConf);
|
||||
|
||||
initAmazonS3Client(conf, credentials, awsConf);
|
||||
|
||||
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
|
||||
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
||||
if (partSize < 5 * 1024 * 1024) {
|
||||
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
|
||||
partSize = 5 * 1024 * 1024;
|
||||
/**
|
||||
* Verify that the bucket exists. This does not check permissions,
|
||||
* not even read access.
|
||||
* @throws FileNotFoundException the bucket is absent
|
||||
* @throws IOException any other problem talking to S3
|
||||
*/
|
||||
protected void verifyBucketExists()
|
||||
throws FileNotFoundException, IOException {
|
||||
try {
|
||||
if (!s3.doesBucketExist(bucket)) {
|
||||
throw new FileNotFoundException("Bucket " + bucket + " does not exist");
|
||||
}
|
||||
} catch (AmazonS3Exception e) {
|
||||
// this is a sign of a serious startup problem so do dump everything
|
||||
LOG.warn(stringify(e), e);
|
||||
throw translateException("doesBucketExist", bucket, e);
|
||||
} catch (AmazonServiceException e) {
|
||||
// this is a sign of a serious startup problem so do dump everything
|
||||
LOG.warn(stringify(e), e);
|
||||
throw translateException("doesBucketExist", bucket, e);
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("doesBucketExist", bucket, e);
|
||||
}
|
||||
|
||||
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
|
||||
DEFAULT_MIN_MULTIPART_THRESHOLD);
|
||||
if (multiPartThreshold < 5 * 1024 * 1024) {
|
||||
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
|
||||
multiPartThreshold = 5 * 1024 * 1024;
|
||||
}
|
||||
//check but do not store the block size
|
||||
longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
|
||||
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
|
||||
|
||||
readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
|
||||
|
||||
int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0);
|
||||
int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0);
|
||||
if (maxThreads == 0) {
|
||||
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
if (coreThreads == 0) {
|
||||
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
|
||||
DEFAULT_KEEPALIVE_TIME, 0);
|
||||
LinkedBlockingQueue<Runnable> workQueue =
|
||||
new LinkedBlockingQueue<>(maxThreads *
|
||||
intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1));
|
||||
threadPoolExecutor = new ThreadPoolExecutor(
|
||||
coreThreads,
|
||||
maxThreads,
|
||||
keepAliveTime,
|
||||
TimeUnit.SECONDS,
|
||||
workQueue,
|
||||
newDaemonThreadFactory("s3a-transfer-shared-"));
|
||||
threadPoolExecutor.allowCoreThreadTimeOut(true);
|
||||
|
||||
initTransferManager();
|
||||
|
||||
initCannedAcls(conf);
|
||||
|
||||
if (!s3.doesBucketExist(bucket)) {
|
||||
throw new FileNotFoundException("Bucket " + bucket + " does not exist");
|
||||
}
|
||||
|
||||
initMultipartUploads(conf);
|
||||
|
||||
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
||||
|
||||
}
|
||||
|
||||
void initProxySupport(Configuration conf, ClientConfiguration awsConf,
|
||||
|
@ -379,7 +410,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
private void initMultipartUploads(Configuration conf) {
|
||||
private void initMultipartUploads(Configuration conf) throws IOException {
|
||||
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
|
||||
DEFAULT_PURGE_EXISTING_MULTIPART);
|
||||
long purgeExistingMultipartAge = longOption(conf,
|
||||
|
@ -394,10 +425,10 @@ public class S3AFileSystem extends FileSystem {
|
|||
} catch (AmazonServiceException e) {
|
||||
if (e.getStatusCode() == 403) {
|
||||
instrumentation.errorIgnored();
|
||||
LOG.debug("Failed to abort multipart uploads against {}," +
|
||||
LOG.debug("Failed to purging multipart uploads against {}," +
|
||||
" FS may be read only", bucket, e);
|
||||
} else {
|
||||
throw e;
|
||||
throw translateException("purging multipart uploads", bucket, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -639,10 +670,28 @@ public class S3AFileSystem extends FileSystem {
|
|||
*
|
||||
* @param src path to be renamed
|
||||
* @param dst new path after rename
|
||||
* @throws IOException on failure
|
||||
* @throws IOException on IO failure
|
||||
* @return true if rename is successful
|
||||
*/
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
try {
|
||||
return innerRename(src, dst);
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("rename(" + src +", " + dst + ")", src, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The inner rename operation. See {@link #rename(Path, Path)} for
|
||||
* the description of the operation.
|
||||
* @param src path to be renamed
|
||||
* @param dst new path after rename
|
||||
* @return true if rename is successful
|
||||
* @throws IOException on IO failure.
|
||||
* @throws AmazonClientException on failures inside the AWS SDK
|
||||
*/
|
||||
private boolean innerRename(Path src, Path dst) throws IOException,
|
||||
AmazonClientException {
|
||||
LOG.debug("Rename path {} to {}", src, dst);
|
||||
|
||||
String srcKey = pathToKey(src);
|
||||
|
@ -785,7 +834,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
* when set to true
|
||||
*/
|
||||
private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
|
||||
boolean clearKeys) {
|
||||
boolean clearKeys) throws AmazonClientException {
|
||||
if (enableMultiObjectsDelete) {
|
||||
DeleteObjectsRequest deleteRequest
|
||||
= new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
||||
|
@ -808,7 +857,9 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
/** Delete a file.
|
||||
/**
|
||||
* Delete a Path. This operation is at least {@code O(files)}, with
|
||||
* added overheads to enumerate the path. It is also not atomic.
|
||||
*
|
||||
* @param f the path to delete.
|
||||
* @param recursive if path is a directory and set to
|
||||
|
@ -818,6 +869,26 @@ public class S3AFileSystem extends FileSystem {
|
|||
* @throws IOException due to inability to delete a directory or file.
|
||||
*/
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
try {
|
||||
return innerDelete(f, recursive);
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("delete", f, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a path. See {@link #delete(Path, boolean)}.
|
||||
*
|
||||
* @param f the path to delete.
|
||||
* @param recursive if path is a directory and set to
|
||||
* true, the directory is deleted else throws an exception. In
|
||||
* case of a file the recursive can be set to either true or false.
|
||||
* @return true if delete is successful else false.
|
||||
* @throws IOException due to inability to delete a directory or file.
|
||||
* @throws AmazonClientException on failures inside the AWS SDK
|
||||
*/
|
||||
private boolean innerDelete(Path f, boolean recursive) throws IOException,
|
||||
AmazonClientException {
|
||||
LOG.debug("Delete path {} - recursive {}", f , recursive);
|
||||
S3AFileStatus status;
|
||||
try {
|
||||
|
@ -898,7 +969,8 @@ public class S3AFileSystem extends FileSystem {
|
|||
return true;
|
||||
}
|
||||
|
||||
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
|
||||
private void createFakeDirectoryIfNecessary(Path f)
|
||||
throws IOException, AmazonClientException {
|
||||
String key = pathToKey(f);
|
||||
if (!key.isEmpty() && !exists(f)) {
|
||||
LOG.debug("Creating new fake directory at {}", f);
|
||||
|
@ -917,6 +989,25 @@ public class S3AFileSystem extends FileSystem {
|
|||
*/
|
||||
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
|
||||
IOException {
|
||||
try {
|
||||
return innerListStatus(f);
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("listStatus", f, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List the statuses of the files/directories in the given path if the path is
|
||||
* a directory.
|
||||
*
|
||||
* @param f given path
|
||||
* @return the statuses of the files/directories in the given patch
|
||||
* @throws FileNotFoundException when the path does not exist;
|
||||
* @throws IOException due to an IO problem.
|
||||
* @throws AmazonClientException on failures inside the AWS SDK
|
||||
*/
|
||||
public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
|
||||
IOException, AmazonClientException {
|
||||
String key = pathToKey(f);
|
||||
LOG.debug("List status for path: {}", f);
|
||||
|
||||
|
@ -1008,15 +1099,42 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
/**
|
||||
* Make the given file and all non-existent parents into
|
||||
* directories. Has the semantics of Unix 'mkdir -p'.
|
||||
*
|
||||
* Make the given path and all non-existent parents into
|
||||
* directories. Has the semantics of Unix @{code 'mkdir -p'}.
|
||||
* Existence of the directory hierarchy is not an error.
|
||||
* @param f path to create
|
||||
* @param path path to create
|
||||
* @param permission to apply to f
|
||||
* @return true if a directory was created
|
||||
* @throws FileAlreadyExistsException there is a file at the path specified
|
||||
* @throws IOException other IO problems
|
||||
*/
|
||||
// TODO: If we have created an empty file at /foo/bar and we then call
|
||||
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
public boolean mkdirs(Path path, FsPermission permission) throws IOException,
|
||||
FileAlreadyExistsException {
|
||||
try {
|
||||
return innerMkdirs(path, permission);
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("innerMkdirs", path, e);
|
||||
}
|
||||
}
|
||||
/**
|
||||
*
|
||||
* Make the given path and all non-existent parents into
|
||||
* directories.
|
||||
* See {@link #mkdirs(Path, FsPermission)}
|
||||
* @param f path to create
|
||||
* @param permission to apply to f
|
||||
* @return true if a directory was created
|
||||
* @throws FileAlreadyExistsException there is a file at the path specified
|
||||
* @throws IOException other IO problems
|
||||
* @throws AmazonClientException on failures inside the AWS SDK
|
||||
*/
|
||||
// TODO: If we have created an empty file at /foo/bar and we then call
|
||||
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
|
||||
private boolean innerMkdirs(Path f, FsPermission permission)
|
||||
throws IOException, FileAlreadyExistsException, AmazonClientException {
|
||||
LOG.debug("Making directory: {}", f);
|
||||
|
||||
try {
|
||||
|
@ -1054,7 +1172,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
* @param f The path we want information from
|
||||
* @return a FileStatus object
|
||||
* @throws java.io.FileNotFoundException when the path does not exist;
|
||||
* IOException see specific implementation
|
||||
* @throws IOException on other problems.
|
||||
*/
|
||||
public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||
String key = pathToKey(f);
|
||||
|
@ -1078,12 +1196,10 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
} catch (AmazonServiceException e) {
|
||||
if (e.getStatusCode() != 404) {
|
||||
printAmazonServiceException(f.toString(), e);
|
||||
throw e;
|
||||
throw translateException("getFileStatus", f, e);
|
||||
}
|
||||
} catch (AmazonClientException e) {
|
||||
printAmazonClientException(f.toString(), e);
|
||||
throw e;
|
||||
throw translateException("getFileStatus", f, e);
|
||||
}
|
||||
|
||||
// Necessary?
|
||||
|
@ -1106,12 +1222,10 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
} catch (AmazonServiceException e) {
|
||||
if (e.getStatusCode() != 404) {
|
||||
printAmazonServiceException(newKey, e);
|
||||
throw e;
|
||||
throw translateException("getFileStatus", newKey, e);
|
||||
}
|
||||
} catch (AmazonClientException e) {
|
||||
printAmazonClientException(newKey, e);
|
||||
throw e;
|
||||
throw translateException("getFileStatus", newKey, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1152,12 +1266,10 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
} catch (AmazonServiceException e) {
|
||||
if (e.getStatusCode() != 404) {
|
||||
printAmazonServiceException(key, e);
|
||||
throw e;
|
||||
throw translateException("getFileStatus", key, e);
|
||||
}
|
||||
} catch (AmazonClientException e) {
|
||||
printAmazonClientException(key, e);
|
||||
throw e;
|
||||
throw translateException("getFileStatus", key, e);
|
||||
}
|
||||
|
||||
LOG.debug("Not Found: {}", f);
|
||||
|
@ -1176,10 +1288,42 @@ public class S3AFileSystem extends FileSystem {
|
|||
* @param overwrite whether to overwrite an existing file
|
||||
* @param src path
|
||||
* @param dst path
|
||||
* @throws IOException IO problem
|
||||
* @throws FileAlreadyExistsException the destination file exists and
|
||||
* overwrite==false
|
||||
* @throws AmazonClientException failure in the AWS SDK
|
||||
*/
|
||||
@Override
|
||||
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
|
||||
Path dst) throws IOException {
|
||||
try {
|
||||
innerCopyFromLocalFile(delSrc, overwrite, src, dst);
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("copyFromLocalFile(" + src + ", " + dst + ")",
|
||||
src, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The src file is on the local disk. Add it to FS at
|
||||
* the given dst name.
|
||||
*
|
||||
* This version doesn't need to create a temporary file to calculate the md5.
|
||||
* Sadly this doesn't seem to be used by the shell cp :(
|
||||
*
|
||||
* delSrc indicates if the source should be removed
|
||||
* @param delSrc whether to delete the src
|
||||
* @param overwrite whether to overwrite an existing file
|
||||
* @param src path
|
||||
* @param dst path
|
||||
* @throws IOException IO problem
|
||||
* @throws FileAlreadyExistsException the destination file exists and
|
||||
* overwrite==false
|
||||
* @throws AmazonClientException failure in the AWS SDK
|
||||
*/
|
||||
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
|
||||
Path src, Path dst)
|
||||
throws IOException, FileAlreadyExistsException, AmazonClientException {
|
||||
String key = pathToKey(dst);
|
||||
|
||||
if (!overwrite && exists(dst)) {
|
||||
|
@ -1229,8 +1373,12 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the filesystem. This shuts down all transfers.
|
||||
* @throws IOException IO problem
|
||||
*/
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public synchronized void close() throws IOException {
|
||||
try {
|
||||
super.close();
|
||||
} finally {
|
||||
|
@ -1242,49 +1390,63 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
/**
|
||||
* Override getCanonicalServiceName because we don't support token in S3A.
|
||||
*/
|
||||
* Override getCanonicalServiceName because we don't support token in S3A.
|
||||
*/
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
// Does not support Token
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy a single object in the bucket via a COPY operation.
|
||||
* @param srcKey source object path
|
||||
* @param dstKey destination object path
|
||||
* @param size object size
|
||||
* @throws AmazonClientException on failures inside the AWS SDK
|
||||
* @throws InterruptedIOException the operation was interrupted
|
||||
* @throws IOException Other IO problems
|
||||
*/
|
||||
private void copyFile(String srcKey, String dstKey, long size)
|
||||
throws IOException {
|
||||
throws IOException, InterruptedIOException, AmazonClientException {
|
||||
LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
|
||||
|
||||
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
|
||||
ObjectMetadata dstom = cloneObjectMetadata(srcom);
|
||||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
||||
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
|
||||
}
|
||||
CopyObjectRequest copyObjectRequest =
|
||||
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
|
||||
copyObjectRequest.setCannedAccessControlList(cannedACL);
|
||||
copyObjectRequest.setNewObjectMetadata(dstom);
|
||||
|
||||
ProgressListener progressListener = new ProgressListener() {
|
||||
public void progressChanged(ProgressEvent progressEvent) {
|
||||
switch (progressEvent.getEventType()) {
|
||||
case TRANSFER_PART_COMPLETED_EVENT:
|
||||
statistics.incrementWriteOps(1);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Copy copy = transfers.copy(copyObjectRequest);
|
||||
copy.addProgressListener(progressListener);
|
||||
try {
|
||||
copy.waitForCopyResult();
|
||||
statistics.incrementWriteOps(1);
|
||||
instrumentation.filesCopied(1, size);
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted copying " + srcKey
|
||||
+ " to " + dstKey + ", cancelling");
|
||||
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
|
||||
ObjectMetadata dstom = cloneObjectMetadata(srcom);
|
||||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
||||
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
|
||||
}
|
||||
CopyObjectRequest copyObjectRequest =
|
||||
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
|
||||
copyObjectRequest.setCannedAccessControlList(cannedACL);
|
||||
copyObjectRequest.setNewObjectMetadata(dstom);
|
||||
|
||||
ProgressListener progressListener = new ProgressListener() {
|
||||
public void progressChanged(ProgressEvent progressEvent) {
|
||||
switch (progressEvent.getEventType()) {
|
||||
case TRANSFER_PART_COMPLETED_EVENT:
|
||||
statistics.incrementWriteOps(1);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Copy copy = transfers.copy(copyObjectRequest);
|
||||
copy.addProgressListener(progressListener);
|
||||
try {
|
||||
copy.waitForCopyResult();
|
||||
statistics.incrementWriteOps(1);
|
||||
instrumentation.filesCopied(1, size);
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted copying " + srcKey
|
||||
+ " to " + dstKey + ", cancelling");
|
||||
}
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")",
|
||||
srcKey, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1303,11 +1465,20 @@ public class S3AFileSystem extends FileSystem {
|
|||
return date.getTime();
|
||||
}
|
||||
|
||||
public void finishedWrite(String key) throws IOException {
|
||||
/**
|
||||
* Perform post-write actions.
|
||||
* @param key key written to
|
||||
*/
|
||||
public void finishedWrite(String key) {
|
||||
deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
|
||||
}
|
||||
|
||||
private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
|
||||
/**
|
||||
* Delete mock parent directories which are no longer needed.
|
||||
* This code swallows IO exceptions encountered
|
||||
* @param f path
|
||||
*/
|
||||
private void deleteUnnecessaryFakeDirectories(Path f) {
|
||||
while (true) {
|
||||
String key = "";
|
||||
try {
|
||||
|
@ -1323,7 +1494,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
s3.deleteObject(bucket, key + "/");
|
||||
statistics.incrementWriteOps(1);
|
||||
}
|
||||
} catch (FileNotFoundException | AmazonServiceException e) {
|
||||
} catch (IOException | AmazonClientException e) {
|
||||
LOG.debug("While deleting key {} ", key, e);
|
||||
instrumentation.errorIgnored();
|
||||
}
|
||||
|
@ -1446,28 +1617,6 @@ public class S3AFileSystem extends FileSystem {
|
|||
return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
|
||||
}
|
||||
|
||||
private void printAmazonServiceException(String target,
|
||||
AmazonServiceException ase) {
|
||||
LOG.info("{}: caught an AmazonServiceException {}", target, ase);
|
||||
LOG.info("This means your request made it to Amazon S3," +
|
||||
" but was rejected with an error response for some reason.");
|
||||
LOG.info("Error Message: {}", ase.getMessage());
|
||||
LOG.info("HTTP Status Code: {}", ase.getStatusCode());
|
||||
LOG.info("AWS Error Code: {}", ase.getErrorCode());
|
||||
LOG.info("Error Type: {}", ase.getErrorType());
|
||||
LOG.info("Request ID: {}", ase.getRequestId());
|
||||
LOG.info("Class Name: {}", ase.getClass().getName());
|
||||
LOG.info("Exception", ase);
|
||||
}
|
||||
|
||||
private void printAmazonClientException(String target,
|
||||
AmazonClientException ace) {
|
||||
LOG.info("{}: caught an AmazonClientException {}", target, ace);
|
||||
LOG.info("This means the client encountered " +
|
||||
"a problem while trying to communicate with S3, " +
|
||||
"such as not being able to access the network.", ace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
||||
|
@ -29,11 +30,14 @@ import org.apache.hadoop.fs.CanSetReadahead;
|
|||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
||||
/**
|
||||
* The input stream for an S3A object.
|
||||
*
|
||||
|
@ -112,7 +116,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||
* @param reason reason for reopen
|
||||
* @param targetPos target position
|
||||
* @param length length requested
|
||||
* @throws IOException
|
||||
* @throws IOException on any failure to open the object
|
||||
*/
|
||||
private synchronized void reopen(String reason, long targetPos, long length)
|
||||
throws IOException {
|
||||
|
@ -126,13 +130,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||
uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos);
|
||||
|
||||
streamStatistics.streamOpened();
|
||||
GetObjectRequest request = new GetObjectRequest(bucket, key)
|
||||
.withRange(targetPos, requestedStreamLen);
|
||||
wrappedStream = client.getObject(request).getObjectContent();
|
||||
try {
|
||||
GetObjectRequest request = new GetObjectRequest(bucket, key)
|
||||
.withRange(targetPos, requestedStreamLen);
|
||||
wrappedStream = client.getObject(request).getObjectContent();
|
||||
|
||||
if (wrappedStream == null) {
|
||||
throw new IOException("Null IO stream from reopen of (" + reason + ") "
|
||||
+ uri);
|
||||
if (wrappedStream == null) {
|
||||
throw new IOException("Null IO stream from reopen of (" + reason + ") "
|
||||
+ uri);
|
||||
}
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("Reopen at position " + targetPos, uri, e);
|
||||
}
|
||||
|
||||
this.pos = targetPos;
|
||||
|
@ -276,10 +284,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||
return -1;
|
||||
}
|
||||
|
||||
lazySeek(nextReadPos, 1);
|
||||
|
||||
int byteRead;
|
||||
try {
|
||||
lazySeek(nextReadPos, 1);
|
||||
byteRead = wrappedStream.read();
|
||||
} catch (EOFException e) {
|
||||
return -1;
|
||||
|
@ -337,11 +345,16 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
|
|||
return -1;
|
||||
}
|
||||
|
||||
lazySeek(nextReadPos, len);
|
||||
streamStatistics.readOperationStarted(nextReadPos, len);
|
||||
try {
|
||||
lazySeek(nextReadPos, len);
|
||||
} catch (EOFException e) {
|
||||
// the end of the file has moved
|
||||
return -1;
|
||||
}
|
||||
|
||||
int bytesRead;
|
||||
try {
|
||||
streamStatistics.readOperationStarted(nextReadPos, len);
|
||||
bytesRead = wrappedStream.read(buf, off, len);
|
||||
} catch (EOFException e) {
|
||||
throw e;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.event.ProgressEvent;
|
||||
import com.amazonaws.event.ProgressEventType;
|
||||
import com.amazonaws.event.ProgressListener;
|
||||
|
@ -40,11 +41,13 @@ import java.io.BufferedOutputStream;
|
|||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
|
||||
import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
||||
/**
|
||||
* Output stream to save data to S3.
|
||||
|
@ -92,13 +95,15 @@ public class S3AOutputStream extends OutputStream {
|
|||
lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
|
||||
}
|
||||
|
||||
backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
|
||||
backupFile = lDirAlloc.createTmpFileForWrite("output-",
|
||||
LocalDirAllocator.SIZE_UNKNOWN, conf);
|
||||
closed = false;
|
||||
|
||||
LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
|
||||
key, backupFile);
|
||||
|
||||
this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
|
||||
this.backupStream = new BufferedOutputStream(
|
||||
new FileOutputStream(backupFile));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -123,7 +128,8 @@ public class S3AOutputStream extends OutputStream {
|
|||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
||||
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
|
||||
}
|
||||
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile);
|
||||
PutObjectRequest putObjectRequest =
|
||||
new PutObjectRequest(bucket, key, backupFile);
|
||||
putObjectRequest.setCannedAcl(cannedACL);
|
||||
putObjectRequest.setMetadata(om);
|
||||
|
||||
|
@ -135,18 +141,20 @@ public class S3AOutputStream extends OutputStream {
|
|||
|
||||
upload.waitForUploadResult();
|
||||
|
||||
long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred();
|
||||
long delta = upload.getProgress().getBytesTransferred() -
|
||||
listener.getLastBytesTransferred();
|
||||
if (statistics != null && delta != 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("S3A write delta changed after finished: " + delta + " bytes");
|
||||
}
|
||||
LOG.debug("S3A write delta changed after finished: {} bytes", delta);
|
||||
statistics.incrementBytesWritten(delta);
|
||||
}
|
||||
|
||||
// This will delete unnecessary fake parent directories
|
||||
fs.finishedWrite(key);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
throw (InterruptedIOException) new InterruptedIOException(e.toString())
|
||||
.initCause(e);
|
||||
} catch (AmazonClientException e) {
|
||||
throw translateException("saving output", key , e);
|
||||
} finally {
|
||||
if (!backupFile.delete()) {
|
||||
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
|
||||
|
@ -154,9 +162,7 @@ public class S3AOutputStream extends OutputStream {
|
|||
super.close();
|
||||
closed = true;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("OutputStream for key '" + key + "' upload complete");
|
||||
}
|
||||
LOG.debug("OutputStream for key '{}' upload complete", key);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,189 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* Utility methods for S3A code.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public final class S3AUtils {
|
||||
|
||||
private S3AUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Translate an exception raised in an operation into an IOException.
|
||||
* The specific type of IOException depends on the class of
|
||||
* {@link AmazonClientException} passed in, and any status codes included
|
||||
* in the operation. That is: HTTP error codes are examined and can be
|
||||
* used to build a more specific response.
|
||||
* @param operation operation
|
||||
* @param path path operated on (must not be null)
|
||||
* @param exception amazon exception raised
|
||||
* @return an IOE which wraps the caught exception.
|
||||
*/
|
||||
public static IOException translateException(String operation,
|
||||
Path path,
|
||||
AmazonClientException exception) {
|
||||
return translateException(operation, path.toString(), exception);
|
||||
}
|
||||
|
||||
/**
|
||||
* Translate an exception raised in an operation into an IOException.
|
||||
* The specific type of IOException depends on the class of
|
||||
* {@link AmazonClientException} passed in, and any status codes included
|
||||
* in the operation. That is: HTTP error codes are examined and can be
|
||||
* used to build a more specific response.
|
||||
* @param operation operation
|
||||
* @param path path operated on (may be null)
|
||||
* @param exception amazon exception raised
|
||||
* @return an IOE which wraps the caught exception.
|
||||
*/
|
||||
@SuppressWarnings("ThrowableInstanceNeverThrown")
|
||||
public static IOException translateException(String operation,
|
||||
String path,
|
||||
AmazonClientException exception) {
|
||||
String message = String.format("%s%s: %s",
|
||||
operation,
|
||||
path != null ? (" on " + path) : "",
|
||||
exception);
|
||||
if (!(exception instanceof AmazonServiceException)) {
|
||||
return new AWSClientIOException(message, exception);
|
||||
} else {
|
||||
|
||||
IOException ioe;
|
||||
AmazonServiceException ase = (AmazonServiceException) exception;
|
||||
// this exception is non-null if the service exception is an s3 one
|
||||
AmazonS3Exception s3Exception = ase instanceof AmazonS3Exception
|
||||
? (AmazonS3Exception) ase
|
||||
: null;
|
||||
int status = ase.getStatusCode();
|
||||
switch (status) {
|
||||
|
||||
// permissions
|
||||
case 401:
|
||||
case 403:
|
||||
ioe = new AccessDeniedException(path, null, message);
|
||||
ioe.initCause(ase);
|
||||
break;
|
||||
|
||||
// the object isn't there
|
||||
case 404:
|
||||
case 410:
|
||||
ioe = new FileNotFoundException(message);
|
||||
ioe.initCause(ase);
|
||||
break;
|
||||
|
||||
// out of range. This may happen if an object is overwritten with
|
||||
// a shorter one while it is being read.
|
||||
case 416:
|
||||
ioe = new EOFException(message);
|
||||
break;
|
||||
|
||||
default:
|
||||
// no specific exit code. Choose an IOE subclass based on the class
|
||||
// of the caught exception
|
||||
ioe = s3Exception != null
|
||||
? new AWSS3IOException(message, s3Exception)
|
||||
: new AWSServiceIOException(message, ase);
|
||||
break;
|
||||
}
|
||||
return ioe;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract an exception from a failed future, and convert to an IOE.
|
||||
* @param operation operation which failed
|
||||
* @param path path operated on (may be null)
|
||||
* @param ee execution exception
|
||||
* @return an IOE which can be thrown
|
||||
*/
|
||||
public static IOException extractException(String operation,
|
||||
String path,
|
||||
ExecutionException ee) {
|
||||
IOException ioe;
|
||||
Throwable cause = ee.getCause();
|
||||
if (cause instanceof AmazonClientException) {
|
||||
ioe = translateException(operation, path, (AmazonClientException) cause);
|
||||
} else if (cause instanceof IOException) {
|
||||
ioe = (IOException) cause;
|
||||
} else {
|
||||
ioe = new IOException(operation + " failed: " + cause, cause);
|
||||
}
|
||||
return ioe;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get low level details of an amazon exception for logging; multi-line.
|
||||
* @param e exception
|
||||
* @return string details
|
||||
*/
|
||||
public static String stringify(AmazonServiceException e) {
|
||||
StringBuilder builder = new StringBuilder(
|
||||
String.format("%s: %s error %d: %s; %s%s%n",
|
||||
e.getErrorType(),
|
||||
e.getServiceName(),
|
||||
e.getStatusCode(),
|
||||
e.getErrorCode(),
|
||||
e.getErrorMessage(),
|
||||
(e.isRetryable() ? " (retryable)": "")
|
||||
));
|
||||
String rawResponseContent = e.getRawResponseContent();
|
||||
if (rawResponseContent != null) {
|
||||
builder.append(rawResponseContent);
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get low level details of an amazon exception for logging; multi-line.
|
||||
* @param e exception
|
||||
* @return string details
|
||||
*/
|
||||
public static String stringify(AmazonS3Exception e) {
|
||||
// get the low level details of an exception,
|
||||
StringBuilder builder = new StringBuilder(
|
||||
stringify((AmazonServiceException) e));
|
||||
Map<String, String> details = e.getAdditionalDetails();
|
||||
if (details != null) {
|
||||
builder.append('\n');
|
||||
for (Map.Entry<String, String> d : details.entrySet()) {
|
||||
builder.append(d.getKey()).append('=')
|
||||
.append(d.getValue()).append('\n');
|
||||
}
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* S3A Filesystem. Except for the exceptions, it should
|
||||
* all be hidden as implementation details.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -25,6 +25,7 @@ import org.junit.internal.AssumptionViolatedException;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
public class S3ATestUtils {
|
||||
|
||||
|
@ -72,4 +73,70 @@ public class S3ATestUtils {
|
|||
FileContext fc = FileContext.getFileContext(testURI,conf);
|
||||
return fc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Repeatedly attempt a callback until timeout or a {@link FailFastException}
|
||||
* is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
|
||||
* @param timeout timeout
|
||||
* @param callback callback to invoke
|
||||
* @throws FailFastException any fast-failure
|
||||
* @throws Exception the exception which caused the iterator to fail
|
||||
*/
|
||||
public static void eventually(int timeout, Callable<Void> callback)
|
||||
throws Exception {
|
||||
Exception lastException;
|
||||
long endtime = System.currentTimeMillis() + timeout;
|
||||
do {
|
||||
try {
|
||||
callback.call();
|
||||
return;
|
||||
} catch (FailFastException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
lastException = e;
|
||||
}
|
||||
Thread.sleep(500);
|
||||
} while (endtime > System.currentTimeMillis());
|
||||
throw lastException;
|
||||
}
|
||||
|
||||
/**
|
||||
* The exception to raise so as to exit fast from
|
||||
* {@link #eventually(int, Callable)}.
|
||||
*/
|
||||
public static class FailFastException extends Exception {
|
||||
public FailFastException() {
|
||||
}
|
||||
|
||||
public FailFastException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public FailFastException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public FailFastException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the class of an exception. If it is not as expected, rethrow it.
|
||||
* Comparison is on the exact class, not subclass-of inference as
|
||||
* offered by {@code instanceof}.
|
||||
* @param clazz the expected exception class
|
||||
* @param ex the exception caught
|
||||
* @return the exception, if it is of the expected class
|
||||
* @throws Exception the exception passed in.
|
||||
*/
|
||||
public static Exception verifyExceptionClass(Class clazz,
|
||||
Exception ex)
|
||||
throws Exception {
|
||||
if (!(ex.getClass().equals(clazz))) {
|
||||
throw ex;
|
||||
}
|
||||
return ex;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,8 +23,10 @@ import static org.junit.Assert.*;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
|
@ -32,7 +34,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
|
|||
import com.amazonaws.auth.AWSCredentialsProviderChain;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -49,7 +50,7 @@ public class TestS3AAWSCredentialsProvider {
|
|||
Configuration conf = new Configuration();
|
||||
conf.set(AWS_CREDENTIALS_PROVIDER, "no.such.class");
|
||||
try {
|
||||
S3ATestUtils.createTestFileSystem(conf);
|
||||
createFailingFS(conf);
|
||||
} catch (IOException e) {
|
||||
if (!(e.getCause() instanceof ClassNotFoundException)) {
|
||||
LOG.error("Unexpected nested cause: {} in {}", e.getCause(), e, e);
|
||||
|
@ -58,6 +59,18 @@ public class TestS3AAWSCredentialsProvider {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a filesystem, expect it to fail by raising an IOException.
|
||||
* Raises an assertion exception if in fact the FS does get instantiated.
|
||||
* @param conf configuration
|
||||
* @throws IOException an expected exception.
|
||||
*/
|
||||
private void createFailingFS(Configuration conf) throws IOException {
|
||||
S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
fs.listStatus(new Path("/"));
|
||||
fail("Expected exception - got " + fs);
|
||||
}
|
||||
|
||||
static class BadCredentialsProvider implements AWSCredentialsProvider {
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
|
@ -79,12 +92,9 @@ public class TestS3AAWSCredentialsProvider {
|
|||
Configuration conf = new Configuration();
|
||||
conf.set(AWS_CREDENTIALS_PROVIDER, BadCredentialsProvider.class.getName());
|
||||
try {
|
||||
S3ATestUtils.createTestFileSystem(conf);
|
||||
} catch (AmazonS3Exception e) {
|
||||
if (e.getStatusCode() != 403) {
|
||||
LOG.error("Unexpected status code: {}", e.getStatusCode(), e);
|
||||
throw e;
|
||||
}
|
||||
createFailingFS(conf);
|
||||
} catch (AccessDeniedException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,11 +21,9 @@ package org.apache.hadoop.fs.s3a;
|
|||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.S3ClientOptions;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.reflect.FieldUtils;
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
|
@ -43,7 +41,6 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.File;
|
||||
import java.net.URI;
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
import org.apache.hadoop.security.ProviderUtils;
|
||||
import org.apache.hadoop.security.alias.CredentialProvider;
|
||||
|
@ -122,7 +119,7 @@ public class TestS3AConfiguration {
|
|||
try {
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
fail("Expected a connection error for proxy server at " + proxy);
|
||||
} catch (AmazonClientException e) {
|
||||
} catch (AWSClientIOException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
@ -155,14 +152,14 @@ public class TestS3AConfiguration {
|
|||
try {
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
fail("Expected a connection error for proxy server");
|
||||
} catch (AmazonClientException e) {
|
||||
} catch (AWSClientIOException e) {
|
||||
// expected
|
||||
}
|
||||
conf.set(Constants.SECURE_CONNECTIONS, "false");
|
||||
try {
|
||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||
fail("Expected a connection error for proxy server");
|
||||
} catch (AmazonClientException e) {
|
||||
} catch (AWSClientIOException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
@ -376,7 +373,7 @@ public class TestS3AConfiguration {
|
|||
clientOptions.isPathStyleAccess());
|
||||
byte[] file = ContractTestUtils.toAsciiByteArray("test file");
|
||||
ContractTestUtils.writeAndRead(fs, new Path("/path/style/access/testFile"), file, file.length, conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true);
|
||||
} catch (final AmazonS3Exception e) {
|
||||
} catch (final AWSS3IOException e) {
|
||||
LOG.error("Caught exception: ", e);
|
||||
// Catch/pass standard path style access behaviour when live bucket
|
||||
// isn't in the same region as the s3 client default. See
|
||||
|
|
|
@ -0,0 +1,194 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.InvalidRequestException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
||||
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
||||
/**
|
||||
* Test S3A Failure translation, including a functional test
|
||||
* generating errors during stream IO.
|
||||
*/
|
||||
public class TestS3AFailureHandling extends AbstractFSContractTestBase {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestS3AFailureHandling.class);
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new S3AContract(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFileChanged() throws Throwable {
|
||||
describe("overwrite a file with a shorter one during a read, seek");
|
||||
final int fullLength = 8192;
|
||||
final byte[] fullDataset = dataset(fullLength, 'a', 32);
|
||||
final int shortLen = 4096;
|
||||
final byte[] shortDataset = dataset(shortLen, 'A', 32);
|
||||
final FileSystem fs = getFileSystem();
|
||||
final Path testpath = path("readFileToChange.txt");
|
||||
// initial write
|
||||
writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false);
|
||||
try(FSDataInputStream instream = fs.open(testpath)) {
|
||||
instream.seek(fullLength - 16);
|
||||
assertTrue("no data to read", instream.read() >= 0);
|
||||
// overwrite
|
||||
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
|
||||
// here the file length is less. Probe the file to see if this is true,
|
||||
// with a spin and wait
|
||||
eventually(30 *1000, new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
// here length is shorter. Assuming it has propagated to all replicas,
|
||||
// the position of the input stream is now beyond the EOF.
|
||||
// An attempt to seek backwards to a position greater than the
|
||||
// short length will raise an exception from AWS S3, which must be
|
||||
// translated into an EOF
|
||||
|
||||
instream.seek(shortLen + 1024);
|
||||
int c = instream.read();
|
||||
assertIsEOF("read()", c);
|
||||
|
||||
byte[] buf = new byte[256];
|
||||
|
||||
assertIsEOF("read(buffer)", instream.read(buf));
|
||||
assertIsEOF("read(offset)",
|
||||
instream.read(instream.getPos(), buf, 0, buf.length));
|
||||
|
||||
// now do a block read fully, again, backwards from the current pos
|
||||
try {
|
||||
instream.readFully(shortLen + 512, buf);
|
||||
fail("Expected readFully to fail");
|
||||
} catch (EOFException expected) {
|
||||
LOG.debug("Expected EOF: ", expected);
|
||||
}
|
||||
|
||||
assertIsEOF("read(offset)",
|
||||
instream.read(shortLen + 510, buf, 0, buf.length));
|
||||
|
||||
// seek somewhere useful
|
||||
instream.seek(shortLen - 256);
|
||||
|
||||
// delete the file. Reads must fail
|
||||
fs.delete(testpath, false);
|
||||
|
||||
try {
|
||||
int r = instream.read();
|
||||
fail("Expected an exception, got " + r);
|
||||
} catch (FileNotFoundException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
try {
|
||||
instream.readFully(2048, buf);
|
||||
fail("Expected readFully to fail");
|
||||
} catch (FileNotFoundException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that a read operation returned an EOF value.
|
||||
* @param operation specific operation
|
||||
* @param readResult result
|
||||
*/
|
||||
private void assertIsEOF(String operation, int readResult) {
|
||||
assertEquals("Expected EOF from "+ operation
|
||||
+ "; got char " + (char) readResult, -1, readResult);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test404isNotFound() throws Throwable {
|
||||
verifyTranslated(FileNotFoundException.class, createS3Exception(404));
|
||||
}
|
||||
|
||||
protected Exception verifyTranslated(Class clazz,
|
||||
AmazonClientException exception) throws Exception {
|
||||
return verifyExceptionClass(clazz,
|
||||
translateException("test", "/", exception));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test401isNotPermittedFound() throws Throwable {
|
||||
verifyTranslated(AccessDeniedException.class,
|
||||
createS3Exception(401));
|
||||
}
|
||||
|
||||
protected AmazonS3Exception createS3Exception(int code) {
|
||||
AmazonS3Exception source = new AmazonS3Exception("");
|
||||
source.setStatusCode(code);
|
||||
return source;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGenericS3Exception() throws Throwable {
|
||||
// S3 exception of no known type
|
||||
AWSS3IOException ex = (AWSS3IOException)verifyTranslated(
|
||||
AWSS3IOException.class,
|
||||
createS3Exception(451));
|
||||
assertEquals(451, ex.getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGenericServiceS3Exception() throws Throwable {
|
||||
// service exception of no known type
|
||||
AmazonServiceException ase = new AmazonServiceException("unwind");
|
||||
ase.setStatusCode(500);
|
||||
AWSServiceIOException ex = (AWSServiceIOException)verifyTranslated(
|
||||
AWSServiceIOException.class,
|
||||
ase);
|
||||
assertEquals(500, ex.getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGenericClientException() throws Throwable {
|
||||
// Generic Amazon exception
|
||||
verifyTranslated(AWSClientIOException.class,
|
||||
new AmazonClientException(""));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue