diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index f34afb5e9b3..6f892e5b794 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -42,6 +42,7 @@ import java.util.stream.Collectors; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkBaseException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; @@ -66,6 +67,7 @@ import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.model.Tag; import com.amazonaws.services.dynamodbv2.model.TagResourceRequest; import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import com.amazonaws.waiters.WaiterTimedOutException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -78,6 +80,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AWSClientIOException; import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; import org.apache.hadoop.fs.s3a.AWSServiceThrottledException; import org.apache.hadoop.fs.s3a.Constants; @@ -1129,6 +1132,9 @@ public class DynamoDBMetadataStore implements MetadataStore { * overall, this method is synchronous, and the table is guaranteed to exist * after this method returns successfully. * + * The wait for a table becoming active is Retry+Translated; it can fail + * while a table is not yet ready. + * * @throws IOException if table does not exist and auto-creation is disabled; * or table is being deleted, or any other I/O exception occurred. */ @@ -1144,7 +1150,6 @@ public class DynamoDBMetadataStore implements MetadataStore { final String status = description.getTableStatus(); switch (status) { case "CREATING": - case "UPDATING": LOG.debug("Table {} in region {} is being created/updated. This may" + " indicate that the table is being operated by another " + "concurrent thread or process. Waiting for active...", @@ -1155,6 +1160,10 @@ public class DynamoDBMetadataStore implements MetadataStore { throw new FileNotFoundException("DynamoDB table " + "'" + tableName + "' is being " + "deleted in region " + region); + case "UPDATING": + // table being updated; it can still be used. + LOG.debug("Table is being updated."); + break; case "ACTIVE": break; default: @@ -1285,24 +1294,34 @@ public class DynamoDBMetadataStore implements MetadataStore { * @throws InterruptedIOException if the wait was interrupted * @throws IllegalArgumentException if an exception was raised in the waiter */ - @Retries.OnceRaw - private void waitForTableActive(Table t) throws InterruptedIOException { - try { - t.waitForActive(); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for table {} in region {} active", - tableName, region, e); - Thread.currentThread().interrupt(); - throw (InterruptedIOException) - new InterruptedIOException("DynamoDB table '" - + tableName + "' is not active yet in region " + region) - .initCause(e); - } + @Retries.RetryTranslated + private void waitForTableActive(Table t) throws IOException { + invoker.retry("Waiting for active state of table " + tableName, + null, + true, + () -> { + try { + t.waitForActive(); + } catch (IllegalArgumentException ex) { + throw translateTableWaitFailure(tableName, ex); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for table {} in region {}" + + " active", + tableName, region, e); + Thread.currentThread().interrupt(); + throw (InterruptedIOException) + new InterruptedIOException("DynamoDB table '" + + tableName + "' is not active yet in region " + region) + .initCause(e); + } + }); } /** * Create a table, wait for it to become active, then add the version * marker. + * Creating an setting up the table isn't wrapped by any retry operations; + * the wait for a table to become available is RetryTranslated. * @param capacity capacity to provision * @throws IOException on any failure. * @throws InterruptedIOException if the wait was interrupted @@ -1604,4 +1623,46 @@ public class DynamoDBMetadataStore implements MetadataStore { public Invoker getInvoker() { return invoker; } + + /** + * Take an {@code IllegalArgumentException} raised by a DDB operation + * and if it contains an inner SDK exception, unwrap it. + * @param ex exception. + * @return the inner AWS exception or null. + */ + public static SdkBaseException extractInnerException( + IllegalArgumentException ex) { + if (ex.getCause() instanceof SdkBaseException) { + return (SdkBaseException) ex.getCause(); + } else { + return null; + } + } + + /** + * Handle a table wait failure by extracting any inner cause and + * converting it, or, if unconvertable by wrapping + * the IllegalArgumentException in an IOE. + * + * @param name name of the table + * @param e exception + * @return an IOE to raise. + */ + @VisibleForTesting + static IOException translateTableWaitFailure( + final String name, IllegalArgumentException e) { + final SdkBaseException ex = extractInnerException(e); + if (ex != null) { + if (ex instanceof WaiterTimedOutException) { + // a timeout waiting for state change: extract the + // message from the outer exception, but translate + // the inner one for the throttle policy. + return new AWSClientIOException(e.getMessage(), ex); + } else { + return translateException(e.getMessage(), name, ex); + } + } else { + return new IOException(e); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java new file mode 100644 index 00000000000..03be39f7dfd --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; +import com.amazonaws.waiters.WaiterTimedOutException; +import org.junit.Test; + +import org.apache.hadoop.fs.s3a.AWSClientIOException; +import org.apache.hadoop.test.HadoopTestBase; + +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.translateTableWaitFailure; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit test suite for misc dynamoDB metastore operations. + */ +public class TestDynamoDBMiscOperations extends HadoopTestBase { + + private static final String TIMEOUT_ERROR_MESSAGE + = "Table table-name did not transition into ACTIVE state."; + + @Test + public void testUnwrapTableWaitTimeout() throws Throwable { + final Exception waiterTimedOut = + new WaiterTimedOutException("waiter timed out"); + final AWSClientIOException ex = intercept(AWSClientIOException.class, + TIMEOUT_ERROR_MESSAGE, + () -> { + throw translateTableWaitFailure("example", + new IllegalArgumentException(TIMEOUT_ERROR_MESSAGE, + waiterTimedOut)); + }); + assertEquals(waiterTimedOut, ex.getCause()); + } + + @Test + public void testTranslateIllegalArgumentException() throws Throwable { + final IllegalArgumentException e = + new IllegalArgumentException(TIMEOUT_ERROR_MESSAGE); + final IOException ex = intercept(IOException.class, + TIMEOUT_ERROR_MESSAGE, + () -> { + throw translateTableWaitFailure("example", e); + }); + assertEquals(e, ex.getCause()); + } + + @Test + public void testTranslateWrappedDDBException() throws Throwable { + final Exception inner = new ResourceNotFoundException("ddb"); + final IllegalArgumentException e = + new IllegalArgumentException("outer", inner); + final FileNotFoundException ex = intercept(FileNotFoundException.class, + "outer", + () -> { + throw translateTableWaitFailure("example", e); + }); + assertEquals(inner, ex.getCause()); + } + + @Test + public void testTranslateWrappedOtherException() throws Throwable { + final Exception inner = new NullPointerException("npe"); + final IllegalArgumentException e = + new IllegalArgumentException("outer", inner); + final IOException ex = intercept(IOException.class, + "outer", + () -> { + throw translateTableWaitFailure("example", e); + }); + assertEquals(e, ex.getCause()); + } + +}