diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index fdbdf37a26a..9f120b8eda0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -176,7 +176,7 @@ public final class Constants { // number of times we should retry errors public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum"; - public static final int DEFAULT_MAX_ERROR_RETRIES = 20; + public static final int DEFAULT_MAX_ERROR_RETRIES = 10; // seconds until we give up trying to establish a connection to s3 public static final String ESTABLISH_TIMEOUT = diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 92f04bf5b76..044f3a57311 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.s3a.s3guard; import javax.annotation.Nullable; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.net.URI; @@ -28,7 +27,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,9 +41,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; -import com.amazonaws.SdkBaseException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; @@ -62,17 +58,9 @@ import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec; import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec; import com.amazonaws.services.dynamodbv2.document.utils.ValueMap; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; -import com.amazonaws.services.dynamodbv2.model.BillingMode; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; -import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; -import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; import com.amazonaws.services.dynamodbv2.model.TableDescription; -import com.amazonaws.services.dynamodbv2.model.Tag; -import com.amazonaws.services.dynamodbv2.model.TagResourceRequest; import com.amazonaws.services.dynamodbv2.model.WriteRequest; -import com.amazonaws.waiters.WaiterTimedOutException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -89,7 +77,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.s3a.AWSClientIOException; import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; import org.apache.hadoop.fs.s3a.AWSServiceThrottledException; import org.apache.hadoop.fs.s3a.Constants; @@ -233,19 +220,14 @@ public class DynamoDBMetadataStore implements MetadataStore, OPERATIONS_LOG_NAME); /** parent/child name to use in the version marker. */ - public static final String VERSION_MARKER = "../VERSION"; + public static final String VERSION_MARKER_ITEM_NAME = "../VERSION"; + + /** parent/child name to use in the version marker. */ + public static final String VERSION_MARKER_TAG_NAME = "s3guard_version"; /** Current version number. */ public static final int VERSION = 100; - /** Error: version marker not found in table. */ - public static final String E_NO_VERSION_MARKER - = "S3Guard table lacks version marker."; - - /** Error: version mismatch. */ - public static final String E_INCOMPATIBLE_VERSION - = "Database table is from an incompatible S3Guard version."; - @VisibleForTesting static final String BILLING_MODE = "billing-mode"; @@ -305,14 +287,14 @@ public class DynamoDBMetadataStore implements MetadataStore, private String region; private Table table; private String tableName; - private String tableArn; private Configuration conf; private String username; /** * This policy is mostly for batched writes, not for processing * exceptions in invoke() calls. - * It also has a role purpose in {@link #getVersionMarkerItem()}; + * It also has a role purpose in + * {@link DynamoDBMetadataStoreTableManager#getVersionMarkerItem()}; * look at that method for the details. */ private RetryPolicy batchWriteRetryPolicy; @@ -359,6 +341,8 @@ public class DynamoDBMetadataStore implements MetadataStore, */ private ITtlTimeProvider ttlTimeProvider; + private DynamoDBMetadataStoreTableManager tableHandler; + /** * A utility function to create DynamoDB instance. * @param conf the file system configuration @@ -437,7 +421,11 @@ public class DynamoDBMetadataStore implements MetadataStore, ); this.ttlTimeProvider = ttlTp; - initTable(); + + tableHandler = new DynamoDBMetadataStoreTableManager( + dynamoDB, tableName, region, amazonDynamoDB, conf, readOp, + batchWriteRetryPolicy); + this.table = tableHandler.initTable(); instrumentation.initialized(); } @@ -494,6 +482,7 @@ public class DynamoDBMetadataStore implements MetadataStore, conf = config; // use the bucket as the DynamoDB table name if not specified in config tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY); + Preconditions.checkArgument(!StringUtils.isEmpty(tableName), "No DynamoDB table name configured"); region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); @@ -518,7 +507,11 @@ public class DynamoDBMetadataStore implements MetadataStore, "s3a-ddb-" + tableName); initDataAccessRetries(conf); this.ttlTimeProvider = ttlTp; - initTable(); + + tableHandler = new DynamoDBMetadataStoreTableManager( + dynamoDB, tableName, region, amazonDynamoDB, conf, readOp, + batchWriteRetryPolicy); + this.table = tableHandler.initTable(); } /** @@ -1438,32 +1431,7 @@ public class DynamoDBMetadataStore implements MetadataStore, @Override @Retries.RetryTranslated public void destroy() throws IOException { - if (table == null) { - LOG.info("In destroy(): no table to delete"); - return; - } - LOG.info("Deleting DynamoDB table {} in region {}", tableName, region); - Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB"); - try { - invoker.retry("delete", null, true, - () -> table.delete()); - table.waitForDelete(); - } catch (IllegalArgumentException ex) { - throw new TableDeleteTimeoutException(tableName, - "Timeout waiting for the table " + tableArn + " to be deleted", - ex); - } catch (FileNotFoundException rnfe) { - LOG.info("FileNotFoundException while deleting DynamoDB table {} in " - + "region {}. This may indicate that the table does not exist, " - + "or has been deleted by another concurrent thread or process.", - tableName, region); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted", - tableName, ie); - throw new InterruptedIOException("Table " + tableName - + " in region " + region + " has not been deleted"); - } + tableHandler.destroy(); } @Retries.RetryTranslated @@ -1688,29 +1656,6 @@ public class DynamoDBMetadataStore implements MetadataStore, } } - /** - * Add tags from configuration to the existing DynamoDB table. - */ - @Retries.OnceRaw - public void tagTable() { - List tags = new ArrayList<>(); - Map tagProperties = - conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG); - for (Map.Entry tagMapEntry : tagProperties.entrySet()) { - Tag tag = new Tag().withKey(tagMapEntry.getKey()) - .withValue(tagMapEntry.getValue()); - tags.add(tag); - } - if (tags.isEmpty()) { - return; - } - - TagResourceRequest tagResourceRequest = new TagResourceRequest() - .withResourceArn(table.getDescription().getTableArn()) - .withTags(tags); - getAmazonDynamoDB().tagResource(tagResourceRequest); - } - @VisibleForTesting public AmazonDynamoDB getAmazonDynamoDB() { return amazonDynamoDB; @@ -1721,7 +1666,7 @@ public class DynamoDBMetadataStore implements MetadataStore, return getClass().getSimpleName() + '{' + "region=" + region + ", tableName=" + tableName - + ", tableArn=" + tableArn + + ", tableArn=" + tableHandler.getTableArn() + '}'; } @@ -1735,275 +1680,20 @@ public class DynamoDBMetadataStore implements MetadataStore, @Override public List listAWSPolicyRules( final Set access) { - Preconditions.checkState(tableArn != null, "TableARN not known"); + Preconditions.checkState(tableHandler.getTableArn() != null, + "TableARN not known"); if (access.isEmpty()) { return Collections.emptyList(); } RoleModel.Statement stat; if (access.contains(AccessLevel.ADMIN)) { - stat = allowAllDynamoDBOperations(tableArn); + stat = allowAllDynamoDBOperations(tableHandler.getTableArn()); } else { - stat = allowS3GuardClientOperations(tableArn); + stat = allowS3GuardClientOperations(tableHandler.getTableArn()); } return Lists.newArrayList(stat); } - /** - * Create a table if it does not exist and wait for it to become active. - * - * If a table with the intended name already exists, then it uses that table. - * Otherwise, it will automatically create the table if the config - * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is - * enabled. The DynamoDB table creation API is asynchronous. This method wait - * for the table to become active after sending the creation request, so - * overall, this method is synchronous, and the table is guaranteed to exist - * after this method returns successfully. - * - * The wait for a table becoming active is Retry+Translated; it can fail - * while a table is not yet ready. - * - * @throws IOException if table does not exist and auto-creation is disabled; - * or table is being deleted, or any other I/O exception occurred. - */ - @VisibleForTesting - @Retries.OnceRaw - void initTable() throws IOException { - table = dynamoDB.getTable(tableName); - try { - try { - LOG.debug("Binding to table {}", tableName); - TableDescription description = table.describe(); - LOG.debug("Table state: {}", description); - tableArn = description.getTableArn(); - final String status = description.getTableStatus(); - switch (status) { - case "CREATING": - LOG.debug("Table {} in region {} is being created/updated. This may" - + " indicate that the table is being operated by another " - + "concurrent thread or process. Waiting for active...", - tableName, region); - waitForTableActive(table); - break; - case "DELETING": - throw new FileNotFoundException("DynamoDB table " - + "'" + tableName + "' is being " - + "deleted in region " + region); - case "UPDATING": - // table being updated; it can still be used. - LOG.debug("Table is being updated."); - break; - case "ACTIVE": - break; - default: - throw new IOException("Unknown DynamoDB table status " + status - + ": tableName='" + tableName + "', region=" + region); - } - - final Item versionMarker = getVersionMarkerItem(); - verifyVersionCompatibility(tableName, versionMarker); - Long created = extractCreationTimeFromMarker(versionMarker); - LOG.debug("Using existing DynamoDB table {} in region {} created {}", - tableName, region, (created != null) ? new Date(created) : null); - } catch (ResourceNotFoundException rnfe) { - if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) { - long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY, - S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT); - long writeCapacity = conf.getLong( - S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, - S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT); - ProvisionedThroughput capacity; - if (readCapacity > 0 && writeCapacity > 0) { - capacity = new ProvisionedThroughput( - readCapacity, - writeCapacity); - } else { - // at least one capacity value is <= 0 - // verify they are both exactly zero - Preconditions.checkArgument( - readCapacity == 0 && writeCapacity == 0, - "S3Guard table read capacity %d and and write capacity %d" - + " are inconsistent", readCapacity, writeCapacity); - // and set the capacity to null for per-request billing. - capacity = null; - } - - createTable(capacity); - } else { - throw (FileNotFoundException)new FileNotFoundException( - "DynamoDB table '" + tableName + "' does not " - + "exist in region " + region + "; auto-creation is turned off") - .initCause(rnfe); - } - } - - } catch (AmazonClientException e) { - throw translateException("initTable", tableName, e); - } - } - - /** - * Get the version mark item in the existing DynamoDB table. - * - * As the version marker item may be created by another concurrent thread or - * process, we sleep and retry a limited number times if the lookup returns - * with a null value. - * DDB throttling is always retried. - */ - @VisibleForTesting - @Retries.RetryTranslated - Item getVersionMarkerItem() throws IOException { - final PrimaryKey versionMarkerKey = - createVersionMarkerPrimaryKey(VERSION_MARKER); - int retryCount = 0; - // look for a version marker, with usual throttling/failure retries. - Item versionMarker = queryVersionMarker(versionMarkerKey); - while (versionMarker == null) { - // The marker was null. - // Two possibilities - // 1. This isn't a S3Guard table. - // 2. This is a S3Guard table in construction; another thread/process - // is about to write/actively writing the version marker. - // So that state #2 is handled, batchWriteRetryPolicy is used to manage - // retries. - // This will mean that if the cause is actually #1, failure will not - // be immediate. As this will ultimately result in a failure to - // init S3Guard and the S3A FS, this isn't going to be a performance - // bottleneck -simply a slightly slower failure report than would otherwise - // be seen. - // "if your settings are broken, performance is not your main issue" - try { - RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(null, - retryCount, 0, true); - if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { - break; - } else { - LOG.debug("Sleeping {} ms before next retry", action.delayMillis); - Thread.sleep(action.delayMillis); - } - } catch (Exception e) { - throw new IOException("initTable: Unexpected exception " + e, e); - } - retryCount++; - versionMarker = queryVersionMarker(versionMarkerKey); - } - return versionMarker; - } - - /** - * Issue the query to get the version marker, with throttling for overloaded - * DDB tables. - * @param versionMarkerKey key to look up - * @return the marker - * @throws IOException failure - */ - @Retries.RetryTranslated - private Item queryVersionMarker(final PrimaryKey versionMarkerKey) - throws IOException { - return readOp.retry("getVersionMarkerItem", - VERSION_MARKER, true, - () -> table.getItem(versionMarkerKey)); - } - - /** - * Verify that a table version is compatible with this S3Guard client. - * @param tableName name of the table (for error messages) - * @param versionMarker the version marker retrieved from the table - * @throws IOException on any incompatibility - */ - @VisibleForTesting - static void verifyVersionCompatibility(String tableName, - Item versionMarker) throws IOException { - if (versionMarker == null) { - LOG.warn("Table {} contains no version marker", tableName); - throw new IOException(E_NO_VERSION_MARKER - + " Table: " + tableName); - } else { - final int version = extractVersionFromMarker(versionMarker); - if (VERSION != version) { - // version mismatch. Unless/until there is support for - // upgrading versions, treat this as an incompatible change - // and fail. - throw new IOException(E_INCOMPATIBLE_VERSION - + " Table "+ tableName - + " Expected version " + VERSION + " actual " + version); - } - } - } - - /** - * Wait for table being active. - * @param t table to block on. - * @throws IOException IO problems - * @throws InterruptedIOException if the wait was interrupted - * @throws IllegalArgumentException if an exception was raised in the waiter - */ - @Retries.RetryTranslated - private void waitForTableActive(Table t) throws IOException { - invoker.retry("Waiting for active state of table " + tableName, - null, - true, - () -> { - try { - t.waitForActive(); - } catch (IllegalArgumentException ex) { - throw translateTableWaitFailure(tableName, ex); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for table {} in region {}" - + " active", - tableName, region, e); - Thread.currentThread().interrupt(); - throw (InterruptedIOException) - new InterruptedIOException("DynamoDB table '" - + tableName + "' is not active yet in region " + region) - .initCause(e); - } - }); - } - - /** - * Create a table, wait for it to become active, then add the version - * marker. - * Creating an setting up the table isn't wrapped by any retry operations; - * the wait for a table to become available is RetryTranslated. - * @param capacity capacity to provision. If null: create a per-request - * table. - * @throws IOException on any failure. - * @throws InterruptedIOException if the wait was interrupted - */ - @Retries.OnceRaw - private void createTable(ProvisionedThroughput capacity) throws IOException { - try { - String mode; - CreateTableRequest request = new CreateTableRequest() - .withTableName(tableName) - .withKeySchema(keySchema()) - .withAttributeDefinitions(attributeDefinitions()); - if (capacity != null) { - mode = String.format("with provisioned read capacity %d and" - + " write capacity %s", - capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits()); - request.withProvisionedThroughput(capacity); - } else { - mode = "with pay-per-request billing"; - request.withBillingMode(BillingMode.PAY_PER_REQUEST); - } - LOG.info("Creating non-existent DynamoDB table {} in region {} {}", - tableName, region, mode); - table = dynamoDB.createTable(request); - LOG.debug("Awaiting table becoming active"); - } catch (ResourceInUseException e) { - LOG.warn("ResourceInUseException while creating DynamoDB table {} " - + "in region {}. This may indicate that the table was " - + "created by another concurrent thread or process.", - tableName, region); - } - waitForTableActive(table); - final Item marker = createVersionMarker(VERSION_MARKER, VERSION, - System.currentTimeMillis()); - putItem(marker); - tagTable(); - } - /** * PUT a single item to the table. * @param item item to put @@ -2015,47 +1705,6 @@ public class DynamoDBMetadataStore implements MetadataStore, return table.putItem(item); } - /** - * Provision the table with given read and write capacity units. - * Call will fail if the table is busy, or the new values match the current - * ones. - *

- * Until the AWS SDK lets us switch a table to on-demand, an attempt to - * set the I/O capacity to zero will fail. - * @param readCapacity read units: must be greater than zero - * @param writeCapacity write units: must be greater than zero - * @throws IOException on a failure - */ - @Retries.RetryTranslated - void provisionTable(Long readCapacity, Long writeCapacity) - throws IOException { - - if (readCapacity == 0 || writeCapacity == 0) { - // table is pay on demand - throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY); - } - final ProvisionedThroughput toProvision = new ProvisionedThroughput() - .withReadCapacityUnits(readCapacity) - .withWriteCapacityUnits(writeCapacity); - invoker.retry("ProvisionTable", tableName, true, - () -> { - final ProvisionedThroughputDescription p = - table.updateTable(toProvision).getProvisionedThroughput(); - LOG.info("Provision table {} in region {}: readCapacityUnits={}, " - + "writeCapacityUnits={}", - tableName, region, p.getReadCapacityUnits(), - p.getWriteCapacityUnits()); - }); - } - - @Retries.RetryTranslated - @VisibleForTesting - void provisionTableBlocking(Long readCapacity, Long writeCapacity) - throws IOException { - provisionTable(readCapacity, writeCapacity); - waitForTableActive(table); - } - @VisibleForTesting Table getTable() { return table; @@ -2175,7 +1824,7 @@ public class DynamoDBMetadataStore implements MetadataStore, currentRead, currentWrite); LOG.info("Changing capacity of table to read: {}, write: {}", newRead, newWrite); - provisionTableBlocking(newRead, newWrite); + tableHandler.provisionTableBlocking(newRead, newWrite); } else { LOG.info("Table capacity unchanged at read: {}, write: {}", newRead, newWrite); @@ -2374,48 +2023,6 @@ public class DynamoDBMetadataStore implements MetadataStore, return username; } - /** - * Take an {@code IllegalArgumentException} raised by a DDB operation - * and if it contains an inner SDK exception, unwrap it. - * @param ex exception. - * @return the inner AWS exception or null. - */ - public static SdkBaseException extractInnerException( - IllegalArgumentException ex) { - if (ex.getCause() instanceof SdkBaseException) { - return (SdkBaseException) ex.getCause(); - } else { - return null; - } - } - - /** - * Handle a table wait failure by extracting any inner cause and - * converting it, or, if unconvertable by wrapping - * the IllegalArgumentException in an IOE. - * - * @param name name of the table - * @param e exception - * @return an IOE to raise. - */ - @VisibleForTesting - static IOException translateTableWaitFailure( - final String name, IllegalArgumentException e) { - final SdkBaseException ex = extractInnerException(e); - if (ex != null) { - if (ex instanceof WaiterTimedOutException) { - // a timeout waiting for state change: extract the - // message from the outer exception, but translate - // the inner one for the throttle policy. - return new AWSClientIOException(e.getMessage(), ex); - } else { - return translateException(e.getMessage(), name, ex); - } - } else { - return new IOException(e); - } - } - /** * Log a PUT into the operations log at debug level. * @param state optional ancestor state. @@ -2691,4 +2298,9 @@ public class DynamoDBMetadataStore implements MetadataStore, return stateStr; } } + + protected DynamoDBMetadataStoreTableManager getTableHandler() { + Preconditions.checkNotNull(tableHandler, "Not initialized"); + return tableHandler; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java new file mode 100644 index 00000000000..d9f297cb254 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java @@ -0,0 +1,693 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.SdkBaseException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.PrimaryKey; +import com.amazonaws.services.dynamodbv2.document.PutItemOutcome; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; +import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; +import com.amazonaws.services.dynamodbv2.model.ScanRequest; +import com.amazonaws.services.dynamodbv2.model.ScanResult; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.amazonaws.services.dynamodbv2.model.Tag; +import com.amazonaws.services.dynamodbv2.model.TagResourceRequest; +import com.amazonaws.waiters.WaiterTimedOutException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.AWSClientIOException; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; + +import static java.lang.String.valueOf; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CREATE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG; +import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_TAG_NAME; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.attributeDefinitions; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarker; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarkerPrimaryKey; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractCreationTimeFromMarker; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractVersionFromMarker; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema; + +/** + * Managing dynamo tables for S3Guard dynamodb based metadatastore. + * Factored out from DynamoDBMetadataStore. + */ +public class DynamoDBMetadataStoreTableManager { + public static final Logger LOG = LoggerFactory.getLogger( + DynamoDBMetadataStoreTableManager.class); + + /** Error: version marker not found in table but the table is not empty. */ + public static final String E_NO_VERSION_MARKER_AND_NOT_EMPTY + = "S3Guard table lacks version marker, and it is not empty."; + + /** Error: version mismatch. */ + public static final String E_INCOMPATIBLE_TAG_VERSION + = "Database table is from an incompatible S3Guard version based on table TAG."; + + /** Error: version mismatch. */ + public static final String E_INCOMPATIBLE_ITEM_VERSION + = "Database table is from an incompatible S3Guard version based on table ITEM."; + + /** Invoker for IO. Until configured properly, use try-once. */ + private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, + Invoker.NO_OP + ); + + final private AmazonDynamoDB amazonDynamoDB; + final private DynamoDB dynamoDB; + final private String tableName; + final private String region; + final private Configuration conf; + final private Invoker readOp; + final private RetryPolicy batchWriteRetryPolicy; + + private Table table; + private String tableArn; + + public DynamoDBMetadataStoreTableManager(DynamoDB dynamoDB, + String tableName, + String region, + AmazonDynamoDB amazonDynamoDB, + Configuration conf, + Invoker readOp, + RetryPolicy batchWriteCapacityExceededEvents) { + this.dynamoDB = dynamoDB; + this.amazonDynamoDB = amazonDynamoDB; + this.tableName = tableName; + this.region = region; + this.conf = conf; + this.readOp = readOp; + this.batchWriteRetryPolicy = batchWriteCapacityExceededEvents; + } + + /** + * Create a table if it does not exist and wait for it to become active. + * + * If a table with the intended name already exists, then it uses that table. + * Otherwise, it will automatically create the table if the config + * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is + * enabled. The DynamoDB table creation API is asynchronous. This method wait + * for the table to become active after sending the creation request, so + * overall, this method is synchronous, and the table is guaranteed to exist + * after this method returns successfully. + * + * The wait for a table becoming active is Retry+Translated; it can fail + * while a table is not yet ready. + * + * @throws IOException if table does not exist and auto-creation is disabled; + * or table is being deleted, or any other I/O exception occurred. + */ + @VisibleForTesting + @Retries.RetryTranslated + Table initTable() throws IOException { + table = dynamoDB.getTable(tableName); + try { + try { + LOG.debug("Binding to table {}", tableName); + TableDescription description = table.describe(); + LOG.debug("Table state: {}", description); + tableArn = description.getTableArn(); + final String status = description.getTableStatus(); + switch (status) { + case "CREATING": + LOG.debug("Table {} in region {} is being created/updated. This may" + + " indicate that the table is being operated by another " + + "concurrent thread or process. Waiting for active...", + tableName, region); + waitForTableActive(table); + break; + case "DELETING": + throw new FileNotFoundException("DynamoDB table " + + "'" + tableName + "' is being " + + "deleted in region " + region); + case "UPDATING": + // table being updated; it can still be used. + LOG.debug("Table is being updated."); + break; + case "ACTIVE": + break; + default: + throw new IOException("Unknown DynamoDB table status " + status + + ": tableName='" + tableName + "', region=" + region); + } + + verifyVersionCompatibility(); + final Item versionMarker = getVersionMarkerItem(); + Long created = extractCreationTimeFromMarker(versionMarker); + LOG.debug("Using existing DynamoDB table {} in region {} created {}", + tableName, region, (created != null) ? new Date(created) : null); + } catch (ResourceNotFoundException rnfe) { + if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) { + long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY, + S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT); + long writeCapacity = conf.getLong( + S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, + S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT); + ProvisionedThroughput capacity; + if (readCapacity > 0 && writeCapacity > 0) { + capacity = new ProvisionedThroughput( + readCapacity, + writeCapacity); + } else { + // at least one capacity value is <= 0 + // verify they are both exactly zero + Preconditions.checkArgument( + readCapacity == 0 && writeCapacity == 0, + "S3Guard table read capacity %d and and write capacity %d" + + " are inconsistent", readCapacity, writeCapacity); + // and set the capacity to null for per-request billing. + capacity = null; + } + + createTable(capacity); + } else { + throw (FileNotFoundException) new FileNotFoundException( + "DynamoDB table '" + tableName + "' does not " + + "exist in region " + region + + "; auto-creation is turned off") + .initCause(rnfe); + } + } + + } catch (AmazonClientException e) { + throw translateException("initTable", tableName, e); + } + + return table; + } + + protected void tagTableWithVersionMarker() { + try { + TagResourceRequest tagResourceRequest = new TagResourceRequest() + .withResourceArn(table.getDescription().getTableArn()) + .withTags(newVersionMarkerTag()); + amazonDynamoDB.tagResource(tagResourceRequest); + } catch (AmazonDynamoDBException e) { + LOG.warn("Exception during tagging table: {}", e.getMessage()); + } + } + + protected static Item getVersionMarkerFromTags(Table table, + AmazonDynamoDB addb) { + List tags = null; + try { + final TableDescription description = table.describe(); + ListTagsOfResourceRequest listTagsOfResourceRequest = + new ListTagsOfResourceRequest() + .withResourceArn(description.getTableArn()); + tags = addb.listTagsOfResource(listTagsOfResourceRequest).getTags(); + } catch (ResourceNotFoundException e) { + LOG.error("Table: {} not found.", table.getTableName()); + throw e; + } catch (AmazonDynamoDBException e) { + LOG.warn("Exception while getting tags from the dynamo table: {}", + e.getMessage()); + } + + if (tags == null) { + return null; + } + + final Optional first = tags.stream() + .filter(tag -> tag.getKey().equals(VERSION_MARKER_TAG_NAME)).findFirst(); + if (first.isPresent()) { + final Tag vmTag = first.get(); + return createVersionMarker( + vmTag.getKey(), Integer.parseInt(vmTag.getValue()), 0 + ); + } else { + return null; + } + } + + /** + * Create a table, wait for it to become active, then add the version + * marker. + * Creating an setting up the table isn't wrapped by any retry operations; + * the wait for a table to become available is RetryTranslated. + * The tags are added to the table during creation, not after creation. + * We can assume that tagging and creating the table is a single atomic + * operation. + * + * @param capacity capacity to provision. If null: create a per-request + * table. + * @throws IOException on any failure. + * @throws InterruptedIOException if the wait was interrupted + */ + @Retries.OnceMixed + private void createTable(ProvisionedThroughput capacity) throws IOException { + try { + String mode; + CreateTableRequest request = new CreateTableRequest() + .withTableName(tableName) + .withKeySchema(keySchema()) + .withAttributeDefinitions(attributeDefinitions()) + .withTags(getTableTagsFromConfig()); + if (capacity != null) { + mode = String.format("with provisioned read capacity %d and" + + " write capacity %s", + capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits()); + request.withProvisionedThroughput(capacity); + } else { + mode = "with pay-per-request billing"; + request.withBillingMode(BillingMode.PAY_PER_REQUEST); + } + LOG.info("Creating non-existent DynamoDB table {} in region {} {}", + tableName, region, mode); + table = dynamoDB.createTable(request); + LOG.debug("Awaiting table becoming active"); + } catch (ResourceInUseException e) { + LOG.warn("ResourceInUseException while creating DynamoDB table {} " + + "in region {}. This may indicate that the table was " + + "created by another concurrent thread or process.", + tableName, region); + } + waitForTableActive(table); + putVersionMarkerItemToTable(); + } + + /** + * Return tags from configuration and the version marker for adding to + * dynamo table during creation. + */ + @Retries.OnceRaw + public List getTableTagsFromConfig() { + List tags = new ArrayList<>(); + + // from configuration + Map tagProperties = + conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG); + for (Map.Entry tagMapEntry : tagProperties.entrySet()) { + Tag tag = new Tag().withKey(tagMapEntry.getKey()) + .withValue(tagMapEntry.getValue()); + tags.add(tag); + } + // add the version marker + tags.add(newVersionMarkerTag()); + return tags; + } + + /** + * Create a new version marker tag. + * @return a new version marker tag + */ + private static Tag newVersionMarkerTag() { + return new Tag().withKey(VERSION_MARKER_TAG_NAME).withValue(valueOf(VERSION)); + } + + /** + * Verify that a table version is compatible with this S3Guard client. + * + * Checks for consistency between the version marker as the item and tag. + * + *

+   *   1. If the table lacks both version markers AND it's empty,
+   *      both markers will be added.
+   *      If the table is not empty the check throws IOException
+   *   2. If there's no version marker ITEM, the compatibility with the TAG
+   *      will be checked, and the version marker ITEM will be added if the
+   *      TAG version is compatible.
+   *      If the TAG version is not compatible, the check throws OException
+   *   3. If there's no version marker TAG, the compatibility with the ITEM
+   *      version marker will be checked, and the version marker ITEM will be
+   *      added if the ITEM version is compatible.
+   *      If the ITEM version is not compatible, the check throws IOException
+   *   4. If the TAG and ITEM versions are both present then both will be checked
+   *      for compatibility. If the ITEM or TAG version marker is not compatible,
+   *      the check throws IOException
+   * 
+ * + * @throws IOException on any incompatibility + */ + @VisibleForTesting + protected void verifyVersionCompatibility() throws IOException { + final Item versionMarkerItem = getVersionMarkerItem(); + final Item versionMarkerFromTag = + getVersionMarkerFromTags(table, amazonDynamoDB); + + LOG.debug("versionMarkerItem: {}; versionMarkerFromTag: {}", + versionMarkerItem, versionMarkerFromTag); + + if (versionMarkerItem == null && versionMarkerFromTag == null) { + if (!isEmptyTable(tableName, amazonDynamoDB)) { + LOG.error("Table is not empty but missing the version maker. Failing."); + throw new IOException(E_NO_VERSION_MARKER_AND_NOT_EMPTY + + " Table: " + tableName); + } + + LOG.info("Table {} contains no version marker item or tag. " + + "The table is empty, so the version marker will be added " + + "as TAG and ITEM.", tableName); + + tagTableWithVersionMarker(); + putVersionMarkerItemToTable(); + } + + if (versionMarkerItem == null && versionMarkerFromTag != null) { + final int tagVersionMarker = + extractVersionFromMarker(versionMarkerFromTag); + throwExceptionOnVersionMismatch(tagVersionMarker, tableName, + E_INCOMPATIBLE_TAG_VERSION); + + LOG.info("Table {} contains no version marker ITEM but contains " + + "compatible version marker TAG. Restoring the version marker " + + "item from tag.", tableName); + + putVersionMarkerItemToTable(); + } + + if (versionMarkerItem != null && versionMarkerFromTag == null) { + final int itemVersionMarker = + extractVersionFromMarker(versionMarkerItem); + throwExceptionOnVersionMismatch(itemVersionMarker, tableName, + E_INCOMPATIBLE_ITEM_VERSION); + + LOG.info("Table {} contains no version marker TAG but contains " + + "compatible version marker ITEM. Restoring the version marker " + + "item from item.", tableName); + + tagTableWithVersionMarker(); + } + + if (versionMarkerItem != null && versionMarkerFromTag != null) { + final int tagVersionMarker = + extractVersionFromMarker(versionMarkerFromTag); + final int itemVersionMarker = + extractVersionFromMarker(versionMarkerItem); + + throwExceptionOnVersionMismatch(tagVersionMarker, tableName, + E_INCOMPATIBLE_TAG_VERSION); + throwExceptionOnVersionMismatch(itemVersionMarker, tableName, + E_INCOMPATIBLE_ITEM_VERSION); + + LOG.debug("Table {} contains correct version marker TAG and ITEM.", + tableName); + } + } + + private static boolean isEmptyTable(String tableName, AmazonDynamoDB aadb) { + final ScanRequest req = new ScanRequest().withTableName( + tableName).withLimit(1); + final ScanResult result = aadb.scan(req); + return result.getCount() == 0; + } + + private static void throwExceptionOnVersionMismatch(int actual, + String tableName, + String exMsg) throws IOException { + + if (VERSION != actual) { + throw new IOException(exMsg + " Table " + tableName + + " Expected version: " + VERSION + " actual tag version: " + + actual); + } + } + + /** + * Add version marker to the dynamo table. + */ + @Retries.OnceRaw + private void putVersionMarkerItemToTable() { + final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, + System.currentTimeMillis()); + putItem(marker); + } + + /** + * Wait for table being active. + * @param t table to block on. + * @throws IOException IO problems + * @throws InterruptedIOException if the wait was interrupted + * @throws IllegalArgumentException if an exception was raised in the waiter + */ + @Retries.RetryTranslated + private void waitForTableActive(Table t) throws IOException { + invoker.retry("Waiting for active state of table " + tableName, + null, + true, + () -> { + try { + t.waitForActive(); + } catch (IllegalArgumentException ex) { + throw translateTableWaitFailure(tableName, ex); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for table {} in region {}" + + " active", + tableName, region, e); + Thread.currentThread().interrupt(); + throw (InterruptedIOException) + new InterruptedIOException("DynamoDB table '" + + tableName + "' is not active yet in region " + region) + .initCause(e); + } + }); + } + + /** + * Handle a table wait failure by extracting any inner cause and + * converting it, or, if unconvertable by wrapping + * the IllegalArgumentException in an IOE. + * + * @param name name of the table + * @param e exception + * @return an IOE to raise. + */ + @VisibleForTesting + static IOException translateTableWaitFailure( + final String name, IllegalArgumentException e) { + final SdkBaseException ex = extractInnerException(e); + if (ex != null) { + if (ex instanceof WaiterTimedOutException) { + // a timeout waiting for state change: extract the + // message from the outer exception, but translate + // the inner one for the throttle policy. + return new AWSClientIOException(e.getMessage(), ex); + } else { + return translateException(e.getMessage(), name, ex); + } + } else { + return new IOException(e); + } + } + + /** + * Take an {@code IllegalArgumentException} raised by a DDB operation + * and if it contains an inner SDK exception, unwrap it. + * @param ex exception. + * @return the inner AWS exception or null. + */ + public static SdkBaseException extractInnerException( + IllegalArgumentException ex) { + if (ex.getCause() instanceof SdkBaseException) { + return (SdkBaseException) ex.getCause(); + } else { + return null; + } + } + + /** + * Get the version mark item in the existing DynamoDB table. + * + * As the version marker item may be created by another concurrent thread or + * process, we sleep and retry a limited number times if the lookup returns + * with a null value. + * DDB throttling is always retried. + */ + @VisibleForTesting + @Retries.RetryTranslated + protected Item getVersionMarkerItem() throws IOException { + final PrimaryKey versionMarkerKey = + createVersionMarkerPrimaryKey(VERSION_MARKER_ITEM_NAME); + int retryCount = 0; + // look for a version marker, with usual throttling/failure retries. + Item versionMarker = queryVersionMarker(versionMarkerKey); + while (versionMarker == null) { + // The marker was null. + // Two possibilities + // 1. This isn't a S3Guard table. + // 2. This is a S3Guard table in construction; another thread/process + // is about to write/actively writing the version marker. + // So that state #2 is handled, batchWriteRetryPolicy is used to manage + // retries. + // This will mean that if the cause is actually #1, failure will not + // be immediate. As this will ultimately result in a failure to + // init S3Guard and the S3A FS, this isn't going to be a performance + // bottleneck -simply a slightly slower failure report than would otherwise + // be seen. + // "if your settings are broken, performance is not your main issue" + try { + RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(null, + retryCount, 0, true); + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + break; + } else { + LOG.warn("No version marker found in the DynamoDB table: {}. " + + "Sleeping {} ms before next retry", tableName, action.delayMillis); + Thread.sleep(action.delayMillis); + } + } catch (Exception e) { + throw new IOException("initTable: Unexpected exception " + e, e); + } + retryCount++; + versionMarker = queryVersionMarker(versionMarkerKey); + } + return versionMarker; + } + + /** + * Issue the query to get the version marker, with throttling for overloaded + * DDB tables. + * @param versionMarkerKey key to look up + * @return the marker + * @throws IOException failure + */ + @Retries.RetryTranslated + private Item queryVersionMarker(final PrimaryKey versionMarkerKey) + throws IOException { + return readOp.retry("getVersionMarkerItem", + VERSION_MARKER_ITEM_NAME, true, + () -> table.getItem(versionMarkerKey)); + } + + /** + * PUT a single item to the table. + * @param item item to put + * @return the outcome. + */ + @Retries.OnceRaw + private PutItemOutcome putItem(Item item) { + LOG.debug("Putting item {}", item); + return table.putItem(item); + } + + /** + * Provision the table with given read and write capacity units. + * Call will fail if the table is busy, or the new values match the current + * ones. + *

+ * Until the AWS SDK lets us switch a table to on-demand, an attempt to + * set the I/O capacity to zero will fail. + * @param readCapacity read units: must be greater than zero + * @param writeCapacity write units: must be greater than zero + * @throws IOException on a failure + */ + @Retries.RetryTranslated + void provisionTable(Long readCapacity, Long writeCapacity) + throws IOException { + + if (readCapacity == 0 || writeCapacity == 0) { + // table is pay on demand + throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY); + } + final ProvisionedThroughput toProvision = new ProvisionedThroughput() + .withReadCapacityUnits(readCapacity) + .withWriteCapacityUnits(writeCapacity); + invoker.retry("ProvisionTable", tableName, true, + () -> { + final ProvisionedThroughputDescription p = + table.updateTable(toProvision).getProvisionedThroughput(); + LOG.info("Provision table {} in region {}: readCapacityUnits={}, " + + "writeCapacityUnits={}", + tableName, region, p.getReadCapacityUnits(), + p.getWriteCapacityUnits()); + }); + } + + @Retries.RetryTranslated + public void destroy() throws IOException { + if (table == null) { + LOG.info("In destroy(): no table to delete"); + return; + } + LOG.info("Deleting DynamoDB table {} in region {}", tableName, region); + Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB"); + try { + invoker.retry("delete", null, true, + () -> table.delete()); + table.waitForDelete(); + } catch (IllegalArgumentException ex) { + throw new TableDeleteTimeoutException(tableName, + "Timeout waiting for the table " + getTableArn() + + " to be deleted", ex); + } catch (FileNotFoundException rnfe) { + LOG.info("FileNotFoundException while deleting DynamoDB table {} in " + + "region {}. This may indicate that the table does not exist, " + + "or has been deleted by another concurrent thread or process.", + tableName, region); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted", + tableName, ie); + throw new InterruptedIOException("Table " + tableName + + " in region " + region + " has not been deleted"); + } + } + + @Retries.RetryTranslated + @VisibleForTesting + void provisionTableBlocking(Long readCapacity, Long writeCapacity) + throws IOException { + provisionTable(readCapacity, writeCapacity); + waitForTableActive(table); + } + + public Table getTable() { + return table; + } + + public String getTableArn() { + return tableArn; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java index 348dfbfced4..be120881cb3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java @@ -247,7 +247,7 @@ public final class PathMetadataDynamoDBTranslation { * @return the creation time, or null * @throws IOException if the item is not a version marker */ - static Long extractCreationTimeFromMarker(Item marker) throws IOException { + static Long extractCreationTimeFromMarker(Item marker) { if (marker.hasAttribute(TABLE_CREATED)) { return marker.getLong(TABLE_CREATED); } else { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java index 5592faafe3e..19ef90e4557 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java @@ -39,7 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileStatus; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.CHILD; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.TABLE_VERSION; @@ -199,8 +199,8 @@ class S3GuardTableAccess { public DDBPathMetadata next() { Item item = it.next(); Pair key = primaryKey(item); - if (VERSION_MARKER.equals(key.getLeft()) && - VERSION_MARKER.equals(key.getRight())) { + if (VERSION_MARKER_ITEM_NAME.equals(key.getLeft()) && + VERSION_MARKER_ITEM_NAME.equals(key.getRight())) { // a version marker is found, return the special type return new VersionMarker(item); } else { diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md index cb0fd139262..571f2230cb7 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md @@ -951,9 +951,36 @@ logged. ### Versioning -S3Guard tables are created with a version marker, an entry with the primary -key and child entry of `../VERSION`; the use of a relative path guarantees -that it will not be resolved. +S3Guard tables are created with a version marker entry and table tag. +The entry is created with the primary key and child entry of `../VERSION`; +the use of a relative path guarantees that it will not be resolved. +Table tag key is named `s3guard_version`. + +When the table is initialized by S3Guard, the table will be tagged during the +creating and the version marker entry will be created in the table. +If the table lacks the version marker entry or tag, S3Guard will try to create +it according to the following rules: + +1. If the table lacks both version markers AND it's empty, both markers will be added. +If the table is not empty the check throws IOException +1. If there's no version marker ITEM, the compatibility with the TAG +will be checked, and the version marker ITEM will be added if the +TAG version is compatible. +If the TAG version is not compatible, the check throws OException +1. If there's no version marker TAG, the compatibility with the ITEM +version marker will be checked, and the version marker ITEM will be +added if the ITEM version is compatible. +If the ITEM version is not compatible, the check throws IOException +1. If the TAG and ITEM versions are both present then both will be checked +for compatibility. If the ITEM or TAG version marker is not compatible, +the check throws IOException + +*Note*: If the user does not have sufficient rights to tag the table the +initialization of S3Guard will not fail, but there will be no version marker tag +on the dynamo table and the following message will be logged on WARN level: +``` +Exception during tagging table: {AmazonDynamoDBException exception message} +``` *Versioning policy* diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index 78c9fea2f57..e541683285e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.PrimaryKey; @@ -41,6 +42,8 @@ import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest; import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.model.Tag; +import com.amazonaws.services.dynamodbv2.model.TagResourceRequest; +import com.amazonaws.services.dynamodbv2.model.UntagResourceRequest; import com.google.common.collect.Lists; import org.assertj.core.api.Assertions; @@ -70,10 +73,15 @@ import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.security.UserGroupInformation; +import static java.lang.String.valueOf; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_INCOMPATIBLE_ITEM_VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_INCOMPATIBLE_TAG_VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_NO_VERSION_MARKER_AND_NOT_EMPTY; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.getVersionMarkerFromTags; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*; import static org.apache.hadoop.test.LambdaTestUtils.*; @@ -110,10 +118,11 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { LoggerFactory.getLogger(ITestDynamoDBMetadataStore.class); public static final PrimaryKey VERSION_MARKER_PRIMARY_KEY = createVersionMarkerPrimaryKey( - DynamoDBMetadataStore.VERSION_MARKER); + DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME); private S3AFileSystem fileSystem; private S3AContract s3AContract; + private DynamoDBMetadataStoreTableManager tableHandler; private URI fsUri; @@ -153,6 +162,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { try{ super.setUp(); + tableHandler = getDynamoMetadataStore().getTableHandler(); } catch (FileNotFoundException e){ LOG.warn("MetadataStoreTestBase setup failed. Waiting for table to be " + "deleted before trying again."); @@ -613,31 +623,10 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { final String tableName = ddbms.getTable().getTableName(); verifyTableInitialized(tableName, ddbms.getDynamoDB()); // create existing table - ddbms.initTable(); + tableHandler.initTable(); verifyTableInitialized(tableName, ddbms.getDynamoDB()); } - /** - * Test the low level version check code. - */ - @Test - public void testItemVersionCompatibility() throws Throwable { - verifyVersionCompatibility("table", - createVersionMarker(VERSION_MARKER, VERSION, 0)); - } - - /** - * Test that a version marker entry without the version number field - * is rejected as incompatible with a meaningful error message. - */ - @Test - public void testItemLacksVersion() throws Throwable { - intercept(IOException.class, E_NOT_VERSION_MARKER, - () -> verifyVersionCompatibility("table", - new Item().withPrimaryKey( - createVersionMarkerPrimaryKey(VERSION_MARKER)))); - } - /** * Test versioning handling. *

    @@ -663,43 +652,118 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore(); try { ddbms.initialize(conf, new S3Guard.TtlTimeProvider(conf)); + DynamoDBMetadataStoreTableManager localTableHandler = + ddbms.getTableHandler(); + Table table = verifyTableInitialized(tableName, ddbms.getDynamoDB()); - // check the tagging too + // check the tagging verifyStoreTags(createTagMap(), ddbms); + // check version compatibility + checkVerifyVersionMarkerCompatibility(localTableHandler, table); - Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY); - table.deleteItem(VERSION_MARKER_PRIMARY_KEY); - - // create existing table - intercept(IOException.class, E_NO_VERSION_MARKER, - () -> ddbms.initTable()); - - // now add a different version marker - Item v200 = createVersionMarker(VERSION_MARKER, VERSION * 2, 0); - table.putItem(v200); - - // create existing table - intercept(IOException.class, E_INCOMPATIBLE_VERSION, - () -> ddbms.initTable()); - - // create a marker with no version and expect failure - final Item invalidMarker = new Item().withPrimaryKey( - createVersionMarkerPrimaryKey(VERSION_MARKER)) - .withLong(TABLE_CREATED, 0); - table.putItem(invalidMarker); - - intercept(IOException.class, E_NOT_VERSION_MARKER, - () -> ddbms.initTable()); - - // reinstate the version marker - table.putItem(originalVersionMarker); - ddbms.initTable(); conf.setInt(S3GUARD_DDB_MAX_RETRIES, maxRetries); } finally { destroy(ddbms); } } + private void checkVerifyVersionMarkerCompatibility( + DynamoDBMetadataStoreTableManager localTableHandler, Table table) + throws Exception { + final AmazonDynamoDB addb + = getDynamoMetadataStore().getAmazonDynamoDB(); + Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY); + + LOG.info("1/6: remove version marker and tags from table " + + "the table is empty, so it should be initialized after the call"); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + localTableHandler.initTable(); + + final int versionFromItem = extractVersionFromMarker( + localTableHandler.getVersionMarkerItem()); + final int versionFromTag = extractVersionFromMarker( + getVersionMarkerFromTags(table, addb)); + assertEquals("Table should be tagged with the right version.", + VERSION, versionFromTag); + assertEquals("Table should have the right version marker.", + VERSION, versionFromItem); + + LOG.info("2/6: if the table is not empty and there's no version marker " + + "it should fail"); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + String testKey = "coffee"; + Item wrongItem = + createVersionMarker(testKey, VERSION * 2, 0); + table.putItem(wrongItem); + intercept(IOException.class, E_NO_VERSION_MARKER_AND_NOT_EMPTY, + () -> localTableHandler.initTable()); + + LOG.info("3/6: table has only version marker item then it will be tagged"); + table.putItem(originalVersionMarker); + localTableHandler.initTable(); + final int versionFromTag2 = extractVersionFromMarker( + getVersionMarkerFromTags(table, addb)); + assertEquals("Table should have the right version marker tag " + + "if there was a version item.", VERSION, versionFromTag2); + + LOG.info("4/6: table has only version marker tag then the version marker " + + "item will be created."); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + localTableHandler.tagTableWithVersionMarker(); + localTableHandler.initTable(); + final int versionFromItem2 = extractVersionFromMarker( + localTableHandler.getVersionMarkerItem()); + assertEquals("Table should have the right version marker item " + + "if there was a version tag.", VERSION, versionFromItem2); + + LOG.info("5/6: add a different marker tag to the table: init should fail"); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + Item v200 = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION * 2, 0); + table.putItem(v200); + intercept(IOException.class, E_INCOMPATIBLE_ITEM_VERSION, + () -> localTableHandler.initTable()); + + LOG.info("6/6: add a different marker item to the table: init should fail"); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + int wrongVersion = VERSION + 3; + tagTableWithCustomVersion(table, addb, wrongVersion); + intercept(IOException.class, E_INCOMPATIBLE_TAG_VERSION, + () -> localTableHandler.initTable()); + + // CLEANUP + table.putItem(originalVersionMarker); + localTableHandler.tagTableWithVersionMarker(); + localTableHandler.initTable(); + } + + private void tagTableWithCustomVersion(Table table, + AmazonDynamoDB addb, + int wrongVersion) { + final Tag vmTag = new Tag().withKey(VERSION_MARKER_TAG_NAME) + .withValue(valueOf(wrongVersion)); + TagResourceRequest tagResourceRequest = new TagResourceRequest() + .withResourceArn(table.getDescription().getTableArn()) + .withTags(vmTag); + addb.tagResource(tagResourceRequest); + } + + private void removeVersionMarkerTag(Table table, AmazonDynamoDB addb) { + addb.untagResource(new UntagResourceRequest() + .withResourceArn(table.describe().getTableArn()) + .withTagKeys(VERSION_MARKER_TAG_NAME)); + } + + private void deleteVersionMarkerItem(Table table) { + table.deleteItem(VERSION_MARKER_PRIMARY_KEY); + assertNull("Version marker should be null after deleting it " + + "from the table.", table.getItem(VERSION_MARKER_PRIMARY_KEY)); + } + /** * Test that initTable fails with IOException when table does not exist and * table auto-creation is disabled. @@ -952,8 +1016,11 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { tags.forEach(t -> actual.put(t.getKey(), t.getValue())); Assertions.assertThat(actual) .describedAs("Tags from DDB table") - .containsExactlyEntriesOf(tagMap); - assertEquals(tagMap.size(), tags.size()); + .containsAllEntriesOf(tagMap); + + // The version marker is always there in the tags. + // We have a plus one in tags we expect. + assertEquals(tagMap.size() + 1, tags.size()); } protected List listTagsOfStore(final DynamoDBMetadataStore store) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java index 53df60f59c2..a0614d5a05f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java @@ -91,6 +91,7 @@ public class ITestDynamoDBMetadataStoreScale private static final long MAXIMUM_WRITE_CAPACITY = 15; private DynamoDBMetadataStore ddbms; + private DynamoDBMetadataStoreTableManager tableHandler; private DynamoDB ddb; @@ -160,6 +161,7 @@ public class ITestDynamoDBMetadataStoreScale super.setup(); ddbms = (DynamoDBMetadataStore) createMetadataStore(); tableName = ddbms.getTableName(); + tableHandler = ddbms.getTableHandler(); assertNotNull("table has no name", tableName); ddb = ddbms.getDynamoDB(); table = ddb.getTable(tableName); @@ -325,7 +327,7 @@ public class ITestDynamoDBMetadataStoreScale execute("get", OPERATIONS_PER_THREAD * 2, expectThrottling(), - () -> ddbms.getVersionMarkerItem() + () -> tableHandler.getVersionMarkerItem() ); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java index 205eb65a1c9..358ab832204 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java @@ -160,8 +160,13 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase { List tags = ddbms.getAmazonDynamoDB().listTagsOfResource(listTagsOfResourceRequest).getTags(); // assert - assertEquals(tagMap.size(), tags.size()); + // table version is always there as a plus one tag. + assertEquals(tagMap.size() + 1, tags.size()); for (Tag tag : tags) { + // skip the version marker tag + if (tag.getKey().equals(VERSION_MARKER_TAG_NAME)) { + continue; + } Assert.assertEquals(tagMap.get(tag.getKey()), tag.getValue()); } // be sure to clean up - delete table diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java index 578aed06bc3..602a072aac2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java @@ -32,10 +32,10 @@ import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.test.HadoopTestBase; import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.translateTableWaitFailure; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.translateTableWaitFailure; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java index 70bf901514b..c8820948533 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java @@ -51,7 +51,7 @@ import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.is; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION; import static org.mockito.Mockito.never; @@ -272,14 +272,14 @@ public class TestPathMetadataDynamoDBTranslation extends Assert { @Test public void testVersionRoundTrip() throws Throwable { - final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0); + final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, 0); assertEquals("Extracted version from " + marker, VERSION, extractVersionFromMarker(marker)); } @Test public void testVersionMarkerNotStatusIllegalPath() throws Throwable { - final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0); + final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, 0); assertNull("Path metadata fromfrom " + marker, itemToPathMetadata(marker, "alice")); }