HADOOP-13761. S3Guard: implement retries for DDB failures and throttling; translate exceptions.

Contributed by Aaron Fabbri.
This commit is contained in:
Steve Loughran 2018-03-05 14:06:20 +00:00
parent e8c5be63f0
commit 8110d6a0d5
21 changed files with 1131 additions and 253 deletions

View File

@ -28,4 +28,40 @@
<Method name="s3Exists" />
<Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
</Match>
<!--
This extends the serializable S3Object, so findbug checks
serializability. It is never serialized however, so its
warnings are false positives.
-->
<Match>
<Class name="org.apache.hadoop.fs.s3a.InconsistentS3Object" />
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED" />
</Match>
<Match>
<Class name="org.apache.hadoop.fs.s3a.InconsistentS3Object" />
<Bug pattern="SE_NO_SERIALVERSIONID" />
</Match>
<!--
findbugs gets confused by lambda expressions in synchronized methods
and considers references to fields to be unsynchronized.
As you can't disable the methods individually, we have to disable
them for the entire class.
-->
<Match>
<Class name="org.apache.hadoop.fs.s3a.S3AInputStream"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
<!--
findbugs reporting RV ignored. Not true.
"Return value of S3AReadOpContext.getReadInvoker() ignored,
but method has no side effect"
-->
<Match>
<Class name="org.apache.hadoop.fs.s3a.S3AInputStream"/>
<Method name="reopen"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.s3a.Constants.*;
/**
* Simple object which stores current failure injection settings.
* "Delaying a key" can mean:
* - Removing it from the S3 client's listings while delay is in effect.
* - Causing input stream reads to fail.
* - Causing the S3 side of getFileStatus(), i.e.
* AmazonS3#getObjectMetadata(), to throw FileNotFound.
*/
public class FailureInjectionPolicy {
/**
* Keys containing this substring will be subject to delayed visibility.
*/
public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME";
/**
* How many seconds affected keys will have delayed visibility.
* This should probably be a config value.
*/
public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000;
public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f;
/** Special config value since we can't store empty strings in XML. */
public static final String MATCH_ALL_KEYS = "*";
private static final Logger LOG =
LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
/** Empty string matches all keys. */
private String delayKeySubstring;
/** Probability to delay visibility of a matching key. */
private float delayKeyProbability;
/** Time in milliseconds to delay visibility of newly modified object. */
private long delayKeyMsec;
/**
* Probability of throttling a request.
*/
private float throttleProbability;
/**
* limit for failures before operations succeed; if 0 then "no limit".
*/
private int failureLimit = 0;
public FailureInjectionPolicy(Configuration conf) {
this.delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY,
DEFAULT_DELAY_KEY_SUBSTRING);
// "" is a substring of all strings, use it to match all keys.
if (this.delayKeySubstring.equals(MATCH_ALL_KEYS)) {
this.delayKeySubstring = "";
}
this.delayKeyProbability = validProbability(
conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY,
DEFAULT_DELAY_KEY_PROBABILITY));
this.delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC,
DEFAULT_DELAY_KEY_MSEC);
this.setThrottleProbability(conf.getFloat(FAIL_INJECT_THROTTLE_PROBABILITY,
0.0f));
}
public String getDelayKeySubstring() {
return delayKeySubstring;
}
public float getDelayKeyProbability() {
return delayKeyProbability;
}
public long getDelayKeyMsec() {
return delayKeyMsec;
}
public float getThrottleProbability() {
return throttleProbability;
}
public int getFailureLimit() {
return failureLimit;
}
public void setFailureLimit(int failureLimit) {
this.failureLimit = failureLimit;
}
/**
* Set the probability of throttling a request.
* @param throttleProbability the probability of a request being throttled.
*/
public void setThrottleProbability(float throttleProbability) {
this.throttleProbability = validProbability(throttleProbability);
}
public static boolean trueWithProbability(float p) {
return Math.random() < p;
}
/**
* Should we delay listing visibility for this key?
* @param key key which is being put
* @return true if we should delay
*/
public boolean shouldDelay(String key) {
float p = getDelayKeyProbability();
boolean delay = key.contains(getDelayKeySubstring());
delay = delay && trueWithProbability(p);
LOG.debug("{}, p={} -> {}", key, p, delay);
return delay;
}
@Override
public String toString() {
return String.format("FailureInjectionPolicy:" +
" %s msec delay, substring %s, delay probability %s;" +
" throttle probability %s" + "; failure limit %d",
delayKeyMsec, delayKeySubstring, delayKeyProbability,
throttleProbability, failureLimit);
}
/**
* Validate a probability option.
* @param p probability
* @return the probability, if valid
* @throws IllegalArgumentException if the probability is out of range.
*/
private static float validProbability(float p) {
Preconditions.checkArgument(p >= 0.0f && p <= 1.0f,
"Probability out of range 0 to 1 %s", p);
return p;
}
}

View File

@ -38,6 +38,7 @@ import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
@ -48,6 +49,7 @@ import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
@ -60,8 +62,6 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.s3a.Constants.*;
/**
* A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects
* inconsistency and/or errors. Used for testing S3Guard.
@ -71,49 +71,16 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
@InterfaceStability.Unstable
public class InconsistentAmazonS3Client extends AmazonS3Client {
/**
* Keys containing this substring will be subject to delayed visibility.
*/
public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME";
/**
* How many seconds affected keys will be delayed from appearing in listing.
* This should probably be a config value.
*/
public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000;
public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f;
/** Special config value since we can't store empty strings in XML. */
public static final String MATCH_ALL_KEYS = "*";
private static final Logger LOG =
LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
/** Empty string matches all keys. */
private String delayKeySubstring;
/** Probability to delay visibility of a matching key. */
private float delayKeyProbability;
/** Time in milliseconds to delay visibility of newly modified object. */
private long delayKeyMsec;
/**
* Probability of throttling a request.
*/
private float throttleProbability;
private FailureInjectionPolicy policy;
/**
* Counter of failures since last reset.
*/
private final AtomicLong failureCounter = new AtomicLong(0);
/**
* limit for failures before operations succeed; if 0 then "no limit".
*/
private int failureLimit = 0;
/**
* Composite of data we need to track about recently deleted objects:
* when it was deleted (same was with recently put objects) and the object
@ -150,36 +117,42 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
ClientConfiguration clientConfiguration, Configuration conf) {
super(credentials, clientConfiguration);
setupConfig(conf);
policy = new FailureInjectionPolicy(conf);
}
protected void setupConfig(Configuration conf) {
delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY,
DEFAULT_DELAY_KEY_SUBSTRING);
// "" is a substring of all strings, use it to match all keys.
if (delayKeySubstring.equals(MATCH_ALL_KEYS)) {
delayKeySubstring = "";
}
delayKeyProbability = validProbability(
conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY,
DEFAULT_DELAY_KEY_PROBABILITY));
delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC,
DEFAULT_DELAY_KEY_MSEC);
setThrottleProbability(conf.getFloat(FAIL_INJECT_THROTTLE_PROBABILITY,
0.0f));
LOG.info("{}", this);
/**
* Clear any accumulated inconsistency state. Used by tests to make paths
* visible again.
* @param fs S3AFileSystem under test
* @throws Exception on failure
*/
public static void clearInconsistency(S3AFileSystem fs) throws Exception {
AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard");
InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
ic.clearInconsistency();
}
/**
* A way for tests to patch in a different fault injection policy at runtime.
* @param fs filesystem under test
*
*/
public static void setFailureInjectionPolicy(S3AFileSystem fs,
FailureInjectionPolicy policy) throws Exception {
AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard");
InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
ic.replacePolicy(policy);
}
private void replacePolicy(FailureInjectionPolicy pol) {
this.policy = pol;
}
@Override
public String toString() {
return String.format(
"Inconsistent S3 Client with"
+ " %s msec delay, substring %s, delay probability %s;"
+ " throttle probability %s"
+ "; failure limit %d, failure count %d",
delayKeyMsec, delayKeySubstring, delayKeyProbability,
throttleProbability, failureLimit, failureCounter.get());
return String.format("Inconsistent S3 Client: %s; failure count %d",
policy, failureCounter.get());
}
/**
@ -470,7 +443,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
return false;
}
long currentTime = System.currentTimeMillis();
long deadline = enqueueTime + delayKeyMsec;
long deadline = enqueueTime + policy.getDelayKeyMsec();
if (currentTime >= deadline) {
delayedDeletes.remove(key);
LOG.debug("no longer delaying {}", key);
@ -482,7 +455,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
}
private void registerDeleteObject(String key, String bucket) {
if (shouldDelay(key)) {
if (policy.shouldDelay(key)) {
// Record summary so we can add it back for some time post-deletion
ListObjectsRequest request = new ListObjectsRequest()
.withBucketName(bucket)
@ -498,28 +471,11 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
private void registerPutObject(PutObjectRequest req) {
String key = req.getKey();
if (shouldDelay(key)) {
if (policy.shouldDelay(key)) {
enqueueDelayedPut(key);
}
}
/**
* Should we delay listing visibility for this key?
* @param key key which is being put
* @return true if we should delay
*/
private boolean shouldDelay(String key) {
boolean delay = key.contains(delayKeySubstring);
delay = delay && trueWithProbability(delayKeyProbability);
LOG.debug("{} -> {}", key, delay);
return delay;
}
private boolean trueWithProbability(float p) {
return Math.random() < p;
}
/**
* Record this key as something that should not become visible in
* listObject replies for a while, to simulate eventual list consistency.
@ -561,20 +517,8 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
return super.listMultipartUploads(listMultipartUploadsRequest);
}
public float getDelayKeyProbability() {
return delayKeyProbability;
}
public long getDelayKeyMsec() {
return delayKeyMsec;
}
/**
* Get the probability of the request being throttled.
* @return a value 0 - 1.0f.
*/
public float getThrottleProbability() {
return throttleProbability;
return policy.getDelayKeyMsec();
}
/**
@ -582,36 +526,28 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
* @param throttleProbability the probability of a request being throttled.
*/
public void setThrottleProbability(float throttleProbability) {
this.throttleProbability = validProbability(throttleProbability);
}
/**
* Validate a probability option.
* @param p probability
* @return the probability, if valid
* @throws IllegalArgumentException if the probability is out of range.
*/
private float validProbability(float p) {
Preconditions.checkArgument(p >= 0.0f && p <= 1.0f,
"Probability out of range 0 to 1 %s", p);
return p;
policy.setThrottleProbability(throttleProbability);
}
/**
* Conditionally fail the operation.
* @param errorMsg description of failure
* @param statusCode http status code for error
* @throws AmazonClientException if the client chooses to fail
* the request.
*/
private void maybeFail() throws AmazonClientException {
private void maybeFail(String errorMsg, int statusCode)
throws AmazonClientException {
// code structure here is to line up for more failures later
AmazonServiceException ex = null;
if (trueWithProbability(throttleProbability)) {
if (policy.trueWithProbability(policy.getThrottleProbability())) {
// throttle the request
ex = new AmazonServiceException("throttled"
ex = new AmazonServiceException(errorMsg
+ " count = " + (failureCounter.get() + 1), null);
ex.setStatusCode(503);
ex.setStatusCode(statusCode);
}
int failureLimit = policy.getFailureLimit();
if (ex != null) {
long count = failureCounter.incrementAndGet();
if (failureLimit == 0
@ -621,16 +557,37 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
}
}
private void maybeFail() {
maybeFail("throttled", 503);
}
/**
* Set the limit on failures before all operations pass through.
* This resets the failure count.
* @param limit limit; "0" means "no limit"
*/
public void setFailureLimit(int limit) {
this.failureLimit = limit;
policy.setFailureLimit(limit);
failureCounter.set(0);
}
@Override
public S3Object getObject(GetObjectRequest var1) throws SdkClientException,
AmazonServiceException {
maybeFail("file not found", 404);
S3Object o = super.getObject(var1);
LOG.debug("Wrapping in InconsistentS3Object for key {}", var1.getKey());
return new InconsistentS3Object(o, policy);
}
@Override
public S3Object getObject(String bucketName, String key)
throws SdkClientException, AmazonServiceException {
S3Object o = super.getObject(bucketName, key);
LOG.debug("Wrapping in InconsistentS3Object for key {}", key);
return new InconsistentS3Object(o, policy);
}
/** Since ObjectListing is immutable, we just override it with wrapper. */
@SuppressWarnings("serial")
private static class CustomObjectListing extends ObjectListing {

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import com.amazonaws.services.s3.internal.AmazonS3ExceptionBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Wrapper around S3Object so we can do failure injection on
* getObjectContent() and S3ObjectInputStream.
* See also {@link InconsistentAmazonS3Client}.
*/
@SuppressWarnings({"NonSerializableFieldInSerializableClass", "serial"})
public class InconsistentS3Object extends S3Object {
// This should be configurable, probably.
public static final int MAX_READ_FAILURES = 100;
private static int readFailureCounter = 0;
private transient S3Object wrapped;
private transient FailureInjectionPolicy policy;
private final static transient Logger LOG = LoggerFactory.getLogger(
InconsistentS3Object.class);
public InconsistentS3Object(S3Object wrapped, FailureInjectionPolicy policy) {
this.wrapped = wrapped;
this.policy = policy;
}
@Override
public S3ObjectInputStream getObjectContent() {
return new InconsistentS3InputStream(wrapped.getObjectContent());
}
@Override
public String toString() {
return "InconsistentS3Object wrapping: " + wrapped.toString();
}
@Override
public ObjectMetadata getObjectMetadata() {
return wrapped.getObjectMetadata();
}
@Override
public void setObjectMetadata(ObjectMetadata metadata) {
wrapped.setObjectMetadata(metadata);
}
@Override
public void setObjectContent(S3ObjectInputStream objectContent) {
wrapped.setObjectContent(objectContent);
}
@Override
public void setObjectContent(InputStream objectContent) {
wrapped.setObjectContent(objectContent);
}
@Override
public String getBucketName() {
return wrapped.getBucketName();
}
@Override
public void setBucketName(String bucketName) {
wrapped.setBucketName(bucketName);
}
@Override
public String getKey() {
return wrapped.getKey();
}
@Override
public void setKey(String key) {
wrapped.setKey(key);
}
@Override
public String getRedirectLocation() {
return wrapped.getRedirectLocation();
}
@Override
public void setRedirectLocation(String redirectLocation) {
wrapped.setRedirectLocation(redirectLocation);
}
@Override
public Integer getTaggingCount() {
return wrapped.getTaggingCount();
}
@Override
public void setTaggingCount(Integer taggingCount) {
wrapped.setTaggingCount(taggingCount);
}
@Override
public void close() throws IOException {
wrapped.close();
}
@Override
public boolean isRequesterCharged() {
return wrapped.isRequesterCharged();
}
@Override
public void setRequesterCharged(boolean isRequesterCharged) {
wrapped.setRequesterCharged(isRequesterCharged);
}
private AmazonS3Exception mockException(String msg, int httpResponse) {
AmazonS3ExceptionBuilder builder = new AmazonS3ExceptionBuilder();
builder.setErrorMessage(msg);
builder.setStatusCode(httpResponse); // this is the important part
builder.setErrorCode(String.valueOf(httpResponse));
return builder.build();
}
/**
* Insert a failiure injection point for a read call.
* @throw IOException, as codepath is on InputStream, not other SDK call.
*/
private void readFailpoint(int off, int len) throws IOException {
if (shouldInjectFailure(getKey())) {
String error = String.format(
"read(b, %d, %d) on key %s failed: injecting error %d/%d" +
" for test.", off, len, getKey(), readFailureCounter,
MAX_READ_FAILURES);
throw new FileNotFoundException(error);
}
}
/**
* Insert a failiure injection point for an InputStream skip() call.
* @throw IOException, as codepath is on InputStream, not other SDK call.
*/
private void skipFailpoint(long len) throws IOException {
if (shouldInjectFailure(getKey())) {
String error = String.format(
"skip(%d) on key %s failed: injecting error %d/%d for test.",
len, getKey(), readFailureCounter, MAX_READ_FAILURES);
throw new FileNotFoundException(error);
}
}
private boolean shouldInjectFailure(String key) {
if (policy.shouldDelay(key) &&
readFailureCounter < MAX_READ_FAILURES) {
readFailureCounter++;
return true;
}
return false;
}
/**
* Wraps S3ObjectInputStream and implements failure injection.
*/
protected class InconsistentS3InputStream extends S3ObjectInputStream {
private S3ObjectInputStream wrapped;
public InconsistentS3InputStream(S3ObjectInputStream wrapped) {
// seems awkward to have the stream wrap itself.
super(wrapped, wrapped.getHttpRequest());
this.wrapped = wrapped;
}
@Override
public void abort() {
wrapped.abort();
}
@Override
public int available() throws IOException {
return wrapped.available();
}
@Override
public void close() throws IOException {
wrapped.close();
}
@Override
public long skip(long n) throws IOException {
skipFailpoint(n);
return wrapped.skip(n);
}
@Override
public int read() throws IOException {
LOG.debug("read() for key {}", getKey());
readFailpoint(0, 1);
return wrapped.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
LOG.debug("read(b, {}, {}) for key {}", off, len, getKey());
readFailpoint(off, len);
return wrapped.read(b, off, len);
}
}
}

View File

@ -310,6 +310,9 @@ public class Invoker {
boolean shouldRetry;
do {
try {
if (retryCount > 0) {
LOG.debug("retry #{}", retryCount);
}
// execute the operation, returning if successful
return operation.execute();
} catch (IOException | SdkBaseException e) {
@ -327,8 +330,6 @@ public class Invoker {
(SdkBaseException)caught);
}
int attempts = retryCount + 1;
try {
// decide action base on operation, invocation count, etc
retryAction = retryPolicy.shouldRetry(translated, retryCount, 0,

View File

@ -166,6 +166,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
// APIs on an uninitialized filesystem.
private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
Invoker.LOG_EVENT);
// Only used for very specific code paths which behave differently for
// S3Guard. Retries FileNotFound, so be careful if you use this.
private Invoker s3guardInvoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
Invoker.LOG_EVENT);
private final Retried onRetry = this::operationRetried;
private String bucket;
private int maxKeys;
@ -251,6 +255,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
.createS3Client(name);
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()),
onRetry);
writeHelper = new WriteOperationHelper(this, getConf());
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
@ -697,18 +703,20 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
}
return new FSDataInputStream(
new S3AInputStream(new S3ObjectAttributes(
bucket,
pathToKey(f),
serverSideEncryptionAlgorithm,
getServerSideEncryptionKey(bucket, getConf())),
fileStatus.getLen(),
s3,
new S3AInputStream(new S3AReadOpContext(hasMetadataStore(),
invoker,
s3guardInvoker,
statistics,
instrumentation,
fileStatus),
new S3ObjectAttributes(bucket,
pathToKey(f),
serverSideEncryptionAlgorithm,
getServerSideEncryptionKey(bucket, getConf())),
fileStatus.getLen(),
s3,
readAhead,
inputPolicy,
invoker));
inputPolicy));
}
/**

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -72,10 +71,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/
private volatile boolean closed;
private S3ObjectInputStream wrappedStream;
private final FileSystem.Statistics stats;
private final S3AReadOpContext context;
private final AmazonS3 client;
private final String bucket;
private final String key;
private final String pathStr;
private final long contentLength;
private final String uri;
private static final Logger LOG =
@ -85,7 +85,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
private String serverSideEncryptionKey;
private S3AInputPolicy inputPolicy;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
private final Invoker invoker;
/**
* This is the actual position within the object, used by
@ -108,40 +107,33 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* Create the stream.
* This does not attempt to open it; that is only done on the first
* actual read() operation.
* @param ctx operation context
* @param s3Attributes object attributes from a HEAD request
* @param contentLength length of content
* @param client S3 client to use
* @param stats statistics to update
* @param instrumentation instrumentation to update
* @param readahead readahead bytes
* @param inputPolicy IO policy
* @param invoker preconfigured invoker
*/
public S3AInputStream(S3ObjectAttributes s3Attributes,
long contentLength,
AmazonS3 client,
FileSystem.Statistics stats,
S3AInstrumentation instrumentation,
long readahead,
S3AInputPolicy inputPolicy,
Invoker invoker) {
public S3AInputStream(S3AReadOpContext ctx, S3ObjectAttributes s3Attributes,
long contentLength, AmazonS3 client, long readahead,
S3AInputPolicy inputPolicy) {
Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
"No Bucket");
Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
Preconditions.checkArgument(contentLength >= 0, "Negative content length");
this.context = ctx;
this.bucket = s3Attributes.getBucket();
this.key = s3Attributes.getKey();
this.pathStr = ctx.dstFileStatus.getPath().toString();
this.contentLength = contentLength;
this.client = client;
this.stats = stats;
this.uri = "s3a://" + this.bucket + "/" + this.key;
this.streamStatistics = instrumentation.newInputStreamStatistics();
this.streamStatistics = ctx.instrumentation.newInputStreamStatistics();
this.serverSideEncryptionAlgorithm =
s3Attributes.getServerSideEncryptionAlgorithm();
this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
setInputPolicy(inputPolicy);
setReadahead(readahead);
this.invoker = invoker;
}
/**
@ -162,6 +154,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @param length length requested
* @throws IOException on any failure to open the object
*/
@Retries.OnceTranslated
private synchronized void reopen(String reason, long targetPos, long length)
throws IOException {
@ -185,7 +178,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
}
String text = String.format("Failed to %s %s at %d",
(opencount == 0 ? "open" : "re-open"), uri, targetPos);
S3Object object = invoker.retry(text, uri, true,
S3Object object = context.getReadInvoker().once(text, uri,
() -> client.getObject(request));
wrappedStream = object.getObjectContent();
contentRangeStart = targetPos;
@ -241,6 +234,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @param length length of content that needs to be read from targetPos
* @throws IOException
*/
@Retries.OnceTranslated
private void seekInStream(long targetPos, long length) throws IOException {
checkNotClosed();
if (wrappedStream == null) {
@ -317,14 +311,22 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @param targetPos position from where data should be read
* @param len length of the content that needs to be read
*/
@Retries.RetryTranslated
private void lazySeek(long targetPos, long len) throws IOException {
//For lazy seek
seekInStream(targetPos, len);
//re-open at specific location if needed
if (wrappedStream == null) {
reopen("read from new offset", targetPos, len);
}
// With S3Guard, the metadatastore gave us metadata for the file in
// open(), so we use a slightly different retry policy.
Invoker invoker = context.getReadInvoker();
invoker.retry("lazySeek", pathStr, true,
() -> {
//For lazy seek
seekInStream(targetPos, len);
//re-open at specific location if needed
if (wrappedStream == null) {
reopen("read from new offset", targetPos, len);
}
});
}
/**
@ -334,29 +336,44 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/
private void incrementBytesRead(long bytesRead) {
streamStatistics.bytesRead(bytesRead);
if (stats != null && bytesRead > 0) {
stats.incrementBytesRead(bytesRead);
if (context.stats != null && bytesRead > 0) {
context.stats.incrementBytesRead(bytesRead);
}
}
@Override
@Retries.RetryTranslated // Some retries only happen w/ S3Guard, as intended.
public synchronized int read() throws IOException {
checkNotClosed();
if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
return -1;
}
int byteRead;
try {
lazySeek(nextReadPos, 1);
byteRead = wrappedStream.read();
} catch (EOFException e) {
return -1;
} catch (IOException e) {
onReadFailure(e, 1);
byteRead = wrappedStream.read();
}
// With S3Guard, the metadatastore gave us metadata for the file in
// open(), so we use a slightly different retry policy.
// read() may not be likely to fail, but reopen() does a GET which
// certainly could.
Invoker invoker = context.getReadInvoker();
int byteRead = invoker.retry("read", pathStr, true,
() -> {
int b;
try {
b = wrappedStream.read();
} catch (EOFException e) {
return -1;
} catch (IOException e) {
onReadFailure(e, 1);
b = wrappedStream.read();
}
return b;
});
if (byteRead >= 0) {
pos++;
nextReadPos++;
@ -375,10 +392,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @param length length of data being attempted to read
* @throws IOException any exception thrown on the re-open attempt.
*/
@Retries.OnceTranslated
private void onReadFailure(IOException ioe, int length) throws IOException {
LOG.info("Got exception while trying to read from stream {}"
+ " trying to recover: "+ ioe, uri);
LOG.debug("While trying to read from stream {}", uri, ioe);
LOG.info("Got exception while trying to read from stream {}" +
" trying to recover: " + ioe, uri);
streamStatistics.readException();
reopen("failure recovery", pos, length);
}
@ -392,6 +410,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @throws IOException if there are other problems
*/
@Override
@Retries.RetryTranslated // Some retries only happen w/ S3Guard, as intended.
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
checkNotClosed();
@ -412,18 +431,27 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
return -1;
}
int bytesRead;
try {
streamStatistics.readOperationStarted(nextReadPos, len);
bytesRead = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
onReadFailure(e, len);
// the base implementation swallows EOFs.
return -1;
} catch (IOException e) {
onReadFailure(e, len);
bytesRead = wrappedStream.read(buf, off, len);
}
// With S3Guard, the metadatastore gave us metadata for the file in
// open(), so we use a slightly different retry policy.
// read() may not be likely to fail, but reopen() does a GET which
// certainly could.
Invoker invoker = context.getReadInvoker();
streamStatistics.readOperationStarted(nextReadPos, len);
int bytesRead = invoker.retry("read", pathStr, true,
() -> {
int bytes;
try {
bytes = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
} catch (IOException e) {
onReadFailure(e, len);
bytes= wrappedStream.read(buf, off, len);
}
return bytes;
});
if (bytesRead > 0) {
pos += bytesRead;
@ -481,6 +509,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @param length length of the stream.
* @param forceAbort force an abort; used if explicitly requested.
*/
@Retries.OnceRaw
private void closeStream(String reason, long length, boolean forceAbort) {
if (wrappedStream != null) {
@ -645,6 +674,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*
*/
@Override
@Retries.RetryTranslated // Some retries only happen w/ S3Guard, as intended.
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
checkNotClosed();

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
/**
* Base class for operation context struct passed through codepaths for main
* S3AFileSystem operations.
* Anything op-specific should be moved to a subclass of this.
*/
@SuppressWarnings("visibilitymodifier") // I want a struct of finals, for real.
public class S3AOpContext {
final boolean isS3GuardEnabled;
final Invoker invoker;
@Nullable final FileSystem.Statistics stats;
final S3AInstrumentation instrumentation;
@Nullable final Invoker s3guardInvoker;
/** FileStatus for "destination" path being operated on. */
protected final FileStatus dstFileStatus;
/**
* Alternate constructor that allows passing in two invokers, the common
* one, and another with the S3Guard Retry Policy.
* @param isS3GuardEnabled true if s3Guard is active
* @param invoker invoker, which contains retry policy
* @param s3guardInvoker s3guard-specific retry policy invoker
* @param stats optional stats object
* @param instrumentation instrumentation to use
* @param dstFileStatus file status from existence check
*/
public S3AOpContext(boolean isS3GuardEnabled, Invoker invoker,
Invoker s3guardInvoker, @Nullable FileSystem.Statistics stats,
S3AInstrumentation instrumentation, FileStatus dstFileStatus) {
Preconditions.checkNotNull(invoker, "Null invoker arg");
Preconditions.checkNotNull(instrumentation, "Null instrumentation arg");
Preconditions.checkNotNull(dstFileStatus, "Null dstFileStatus arg");
this.isS3GuardEnabled = isS3GuardEnabled;
Preconditions.checkArgument(!isS3GuardEnabled || s3guardInvoker != null,
"S3Guard invoker required: S3Guard is enabled.");
this.invoker = invoker;
this.s3guardInvoker = s3guardInvoker;
this.stats = stats;
this.instrumentation = instrumentation;
this.dstFileStatus = dstFileStatus;
}
/**
* Constructor using common invoker and retry policy.
* @param isS3GuardEnabled true if s3Guard is active
* @param invoker invoker, which contains retry policy
* @param stats optional stats object
* @param instrumentation instrumentation to use
* @param dstFileStatus
*/
public S3AOpContext(boolean isS3GuardEnabled, Invoker invoker,
@Nullable FileSystem.Statistics stats, S3AInstrumentation instrumentation,
FileStatus dstFileStatus) {
this(isS3GuardEnabled, invoker, null, stats, instrumentation,
dstFileStatus);
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import javax.annotation.Nullable;
/**
* Read-specific operation context struct.
*/
public class S3AReadOpContext extends S3AOpContext {
public S3AReadOpContext(boolean isS3GuardEnabled, Invoker invoker,
Invoker s3guardInvoker, @Nullable FileSystem.Statistics stats,
S3AInstrumentation instrumentation, FileStatus dstFileStatus) {
super(isS3GuardEnabled, invoker, s3guardInvoker, stats, instrumentation,
dstFileStatus);
}
public S3AReadOpContext(boolean isS3GuardEnabled, Invoker invoker,
@Nullable FileSystem.Statistics stats, S3AInstrumentation instrumentation,
FileStatus dstFileStatus) {
super(isS3GuardEnabled, invoker, stats, instrumentation, dstFileStatus);
}
/**
* Get invoker to use for read operations. When S3Guard is enabled we use
* the S3Guard invoker, which deals with things like FileNotFoundException
* differently.
* @return invoker to use for read codepaths
*/
public Invoker getReadInvoker() {
if (isS3GuardEnabled) {
return s3guardInvoker;
} else {
return invoker;
}
}
}

View File

@ -76,10 +76,30 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
* @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/dev/ErrorBestPractices.html">Amazon S3 Error Best Practices</a>
* @see <a href="http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/CommonErrors.html">Dynamo DB Commmon errors</a>
*/
@SuppressWarnings("visibilitymodifier") // I want a struct of finals, for real.
public class S3ARetryPolicy implements RetryPolicy {
/** Final retry policy we end up with. */
private final RetryPolicy retryPolicy;
// Retry policies for mapping exceptions to
/** Base policy from configuration. */
protected final RetryPolicy fixedRetries;
/** Rejection of all non-idempotent calls except specific failures. */
protected final RetryPolicy retryIdempotentCalls;
/** Policy for throttle requests, which are considered repeatable, even for
* non-idempotent calls, as the service rejected the call entirely. */
protected final RetryPolicy throttlePolicy;
/** No retry on network and tangible API issues. */
protected final RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL;
/** Client connectivity: fixed retries without care for idempotency. */
protected final RetryPolicy connectivityFailure;
/**
* Instantiate.
* @param conf configuration to read.
@ -88,7 +108,7 @@ public class S3ARetryPolicy implements RetryPolicy {
Preconditions.checkArgument(conf != null, "Null configuration");
// base policy from configuration
RetryPolicy fixedRetries = retryUpToMaximumCountWithFixedSleep(
fixedRetries = retryUpToMaximumCountWithFixedSleep(
conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT),
conf.getTimeDuration(RETRY_INTERVAL,
RETRY_INTERVAL_DEFAULT,
@ -97,25 +117,33 @@ public class S3ARetryPolicy implements RetryPolicy {
// which is wrapped by a rejection of all non-idempotent calls except
// for specific failures.
RetryPolicy retryIdempotentCalls = new FailNonIOEs(
retryIdempotentCalls = new FailNonIOEs(
new IdempotencyRetryFilter(fixedRetries));
// and a separate policy for throttle requests, which are considered
// repeatable, even for non-idempotent calls, as the service
// rejected the call entirely
RetryPolicy throttlePolicy = exponentialBackoffRetry(
throttlePolicy = exponentialBackoffRetry(
conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT),
conf.getTimeDuration(RETRY_THROTTLE_INTERVAL,
RETRY_THROTTLE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
// no retry on network and tangible API issues
RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL;
// client connectivity: fixed retries without care for idempotency
RetryPolicy connectivityFailure = fixedRetries;
connectivityFailure = fixedRetries;
Map<Class<? extends Exception>, RetryPolicy> policyMap =
createExceptionMap();
retryPolicy = retryByException(retryIdempotentCalls, policyMap);
}
/**
* Subclasses can override this like a constructor to change behavior: call
* superclass method, then modify it as needed, and return it.
* @return Map from exception type to RetryPolicy
*/
protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
// the policy map maps the exact classname; subclasses do not
// inherit policies.
Map<Class<? extends Exception>, RetryPolicy> policyMap = new HashMap<>();
@ -126,7 +154,6 @@ public class S3ARetryPolicy implements RetryPolicy {
policyMap.put(InterruptedException.class, fail);
// note this does not pick up subclasses (like socket timeout)
policyMap.put(InterruptedIOException.class, fail);
policyMap.put(AWSRedirectException.class, fail);
// interesting question: should this be retried ever?
policyMap.put(AccessDeniedException.class, fail);
policyMap.put(FileNotFoundException.class, fail);
@ -169,7 +196,7 @@ public class S3ARetryPolicy implements RetryPolicy {
// trigger sleep
policyMap.put(ProvisionedThroughputExceededException.class, throttlePolicy);
retryPolicy = retryByException(retryIdempotentCalls, policyMap);
return policyMap;
}
@Override

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.FileNotFoundException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
/**
* Slightly-modified retry policy for cases when the file is present in the
* MetadataStore, but may be still throwing FileNotFoundException from S3.
*/
public class S3GuardExistsRetryPolicy extends S3ARetryPolicy {
/**
* Instantiate.
* @param conf configuration to read.
*/
public S3GuardExistsRetryPolicy(Configuration conf) {
super(conf);
}
@Override
protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
Map<Class<? extends Exception>, RetryPolicy> b = super.createExceptionMap();
b.put(FileNotFoundException.class, retryIdempotentCalls);
return b;
}
}

View File

@ -688,9 +688,11 @@ public class DynamoDBMetadataStore implements MetadataStore {
@Override
@Retries.OnceRaw
public void put(Collection<PathMetadata> metas) throws IOException {
LOG.debug("Saving batch to table {} in region {}", tableName, region);
processBatchWriteRequest(null, pathMetadataToItem(completeAncestry(metas)));
Item[] items = pathMetadataToItem(completeAncestry(metas));
LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
tableName, region);
processBatchWriteRequest(null, items);
}
/**
@ -1076,6 +1078,15 @@ public class DynamoDBMetadataStore implements MetadataStore {
});
}
@Retries.RetryTranslated
@VisibleForTesting
void provisionTableBlocking(Long readCapacity, Long writeCapacity)
throws IOException {
provisionTable(readCapacity, writeCapacity);
waitForTableActive(table);
}
@VisibleForTesting
Table getTable() {
return table;
}
@ -1173,15 +1184,12 @@ public class DynamoDBMetadataStore implements MetadataStore {
S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
currentWrite);
ProvisionedThroughput throughput = new ProvisionedThroughput()
.withReadCapacityUnits(newRead)
.withWriteCapacityUnits(newWrite);
if (newRead != currentRead || newWrite != currentWrite) {
LOG.info("Current table capacity is read: {}, write: {}",
currentRead, currentWrite);
LOG.info("Changing capacity of table to read: {}, write: {}",
newRead, newWrite);
table.updateTable(throughput);
provisionTableBlocking(newRead, newWrite);
} else {
LOG.info("Table capacity unchanged at read: {}, write: {}",
newRead, newWrite);

View File

@ -34,6 +34,9 @@ import org.apache.hadoop.fs.Path;
* {@code MetadataStore} defines the set of operations that any metadata store
* implementation must provide. Note that all {@link Path} objects provided
* to methods must be absolute, not relative paths.
* Implementations must implement any retries needed internally, such that
* transient errors are generally recovered from without throwing exceptions
* from this API.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving

View File

@ -300,6 +300,11 @@ By their very nature they are slow. And, as their execution time is often
limited by bandwidth between the computer running the tests and the S3 endpoint,
parallel execution does not speed these tests up.
***Note: Running scale tests with -Ds3guard and -Ddynamo requires that
you use a private, testing-only DynamoDB table.*** The tests do disruptive
things such as deleting metadata and setting the provisioned throughput
to very low values.
### <a name="enabling-scale"></a> Enabling the Scale Tests
The tests are enabled if the `scale` property is set in the maven build

View File

@ -22,16 +22,22 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;
import java.io.InputStream;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Tests S3A behavior under forced inconsistency via {@link
@ -43,6 +49,8 @@ import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
*/
public class ITestS3AInconsistency extends AbstractS3ATestBase {
private static final int OPEN_READ_ITERATIONS = 20;
@Override
protected AbstractFSContract createContract(Configuration conf) {
conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
@ -86,15 +94,103 @@ public class ITestS3AInconsistency extends AbstractS3ATestBase {
}
}
/**
* Ensure that deleting a file with an open read stream does eventually cause
* readers to get a FNFE, even with S3Guard and its retries enabled.
* In real usage, S3Guard should be enabled for all clients that modify the
* file, so the delete would be immediately recorded in the MetadataStore.
* Here, however, we test deletion from under S3Guard to make sure it still
* eventually propagates the FNFE after any retry policies are exhausted.
*/
@Test
public void testOpenDeleteRead() throws Exception {
S3AFileSystem fs = getFileSystem();
Path p = path("testOpenDeleteRead.txt");
writeTextFile(fs, p, "1337c0d3z", true);
try (InputStream s = fs.open(p)) {
// Disable s3guard, delete file underneath it, re-enable s3guard
MetadataStore metadataStore = fs.getMetadataStore();
fs.setMetadataStore(new NullMetadataStore());
fs.delete(p, false);
fs.setMetadataStore(metadataStore);
eventually(1000, 200, () -> {
intercept(FileNotFoundException.class, () -> s.read());
});
}
}
/**
* Test read() path behavior when getFileStatus() succeeds but subsequent
* read() on the input stream fails due to eventual consistency.
* There are many points in the InputStream codepaths that can fail. We set
* a probability of failure and repeat the test multiple times to achieve
* decent coverage.
*/
@Test
public void testOpenFailOnRead() throws Exception {
S3AFileSystem fs = getFileSystem();
// 1. Patch in a different failure injection policy with <1.0 probability
Configuration conf = fs.getConf();
conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 0.5f);
InconsistentAmazonS3Client.setFailureInjectionPolicy(fs,
new FailureInjectionPolicy(conf));
// 2. Make sure no ancestor dirs exist
Path dir = path("ancestor");
fs.delete(dir, true);
waitUntilDeleted(dir);
// 3. Create a descendant file, which implicitly creates ancestors
// This file has delayed visibility.
describe("creating test file");
Path path = path("ancestor/file-to-read-" + DEFAULT_DELAY_KEY_SUBSTRING);
writeTextFile(getFileSystem(), path, "Reading is fun", false);
// 4. Clear inconsistency so the first getFileStatus() can succeed, if we
// are not using S3Guard. If we are using S3Guard, it should tolerate the
// delayed visibility.
if (!fs.hasMetadataStore()) {
InconsistentAmazonS3Client.clearInconsistency(fs);
}
// ? Do we need multiple iterations when S3Guard is disabled? For now,
// leaving it in
for (int i = 0; i < OPEN_READ_ITERATIONS; i++) {
doOpenFailOnReadTest(fs, path, i);
}
}
private void doOpenFailOnReadTest(S3AFileSystem fs, Path path, int iteration)
throws Exception {
// 4. Open the file
describe(String.format("i=%d: opening test file", iteration));
try(InputStream in = fs.open(path)) {
// 5. Assert expected behavior on read() failure.
int l = 4;
byte[] buf = new byte[l];
describe("reading test file");
// Use both read() variants
if ((iteration % 2) == 0) {
assertEquals(l, in.read(buf, 0, l));
} else {
in.read();
}
} catch (FileNotFoundException e) {
if (fs.hasMetadataStore()) {
LOG.error("Error:", e);
ContractTestUtils.fail("S3Guard failed to handle fail-on-read", e);
} else {
LOG.info("File not found on read(), as expected.");
}
}
}
private void waitUntilDeleted(final Path p) throws Exception {
LambdaTestUtils.eventually(30 * 1000, 1000,
new Callable<Void>() {
@Override
public Void call() throws Exception {
assertPathDoesNotExist("Dir should be deleted", p);
return null;
}
}
);
() -> assertPathDoesNotExist("Dir should be deleted", p));
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@ -33,6 +32,7 @@ import org.junit.Assume;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@ -41,6 +41,7 @@ import java.util.List;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
/**
@ -552,11 +553,10 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
* @param key
* @param delimiter
* @return
* @throws IOException
* @throws IOException on error
*/
private ListObjectsV2Result listObjectsV2(S3AFileSystem fs,
String key, String delimiter) throws java.io.IOException {
String key, String delimiter) throws IOException {
ListObjectsV2Request k = fs.createListObjectsRequest(key, delimiter)
.getV2();
return invoker.retryUntranslated("list", true,
@ -565,9 +565,4 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
});
}
private static void clearInconsistency(S3AFileSystem fs) throws Exception {
AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard");
InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
ic.clearInconsistency();
}
}

View File

@ -51,7 +51,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
@ -819,7 +819,7 @@ public final class S3ATestUtils {
* Turn on the inconsistent S3A FS client in a configuration,
* with 100% probability of inconsistency, default delays.
* For this to go live, the paths must include the element
* {@link InconsistentAmazonS3Client#DEFAULT_DELAY_KEY_SUBSTRING}.
* {@link FailureInjectionPolicy#DEFAULT_DELAY_KEY_SUBSTRING}.
* @param conf configuration to patch
* @param delay delay in millis
*/

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
import org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
@ -90,7 +91,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
@Override
protected Path path(String filepath) throws IOException {
return useInconsistentClient() ?
super.path(InconsistentAmazonS3Client.DEFAULT_DELAY_KEY_SUBSTRING
super.path(FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING
+ "/" + filepath)
: super.path(filepath);
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale;
import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.junit.Assume.*;
/**
* Scale test for DynamoDBMetadataStore.
*/
public class ITestDynamoDBMetadataStoreScale
extends AbstractITestS3AMetadataStoreScale {
private static final long BATCH_SIZE = 25;
private static final long SMALL_IO_UNITS = BATCH_SIZE / 4;
@Override
public MetadataStore createMetadataStore() throws IOException {
Configuration conf = getFileSystem().getConf();
String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY);
assumeNotNull("DynamoDB table is configured", ddbTable);
String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY);
assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint);
DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
ms.initialize(getFileSystem().getConf());
return ms;
}
/**
* Though the AWS SDK claims in documentation to handle retries and
* exponential backoff, we have witnessed
* com.amazonaws...dynamodbv2.model.ProvisionedThroughputExceededException
* (Status Code: 400; Error Code: ProvisionedThroughputExceededException)
* Hypothesis:
* Happens when the size of a batched write is bigger than the number of
* provisioned write units. This test ensures we handle the case
* correctly, retrying w/ smaller batch instead of surfacing exceptions.
*/
@Test
public void testBatchedWriteExceedsProvisioned() throws Exception {
final long iterations = 5;
boolean isProvisionedChanged;
List<PathMetadata> toCleanup = new ArrayList<>();
// Fail if someone changes a constant we depend on
assertTrue("Maximum batch size must big enough to run this test",
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT >= BATCH_SIZE);
try (DynamoDBMetadataStore ddbms =
(DynamoDBMetadataStore)createMetadataStore()) {
DynamoDB ddb = ddbms.getDynamoDB();
String tableName = ddbms.getTable().getTableName();
final ProvisionedThroughputDescription existing =
ddb.getTable(tableName).describe().getProvisionedThroughput();
// If you set the same provisioned I/O as already set it throws an
// exception, avoid that.
isProvisionedChanged = (existing.getReadCapacityUnits() != SMALL_IO_UNITS
|| existing.getWriteCapacityUnits() != SMALL_IO_UNITS);
if (isProvisionedChanged) {
// Set low provisioned I/O for dynamodb
describe("Provisioning dynamo tbl %s read/write -> %d/%d", tableName,
SMALL_IO_UNITS, SMALL_IO_UNITS);
// Blocks to ensure table is back to ready state before we proceed
ddbms.provisionTableBlocking(SMALL_IO_UNITS, SMALL_IO_UNITS);
} else {
describe("Skipping provisioning table I/O, already %d/%d",
SMALL_IO_UNITS, SMALL_IO_UNITS);
}
try {
// We know the dynamodb metadata store will expand a put of a path
// of depth N into a batch of N writes (all ancestors are written
// separately up to the root). (Ab)use this for an easy way to write
// a batch of stuff that is bigger than the provisioned write units
try {
describe("Running %d iterations of batched put, size %d", iterations,
BATCH_SIZE);
long pruneItems = 0;
for (long i = 0; i < iterations; i++) {
Path longPath = pathOfDepth(BATCH_SIZE, String.valueOf(i));
FileStatus status = basicFileStatus(longPath, 0, false, 12345,
12345);
PathMetadata pm = new PathMetadata(status);
ddbms.put(pm);
toCleanup.add(pm);
pruneItems++;
// Having hard time reproducing Exceeded exception with put, also
// try occasional prune, which was the only stack trace I've seen
// (on JIRA)
if (pruneItems == BATCH_SIZE) {
describe("pruning files");
ddbms.prune(Long.MAX_VALUE /* all files */);
pruneItems = 0;
}
}
} finally {
describe("Cleaning up table %s", tableName);
for (PathMetadata pm : toCleanup) {
cleanupMetadata(ddbms, pm);
}
}
} finally {
if (isProvisionedChanged) {
long write = existing.getWriteCapacityUnits();
long read = existing.getReadCapacityUnits();
describe("Restoring dynamo tbl %s read/write -> %d/%d", tableName,
read, write);
ddbms.provisionTableBlocking(existing.getReadCapacityUnits(),
existing.getWriteCapacityUnits());
}
}
}
}
// Attempt do delete metadata, suppressing any errors
private void cleanupMetadata(MetadataStore ms, PathMetadata pm) {
try {
ms.forgetMetadata(pm.getFileStatus().getPath());
} catch (IOException ioe) {
// Ignore.
}
}
private Path pathOfDepth(long n, @Nullable String fileSuffix) {
StringBuilder sb = new StringBuilder();
for (long i = 0; i < n; i++) {
sb.append(i == 0 ? "/" + this.getClass().getSimpleName() : "lvl");
sb.append(i);
if (i == n-1 && fileSuffix != null) {
sb.append(fileSuffix);
}
sb.append("/");
}
return new Path(getFileSystem().getUri().toString(), sb.toString());
}
}

View File

@ -839,7 +839,7 @@ public abstract class MetadataStoreTestBase extends Assert {
return basicFileStatus(path, size, isDir, modTime, accessTime);
}
FileStatus basicFileStatus(Path path, int size, boolean isDir,
public static FileStatus basicFileStatus(Path path, int size, boolean isDir,
long newModTime, long newAccessTime) throws IOException {
return new FileStatus(size, isDir, REPLICATION, BLOCK_SIZE, newModTime,
newAccessTime, PERMISSION, OWNER, GROUP, path);

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import java.io.IOException;
import static org.junit.Assume.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
/**
* Scale test for DynamoDBMetadataStore.
*/
public class ITestDynamoDBMetadataStoreScale
extends AbstractITestS3AMetadataStoreScale {
@Override
public MetadataStore createMetadataStore() throws IOException {
Configuration conf = getFileSystem().getConf();
String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY);
assumeNotNull("DynamoDB table is configured", ddbTable);
String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY);
assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint);
DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
ms.initialize(getFileSystem().getConf());
return ms;
}
}