HADOOP-15426 Make S3guard client resilient to DDB throttle events and network failures (Contributed by Steve Loughran)
commit d7c0a08a1c
parent d32a8d5d58
@@ -1447,19 +1447,28 @@
 <name>fs.s3a.s3guard.ddb.max.retries</name>
 <value>9</value>
 <description>
-  Max retries on batched DynamoDB operations before giving up and
-  throwing an IOException. Each retry is delayed with an exponential
+  Max retries on throttled/incomplete DynamoDB operations
+  before giving up and throwing an IOException.
+  Each retry is delayed with an exponential
   backoff timer which starts at 100 milliseconds and approximately
   doubles each time. The minimum wait before throwing an exception is
   sum(100, 200, 400, 800, .. 100*2^N-1 ) == 100 * ((2^N)-1)
-  So N = 9 yields at least 51.1 seconds (51,100 milliseconds) of blocking
-  before throwing an IOException.
 </description>
 </property>

+<property>
+  <name>fs.s3a.s3guard.ddb.throttle.retry.interval</name>
+  <value>100ms</value>
+  <description>
+    Initial interval to retry after a request is throttled;
+    the back-off policy is exponential until the number of retries of
+    fs.s3a.s3guard.ddb.max.retries is reached.
+  </description>
+</property>
+
 <property>
 <name>fs.s3a.s3guard.ddb.background.sleep</name>
-<value>25</value>
+<value>25ms</value>
 <description>
   Length (in milliseconds) of pause between each batch of deletes when
   pruning metadata. Prevents prune operations (which can typically be low
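For reference, the backoff arithmetic in the description above can be checked with a standalone Java snippet (illustrative only, not part of this patch):

    // With an initial delay of 100 ms that doubles on each attempt, the
    // minimum total wait before failing is 100 * ((2^N) - 1) milliseconds;
    // N = 9 gives 51,100 ms, i.e. roughly 51.1 seconds of blocking.
    public class BackoffTotal {
      public static void main(String[] args) {
        final long initialDelayMs = 100;
        final int maxRetries = 9;      // fs.s3a.s3guard.ddb.max.retries
        long total = 0;
        long delay = initialDelayMs;
        for (int i = 0; i < maxRetries; i++) {
          total += delay;              // 100, 200, 400, ... 100 * 2^(N-1)
          delay *= 2;
        }
        // total == 100 * ((1 << maxRetries) - 1) == 51_100 for N = 9
        System.out.printf("Minimum blocking time: %d ms%n", total);
      }
    }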
@@ -184,6 +184,8 @@
 <exclude>**/ITestS3AFileContextStatistics.java</exclude>
 <exclude>**/ITestS3AEncryptionSSEC*.java</exclude>
 <exclude>**/ITestS3AHuge*.java</exclude>
+<!-- this sets out to overload DynamoDB, so must be run standalone -->
+<exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
 </excludes>
 </configuration>
 </execution>

@@ -216,6 +218,8 @@
 <include>**/ITestS3AFileContextStatistics.java</include>
 <include>**/ITestS3AHuge*.java</include>
 <include>**/ITestS3AEncryptionSSEC*.java</include>
+<!-- this sets out to overload DynamoDB, so must be run standalone -->
+<include>**/ITestDynamoDBMetadataStoreScale.java</include>
 </includes>
 </configuration>
 </execution>
@@ -458,12 +458,20 @@ public final class Constants {
 @InterfaceStability.Unstable
 public static final String S3GUARD_DDB_MAX_RETRIES =
     "fs.s3a.s3guard.ddb.max.retries";

 /**
- * Max retries on batched DynamoDB operations before giving up and
+ * Max retries on batched/throttled DynamoDB operations before giving up and
  * throwing an IOException. Default is {@value}. See core-default.xml for
  * more detail.
  */
-public static final int S3GUARD_DDB_MAX_RETRIES_DEFAULT = 9;
+public static final int S3GUARD_DDB_MAX_RETRIES_DEFAULT =
+    DEFAULT_MAX_ERROR_RETRIES;
+
+@InterfaceStability.Unstable
+public static final String S3GUARD_DDB_THROTTLE_RETRY_INTERVAL =
+    "fs.s3a.s3guard.ddb.throttle.retry.interval";
+public static final String S3GUARD_DDB_THROTTLE_RETRY_INTERVAL_DEFAULT =
+    "100ms";

 /**
  * Period of time (in milliseconds) to sleep between batches of writes.
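A short sketch of how these constants are consumed. The Configuration.getTimeDuration() overload taking a String default, as used later in this patch, parses unit suffixes such as "100ms"; the property names below are the real ones, the surrounding class is invented:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    public class ThrottleIntervalDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "100ms" is parsed into 100; a bare number would be taken as ms here
        long intervalMs = conf.getTimeDuration(
            "fs.s3a.s3guard.ddb.throttle.retry.interval",
            "100ms",   // S3GUARD_DDB_THROTTLE_RETRY_INTERVAL_DEFAULT
            TimeUnit.MILLISECONDS);
        int maxRetries = conf.getInt("fs.s3a.s3guard.ddb.max.retries", 9);
        System.out.printf("retry every %d ms, up to %d times%n",
            intervalMs, maxRetries);
      }
    }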
@@ -1131,6 +1131,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {

 /**
  * Increment a statistic by 1.
+ * This increments both the instrumentation and storage statistics.
  * @param statistic The operation to increment
  */
 protected void incrementStatistic(Statistic statistic) {

@@ -1139,6 +1140,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {

 /**
  * Increment a statistic by a specific value.
+ * This increments both the instrumentation and storage statistics.
  * @param statistic The operation to increment
  * @param count the count to increment
  */

@@ -1175,8 +1177,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   Statistic stat = isThrottleException(ex)
       ? STORE_IO_THROTTLED
       : IGNORED_ERRORS;
-  instrumentation.incrementCounter(stat, 1);
-  storageStatistics.incrementCounter(stat, 1);
+  incrementStatistic(stat);
 }

 /**
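The change above routes both counter updates through incrementStatistic(), so call sites cannot update one sink and forget the other. A standalone illustration of that consolidation pattern (class and field names here are invented, not the S3A ones):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    class DualStatistics {
      private final ConcurrentHashMap<String, LongAdder> instrumentation =
          new ConcurrentHashMap<>();
      private final ConcurrentHashMap<String, LongAdder> storageStatistics =
          new ConcurrentHashMap<>();

      /** Increment both views of the same statistic in one call. */
      void incrementStatistic(String statistic, long count) {
        instrumentation.computeIfAbsent(statistic, k -> new LongAdder())
            .add(count);
        storageStatistics.computeIfAbsent(statistic, k -> new LongAdder())
            .add(count);
      }
    }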
@@ -1197,6 +1198,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
 /**
  * Callback from {@link Invoker} when an operation against a metastore
  * is retried.
+ * Always increments the {@link Statistic#S3GUARD_METADATASTORE_RETRY}
+ * statistic/counter;
+ * if it is a throttling exception will update the associated
+ * throttled metrics/statistics.
+ *
  * @param ex exception
  * @param retries number of retries
  * @param idempotent is the method idempotent

@@ -1205,6 +1211,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
     int retries,
     boolean idempotent) {
   operationRetried(ex);
+  incrementStatistic(S3GUARD_METADATASTORE_RETRY);
+  if (isThrottleException(ex)) {
+    incrementStatistic(S3GUARD_METADATASTORE_THROTTLED);
+    instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1);
+  }
 }

 /**
@@ -1032,15 +1032,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
  * Throttled request.
  */
 public void throttled() {
-  incrementCounter(S3GUARD_METADATASTORE_THROTTLED, 1);
-  addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1);
+  // counters are incremented by owner.
 }

 /**
  * S3Guard is retrying after a (retryable) failure.
  */
 public void retrying() {
-  incrementCounter(S3GUARD_METADATASTORE_RETRY, 1);
+  // counters are incremented by owner.
 }
 }

@@ -124,12 +124,7 @@ public class S3ARetryPolicy implements RetryPolicy {
 // and a separate policy for throttle requests, which are considered
 // repeatable, even for non-idempotent calls, as the service
 // rejected the call entirely
-throttlePolicy = exponentialBackoffRetry(
-    conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT),
-    conf.getTimeDuration(RETRY_THROTTLE_INTERVAL,
-        RETRY_THROTTLE_INTERVAL_DEFAULT,
-        TimeUnit.MILLISECONDS),
-    TimeUnit.MILLISECONDS);
+throttlePolicy = createThrottleRetryPolicy(conf);

 // client connectivity: fixed retries without care for idempotency
 connectivityFailure = fixedRetries;

@@ -139,6 +134,22 @@ public class S3ARetryPolicy implements RetryPolicy {
 retryPolicy = retryByException(retryIdempotentCalls, policyMap);
 }

+/**
+ * Create the throttling policy.
+ * This will be called from the S3ARetryPolicy constructor, so
+ * subclasses must assume they are not initialized.
+ * @param conf configuration to use.
+ * @return the retry policy for throttling events.
+ */
+protected RetryPolicy createThrottleRetryPolicy(final Configuration conf) {
+  return exponentialBackoffRetry(
+      conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT),
+      conf.getTimeDuration(RETRY_THROTTLE_INTERVAL,
+          RETRY_THROTTLE_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS),
+      TimeUnit.MILLISECONDS);
+}
+
 /**
  * Subclasses can override this like a constructor to change behavior: call
  * superclass method, then modify it as needed, and return it.

@@ -206,6 +217,7 @@ public class S3ARetryPolicy implements RetryPolicy {
     int retries,
     int failovers,
     boolean idempotent) throws Exception {
+  Preconditions.checkArgument(exception != null, "Null exception");
   Exception ex = exception;
   if (exception instanceof AmazonClientException) {
     // uprate the amazon client exception for the purpose of exception
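The javadoc's warning that subclasses "must assume they are not initialized" follows from Java constructor ordering; a minimal, dependency-free illustration (hypothetical classes, not the S3A ones):

    class Base {
      Base() {
        hook();             // runs before Sub's field initializers
      }
      protected void hook() { }
    }

    class Sub extends Base {
      private String limit = "configured";
      @Override
      protected void hook() {
        // prints "null": subclass state is not yet set here, so the hook
        // may only rely on its arguments (e.g. a Configuration).
        System.out.println(limit);
      }
      public static void main(String[] args) {
        new Sub();
      }
    }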
@@ -27,6 +27,7 @@ import com.amazonaws.SdkBaseException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.retry.RetryUtils;
 import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
 import com.amazonaws.services.dynamodbv2.model.LimitExceededException;
 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;

@@ -358,8 +359,10 @@ public final class S3AUtils {
 /**
  * Is the exception an instance of a throttling exception. That
  * is an AmazonServiceException with a 503 response, any
- * exception from DynamoDB for limits exceeded, or an
- * {@link AWSServiceThrottledException}.
+ * exception from DynamoDB for limits exceeded, an
+ * {@link AWSServiceThrottledException},
+ * or anything which the AWS SDK's RetryUtils considers to be
+ * a throttling exception.
  * @param ex exception to examine
  * @return true if it is considered a throttling exception
  */

@@ -368,7 +371,9 @@ public final class S3AUtils {
     || ex instanceof ProvisionedThroughputExceededException
     || ex instanceof LimitExceededException
     || (ex instanceof AmazonServiceException
-        && 503 == ((AmazonServiceException)ex).getStatusCode());
+        && 503 == ((AmazonServiceException)ex).getStatusCode())
+    || (ex instanceof SdkBaseException
+        && RetryUtils.isThrottlingException((SdkBaseException) ex));
 }

 /**
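A small sketch of the new catch-all clause in action, assuming the 1.x AWS SDK, whose RetryUtils classifies throttling by error code and status rather than by exception class (the "ThrottlingException" code used below is one the SDK recognizes):

    import com.amazonaws.AmazonServiceException;
    import com.amazonaws.retry.RetryUtils;

    public class ThrottleCheckDemo {
      public static void main(String[] args) {
        AmazonServiceException ase = new AmazonServiceException("simulated");
        ase.setErrorCode("ThrottlingException");
        ase.setStatusCode(400);
        // Classified as throttling despite the generic exception class and
        // non-503 status, which is why the patch adds this clause.
        System.out.println(RetryUtils.isThrottlingException(ase));
      }
    }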
@@ -36,10 +36,12 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

 import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;

@@ -77,12 +79,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
-import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.fs.s3a.auth.RolePolicies;
@@ -198,10 +200,6 @@ public class DynamoDBMetadataStore implements MetadataStore {
 public static final String E_INCOMPATIBLE_VERSION
     = "Database table is from an incompatible S3Guard version.";

-/** Initial delay for retries when batched operations get throttled by
- * DynamoDB. Value is {@value} msec. */
-public static final long MIN_RETRY_SLEEP_MSEC = 100;
-
 @VisibleForTesting
 static final String DESCRIPTION
     = "S3Guard metadata store in DynamoDB";

@@ -214,6 +212,13 @@ public class DynamoDBMetadataStore implements MetadataStore {
 @VisibleForTesting
 static final String TABLE = "table";

+@VisibleForTesting
+static final String HINT_DDB_IOPS_TOO_LOW
+    = " This may be because the write threshold of DynamoDB is set too low.";
+
+@VisibleForTesting
+static final String THROTTLING = "Throttling";
+
 private static ValueMap deleteTrackingValueMap =
     new ValueMap().withBoolean(":false", false);
@@ -226,7 +231,14 @@ public class DynamoDBMetadataStore implements MetadataStore {
 private Configuration conf;
 private String username;

-private RetryPolicy dataAccessRetryPolicy;
+/**
+ * This policy is mostly for batched writes, not for processing
+ * exceptions in invoke() calls.
+ * It also plays a role in {@link #getVersionMarkerItem()};
+ * look at that method for the details.
+ */
+private RetryPolicy batchWriteRetryPolicy;

 private S3AInstrumentation.S3GuardInstrumentation instrumentation;

 /** Owner FS: only valid if configured with an owner FS. */

@@ -237,8 +249,15 @@ public class DynamoDBMetadataStore implements MetadataStore {
     Invoker.NO_OP
 );

-/** Data access can have its own policies. */
-private Invoker dataAccess;
+/** Invoker for read operations. */
+private Invoker readOp;
+
+/** Invoker for write operations. */
+private Invoker writeOp;
+
+private final AtomicLong readThrottleEvents = new AtomicLong(0);
+private final AtomicLong writeThrottleEvents = new AtomicLong(0);
+private final AtomicLong batchWriteCapacityExceededEvents = new AtomicLong(0);

 /**
  * Total limit on the number of throttle events after which
@@ -292,10 +311,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
 Preconditions.checkNotNull(fs, "Null filesystem");
 Preconditions.checkArgument(fs instanceof S3AFileSystem,
     "DynamoDBMetadataStore only supports S3A filesystem.");
-owner = (S3AFileSystem) fs;
-instrumentation = owner.getInstrumentation().getS3GuardInstrumentation();
+bindToOwnerFilesystem((S3AFileSystem) fs);
 final String bucket = owner.getBucket();
-conf = owner.getConf();
 String confRegion = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
 if (!StringUtils.isEmpty(confRegion)) {
   region = confRegion;

@@ -316,7 +333,6 @@ public class DynamoDBMetadataStore implements MetadataStore {
 }
 LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
 }
-username = owner.getUsername();
 credentials = owner.shareCredentials("s3guard");
 dynamoDB = createDynamoDB(conf, region, bucket, credentials);

@@ -325,7 +341,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
 initDataAccessRetries(conf);

 // set up a full retry policy
-invoker = new Invoker(new S3ARetryPolicy(conf),
+invoker = new Invoker(new S3GuardDataAccessRetryPolicy(conf),
     this::retryEvent
 );

@@ -334,6 +350,20 @@ public class DynamoDBMetadataStore implements MetadataStore {
 instrumentation.initialized();
 }

+/**
+ * Declare that this table is owned by the specific S3A FS instance.
+ * This will bind some fields to the values provided by the owner,
+ * including wiring up the instrumentation.
+ * @param fs owner filesystem
+ */
+@VisibleForTesting
+void bindToOwnerFilesystem(final S3AFileSystem fs) {
+  owner = fs;
+  conf = owner.getConf();
+  instrumentation = owner.getInstrumentation().getS3GuardInstrumentation();
+  username = owner.getUsername();
+}
+
 /**
  * Performs one-time initialization of the metadata store via configuration.
  *
@@ -382,16 +412,23 @@ public class DynamoDBMetadataStore implements MetadataStore {
 /**
  * Set retry policy. This is driven by the value of
  * {@link Constants#S3GUARD_DDB_MAX_RETRIES} with an exponential backoff
- * between each attempt of {@link #MIN_RETRY_SLEEP_MSEC} milliseconds.
+ * between each attempt of {@link Constants#S3GUARD_DDB_THROTTLE_RETRY_INTERVAL}
+ * milliseconds.
  * @param config configuration for data access
  */
 private void initDataAccessRetries(Configuration config) {
-  int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES,
-      S3GUARD_DDB_MAX_RETRIES_DEFAULT);
-  dataAccessRetryPolicy = RetryPolicies
-      .exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC,
+  batchWriteRetryPolicy = RetryPolicies
+      .exponentialBackoffRetry(
+          config.getInt(S3GUARD_DDB_MAX_RETRIES,
+              S3GUARD_DDB_MAX_RETRIES_DEFAULT),
+          conf.getTimeDuration(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL,
+              S3GUARD_DDB_THROTTLE_RETRY_INTERVAL_DEFAULT,
+              TimeUnit.MILLISECONDS),
           TimeUnit.MILLISECONDS);
-  dataAccess = new Invoker(dataAccessRetryPolicy, this::retryEvent);
+  final RetryPolicy throttledRetryRetryPolicy
+      = new S3GuardDataAccessRetryPolicy(config);
+  readOp = new Invoker(throttledRetryRetryPolicy, this::readRetryEvent);
+  writeOp = new Invoker(throttledRetryRetryPolicy, this::writeRetryEvent);
 }

 @Override
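A hedged sketch of how the two invokers are then used: each wraps an operation in retries driven by the policy, calling the supplied callback before each reattempt. The wrapped lambda below is a stand-in, not a real DynamoDB call:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.Invoker;
    import org.apache.hadoop.fs.s3a.s3guard.S3GuardDataAccessRetryPolicy;

    public class InvokerSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Invoker readOp = new Invoker(
            new S3GuardDataAccessRetryPolicy(conf),
            (text, ex, retries, idempotent) ->
                System.out.printf("retry #%d of %s: %s%n", retries, text, ex));
        // reads are idempotent, so retries are always permitted
        String result = readOp.retry("get", "/example", true,
            () -> "simulated DynamoDB read");
        System.out.println(result);
      }
    }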
@@ -432,11 +469,17 @@ public class DynamoDBMetadataStore implements MetadataStore {
 if (tombstone) {
   Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
       new DDBPathMetadata(PathMetadata.tombstone(path)));
-  invoker.retry("Put tombstone", path.toString(), idempotent,
+  writeOp.retry(
+      "Put tombstone",
+      path.toString(),
+      idempotent,
       () -> table.putItem(item));
 } else {
   PrimaryKey key = pathToKey(path);
-  invoker.retry("Delete key", path.toString(), idempotent,
+  writeOp.retry(
+      "Delete key",
+      path.toString(),
+      idempotent,
       () -> table.deleteItem(key));
 }
 }
@@ -460,28 +503,38 @@ public class DynamoDBMetadataStore implements MetadataStore {
 }
 }

-@Retries.OnceRaw
-private Item getConsistentItem(PrimaryKey key) {
+/**
+ * Get a consistent view of an item.
+ * @param path path to look up in the database
+ * @return the result
+ * @throws IOException failure
+ */
+@Retries.RetryTranslated
+private Item getConsistentItem(final Path path) throws IOException {
+  PrimaryKey key = pathToKey(path);
   final GetItemSpec spec = new GetItemSpec()
       .withPrimaryKey(key)
       .withConsistentRead(true); // strictly consistent read
-  return table.getItem(spec);
+  return readOp.retry("get",
+      path.toString(),
+      true,
+      () -> table.getItem(spec));
 }

 @Override
-@Retries.OnceTranslated
+@Retries.RetryTranslated
 public DDBPathMetadata get(Path path) throws IOException {
   return get(path, false);
 }

 @Override
-@Retries.OnceTranslated
+@Retries.RetryTranslated
 public DDBPathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
     throws IOException {
   checkPath(path);
   LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
-  return Invoker.once("get", path.toString(),
-      () -> innerGet(path, wantEmptyDirectoryFlag));
+  return innerGet(path, wantEmptyDirectoryFlag);
 }

 /**
@@ -491,9 +544,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
  * MetadataStore that it should try to compute the empty directory flag.
  * @return metadata for {@code path}, {@code null} if not found
  * @throws IOException IO problem
- * @throws AmazonClientException dynamo DB level problem
  */
-@Retries.OnceRaw
+@Retries.RetryTranslated
 private DDBPathMetadata innerGet(Path path, boolean wantEmptyDirectoryFlag)
     throws IOException {
   final DDBPathMetadata meta;

@@ -502,7 +554,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
   meta =
       new DDBPathMetadata(makeDirStatus(username, path));
 } else {
-  final Item item = getConsistentItem(pathToKey(path));
+  final Item item = getConsistentItem(path);
   meta = itemToPathMetadata(item, username);
   LOG.debug("Get from table {} in region {} returning for {}: {}",
       tableName, region, path, meta);

@@ -517,8 +569,10 @@ public class DynamoDBMetadataStore implements MetadataStore {
     .withConsistentRead(true)
     .withFilterExpression(IS_DELETED + " = :false")
     .withValueMap(deleteTrackingValueMap);
-final ItemCollection<QueryOutcome> items = table.query(spec);
-boolean hasChildren = items.iterator().hasNext();
+boolean hasChildren = readOp.retry("get/hasChildren",
+    path.toString(),
+    true,
+    () -> table.query(spec).iterator().hasNext());
 // When this class has support for authoritative
 // (fully-cached) directory listings, we may also be able to answer
 // TRUE here. Until then, we don't know if we have full listing or
@@ -545,13 +599,16 @@ public class DynamoDBMetadataStore implements MetadataStore {
 }

 @Override
-@Retries.OnceTranslated
+@Retries.RetryTranslated
 public DirListingMetadata listChildren(final Path path) throws IOException {
   checkPath(path);
   LOG.debug("Listing table {} in region {}: {}", tableName, region, path);

   // find the children in the table
-  return Invoker.once("listChildren", path.toString(),
+  return readOp.retry(
+      "listChildren",
+      path.toString(),
+      true,
       () -> {
         final QuerySpec spec = new QuerySpec()
             .withHashKey(pathToParentKeyAttribute(path))

@@ -610,7 +667,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
 }

 @Override
-@Retries.OnceTranslated
+@Retries.RetryTranslated
 public void move(Collection<Path> pathsToDelete,
     Collection<PathMetadata> pathsToCreate) throws IOException {
   if (pathsToDelete == null && pathsToCreate == null) {
@@ -639,25 +696,25 @@ public class DynamoDBMetadataStore implements MetadataStore {
   }
 }

-Invoker.once("move", tableName,
-    () -> processBatchWriteRequest(null, pathMetadataToItem(newItems)));
+processBatchWriteRequest(null, pathMetadataToItem(newItems));
 }

 /**
  * Helper method to issue a batch write request to DynamoDB.
  *
- * The retry logic here is limited to repeating the write operations
- * until all items have been written; there is no other attempt
- * at recovery/retry. Throttling is handled internally.
+ * As well as retrying on the operation invocation, incomplete
+ * batches are retried until all items have been written or deleted.
  * @param keysToDelete primary keys to be deleted; can be null
  * @param itemsToPut new items to be put; can be null
+ * @return the number of iterations needed to complete the call.
  */
-@Retries.OnceRaw("Outstanding batch items are updated with backoff")
-private void processBatchWriteRequest(PrimaryKey[] keysToDelete,
+@Retries.RetryTranslated("Outstanding batch items are updated with backoff")
+private int processBatchWriteRequest(PrimaryKey[] keysToDelete,
     Item[] itemsToPut) throws IOException {
   final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length);
   final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length);
   int count = 0;
+  int batches = 0;
   while (count < totalToDelete + totalToPut) {
     final TableWriteItems writeItems = new TableWriteItems(tableName);
     int numToDelete = 0;

@@ -682,35 +739,66 @@ public class DynamoDBMetadataStore implements MetadataStore {
     count += numToPut;
   }

-  BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
+  // if there's a retry and another process updates things then it's not
+  // quite idempotent, but this was the case anyway
+  batches++;
+  BatchWriteItemOutcome res = writeOp.retry(
+      "batch write",
+      "",
+      true,
+      () -> dynamoDB.batchWriteItem(writeItems));
   // Check for unprocessed keys in case of exceeding provisioned throughput
   Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
   int retryCount = 0;
   while (!unprocessed.isEmpty()) {
-    retryBackoff(retryCount++);
-    res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
+    batchWriteCapacityExceededEvents.incrementAndGet();
+    batches++;
+    retryBackoffOnBatchWrite(retryCount++);
+    // use a different reference to keep the compiler quiet
+    final Map<String, List<WriteRequest>> upx = unprocessed;
+    res = writeOp.retry(
+        "batch write",
+        "",
+        true,
+        () -> dynamoDB.batchWriteItemUnprocessed(upx));
     unprocessed = res.getUnprocessedItems();
   }
 }
+return batches;
 }

 /**
  * Put the current thread to sleep to implement exponential backoff
  * depending on retryCount. If max retries are exceeded, throws an
  * exception instead.
+ *
  * @param retryCount number of retries so far
  * @throws IOException when max retryCount is exceeded.
  */
-private void retryBackoff(int retryCount) throws IOException {
+private void retryBackoffOnBatchWrite(int retryCount) throws IOException {
   try {
     // Our RetryPolicy ignores everything but retryCount here.
-    RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null,
+    RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(
+        null,
         retryCount, 0, true);
     if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
-      throw new IOException(
-          String.format("Max retries exceeded (%d) for DynamoDB. This may be"
-              + " because write threshold of DynamoDB is set too low.",
-              retryCount));
+      // Create an AWSServiceThrottledException, with a fake inner cause
+      // which we fill in to look like a real exception so
+      // error messages look sensible
+      AmazonServiceException cause = new AmazonServiceException(
+          "Throttling");
+      cause.setServiceName("S3Guard");
+      cause.setStatusCode(AWSServiceThrottledException.STATUS_CODE);
+      cause.setErrorCode(THROTTLING);  // used in real AWS errors
+      cause.setErrorType(AmazonServiceException.ErrorType.Service);
+      cause.setErrorMessage(THROTTLING);
+      cause.setRequestId("n/a");
+      throw new AWSServiceThrottledException(
+          String.format("Max retries during batch write exceeded"
+              + " (%d) for DynamoDB."
+              + HINT_DDB_IOPS_TOO_LOW,
+              retryCount),
+          cause);
     } else {
       LOG.debug("Sleeping {} msec before next retry", action.delayMillis);
       Thread.sleep(action.delayMillis);
@@ -720,12 +808,12 @@ public class DynamoDBMetadataStore implements MetadataStore {
   } catch (IOException e) {
     throw e;
   } catch (Exception e) {
-    throw new IOException("Unexpected exception", e);
+    throw new IOException("Unexpected exception " + e, e);
   }
 }

 @Override
-@Retries.OnceRaw
+@Retries.RetryTranslated
 public void put(PathMetadata meta) throws IOException {
   // For a deeply nested path, this method will automatically create the full
   // ancestry and save respective item in DynamoDB table.
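The batch-write loop above leans on a DynamoDB contract worth spelling out: batchWriteItem() can succeed while leaving some requests unprocessed, and those must be resubmitted. A stripped-down sketch of just that contract (AWS SDK v1 document API; table wiring assumed, backoff numbers illustrative):

    import java.util.List;
    import java.util.Map;
    import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
    import com.amazonaws.services.dynamodbv2.document.DynamoDB;
    import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
    import com.amazonaws.services.dynamodbv2.model.WriteRequest;

    class BatchWriteLoop {
      static void writeAll(DynamoDB dynamoDB, TableWriteItems items)
          throws InterruptedException {
        BatchWriteItemOutcome res = dynamoDB.batchWriteItem(items);
        Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
        long delayMs = 100;                        // initial backoff
        while (!unprocessed.isEmpty()) {
          Thread.sleep(delayMs);
          delayMs = Math.min(delayMs * 2, 10_000); // capped exponential backoff
          res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
          unprocessed = res.getUnprocessedItems();
        }
      }
    }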
@@ -741,7 +829,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
 }

 @Override
-@Retries.OnceRaw
+@Retries.RetryTranslated
 public void put(Collection<PathMetadata> metas) throws IOException {
   innerPut(pathMetaToDDBPathMeta(metas));
 }

@@ -757,8 +845,9 @@ public class DynamoDBMetadataStore implements MetadataStore {
 /**
  * Helper method to get full path of ancestors that are nonexistent in table.
  */
-@Retries.OnceRaw
-private Collection<DDBPathMetadata> fullPathsToPut(DDBPathMetadata meta)
+@VisibleForTesting
+@Retries.RetryTranslated
+Collection<DDBPathMetadata> fullPathsToPut(DDBPathMetadata meta)
     throws IOException {
   checkPathMetadata(meta);
   final Collection<DDBPathMetadata> metasToPut = new ArrayList<>();
@@ -771,7 +860,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
 // first existent ancestor
 Path path = meta.getFileStatus().getPath().getParent();
 while (path != null && !path.isRoot()) {
-  final Item item = getConsistentItem(pathToKey(path));
+  final Item item = getConsistentItem(path);
   if (!itemExists(item)) {
     final FileStatus status = makeDirStatus(path, username);
     metasToPut.add(new DDBPathMetadata(status, Tristate.FALSE, false,

@@ -810,7 +899,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
  * @throws IOException IO problem
  */
 @Override
-@Retries.OnceTranslated("retry(listFullPaths); once(batchWrite)")
+@Retries.RetryTranslated
 public void put(DirListingMetadata meta) throws IOException {
   LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);

@@ -821,15 +910,12 @@ public class DynamoDBMetadataStore implements MetadataStore {
     false, meta.isAuthoritative());

 // First add any missing ancestors...
-final Collection<DDBPathMetadata> metasToPut = invoker.retry(
-    "paths to put", path.toString(), true,
-    () -> fullPathsToPut(ddbPathMeta));
+final Collection<DDBPathMetadata> metasToPut = fullPathsToPut(ddbPathMeta);

 // next add all children of the directory
 metasToPut.addAll(pathMetaToDDBPathMeta(meta.getListing()));

-Invoker.once("put", path.toString(),
-    () -> processBatchWriteRequest(null, pathMetadataToItem(metasToPut)));
+processBatchWriteRequest(null, pathMetadataToItem(metasToPut));
 }

 @Override
@@ -850,7 +936,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
 }

 @Override
-@Retries.OnceTranslated
+@Retries.RetryTranslated
 public void destroy() throws IOException {
   if (table == null) {
     LOG.info("In destroy(): no table to delete");

@@ -859,10 +945,11 @@ public class DynamoDBMetadataStore implements MetadataStore {
 LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
 Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB");
 try {
-  table.delete();
+  invoker.retry("delete", null, true,
+      () -> table.delete());
   table.waitForDelete();
-} catch (ResourceNotFoundException rnfe) {
-  LOG.info("ResourceNotFoundException while deleting DynamoDB table {} in "
+} catch (FileNotFoundException rnfe) {
+  LOG.info("FileNotFoundException while deleting DynamoDB table {} in "
       + "region {}. This may indicate that the table does not exist, "
       + "or has been deleted by another concurrent thread or process.",
       tableName, region);
@@ -872,38 +959,49 @@ public class DynamoDBMetadataStore implements MetadataStore {
       tableName, ie);
   throw new InterruptedIOException("Table " + tableName
       + " in region " + region + " has not been deleted");
-} catch (AmazonClientException e) {
-  throw translateException("destroy", tableName, e);
 }
 }

-@Retries.OnceRaw
+@Retries.RetryTranslated
 private ItemCollection<ScanOutcome> expiredFiles(long modTime,
-    String keyPrefix) {
+    String keyPrefix) throws IOException {
   String filterExpression =
       "mod_time < :mod_time and begins_with(parent, :parent)";
   String projectionExpression = "parent,child";
   ValueMap map = new ValueMap()
       .withLong(":mod_time", modTime)
       .withString(":parent", keyPrefix);
-  return table.scan(filterExpression, projectionExpression, null, map);
+  return readOp.retry(
+      "scan",
+      keyPrefix,
+      true,
+      () -> table.scan(filterExpression, projectionExpression, null, map));
 }

 @Override
-@Retries.OnceRaw("once(batchWrite)")
+@Retries.RetryTranslated
 public void prune(long modTime) throws IOException {
   prune(modTime, "/");
 }

+/**
+ * Prune files, in batches. There's a sleep between each batch.
+ * @param modTime Oldest modification time to allow
+ * @param keyPrefix The prefix for the keys that should be removed
+ * @throws IOException Any IO/DDB failure.
+ * @throws InterruptedIOException if the prune was interrupted
+ */
 @Override
-@Retries.OnceRaw("once(batchWrite)")
+@Retries.RetryTranslated
 public void prune(long modTime, String keyPrefix) throws IOException {
   int itemCount = 0;
   try {
     Collection<Path> deletionBatch =
         new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
-    int delay = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
-        S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT);
+    long delay = conf.getTimeDuration(
+        S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
+        S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT,
+        TimeUnit.MILLISECONDS);
     Set<Path> parentPathSet = new HashSet<>();
     for (Item item : expiredFiles(modTime, keyPrefix)) {
       DDBPathMetadata md = PathMetadataDynamoDBTranslation
@@ -929,7 +1027,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
       deletionBatch.clear();
     }
   }
-  if (deletionBatch.size() > 0) {
+  // final batch of deletes
+  if (!deletionBatch.isEmpty()) {
     Thread.sleep(delay);
     processBatchWriteRequest(pathToKey(deletionBatch), null);
@@ -1093,19 +1192,34 @@ public class DynamoDBMetadataStore implements MetadataStore {
  * Get the version mark item in the existing DynamoDB table.
  *
  * As the version marker item may be created by another concurrent thread or
- * process, we sleep and retry a limited times before we fail to get it.
- * This does not include handling any failure other than "item not found",
- * so this method is tagged as "OnceRaw"
+ * process, we sleep and retry a limited number of times if the lookup returns
+ * with a null value.
+ * DDB throttling is always retried.
  */
-@Retries.OnceRaw
-private Item getVersionMarkerItem() throws IOException {
+@VisibleForTesting
+@Retries.RetryTranslated
+Item getVersionMarkerItem() throws IOException {
   final PrimaryKey versionMarkerKey =
       createVersionMarkerPrimaryKey(VERSION_MARKER);
   int retryCount = 0;
-  Item versionMarker = table.getItem(versionMarkerKey);
+  // look for a version marker, with usual throttling/failure retries.
+  Item versionMarker = queryVersionMarker(versionMarkerKey);
   while (versionMarker == null) {
+    // The marker was null.
+    // Two possibilities:
+    // 1. This isn't a S3Guard table.
+    // 2. This is a S3Guard table in construction; another thread/process
+    //    is about to write/actively writing the version marker.
+    // So that state #2 is handled, batchWriteRetryPolicy is used to manage
+    // retries.
+    // This will mean that if the cause is actually #1, failure will not
+    // be immediate. As this will ultimately result in a failure to
+    // init S3Guard and the S3A FS, this isn't going to be a performance
+    // bottleneck, simply a slightly slower failure report than would
+    // otherwise be seen.
+    // "if your settings are broken, performance is not your main issue"
     try {
-      RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null,
+      RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(null,
           retryCount, 0, true);
       if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
         break;

@@ -1114,14 +1228,29 @@ public class DynamoDBMetadataStore implements MetadataStore {
       Thread.sleep(action.delayMillis);
     }
   } catch (Exception e) {
-    throw new IOException("initTable: Unexpected exception", e);
+    throw new IOException("initTable: Unexpected exception " + e, e);
   }
   retryCount++;
-  versionMarker = table.getItem(versionMarkerKey);
+  versionMarker = queryVersionMarker(versionMarkerKey);
 }
 return versionMarker;
 }

+/**
+ * Issue the query to get the version marker, with throttling for overloaded
+ * DDB tables.
+ * @param versionMarkerKey key to look up
+ * @return the marker
+ * @throws IOException failure
+ */
+@Retries.RetryTranslated
+private Item queryVersionMarker(final PrimaryKey versionMarkerKey)
+    throws IOException {
+  return readOp.retry("getVersionMarkerItem",
+      VERSION_MARKER, true,
+      () -> table.getItem(versionMarkerKey));
+}
+
 /**
  * Verify that a table version is compatible with this S3Guard client.
  * @param tableName name of the table (for error messages)
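The version-marker loop is an instance of a general "poll until present, with bounded exponential backoff" pattern; a generic, dependency-free sketch under those assumptions (not the patch's implementation):

    import java.util.function.Supplier;

    class PollUntilPresent {
      static <T> T poll(Supplier<T> lookup, int maxRetries, long initialDelayMs)
          throws InterruptedException {
        T value = lookup.get();
        long delay = initialDelayMs;
        for (int retry = 0; value == null && retry < maxRetries; retry++) {
          Thread.sleep(delay);
          delay *= 2;        // exponential backoff, as in batchWriteRetryPolicy
          value = lookup.get();
        }
        return value;        // may still be null: the caller decides how to fail
      }
    }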
@@ -1207,7 +1336,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
  * @return the outcome.
  */
 @Retries.OnceRaw
-PutItemOutcome putItem(Item item) {
+private PutItemOutcome putItem(Item item) {
   LOG.debug("Putting item {}", item);
   return table.putItem(item);
 }

@@ -1254,6 +1383,11 @@ public class DynamoDBMetadataStore implements MetadataStore {
 return region;
 }

+@VisibleForTesting
+public String getTableName() {
+  return tableName;
+}
+
 @VisibleForTesting
 DynamoDB getDynamoDB() {
   return dynamoDB;
@@ -1312,8 +1446,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
 }
 map.put("description", DESCRIPTION);
 map.put("region", region);
-if (dataAccessRetryPolicy != null) {
-  map.put("retryPolicy", dataAccessRetryPolicy.toString());
+if (batchWriteRetryPolicy != null) {
+  map.put("retryPolicy", batchWriteRetryPolicy.toString());
 }
 return map;
 }
@@ -1368,6 +1502,38 @@ public class DynamoDBMetadataStore implements MetadataStore {
 }
 }

+/**
+ * Callback on a read operation retried.
+ * @param text text of the operation
+ * @param ex exception
+ * @param attempts number of attempts
+ * @param idempotent is the method idempotent (this is assumed to be true)
+ */
+void readRetryEvent(
+    String text,
+    IOException ex,
+    int attempts,
+    boolean idempotent) {
+  readThrottleEvents.incrementAndGet();
+  retryEvent(text, ex, attempts, true);
+}
+
+/**
+ * Callback on a write operation retried.
+ * @param text text of the operation
+ * @param ex exception
+ * @param attempts number of attempts
+ * @param idempotent is the method idempotent (this is assumed to be true)
+ */
+void writeRetryEvent(
+    String text,
+    IOException ex,
+    int attempts,
+    boolean idempotent) {
+  writeThrottleEvents.incrementAndGet();
+  retryEvent(text, ex, attempts, idempotent);
+}
+
 /**
  * Callback from {@link Invoker} when an operation is retried.
  * @param text text of the operation
@@ -1410,4 +1576,31 @@ public class DynamoDBMetadataStore implements MetadataStore {
   }
 }

+/**
+ * Get the count of read throttle events.
+ * @return the current count of read throttle events.
+ */
+@VisibleForTesting
+public long getReadThrottleEventCount() {
+  return readThrottleEvents.get();
+}
+
+/**
+ * Get the count of write throttle events.
+ * @return the current count of write throttle events.
+ */
+@VisibleForTesting
+public long getWriteThrottleEventCount() {
+  return writeThrottleEvents.get();
+}
+
+@VisibleForTesting
+public long getBatchWriteCapacityExceededCount() {
+  return batchWriteCapacityExceededEvents.get();
+}
+
+@VisibleForTesting
+public Invoker getInvoker() {
+  return invoker;
+}
 }
@ -0,0 +1,47 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
|
import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A retry policy whose throttle retry settings come from the S3Guard config options.
|
||||||
|
*/
|
||||||
|
public class S3GuardDataAccessRetryPolicy extends S3ARetryPolicy {
|
||||||
|
|
||||||
|
public S3GuardDataAccessRetryPolicy(final Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RetryPolicy createThrottleRetryPolicy(final Configuration conf) {
|
||||||
|
return exponentialBackoffRetry(
|
||||||
|
conf.getInt(S3GUARD_DDB_MAX_RETRIES, S3GUARD_DDB_MAX_RETRIES_DEFAULT),
|
||||||
|
conf.getTimeDuration(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL,
|
||||||
|
S3GUARD_DDB_THROTTLE_RETRY_INTERVAL_DEFAULT,
|
||||||
|
TimeUnit.MILLISECONDS),
|
||||||
|
TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
|
@ -298,8 +298,9 @@ rates.
|
||||||
</property>
|
</property>
|
||||||
```
|
```
|
||||||
|
|
||||||
Attempting to perform more IO than the capacity requested simply throttles the
|
Attempting to perform more IO than the capacity requested throttles the
|
||||||
IO; small capacity numbers are recommended when initially experimenting
|
IO, and may result in operations failing. Larger IO capacities cost more.
|
||||||
|
We recommend using small read and write capacities when initially experimenting
|
||||||
with S3Guard.
|
with S3Guard.
|
||||||
|
|
||||||
## Authenticating with S3Guard
|
## Authenticating with S3Guard
|
||||||
|
@ -327,7 +328,7 @@ to the options `fs.s3a.KEY` *for that bucket only*.
|
||||||
As an example, here is a configuration to use different metadata stores
|
As an example, here is a configuration to use different metadata stores
|
||||||
and tables for different buckets
|
and tables for different buckets
|
||||||
|
|
||||||
First, we define shortcuts for the metadata store classnames
|
First, we define shortcuts for the metadata store classnames:
|
||||||
|
|
||||||
|
|
||||||
```xml
|
```xml
|
||||||
|
@ -343,7 +344,7 @@ First, we define shortcuts for the metadata store classnames
|
||||||
```
|
```
|
||||||
|
|
||||||
Next, Amazon's public landsat database is configured with no
|
Next, Amazon's public landsat database is configured with no
|
||||||
metadata store
|
metadata store:
|
||||||
|
|
||||||
```xml
|
```xml
|
||||||
<property>
|
<property>
|
||||||
|
@ -355,7 +356,7 @@ metadata store
|
||||||
```
|
```
|
||||||
|
|
||||||
Next the `ireland-2` and `ireland-offline` buckets are configured with
|
Next the `ireland-2` and `ireland-offline` buckets are configured with
|
||||||
DynamoDB as the store, and a shared table `production-table`
|
DynamoDB as the store, and a shared table `production-table`:
|
||||||
|
|
||||||
|
|
||||||
```xml
|
```xml
|
||||||
|
@ -716,10 +717,10 @@ Metadata Store Diagnostics:
|
||||||
```
|
```
|
||||||
|
|
||||||
After the update, the table status changes to `UPDATING`; this is a sign that
|
After the update, the table status changes to `UPDATING`; this is a sign that
|
||||||
the capacity has been changed
|
the capacity has been changed.
|
||||||
|
|
||||||
Repeating the same command will not change the capacity, as both read and
|
Repeating the same command will not change the capacity, as both read and
|
||||||
write values match that already in use
|
write values match that already in use.
|
||||||
|
|
||||||
```
|
```
|
||||||
2017-08-30 16:24:35,337 [main] INFO s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1090)) - Table capacity unchanged at read: 20, write: 20
|
2017-08-30 16:24:35,337 [main] INFO s3guard.DynamoDBMetadataStore (DynamoDBMetadataStore.java:updateParameters(1090)) - Table capacity unchanged at read: 20, write: 20
|
||||||
|
@ -736,6 +737,9 @@ Metadata Store Diagnostics:
|
||||||
write-capacity=20
|
write-capacity=20
|
||||||
```
|
```
|
||||||
|
|
||||||
|
*Note*: There is a limit to how many times in a 24 hour period the capacity
|
||||||
|
of a table can be changed, either through this command or the AWS console.
|
||||||
|
|
||||||
## Debugging and Error Handling
|
## Debugging and Error Handling
|
||||||
|
|
||||||
If you run into network connectivity issues, or have a machine failure in the
|
If you run into network connectivity issues, or have a machine failure in the
|
||||||
|
@ -817,6 +821,97 @@ are only made after successful file creation, deletion and rename, the
|
||||||
store is *unlikely* to get out of sync, it is still something which
|
store is *unlikely* to get out of sync, it is still something which
|
||||||
merits more testing before it could be considered reliable.
|
merits more testing before it could be considered reliable.
|
||||||
|
|
||||||
|
## Managing DynamoDB IO Capacity
|
||||||
|
|
||||||
|
DynamoDB is not only billed on use (data and IO requests), it is also billed
|
||||||
|
on allocated IO Capacity.
|
||||||
|
|
||||||
|
When an application makes more requests than
|
||||||
|
the allocated capacity permits, the request is rejected; it is up to
|
||||||
|
the calling application to detect when it is being so throttled and
|
||||||
|
react. S3Guard does this, but as a result, when the client is being
|
||||||
|
throttled, operations are slower. This capacity throttling is averaged
|
||||||
|
over a few minutes: a briefly overloaded table will not be throttled,
|
||||||
|
but the rate cannot be sustained.
|
||||||
|
|
||||||
|
The load on a table is visible in the AWS console: go to the
|
||||||
|
DynamoDB page for the table and select the "metrics" tab.
|
||||||
|
If the graphs of throttled read or write
|
||||||
|
requests show that a lot of throttling has taken place, then there is not
|
||||||
|
enough allocated capacity for the applications making use of the table.
|
||||||
|
|
||||||
|
Similarly, if the capacity graphs show that the read or write loads are
|
||||||
|
low compared to the allocated capacities, then the table *may* be overprovisioned
|
||||||
|
for the current workload.
|
||||||
|
|
||||||
|
The S3Guard connector to DynamoDB can be configured to make
|
||||||
|
multiple attempts to repeat a throttled request, with an exponential
|
||||||
|
backoff between them.
|
||||||
|
|
||||||
|
The relevant settings for managing retries in the connector are:
|
||||||
|
|
||||||
|
```xml
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.s3guard.ddb.max.retries</name>
|
||||||
|
<value>9</value>
|
||||||
|
<description>
|
||||||
|
Max retries on throttled/incomplete DynamoDB operations
|
||||||
|
before giving up and throwing an IOException.
|
||||||
|
Each retry is delayed with an exponential
|
||||||
|
backoff timer which starts at 100 milliseconds and approximately
|
||||||
|
doubles each time. The minimum wait before throwing an exception is
|
||||||
|
sum(100, 200, 400, 800, ..., 100*2^(N-1)) == 100 * ((2^N)-1)
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.s3guard.ddb.throttle.retry.interval</name>
|
||||||
|
<value>100ms</value>
|
||||||
|
<description>
|
||||||
|
Initial interval to retry after a request is throttled;
|
||||||
|
the back-off policy is exponential until the retry limit of
|
||||||
|
fs.s3a.s3guard.ddb.max.retries is reached.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.s3guard.ddb.background.sleep</name>
|
||||||
|
<value>25ms</value>
|
||||||
|
<description>
|
||||||
|
Length (in milliseconds) of pause between each batch of deletes when
|
||||||
|
pruning metadata. Prevents prune operations (which can typically be low
|
||||||
|
priority background operations) from overly interfering with other I/O
|
||||||
|
operations.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
```
|
||||||
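With the defaults of 9 retries and a 100ms initial interval, the minimum
cumulative wait before an IOException surfaces is 100 * ((2^9)-1) = 51,100
milliseconds, just over 51 seconds. A minimal, illustrative Java sketch of
that arithmetic (not part of the Hadoop codebase; the class and method names
are invented for the example):

```java
// Illustrative only: the minimum total wait implied by the exponential
// backoff formula above, for a given retry count and initial interval.
public class S3GuardBackoffMath {

  static long minimumTotalWaitMs(int retries, long initialIntervalMs) {
    long total = 0;
    long delay = initialIntervalMs;
    for (int i = 0; i < retries; i++) {
      total += delay;  // sum(100, 200, 400, ...) with the defaults
      delay *= 2;      // the interval approximately doubles each retry
    }
    return total;      // equals initialIntervalMs * (2^retries - 1)
  }

  public static void main(String[] args) {
    // The defaults of 9 retries at 100ms: prints 51100.
    System.out.println(minimumTotalWaitMs(9, 100));
  }
}
```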
|
|
||||||
|
Having a large value for `fs.s3a.s3guard.ddb.max.retries` will ensure
|
||||||
|
that clients of an overloaded table will not fail immediately. However,
|
||||||
|
queries may be unexpectedly slow.
|
||||||
|
|
||||||
|
If operations, especially directory operations, are slow, check the AWS
|
||||||
|
console. It is also possible to set up AWS alerts for capacity limits
|
||||||
|
being exceeded.
|
||||||
|
|
||||||
|
[DynamoDB Auto Scaling](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AutoScaling.html)
|
||||||
|
can automatically increase and decrease the allocated capacity.
|
||||||
|
This is good for keeping capacity high when needed, but avoiding large
|
||||||
|
bills when it is not.
|
||||||
|
|
||||||
|
Experiments with S3Guard and DynamoDB Auto Scaling have shown that any Auto Scaling
|
||||||
|
operation will only take place after callers have been throttled for a period of
|
||||||
|
time. The clients will still need to be configured to retry when overloaded
|
||||||
|
until any extra capacity is allocated. Furthermore, as this retrying will
|
||||||
|
block the threads from performing other operations (including more IO),
|
||||||
|
the autoscaling may not scale fast enough.
|
||||||
|
|
||||||
|
We recommend experimenting with this, based on usage information collected
|
||||||
|
from previous days, and choosing a combination of
|
||||||
|
retry counts and an interval which allow the clients to cope with
|
||||||
|
some throttling, but not to time out other applications.
|
||||||
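As a sketch of how such tuning could be applied programmatically before a
filesystem instance is created (the class name, bucket URI and property
values below are placeholders, not recommendations):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class TunedS3GuardClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Example values only: more patience under throttling, at the cost
    // of higher worst-case latency.
    conf.setInt("fs.s3a.s3guard.ddb.max.retries", 12);
    conf.set("fs.s3a.s3guard.ddb.throttle.retry.interval", "200ms");
    // The tuned settings take effect when the filesystem is instantiated.
    FileSystem fs = FileSystem.newInstance(
        new URI("s3a://example-bucket/"), conf);
    System.out.println("Created " + fs.getUri());
  }
}
```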
|
|
||||||
## Troubleshooting
|
## Troubleshooting
|
||||||
|
|
||||||
### Error: `S3Guard table lacks version marker.`
|
### Error: `S3Guard table lacks version marker.`
|
||||||
|
@ -857,12 +952,49 @@ or the configuration is preventing S3Guard from finding the table.
|
||||||
region as the bucket being used.
|
region as the bucket being used.
|
||||||
1. Create the table if necessary.
|
1. Create the table if necessary.
|
||||||
|
|
||||||
|
|
||||||
### Error `"The level of configured provisioned throughput for the table was exceeded"`
|
### Error `"The level of configured provisioned throughput for the table was exceeded"`
|
||||||
|
|
||||||
|
```
|
||||||
|
org.apache.hadoop.fs.s3a.AWSServiceThrottledException: listFiles on s3a://bucket/10/d1/d2/d3:
|
||||||
|
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException:
|
||||||
|
The level of configured provisioned throughput for the table was exceeded.
|
||||||
|
Consider increasing your provisioning level with the UpdateTable API.
|
||||||
|
(Service: AmazonDynamoDBv2; Status Code: 400;
|
||||||
|
Error Code: ProvisionedThroughputExceededException;
|
||||||
|
```
|
||||||
The IO load from clients of the (shared) DynamoDB table exceeded its provisioned capacity.
|
The IO load from clients of the (shared) DynamoDB table exceeded its provisioned capacity.
|
||||||
|
|
||||||
Currently S3Guard doesn't do any throttling and retries here; the way to address
|
1. Increase the capacity of the DynamoDB table.
|
||||||
this is to increase capacity via the AWS console or the `set-capacity` command.
|
1. Increase the retry count and/or sleep time of S3Guard on throttle events.
|
||||||
|
1. Enable capacity autoscaling for the table in the AWS console.
|
||||||
|
|
||||||
|
### Error `Max retries exceeded`
|
||||||
|
|
||||||
|
The I/O load from clients of the (shared) DynamoDB table exceeded its provisioned capacity, and
|
||||||
|
the number of attempts to retry the operation exceeded the configured limit.
|
||||||
|
|
||||||
|
1. Increase the capacity of the DynamoDB table.
|
||||||
|
1. Increase the retry count and/or sleep time of S3Guard on throttle events.
|
||||||
|
1. Enable capacity autoscaling for the table in the AWS console.
|
||||||
|
|
||||||
|
|
||||||
|
### Error when running `set-capacity`: `org.apache.hadoop.fs.s3a.AWSServiceThrottledException: ProvisionTable`
|
||||||
|
|
||||||
|
```
|
||||||
|
org.apache.hadoop.fs.s3a.AWSServiceThrottledException: ProvisionTable on s3guard-example:
|
||||||
|
com.amazonaws.services.dynamodbv2.model.LimitExceededException:
|
||||||
|
Subscriber limit exceeded: Provisioned throughput decreases are limited within a given UTC day.
|
||||||
|
After the first 4 decreases, each subsequent decrease in the same UTC day can be performed at most once every 3600 seconds.
|
||||||
|
Number of decreases today: 6.
|
||||||
|
Last decrease at Wednesday, July 25, 2018 8:48:14 PM UTC.
|
||||||
|
Next decrease can be made at Wednesday, July 25, 2018 9:48:14 PM UTC
|
||||||
|
```
|
||||||
|
|
||||||
|
There is a limit on how often you can change the capacity of a DynamoDB table;
|
||||||
|
if you call set-capacity too often, it fails. Wait until after the time indicated
|
||||||
|
and try again.
|
||||||
|
|
||||||
|
|
||||||
## Other Topics
|
## Other Topics
|
||||||
|
|
||||||
|
|
|
@ -742,6 +742,54 @@ sequential one afterwards. The IO heavy ones must also be subclasses of
|
||||||
|
|
||||||
This is invaluable for debugging test failures.
|
This is invaluable for debugging test failures.
|
||||||
|
|
||||||
|
### Keeping AWS Costs down
|
||||||
|
|
||||||
|
Most of the base S3 tests are designed to use public AWS data
|
||||||
|
(the landsat-pds bucket) for read IO, so you don't have to pay for bytes
|
||||||
|
downloaded or long term storage costs. The scale tests do work with more data
|
||||||
|
so will cost more, as well as generally taking more time to execute.
|
||||||
|
|
||||||
|
You are, however, billed for:
|
||||||
|
|
||||||
|
1. Data left in S3 after test runs.
|
||||||
|
2. DynamoDB capacity reserved by S3Guard tables.
|
||||||
|
3. HTTP operations on files (HEAD, LIST, GET).
|
||||||
|
4. In-progress multipart uploads from bulk IO or S3A committer tests.
|
||||||
|
5. Encryption/decryption using AWS KMS keys.
|
||||||
|
|
||||||
|
The GET/decrypt costs are incurred on each partial read of a file,
|
||||||
|
so random IO can cost more than sequential IO; the speedup of queries with
|
||||||
|
columnar data usually justifies this.
|
||||||
|
|
||||||
|
The DynamoDB costs come from the number of entries stored and the allocated capacity.
|
||||||
|
|
||||||
|
How to keep costs down:
|
||||||
|
|
||||||
|
* Don't run the scale tests with large datasets; keep `fs.s3a.scale.test.huge.filesize` unset, or a few MB (minimum: 5).
|
||||||
|
* Remove all files in the filesystem. The root tests usually do this, but
|
||||||
|
it can be manually done:
|
||||||
|
|
||||||
|
hadoop fs -rm -r -f -skipTrash s3a://test-bucket/\*
|
||||||
|
* Abort all outstanding uploads:
|
||||||
|
|
||||||
|
hadoop s3guard uploads -abort -force s3a://test-bucket/
|
||||||
|
* If you don't need it, destroy the S3Guard DDB table.
|
||||||
|
|
||||||
|
hadoop s3guard destroy s3a://hwdev-steve-ireland-new/
|
||||||
|
|
||||||
|
The S3Guard tests will automatically create the Dynamo DB table in runs with
|
||||||
|
`-Ds3guard -Ddynamodb` set; the default capacity of these tables
|
||||||
|
is very small; it keeps costs down at the expense of IO performance
|
||||||
|
and, for test runs in or near the S3/DDB stores, throttling events.
|
||||||
|
|
||||||
|
If you want to manage capacity, use `s3guard set-capacity` to increase it
|
||||||
|
(performance) or decrease it (costs).
|
||||||
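For example, to set both capacities to 10 (the `-read`/`-write` flags are
those of the `set-capacity` command; substitute your own bucket):

    hadoop s3guard set-capacity -read 10 -write 10 s3a://test-bucket/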
|
For remote `hadoop-aws` test runs, read/write capacities of 10 each should suffice;
|
||||||
|
increase it if parallel test run logs warn of throttling.
|
||||||
|
|
||||||
|
Tip: for agility, use DynamoDB autoscaling, setting the minimum to something very low (e.g. 5 units) and the maximum to the largest amount you are willing to pay.
|
||||||
|
This will automatically reduce capacity when you are not running tests against
|
||||||
|
the bucket, and slowly increase it over multiple test runs if the load justifies it.
|
||||||
|
|
||||||
## <a name="tips"></a> Tips
|
## <a name="tips"></a> Tips
|
||||||
|
|
||||||
|
@ -985,9 +1033,13 @@ are included in the scale tests executed when `-Dscale` is passed to
|
||||||
the maven command line.
|
the maven command line.
|
||||||
|
|
||||||
The two S3Guard scale tests are `ITestDynamoDBMetadataStoreScale` and
|
The two S3Guard scale tests are `ITestDynamoDBMetadataStoreScale` and
|
||||||
`ITestLocalMetadataStoreScale`. To run the DynamoDB test, you will need to
|
`ITestLocalMetadataStoreScale`.
|
||||||
define your table name and region in your test configuration. For example,
|
|
||||||
the following settings allow us to run `ITestDynamoDBMetadataStoreScale` with
|
To run these tests, your DynamoDB table needs to be of limited capacity;
|
||||||
|
the values in `ITestDynamoDBMetadataStoreScale` currently require a read capacity
|
||||||
|
of 10 or less, and a write capacity of 15 or less.
|
||||||
|
|
||||||
|
The following settings allow us to run `ITestDynamoDBMetadataStoreScale` with
|
||||||
artificially low read and write capacity provisioned, so we can judge the
|
artificially low read and write capacity provisioned, so we can judge the
|
||||||
effects of being throttled by the DynamoDB service:
|
effects of being throttled by the DynamoDB service:
|
||||||
|
|
||||||
|
@ -1008,24 +1060,49 @@ effects of being throttled by the DynamoDB service:
|
||||||
<name>fs.s3a.s3guard.ddb.table</name>
|
<name>fs.s3a.s3guard.ddb.table</name>
|
||||||
<value>my-scale-test</value>
|
<value>my-scale-test</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
|
||||||
<name>fs.s3a.s3guard.ddb.region</name>
|
|
||||||
<value>us-west-2</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.s3guard.ddb.table.create</name>
|
<name>fs.s3a.s3guard.ddb.table.create</name>
|
||||||
<value>true</value>
|
<value>true</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.s3guard.ddb.table.capacity.read</name>
|
<name>fs.s3a.s3guard.ddb.table.capacity.read</name>
|
||||||
<value>10</value>
|
<value>5</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.s3guard.ddb.table.capacity.write</name>
|
<name>fs.s3a.s3guard.ddb.table.capacity.write</name>
|
||||||
<value>10</value>
|
<value>5</value>
|
||||||
</property>
|
</property>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
These tests verify that the invoked operations can trigger retries in the
|
||||||
|
S3Guard code, rather than just at the AWS SDK level, showing that if
|
||||||
|
SDK operations fail, they get retried. They also verify that the filesystem
|
||||||
|
statistics are updated to record that throttling took place.
|
||||||
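A condensed sketch of that statistics check, mirroring
`test_900_instrumentation` in `ITestDynamoDBMetadataStoreScale`; here `fs` is
assumed to be the S3AFileSystem under test:

```java
// Sketch: assert that throttling was recorded in the FS statistics.
S3AStorageStatistics statistics = fs.getStorageStatistics();
String throttledKey = Statistic.S3GUARD_METADATASTORE_THROTTLED.getSymbol();
assertTrue("No increment of " + throttledKey,
    statistics.getLong(throttledKey) > 0);
```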
|
|
||||||
|
*Do not panic if these tests fail to detect throttling!*
|
||||||
|
|
||||||
|
These tests are unreliable, as they need certain conditions to be met
|
||||||
|
for the operations to be repeatedly throttled:
|
||||||
|
|
||||||
|
1. You must have a low-enough latency connection to the DynamoDB store that,
|
||||||
|
for the capacity allocated, you can overload it.
|
||||||
|
1. The AWS Console can give you a view of what is happening here.
|
||||||
|
1. Running a single test on its own is less likely to trigger an overload
|
||||||
|
than trying to run the whole test suite.
|
||||||
|
1. And running the test suite more than once, back-to-back, can also help
|
||||||
|
overload the table.
|
||||||
|
1. Stepping through with a debugger will reduce load, so may not trigger
|
||||||
|
failures.
|
||||||
|
|
||||||
|
If the tests fail, it *probably* just means you aren't putting enough load
|
||||||
|
on the table.
|
||||||
|
|
||||||
|
These tests do not verify that the entire set of DynamoDB calls made
|
||||||
|
during the use of an S3Guard-enabled S3A filesystem are wrapped by retry logic.
|
||||||
|
|
||||||
|
The best way to verify resilience is to run the entire `hadoop-aws` test suite,
|
||||||
|
or even a real application, with throttling enabled.
|
||||||
|
|
||||||
### Testing only: Local Metadata Store
|
### Testing only: Local Metadata Store
|
||||||
|
|
||||||
There is an in-memory Metadata Store for testing.
|
There is an in-memory Metadata Store for testing.
|
||||||
|
|
|
@ -54,6 +54,11 @@ public class ITestS3AFileSystemContract extends FileSystemContractBaseTest {
|
||||||
Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
|
Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getGlobalTimeout() {
|
||||||
|
return S3ATestConstants.S3A_TEST_TIMEOUT;
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
nameThread();
|
nameThread();
|
||||||
|
|
|
@ -95,7 +95,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
|
||||||
|
|
||||||
private static DynamoDBMetadataStore ddbmsStatic;
|
private static DynamoDBMetadataStore ddbmsStatic;
|
||||||
|
|
||||||
private static String TEST_DYNAMODB_TABLE_NAME;
|
private static String testDynamoDBTableName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a path under the test path provided by
|
* Create a path under the test path provided by
|
||||||
|
@ -113,8 +113,8 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
|
||||||
Configuration conf = prepareTestConfiguration(new Configuration());
|
Configuration conf = prepareTestConfiguration(new Configuration());
|
||||||
assertThatDynamoMetadataStoreImpl(conf);
|
assertThatDynamoMetadataStoreImpl(conf);
|
||||||
Assume.assumeTrue("Test DynamoDB table name should be set to run "
|
Assume.assumeTrue("Test DynamoDB table name should be set to run "
|
||||||
+ "integration tests.", TEST_DYNAMODB_TABLE_NAME != null);
|
+ "integration tests.", testDynamoDBTableName != null);
|
||||||
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, TEST_DYNAMODB_TABLE_NAME);
|
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, testDynamoDBTableName);
|
||||||
|
|
||||||
s3AContract = new S3AContract(conf);
|
s3AContract = new S3AContract(conf);
|
||||||
s3AContract.init();
|
s3AContract.init();
|
||||||
|
@ -140,10 +140,10 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
|
||||||
public static void beforeClassSetup() throws IOException {
|
public static void beforeClassSetup() throws IOException {
|
||||||
Configuration conf = prepareTestConfiguration(new Configuration());
|
Configuration conf = prepareTestConfiguration(new Configuration());
|
||||||
assertThatDynamoMetadataStoreImpl(conf);
|
assertThatDynamoMetadataStoreImpl(conf);
|
||||||
TEST_DYNAMODB_TABLE_NAME = conf.get(S3GUARD_DDB_TEST_TABLE_NAME_KEY);
|
testDynamoDBTableName = conf.get(S3GUARD_DDB_TEST_TABLE_NAME_KEY);
|
||||||
Assume.assumeTrue("Test DynamoDB table name should be set to run "
|
Assume.assumeTrue("Test DynamoDB table name should be set to run "
|
||||||
+ "integration tests.", TEST_DYNAMODB_TABLE_NAME != null);
|
+ "integration tests.", testDynamoDBTableName != null);
|
||||||
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, TEST_DYNAMODB_TABLE_NAME);
|
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, testDynamoDBTableName);
|
||||||
|
|
||||||
LOG.debug("Creating static ddbms which will be shared between tests.");
|
LOG.debug("Creating static ddbms which will be shared between tests.");
|
||||||
ddbmsStatic = new DynamoDBMetadataStore();
|
ddbmsStatic = new DynamoDBMetadataStore();
|
||||||
|
|
|
@ -18,46 +18,176 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a.s3guard;
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import javax.annotation.Nullable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||||
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||||
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
|
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
|
||||||
|
import org.junit.FixMethodOrder;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.internal.AssumptionViolatedException;
|
||||||
|
import org.junit.runners.MethodSorters;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.StorageStatistics;
|
||||||
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
|
||||||
|
import org.apache.hadoop.fs.s3a.Statistic;
|
||||||
import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale;
|
import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus;
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
|
import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus;
|
||||||
import static org.junit.Assume.*;
|
import static org.junit.Assume.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scale test for DynamoDBMetadataStore.
|
* Scale test for DynamoDBMetadataStore.
|
||||||
|
*
|
||||||
|
* The throttle tests aren't quite trying to verify that throttling can
|
||||||
|
* be recovered from, because that makes for very slow tests: you have
|
||||||
|
* to overload the system and then have it back off until the operations finally complete.
|
||||||
|
* Instead, these tests verify that throttle events are raised and recorded in the statistics.
|
||||||
*/
|
*/
|
||||||
|
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||||
public class ITestDynamoDBMetadataStoreScale
|
public class ITestDynamoDBMetadataStoreScale
|
||||||
extends AbstractITestS3AMetadataStoreScale {
|
extends AbstractITestS3AMetadataStoreScale {
|
||||||
|
|
||||||
private static final long BATCH_SIZE = 25;
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
private static final long SMALL_IO_UNITS = BATCH_SIZE / 4;
|
ITestDynamoDBMetadataStoreScale.class);
|
||||||
|
|
||||||
|
private static final long BATCH_SIZE = 25;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum provisioned capacities allowed for the table under test; if the
|
||||||
|
* table is provisioned beyond these values, the tests are skipped.
|
||||||
|
*/
|
||||||
|
private static final long MAXIMUM_READ_CAPACITY = 10;
|
||||||
|
private static final long MAXIMUM_WRITE_CAPACITY = 15;
|
||||||
|
|
||||||
|
private DynamoDBMetadataStore ddbms;
|
||||||
|
|
||||||
|
private DynamoDB ddb;
|
||||||
|
|
||||||
|
private Table table;
|
||||||
|
|
||||||
|
private String tableName;
|
||||||
|
|
||||||
|
/** Is the table over-provisioned for these tests? Set in setup(). */
|
||||||
|
private boolean isOverProvisionedForTest;
|
||||||
|
|
||||||
|
private ProvisionedThroughputDescription originalCapacity;
|
||||||
|
|
||||||
|
private static final int THREADS = 40;
|
||||||
|
|
||||||
|
private static final int OPERATIONS_PER_THREAD = 50;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the metadata store. The table and region are determined from
|
||||||
|
* the attributes of the FS used in the tests.
|
||||||
|
* @return a new metadata store instance
|
||||||
|
* @throws IOException failure to instantiate
|
||||||
|
* @throws AssumptionViolatedException if the FS isn't running S3Guard + DDB/
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public MetadataStore createMetadataStore() throws IOException {
|
public MetadataStore createMetadataStore() throws IOException {
|
||||||
Configuration conf = getFileSystem().getConf();
|
S3AFileSystem fs = getFileSystem();
|
||||||
String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY);
|
assumeTrue("S3Guard is disabled for " + fs.getUri(),
|
||||||
assumeNotNull("DynamoDB table is configured", ddbTable);
|
fs.hasMetadataStore());
|
||||||
String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY);
|
MetadataStore store = fs.getMetadataStore();
|
||||||
assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint);
|
assumeTrue("Metadata store for " + fs.getUri() + " is " + store
|
||||||
|
+ " - not DynamoDBMetadataStore",
|
||||||
|
store instanceof DynamoDBMetadataStore);
|
||||||
|
|
||||||
|
DynamoDBMetadataStore fsStore = (DynamoDBMetadataStore) store;
|
||||||
|
Configuration conf = new Configuration(fs.getConf());
|
||||||
|
|
||||||
|
tableName = fsStore.getTableName();
|
||||||
|
assertTrue("Null/Empty tablename in " + fsStore,
|
||||||
|
StringUtils.isNotEmpty(tableName));
|
||||||
|
String region = fsStore.getRegion();
|
||||||
|
assertTrue("Null/Empty region in " + fsStore,
|
||||||
|
StringUtils.isNotEmpty(region));
|
||||||
|
// create a new metastore configured to fail fast if throttling
|
||||||
|
// happens.
|
||||||
|
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
|
||||||
|
conf.set(S3GUARD_DDB_REGION_KEY, region);
|
||||||
|
conf.set(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL, "50ms");
|
||||||
|
conf.set(S3GUARD_DDB_MAX_RETRIES, "2");
|
||||||
|
conf.set(MAX_ERROR_RETRIES, "1");
|
||||||
|
conf.set(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, "5ms");
|
||||||
|
|
||||||
DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
|
DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
|
||||||
ms.initialize(getFileSystem().getConf());
|
ms.initialize(conf);
|
||||||
|
// wire up the owner FS so that we can make assertions about throttle
|
||||||
|
// events
|
||||||
|
ms.bindToOwnerFilesystem(fs);
|
||||||
return ms;
|
return ms;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
super.setup();
|
||||||
|
ddbms = (DynamoDBMetadataStore) createMetadataStore();
|
||||||
|
tableName = ddbms.getTableName();
|
||||||
|
assertNotNull("table has no name", tableName);
|
||||||
|
ddb = ddbms.getDynamoDB();
|
||||||
|
table = ddb.getTable(tableName);
|
||||||
|
originalCapacity = table.describe().getProvisionedThroughput();
|
||||||
|
|
||||||
|
// If you set the same provisioned I/O as is already set, it throws an
|
||||||
|
// exception; avoid that.
|
||||||
|
isOverProvisionedForTest = (
|
||||||
|
originalCapacity.getReadCapacityUnits() > MAXIMUM_READ_CAPACITY
|
||||||
|
|| originalCapacity.getWriteCapacityUnits() > MAXIMUM_WRITE_CAPACITY);
|
||||||
|
assumeFalse("Table has too much capacity: " + originalCapacity.toString(),
|
||||||
|
isOverProvisionedForTest);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
IOUtils.cleanupWithLogger(LOG, ddbms);
|
||||||
|
super.teardown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The subclass expects the superclass to be throttled; sometimes it is.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@Override
|
||||||
|
public void test_020_Moves() throws Throwable {
|
||||||
|
ThrottleTracker tracker = new ThrottleTracker();
|
||||||
|
try {
|
||||||
|
// if this doesn't throttle, all is well.
|
||||||
|
super.test_020_Moves();
|
||||||
|
} catch (AWSServiceThrottledException ex) {
|
||||||
|
// if the service was throttled, we expect the exception text
|
||||||
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
DynamoDBMetadataStore.HINT_DDB_IOPS_TOO_LOW,
|
||||||
|
ex,
|
||||||
|
"Expected throttling message");
|
||||||
|
} finally {
|
||||||
|
LOG.info("Statistics {}", tracker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Though the AWS SDK claims in documentation to handle retries and
|
* Though the AWS SDK claims in documentation to handle retries and
|
||||||
|
@ -70,41 +200,17 @@ public class ITestDynamoDBMetadataStoreScale
|
||||||
* correctly, retrying w/ smaller batch instead of surfacing exceptions.
|
* correctly, retrying w/ smaller batch instead of surfacing exceptions.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testBatchedWriteExceedsProvisioned() throws Exception {
|
public void test_030_BatchedWrite() throws Exception {
|
||||||
|
|
||||||
final long iterations = 5;
|
final int iterations = 15;
|
||||||
boolean isProvisionedChanged;
|
final ArrayList<PathMetadata> toCleanup = new ArrayList<>();
|
||||||
List<PathMetadata> toCleanup = new ArrayList<>();
|
toCleanup.ensureCapacity(THREADS * iterations);
|
||||||
|
|
||||||
// Fail if someone changes a constant we depend on
|
// Fail if someone changes a constant we depend on
|
||||||
assertTrue("Maximum batch size must be big enough to run this test",
|
assertTrue("Maximum batch size must be big enough to run this test",
|
||||||
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT >= BATCH_SIZE);
|
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT >= BATCH_SIZE);
|
||||||
|
|
||||||
try (DynamoDBMetadataStore ddbms =
|
|
||||||
(DynamoDBMetadataStore)createMetadataStore()) {
|
|
||||||
|
|
||||||
DynamoDB ddb = ddbms.getDynamoDB();
|
|
||||||
String tableName = ddbms.getTable().getTableName();
|
|
||||||
final ProvisionedThroughputDescription existing =
|
|
||||||
ddb.getTable(tableName).describe().getProvisionedThroughput();
|
|
||||||
|
|
||||||
// If you set the same provisioned I/O as already set it throws an
|
|
||||||
// exception, avoid that.
|
|
||||||
isProvisionedChanged = (existing.getReadCapacityUnits() != SMALL_IO_UNITS
|
|
||||||
|| existing.getWriteCapacityUnits() != SMALL_IO_UNITS);
|
|
||||||
|
|
||||||
if (isProvisionedChanged) {
|
|
||||||
// Set low provisioned I/O for dynamodb
|
|
||||||
describe("Provisioning dynamo tbl %s read/write -> %d/%d", tableName,
|
|
||||||
SMALL_IO_UNITS, SMALL_IO_UNITS);
|
|
||||||
// Blocks to ensure table is back to ready state before we proceed
|
|
||||||
ddbms.provisionTableBlocking(SMALL_IO_UNITS, SMALL_IO_UNITS);
|
|
||||||
} else {
|
|
||||||
describe("Skipping provisioning table I/O, already %d/%d",
|
|
||||||
SMALL_IO_UNITS, SMALL_IO_UNITS);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// We know the dynamodb metadata store will expand a put of a path
|
// We know the dynamodb metadata store will expand a put of a path
|
||||||
// of depth N into a batch of N writes (all ancestors are written
|
// of depth N into a batch of N writes (all ancestors are written
|
||||||
// separately up to the root). (Ab)use this for an easy way to write
|
// separately up to the root). (Ab)use this for an easy way to write
|
||||||
|
@ -112,50 +218,280 @@ public class ITestDynamoDBMetadataStoreScale
|
||||||
try {
|
try {
|
||||||
describe("Running %d iterations of batched put, size %d", iterations,
|
describe("Running %d iterations of batched put, size %d", iterations,
|
||||||
BATCH_SIZE);
|
BATCH_SIZE);
|
||||||
|
|
||||||
|
ThrottleTracker result = execute("prune",
|
||||||
|
1,
|
||||||
|
true,
|
||||||
|
() -> {
|
||||||
|
ThrottleTracker tracker = new ThrottleTracker();
|
||||||
long pruneItems = 0;
|
long pruneItems = 0;
|
||||||
for (long i = 0; i < iterations; i++) {
|
for (long i = 0; i < iterations; i++) {
|
||||||
Path longPath = pathOfDepth(BATCH_SIZE, String.valueOf(i));
|
Path longPath = pathOfDepth(BATCH_SIZE, String.valueOf(i));
|
||||||
FileStatus status = basicFileStatus(longPath, 0, false, 12345,
|
FileStatus status = basicFileStatus(longPath, 0, false, 12345,
|
||||||
12345);
|
12345);
|
||||||
PathMetadata pm = new PathMetadata(status);
|
PathMetadata pm = new PathMetadata(status);
|
||||||
|
synchronized (toCleanup) {
|
||||||
|
toCleanup.add(pm);
|
||||||
|
}
|
||||||
|
|
||||||
ddbms.put(pm);
|
ddbms.put(pm);
|
||||||
toCleanup.add(pm);
|
|
||||||
pruneItems++;
|
pruneItems++;
|
||||||
// Having hard time reproducing Exceeded exception with put, also
|
|
||||||
// try occasional prune, which was the only stack trace I've seen
|
|
||||||
// (on JIRA)
|
|
||||||
if (pruneItems == BATCH_SIZE) {
|
if (pruneItems == BATCH_SIZE) {
|
||||||
describe("pruning files");
|
describe("pruning files");
|
||||||
ddbms.prune(Long.MAX_VALUE /* all files */);
|
ddbms.prune(Long.MAX_VALUE /* all files */);
|
||||||
pruneItems = 0;
|
pruneItems = 0;
|
||||||
}
|
}
|
||||||
|
if (tracker.probe()) {
|
||||||
|
// fail fast
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertNotEquals("No batch retries in " + result,
|
||||||
|
0, result.batchThrottles);
|
||||||
} finally {
|
} finally {
|
||||||
describe("Cleaning up table %s", tableName);
|
describe("Cleaning up table %s", tableName);
|
||||||
for (PathMetadata pm : toCleanup) {
|
for (PathMetadata pm : toCleanup) {
|
||||||
cleanupMetadata(ddbms, pm);
|
cleanupMetadata(ddbms, pm);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Get throttling including using
|
||||||
|
* {@link MetadataStore#get(Path, boolean)},
|
||||||
|
* as that stresses more of the code.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_040_get() throws Throwable {
|
||||||
|
// attempt to create many, many get requests in parallel.
|
||||||
|
Path path = new Path("s3a://example.org/get");
|
||||||
|
S3AFileStatus status = new S3AFileStatus(true, path, "alice");
|
||||||
|
PathMetadata metadata = new PathMetadata(status);
|
||||||
|
ddbms.put(metadata);
|
||||||
|
try {
|
||||||
|
execute("get",
|
||||||
|
OPERATIONS_PER_THREAD,
|
||||||
|
true,
|
||||||
|
() -> ddbms.get(path, true)
|
||||||
|
);
|
||||||
} finally {
|
} finally {
|
||||||
if (isProvisionedChanged) {
|
retryingDelete(path);
|
||||||
long write = existing.getWriteCapacityUnits();
|
|
||||||
long read = existing.getReadCapacityUnits();
|
|
||||||
describe("Restoring dynamo tbl %s read/write -> %d/%d", tableName,
|
|
||||||
read, write);
|
|
||||||
ddbms.provisionTableBlocking(existing.getReadCapacityUnits(),
|
|
||||||
existing.getWriteCapacityUnits());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt do delete metadata, suppressing any errors
|
/**
|
||||||
private void cleanupMetadata(MetadataStore ms, PathMetadata pm) {
|
* Ask for the version marker, which is where table init can be overloaded.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_050_getVersionMarkerItem() throws Throwable {
|
||||||
|
execute("get",
|
||||||
|
OPERATIONS_PER_THREAD * 2,
|
||||||
|
true,
|
||||||
|
() -> ddbms.getVersionMarkerItem()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup with an extra bit of retry logic around it, in case things
|
||||||
|
* are still over the limit.
|
||||||
|
* @param path path
|
||||||
|
*/
|
||||||
|
private void retryingDelete(final Path path) {
|
||||||
try {
|
try {
|
||||||
ms.forgetMetadata(pm.getFileStatus().getPath());
|
ddbms.getInvoker().retry("Delete ", path.toString(), true,
|
||||||
|
() -> ddbms.delete(path));
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to delete {}: ", path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_060_list() throws Throwable {
|
||||||
|
// attempt to create many, many list requests in parallel.
|
||||||
|
Path path = new Path("s3a://example.org/list");
|
||||||
|
S3AFileStatus status = new S3AFileStatus(true, path, "alice");
|
||||||
|
PathMetadata metadata = new PathMetadata(status);
|
||||||
|
ddbms.put(metadata);
|
||||||
|
try {
|
||||||
|
Path parent = path.getParent();
|
||||||
|
execute("list",
|
||||||
|
OPERATIONS_PER_THREAD,
|
||||||
|
true,
|
||||||
|
() -> ddbms.listChildren(parent)
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
retryingDelete(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_070_putDirMarker() throws Throwable {
|
||||||
|
// attempt to create many, many put requests in parallel.
|
||||||
|
Path path = new Path("s3a://example.org/putDirMarker");
|
||||||
|
S3AFileStatus status = new S3AFileStatus(true, path, "alice");
|
||||||
|
PathMetadata metadata = new PathMetadata(status);
|
||||||
|
ddbms.put(metadata);
|
||||||
|
DirListingMetadata children = ddbms.listChildren(path.getParent());
|
||||||
|
try {
|
||||||
|
execute("list",
|
||||||
|
OPERATIONS_PER_THREAD,
|
||||||
|
true,
|
||||||
|
() -> ddbms.put(children)
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
retryingDelete(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_080_fullPathsToPut() throws Throwable {
|
||||||
|
// attempt to create many, many fullPathsToPut requests in parallel.
|
||||||
|
Path base = new Path("s3a://example.org/test_080_fullPathsToPut");
|
||||||
|
Path child = new Path(base, "child");
|
||||||
|
List<PathMetadata> pms = new ArrayList<>();
|
||||||
|
ddbms.put(new PathMetadata(makeDirStatus(base)));
|
||||||
|
ddbms.put(new PathMetadata(makeDirStatus(child)));
|
||||||
|
ddbms.getInvoker().retry("set up directory tree",
|
||||||
|
base.toString(),
|
||||||
|
true,
|
||||||
|
() -> ddbms.put(pms));
|
||||||
|
try {
|
||||||
|
DDBPathMetadata dirData = ddbms.get(child, true);
|
||||||
|
execute("list",
|
||||||
|
OPERATIONS_PER_THREAD,
|
||||||
|
true,
|
||||||
|
() -> ddbms.fullPathsToPut(dirData)
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
retryingDelete(base);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_900_instrumentation() throws Throwable {
|
||||||
|
describe("verify the owner FS gets updated after throttling events");
|
||||||
|
// we rely on the FS being shared
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
String fsSummary = fs.toString();
|
||||||
|
|
||||||
|
S3AStorageStatistics statistics = fs.getStorageStatistics();
|
||||||
|
for (StorageStatistics.LongStatistic statistic : statistics) {
|
||||||
|
LOG.info("{}", statistic.toString());
|
||||||
|
}
|
||||||
|
String retryKey = Statistic.S3GUARD_METADATASTORE_RETRY.getSymbol();
|
||||||
|
assertTrue("No increment of " + retryKey + " in " + fsSummary,
|
||||||
|
statistics.getLong(retryKey) > 0);
|
||||||
|
String throttledKey = Statistic.S3GUARD_METADATASTORE_THROTTLED.getSymbol();
|
||||||
|
assertTrue("No increment of " + throttledKey + " in " + fsSummary,
|
||||||
|
statistics.getLong(throttledKey) > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a set of operations in parallel, collect throttling statistics
|
||||||
|
* and return them.
|
||||||
|
* This execution will complete as soon as throttling is detected.
|
||||||
|
* This ensures that the tests do not run for longer than they should.
|
||||||
|
* @param operation string for messages.
|
||||||
|
* @param operationsPerThread number of times per thread to invoke the action.
|
||||||
|
* @param expectThrottling is throttling expected (and to be asserted on?)
|
||||||
|
* @param action action to invoke.
|
||||||
|
* @return the throttle statistics
|
||||||
|
*/
|
||||||
|
public ThrottleTracker execute(String operation,
|
||||||
|
int operationsPerThread,
|
||||||
|
final boolean expectThrottling,
|
||||||
|
LambdaTestUtils.VoidCallable action)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
final ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
||||||
|
final ThrottleTracker tracker = new ThrottleTracker();
|
||||||
|
final ExecutorService executorService = Executors.newFixedThreadPool(
|
||||||
|
THREADS);
|
||||||
|
final List<Callable<ExecutionOutcome>> tasks = new ArrayList<>(THREADS);
|
||||||
|
|
||||||
|
final AtomicInteger throttleExceptions = new AtomicInteger(0);
|
||||||
|
for (int i = 0; i < THREADS; i++) {
|
||||||
|
tasks.add(
|
||||||
|
() -> {
|
||||||
|
final ExecutionOutcome outcome = new ExecutionOutcome();
|
||||||
|
final ContractTestUtils.NanoTimer t
|
||||||
|
= new ContractTestUtils.NanoTimer();
|
||||||
|
for (int j = 0; j < operationsPerThread; j++) {
|
||||||
|
if (tracker.isThrottlingDetected()) {
|
||||||
|
outcome.skipped = true;
|
||||||
|
return outcome;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
action.call();
|
||||||
|
outcome.completed++;
|
||||||
|
} catch (AWSServiceThrottledException e) {
|
||||||
|
// this is possibly OK
|
||||||
|
LOG.info("Operation [{}] raised a throttled exception: {}", j, e.toString());
|
||||||
|
LOG.debug(e.toString(), e);
|
||||||
|
throttleExceptions.incrementAndGet();
|
||||||
|
// consider it completed
|
||||||
|
outcome.throttleExceptions.add(e);
|
||||||
|
outcome.throttled++;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to execute {}", operation, e);
|
||||||
|
outcome.exceptions.add(e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tracker.probe();
|
||||||
|
}
|
||||||
|
LOG.info("Thread completed {} in {} ms with outcome {}: {}",
|
||||||
|
operation, t.elapsedTimeMs(), outcome, tracker);
|
||||||
|
return outcome;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
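// Run all the tasks in parallel, bounding the whole run by the test
|
||||||
|
// timeout so that an overloaded store cannot hang the suite.
|
||||||
|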
final List<Future<ExecutionOutcome>> futures =
|
||||||
|
executorService.invokeAll(tasks,
|
||||||
|
getTestTimeoutMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
long elapsedMs = timer.elapsedTimeMs();
|
||||||
|
LOG.info("Completed {} with {}", operation, tracker);
|
||||||
|
LOG.info("time to execute: {} millis", elapsedMs);
|
||||||
|
|
||||||
|
for (Future<ExecutionOutcome> future : futures) {
|
||||||
|
assertTrue("Future timed out", future.isDone());
|
||||||
|
}
|
||||||
|
tracker.probe();
|
||||||
|
|
||||||
|
if (expectThrottling) {
|
||||||
|
tracker.assertThrottlingDetected();
|
||||||
|
}
|
||||||
|
for (Future<ExecutionOutcome> future : futures) {
|
||||||
|
|
||||||
|
ExecutionOutcome outcome = future.get();
|
||||||
|
if (!outcome.exceptions.isEmpty()) {
|
||||||
|
throw outcome.exceptions.get(0);
|
||||||
|
}
|
||||||
|
if (!outcome.skipped) {
|
||||||
|
assertEquals("Future did not complete all operations",
|
||||||
|
operationsPerThread, outcome.completed + outcome.throttled);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to delete metadata, suppressing any errors, and retrying on
|
||||||
|
* throttle events just in case some are still surfacing.
|
||||||
|
* @param ms store
|
||||||
|
* @param pm path to clean up
|
||||||
|
*/
|
||||||
|
private void cleanupMetadata(MetadataStore ms, PathMetadata pm) {
|
||||||
|
Path path = pm.getFileStatus().getPath();
|
||||||
|
try {
|
||||||
|
ddbms.getInvoker().retry("clean up", path.toString(), true,
|
||||||
|
() -> ms.forgetMetadata(path));
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
// Ignore.
|
// Ignore.
|
||||||
|
LOG.info("Ignoring error while cleaning up {} in database", path, ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,4 +507,107 @@ public class ITestDynamoDBMetadataStoreScale
|
||||||
}
|
}
|
||||||
return new Path(getFileSystem().getUri().toString(), sb.toString());
|
return new Path(getFileSystem().getUri().toString(), sb.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Something to track throttles.
|
||||||
|
* The constructor sets the counters to the current count in the
|
||||||
|
* DDB table; a call to {@link #reset()} will set it to the latest values.
|
||||||
|
* The {@link #probe()} will pick up the latest values to compare them with
|
||||||
|
* the original counts.
|
||||||
|
*/
|
||||||
|
private class ThrottleTracker {
|
||||||
|
|
||||||
|
private long writeThrottleEventOrig = ddbms.getWriteThrottleEventCount();
|
||||||
|
|
||||||
|
private long readThrottleEventOrig = ddbms.getReadThrottleEventCount();
|
||||||
|
|
||||||
|
private long batchWriteThrottleCountOrig =
|
||||||
|
ddbms.getBatchWriteCapacityExceededCount();
|
||||||
|
|
||||||
|
private long readThrottles;
|
||||||
|
|
||||||
|
private long writeThrottles;
|
||||||
|
|
||||||
|
private long batchThrottles;
|
||||||
|
|
||||||
|
ThrottleTracker() {
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the counters.
|
||||||
|
*/
|
||||||
|
private synchronized void reset() {
|
||||||
|
writeThrottleEventOrig
|
||||||
|
= ddbms.getWriteThrottleEventCount();
|
||||||
|
|
||||||
|
readThrottleEventOrig
|
||||||
|
= ddbms.getReadThrottleEventCount();
|
||||||
|
|
||||||
|
batchWriteThrottleCountOrig
|
||||||
|
= ddbms.getBatchWriteCapacityExceededCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the latest throttle count; synchronized.
|
||||||
|
* @return true if throttling has been detected.
|
||||||
|
*/
|
||||||
|
private synchronized boolean probe() {
|
||||||
|
readThrottles = ddbms.getReadThrottleEventCount() - readThrottleEventOrig;
|
||||||
|
writeThrottles = ddbms.getWriteThrottleEventCount()
|
||||||
|
- writeThrottleEventOrig;
|
||||||
|
batchThrottles = ddbms.getBatchWriteCapacityExceededCount()
|
||||||
|
- batchWriteThrottleCountOrig;
|
||||||
|
return isThrottlingDetected();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format(
|
||||||
|
"Tracker with read throttle events = %d;"
|
||||||
|
+ " write events = %d;"
|
||||||
|
+ " batch throttles = %d",
|
||||||
|
readThrottles, writeThrottles, batchThrottles);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that throttling has been detected.
|
||||||
|
*/
|
||||||
|
void assertThrottlingDetected() {
|
||||||
|
assertTrue("No throttling detected in " + this +
|
||||||
|
" against " + ddbms.toString(),
|
||||||
|
isThrottlingDetected());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Has there been any throttling on an operation?
|
||||||
|
* @return true iff read, write or batch operations were throttled.
|
||||||
|
*/
|
||||||
|
private boolean isThrottlingDetected() {
|
||||||
|
return readThrottles > 0 || writeThrottles > 0 || batchThrottles > 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Outcome of a thread's execution operation.
|
||||||
|
*/
|
||||||
|
private static class ExecutionOutcome {
|
||||||
|
private int completed;
|
||||||
|
private int throttled;
|
||||||
|
private boolean skipped;
|
||||||
|
private final List<Exception> exceptions = new ArrayList<>(1);
|
||||||
|
private final List<Exception> throttleExceptions = new ArrayList<>(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
final StringBuilder sb = new StringBuilder(
|
||||||
|
"ExecutionOutcome{");
|
||||||
|
sb.append("completed=").append(completed);
|
||||||
|
sb.append(", skipped=").append(skipped);
|
||||||
|
sb.append(", throttled=").append(throttled);
|
||||||
|
sb.append(", exception count=").append(exceptions.size());
|
||||||
|
sb.append('}');
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import java.util.Objects;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.document.Table;
|
import com.amazonaws.services.dynamodbv2.document.Table;
|
||||||
|
@ -275,37 +274,6 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
|
||||||
// that call does not change the values
|
// that call does not change the values
|
||||||
original.checkEquals("unchanged", getCapacities());
|
original.checkEquals("unchanged", getCapacities());
|
||||||
|
|
||||||
// now update the value
|
|
||||||
long readCap = original.getRead();
|
|
||||||
long writeCap = original.getWrite();
|
|
||||||
long rc2 = readCap + 1;
|
|
||||||
long wc2 = writeCap + 1;
|
|
||||||
Capacities desired = new Capacities(rc2, wc2);
|
|
||||||
capacityOut = exec(newSetCapacity(),
|
|
||||||
S3GuardTool.SetCapacity.NAME,
|
|
||||||
"-" + READ_FLAG, Long.toString(rc2),
|
|
||||||
"-" + WRITE_FLAG, Long.toString(wc2),
|
|
||||||
fsURI);
|
|
||||||
LOG.info("Set Capacity output=\n{}", capacityOut);
|
|
||||||
|
|
||||||
// to avoid race conditions, spin for the state change
|
|
||||||
AtomicInteger c = new AtomicInteger(0);
|
|
||||||
LambdaTestUtils.eventually(60000,
|
|
||||||
new LambdaTestUtils.VoidCallable() {
|
|
||||||
@Override
|
|
||||||
public void call() throws Exception {
|
|
||||||
c.incrementAndGet();
|
|
||||||
Map<String, String> diags = getMetadataStore().getDiagnostics();
|
|
||||||
Capacities updated = getCapacities(diags);
|
|
||||||
String tableInfo = String.format("[%02d] table state: %s",
|
|
||||||
c.intValue(), diags.get(STATUS));
|
|
||||||
LOG.info("{}; capacities {}",
|
|
||||||
tableInfo, updated);
|
|
||||||
desired.checkEquals(tableInfo, updated);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
new LambdaTestUtils.ProportionalRetryInterval(500, 5000));
|
|
||||||
|
|
||||||
// Destroy MetadataStore
|
// Destroy MetadataStore
|
||||||
Destroy destroyCmd = new Destroy(fs.getConf());
|
Destroy destroyCmd = new Destroy(fs.getConf());
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,10 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
||||||
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
||||||
|
|
||||||
|
import org.junit.FixMethodOrder;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runners.MethodSorters;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -38,6 +41,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
|
||||||
* Could be separated from S3A code, but we're using the S3A scale test
|
* Could be separated from S3A code, but we're using the S3A scale test
|
||||||
* framework for convenience.
|
* framework for convenience.
|
||||||
*/
|
*/
|
||||||
|
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||||
public abstract class AbstractITestS3AMetadataStoreScale extends
|
public abstract class AbstractITestS3AMetadataStoreScale extends
|
||||||
S3AScaleTestBase {
|
S3AScaleTestBase {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
@ -60,7 +64,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
|
||||||
public abstract MetadataStore createMetadataStore() throws IOException;
|
public abstract MetadataStore createMetadataStore() throws IOException;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPut() throws Throwable {
|
public void test_010_Put() throws Throwable {
|
||||||
describe("Test workload of put() operations");
|
describe("Test workload of put() operations");
|
||||||
|
|
||||||
// As described in hadoop-aws site docs, count parameter is used for
|
// As described in hadoop-aws site docs, count parameter is used for
|
||||||
|
@@ -83,7 +87,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
   }
 
   @Test
-  public void testMoves() throws Throwable {
+  public void test_020_Moves() throws Throwable {
     describe("Test workload of batched move() operations");
 
     // As described in hadoop-aws site docs, count parameter is used for
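The renames above, together with the @FixMethodOrder(MethodSorters.NAME_ASCENDING) annotation added earlier, pin the JUnit 4 execution order: numbered method names guarantee the put workload (test_010) runs before the move workload (test_020) on every JVM. A minimal sketch of the idiom (class and method names here are illustrative):

// JUnit 4 ordering idiom: with NAME_ASCENDING, numbered method names
// fix the execution order, so later tests can rely on earlier state.
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class OrderedScaleTest {

  @Test
  public void test_010_populate() {
    // write workload runs first
  }

  @Test
  public void test_020_consume() {
    // depends on the state left behind by test_010
  }
}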
@@ -140,7 +144,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
    * Create a copy of given list of PathMetadatas with the paths moved from
    * src to dest.
    */
-  private List<PathMetadata> moveMetas(List<PathMetadata> metas, Path src,
+  protected List<PathMetadata> moveMetas(List<PathMetadata> metas, Path src,
       Path dest) throws IOException {
     List<PathMetadata> moved = new ArrayList<>(metas.size());
     for (PathMetadata srcMeta : metas) {
@@ -151,7 +155,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
     return moved;
   }
 
-  private Path movePath(Path p, Path src, Path dest) {
+  protected Path movePath(Path p, Path src, Path dest) {
     String srcStr = src.toUri().getPath();
     String pathStr = p.toUri().getPath();
     // Strip off src dir
@@ -160,7 +164,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
     return new Path(dest, pathStr);
   }
 
-  private S3AFileStatus copyStatus(S3AFileStatus status) {
+  protected S3AFileStatus copyStatus(S3AFileStatus status) {
     if (status.isDirectory()) {
       return new S3AFileStatus(status.isEmptyDirectory(), status.getPath(),
           status.getOwner());
@@ -185,7 +189,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
     return count;
   }
 
-  private void clearMetadataStore(MetadataStore ms, long count)
+  protected void clearMetadataStore(MetadataStore ms, long count)
       throws IOException {
     describe("Recursive deletion");
     NanoTimer deleteTimer = new NanoTimer();
@@ -202,15 +206,15 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
         msecPerOp, op, count));
   }
 
-  private static S3AFileStatus makeFileStatus(Path path) throws IOException {
+  protected static S3AFileStatus makeFileStatus(Path path) throws IOException {
     return new S3AFileStatus(SIZE, ACCESS_TIME, path, BLOCK_SIZE, OWNER);
   }
 
-  private static S3AFileStatus makeDirStatus(Path p) throws IOException {
+  protected static S3AFileStatus makeDirStatus(Path p) throws IOException {
     return new S3AFileStatus(false, p, OWNER);
   }
 
-  private List<Path> metasToPaths(List<PathMetadata> metas) {
+  protected List<Path> metasToPaths(List<PathMetadata> metas) {
     List<Path> paths = new ArrayList<>(metas.size());
     for (PathMetadata meta : metas) {
       paths.add(meta.getFileStatus().getPath());
@@ -225,7 +229,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
    * @param width Number of files (and directories, if depth > 0) per directory.
    * @param paths List to add generated paths to.
    */
-  private static void createDirTree(Path parent, int depth, int width,
+  protected static void createDirTree(Path parent, int depth, int width,
       Collection<PathMetadata> paths) throws IOException {
 
     // Create files
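Widening these helpers from private to protected lets a concrete, store-specific scale test subclass reuse the tree-building, status-factory, and cleanup logic rather than duplicating it. A hypothetical subclass sketch (ExampleMetadataStore and the workload are stand-ins for illustration, not part of the commit):

// Hypothetical subclass: with the helpers now protected, a concrete
// scale test can compose its own workload from them.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.junit.Test;

public class ITestExampleMetadataStoreScale
    extends AbstractITestS3AMetadataStoreScale {

  @Override
  public MetadataStore createMetadataStore() throws IOException {
    return new ExampleMetadataStore();   // hypothetical concrete store
  }

  @Test
  public void test_030_CustomWorkload() throws Throwable {
    List<PathMetadata> paths = new ArrayList<>();
    // inherited protected helper: tree 2 levels deep, 3 entries wide
    createDirTree(new Path("/scale"), 2, 3, paths);
    try (MetadataStore ms = createMetadataStore()) {
      for (PathMetadata meta : paths) {
        ms.put(meta);                        // write workload under test
      }
      clearMetadataStore(ms, paths.size());  // inherited cleanup helper
    }
  }
}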
@@ -150,6 +150,16 @@
     <value>simple</value>
 </property>
 
+<!-- Reduce DDB capacity on auto-created tables, to keep bills down. -->
+<property>
+  <name>fs.s3a.s3guard.ddb.table.capacity.read</name>
+  <value>10</value>
+</property>
+<property>
+  <name>fs.s3a.s3guard.ddb.table.capacity.write</name>
+  <value>10</value>
+</property>
+
 <!--
 To run these tests.
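For test setups that build their Configuration in code rather than through an XML resource, the same caps can be applied directly; a sketch using the property names from the hunk above (the helper class is illustrative):

// Sketch: the capacity caps applied programmatically in test setup,
// equivalent to the XML properties added above.
import org.apache.hadoop.conf.Configuration;

public final class DdbTestConfig {
  public static Configuration withLowCapacity(Configuration conf) {
    // keep auto-created DynamoDB tables cheap during test runs
    conf.setInt("fs.s3a.s3guard.ddb.table.capacity.read", 10);
    conf.setInt("fs.s3a.s3guard.ddb.table.capacity.write", 10);
    return conf;
  }
}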