HADOOP-16499. S3A retry policy to be exponential (#1246). Contributed by Steve Loughran.

Steve Loughran authored 2019-08-09 14:52:37 +01:00; committed by Gabor Bota
parent 43a91f820a
commit e25a5c2eab
12 changed files with 124 additions and 46 deletions

View File

@@ -1660,7 +1660,7 @@
 <property>
   <name>fs.s3a.retry.limit</name>
-  <value>${fs.s3a.attempts.maximum}</value>
+  <value>7</value>
   <description>
     Number of times to retry any repeatable S3 client request on failure,
     excluding throttling requests.
@@ -1671,7 +1671,7 @@
   <name>fs.s3a.retry.interval</name>
   <value>500ms</value>
   <description>
-    Interval between attempts to retry operations for any reason other
+    Initial retry interval when retrying operations for any reason other
     than S3 throttle errors.
   </description>
 </property>

View File

@@ -635,7 +635,7 @@ public final class Constants {
   /**
    * Default retry limit: {@value}.
    */
-  public static final int RETRY_LIMIT_DEFAULT = DEFAULT_MAX_ERROR_RETRIES;
+  public static final int RETRY_LIMIT_DEFAULT = 7;

   /**
    * Interval between retry attempts.: {@value}.

View File

@@ -109,7 +109,7 @@ public class S3ARetryPolicy implements RetryPolicy {
     Preconditions.checkArgument(conf != null, "Null configuration");

     // base policy from configuration
-    fixedRetries = retryUpToMaximumCountWithFixedSleep(
+    fixedRetries = exponentialBackoffRetry(
         conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT),
         conf.getTimeDuration(RETRY_INTERVAL,
             RETRY_INTERVAL_DEFAULT,
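
Where the old policy slept a constant `fs.s3a.retry.interval` between attempts, the base policy now starts at that interval and grows it exponentially. A minimal sketch of the worst-case schedule under the new defaults (limit 7, interval 500ms); Hadoop's `RetryPolicies.exponentialBackoffRetry` randomizes each sleep within a growing window, so strict doubling is an upper bound rather than the exact behaviour:

```java
/**
 * Worst-case sleep schedule for the new S3A retry defaults,
 * assuming strict doubling from the initial interval.
 */
public class BackoffSchedule {
  public static void main(String[] args) {
    final long initialSleepMs = 500;  // fs.s3a.retry.interval default
    final int retryLimit = 7;         // fs.s3a.retry.limit default
    long totalMs = 0;
    for (int retry = 0; retry < retryLimit; retry++) {
      long sleepMs = initialSleepMs << retry;  // 500, 1000, 2000, ...
      totalMs += sleepMs;
      System.out.printf("retry %d: sleep %d ms%n", retry + 1, sleepMs);
    }
    System.out.printf("total: %d ms%n", totalMs);  // 500 * (2^7 - 1) = 63500
  }
}
```

This growth is presumably also why `fs.s3a.retry.limit` stops defaulting to `fs.s3a.attempts.maximum` (20): with doubling sleeps, twenty attempts would leave a failing operation stalled for many minutes.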

View File

@@ -1018,7 +1018,7 @@ is unrecoverable; it's the generic "No" response. Very rarely it
 does recover, which is why it is in this category, rather than that
 of unrecoverable failures.

-These failures will be retried with a fixed sleep interval set in
+These failures will be retried with an exponential sleep interval set in
 `fs.s3a.retry.interval`, up to the limit set in `fs.s3a.retry.limit`.

@@ -1033,7 +1033,7 @@ after the request was processed by S3.
 * "No response from Server" (443, 444) HTTP responses.
 * Any other AWS client, service or S3 exception.

-These failures will be retried with a fixed sleep interval set in
+These failures will be retried with an exponential sleep interval set in
 `fs.s3a.retry.interval`, up to the limit set in `fs.s3a.retry.limit`.

 *Important*: DELETE is considered idempotent, hence: `FileSystem.delete()`
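
With the exponential policy, the worst-case time an operation spends sleeping before it finally fails is roughly `fs.s3a.retry.interval * (2^fs.s3a.retry.limit - 1)`: about 63.5 seconds at the new defaults of 500ms and 7, assuming strict doubling with no randomization.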

View File

@@ -1233,17 +1233,20 @@ The number of retries and interval between each retry can be configured:

 ```xml
 <property>
-  <name>fs.s3a.attempts.maximum</name>
-  <value>20</value>
-  <description>How many times we should retry commands on transient errors,
-  excluding throttling errors.</description>
+  <name>fs.s3a.retry.limit</name>
+  <value>7</value>
+  <description>
+    Number of times to retry any repeatable S3 client request on failure,
+    excluding throttling requests.
+  </description>
 </property>

 <property>
   <name>fs.s3a.retry.interval</name>
   <value>500ms</value>
   <description>
-    Interval between retry attempts.
+    Initial retry interval when retrying operations for any reason other
+    than S3 throttle errors.
   </description>
 </property>
 ```
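
The same pair of settings can be applied programmatically; a hypothetical sketch (class name invented) using Hadoop's `Configuration` API, with the key strings as documented above and values chosen purely for illustration:

```java
import org.apache.hadoop.conf.Configuration;

/** Hypothetical fail-fast tuning of the S3A retry settings. */
public class RetryTuning {
  public static Configuration failFastConf() {
    Configuration conf = new Configuration();
    conf.setInt("fs.s3a.retry.limit", 2);        // give up after two retries
    conf.set("fs.s3a.retry.interval", "100ms");  // short initial backoff
    return conf;
  }
}
```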

View File

@@ -123,8 +123,7 @@ public class ITestS3AConfiguration {
   @Test
   public void testProxyConnection() throws Exception {
-    conf = new Configuration();
-    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    useFailFastConfiguration();
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
     conf.setInt(Constants.PROXY_PORT, 1);
     String proxy =
@@ -133,6 +132,16 @@ public class ITestS3AConfiguration {
         conf, "when using proxy " + proxy);
   }

+  /**
+   * Create a configuration designed to fail fast on network problems.
+   */
+  protected void useFailFastConfiguration() {
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.setInt(Constants.RETRY_LIMIT, 2);
+    conf.set(RETRY_INTERVAL, "100ms");
+  }
+
   /**
    * Expect a filesystem to not be created from a configuration
    * @return the exception intercepted
@@ -153,9 +162,8 @@ public class ITestS3AConfiguration {
   @Test
   public void testProxyPortWithoutHost() throws Exception {
-    conf = new Configuration();
+    useFailFastConfiguration();
     conf.unset(Constants.PROXY_HOST);
-    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.setInt(Constants.PROXY_PORT, 1);
     IllegalArgumentException e = expectFSCreateFailure(
         IllegalArgumentException.class,
@@ -169,9 +177,8 @@ public class ITestS3AConfiguration {
   @Test
   public void testAutomaticProxyPortSelection() throws Exception {
-    conf = new Configuration();
+    useFailFastConfiguration();
     conf.unset(Constants.PROXY_PORT);
-    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
     conf.set(Constants.SECURE_CONNECTIONS, "true");
     expectFSCreateFailure(AWSClientIOException.class,
@@ -183,8 +190,7 @@ public class ITestS3AConfiguration {
   @Test
   public void testUsernameInconsistentWithPassword() throws Exception {
-    conf = new Configuration();
-    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    useFailFastConfiguration();
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
     conf.setInt(Constants.PROXY_PORT, 1);
     conf.set(Constants.PROXY_USERNAME, "user");
@@ -204,8 +210,7 @@ public class ITestS3AConfiguration {
   @Test
   public void testUsernameInconsistentWithPassword2() throws Exception {
-    conf = new Configuration();
-    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    useFailFastConfiguration();
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
     conf.setInt(Constants.PROXY_PORT, 1);
     conf.set(Constants.PROXY_PASSWORD, "password");
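
The shared `useFailFastConfiguration()` matters more now that retries back off exponentially: a test which deliberately provokes connection failures pays the full retry schedule on every expected failure. A rough comparison, assuming strict doubling (the real policy randomizes each sleep):

```java
/** Rough worst-case sleep totals: default vs. fail-fast retry settings. */
public class FailFastCost {
  static long worstCaseMs(long initialMs, int retries) {
    return initialMs * ((1L << retries) - 1);  // initial * (2^n - 1)
  }
  public static void main(String[] args) {
    System.out.println(worstCaseMs(500, 7));  // defaults: 63500 ms per failure
    System.out.println(worstCaseMs(100, 2));  // fail-fast: 300 ms per failure
  }
}
```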

View File

@@ -18,8 +18,8 @@
 package org.apache.hadoop.fs.s3a;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
@@ -30,7 +30,13 @@ import org.junit.Assume;
 import org.junit.Test;

 import java.io.FileNotFoundException;
 import java.util.concurrent.Callable;

+import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_MODE;
+import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_SOURCE;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

 /**
  * Tests behavior of a FileNotFound error that happens after open(), i.e. on
@@ -38,6 +44,21 @@ import java.util.concurrent.Callable;
  */
 public class ITestS3ADelayedFNF extends AbstractS3ATestBase {

+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // reduce retry limit so FileNotFoundException cases timeout faster,
+    // speeding up the tests
+    removeBaseAndBucketOverrides(conf,
+        CHANGE_DETECT_SOURCE,
+        CHANGE_DETECT_MODE,
+        RETRY_LIMIT,
+        RETRY_INTERVAL,
+        METADATASTORE_AUTHORITATIVE);
+    conf.setInt(RETRY_LIMIT, 2);
+    conf.set(RETRY_INTERVAL, "1ms");
+    return conf;
+  }

   /**
    * See debugging documentation
@@ -46,9 +67,9 @@ public class ITestS3ADelayedFNF extends AbstractS3ATestBase {
    */
   @Test
   public void testNotFoundFirstRead() throws Exception {
-    FileSystem fs = getFileSystem();
+    S3AFileSystem fs = getFileSystem();
     ChangeDetectionPolicy changeDetectionPolicy =
-        ((S3AFileSystem) fs).getChangeDetectionPolicy();
+        fs.getChangeDetectionPolicy();
     Assume.assumeFalse("FNF not expected when using a bucket with"
         + " object versioning",
         changeDetectionPolicy.getSource() == Source.VersionId);
@@ -61,12 +82,7 @@ public class ITestS3ADelayedFNF extends AbstractS3ATestBase {
     // This should fail since we deleted after the open.
     LambdaTestUtils.intercept(FileNotFoundException.class,
-        new Callable<Integer>() {
-          @Override
-          public Integer call() throws Exception {
-            return in.read();
-          }
-        });
+        () -> in.read());
   }
 }

View File

@@ -21,9 +21,7 @@ package org.apache.hadoop.fs.s3a;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.fs.contract.s3a.S3AContract;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
@@ -40,6 +38,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.test.LambdaTestUtils.eventually;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -53,16 +52,40 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
  */
 public class ITestS3AInconsistency extends AbstractS3ATestBase {

-  private static final int OPEN_READ_ITERATIONS = 20;
+  private static final int OPEN_READ_ITERATIONS = 10;
+
+  public static final int INCONSISTENCY_MSEC = 800;
+
+  private static final int INITIAL_RETRY = 128;
+
+  private static final int RETRIES = 4;
+
+  /** By using a power of 2 for the initial time, the total is a shift left. */
+  private static final int TOTAL_RETRY_DELAY = INITIAL_RETRY << RETRIES;

   @Override
-  protected AbstractFSContract createContract(Configuration conf) {
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // reduce retry limit so FileNotFoundException cases timeout faster,
+    // speeding up the tests
+    removeBaseAndBucketOverrides(conf,
+        CHANGE_DETECT_SOURCE,
+        CHANGE_DETECT_MODE,
+        RETRY_LIMIT,
+        RETRY_INTERVAL,
+        METADATASTORE_AUTHORITATIVE,
+        S3_CLIENT_FACTORY_IMPL);
     conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
         S3ClientFactory.class);
     conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
+    // the reads are always inconsistent
     conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
-    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC);
-    return new S3AContract(conf);
+    // but the inconsistent time is less than exponential growth of the
+    // retry interval (128 -> 256 -> 512 -> 1024)
+    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, INCONSISTENCY_MSEC);
+    conf.setInt(RETRY_LIMIT, RETRIES);
+    conf.set(RETRY_INTERVAL, String.format("%dms", INITIAL_RETRY));
+    return conf;
   }

   @Test
@@ -111,7 +134,7 @@ public class ITestS3AInconsistency extends AbstractS3ATestBase {
   public void testOpenDeleteRead() throws Exception {
     S3AFileSystem fs = getFileSystem();
     ChangeDetectionPolicy changeDetectionPolicy =
-        ((S3AFileSystem) fs).getChangeDetectionPolicy();
+        fs.getChangeDetectionPolicy();
     Assume.assumeFalse("FNF not expected when using a bucket with"
         + " object versioning",
         changeDetectionPolicy.getSource() == Source.VersionId);
@@ -124,7 +147,7 @@ public class ITestS3AInconsistency extends AbstractS3ATestBase {
       fs.setMetadataStore(new NullMetadataStore());
       fs.delete(p, false);
       fs.setMetadataStore(metadataStore);
-      eventually(1000, 200, () -> {
+      eventually(TOTAL_RETRY_DELAY * 2, INITIAL_RETRY * 2, () -> {
         intercept(FileNotFoundException.class, () -> s.read());
       });
     }
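
The constants are chosen so the injected 800ms inconsistency window resolves partway through the retry sequence, and the `eventually()` bounds derive from them. A worked check of the arithmetic, again assuming strict doubling without the policy's randomization:

```java
/** Worked check of the ITestS3AInconsistency timing constants. */
public class InconsistencyTiming {
  public static void main(String[] args) {
    final int initialRetry = 128;  // RETRY_INTERVAL; a power of two
    final int retries = 4;         // RETRY_LIMIT
    int cumulative = 0;
    for (int i = 0; i < retries; i++) {
      cumulative += initialRetry << i;  // sleeps: 128, 256, 512, 1024
      System.out.printf("after retry %d: %d ms slept%n", i + 1, cumulative);
    }
    // the 800 ms inconsistency window is crossed by the third retry
    // (128 + 256 + 512 = 896 > 800), and the full schedule (1920 ms)
    // stays under TOTAL_RETRY_DELAY = 128 << 4 = 2048 ms.
  }
}
```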

View File

@@ -61,8 +61,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.readUTF8;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.CHANGE_DETECTED;
 import static org.apache.hadoop.fs.s3a.select.SelectConstants.S3_SELECT_CAPABILITY;
 import static org.apache.hadoop.fs.s3a.select.SelectConstants.SELECT_SQL;
@@ -123,8 +122,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
   private static final byte[] TEST_DATA_BYTES = TEST_DATA.getBytes(
       Charsets.UTF_8);

-  private static final int TEST_MAX_RETRIES = 5;
-  private static final String TEST_RETRY_INTERVAL = "10ms";
+  private static final int TEST_MAX_RETRIES = 4;
+  private static final String TEST_RETRY_INTERVAL = "1ms";

   private static final String QUOTED_TEST_DATA =
       "\"" + TEST_DATA + "\"";
@@ -276,8 +275,7 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
-    String bucketName = getTestBucketName(conf);
-    removeBucketOverrides(bucketName, conf,
+    removeBaseAndBucketOverrides(conf,
         CHANGE_DETECT_SOURCE,
         CHANGE_DETECT_MODE,
         RETRY_LIMIT,

View File

@@ -53,9 +53,13 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_MODE;
+import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_SOURCE;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingContainsPath;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingDoesNotContainPath;
@@ -115,7 +119,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
   public static final int STABILIZATION_TIME = 20_000;

-  public static final int PROBE_INTERVAL_MILLIS = 500;
+  public static final int PROBE_INTERVAL_MILLIS = 2500;

   private S3AFileSystem guardedFs;
   private S3AFileSystem rawFS;
@@ -153,6 +157,19 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
         (authoritative ? "-auth" : "-nonauth");
   }

+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // reduce retry limit so FileNotFoundException cases timeout faster,
+    // speeding up the tests
+    removeBaseAndBucketOverrides(conf,
+        RETRY_LIMIT,
+        RETRY_INTERVAL);
+    conf.setInt(RETRY_LIMIT, 3);
+    conf.set(RETRY_INTERVAL, "10ms");
+    return conf;
+  }
+
   @Before
   public void setup() throws Exception {
     super.setup();

View File

@@ -769,6 +769,20 @@ public final class S3ATestUtils {
     removeBucketOverrides(bucket, conf, options);
   }

+  /**
+   * Remove any values from the test bucket and the base values too.
+   * @param conf config
+   * @param options list of fs.s3a options to remove
+   */
+  public static void removeBaseAndBucketOverrides(
+      final Configuration conf,
+      final String... options) {
+    for (String option : options) {
+      conf.unset(option);
+    }
+    removeBaseAndBucketOverrides(getTestBucketName(conf), conf, options);
+  }
+
   /**
    * Call a function; any exception raised is logged at info.
    * This is for test teardowns.
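
Clearing just the base key would not be enough: a per-bucket override such as `fs.s3a.bucket.<name>.retry.limit` in a developer's test configuration would still shadow whatever a test sets afterwards, so the new overload strips both layers first. A hypothetical illustration (bucket name and values invented; assumes a test bucket is configured as in the S3A test suite):

```java
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

/** Hypothetical illustration; assumes the test bucket resolves to "test-bkt". */
public class OverrideClearing {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.retry.limit", "7");                   // base value
    conf.set("fs.s3a.bucket.test-bkt.retry.limit", "20");  // per-bucket override
    removeBaseAndBucketOverrides(conf, RETRY_LIMIT);       // clears both forms
    conf.setInt(RETRY_LIMIT, 2);                           // now authoritative
  }
}
```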

View File

@@ -142,6 +142,8 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
         uniqueFilenames);
     jobConf.set(FS_S3A_COMMITTER_STAGING_UUID,
         UUID.randomUUID().toString());
+    jobConf.set(RETRY_INTERVAL, "100ms");
+    jobConf.setInt(RETRY_LIMIT, 1);

     this.results = new StagingTestBase.ClientResults();
     this.errors = new StagingTestBase.ClientErrors();