From d7c0a08a1c077752918a8cf1b4f1900ce2721899 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 12 Sep 2018 21:04:49 -0700 Subject: [PATCH] HADOOP-15426 Make S3guard client resilient to DDB throttle events and network failures (Contributed by Steve Loughran) --- .../src/main/resources/core-default.xml | 19 +- hadoop-tools/hadoop-aws/pom.xml | 4 + .../org/apache/hadoop/fs/s3a/Constants.java | 12 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 15 +- .../hadoop/fs/s3a/S3AInstrumentation.java | 5 +- .../apache/hadoop/fs/s3a/S3ARetryPolicy.java | 24 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 11 +- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 377 ++++++++--- .../s3guard/S3GuardDataAccessRetryPolicy.java | 47 ++ .../site/markdown/tools/hadoop-aws/s3guard.md | 150 ++++- .../site/markdown/tools/hadoop-aws/testing.md | 95 ++- .../fs/s3a/ITestS3AFileSystemContract.java | 5 + .../s3guard/ITestDynamoDBMetadataStore.java | 12 +- .../ITestDynamoDBMetadataStoreScale.java | 595 +++++++++++++++--- .../s3a/s3guard/ITestS3GuardToolDynamoDB.java | 34 +- .../AbstractITestS3AMetadataStoreScale.java | 24 +- .../src/test/resources/core-site.xml | 10 + 17 files changed, 1181 insertions(+), 258 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardDataAccessRetryPolicy.java diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index fdd7a871c4c..81502dc24c6 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1447,19 +1447,28 @@ fs.s3a.s3guard.ddb.max.retries 9 - Max retries on batched DynamoDB operations before giving up and - throwing an IOException. Each retry is delayed with an exponential + Max retries on throttled/incompleted DynamoDB operations + before giving up and throwing an IOException. + Each retry is delayed with an exponential backoff timer which starts at 100 milliseconds and approximately doubles each time. The minimum wait before throwing an exception is sum(100, 200, 400, 800, .. 100*2^N-1 ) == 100 * ((2^N)-1) - So N = 9 yields at least 51.1 seconds (51,100) milliseconds of blocking - before throwing an IOException. + + + + + fs.s3a.s3guard.ddb.throttle.retry.interval + 100ms + + Initial interval to retry after a request is throttled events; + the back-off policy is exponential until the number of retries of + fs.s3a.s3guard.ddb.max.retries is reached. fs.s3a.s3guard.ddb.background.sleep - 25 + 25ms Length (in milliseconds) of pause between each batch of deletes when pruning metadata. Prevents prune operations (which can typically be low diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index c6dddb0223b..78a3d5e19cc 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -184,6 +184,8 @@ **/ITestS3AFileContextStatistics.java **/ITestS3AEncryptionSSEC*.java **/ITestS3AHuge*.java + + **/ITestDynamoDBMetadataStoreScale.java @@ -216,6 +218,8 @@ **/ITestS3AFileContextStatistics.java **/ITestS3AHuge*.java **/ITestS3AEncryptionSSEC*.java + + **/ITestDynamoDBMetadataStoreScale.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 5c9d3cd2809..3fc25daaec3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -458,12 +458,20 @@ public final class Constants { @InterfaceStability.Unstable public static final String S3GUARD_DDB_MAX_RETRIES = "fs.s3a.s3guard.ddb.max.retries"; + /** - * Max retries on batched DynamoDB operations before giving up and + * Max retries on batched/throttled DynamoDB operations before giving up and * throwing an IOException. Default is {@value}. See core-default.xml for * more detail. */ - public static final int S3GUARD_DDB_MAX_RETRIES_DEFAULT = 9; + public static final int S3GUARD_DDB_MAX_RETRIES_DEFAULT = + DEFAULT_MAX_ERROR_RETRIES; + + @InterfaceStability.Unstable + public static final String S3GUARD_DDB_THROTTLE_RETRY_INTERVAL = + "fs.s3a.s3guard.ddb.throttle.retry.interval"; + public static final String S3GUARD_DDB_THROTTLE_RETRY_INTERVAL_DEFAULT = + "100ms"; /** * Period of time (in milliseconds) to sleep between batches of writes. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index fcbcbcc4337..e817f0d55de 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1131,6 +1131,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { /** * Increment a statistic by 1. + * This increments both the instrumentation and storage statistics. * @param statistic The operation to increment */ protected void incrementStatistic(Statistic statistic) { @@ -1139,6 +1140,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { /** * Increment a statistic by a specific value. + * This increments both the instrumentation and storage statistics. * @param statistic The operation to increment * @param count the count to increment */ @@ -1175,8 +1177,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { Statistic stat = isThrottleException(ex) ? STORE_IO_THROTTLED : IGNORED_ERRORS; - instrumentation.incrementCounter(stat, 1); - storageStatistics.incrementCounter(stat, 1); + incrementStatistic(stat); } /** @@ -1197,6 +1198,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { /** * Callback from {@link Invoker} when an operation against a metastore * is retried. + * Always increments the {@link Statistic#S3GUARD_METADATASTORE_RETRY} + * statistic/counter; + * if it is a throttling exception will update the associated + * throttled metrics/statistics. + * * @param ex exception * @param retries number of retries * @param idempotent is the method idempotent @@ -1205,6 +1211,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { int retries, boolean idempotent) { operationRetried(ex); + incrementStatistic(S3GUARD_METADATASTORE_RETRY); + if (isThrottleException(ex)) { + incrementStatistic(S3GUARD_METADATASTORE_THROTTLED); + instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1); + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 26ecefd0592..84f9c9f1392 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1032,15 +1032,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource { * Throttled request. */ public void throttled() { - incrementCounter(S3GUARD_METADATASTORE_THROTTLED, 1); - addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1); + // counters are incremented by owner. } /** * S3Guard is retrying after a (retryable) failure. */ public void retrying() { - incrementCounter(S3GUARD_METADATASTORE_RETRY, 1); + // counters are incremented by owner. } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java index e6e789573e2..1e475e1570c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java @@ -124,12 +124,7 @@ public class S3ARetryPolicy implements RetryPolicy { // and a separate policy for throttle requests, which are considered // repeatable, even for non-idempotent calls, as the service // rejected the call entirely - throttlePolicy = exponentialBackoffRetry( - conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT), - conf.getTimeDuration(RETRY_THROTTLE_INTERVAL, - RETRY_THROTTLE_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS), - TimeUnit.MILLISECONDS); + throttlePolicy = createThrottleRetryPolicy(conf); // client connectivity: fixed retries without care for idempotency connectivityFailure = fixedRetries; @@ -139,6 +134,22 @@ public class S3ARetryPolicy implements RetryPolicy { retryPolicy = retryByException(retryIdempotentCalls, policyMap); } + /** + * Create the throttling policy. + * This will be called from the S3ARetryPolicy constructor, so + * subclasses must assume they are not initialized. + * @param conf configuration to use. + * @return the retry policy for throttling events. + */ + protected RetryPolicy createThrottleRetryPolicy(final Configuration conf) { + return exponentialBackoffRetry( + conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT), + conf.getTimeDuration(RETRY_THROTTLE_INTERVAL, + RETRY_THROTTLE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), + TimeUnit.MILLISECONDS); + } + /** * Subclasses can override this like a constructor to change behavior: call * superclass method, then modify it as needed, and return it. @@ -206,6 +217,7 @@ public class S3ARetryPolicy implements RetryPolicy { int retries, int failovers, boolean idempotent) throws Exception { + Preconditions.checkArgument(exception != null, "Null exception"); Exception ex = exception; if (exception instanceof AmazonClientException) { // uprate the amazon client exception for the purpose of exception diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index c3f1e9368be..9318a5a4dce 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -27,6 +27,7 @@ import com.amazonaws.SdkBaseException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.retry.RetryUtils; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import com.amazonaws.services.dynamodbv2.model.LimitExceededException; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException; @@ -358,8 +359,10 @@ public final class S3AUtils { /** * Is the exception an instance of a throttling exception. That * is an AmazonServiceException with a 503 response, any - * exception from DynamoDB for limits exceeded, or an - * {@link AWSServiceThrottledException}. + * exception from DynamoDB for limits exceeded, an + * {@link AWSServiceThrottledException}, + * or anything which the AWS SDK's RetryUtils considers to be + * a throttling exception. * @param ex exception to examine * @return true if it is considered a throttling exception */ @@ -368,7 +371,9 @@ public final class S3AUtils { || ex instanceof ProvisionedThroughputExceededException || ex instanceof LimitExceededException || (ex instanceof AmazonServiceException - && 503 == ((AmazonServiceException)ex).getStatusCode()); + && 503 == ((AmazonServiceException)ex).getStatusCode()) + || (ex instanceof SdkBaseException + && RetryUtils.isThrottlingException((SdkBaseException) ex)); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 784a291fd1c..7c826c11db7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -36,10 +36,12 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; @@ -77,12 +79,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; +import org.apache.hadoop.fs.s3a.AWSServiceThrottledException; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AInstrumentation; -import org.apache.hadoop.fs.s3a.S3ARetryPolicy; import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.Tristate; import org.apache.hadoop.fs.s3a.auth.RolePolicies; @@ -198,10 +200,6 @@ public class DynamoDBMetadataStore implements MetadataStore { public static final String E_INCOMPATIBLE_VERSION = "Database table is from an incompatible S3Guard version."; - /** Initial delay for retries when batched operations get throttled by - * DynamoDB. Value is {@value} msec. */ - public static final long MIN_RETRY_SLEEP_MSEC = 100; - @VisibleForTesting static final String DESCRIPTION = "S3Guard metadata store in DynamoDB"; @@ -214,6 +212,13 @@ public class DynamoDBMetadataStore implements MetadataStore { @VisibleForTesting static final String TABLE = "table"; + @VisibleForTesting + static final String HINT_DDB_IOPS_TOO_LOW + = " This may be because the write threshold of DynamoDB is set too low."; + + @VisibleForTesting + static final String THROTTLING = "Throttling"; + private static ValueMap deleteTrackingValueMap = new ValueMap().withBoolean(":false", false); @@ -226,7 +231,14 @@ public class DynamoDBMetadataStore implements MetadataStore { private Configuration conf; private String username; - private RetryPolicy dataAccessRetryPolicy; + /** + * This policy is mostly for batched writes, not for processing + * exceptions in invoke() calls. + * It also has a role purpose in {@link #getVersionMarkerItem()}; + * look at that method for the details. + */ + private RetryPolicy batchWriteRetryPolicy; + private S3AInstrumentation.S3GuardInstrumentation instrumentation; /** Owner FS: only valid if configured with an owner FS. */ @@ -237,8 +249,15 @@ public class DynamoDBMetadataStore implements MetadataStore { Invoker.NO_OP ); - /** Data access can have its own policies. */ - private Invoker dataAccess; + /** Invoker for read operations. */ + private Invoker readOp; + + /** Invoker for write operations. */ + private Invoker writeOp; + + private final AtomicLong readThrottleEvents = new AtomicLong(0); + private final AtomicLong writeThrottleEvents = new AtomicLong(0); + private final AtomicLong batchWriteCapacityExceededEvents = new AtomicLong(0); /** * Total limit on the number of throttle events after which @@ -292,10 +311,8 @@ public class DynamoDBMetadataStore implements MetadataStore { Preconditions.checkNotNull(fs, "Null filesystem"); Preconditions.checkArgument(fs instanceof S3AFileSystem, "DynamoDBMetadataStore only supports S3A filesystem."); - owner = (S3AFileSystem) fs; - instrumentation = owner.getInstrumentation().getS3GuardInstrumentation(); + bindToOwnerFilesystem((S3AFileSystem) fs); final String bucket = owner.getBucket(); - conf = owner.getConf(); String confRegion = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); if (!StringUtils.isEmpty(confRegion)) { region = confRegion; @@ -316,7 +333,6 @@ public class DynamoDBMetadataStore implements MetadataStore { } LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region); } - username = owner.getUsername(); credentials = owner.shareCredentials("s3guard"); dynamoDB = createDynamoDB(conf, region, bucket, credentials); @@ -325,7 +341,7 @@ public class DynamoDBMetadataStore implements MetadataStore { initDataAccessRetries(conf); // set up a full retry policy - invoker = new Invoker(new S3ARetryPolicy(conf), + invoker = new Invoker(new S3GuardDataAccessRetryPolicy(conf), this::retryEvent ); @@ -334,6 +350,20 @@ public class DynamoDBMetadataStore implements MetadataStore { instrumentation.initialized(); } + /** + * Declare that this table is owned by the specific S3A FS instance. + * This will bind some fields to the values provided by the owner, + * including wiring up the instrumentation. + * @param fs owner filesystem + */ + @VisibleForTesting + void bindToOwnerFilesystem(final S3AFileSystem fs) { + owner = fs; + conf = owner.getConf(); + instrumentation = owner.getInstrumentation().getS3GuardInstrumentation(); + username = owner.getUsername(); + } + /** * Performs one-time initialization of the metadata store via configuration. * @@ -382,16 +412,23 @@ public class DynamoDBMetadataStore implements MetadataStore { /** * Set retry policy. This is driven by the value of * {@link Constants#S3GUARD_DDB_MAX_RETRIES} with an exponential backoff - * between each attempt of {@link #MIN_RETRY_SLEEP_MSEC} milliseconds. + * between each attempt of {@link Constants#S3GUARD_DDB_THROTTLE_RETRY_INTERVAL} + * milliseconds. * @param config configuration for data access */ private void initDataAccessRetries(Configuration config) { - int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES, - S3GUARD_DDB_MAX_RETRIES_DEFAULT); - dataAccessRetryPolicy = RetryPolicies - .exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC, + batchWriteRetryPolicy = RetryPolicies + .exponentialBackoffRetry( + config.getInt(S3GUARD_DDB_MAX_RETRIES, + S3GUARD_DDB_MAX_RETRIES_DEFAULT), + conf.getTimeDuration(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL, + S3GUARD_DDB_THROTTLE_RETRY_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); - dataAccess = new Invoker(dataAccessRetryPolicy, this::retryEvent); + final RetryPolicy throttledRetryRetryPolicy + = new S3GuardDataAccessRetryPolicy(config); + readOp = new Invoker(throttledRetryRetryPolicy, this::readRetryEvent); + writeOp = new Invoker(throttledRetryRetryPolicy, this::writeRetryEvent); } @Override @@ -432,11 +469,17 @@ public class DynamoDBMetadataStore implements MetadataStore { if (tombstone) { Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem( new DDBPathMetadata(PathMetadata.tombstone(path))); - invoker.retry("Put tombstone", path.toString(), idempotent, + writeOp.retry( + "Put tombstone", + path.toString(), + idempotent, () -> table.putItem(item)); } else { PrimaryKey key = pathToKey(path); - invoker.retry("Delete key", path.toString(), idempotent, + writeOp.retry( + "Delete key", + path.toString(), + idempotent, () -> table.deleteItem(key)); } } @@ -460,28 +503,38 @@ public class DynamoDBMetadataStore implements MetadataStore { } } - @Retries.OnceRaw - private Item getConsistentItem(PrimaryKey key) { + /** + * Get a consistent view of an item. + * @param path path to look up in the database + * @param path entry + * @return the result + * @throws IOException failure + */ + @Retries.RetryTranslated + private Item getConsistentItem(final Path path) throws IOException { + PrimaryKey key = pathToKey(path); final GetItemSpec spec = new GetItemSpec() .withPrimaryKey(key) .withConsistentRead(true); // strictly consistent read - return table.getItem(spec); + return readOp.retry("get", + path.toString(), + true, + () -> table.getItem(spec)); } @Override - @Retries.OnceTranslated + @Retries.RetryTranslated public DDBPathMetadata get(Path path) throws IOException { return get(path, false); } @Override - @Retries.OnceTranslated + @Retries.RetryTranslated public DDBPathMetadata get(Path path, boolean wantEmptyDirectoryFlag) throws IOException { checkPath(path); LOG.debug("Get from table {} in region {}: {}", tableName, region, path); - return Invoker.once("get", path.toString(), - () -> innerGet(path, wantEmptyDirectoryFlag)); + return innerGet(path, wantEmptyDirectoryFlag); } /** @@ -491,9 +544,8 @@ public class DynamoDBMetadataStore implements MetadataStore { * MetadataStore that it should try to compute the empty directory flag. * @return metadata for {@code path}, {@code null} if not found * @throws IOException IO problem - * @throws AmazonClientException dynamo DB level problem */ - @Retries.OnceRaw + @Retries.RetryTranslated private DDBPathMetadata innerGet(Path path, boolean wantEmptyDirectoryFlag) throws IOException { final DDBPathMetadata meta; @@ -502,7 +554,7 @@ public class DynamoDBMetadataStore implements MetadataStore { meta = new DDBPathMetadata(makeDirStatus(username, path)); } else { - final Item item = getConsistentItem(pathToKey(path)); + final Item item = getConsistentItem(path); meta = itemToPathMetadata(item, username); LOG.debug("Get from table {} in region {} returning for {}: {}", tableName, region, path, meta); @@ -517,8 +569,10 @@ public class DynamoDBMetadataStore implements MetadataStore { .withConsistentRead(true) .withFilterExpression(IS_DELETED + " = :false") .withValueMap(deleteTrackingValueMap); - final ItemCollection items = table.query(spec); - boolean hasChildren = items.iterator().hasNext(); + boolean hasChildren = readOp.retry("get/hasChildren", + path.toString(), + true, + () -> table.query(spec).iterator().hasNext()); // When this class has support for authoritative // (fully-cached) directory listings, we may also be able to answer // TRUE here. Until then, we don't know if we have full listing or @@ -545,13 +599,16 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override - @Retries.OnceTranslated + @Retries.RetryTranslated public DirListingMetadata listChildren(final Path path) throws IOException { checkPath(path); LOG.debug("Listing table {} in region {}: {}", tableName, region, path); // find the children in the table - return Invoker.once("listChildren", path.toString(), + return readOp.retry( + "listChildren", + path.toString(), + true, () -> { final QuerySpec spec = new QuerySpec() .withHashKey(pathToParentKeyAttribute(path)) @@ -610,7 +667,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override - @Retries.OnceTranslated + @Retries.RetryTranslated public void move(Collection pathsToDelete, Collection pathsToCreate) throws IOException { if (pathsToDelete == null && pathsToCreate == null) { @@ -639,25 +696,25 @@ public class DynamoDBMetadataStore implements MetadataStore { } } - Invoker.once("move", tableName, - () -> processBatchWriteRequest(null, pathMetadataToItem(newItems))); + processBatchWriteRequest(null, pathMetadataToItem(newItems)); } /** * Helper method to issue a batch write request to DynamoDB. * - * The retry logic here is limited to repeating the write operations - * until all items have been written; there is no other attempt - * at recovery/retry. Throttling is handled internally. + * As well as retrying on the operation invocation, incomplete + * batches are retried until all have been deleted. * @param keysToDelete primary keys to be deleted; can be null * @param itemsToPut new items to be put; can be null + * @return the number of iterations needed to complete the call. */ - @Retries.OnceRaw("Outstanding batch items are updated with backoff") - private void processBatchWriteRequest(PrimaryKey[] keysToDelete, + @Retries.RetryTranslated("Outstanding batch items are updated with backoff") + private int processBatchWriteRequest(PrimaryKey[] keysToDelete, Item[] itemsToPut) throws IOException { final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length); final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length); int count = 0; + int batches = 0; while (count < totalToDelete + totalToPut) { final TableWriteItems writeItems = new TableWriteItems(tableName); int numToDelete = 0; @@ -682,35 +739,66 @@ public class DynamoDBMetadataStore implements MetadataStore { count += numToPut; } - BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems); + // if there's a retry and another process updates things then it's not + // quite idempotent, but this was the case anyway + batches++; + BatchWriteItemOutcome res = writeOp.retry( + "batch write", + "", + true, + () -> dynamoDB.batchWriteItem(writeItems)); // Check for unprocessed keys in case of exceeding provisioned throughput Map> unprocessed = res.getUnprocessedItems(); int retryCount = 0; while (!unprocessed.isEmpty()) { - retryBackoff(retryCount++); - res = dynamoDB.batchWriteItemUnprocessed(unprocessed); + batchWriteCapacityExceededEvents.incrementAndGet(); + batches++; + retryBackoffOnBatchWrite(retryCount++); + // use a different reference to keep the compiler quiet + final Map> upx = unprocessed; + res = writeOp.retry( + "batch write", + "", + true, + () -> dynamoDB.batchWriteItemUnprocessed(upx)); unprocessed = res.getUnprocessedItems(); } } + return batches; } /** * Put the current thread to sleep to implement exponential backoff * depending on retryCount. If max retries are exceeded, throws an * exception instead. + * * @param retryCount number of retries so far * @throws IOException when max retryCount is exceeded. */ - private void retryBackoff(int retryCount) throws IOException { + private void retryBackoffOnBatchWrite(int retryCount) throws IOException { try { // Our RetryPolicy ignores everything but retryCount here. - RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null, + RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry( + null, retryCount, 0, true); if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { - throw new IOException( - String.format("Max retries exceeded (%d) for DynamoDB. This may be" - + " because write threshold of DynamoDB is set too low.", - retryCount)); + // Create an AWSServiceThrottledException, with a fake inner cause + // which we fill in to look like a real exception so + // error messages look sensible + AmazonServiceException cause = new AmazonServiceException( + "Throttling"); + cause.setServiceName("S3Guard"); + cause.setStatusCode(AWSServiceThrottledException.STATUS_CODE); + cause.setErrorCode(THROTTLING); // used in real AWS errors + cause.setErrorType(AmazonServiceException.ErrorType.Service); + cause.setErrorMessage(THROTTLING); + cause.setRequestId("n/a"); + throw new AWSServiceThrottledException( + String.format("Max retries during batch write exceeded" + + " (%d) for DynamoDB." + + HINT_DDB_IOPS_TOO_LOW, + retryCount), + cause); } else { LOG.debug("Sleeping {} msec before next retry", action.delayMillis); Thread.sleep(action.delayMillis); @@ -720,12 +808,12 @@ public class DynamoDBMetadataStore implements MetadataStore { } catch (IOException e) { throw e; } catch (Exception e) { - throw new IOException("Unexpected exception", e); + throw new IOException("Unexpected exception " + e, e); } } @Override - @Retries.OnceRaw + @Retries.RetryTranslated public void put(PathMetadata meta) throws IOException { // For a deeply nested path, this method will automatically create the full // ancestry and save respective item in DynamoDB table. @@ -741,7 +829,7 @@ public class DynamoDBMetadataStore implements MetadataStore { } @Override - @Retries.OnceRaw + @Retries.RetryTranslated public void put(Collection metas) throws IOException { innerPut(pathMetaToDDBPathMeta(metas)); } @@ -757,8 +845,9 @@ public class DynamoDBMetadataStore implements MetadataStore { /** * Helper method to get full path of ancestors that are nonexistent in table. */ - @Retries.OnceRaw - private Collection fullPathsToPut(DDBPathMetadata meta) + @VisibleForTesting + @Retries.RetryTranslated + Collection fullPathsToPut(DDBPathMetadata meta) throws IOException { checkPathMetadata(meta); final Collection metasToPut = new ArrayList<>(); @@ -771,7 +860,7 @@ public class DynamoDBMetadataStore implements MetadataStore { // first existent ancestor Path path = meta.getFileStatus().getPath().getParent(); while (path != null && !path.isRoot()) { - final Item item = getConsistentItem(pathToKey(path)); + final Item item = getConsistentItem(path); if (!itemExists(item)) { final FileStatus status = makeDirStatus(path, username); metasToPut.add(new DDBPathMetadata(status, Tristate.FALSE, false, @@ -810,7 +899,7 @@ public class DynamoDBMetadataStore implements MetadataStore { * @throws IOException IO problem */ @Override - @Retries.OnceTranslated("retry(listFullPaths); once(batchWrite)") + @Retries.RetryTranslated public void put(DirListingMetadata meta) throws IOException { LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta); @@ -821,15 +910,12 @@ public class DynamoDBMetadataStore implements MetadataStore { false, meta.isAuthoritative()); // First add any missing ancestors... - final Collection metasToPut = invoker.retry( - "paths to put", path.toString(), true, - () -> fullPathsToPut(ddbPathMeta)); + final Collection metasToPut = fullPathsToPut(ddbPathMeta); // next add all children of the directory metasToPut.addAll(pathMetaToDDBPathMeta(meta.getListing())); - Invoker.once("put", path.toString(), - () -> processBatchWriteRequest(null, pathMetadataToItem(metasToPut))); + processBatchWriteRequest(null, pathMetadataToItem(metasToPut)); } @Override @@ -847,10 +933,10 @@ public class DynamoDBMetadataStore implements MetadataStore { closeAutocloseables(LOG, credentials); credentials = null; } -} + } @Override - @Retries.OnceTranslated + @Retries.RetryTranslated public void destroy() throws IOException { if (table == null) { LOG.info("In destroy(): no table to delete"); @@ -859,10 +945,11 @@ public class DynamoDBMetadataStore implements MetadataStore { LOG.info("Deleting DynamoDB table {} in region {}", tableName, region); Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB"); try { - table.delete(); + invoker.retry("delete", null, true, + () -> table.delete()); table.waitForDelete(); - } catch (ResourceNotFoundException rnfe) { - LOG.info("ResourceNotFoundException while deleting DynamoDB table {} in " + } catch (FileNotFoundException rnfe) { + LOG.info("FileNotFoundException while deleting DynamoDB table {} in " + "region {}. This may indicate that the table does not exist, " + "or has been deleted by another concurrent thread or process.", tableName, region); @@ -872,39 +959,50 @@ public class DynamoDBMetadataStore implements MetadataStore { tableName, ie); throw new InterruptedIOException("Table " + tableName + " in region " + region + " has not been deleted"); - } catch (AmazonClientException e) { - throw translateException("destroy", tableName, e); } } - @Retries.OnceRaw + @Retries.RetryTranslated private ItemCollection expiredFiles(long modTime, - String keyPrefix) { + String keyPrefix) throws IOException { String filterExpression = "mod_time < :mod_time and begins_with(parent, :parent)"; String projectionExpression = "parent,child"; ValueMap map = new ValueMap() .withLong(":mod_time", modTime) .withString(":parent", keyPrefix); - return table.scan(filterExpression, projectionExpression, null, map); + return readOp.retry( + "scan", + keyPrefix, + true, + () -> table.scan(filterExpression, projectionExpression, null, map)); } @Override - @Retries.OnceRaw("once(batchWrite)") + @Retries.RetryTranslated public void prune(long modTime) throws IOException { prune(modTime, "/"); } + /** + * Prune files, in batches. There's a sleep between each batch. + * @param modTime Oldest modification time to allow + * @param keyPrefix The prefix for the keys that should be removed + * @throws IOException Any IO/DDB failure. + * @throws InterruptedIOException if the prune was interrupted + */ @Override - @Retries.OnceRaw("once(batchWrite)") + @Retries.RetryTranslated public void prune(long modTime, String keyPrefix) throws IOException { int itemCount = 0; try { Collection deletionBatch = new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT); - int delay = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, - S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT); - Set parentPathSet = new HashSet<>(); + long delay = conf.getTimeDuration( + S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, + S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT, + TimeUnit.MILLISECONDS); + Set parentPathSet = new HashSet<>(); for (Item item : expiredFiles(modTime, keyPrefix)) { DDBPathMetadata md = PathMetadataDynamoDBTranslation .itemToPathMetadata(item, username); @@ -929,7 +1027,8 @@ public class DynamoDBMetadataStore implements MetadataStore { deletionBatch.clear(); } } - if (deletionBatch.size() > 0) { + // final batch of deletes + if (!deletionBatch.isEmpty()) { Thread.sleep(delay); processBatchWriteRequest(pathToKey(deletionBatch), null); @@ -1093,19 +1192,34 @@ public class DynamoDBMetadataStore implements MetadataStore { * Get the version mark item in the existing DynamoDB table. * * As the version marker item may be created by another concurrent thread or - * process, we sleep and retry a limited times before we fail to get it. - * This does not include handling any failure other than "item not found", - * so this method is tagged as "OnceRaw" + * process, we sleep and retry a limited number times if the lookup returns + * with a null value. + * DDB throttling is always retried. */ - @Retries.OnceRaw - private Item getVersionMarkerItem() throws IOException { + @VisibleForTesting + @Retries.RetryTranslated + Item getVersionMarkerItem() throws IOException { final PrimaryKey versionMarkerKey = createVersionMarkerPrimaryKey(VERSION_MARKER); int retryCount = 0; - Item versionMarker = table.getItem(versionMarkerKey); + // look for a version marker, with usual throttling/failure retries. + Item versionMarker = queryVersionMarker(versionMarkerKey); while (versionMarker == null) { + // The marker was null. + // Two possibilities + // 1. This isn't a S3Guard table. + // 2. This is a S3Guard table in construction; another thread/process + // is about to write/actively writing the version marker. + // So that state #2 is handled, batchWriteRetryPolicy is used to manage + // retries. + // This will mean that if the cause is actually #1, failure will not + // be immediate. As this will ultimately result in a failure to + // init S3Guard and the S3A FS, this isn't going to be a performance + // bottleneck -simply a slightly slower failure report than would otherwise + // be seen. + // "if your settings are broken, performance is not your main issue" try { - RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null, + RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(null, retryCount, 0, true); if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { break; @@ -1114,14 +1228,29 @@ public class DynamoDBMetadataStore implements MetadataStore { Thread.sleep(action.delayMillis); } } catch (Exception e) { - throw new IOException("initTable: Unexpected exception", e); + throw new IOException("initTable: Unexpected exception " + e, e); } retryCount++; - versionMarker = table.getItem(versionMarkerKey); + versionMarker = queryVersionMarker(versionMarkerKey); } return versionMarker; } + /** + * Issue the query to get the version marker, with throttling for overloaded + * DDB tables. + * @param versionMarkerKey key to look up + * @return the marker + * @throws IOException failure + */ + @Retries.RetryTranslated + private Item queryVersionMarker(final PrimaryKey versionMarkerKey) + throws IOException { + return readOp.retry("getVersionMarkerItem", + VERSION_MARKER, true, + () -> table.getItem(versionMarkerKey)); + } + /** * Verify that a table version is compatible with this S3Guard client. * @param tableName name of the table (for error messages) @@ -1207,7 +1336,7 @@ public class DynamoDBMetadataStore implements MetadataStore { * @return the outcome. */ @Retries.OnceRaw - PutItemOutcome putItem(Item item) { + private PutItemOutcome putItem(Item item) { LOG.debug("Putting item {}", item); return table.putItem(item); } @@ -1254,6 +1383,11 @@ public class DynamoDBMetadataStore implements MetadataStore { return region; } + @VisibleForTesting + public String getTableName() { + return tableName; + } + @VisibleForTesting DynamoDB getDynamoDB() { return dynamoDB; @@ -1312,8 +1446,8 @@ public class DynamoDBMetadataStore implements MetadataStore { } map.put("description", DESCRIPTION); map.put("region", region); - if (dataAccessRetryPolicy != null) { - map.put("retryPolicy", dataAccessRetryPolicy.toString()); + if (batchWriteRetryPolicy != null) { + map.put("retryPolicy", batchWriteRetryPolicy.toString()); } return map; } @@ -1368,6 +1502,38 @@ public class DynamoDBMetadataStore implements MetadataStore { } } + /** + * Callback on a read operation retried. + * @param text text of the operation + * @param ex exception + * @param attempts number of attempts + * @param idempotent is the method idempotent (this is assumed to be true) + */ + void readRetryEvent( + String text, + IOException ex, + int attempts, + boolean idempotent) { + readThrottleEvents.incrementAndGet(); + retryEvent(text, ex, attempts, true); + } + + /** + * Callback on a write operation retried. + * @param text text of the operation + * @param ex exception + * @param attempts number of attempts + * @param idempotent is the method idempotent (this is assumed to be true) + */ + void writeRetryEvent( + String text, + IOException ex, + int attempts, + boolean idempotent) { + writeThrottleEvents.incrementAndGet(); + retryEvent(text, ex, attempts, idempotent); + } + /** * Callback from {@link Invoker} when an operation is retried. * @param text text of the operation @@ -1410,4 +1576,31 @@ public class DynamoDBMetadataStore implements MetadataStore { } } + /** + * Get the count of read throttle events. + * @return the current count of read throttle events. + */ + @VisibleForTesting + public long getReadThrottleEventCount() { + return readThrottleEvents.get(); + } + + /** + * Get the count of write throttle events. + * @return the current count of write throttle events. + */ + @VisibleForTesting + public long getWriteThrottleEventCount() { + return writeThrottleEvents.get(); + } + + @VisibleForTesting + public long getBatchWriteCapacityExceededCount() { + return batchWriteCapacityExceededEvents.get(); + } + + @VisibleForTesting + public Invoker getInvoker() { + return invoker; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardDataAccessRetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardDataAccessRetryPolicy.java new file mode 100644 index 00000000000..915b94a0b7b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardDataAccessRetryPolicy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3ARetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry; + +/** + * A Retry policy whose throttling comes from the S3Guard config options. + */ +public class S3GuardDataAccessRetryPolicy extends S3ARetryPolicy { + + public S3GuardDataAccessRetryPolicy(final Configuration conf) { + super(conf); + } + + protected RetryPolicy createThrottleRetryPolicy(final Configuration conf) { + return exponentialBackoffRetry( + conf.getInt(S3GUARD_DDB_MAX_RETRIES, S3GUARD_DDB_MAX_RETRIES_DEFAULT), + conf.getTimeDuration(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL, + S3GUARD_DDB_THROTTLE_RETRY_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), + TimeUnit.MILLISECONDS); + } +} diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md index 6dd840adf33..a8c8d6cd2cd 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md @@ -298,8 +298,9 @@ rates. ``` -Attempting to perform more IO than the capacity requested simply throttles the -IO; small capacity numbers are recommended when initially experimenting +Attempting to perform more IO than the capacity requested throttles the +IO, and may result in operations failing. Larger IO capacities cost more. +We recommending using small read and write capacities when initially experimenting with S3Guard. ## Authenticating with S3Guard @@ -327,7 +328,7 @@ to the options `fs.s3a.KEY` *for that bucket only*. As an example, here is a configuration to use different metadata stores and tables for different buckets -First, we define shortcuts for the metadata store classnames +First, we define shortcuts for the metadata store classnames: ```xml @@ -343,7 +344,7 @@ First, we define shortcuts for the metadata store classnames ``` Next, Amazon's public landsat database is configured with no -metadata store +metadata store: ```xml @@ -355,7 +356,7 @@ metadata store ``` Next the `ireland-2` and `ireland-offline` buckets are configured with -DynamoDB as the store, and a shared table `production-table` +DynamoDB as the store, and a shared table `production-table`: ```xml @@ -716,10 +717,10 @@ Metadata Store Diagnostics: ``` After the update, the table status changes to `UPDATING`; this is a sign that -the capacity has been changed +the capacity has been changed. Repeating the same command will not change the capacity, as both read and -write values match that already in use +write values match that already in use. ``` 2017-08-30 16:24:35,337 [main] INFO s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1090)) - Table capacity unchanged at read: 20, write: 20 @@ -736,6 +737,9 @@ Metadata Store Diagnostics: write-capacity=20 ``` +*Note*: There is a limit to how many times in a 24 hour period the capacity +of a bucket can be changed, either through this command or the AWS console. + ## Debugging and Error Handling If you run into network connectivity issues, or have a machine failure in the @@ -817,6 +821,97 @@ are only made after successful file creation, deletion and rename, the store is *unlikely* to get out of sync, it is still something which merits more testing before it could be considered reliable. +## Managing DynamoDB IO Capacity + +DynamoDB is not only billed on use (data and IO requests), it is billed +on allocated IO Capacity. + +When an application makes more requests than +the allocated capacity permits, the request is rejected; it is up to +the calling application to detect when it is being so throttled and +react. S3Guard does this, but as a result: when the client is being +throttled, operations are slower. This capacity throttling is averaged +over a few minutes: a briefly overloaded table will not be throttled, +but the rate cannot be sustained. + +The load on a table isvisible in the AWS console: go to the +DynamoDB page for the table and select the "metrics" tab. +If the graphs of throttled read or write +requests show that a lot of throttling has taken place, then there is not +enough allocated capacity for the applications making use of the table. + +Similarly, if the capacity graphs show that the read or write loads are +low compared to the allocated capacities, then the table *may* be overprovisioned +for the current workload. + +The S3Guard connector to DynamoDB can be configured to make +multiple attempts to repeat a throttled request, with an exponential +backoff between them. + +The relevant settings for managing retries in the connector are: + +```xml + + + fs.s3a.s3guard.ddb.max.retries + 9 + + Max retries on throttled/incompleted DynamoDB operations + before giving up and throwing an IOException. + Each retry is delayed with an exponential + backoff timer which starts at 100 milliseconds and approximately + doubles each time. The minimum wait before throwing an exception is + sum(100, 200, 400, 800, .. 100*2^N-1 ) == 100 * ((2^N)-1) + + + + + fs.s3a.s3guard.ddb.throttle.retry.interval + 100ms + + Initial interval to retry after a request is throttled events; + the back-off policy is exponential until the number of retries of + fs.s3a.s3guard.ddb.max.retries is reached. + + + + + fs.s3a.s3guard.ddb.background.sleep + 25ms + + Length (in milliseconds) of pause between each batch of deletes when + pruning metadata. Prevents prune operations (which can typically be low + priority background operations) from overly interfering with other I/O + operations. + + +``` + +Having a large value for `fs.s3a.s3guard.ddb.max.retries` will ensure +that clients of an overloaded table will not fail immediately. However +queries may be unexpectedly slow. + +If operations, especially directory operations, are slow, check the AWS +console. It is also possible to set up AWS alerts for capacity limits +being exceeded. + +[DynamoDB Auto Scaling](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AutoScaling.html) +can automatically increase and decrease the allocated capacity. +This is good for keeping capacity high when needed, but avoiding large +bills when it is not. + +Experiments with S3Guard and DynamoDB Auto Scaling have shown that any Auto Scaling +operation will only take place after callers have been throttled for a period of +time. The clients will still need to be configured to retry when overloaded +until any extra capacity is allocated. Furthermore, as this retrying will +block the threads from performing other operations -including more IO, the +the autoscale may not scale fast enough. + +We recommend experimenting with this, based on usage information collected +from previous days, and and choosing a combination of +retry counts and an interval which allow for the clients to cope with +some throttling, but not to time out other applications. + ## Troubleshooting ### Error: `S3Guard table lacks version marker.` @@ -857,12 +952,49 @@ or the configuration is preventing S3Guard from finding the table. region as the bucket being used. 1. Create the table if necessary. + ### Error `"The level of configured provisioned throughput for the table was exceeded"` +``` +org.apache.hadoop.fs.s3a.AWSServiceThrottledException: listFiles on s3a://bucket/10/d1/d2/d3: +com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: +The level of configured provisioned throughput for the table was exceeded. +Consider increasing your provisioning level with the UpdateTable API. +(Service: AmazonDynamoDBv2; Status Code: 400; +Error Code: ProvisionedThroughputExceededException; +``` The IO load of clients of the (shared) DynamoDB table was exceeded. -Currently S3Guard doesn't do any throttling and retries here; the way to address -this is to increase capacity via the AWS console or the `set-capacity` command. +1. Increase the capacity of the DynamoDB table. +1. Increase the retry count and/or sleep time of S3Guard on throttle events. +1. Enable capacity autoscaling for the table in the AWS console. + +### Error `Max retries exceeded` + +The I/O load of clients of the (shared) DynamoDB table was exceeded, and +the number of attempts to retry the operation exceeded the configured amount. + +1. Increase the capacity of the DynamoDB table. +1. Increase the retry count and/or sleep time of S3Guard on throttle events. +1. Enable capacity autoscaling for the table in the AWS console. + + +### Error when running `set-capacity`: `org.apache.hadoop.fs.s3a.AWSServiceThrottledException: ProvisionTable` + +``` +org.apache.hadoop.fs.s3a.AWSServiceThrottledException: ProvisionTable on s3guard-example: +com.amazonaws.services.dynamodbv2.model.LimitExceededException: +Subscriber limit exceeded: Provisioned throughput decreases are limited within a given UTC day. +After the first 4 decreases, each subsequent decrease in the same UTC day can be performed at most once every 3600 seconds. +Number of decreases today: 6. +Last decrease at Wednesday, July 25, 2018 8:48:14 PM UTC. +Next decrease can be made at Wednesday, July 25, 2018 9:48:14 PM UTC +``` + +There's are limit on how often you can change the capacity of an DynamoDB table; +if you call set-capacity too often, it fails. Wait until the after the time indicated +and try again. + ## Other Topics diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md index 280636b329b..31d3a5f3af8 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md @@ -742,6 +742,54 @@ sequential one afterwards. The IO heavy ones must also be subclasses of This is invaluable for debugging test failures. +### Keeping AWS Costs down + +Most of the base S3 tests are designed to use public AWS data +(the landsat-pds bucket) for read IO, so you don't have to pay for bytes +downloaded or long term storage costs. The scale tests do work with more data +so will cost more as well as generally take more time to execute. + +You are however billed for + +1. Data left in S3 after test runs. +2. DynamoDB capacity reserved by S3Guard tables. +3. HTTP operations on files (HEAD, LIST, GET). +4. In-progress multipart uploads from bulk IO or S3A committer tests. +5. Encryption/decryption using AWS KMS keys. + +The GET/decrypt costs are incurred on each partial read of a file, +so random IO can cost more than sequential IO; the speedup of queries with +columnar data usually justifies this. + +The DynamoDB costs come from the number of entries stores and the allocated capacity. + +How to keep costs down + +* Don't run the scale tests with large datasets; keep `fs.s3a.scale.test.huge.filesize` unset, or a few MB (minimum: 5). +* Remove all files in the filesystem. The root tests usually do this, but +it can be manually done: + + hadoop fs -rm -r -f -skipTrash s3a://test-bucket/\* +* Abort all outstanding uploads: + + hadoop s3guard uploads -abort -force s3a://test-bucket/ +* If you don't need it, destroy the S3Guard DDB table. + + hadoop s3guard destroy s3a://hwdev-steve-ireland-new/ + +The S3Guard tests will automatically create the Dynamo DB table in runs with +`-Ds3guard -Ddynamodb` set; default capacity of these buckets +tests is very small; it keeps costs down at the expense of IO performance +and, for test runs in or near the S3/DDB stores, throttling events. + +If you want to manage capacity, use `s3guard set-capacity` to increase it +(performance) or decrease it (costs). +For remote `hadoop-aws` test runs, the read/write capacities of "10" each should suffice; +increase it if parallel test run logs warn of throttling. + +Tip: for agility, use DynamoDB autoscaling, setting the minimum to something very low (e.g 5 units), the maximum to the largest amount you are willing to pay. +This will automatically reduce capacity when you are not running tests against +the bucket, slowly increase it over multiple test runs, if the load justifies it. ## Tips @@ -985,9 +1033,13 @@ are included in the scale tests executed when `-Dscale` is passed to the maven command line. The two S3Guard scale tests are `ITestDynamoDBMetadataStoreScale` and -`ITestLocalMetadataStoreScale`. To run the DynamoDB test, you will need to -define your table name and region in your test configuration. For example, -the following settings allow us to run `ITestDynamoDBMetadataStoreScale` with +`ITestLocalMetadataStoreScale`. + +To run these tests, your DynamoDB table needs to be of limited capacity; +the values in `ITestDynamoDBMetadataStoreScale` currently require a read capacity +of 10 or less. a write capacity of 15 or more. + +The following settings allow us to run `ITestDynamoDBMetadataStoreScale` with artificially low read and write capacity provisioned, so we can judge the effects of being throttled by the DynamoDB service: @@ -1008,24 +1060,49 @@ effects of being throttled by the DynamoDB service: fs.s3a.s3guard.ddb.table my-scale-test - - fs.s3a.s3guard.ddb.region - us-west-2 - fs.s3a.s3guard.ddb.table.create true fs.s3a.s3guard.ddb.table.capacity.read - 10 + 5 fs.s3a.s3guard.ddb.table.capacity.write - 10 + 5 ``` +These tests verify that the invoked operations can trigger retries in the +S3Guard code, rather than just in the AWS SDK level, so showing that if +SDK operations fail, they get retried. They also verify that the filesystem +statistics are updated to record that throttling took place. + +*Do not panic if these tests fail to detect throttling!* + +These tests are unreliable as they need certain conditions to be met +to repeatedly fail: + +1. You must have a low-enough latency connection to the DynamoDB store that, +for the capacity allocated, you can overload it. +1. The AWS Console can give you a view of what is happening here. +1. Running a single test on its own is less likely to trigger an overload +than trying to run the whole test suite. +1. And running the test suite more than once, back-to-back, can also help +overload the cluster. +1. Stepping through with a debugger will reduce load, so may not trigger +failures. + +If the tests fail, it *probably* just means you aren't putting enough load +on the table. + +These tests do not verify that the entire set of DynamoDB calls made +during the use of a S3Guarded S3A filesystem are wrapped by retry logic. + +*The best way to verify resilience is to run the entire `hadoop-aws` test suite, +or even a real application, with throttling enabled. + ### Testing only: Local Metadata Store There is an in-memory Metadata Store for testing. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java index 27af23aa0cc..46d6ffc85e0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java @@ -54,6 +54,11 @@ public class ITestS3AFileSystemContract extends FileSystemContractBaseTest { Thread.currentThread().setName("JUnit-" + methodName.getMethodName()); } + @Override + protected int getGlobalTimeout() { + return S3ATestConstants.S3A_TEST_TIMEOUT; + } + @Before public void setUp() throws Exception { nameThread(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index 00db04a7123..a8425bfe63a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -95,7 +95,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { private static DynamoDBMetadataStore ddbmsStatic; - private static String TEST_DYNAMODB_TABLE_NAME; + private static String testDynamoDBTableName; /** * Create a path under the test path provided by @@ -113,8 +113,8 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { Configuration conf = prepareTestConfiguration(new Configuration()); assertThatDynamoMetadataStoreImpl(conf); Assume.assumeTrue("Test DynamoDB table name should be set to run " - + "integration tests.", TEST_DYNAMODB_TABLE_NAME != null); - conf.set(S3GUARD_DDB_TABLE_NAME_KEY, TEST_DYNAMODB_TABLE_NAME); + + "integration tests.", testDynamoDBTableName != null); + conf.set(S3GUARD_DDB_TABLE_NAME_KEY, testDynamoDBTableName); s3AContract = new S3AContract(conf); s3AContract.init(); @@ -140,10 +140,10 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { public static void beforeClassSetup() throws IOException { Configuration conf = prepareTestConfiguration(new Configuration()); assertThatDynamoMetadataStoreImpl(conf); - TEST_DYNAMODB_TABLE_NAME = conf.get(S3GUARD_DDB_TEST_TABLE_NAME_KEY); + testDynamoDBTableName = conf.get(S3GUARD_DDB_TEST_TABLE_NAME_KEY); Assume.assumeTrue("Test DynamoDB table name should be set to run " - + "integration tests.", TEST_DYNAMODB_TABLE_NAME != null); - conf.set(S3GUARD_DDB_TABLE_NAME_KEY, TEST_DYNAMODB_TABLE_NAME); + + "integration tests.", testDynamoDBTableName != null); + conf.set(S3GUARD_DDB_TABLE_NAME_KEY, testDynamoDBTableName); LOG.debug("Creating static ddbms which will be shared between tests."); ddbmsStatic = new DynamoDBMetadataStore(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java index 02a896653a6..48dbce98a77 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java @@ -18,46 +18,176 @@ package org.apache.hadoop.fs.s3a.s3guard; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import javax.annotation.Nullable; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Table; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.internal.AssumptionViolatedException; +import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageStatistics; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.AWSServiceThrottledException; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; -import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus; import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus; import static org.junit.Assume.*; /** * Scale test for DynamoDBMetadataStore. + * + * The throttle tests aren't quite trying to verify that throttling can + * be recovered from, because that makes for very slow tests: you have + * to overload the system and them have them back of until they finally complete. + * Instead */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ITestDynamoDBMetadataStoreScale extends AbstractITestS3AMetadataStoreScale { - private static final long BATCH_SIZE = 25; - private static final long SMALL_IO_UNITS = BATCH_SIZE / 4; + private static final Logger LOG = LoggerFactory.getLogger( + ITestDynamoDBMetadataStoreScale.class); + private static final long BATCH_SIZE = 25; + + /** + * IO Units for batch size; this sets the size to use for IO capacity. + * Value: {@value}. + */ + private static final long MAXIMUM_READ_CAPACITY = 10; + private static final long MAXIMUM_WRITE_CAPACITY = 15; + + private DynamoDBMetadataStore ddbms; + + private DynamoDB ddb; + + private Table table; + + private String tableName; + + /** was the provisioning changed in test_001_limitCapacity()? */ + private boolean isOverProvisionedForTest; + + private ProvisionedThroughputDescription originalCapacity; + + private static final int THREADS = 40; + + private static final int OPERATIONS_PER_THREAD = 50; + + /** + * Create the metadata store. The table and region are determined from + * the attributes of the FS used in the tests. + * @return a new metadata store instance + * @throws IOException failure to instantiate + * @throws AssumptionViolatedException if the FS isn't running S3Guard + DDB/ + */ @Override public MetadataStore createMetadataStore() throws IOException { - Configuration conf = getFileSystem().getConf(); - String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY); - assumeNotNull("DynamoDB table is configured", ddbTable); - String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY); - assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint); + S3AFileSystem fs = getFileSystem(); + assumeTrue("S3Guard is disabled for " + fs.getUri(), + fs.hasMetadataStore()); + MetadataStore store = fs.getMetadataStore(); + assumeTrue("Metadata store for " + fs.getUri() + " is " + store + + " -not DynamoDBMetadataStore", + store instanceof DynamoDBMetadataStore); + + DynamoDBMetadataStore fsStore = (DynamoDBMetadataStore) store; + Configuration conf = new Configuration(fs.getConf()); + + tableName = fsStore.getTableName(); + assertTrue("Null/Empty tablename in " + fsStore, + StringUtils.isNotEmpty(tableName)); + String region = fsStore.getRegion(); + assertTrue("Null/Empty region in " + fsStore, + StringUtils.isNotEmpty(region)); + // create a new metastore configured to fail fast if throttling + // happens. + conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName); + conf.set(S3GUARD_DDB_REGION_KEY, region); + conf.set(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL, "50ms"); + conf.set(S3GUARD_DDB_MAX_RETRIES, "2"); + conf.set(MAX_ERROR_RETRIES, "1"); + conf.set(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, "5ms"); DynamoDBMetadataStore ms = new DynamoDBMetadataStore(); - ms.initialize(getFileSystem().getConf()); + ms.initialize(conf); + // wire up the owner FS so that we can make assertions about throttle + // events + ms.bindToOwnerFilesystem(fs); return ms; } + @Override + public void setup() throws Exception { + super.setup(); + ddbms = (DynamoDBMetadataStore) createMetadataStore(); + tableName = ddbms.getTableName(); + assertNotNull("table has no name", tableName); + ddb = ddbms.getDynamoDB(); + table = ddb.getTable(tableName); + originalCapacity = table.describe().getProvisionedThroughput(); + + // If you set the same provisioned I/O as already set it throws an + // exception, avoid that. + isOverProvisionedForTest = ( + originalCapacity.getReadCapacityUnits() > MAXIMUM_READ_CAPACITY + || originalCapacity.getWriteCapacityUnits() > MAXIMUM_WRITE_CAPACITY); + assumeFalse("Table has too much capacity: " + originalCapacity.toString(), + isOverProvisionedForTest); + } + + @Override + public void teardown() throws Exception { + IOUtils.cleanupWithLogger(LOG, ddbms); + super.teardown(); + } + + /** + * The subclass expects the superclass to be throttled; sometimes it is. + */ + @Test + @Override + public void test_020_Moves() throws Throwable { + ThrottleTracker tracker = new ThrottleTracker(); + try { + // if this doesn't throttle, all is well. + super.test_020_Moves(); + } catch (AWSServiceThrottledException ex) { + // if the service was throttled, we ex;ect the exception text + GenericTestUtils.assertExceptionContains( + DynamoDBMetadataStore.HINT_DDB_IOPS_TOO_LOW, + ex, + "Expected throttling message"); + } finally { + LOG.info("Statistics {}", tracker); + } + } /** * Though the AWS SDK claims in documentation to handle retries and @@ -70,92 +200,298 @@ public class ITestDynamoDBMetadataStoreScale * correctly, retrying w/ smaller batch instead of surfacing exceptions. */ @Test - public void testBatchedWriteExceedsProvisioned() throws Exception { + public void test_030_BatchedWrite() throws Exception { - final long iterations = 5; - boolean isProvisionedChanged; - List toCleanup = new ArrayList<>(); + final int iterations = 15; + final ArrayList toCleanup = new ArrayList<>(); + toCleanup.ensureCapacity(THREADS * iterations); // Fail if someone changes a constant we depend on assertTrue("Maximum batch size must big enough to run this test", S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT >= BATCH_SIZE); - try (DynamoDBMetadataStore ddbms = - (DynamoDBMetadataStore)createMetadataStore()) { - DynamoDB ddb = ddbms.getDynamoDB(); - String tableName = ddbms.getTable().getTableName(); - final ProvisionedThroughputDescription existing = - ddb.getTable(tableName).describe().getProvisionedThroughput(); + // We know the dynamodb metadata store will expand a put of a path + // of depth N into a batch of N writes (all ancestors are written + // separately up to the root). (Ab)use this for an easy way to write + // a batch of stuff that is bigger than the provisioned write units + try { + describe("Running %d iterations of batched put, size %d", iterations, + BATCH_SIZE); - // If you set the same provisioned I/O as already set it throws an - // exception, avoid that. - isProvisionedChanged = (existing.getReadCapacityUnits() != SMALL_IO_UNITS - || existing.getWriteCapacityUnits() != SMALL_IO_UNITS); + ThrottleTracker result = execute("prune", + 1, + true, + () -> { + ThrottleTracker tracker = new ThrottleTracker(); + long pruneItems = 0; + for (long i = 0; i < iterations; i++) { + Path longPath = pathOfDepth(BATCH_SIZE, String.valueOf(i)); + FileStatus status = basicFileStatus(longPath, 0, false, 12345, + 12345); + PathMetadata pm = new PathMetadata(status); + synchronized (toCleanup) { + toCleanup.add(pm); + } - if (isProvisionedChanged) { - // Set low provisioned I/O for dynamodb - describe("Provisioning dynamo tbl %s read/write -> %d/%d", tableName, - SMALL_IO_UNITS, SMALL_IO_UNITS); - // Blocks to ensure table is back to ready state before we proceed - ddbms.provisionTableBlocking(SMALL_IO_UNITS, SMALL_IO_UNITS); - } else { - describe("Skipping provisioning table I/O, already %d/%d", - SMALL_IO_UNITS, SMALL_IO_UNITS); - } + ddbms.put(pm); - try { - // We know the dynamodb metadata store will expand a put of a path - // of depth N into a batch of N writes (all ancestors are written - // separately up to the root). (Ab)use this for an easy way to write - // a batch of stuff that is bigger than the provisioned write units - try { - describe("Running %d iterations of batched put, size %d", iterations, - BATCH_SIZE); - long pruneItems = 0; - for (long i = 0; i < iterations; i++) { - Path longPath = pathOfDepth(BATCH_SIZE, String.valueOf(i)); - FileStatus status = basicFileStatus(longPath, 0, false, 12345, - 12345); - PathMetadata pm = new PathMetadata(status); + pruneItems++; - ddbms.put(pm); - toCleanup.add(pm); - pruneItems++; - // Having hard time reproducing Exceeded exception with put, also - // try occasional prune, which was the only stack trace I've seen - // (on JIRA) - if (pruneItems == BATCH_SIZE) { - describe("pruning files"); - ddbms.prune(Long.MAX_VALUE /* all files */); - pruneItems = 0; + if (pruneItems == BATCH_SIZE) { + describe("pruning files"); + ddbms.prune(Long.MAX_VALUE /* all files */); + pruneItems = 0; + } + if (tracker.probe()) { + // fail fast + break; + } } - } - } finally { - describe("Cleaning up table %s", tableName); - for (PathMetadata pm : toCleanup) { - cleanupMetadata(ddbms, pm); - } - } - } finally { - if (isProvisionedChanged) { - long write = existing.getWriteCapacityUnits(); - long read = existing.getReadCapacityUnits(); - describe("Restoring dynamo tbl %s read/write -> %d/%d", tableName, - read, write); - ddbms.provisionTableBlocking(existing.getReadCapacityUnits(), - existing.getWriteCapacityUnits()); - } + }); + assertNotEquals("No batch retries in " + result, + 0, result.batchThrottles); + } finally { + describe("Cleaning up table %s", tableName); + for (PathMetadata pm : toCleanup) { + cleanupMetadata(ddbms, pm); } } } - // Attempt do delete metadata, suppressing any errors - private void cleanupMetadata(MetadataStore ms, PathMetadata pm) { + /** + * Test Get throttling including using + * {@link MetadataStore#get(Path, boolean)}, + * as that stresses more of the code. + */ + @Test + public void test_040_get() throws Throwable { + // attempt to create many many get requests in parallel. + Path path = new Path("s3a://example.org/get"); + S3AFileStatus status = new S3AFileStatus(true, path, "alice"); + PathMetadata metadata = new PathMetadata(status); + ddbms.put(metadata); try { - ms.forgetMetadata(pm.getFileStatus().getPath()); + execute("get", + OPERATIONS_PER_THREAD, + true, + () -> ddbms.get(path, true) + ); + } finally { + retryingDelete(path); + } + } + + /** + * Ask for the version marker, which is where table init can be overloaded. + */ + @Test + public void test_050_getVersionMarkerItem() throws Throwable { + execute("get", + OPERATIONS_PER_THREAD * 2, + true, + () -> ddbms.getVersionMarkerItem() + ); + } + + /** + * Cleanup with an extra bit of retry logic around it, in case things + * are still over the limit. + * @param path path + */ + private void retryingDelete(final Path path) { + try { + ddbms.getInvoker().retry("Delete ", path.toString(), true, + () -> ddbms.delete(path)); + } catch (IOException e) { + LOG.warn("Failed to delete {}: ", path, e); + } + } + + @Test + public void test_060_list() throws Throwable { + // attempt to create many many get requests in parallel. + Path path = new Path("s3a://example.org/list"); + S3AFileStatus status = new S3AFileStatus(true, path, "alice"); + PathMetadata metadata = new PathMetadata(status); + ddbms.put(metadata); + try { + Path parent = path.getParent(); + execute("list", + OPERATIONS_PER_THREAD, + true, + () -> ddbms.listChildren(parent) + ); + } finally { + retryingDelete(path); + } + } + + @Test + public void test_070_putDirMarker() throws Throwable { + // attempt to create many many get requests in parallel. + Path path = new Path("s3a://example.org/putDirMarker"); + S3AFileStatus status = new S3AFileStatus(true, path, "alice"); + PathMetadata metadata = new PathMetadata(status); + ddbms.put(metadata); + DirListingMetadata children = ddbms.listChildren(path.getParent()); + try { + execute("list", + OPERATIONS_PER_THREAD, + true, + () -> ddbms.put(children) + ); + } finally { + retryingDelete(path); + } + } + + @Test + public void test_080_fullPathsToPut() throws Throwable { + // attempt to create many many get requests in parallel. + Path base = new Path("s3a://example.org/test_080_fullPathsToPut"); + Path child = new Path(base, "child"); + List pms = new ArrayList<>(); + ddbms.put(new PathMetadata(makeDirStatus(base))); + ddbms.put(new PathMetadata(makeDirStatus(child))); + ddbms.getInvoker().retry("set up directory tree", + base.toString(), + true, + () -> ddbms.put(pms)); + try { + DDBPathMetadata dirData = ddbms.get(child, true); + execute("list", + OPERATIONS_PER_THREAD, + true, + () -> ddbms.fullPathsToPut(dirData) + ); + } finally { + retryingDelete(base); + } + } + + @Test + public void test_900_instrumentation() throws Throwable { + describe("verify the owner FS gets updated after throttling events"); + // we rely on the FS being shared + S3AFileSystem fs = getFileSystem(); + String fsSummary = fs.toString(); + + S3AStorageStatistics statistics = fs.getStorageStatistics(); + for (StorageStatistics.LongStatistic statistic : statistics) { + LOG.info("{}", statistic.toString()); + } + String retryKey = Statistic.S3GUARD_METADATASTORE_RETRY.getSymbol(); + assertTrue("No increment of " + retryKey + " in " + fsSummary, + statistics.getLong(retryKey) > 0); + String throttledKey = Statistic.S3GUARD_METADATASTORE_THROTTLED.getSymbol(); + assertTrue("No increment of " + throttledKey + " in " + fsSummary, + statistics.getLong(throttledKey) > 0); + } + + /** + * Execute a set of operations in parallel, collect throttling statistics + * and return them. + * This execution will complete as soon as throttling is detected. + * This ensures that the tests do not run for longer than they should. + * @param operation string for messages. + * @param operationsPerThread number of times per thread to invoke the action. + * @param expectThrottling is throttling expected (and to be asserted on?) + * @param action action to invoke. + * @return the throttle statistics + */ + public ThrottleTracker execute(String operation, + int operationsPerThread, + final boolean expectThrottling, + LambdaTestUtils.VoidCallable action) + throws Exception { + + final ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + final ThrottleTracker tracker = new ThrottleTracker(); + final ExecutorService executorService = Executors.newFixedThreadPool( + THREADS); + final List> tasks = new ArrayList<>(THREADS); + + final AtomicInteger throttleExceptions = new AtomicInteger(0); + for (int i = 0; i < THREADS; i++) { + tasks.add( + () -> { + final ExecutionOutcome outcome = new ExecutionOutcome(); + final ContractTestUtils.NanoTimer t + = new ContractTestUtils.NanoTimer(); + for (int j = 0; j < operationsPerThread; j++) { + if (tracker.isThrottlingDetected()) { + outcome.skipped = true; + return outcome; + } + try { + action.call(); + outcome.completed++; + } catch (AWSServiceThrottledException e) { + // this is possibly OK + LOG.info("Operation [{}] raised a throttled exception " + e, j, e); + LOG.debug(e.toString(), e); + throttleExceptions.incrementAndGet(); + // consider it completed + outcome.throttleExceptions.add(e); + outcome.throttled++; + } catch (Exception e) { + LOG.error("Failed to execute {}", operation, e); + outcome.exceptions.add(e); + break; + } + tracker.probe(); + } + LOG.info("Thread completed {} with in {} ms with outcome {}: {}", + operation, t.elapsedTimeMs(), outcome, tracker); + return outcome; + } + ); + } + final List> futures = + executorService.invokeAll(tasks, + getTestTimeoutMillis(), TimeUnit.MILLISECONDS); + long elapsedMs = timer.elapsedTimeMs(); + LOG.info("Completed {} with {}", operation, tracker); + LOG.info("time to execute: {} millis", elapsedMs); + + for (Future future : futures) { + assertTrue("Future timed out", future.isDone()); + } + tracker.probe(); + + if (expectThrottling) { + tracker.assertThrottlingDetected(); + } + for (Future future : futures) { + + ExecutionOutcome outcome = future.get(); + if (!outcome.exceptions.isEmpty()) { + throw outcome.exceptions.get(0); + } + if (!outcome.skipped) { + assertEquals("Future did not complete all operations", + operationsPerThread, outcome.completed + outcome.throttled); + } + } + + return tracker; + } + + /** + * Attempt to delete metadata, suppressing any errors, and retrying on + * throttle events just in case some are still surfacing. + * @param ms store + * @param pm path to clean up + */ + private void cleanupMetadata(MetadataStore ms, PathMetadata pm) { + Path path = pm.getFileStatus().getPath(); + try { + ddbms.getInvoker().retry("clean up", path.toString(), true, + () -> ms.forgetMetadata(path)); } catch (IOException ioe) { // Ignore. + LOG.info("Ignoring error while cleaning up {} in database", path, ioe); } } @@ -164,11 +500,114 @@ public class ITestDynamoDBMetadataStoreScale for (long i = 0; i < n; i++) { sb.append(i == 0 ? "/" + this.getClass().getSimpleName() : "lvl"); sb.append(i); - if (i == n-1 && fileSuffix != null) { + if (i == n - 1 && fileSuffix != null) { sb.append(fileSuffix); } sb.append("/"); } return new Path(getFileSystem().getUri().toString(), sb.toString()); } + + /** + * Something to track throttles. + * The constructor sets the counters to the current count in the + * DDB table; a call to {@link #reset()} will set it to the latest values. + * The {@link #probe()} will pick up the latest values to compare them with + * the original counts. + */ + private class ThrottleTracker { + + private long writeThrottleEventOrig = ddbms.getWriteThrottleEventCount(); + + private long readThrottleEventOrig = ddbms.getReadThrottleEventCount(); + + private long batchWriteThrottleCountOrig = + ddbms.getBatchWriteCapacityExceededCount(); + + private long readThrottles; + + private long writeThrottles; + + private long batchThrottles; + + ThrottleTracker() { + reset(); + } + + /** + * Reset the counters. + */ + private synchronized void reset() { + writeThrottleEventOrig + = ddbms.getWriteThrottleEventCount(); + + readThrottleEventOrig + = ddbms.getReadThrottleEventCount(); + + batchWriteThrottleCountOrig + = ddbms.getBatchWriteCapacityExceededCount(); + } + + /** + * Update the latest throttle count; synchronized. + * @return true if throttling has been detected. + */ + private synchronized boolean probe() { + readThrottles = ddbms.getReadThrottleEventCount() - readThrottleEventOrig; + writeThrottles = ddbms.getWriteThrottleEventCount() + - writeThrottleEventOrig; + batchThrottles = ddbms.getBatchWriteCapacityExceededCount() + - batchWriteThrottleCountOrig; + return isThrottlingDetected(); + } + + @Override + public String toString() { + return String.format( + "Tracker with read throttle events = %d;" + + " write events = %d;" + + " batch throttles = %d", + readThrottles, writeThrottles, batchThrottles); + } + + /** + * Assert that throttling has been detected. + */ + void assertThrottlingDetected() { + assertTrue("No throttling detected in " + this + + " against " + ddbms.toString(), + isThrottlingDetected()); + } + + /** + * Has there been any throttling on an operation? + * @return true iff read, write or batch operations were throttled. + */ + private boolean isThrottlingDetected() { + return readThrottles > 0 || writeThrottles > 0 || batchThrottles > 0; + } + } + + /** + * Outcome of a thread's execution operation. + */ + private static class ExecutionOutcome { + private int completed; + private int throttled; + private boolean skipped; + private final List exceptions = new ArrayList<>(1); + private final List throttleExceptions = new ArrayList<>(1); + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "ExecutionOutcome{"); + sb.append("completed=").append(completed); + sb.append(", skipped=").append(skipped); + sb.append(", throttled=").append(throttled); + sb.append(", exception count=").append(exceptions.size()); + sb.append('}'); + return sb.toString(); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java index 266e68e0696..66a823937ff 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java @@ -26,7 +26,6 @@ import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Table; @@ -275,38 +274,7 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase { // that call does not change the values original.checkEquals("unchanged", getCapacities()); - // now update the value - long readCap = original.getRead(); - long writeCap = original.getWrite(); - long rc2 = readCap + 1; - long wc2 = writeCap + 1; - Capacities desired = new Capacities(rc2, wc2); - capacityOut = exec(newSetCapacity(), - S3GuardTool.SetCapacity.NAME, - "-" + READ_FLAG, Long.toString(rc2), - "-" + WRITE_FLAG, Long.toString(wc2), - fsURI); - LOG.info("Set Capacity output=\n{}", capacityOut); - - // to avoid race conditions, spin for the state change - AtomicInteger c = new AtomicInteger(0); - LambdaTestUtils.eventually(60000, - new LambdaTestUtils.VoidCallable() { - @Override - public void call() throws Exception { - c.incrementAndGet(); - Map diags = getMetadataStore().getDiagnostics(); - Capacities updated = getCapacities(diags); - String tableInfo = String.format("[%02d] table state: %s", - c.intValue(), diags.get(STATUS)); - LOG.info("{}; capacities {}", - tableInfo, updated); - desired.checkEquals(tableInfo, updated); - } - }, - new LambdaTestUtils.ProportionalRetryInterval(500, 5000)); - - // Destroy MetadataStore + // Destroy MetadataStore Destroy destroyCmd = new Destroy(fs.getConf()); String destroyed = exec(destroyCmd, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java index 876cc8020d3..0e6a1d8d092 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java @@ -22,7 +22,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; + +import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runners.MethodSorters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +41,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer; * Could be separated from S3A code, but we're using the S3A scale test * framework for convenience. */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public abstract class AbstractITestS3AMetadataStoreScale extends S3AScaleTestBase { private static final Logger LOG = LoggerFactory.getLogger( @@ -60,7 +64,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends public abstract MetadataStore createMetadataStore() throws IOException; @Test - public void testPut() throws Throwable { + public void test_010_Put() throws Throwable { describe("Test workload of put() operations"); // As described in hadoop-aws site docs, count parameter is used for @@ -83,7 +87,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends } @Test - public void testMoves() throws Throwable { + public void test_020_Moves() throws Throwable { describe("Test workload of batched move() operations"); // As described in hadoop-aws site docs, count parameter is used for @@ -140,7 +144,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends * Create a copy of given list of PathMetadatas with the paths moved from * src to dest. */ - private List moveMetas(List metas, Path src, + protected List moveMetas(List metas, Path src, Path dest) throws IOException { List moved = new ArrayList<>(metas.size()); for (PathMetadata srcMeta : metas) { @@ -151,7 +155,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends return moved; } - private Path movePath(Path p, Path src, Path dest) { + protected Path movePath(Path p, Path src, Path dest) { String srcStr = src.toUri().getPath(); String pathStr = p.toUri().getPath(); // Strip off src dir @@ -160,7 +164,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends return new Path(dest, pathStr); } - private S3AFileStatus copyStatus(S3AFileStatus status) { + protected S3AFileStatus copyStatus(S3AFileStatus status) { if (status.isDirectory()) { return new S3AFileStatus(status.isEmptyDirectory(), status.getPath(), status.getOwner()); @@ -185,7 +189,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends return count; } - private void clearMetadataStore(MetadataStore ms, long count) + protected void clearMetadataStore(MetadataStore ms, long count) throws IOException { describe("Recursive deletion"); NanoTimer deleteTimer = new NanoTimer(); @@ -202,15 +206,15 @@ public abstract class AbstractITestS3AMetadataStoreScale extends msecPerOp, op, count)); } - private static S3AFileStatus makeFileStatus(Path path) throws IOException { + protected static S3AFileStatus makeFileStatus(Path path) throws IOException { return new S3AFileStatus(SIZE, ACCESS_TIME, path, BLOCK_SIZE, OWNER); } - private static S3AFileStatus makeDirStatus(Path p) throws IOException { + protected static S3AFileStatus makeDirStatus(Path p) throws IOException { return new S3AFileStatus(false, p, OWNER); } - private List metasToPaths(List metas) { + protected List metasToPaths(List metas) { List paths = new ArrayList<>(metas.size()); for (PathMetadata meta : metas) { paths.add(meta.getFileStatus().getPath()); @@ -225,7 +229,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends * @param width Number of files (and directories, if depth > 0) per directory. * @param paths List to add generated paths to. */ - private static void createDirTree(Path parent, int depth, int width, + protected static void createDirTree(Path parent, int depth, int width, Collection paths) throws IOException { // Create files diff --git a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml index b68f5593980..f3a47fe5c12 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml +++ b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml @@ -150,6 +150,16 @@ simple + + + fs.s3a.s3guard.ddb.table.capacity.read + 10 + + + fs.s3a.s3guard.ddb.table.capacity.write + 10 + +