HADOOP-15837. DynamoDB table Update can fail S3A FS init.

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2018-10-11 14:57:38 +01:00
parent 8a37983990
commit ee816f1fd7
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
2 changed files with 169 additions and 14 deletions

View File

@ -42,6 +42,7 @@
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkBaseException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
@ -66,6 +67,7 @@
import com.amazonaws.services.dynamodbv2.model.Tag;
import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import com.amazonaws.waiters.WaiterTimedOutException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
@ -78,6 +80,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AWSClientIOException;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
import org.apache.hadoop.fs.s3a.Constants;
@ -1129,6 +1132,9 @@ public String toString() {
* overall, this method is synchronous, and the table is guaranteed to exist
* after this method returns successfully.
*
* The wait for a table becoming active is Retry+Translated; it can fail
* while a table is not yet ready.
*
* @throws IOException if table does not exist and auto-creation is disabled;
* or table is being deleted, or any other I/O exception occurred.
*/
@ -1144,7 +1150,6 @@ void initTable() throws IOException {
final String status = description.getTableStatus();
switch (status) {
case "CREATING":
case "UPDATING":
LOG.debug("Table {} in region {} is being created/updated. This may"
+ " indicate that the table is being operated by another "
+ "concurrent thread or process. Waiting for active...",
@ -1155,6 +1160,10 @@ void initTable() throws IOException {
throw new FileNotFoundException("DynamoDB table "
+ "'" + tableName + "' is being "
+ "deleted in region " + region);
case "UPDATING":
// table being updated; it can still be used.
LOG.debug("Table is being updated.");
break;
case "ACTIVE":
break;
default:
@ -1285,24 +1294,34 @@ static void verifyVersionCompatibility(String tableName,
* @throws InterruptedIOException if the wait was interrupted
* @throws IllegalArgumentException if an exception was raised in the waiter
*/
@Retries.OnceRaw
private void waitForTableActive(Table t) throws InterruptedIOException {
try {
t.waitForActive();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for table {} in region {} active",
tableName, region, e);
Thread.currentThread().interrupt();
throw (InterruptedIOException)
new InterruptedIOException("DynamoDB table '"
+ tableName + "' is not active yet in region " + region)
.initCause(e);
}
@Retries.RetryTranslated
private void waitForTableActive(Table t) throws IOException {
invoker.retry("Waiting for active state of table " + tableName,
null,
true,
() -> {
try {
t.waitForActive();
} catch (IllegalArgumentException ex) {
throw translateTableWaitFailure(tableName, ex);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for table {} in region {}"
+ " active",
tableName, region, e);
Thread.currentThread().interrupt();
throw (InterruptedIOException)
new InterruptedIOException("DynamoDB table '"
+ tableName + "' is not active yet in region " + region)
.initCause(e);
}
});
}
/**
* Create a table, wait for it to become active, then add the version
* marker.
* Creating an setting up the table isn't wrapped by any retry operations;
* the wait for a table to become available is RetryTranslated.
* @param capacity capacity to provision
* @throws IOException on any failure.
* @throws InterruptedIOException if the wait was interrupted
@ -1604,4 +1623,46 @@ public long getBatchWriteCapacityExceededCount() {
public Invoker getInvoker() {
return invoker;
}
/**
* Take an {@code IllegalArgumentException} raised by a DDB operation
* and if it contains an inner SDK exception, unwrap it.
* @param ex exception.
* @return the inner AWS exception or null.
*/
public static SdkBaseException extractInnerException(
IllegalArgumentException ex) {
if (ex.getCause() instanceof SdkBaseException) {
return (SdkBaseException) ex.getCause();
} else {
return null;
}
}
/**
* Handle a table wait failure by extracting any inner cause and
* converting it, or, if unconvertable by wrapping
* the IllegalArgumentException in an IOE.
*
* @param name name of the table
* @param e exception
* @return an IOE to raise.
*/
@VisibleForTesting
static IOException translateTableWaitFailure(
final String name, IllegalArgumentException e) {
final SdkBaseException ex = extractInnerException(e);
if (ex != null) {
if (ex instanceof WaiterTimedOutException) {
// a timeout waiting for state change: extract the
// message from the outer exception, but translate
// the inner one for the throttle policy.
return new AWSClientIOException(e.getMessage(), ex);
} else {
return translateException(e.getMessage(), name, ex);
}
} else {
return new IOException(e);
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.FileNotFoundException;
import java.io.IOException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.waiters.WaiterTimedOutException;
import org.junit.Test;
import org.apache.hadoop.fs.s3a.AWSClientIOException;
import org.apache.hadoop.test.HadoopTestBase;
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.translateTableWaitFailure;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Unit test suite for misc dynamoDB metastore operations.
*/
public class TestDynamoDBMiscOperations extends HadoopTestBase {
private static final String TIMEOUT_ERROR_MESSAGE
= "Table table-name did not transition into ACTIVE state.";
@Test
public void testUnwrapTableWaitTimeout() throws Throwable {
final Exception waiterTimedOut =
new WaiterTimedOutException("waiter timed out");
final AWSClientIOException ex = intercept(AWSClientIOException.class,
TIMEOUT_ERROR_MESSAGE,
() -> {
throw translateTableWaitFailure("example",
new IllegalArgumentException(TIMEOUT_ERROR_MESSAGE,
waiterTimedOut));
});
assertEquals(waiterTimedOut, ex.getCause());
}
@Test
public void testTranslateIllegalArgumentException() throws Throwable {
final IllegalArgumentException e =
new IllegalArgumentException(TIMEOUT_ERROR_MESSAGE);
final IOException ex = intercept(IOException.class,
TIMEOUT_ERROR_MESSAGE,
() -> {
throw translateTableWaitFailure("example", e);
});
assertEquals(e, ex.getCause());
}
@Test
public void testTranslateWrappedDDBException() throws Throwable {
final Exception inner = new ResourceNotFoundException("ddb");
final IllegalArgumentException e =
new IllegalArgumentException("outer", inner);
final FileNotFoundException ex = intercept(FileNotFoundException.class,
"outer",
() -> {
throw translateTableWaitFailure("example", e);
});
assertEquals(inner, ex.getCause());
}
@Test
public void testTranslateWrappedOtherException() throws Throwable {
final Exception inner = new NullPointerException("npe");
final IllegalArgumentException e =
new IllegalArgumentException("outer", inner);
final IOException ex = intercept(IOException.class,
"outer",
() -> {
throw translateTableWaitFailure("example", e);
});
assertEquals(e, ex.getCause());
}
}