HADOOP-15837. DynamoDB table Update can fail S3A FS init.
Contributed by Steve Loughran.
(cherry picked from commit ee816f1fd7
)
This commit is contained in:
parent
f29a7fdb14
commit
70d39c74d9
|
@ -42,6 +42,7 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.amazonaws.AmazonClientException;
|
import com.amazonaws.AmazonClientException;
|
||||||
import com.amazonaws.AmazonServiceException;
|
import com.amazonaws.AmazonServiceException;
|
||||||
|
import com.amazonaws.SdkBaseException;
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||||
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
|
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
|
||||||
|
@ -66,6 +67,7 @@ import com.amazonaws.services.dynamodbv2.model.TableDescription;
|
||||||
import com.amazonaws.services.dynamodbv2.model.Tag;
|
import com.amazonaws.services.dynamodbv2.model.Tag;
|
||||||
import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
|
import com.amazonaws.services.dynamodbv2.model.TagResourceRequest;
|
||||||
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
|
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
|
||||||
|
import com.amazonaws.waiters.WaiterTimedOutException;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -78,6 +80,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.s3a.AWSClientIOException;
|
||||||
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
|
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
|
||||||
import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
|
import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
|
||||||
import org.apache.hadoop.fs.s3a.Constants;
|
import org.apache.hadoop.fs.s3a.Constants;
|
||||||
|
@ -1128,6 +1131,9 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
* overall, this method is synchronous, and the table is guaranteed to exist
|
* overall, this method is synchronous, and the table is guaranteed to exist
|
||||||
* after this method returns successfully.
|
* after this method returns successfully.
|
||||||
*
|
*
|
||||||
|
* The wait for a table becoming active is Retry+Translated; it can fail
|
||||||
|
* while a table is not yet ready.
|
||||||
|
*
|
||||||
* @throws IOException if table does not exist and auto-creation is disabled;
|
* @throws IOException if table does not exist and auto-creation is disabled;
|
||||||
* or table is being deleted, or any other I/O exception occurred.
|
* or table is being deleted, or any other I/O exception occurred.
|
||||||
*/
|
*/
|
||||||
|
@ -1143,7 +1149,6 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
final String status = description.getTableStatus();
|
final String status = description.getTableStatus();
|
||||||
switch (status) {
|
switch (status) {
|
||||||
case "CREATING":
|
case "CREATING":
|
||||||
case "UPDATING":
|
|
||||||
LOG.debug("Table {} in region {} is being created/updated. This may"
|
LOG.debug("Table {} in region {} is being created/updated. This may"
|
||||||
+ " indicate that the table is being operated by another "
|
+ " indicate that the table is being operated by another "
|
||||||
+ "concurrent thread or process. Waiting for active...",
|
+ "concurrent thread or process. Waiting for active...",
|
||||||
|
@ -1154,6 +1159,10 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
throw new FileNotFoundException("DynamoDB table "
|
throw new FileNotFoundException("DynamoDB table "
|
||||||
+ "'" + tableName + "' is being "
|
+ "'" + tableName + "' is being "
|
||||||
+ "deleted in region " + region);
|
+ "deleted in region " + region);
|
||||||
|
case "UPDATING":
|
||||||
|
// table being updated; it can still be used.
|
||||||
|
LOG.debug("Table is being updated.");
|
||||||
|
break;
|
||||||
case "ACTIVE":
|
case "ACTIVE":
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -1284,12 +1293,19 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
* @throws InterruptedIOException if the wait was interrupted
|
* @throws InterruptedIOException if the wait was interrupted
|
||||||
* @throws IllegalArgumentException if an exception was raised in the waiter
|
* @throws IllegalArgumentException if an exception was raised in the waiter
|
||||||
*/
|
*/
|
||||||
@Retries.OnceRaw
|
@Retries.RetryTranslated
|
||||||
private void waitForTableActive(Table t) throws InterruptedIOException {
|
private void waitForTableActive(Table t) throws IOException {
|
||||||
|
invoker.retry("Waiting for active state of table " + tableName,
|
||||||
|
null,
|
||||||
|
true,
|
||||||
|
() -> {
|
||||||
try {
|
try {
|
||||||
t.waitForActive();
|
t.waitForActive();
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
throw translateTableWaitFailure(tableName, ex);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.warn("Interrupted while waiting for table {} in region {} active",
|
LOG.warn("Interrupted while waiting for table {} in region {}"
|
||||||
|
+ " active",
|
||||||
tableName, region, e);
|
tableName, region, e);
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
throw (InterruptedIOException)
|
throw (InterruptedIOException)
|
||||||
|
@ -1297,11 +1313,14 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
+ tableName + "' is not active yet in region " + region)
|
+ tableName + "' is not active yet in region " + region)
|
||||||
.initCause(e);
|
.initCause(e);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a table, wait for it to become active, then add the version
|
* Create a table, wait for it to become active, then add the version
|
||||||
* marker.
|
* marker.
|
||||||
|
* Creating an setting up the table isn't wrapped by any retry operations;
|
||||||
|
* the wait for a table to become available is RetryTranslated.
|
||||||
* @param capacity capacity to provision
|
* @param capacity capacity to provision
|
||||||
* @throws IOException on any failure.
|
* @throws IOException on any failure.
|
||||||
* @throws InterruptedIOException if the wait was interrupted
|
* @throws InterruptedIOException if the wait was interrupted
|
||||||
|
@ -1603,4 +1622,46 @@ public class DynamoDBMetadataStore implements MetadataStore {
|
||||||
public Invoker getInvoker() {
|
public Invoker getInvoker() {
|
||||||
return invoker;
|
return invoker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Take an {@code IllegalArgumentException} raised by a DDB operation
|
||||||
|
* and if it contains an inner SDK exception, unwrap it.
|
||||||
|
* @param ex exception.
|
||||||
|
* @return the inner AWS exception or null.
|
||||||
|
*/
|
||||||
|
public static SdkBaseException extractInnerException(
|
||||||
|
IllegalArgumentException ex) {
|
||||||
|
if (ex.getCause() instanceof SdkBaseException) {
|
||||||
|
return (SdkBaseException) ex.getCause();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle a table wait failure by extracting any inner cause and
|
||||||
|
* converting it, or, if unconvertable by wrapping
|
||||||
|
* the IllegalArgumentException in an IOE.
|
||||||
|
*
|
||||||
|
* @param name name of the table
|
||||||
|
* @param e exception
|
||||||
|
* @return an IOE to raise.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
static IOException translateTableWaitFailure(
|
||||||
|
final String name, IllegalArgumentException e) {
|
||||||
|
final SdkBaseException ex = extractInnerException(e);
|
||||||
|
if (ex != null) {
|
||||||
|
if (ex instanceof WaiterTimedOutException) {
|
||||||
|
// a timeout waiting for state change: extract the
|
||||||
|
// message from the outer exception, but translate
|
||||||
|
// the inner one for the throttle policy.
|
||||||
|
return new AWSClientIOException(e.getMessage(), ex);
|
||||||
|
} else {
|
||||||
|
return translateException(e.getMessage(), name, ex);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a.s3guard;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
|
||||||
|
import com.amazonaws.waiters.WaiterTimedOutException;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.s3a.AWSClientIOException;
|
||||||
|
import org.apache.hadoop.test.HadoopTestBase;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.translateTableWaitFailure;
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test suite for misc dynamoDB metastore operations.
|
||||||
|
*/
|
||||||
|
public class TestDynamoDBMiscOperations extends HadoopTestBase {
|
||||||
|
|
||||||
|
private static final String TIMEOUT_ERROR_MESSAGE
|
||||||
|
= "Table table-name did not transition into ACTIVE state.";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnwrapTableWaitTimeout() throws Throwable {
|
||||||
|
final Exception waiterTimedOut =
|
||||||
|
new WaiterTimedOutException("waiter timed out");
|
||||||
|
final AWSClientIOException ex = intercept(AWSClientIOException.class,
|
||||||
|
TIMEOUT_ERROR_MESSAGE,
|
||||||
|
() -> {
|
||||||
|
throw translateTableWaitFailure("example",
|
||||||
|
new IllegalArgumentException(TIMEOUT_ERROR_MESSAGE,
|
||||||
|
waiterTimedOut));
|
||||||
|
});
|
||||||
|
assertEquals(waiterTimedOut, ex.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTranslateIllegalArgumentException() throws Throwable {
|
||||||
|
final IllegalArgumentException e =
|
||||||
|
new IllegalArgumentException(TIMEOUT_ERROR_MESSAGE);
|
||||||
|
final IOException ex = intercept(IOException.class,
|
||||||
|
TIMEOUT_ERROR_MESSAGE,
|
||||||
|
() -> {
|
||||||
|
throw translateTableWaitFailure("example", e);
|
||||||
|
});
|
||||||
|
assertEquals(e, ex.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTranslateWrappedDDBException() throws Throwable {
|
||||||
|
final Exception inner = new ResourceNotFoundException("ddb");
|
||||||
|
final IllegalArgumentException e =
|
||||||
|
new IllegalArgumentException("outer", inner);
|
||||||
|
final FileNotFoundException ex = intercept(FileNotFoundException.class,
|
||||||
|
"outer",
|
||||||
|
() -> {
|
||||||
|
throw translateTableWaitFailure("example", e);
|
||||||
|
});
|
||||||
|
assertEquals(inner, ex.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTranslateWrappedOtherException() throws Throwable {
|
||||||
|
final Exception inner = new NullPointerException("npe");
|
||||||
|
final IllegalArgumentException e =
|
||||||
|
new IllegalArgumentException("outer", inner);
|
||||||
|
final IOException ex = intercept(IOException.class,
|
||||||
|
"outer",
|
||||||
|
() -> {
|
||||||
|
throw translateTableWaitFailure("example", e);
|
||||||
|
});
|
||||||
|
assertEquals(e, ex.getCause());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue