HADOOP-15583. Stabilize S3A Assumed Role support.

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2018-08-08 22:57:10 -07:00
parent d81cd3611a
commit da9a39eed1
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
30 changed files with 1466 additions and 534 deletions

View File

@ -1033,7 +1033,19 @@
<name>fs.s3a.assumed.role.sts.endpoint</name>
<value/>
<description>
AWS Simple Token Service Endpoint. If unset, uses the default endpoint.
AWS Security Token Service Endpoint.
If unset, uses the default endpoint.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
<property>
<name>fs.s3a.assumed.role.sts.endpoint.region</name>
<value>us-west-1</value>
<description>
AWS Security Token Service Endpoint's region;
Needed if fs.s3a.assumed.role.sts.endpoint points to an endpoint
other than the default one and the v4 signature is used.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
@ -1058,7 +1070,9 @@
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>true</value>
<description>Enables or disables SSL connections to S3.</description>
<description>Enables or disables SSL connections to AWS services.
Also sets the default port to use for the s3a proxy settings,
when not explicitly set in fs.s3a.proxy.port.</description>
</property>
<property>

View File

@ -18,24 +18,28 @@
package org.apache.hadoop.fs.s3a;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.io.IOUtils;
/**
* A list of providers.
@ -62,10 +66,18 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider,
public static final String NO_AWS_CREDENTIAL_PROVIDERS
= "No AWS Credential Providers";
static final String
CREDENTIALS_REQUESTED_WHEN_CLOSED
= "Credentials requested after provider list was closed";
private final List<AWSCredentialsProvider> providers = new ArrayList<>(1);
private boolean reuseLastProvider = true;
private AWSCredentialsProvider lastProvider;
private final AtomicInteger refCount = new AtomicInteger(1);
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Empty instance. This is not ready to be used.
*/
@ -94,6 +106,9 @@ public void add(AWSCredentialsProvider p) {
*/
@Override
public void refresh() {
if (isClosed()) {
return;
}
for (AWSCredentialsProvider provider : providers) {
provider.refresh();
}
@ -106,6 +121,11 @@ public void refresh() {
*/
@Override
public AWSCredentials getCredentials() {
if (isClosed()) {
LOG.warn(CREDENTIALS_REQUESTED_WHEN_CLOSED);
throw new NoAuthWithAWSException(
CREDENTIALS_REQUESTED_WHEN_CLOSED);
}
checkNotEmpty();
if (reuseLastProvider && lastProvider != null) {
return lastProvider.getCredentials();
@ -136,8 +156,7 @@ public AWSCredentials getCredentials() {
if (lastException != null) {
message += ": " + lastException;
}
throw new AmazonClientException(message, lastException);
throw new NoAuthWithAWSException(message, lastException);
}
/**
@ -156,7 +175,7 @@ List<AWSCredentialsProvider> getProviders() {
*/
public void checkNotEmpty() {
if (providers.isEmpty()) {
throw new AmazonClientException(NO_AWS_CREDENTIAL_PROVIDERS);
throw new NoAuthWithAWSException(NO_AWS_CREDENTIAL_PROVIDERS);
}
}
@ -178,8 +197,38 @@ public String listProviderNames() {
*/
@Override
public String toString() {
return "AWSCredentialProviderList: " +
StringUtils.join(providers, " ");
return "AWSCredentialProviderList[" +
"refcount= " + refCount.get() + ": [" +
StringUtils.join(providers, ", ") + ']';
}
/**
* Get a reference to this object with an updated reference count.
*
* @return a reference to this
*/
public synchronized AWSCredentialProviderList share() {
Preconditions.checkState(!closed.get(), "Provider list is closed");
refCount.incrementAndGet();
return this;
}
/**
* Get the current reference count.
* @return the current ref count
*/
@VisibleForTesting
public int getRefCount() {
return refCount.get();
}
/**
* Get the closed flag.
* @return true iff the list is closed.
*/
@VisibleForTesting
public boolean isClosed() {
return closed.get();
}
/**
@ -190,9 +239,29 @@ public String toString() {
*/
@Override
public void close() {
for(AWSCredentialsProvider p: providers) {
synchronized (this) {
if (closed.get()) {
// already closed: no-op
return;
}
int remainder = refCount.decrementAndGet();
if (remainder != 0) {
// still actively used, or somehow things are
// now negative
LOG.debug("Not closing {}", this);
return;
}
// at this point, the closing is going to happen
LOG.debug("Closing {}", this);
closed.set(true);
}
// do this outside the synchronized block.
for (AWSCredentialsProvider p : providers) {
if (p instanceof Closeable) {
IOUtils.closeStream((Closeable)p);
IOUtils.closeStream((Closeable) p);
} else if (p instanceof AutoCloseable) {
S3AUtils.closeAutocloseables(LOG, (AutoCloseable)p);
}
}
}

View File

@ -84,10 +84,27 @@ private Constants() {
public static final String ASSUMED_ROLE_SESSION_DURATION =
"fs.s3a.assumed.role.session.duration";
/** Simple Token Service Endpoint. If unset, uses the default endpoint. */
/** Security Token Service Endpoint. If unset, uses the default endpoint. */
public static final String ASSUMED_ROLE_STS_ENDPOINT =
"fs.s3a.assumed.role.sts.endpoint";
/**
* Region for the STS endpoint; only relevant if the endpoint
* is set.
*/
public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION =
"fs.s3a.assumed.role.sts.endpoint.region";
/**
* Default value for the STS endpoint region; needed for
* v4 signing.
*/
public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT =
"us-west-1";
/**
* Default duration of an assumed role.
*/
public static final String ASSUMED_ROLE_SESSION_DURATION_DEFAULT = "30m";
/** list of providers to authenticate for the assumed role. */

View File

@ -18,59 +18,45 @@
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.net.URI;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
/**
* The default factory implementation, which calls the AWS SDK to configure
* and create an {@link AmazonS3Client} that communicates with the S3 service.
* The default {@link S3ClientFactory} implementation.
* This which calls the AWS SDK to configure and create an
* {@link AmazonS3Client} that communicates with the S3 service.
*/
public class DefaultS3ClientFactory extends Configured implements
S3ClientFactory {
public class DefaultS3ClientFactory extends Configured
implements S3ClientFactory {
protected static final Logger LOG = S3AFileSystem.LOG;
@Override
public AmazonS3 createS3Client(URI name) throws IOException {
public AmazonS3 createS3Client(URI name,
final String bucket,
final AWSCredentialsProvider credentials) throws IOException {
Configuration conf = getConf();
AWSCredentialsProvider credentials =
createAWSCredentialProviderSet(name, conf);
final ClientConfiguration awsConf = createAwsConf(getConf());
AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
return createAmazonS3Client(s3, conf, credentials, awsConf);
final ClientConfiguration awsConf = S3AUtils.createAwsConf(getConf(), bucket);
return configureAmazonS3Client(
newAmazonS3Client(credentials, awsConf), conf);
}
/**
* Create a new {@link ClientConfiguration}.
* @param conf The Hadoop configuration
* @return new AWS client configuration
*/
public static ClientConfiguration createAwsConf(Configuration conf) {
final ClientConfiguration awsConf = new ClientConfiguration();
initConnectionSettings(conf, awsConf);
initProxySupport(conf, awsConf);
initUserAgent(conf, awsConf);
return awsConf;
}
/**
* Wrapper around constructor for {@link AmazonS3} client. Override this to
* provide an extended version of the client
* Wrapper around constructor for {@link AmazonS3} client.
* Override this to provide an extended version of the client
* @param credentials credentials to use
* @param awsConf AWS configuration
* @return new AmazonS3 client
@ -81,120 +67,17 @@ protected AmazonS3 newAmazonS3Client(
}
/**
* Initializes all AWS SDK settings related to connection management.
* Configure S3 client from the Hadoop configuration.
*
* This includes: endpoint, Path Access and possibly other
* options.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration
*/
private static void initConnectionSettings(Configuration conf,
ClientConfiguration awsConf) {
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS);
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT, 0));
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT, 0));
int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
DEFAULT_SOCKET_SEND_BUFFER, 2048);
int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
DEFAULT_SOCKET_RECV_BUFFER, 2048);
awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
if (!signerOverride.isEmpty()) {
LOG.debug("Signer override = {}", signerOverride);
awsConf.setSignerOverride(signerOverride);
}
}
/**
* Initializes AWS SDK proxy support if configured.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration
* @throws IllegalArgumentException if misconfigured
*/
private static void initProxySupport(Configuration conf,
ClientConfiguration awsConf) throws IllegalArgumentException {
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
int proxyPort = conf.getInt(PROXY_PORT, -1);
if (!proxyHost.isEmpty()) {
awsConf.setProxyHost(proxyHost);
if (proxyPort >= 0) {
awsConf.setProxyPort(proxyPort);
} else {
if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
LOG.warn("Proxy host set without port. Using HTTPS default 443");
awsConf.setProxyPort(443);
} else {
LOG.warn("Proxy host set without port. Using HTTP default 80");
awsConf.setProxyPort(80);
}
}
String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
if ((proxyUsername == null) != (proxyPassword == null)) {
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
PROXY_PASSWORD + " set without the other.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
awsConf.setProxyUsername(proxyUsername);
awsConf.setProxyPassword(proxyPassword);
awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
if (LOG.isDebugEnabled()) {
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
"domain {} as workstation {}", awsConf.getProxyHost(),
awsConf.getProxyPort(),
String.valueOf(awsConf.getProxyUsername()),
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
awsConf.getProxyWorkstation());
}
} else if (proxyPort >= 0) {
String msg =
"Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
}
/**
* Initializes the User-Agent header to send in HTTP requests to the S3
* back-end. We always include the Hadoop version number. The user also
* may set an optional custom prefix to put in front of the Hadoop version
* number. The AWS SDK interally appends its own information, which seems
* to include the AWS SDK version, OS and JVM version.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration
*/
private static void initUserAgent(Configuration conf,
ClientConfiguration awsConf) {
String userAgent = "Hadoop " + VersionInfo.getVersion();
String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
if (!userAgentPrefix.isEmpty()) {
userAgent = userAgentPrefix + ", " + userAgent;
}
LOG.debug("Using User-Agent: {}", userAgent);
awsConf.setUserAgentPrefix(userAgent);
}
/**
* Creates an {@link AmazonS3Client} from the established configuration.
*
* @param conf Hadoop configuration
* @param credentials AWS credentials
* @param awsConf AWS SDK configuration
* @return S3 client
* @throws IllegalArgumentException if misconfigured
*/
private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf,
AWSCredentialsProvider credentials, ClientConfiguration awsConf)
private static AmazonS3 configureAmazonS3Client(AmazonS3 s3,
Configuration conf)
throws IllegalArgumentException {
String endPoint = conf.getTrimmed(ENDPOINT, "");
if (!endPoint.isEmpty()) {
@ -206,21 +89,29 @@ private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf,
throw new IllegalArgumentException(msg, e);
}
}
enablePathStyleAccessIfRequired(s3, conf);
return s3;
return applyS3ClientOptions(s3, conf);
}
/**
* Enables path-style access to S3 buckets if configured. By default, the
* behavior is to use virtual hosted-style access with URIs of the form
* http://bucketname.s3.amazonaws.com. Enabling path-style access and a
* region-specific endpoint switches the behavior to use URIs of the form
* http://s3-eu-west-1.amazonaws.com/bucketname.
* Perform any tuning of the {@code S3ClientOptions} settings based on
* the Hadoop configuration.
* This is different from the general AWS configuration creation as
* it is unique to S3 connections.
*
* The {@link Constants#PATH_STYLE_ACCESS} option enables path-style access
* to S3 buckets if configured. By default, the
* behavior is to use virtual hosted-style access with URIs of the form
* {@code http://bucketname.s3.amazonaws.com}
* Enabling path-style access and a
* region-specific endpoint switches the behavior to use URIs of the form
* {@code http://s3-eu-west-1.amazonaws.com/bucketname}.
* It is common to use this when connecting to private S3 servers, as it
* avoids the need to play with DNS entries.
* @param s3 S3 client
* @param conf Hadoop configuration
* @return the S3 client
*/
private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
private static AmazonS3 applyS3ClientOptions(AmazonS3 s3,
Configuration conf) {
final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
if (pathStyleAccess) {
@ -229,5 +120,6 @@ private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
.setPathStyleAccess(true)
.build());
}
return s3;
}
}

View File

@ -114,6 +114,16 @@ public S3ObjectSummary summary() {
/** Map of key to delay -> time it was created. */
private Map<String, Long> delayedPutKeys = new HashMap<>();
/**
* Instantiate.
* This subclasses a deprecated constructor of the parent
* {@code AmazonS3Client} class; we can't use the builder API because,
* that only creates the consistent client.
* @param credentials credentials to auth.
* @param clientConfiguration connection settings
* @param conf hadoop configuration.
*/
@SuppressWarnings("deprecation")
public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
ClientConfiguration clientConfiguration, Configuration conf) {
super(credentials, clientConfiguration);

View File

@ -21,16 +21,27 @@
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* S3 Client factory used for testing with eventual consistency fault injection.
* This client is for testing <i>only</i>; it is in the production
* {@code hadoop-aws} module to enable integration tests to use this
* just by editing the Hadoop configuration used to bring up the client.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {
/**
* Create the inconsistent client.
* Logs a warning that this is being done.
* @param credentials credentials to use
* @param awsConf AWS configuration
* @return an inconsistent client.
*/
@Override
protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
ClientConfiguration awsConf) {

View File

@ -77,8 +77,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -124,9 +125,6 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The core S3A Filesystem implementation.
*
@ -205,6 +203,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private boolean useListV1;
private MagicCommitIntegration committerIntegration;
private AWSCredentialProviderList credentials;
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@ -252,8 +252,10 @@ public void initialize(URI name, Configuration originalConf)
Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
credentials = createAWSCredentialProviderSet(name, conf);
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
.createS3Client(name);
.createS3Client(name, bucket, credentials);
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()),
onRetry);
@ -2470,12 +2472,11 @@ public void close() throws IOException {
transfers.shutdownNow(true);
transfers = null;
}
if (metadataStore != null) {
metadataStore.close();
metadataStore = null;
}
IOUtils.closeQuietly(instrumentation);
S3AUtils.closeAll(LOG, metadataStore, instrumentation);
metadataStore = null;
instrumentation = null;
closeAutocloseables(LOG, credentials);
credentials = null;
}
}
@ -2885,6 +2886,7 @@ public String toString() {
}
sb.append(", boundedExecutor=").append(boundedThreadPool);
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", credentials=").append(credentials);
sb.append(", statistics {")
.append(statistics)
.append("}");
@ -3319,4 +3321,17 @@ public boolean hasCapability(String capability) {
return false;
}
}
/**
* Get a shared copy of the AWS credentials, with its reference
* counter updated.
* Caller is required to call {@code close()} on this after
* they have finished using it.
* @param purpose what is this for? This is initially for logging
* @return a reference to shared credentials.
*/
public AWSCredentialProviderList shareCredentials(final String purpose) {
LOG.debug("Sharing credentials for: {}", purpose);
return credentials.share();
}
}

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.ConnectTimeoutException;
@ -154,8 +155,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
policyMap.put(InterruptedException.class, fail);
// note this does not pick up subclasses (like socket timeout)
policyMap.put(InterruptedIOException.class, fail);
// interesting question: should this be retried ever?
// Access denial and auth exceptions are not retried
policyMap.put(AccessDeniedException.class, fail);
policyMap.put(NoAuthWithAWSException.class, fail);
policyMap.put(FileNotFoundException.class, fail);
policyMap.put(InvalidRequestException.class, fail);

View File

@ -21,6 +21,8 @@
import com.amazonaws.AbortedException;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.SdkBaseException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
@ -44,15 +46,18 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.VersionInfo;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -174,11 +179,17 @@ public static IOException translateException(@Nullable String operation,
// call considered an sign of connectivity failure
return (EOFException)new EOFException(message).initCause(exception);
}
if (exception instanceof NoAuthWithAWSException) {
// the exception raised by AWSCredentialProvider list if the
// credentials were not accepted.
return (AccessDeniedException)new AccessDeniedException(path, null,
exception.toString()).initCause(exception);
}
return new AWSClientIOException(message, exception);
} else {
if (exception instanceof AmazonDynamoDBException) {
// special handling for dynamo DB exceptions
return translateDynamoDBException(message,
return translateDynamoDBException(path, message,
(AmazonDynamoDBException)exception);
}
IOException ioe;
@ -373,20 +384,45 @@ public static boolean signifiesConnectionBroken(SdkBaseException ex) {
/**
* Translate a DynamoDB exception into an IOException.
*
* @param path path in the DDB
* @param message preformatted message for the exception
* @param ex exception
* @param ddbException exception
* @return an exception to throw.
*/
public static IOException translateDynamoDBException(String message,
AmazonDynamoDBException ex) {
if (isThrottleException(ex)) {
return new AWSServiceThrottledException(message, ex);
public static IOException translateDynamoDBException(final String path,
final String message,
final AmazonDynamoDBException ddbException) {
if (isThrottleException(ddbException)) {
return new AWSServiceThrottledException(message, ddbException);
}
if (ex instanceof ResourceNotFoundException) {
if (ddbException instanceof ResourceNotFoundException) {
return (FileNotFoundException) new FileNotFoundException(message)
.initCause(ex);
.initCause(ddbException);
}
return new AWSServiceIOException(message, ex);
final int statusCode = ddbException.getStatusCode();
final String errorCode = ddbException.getErrorCode();
IOException result = null;
// 400 gets used a lot by DDB
if (statusCode == 400) {
switch (errorCode) {
case "AccessDeniedException":
result = (IOException) new AccessDeniedException(
path,
null,
ddbException.toString())
.initCause(ddbException);
break;
default:
result = new AWSBadRequestException(message, ddbException);
}
}
if (result == null) {
result = new AWSServiceIOException(message, ddbException);
}
return result;
}
/**
@ -738,6 +774,29 @@ public static String lookupPassword(
String baseKey,
String overrideVal)
throws IOException {
return lookupPassword(bucket, conf, baseKey, overrideVal, "");
}
/**
* Get a password from a configuration, including JCEKS files, handling both
* the absolute key and bucket override.
* @param bucket bucket or "" if none known
* @param conf configuration
* @param baseKey base key to look up, e.g "fs.s3a.secret.key"
* @param overrideVal override value: if non empty this is used instead of
* querying the configuration.
* @param defVal value to return if there is no password
* @return a password or the value of defVal.
* @throws IOException on any IO problem
* @throws IllegalArgumentException bad arguments
*/
public static String lookupPassword(
String bucket,
Configuration conf,
String baseKey,
String overrideVal,
String defVal)
throws IOException {
String initialVal;
Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
"%s does not start with $%s", baseKey, FS_S3A_PREFIX);
@ -757,7 +816,7 @@ public static String lookupPassword(
// no bucket, make the initial value the override value
initialVal = overrideVal;
}
return getPassword(conf, baseKey, initialVal);
return getPassword(conf, baseKey, initialVal, defVal);
}
/**
@ -1059,6 +1118,134 @@ public static void deleteWithWarning(FileSystem fs,
}
}
/**
* Create a new AWS {@code ClientConfiguration}.
* All clients to AWS services <i>MUST</i> use this for consistent setup
* of connectivity, UA, proxy settings.
* @param conf The Hadoop configuration
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @return new AWS client configuration
*/
public static ClientConfiguration createAwsConf(Configuration conf,
String bucket)
throws IOException {
final ClientConfiguration awsConf = new ClientConfiguration();
initConnectionSettings(conf, awsConf);
initProxySupport(conf, bucket, awsConf);
initUserAgent(conf, awsConf);
return awsConf;
}
/**
* Initializes all AWS SDK settings related to connection management.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration
*/
public static void initConnectionSettings(Configuration conf,
ClientConfiguration awsConf) {
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS);
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT, 0));
awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT, 0));
int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
DEFAULT_SOCKET_SEND_BUFFER, 2048);
int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
DEFAULT_SOCKET_RECV_BUFFER, 2048);
awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
if (!signerOverride.isEmpty()) {
LOG.debug("Signer override = {}", signerOverride);
awsConf.setSignerOverride(signerOverride);
}
}
/**
* Initializes AWS SDK proxy support in the AWS client configuration
* if the S3A settings enable it.
*
* @param conf Hadoop configuration
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @param awsConf AWS SDK configuration to update
* @throws IllegalArgumentException if misconfigured
* @throws IOException problem getting username/secret from password source.
*/
public static void initProxySupport(Configuration conf,
String bucket,
ClientConfiguration awsConf) throws IllegalArgumentException,
IOException {
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
int proxyPort = conf.getInt(PROXY_PORT, -1);
if (!proxyHost.isEmpty()) {
awsConf.setProxyHost(proxyHost);
if (proxyPort >= 0) {
awsConf.setProxyPort(proxyPort);
} else {
if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
LOG.warn("Proxy host set without port. Using HTTPS default 443");
awsConf.setProxyPort(443);
} else {
LOG.warn("Proxy host set without port. Using HTTP default 80");
awsConf.setProxyPort(80);
}
}
final String proxyUsername = lookupPassword(bucket, conf, PROXY_USERNAME,
null, null);
final String proxyPassword = lookupPassword(bucket, conf, PROXY_PASSWORD,
null, null);
if ((proxyUsername == null) != (proxyPassword == null)) {
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
PROXY_PASSWORD + " set without the other.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
awsConf.setProxyUsername(proxyUsername);
awsConf.setProxyPassword(proxyPassword);
awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
if (LOG.isDebugEnabled()) {
LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
"domain {} as workstation {}", awsConf.getProxyHost(),
awsConf.getProxyPort(),
String.valueOf(awsConf.getProxyUsername()),
awsConf.getProxyPassword(), awsConf.getProxyDomain(),
awsConf.getProxyWorkstation());
}
} else if (proxyPort >= 0) {
String msg =
"Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
}
/**
* Initializes the User-Agent header to send in HTTP requests to AWS
* services. We always include the Hadoop version number. The user also
* may set an optional custom prefix to put in front of the Hadoop version
* number. The AWS SDK internally appends its own information, which seems
* to include the AWS SDK version, OS and JVM version.
*
* @param conf Hadoop configuration
* @param awsConf AWS SDK configuration to update
*/
private static void initUserAgent(Configuration conf,
ClientConfiguration awsConf) {
String userAgent = "Hadoop " + VersionInfo.getVersion();
String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
if (!userAgentPrefix.isEmpty()) {
userAgent = userAgentPrefix + ", " + userAgent;
}
LOG.debug("Using User-Agent: {}", userAgent);
awsConf.setUserAgentPrefix(userAgent);
}
/**
* An interface for use in lambda-expressions working with
@ -1289,18 +1476,40 @@ private static String passwordDiagnostics(String pass, String description) {
* @param closeables the objects to close
*/
public static void closeAll(Logger log,
java.io.Closeable... closeables) {
for (java.io.Closeable c : closeables) {
Closeable... closeables) {
if (log == null) {
log = LOG;
}
for (Closeable c : closeables) {
if (c != null) {
try {
if (log != null) {
log.debug("Closing {}", c);
}
log.debug("Closing {}", c);
c.close();
} catch (Exception e) {
if (log != null && log.isDebugEnabled()) {
log.debug("Exception in closing {}", c, e);
}
log.debug("Exception in closing {}", c, e);
}
}
}
}
/**
* Close the Closeable objects and <b>ignore</b> any Exception or
* null pointers.
* (This is the SLF4J equivalent of that in {@code IOUtils}).
* @param log the log to log at debug level. Can be null.
* @param closeables the objects to close
*/
public static void closeAutocloseables(Logger log,
AutoCloseable... closeables) {
if (log == null) {
log = LOG;
}
for (AutoCloseable c : closeables) {
if (c != null) {
try {
log.debug("Closing {}", c);
c.close();
} catch (Exception e) {
log.debug("Exception in closing {}", c, e);
}
}
}

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.URI;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.classification.InterfaceAudience;
@ -37,9 +38,13 @@ public interface S3ClientFactory {
* Creates a new {@link AmazonS3} client.
*
* @param name raw input S3A file system URI
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @param credentialSet credentials to use
* @return S3 client
* @throws IOException IO problem
*/
AmazonS3 createS3Client(URI name) throws IOException;
AmazonS3 createS3Client(URI name,
final String bucket,
final AWSCredentialsProvider credentialSet) throws IOException;
}

View File

@ -24,9 +24,11 @@
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@ -37,6 +39,9 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.security.UserGroupInformation;
@ -77,17 +82,21 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
private final String arn;
private final AWSCredentialProviderList credentialsToSTS;
private final Invoker invoker;
/**
* Instantiate.
* This calls {@link #getCredentials()} to fail fast on the inner
* role credential retrieval.
* @param uri URI of endpoint.
* @param fsUri URI of the filesystem.
* @param conf configuration
* @throws IOException on IO problems and some parameter checking
* @throws IllegalArgumentException invalid parameters
* @throws AWSSecurityTokenServiceException problems getting credentials
*/
public AssumedRoleCredentialProvider(URI uri, Configuration conf)
public AssumedRoleCredentialProvider(URI fsUri, Configuration conf)
throws IOException {
arn = conf.getTrimmed(ASSUMED_ROLE_ARN, "");
@ -99,13 +108,14 @@ public AssumedRoleCredentialProvider(URI uri, Configuration conf)
Class<?>[] awsClasses = loadAWSProviderClasses(conf,
ASSUMED_ROLE_CREDENTIALS_PROVIDER,
SimpleAWSCredentialsProvider.class);
AWSCredentialProviderList credentials = new AWSCredentialProviderList();
credentialsToSTS = new AWSCredentialProviderList();
for (Class<?> aClass : awsClasses) {
if (this.getClass().equals(aClass)) {
throw new IOException(E_FORBIDDEN_PROVIDER);
}
credentials.add(createAWSCredentialProvider(conf, aClass, uri));
credentialsToSTS.add(createAWSCredentialProvider(conf, aClass, fsUri));
}
LOG.debug("Credentials to obtain role credentials: {}", credentialsToSTS);
// then the STS binding
sessionName = conf.getTrimmed(ASSUMED_ROLE_SESSION_NAME,
@ -122,14 +132,27 @@ public AssumedRoleCredentialProvider(URI uri, Configuration conf)
LOG.debug("Scope down policy {}", policy);
builder.withScopeDownPolicy(policy);
}
String epr = conf.get(ASSUMED_ROLE_STS_ENDPOINT, "");
if (StringUtils.isNotEmpty(epr)) {
LOG.debug("STS Endpoint: {}", epr);
builder.withServiceEndpoint(epr);
}
LOG.debug("Credentials to obtain role credentials: {}", credentials);
builder.withLongLivedCredentialsProvider(credentials);
String endpoint = conf.get(ASSUMED_ROLE_STS_ENDPOINT, "");
String region = conf.get(ASSUMED_ROLE_STS_ENDPOINT_REGION,
ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT);
AWSSecurityTokenServiceClientBuilder stsbuilder =
STSClientFactory.builder(
conf,
fsUri.getHost(),
credentialsToSTS,
endpoint,
region);
// the STS client is not tracked for a shutdown in close(), because it
// (currently) throws an UnsupportedOperationException in shutdown().
builder.withStsClient(stsbuilder.build());
//now build the provider
stsProvider = builder.build();
// to handle STS throttling by the AWS account, we
// need to retry
invoker = new Invoker(new S3ARetryPolicy(conf), this::operationRetried);
// and force in a fail-fast check just to keep the stack traces less
// convoluted
getCredentials();
@ -143,7 +166,17 @@ public AssumedRoleCredentialProvider(URI uri, Configuration conf)
@Override
public AWSCredentials getCredentials() {
try {
return stsProvider.getCredentials();
return invoker.retryUntranslated("getCredentials",
true,
stsProvider::getCredentials);
} catch (IOException e) {
// this is in the signature of retryUntranslated;
// its hard to see how this could be raised, but for
// completeness, it is wrapped as an Amazon Client Exception
// and rethrown.
throw new AmazonClientException(
"getCredentials failed: " + e,
e);
} catch (AWSSecurityTokenServiceException e) {
LOG.error("Failed to get credentials for role {}",
arn, e);
@ -161,7 +194,7 @@ public void refresh() {
*/
@Override
public void close() {
stsProvider.close();
S3AUtils.closeAutocloseables(LOG, stsProvider, credentialsToSTS);
}
@Override
@ -205,4 +238,23 @@ static String sanitize(String session) {
return r.toString();
}
/**
* Callback from {@link Invoker} when an operation is retried.
* @param text text of the operation
* @param ex exception
* @param retries number of retries
* @param idempotent is the method idempotent
*/
public void operationRetried(
String text,
Exception ex,
int retries,
boolean idempotent) {
if (retries == 0) {
// log on the first retry attempt of the credential access.
// At worst, this means one log entry every intermittent renewal
// time.
LOG.info("Retried {}", text);
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import com.amazonaws.AmazonClientException;
/**
* A specific subclass of {@code AmazonClientException} which can
* be used in the retry logic to fail fast when there is any
* authentication problem.
*/
public class NoAuthWithAWSException extends AmazonClientException {
public NoAuthWithAWSException(final String message, final Throwable t) {
super(message, t);
}
public NoAuthWithAWSException(final String message) {
super(message);
}
}

View File

@ -205,6 +205,14 @@ public static Policy policy(Statement... statements) {
return new Policy(statements);
}
/**
* From a set of statements, create a policy.
* @param statements statements
* @return the policy
*/
public static Policy policy(final List<RoleModel.Statement> statements) {
return new Policy(statements);
}
/**
* Effect options.

View File

@ -29,6 +29,55 @@ public final class RolePolicies {
private RolePolicies() {
}
/** All KMS operations: {@value}.*/
public static final String KMS_ALL_OPERATIONS = "kms:*";
/** KMS encryption. This is <i>Not</i> used by SSE-KMS: {@value}. */
public static final String KMS_ENCRYPT = "kms:Encrypt";
/**
* Decrypt data encrypted with SSE-KMS: {@value}.
*/
public static final String KMS_DECRYPT = "kms:Decrypt";
/**
* Arn for all KMS keys: {@value}.
*/
public static final String KMS_ALL_KEYS = "arn:aws:kms:*";
/**
* This is used by S3 to generate a per-object encryption key and
* the encrypted value of this, the latter being what it tags
* the object with for later decryption: {@value}.
*/
public static final String KMS_GENERATE_DATA_KEY = "kms:GenerateDataKey";
/**
* Actions needed to read and write SSE-KMS data.
*/
private static final String[] KMS_KEY_RW =
new String[]{KMS_DECRYPT, KMS_GENERATE_DATA_KEY};
/**
* Actions needed to read SSE-KMS data.
*/
private static final String[] KMS_KEY_READ =
new String[] {KMS_DECRYPT};
/**
* Statement to allow KMS R/W access access, so full use of
* SSE-KMS.
*/
public static final Statement STATEMENT_ALLOW_SSE_KMS_RW =
statement(true, KMS_ALL_KEYS, KMS_KEY_RW);
/**
* Statement to allow read access to KMS keys, so the ability
* to read SSE-KMS data,, but not decrypt it.
*/
public static final Statement STATEMENT_ALLOW_SSE_KMS_READ =
statement(true, KMS_ALL_KEYS, KMS_KEY_READ);
/**
* All S3 operations: {@value}.
*/
@ -52,7 +101,6 @@ private RolePolicies() {
public static final String S3_LIST_BUCKET_MULTPART_UPLOADS =
"s3:ListBucketMultipartUploads";
/**
* List multipart upload is needed for the S3A Commit protocols.
*/
@ -97,6 +145,8 @@ private RolePolicies() {
public static final String S3_GET_OBJECT_VERSION = "s3:GetObjectVersion";
public static final String S3_GET_BUCKET_LOCATION = "s3:GetBucketLocation";
public static final String S3_GET_OBJECT_VERSION_ACL
= "s3:GetObjectVersionAcl";
@ -128,7 +178,8 @@ private RolePolicies() {
public static final String S3_RESTORE_OBJECT = "s3:RestoreObject";
/**
* Actions needed to read data from S3 through S3A.
* Actions needed to read a file in S3 through S3A, excluding
* S3Guard and SSE-KMS.
*/
public static final String[] S3_PATH_READ_OPERATIONS =
new String[]{
@ -136,18 +187,20 @@ private RolePolicies() {
};
/**
* Actions needed to read data from S3 through S3A.
* Base actions needed to read data from S3 through S3A,
* excluding SSE-KMS data and S3Guard-ed buckets.
*/
public static final String[] S3_ROOT_READ_OPERATIONS =
new String[]{
S3_LIST_BUCKET,
S3_LIST_BUCKET_MULTPART_UPLOADS,
S3_GET_OBJECT,
S3_ALL_GET,
};
/**
* Actions needed to write data to an S3A Path.
* This includes the appropriate read operations.
* This includes the appropriate read operations, but
* not SSE-KMS or S3Guard support.
*/
public static final String[] S3_PATH_RW_OPERATIONS =
new String[]{
@ -163,6 +216,7 @@ private RolePolicies() {
* This is purely the extra operations needed for writing atop
* of the read operation set.
* Deny these and a path is still readable, but not writeable.
* Excludes: SSE-KMS and S3Guard permissions.
*/
public static final String[] S3_PATH_WRITE_OPERATIONS =
new String[]{
@ -173,6 +227,7 @@ private RolePolicies() {
/**
* Actions needed for R/W IO from the root of a bucket.
* Excludes: SSE-KMS and S3Guard permissions.
*/
public static final String[] S3_ROOT_RW_OPERATIONS =
new String[]{
@ -190,26 +245,57 @@ private RolePolicies() {
*/
public static final String DDB_ALL_OPERATIONS = "dynamodb:*";
public static final String DDB_ADMIN = "dynamodb:*";
/**
* Operations needed for DDB/S3Guard Admin.
* For now: make this {@link #DDB_ALL_OPERATIONS}.
*/
public static final String DDB_ADMIN = DDB_ALL_OPERATIONS;
/**
* Permission for DDB describeTable() operation: {@value}.
* This is used during initialization.
*/
public static final String DDB_DESCRIBE_TABLE = "dynamodb:DescribeTable";
public static final String DDB_BATCH_WRITE = "dynamodb:BatchWriteItem";
/**
* Permission to query the DDB table: {@value}.
*/
public static final String DDB_QUERY = "dynamodb:Query";
/**
* Permission for DDB operation to get a record: {@value}.
*/
public static final String DDB_GET_ITEM = "dynamodb:GetItem";
/**
* Permission for DDB write record operation: {@value}.
*/
public static final String DDB_PUT_ITEM = "dynamodb:PutItem";
/**
* Permission for DDB update single item operation: {@value}.
*/
public static final String DDB_UPDATE_ITEM = "dynamodb:UpdateItem";
/**
* Permission for DDB delete operation: {@value}.
*/
public static final String DDB_DELETE_ITEM = "dynamodb:DeleteItem";
/**
* Permission for DDB operation: {@value}.
*/
public static final String DDB_BATCH_GET_ITEM = "dynamodb:BatchGetItem";
/**
* Batch write permission for DDB: {@value}.
*/
public static final String DDB_BATCH_WRITE_ITEM = "dynamodb:BatchWriteItem";
/**
* All DynamoDB tables: {@value}.
*/
public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:::*";
public static final String WILDCARD = "*";
/**
* Allow all S3 Operations.
*/
public static final Statement STATEMENT_ALL_S3 = statement(true,
S3_ALL_BUCKETS,
S3_ALL_OPERATIONS);
public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:*";
/**
* Statement to allow all DDB access.
@ -218,11 +304,36 @@ private RolePolicies() {
ALL_DDB_TABLES, DDB_ALL_OPERATIONS);
/**
* Allow all S3 and S3Guard operations.
* Statement to allow all client operations needed for S3Guard,
* but none of the admin operations.
*/
public static final Statement STATEMENT_S3GUARD_CLIENT = statement(true,
ALL_DDB_TABLES,
DDB_BATCH_GET_ITEM,
DDB_BATCH_WRITE_ITEM,
DDB_DELETE_ITEM,
DDB_DESCRIBE_TABLE,
DDB_GET_ITEM,
DDB_PUT_ITEM,
DDB_QUERY,
DDB_UPDATE_ITEM
);
/**
* Allow all S3 Operations.
* This does not cover DDB or S3-KMS
*/
public static final Statement STATEMENT_ALL_S3 = statement(true,
S3_ALL_BUCKETS,
S3_ALL_OPERATIONS);
/**
* Policy for all S3 and S3Guard operations, and SSE-KMS.
*/
public static final Policy ALLOW_S3_AND_SGUARD = policy(
STATEMENT_ALL_S3,
STATEMENT_ALL_DDB
STATEMENT_ALL_DDB,
STATEMENT_ALLOW_SSE_KMS_RW
);
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import java.io.IOException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;
/**
* Factory for creating STS Clients.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class STSClientFactory {
private static final Logger LOG =
LoggerFactory.getLogger(STSClientFactory.class);
/**
* Create the builder ready for any final configuration options.
* Picks up connection settings from the Hadoop configuration, including
* proxy secrets.
* @param conf Configuration to act as source of options.
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @param credentials AWS credential chain to use
* @param stsEndpoint optional endpoint "https://sns.us-west-1.amazonaws.com"
* @param stsRegion the region, e.g "us-west-1"
* @return the builder to call {@code build()}
* @throws IOException problem reading proxy secrets
*/
public static AWSSecurityTokenServiceClientBuilder builder(
final Configuration conf,
final String bucket,
final AWSCredentialsProvider credentials, final String stsEndpoint,
final String stsRegion) throws IOException {
Preconditions.checkArgument(credentials != null, "No credentials");
final AWSSecurityTokenServiceClientBuilder builder
= AWSSecurityTokenServiceClientBuilder.standard();
final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
builder.withClientConfiguration(awsConf);
builder.withCredentials(credentials);
if (StringUtils.isNotEmpty(stsEndpoint)) {
LOG.debug("STS Endpoint ={}", stsEndpoint);
builder.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(stsEndpoint, stsRegion));
}
return builder;
}
}

View File

@ -34,10 +34,9 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
import org.apache.hadoop.fs.s3a.S3AUtils;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
/**
* Interface to create a DynamoDB client.
@ -58,10 +57,14 @@ public interface DynamoDBClientFactory extends Configurable {
* it will indicate an error.
*
* @param defaultRegion the default region of the AmazonDynamoDB client
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @param credentials credentials to use for authentication.
* @return a new DynamoDB client
* @throws IOException if any IO error happens
*/
AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException;
AmazonDynamoDB createDynamoDBClient(final String defaultRegion,
final String bucket,
final AWSCredentialsProvider credentials) throws IOException;
/**
* The default implementation for creating an AmazonDynamoDB.
@ -69,16 +72,15 @@ public interface DynamoDBClientFactory extends Configurable {
class DefaultDynamoDBClientFactory extends Configured
implements DynamoDBClientFactory {
@Override
public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
public AmazonDynamoDB createDynamoDBClient(String defaultRegion,
final String bucket,
final AWSCredentialsProvider credentials)
throws IOException {
Preconditions.checkNotNull(getConf(),
"Should have been configured before usage");
final Configuration conf = getConf();
final AWSCredentialsProvider credentials =
createAWSCredentialProviderSet(null, conf);
final ClientConfiguration awsConf =
DefaultS3ClientFactory.createAwsConf(conf);
final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
final String region = getRegion(conf, defaultRegion);
LOG.debug("Creating DynamoDB client in region {}", region);

View File

@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -34,6 +35,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
@ -67,6 +69,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
@ -75,13 +78,14 @@
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.fs.s3a.auth.RolePolicies;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
@ -207,6 +211,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
new ValueMap().withBoolean(":false", false);
private DynamoDB dynamoDB;
private AWSCredentialProviderList credentials;
private String region;
private Table table;
private String tableName;
@ -242,10 +247,16 @@ public class DynamoDBMetadataStore implements MetadataStore {
* A utility function to create DynamoDB instance.
* @param conf the file system configuration
* @param s3Region region of the associated S3 bucket (if any).
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @param credentials credentials.
* @return DynamoDB instance.
* @throws IOException I/O error.
*/
private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
private static DynamoDB createDynamoDB(
final Configuration conf,
final String s3Region,
final String bucket,
final AWSCredentialsProvider credentials)
throws IOException {
Preconditions.checkNotNull(conf);
final Class<? extends DynamoDBClientFactory> cls = conf.getClass(
@ -254,10 +265,18 @@ private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
DynamoDBClientFactory.class);
LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region);
final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
.createDynamoDBClient(s3Region);
.createDynamoDBClient(s3Region, bucket, credentials);
return new DynamoDB(dynamoDBClient);
}
/**
* {@inheritDoc}.
* The credentials for authenticating with S3 are requested from the
* FS via {@link S3AFileSystem#shareCredentials(String)}; this will
* increment the reference counter of these credentials.
* @param fs {@code S3AFileSystem} associated with the MetadataStore
* @throws IOException on a failure
*/
@Override
@Retries.OnceRaw
public void initialize(FileSystem fs) throws IOException {
@ -274,11 +293,23 @@ public void initialize(FileSystem fs) throws IOException {
LOG.debug("Overriding S3 region with configured DynamoDB region: {}",
region);
} else {
region = owner.getBucketLocation();
try {
region = owner.getBucketLocation();
} catch (AccessDeniedException e) {
// access denied here == can't call getBucket. Report meaningfully
URI uri = owner.getUri();
LOG.error("Failed to get bucket location from S3 bucket {}",
uri);
throw (IOException)new AccessDeniedException(
"S3 client role lacks permission "
+ RolePolicies.S3_GET_BUCKET_LOCATION + " for " + uri)
.initCause(e);
}
LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
}
username = owner.getUsername();
dynamoDB = createDynamoDB(conf, region);
credentials = owner.shareCredentials("s3guard");
dynamoDB = createDynamoDB(conf, region, bucket, credentials);
// use the bucket as the DynamoDB table name if not specified in config
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
@ -311,6 +342,9 @@ public void initialize(FileSystem fs) throws IOException {
* must declare the table name and region in the
* {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and
* {@link Constants#S3GUARD_DDB_REGION_KEY} respectively.
* It also creates a new credential provider list from the configuration,
* using the base fs.s3a.* options, as there is no bucket to infer per-bucket
* settings from.
*
* @see #initialize(FileSystem)
* @throws IOException if there is an error
@ -327,7 +361,8 @@ public void initialize(Configuration config) throws IOException {
region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
Preconditions.checkArgument(!StringUtils.isEmpty(region),
"No DynamoDB region configured");
dynamoDB = createDynamoDB(conf, region);
credentials = createAWSCredentialProviderSet(null, conf);
dynamoDB = createDynamoDB(conf, region, null, credentials);
username = UserGroupInformation.getCurrentUser().getShortUserName();
initDataAccessRetries(conf);
@ -778,12 +813,17 @@ public synchronized void close() {
if (instrumentation != null) {
instrumentation.storeClosed();
}
if (dynamoDB != null) {
LOG.debug("Shutting down {}", this);
dynamoDB.shutdown();
dynamoDB = null;
try {
if (dynamoDB != null) {
LOG.debug("Shutting down {}", this);
dynamoDB.shutdown();
dynamoDB = null;
}
} finally {
closeAutocloseables(LOG, credentials);
credentials = null;
}
}
}
@Override
@Retries.OnceTranslated

View File

@ -29,7 +29,7 @@ assumed roles for different buckets.
*IAM Assumed Roles are unlikely to be supported by third-party systems
supporting the S3 APIs.*
## Using IAM Assumed Roles
## <a name="using_assumed_roles"></a> Using IAM Assumed Roles
### Before You Begin
@ -40,6 +40,8 @@ are, how to configure their policies, etc.
* You need a pair of long-lived IAM User credentials, not the root account set.
* Have the AWS CLI installed, and test that it works there.
* Give the role access to S3, and, if using S3Guard, to DynamoDB.
* For working with data encrypted with SSE-KMS, the role must
have access to the appropriate KMS keys.
Trying to learn how IAM Assumed Roles work by debugging stack traces from
the S3A client is "suboptimal".
@ -51,7 +53,7 @@ To use assumed roles, the client must be configured to use the
in the configuration option `fs.s3a.aws.credentials.provider`.
This AWS Credential provider will read in the `fs.s3a.assumed.role` options needed to connect to the
Session Token Service [Assumed Role API](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html),
Security Token Service [Assumed Role API](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html),
first authenticating with the full credentials, then assuming the specific role
specified. It will then refresh this login at the configured rate of
`fs.s3a.assumed.role.session.duration`
@ -69,7 +71,7 @@ which uses `fs.s3a.access.key` and `fs.s3a.secret.key`.
Note: although you can list other AWS credential providers in to the
Assumed Role Credential Provider, it can only cause confusion.
### <a name="using"></a> Using Assumed Roles
### <a name="using"></a> Configuring Assumed Roles
To use assumed roles, the S3A client credentials provider must be set to
the `AssumedRoleCredentialProvider`, and `fs.s3a.assumed.role.arn` to
@ -78,7 +80,6 @@ the previously created ARN.
```xml
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider</value>
<value>org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider</value>
</property>
@ -159,7 +160,18 @@ Here are the full set of configuration options.
<name>fs.s3a.assumed.role.sts.endpoint</name>
<value/>
<description>
AWS Simple Token Service Endpoint. If unset, uses the default endpoint.
AWS Security Token Service Endpoint. If unset, uses the default endpoint.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
<property>
<name>fs.s3a.assumed.role.sts.endpoint.region</name>
<value>us-west-1</value>
<description>
AWS Security Token Service Endpoint's region;
Needed if fs.s3a.assumed.role.sts.endpoint points to an endpoint
other than the default one and the v4 signature is used.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
</description>
</property>
@ -194,39 +206,101 @@ These lists represent the minimum actions to which the client's principal
must have in order to work with a bucket.
### Read Access Permissions
### <a name="read-permissions"></a> Read Access Permissions
Permissions which must be granted when reading from a bucket:
| Action | S3A operations |
|--------|----------|
| `s3:ListBucket` | `listStatus()`, `getFileStatus()` and elsewhere |
| `s3:GetObject` | `getFileStatus()`, `open()` and elsewhere |
| `s3:ListBucketMultipartUploads` | Aborting/cleaning up S3A commit operations|
```
s3:Get*
s3:ListBucket
```
When using S3Guard, the client needs the appropriate
<a href="s3guard-permissions">DynamoDB access permissions</a>
The `s3:ListBucketMultipartUploads` is only needed when committing work
via the [S3A committers](committers.html).
However, it must be granted to the root path in order to safely clean up jobs.
It is simplest to permit this in all buckets, even if it is only actually
needed when writing data.
To use SSE-KMS encryption, the client needs the
<a href="sse-kms-permissions">SSE-KMS Permissions</a> to access the
KMS key(s).
### <a name="write-permissions"></a> Write Access Permissions
### Write Access Permissions
These permissions must all be granted for write access:
These permissions must *also* be granted for write access:
```
s3:Get*
s3:Delete*
s3:Put*
s3:ListBucket
s3:ListBucketMultipartUploads
s3:AbortMultipartUpload
```
### <a name="sse-kms-permissions"></a> SSE-KMS Permissions
| Action | S3A operations |
|--------|----------|
| `s3:PutObject` | `mkdir()`, `create()`, `rename()`, `delete()` |
| `s3:DeleteObject` | `mkdir()`, `create()`, `rename()`, `delete()` |
| `s3:AbortMultipartUpload` | S3A committer `abortJob()` and `cleanup()` operations |
| `s3:ListMultipartUploadParts` | S3A committer `abortJob()` and `cleanup()` operations |
When to read data encrypted using SSE-KMS, the client must have
`kms:Decrypt` permission for the specific key a file was encrypted with.
```
kms:Decrypt
```
### Mixed Permissions in a single S3 Bucket
To write data using SSE-KMS, the client must have all the following permissions.
```
kms:Decrypt
kms:GenerateDataKey
```
This includes renaming: renamed files are encrypted with the encryption key
of the current S3A client; it must decrypt the source file first.
If the caller doesn't have these permissions, the operation will fail with an
`AccessDeniedException`: the S3 Store does not provide the specifics of
the cause of the failure.
### <a name="s3guard-permissions"></a> S3Guard Permissions
To use S3Guard, all clients must have a subset of the
[AWS DynamoDB Permissions](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/api-permissions-reference.html).
To work with buckets protected with S3Guard, the client must have
all the following rights on the DynamoDB Table used to protect that bucket.
```
dynamodb:BatchGetItem
dynamodb:BatchWriteItem
dynamodb:DeleteItem
dynamodb:DescribeTable
dynamodb:GetItem
dynamodb:PutItem
dynamodb:Query
dynamodb:UpdateItem
```
This is true, *even if the client only has read access to the data*.
For the `hadoop s3guard` table management commands, _extra_ permissions are required:
```
dynamodb:CreateTable
dynamodb:DescribeLimits
dynamodb:DeleteTable
dynamodb:Scan
dynamodb:TagResource
dynamodb:UntagResource
dynamodb:UpdateTable
```
Without these permissions, tables cannot be created, destroyed or have their IO capacity
changed through the `s3guard set-capacity` call.
The `dynamodb:Scan` permission is needed for `s3guard prune`
The `dynamodb:CreateTable` permission is needed by a client it tries to
create the DynamoDB table on startup, that is
`fs.s3a.s3guard.ddb.table.create` is `true` and the table does not already exist.
### <a name="mixed-permissions"></a> Mixed Permissions in a single S3 Bucket
Mixing permissions down the "directory tree" is limited
only to the extent of supporting writeable directories under
@ -274,7 +348,7 @@ This example has the base bucket read only, and a directory underneath,
"Action" : [
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:GetObject"
"s3:Get*"
],
"Resource" : "arn:aws:s3:::example-bucket/*"
}, {
@ -320,7 +394,7 @@ the command line before trying to use the S3A client.
`hadoop fs -mkdirs -p s3a://bucket/path/p1/`
### <a name="no_role"></a>IOException: "Unset property fs.s3a.assumed.role.arn"
### <a name="no_role"></a> IOException: "Unset property fs.s3a.assumed.role.arn"
The Assumed Role Credential Provider is enabled, but `fs.s3a.assumed.role.arn` is unset.
@ -339,7 +413,7 @@ java.io.IOException: Unset property fs.s3a.assumed.role.arn
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:474)
```
### <a name="not_authorized_for_assumed_role"></a>"Not authorized to perform sts:AssumeRole"
### <a name="not_authorized_for_assumed_role"></a> "Not authorized to perform sts:AssumeRole"
This can arise if the role ARN set in `fs.s3a.assumed.role.arn` is invalid
or one to which the caller has no access.
@ -399,7 +473,8 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
The value of `fs.s3a.assumed.role.session.duration` is out of range.
```
java.lang.IllegalArgumentException: Assume Role session duration should be in the range of 15min - 1Hr
java.lang.IllegalArgumentException: Assume Role session duration should be in the range of 15min
- 1Hr
at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$Builder.withRoleSessionDurationSeconds(STSAssumeRoleSessionCredentialsProvider.java:437)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:86)
```
@ -603,7 +678,7 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
### <a name="invalid_token"></a> `AccessDeniedException/InvalidClientTokenId`: "The security token included in the request is invalid"
The credentials used to authenticate with the AWS Simple Token Service are invalid.
The credentials used to authenticate with the AWS Security Token Service are invalid.
```
[ERROR] Failures:
@ -682,26 +757,7 @@ org.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.f
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3354)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:474)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at org.apache.hadoop.fs.s3a.ITestAssumeRole.lambda$expectFileSystemFailure$0(ITestAssumeRole.java:70)
at org.apache.hadoop.fs.s3a.ITestAssumeRole.lambda$interceptC$1(ITestAssumeRole.java:84)
at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:491)
at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:377)
at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:446)
at org.apache.hadoop.fs.s3a.ITestAssumeRole.interceptC(ITestAssumeRole.java:82)
at org.apache.hadoop.fs.s3a.ITestAssumeRole.expectFileSystemFailure(ITestAssumeRole.java:68)
at org.apache.hadoop.fs.s3a.ITestAssumeRole.testAssumeRoleBadSession(ITestAssumeRole.java:216)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74)
Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException:
1 validation error detected: Value 'Session Names cannot Hava Spaces!' at 'roleSessionName'
failed to satisfy constraint:
@ -742,10 +798,11 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
### <a name="access_denied"></a> `java.nio.file.AccessDeniedException` within a FileSystem API call
If an operation fails with an `AccessDeniedException`, then the role does not have
the permission for the S3 Operation invoked during the call
the permission for the S3 Operation invoked during the call.
```
java.nio.file.AccessDeniedException: s3a://bucket/readonlyDir: rename(s3a://bucket/readonlyDir, s3a://bucket/renameDest)
java.nio.file.AccessDeniedException: s3a://bucket/readonlyDir:
rename(s3a://bucket/readonlyDir, s3a://bucket/renameDest)
on s3a://bucket/readonlyDir:
com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied
(Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 2805F2ABF5246BB1;
@ -795,3 +852,33 @@ check the path for the operation.
Make sure that all the read and write permissions are allowed for any bucket/path
to which data is being written to, and read permissions for all
buckets read from.
If the bucket is using SSE-KMS to encrypt data:
1. The caller must have the `kms:Decrypt` permission to read the data.
1. The caller needs `kms:Decrypt` and `kms:GenerateDataKey`.
Without permissions, the request fails *and there is no explicit message indicating
that this is an encryption-key issue*.
### <a name="dynamodb_exception"></a> `AccessDeniedException` + `AmazonDynamoDBException`
```
java.nio.file.AccessDeniedException: bucket1:
com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException:
User: arn:aws:sts::980678866538:assumed-role/s3guard-test-role/test is not authorized to perform:
dynamodb:DescribeTable on resource: arn:aws:dynamodb:us-west-1:980678866538:table/bucket1
(Service: AmazonDynamoDBv2; Status Code: 400;
```
The caller is trying to access an S3 bucket which uses S3Guard, but the caller
lacks the relevant DynamoDB access permissions.
The `dynamodb:DescribeTable` operation is the first one used in S3Guard to access,
the DynamoDB table, so it is often the first to fail. It can be a sign
that the role has no permissions at all to access the table named in the exception,
or just that this specific permission has been omitted.
If the role policy requested for the assumed role didn't ask for any DynamoDB
permissions, this is where all attempts to work with a S3Guarded bucket will
fail. Check the value of `fs.s3a.assumed.role.policy`

View File

@ -33,7 +33,7 @@ See also:
* [Working with IAM Assumed Roles](./assumed_roles.html)
* [Testing](./testing.html)
##<a name="overview"></a> Overview
## <a name="overview"></a> Overview
Apache Hadoop's `hadoop-aws` module provides support for AWS integration.
applications to easily use this support.
@ -88,7 +88,7 @@ maintain it.
This connector is no longer available: users must migrate to the newer `s3a:` client.
##<a name="getting_started"></a> Getting Started
## <a name="getting_started"></a> Getting Started
S3A depends upon two JARs, alongside `hadoop-common` and its dependencies.
@ -1698,6 +1698,6 @@ as configured by the value `fs.s3a.multipart.size`.
To disable checksum verification in `distcp`, use the `-skipcrccheck` option:
```bash
hadoop distcp -update -skipcrccheck /user/alice/datasets s3a://alice-backup/datasets
hadoop distcp -update -skipcrccheck -numListstatusThreads 40 /user/alice/datasets s3a://alice-backup/datasets
```

View File

@ -36,14 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
@ -60,6 +52,9 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME;
import static org.junit.Assert.*;
/**
* S3A tests for configuration.
@ -134,12 +129,26 @@ public void testProxyConnection() throws Exception {
conf.setInt(Constants.PROXY_PORT, 1);
String proxy =
conf.get(Constants.PROXY_HOST) + ":" + conf.get(Constants.PROXY_PORT);
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server at " + proxy);
} catch (AWSClientIOException e) {
// expected
}
expectFSCreateFailure(AWSClientIOException.class,
conf, "when using proxy " + proxy);
}
/**
* Expect a filesystem to not be created from a configuration
* @return the exception intercepted
* @throws Exception any other exception
*/
private <E extends Throwable> E expectFSCreateFailure(
Class<E> clazz,
Configuration conf,
String text)
throws Exception {
return intercept(clazz,
() -> {
fs = S3ATestUtils.createTestFileSystem(conf);
return "expected failure creating FS " + text + " got " + fs;
});
}
@Test
@ -148,15 +157,13 @@ public void testProxyPortWithoutHost() throws Exception {
conf.unset(Constants.PROXY_HOST);
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.setInt(Constants.PROXY_PORT, 1);
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a proxy configuration error");
} catch (IllegalArgumentException e) {
String msg = e.toString();
if (!msg.contains(Constants.PROXY_HOST) &&
!msg.contains(Constants.PROXY_PORT)) {
throw e;
}
IllegalArgumentException e = expectFSCreateFailure(
IllegalArgumentException.class,
conf, "Expected a connection error for proxy server");
String msg = e.toString();
if (!msg.contains(Constants.PROXY_HOST) &&
!msg.contains(Constants.PROXY_PORT)) {
throw e;
}
}
@ -167,19 +174,11 @@ public void testAutomaticProxyPortSelection() throws Exception {
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.set(Constants.SECURE_CONNECTIONS, "true");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (AWSClientIOException e) {
// expected
}
expectFSCreateFailure(AWSClientIOException.class,
conf, "Expected a connection error for proxy server");
conf.set(Constants.SECURE_CONNECTIONS, "false");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (AWSClientIOException e) {
// expected
}
expectFSCreateFailure(AWSClientIOException.class,
conf, "Expected a connection error for proxy server");
}
@Test
@ -189,31 +188,31 @@ public void testUsernameInconsistentWithPassword() throws Exception {
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.setInt(Constants.PROXY_PORT, 1);
conf.set(Constants.PROXY_USERNAME, "user");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (IllegalArgumentException e) {
String msg = e.toString();
if (!msg.contains(Constants.PROXY_USERNAME) &&
!msg.contains(Constants.PROXY_PASSWORD)) {
throw e;
}
IllegalArgumentException e = expectFSCreateFailure(
IllegalArgumentException.class,
conf, "Expected a connection error for proxy server");
assertIsProxyUsernameError(e);
}
private void assertIsProxyUsernameError(final IllegalArgumentException e) {
String msg = e.toString();
if (!msg.contains(Constants.PROXY_USERNAME) &&
!msg.contains(Constants.PROXY_PASSWORD)) {
throw e;
}
}
@Test
public void testUsernameInconsistentWithPassword2() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
conf.setInt(Constants.PROXY_PORT, 1);
conf.set(Constants.PROXY_PASSWORD, "password");
try {
fs = S3ATestUtils.createTestFileSystem(conf);
fail("Expected a connection error for proxy server");
} catch (IllegalArgumentException e) {
String msg = e.toString();
if (!msg.contains(Constants.PROXY_USERNAME) &&
!msg.contains(Constants.PROXY_PASSWORD)) {
throw e;
}
}
IllegalArgumentException e = expectFSCreateFailure(
IllegalArgumentException.class,
conf, "Expected a connection error for proxy server");
assertIsProxyUsernameError(e);
}
@Test
@ -393,7 +392,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
// Catch/pass standard path style access behaviour when live bucket
// isn't in the same region as the s3 client default. See
// http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
assertEquals(e.getStatusCode(), HttpStatus.SC_MOVED_PERMANENTLY);
assertEquals(HttpStatus.SC_MOVED_PERMANENTLY, e.getStatusCode());
}
}
@ -428,8 +427,16 @@ public void testCustomUserAgent() throws Exception {
public void testCloseIdempotent() throws Throwable {
conf = new Configuration();
fs = S3ATestUtils.createTestFileSystem(conf);
AWSCredentialProviderList credentials =
fs.shareCredentials("testCloseIdempotent");
credentials.close();
fs.close();
assertTrue("Closing FS didn't close credentials " + credentials,
credentials.isClosed());
assertEquals("refcount not zero in " + credentials, 0, credentials.getRefCount());
fs.close();
// and the numbers should not change
assertEquals("refcount not zero in " + credentials, 0, credentials.getRefCount());
}
@Test

View File

@ -19,15 +19,14 @@
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.net.URI;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;
import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
@ -55,6 +54,14 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase {
private static final long TEST_FILE_SIZE = 1024;
private AWSCredentialProviderList credentials;
@Override
public void teardown() throws Exception {
S3AUtils.closeAutocloseables(LOG, credentials);
super.teardown();
}
/**
* Test use of STS for requesting temporary credentials.
*
@ -63,7 +70,7 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase {
* S3A tests to request temporary credentials, then attempt to use those
* credentials instead.
*
* @throws IOException
* @throws IOException failure
*/
@Test
public void testSTS() throws IOException {
@ -71,21 +78,20 @@ public void testSTS() throws IOException {
if (!conf.getBoolean(TEST_STS_ENABLED, true)) {
skip("STS functional tests disabled");
}
S3AFileSystem testFS = getFileSystem();
credentials = testFS.shareCredentials("testSTS");
S3xLoginHelper.Login login = S3AUtils.getAWSAccessKeys(
URI.create("s3a://foobar"), conf);
if (!login.hasLogin()) {
skip("testSTS disabled because AWS credentials not configured");
}
AWSCredentialsProvider parentCredentials = new BasicAWSCredentialsProvider(
login.getUser(), login.getPassword());
String bucket = testFS.getBucket();
AWSSecurityTokenServiceClientBuilder builder = STSClientFactory.builder(
conf,
bucket,
credentials,
conf.getTrimmed(TEST_STS_ENDPOINT, ""), "");
AWSSecurityTokenService stsClient = builder.build();
String stsEndpoint = conf.getTrimmed(TEST_STS_ENDPOINT, "");
AWSSecurityTokenServiceClient stsClient;
stsClient = new AWSSecurityTokenServiceClient(parentCredentials);
if (!stsEndpoint.isEmpty()) {
LOG.debug("STS Endpoint ={}", stsEndpoint);
stsClient.setEndpoint(stsEndpoint);
if (!conf.getTrimmed(TEST_STS_ENDPOINT, "").isEmpty()) {
LOG.debug("STS Endpoint ={}", conf.getTrimmed(TEST_STS_ENDPOINT, ""));
stsClient.setEndpoint(conf.getTrimmed(TEST_STS_ENDPOINT, ""));
}
GetSessionTokenRequest sessionTokenRequest = new GetSessionTokenRequest();
sessionTokenRequest.setDurationSeconds(900);
@ -93,23 +99,28 @@ public void testSTS() throws IOException {
sessionTokenResult = stsClient.getSessionToken(sessionTokenRequest);
Credentials sessionCreds = sessionTokenResult.getCredentials();
String childAccessKey = sessionCreds.getAccessKeyId();
conf.set(ACCESS_KEY, childAccessKey);
String childSecretKey = sessionCreds.getSecretAccessKey();
conf.set(SECRET_KEY, childSecretKey);
String sessionToken = sessionCreds.getSessionToken();
conf.set(SESSION_TOKEN, sessionToken);
// clone configuration so changes here do not affect the base FS.
Configuration conf2 = new Configuration(conf);
S3AUtils.clearBucketOption(conf2, bucket, AWS_CREDENTIALS_PROVIDER);
S3AUtils.clearBucketOption(conf2, bucket, ACCESS_KEY);
S3AUtils.clearBucketOption(conf2, bucket, SECRET_KEY);
S3AUtils.clearBucketOption(conf2, bucket, SESSION_TOKEN);
conf.set(AWS_CREDENTIALS_PROVIDER, PROVIDER_CLASS);
conf2.set(ACCESS_KEY, sessionCreds.getAccessKeyId());
conf2.set(SECRET_KEY, sessionCreds.getSecretAccessKey());
conf2.set(SESSION_TOKEN, sessionCreds.getSessionToken());
try(S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
conf2.set(AWS_CREDENTIALS_PROVIDER, PROVIDER_CLASS);
// with valid credentials, we can set properties.
try(S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf2)) {
createAndVerifyFile(fs, path("testSTS"), TEST_FILE_SIZE);
}
// now create an invalid set of credentials by changing the session
// token
conf.set(SESSION_TOKEN, "invalid-" + sessionToken);
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
conf2.set(SESSION_TOKEN, "invalid-" + sessionCreds.getSessionToken());
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf2)) {
createAndVerifyFile(fs, path("testSTSInvalidToken"), TEST_FILE_SIZE);
fail("Expected an access exception, but file access to "
+ fs.getUri() + " was allowed: " + fs);

View File

@ -20,6 +20,7 @@
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@ -28,6 +29,7 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Assume;
import org.junit.Test;
@ -37,6 +39,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
@ -71,7 +74,9 @@ protected AbstractFSContract createContract(Configuration conf) {
// Other configs would break test assumptions
conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC);
// this is a long value to guarantee that the inconsistency holds
// even over long-haul connections, and in the debugger too/
conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, (long) (60 * 1000));
return new S3AContract(conf);
}
@ -524,37 +529,60 @@ public void testInconsistentS3ClientDeletes() throws Throwable {
ListObjectsV2Result postDeleteDelimited = listObjectsV2(fs, key, "/");
ListObjectsV2Result postDeleteUndelimited = listObjectsV2(fs, key, null);
assertListSizeEqual(
"InconsistentAmazonS3Client added back objects incorrectly " +
"in a non-recursive listing",
preDeleteDelimited.getObjectSummaries(),
postDeleteDelimited.getObjectSummaries());
assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
assertListSizeEqual("InconsistentAmazonS3Client added back prefixes incorrectly " +
"in a non-recursive listing",
preDeleteDelimited.getObjectSummaries().size(),
postDeleteDelimited.getObjectSummaries().size()
preDeleteDelimited.getCommonPrefixes(),
postDeleteDelimited.getCommonPrefixes()
);
assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
"in a non-recursive listing",
preDeleteDelimited.getCommonPrefixes().size(),
postDeleteDelimited.getCommonPrefixes().size()
);
assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
assertListSizeEqual("InconsistentAmazonS3Client added back objects incorrectly " +
"in a recursive listing",
preDeleteUndelimited.getObjectSummaries().size(),
postDeleteUndelimited.getObjectSummaries().size()
preDeleteUndelimited.getObjectSummaries(),
postDeleteUndelimited.getObjectSummaries()
);
assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
assertListSizeEqual("InconsistentAmazonS3Client added back prefixes incorrectly " +
"in a recursive listing",
preDeleteUndelimited.getCommonPrefixes().size(),
postDeleteUndelimited.getCommonPrefixes().size()
preDeleteUndelimited.getCommonPrefixes(),
postDeleteUndelimited.getCommonPrefixes()
);
}
/**
* retrying v2 list.
* @param fs
* @param key
* @param delimiter
* @return
* Assert that the two list sizes match; failure message includes the lists.
* @param message text for the assertion
* @param expected expected list
* @param actual actual list
* @param <T> type of list
*/
private <T> void assertListSizeEqual(String message,
List<T> expected,
List<T> actual) {
String leftContents = expected.stream()
.map(n -> n.toString())
.collect(Collectors.joining("\n"));
String rightContents = actual.stream()
.map(n -> n.toString())
.collect(Collectors.joining("\n"));
String summary = "\nExpected:" + leftContents
+ "\n-----------\nActual:" + rightContents;
assertEquals(message + summary, expected.size(), actual.size());
}
/**
* Retrying v2 list directly through the s3 client.
* @param fs filesystem
* @param key key to list under
* @param delimiter any delimiter
* @return the listing
* @throws IOException on error
*/
@Retries.RetryRaw
private ListObjectsV2Result listObjectsV2(S3AFileSystem fs,
String key, String delimiter) throws IOException {
ListObjectsV2Request k = fs.createListObjectsRequest(key, delimiter)

View File

@ -65,11 +65,12 @@ public void testListStatusWriteBack() throws Exception {
// delete the existing directory (in case of last test failure)
noS3Guard.delete(directory, true);
// Create a directory on S3 only
noS3Guard.mkdirs(new Path(directory, "OnS3"));
Path onS3 = new Path(directory, "OnS3");
noS3Guard.mkdirs(onS3);
// Create a directory on both S3 and metadata store
Path p = new Path(directory, "OnS3AndMS");
ContractTestUtils.assertPathDoesNotExist(noWriteBack, "path", p);
noWriteBack.mkdirs(p);
Path onS3AndMS = new Path(directory, "OnS3AndMS");
ContractTestUtils.assertPathDoesNotExist(noWriteBack, "path", onS3AndMS);
noWriteBack.mkdirs(onS3AndMS);
FileStatus[] fsResults;
DirListingMetadata mdResults;
@ -83,6 +84,8 @@ public void testListStatusWriteBack() throws Exception {
// Metadata store without write-back should still only contain /OnS3AndMS,
// because newly discovered /OnS3 is not written back to metadata store
mdResults = noWriteBack.getMetadataStore().listChildren(directory);
assertNotNull("No results from noWriteBack listChildren " + directory,
mdResults);
assertEquals("Metadata store without write back should still only know "
+ "about /OnS3AndMS, but it has: " + mdResults,
1, mdResults.numEntries());
@ -102,8 +105,7 @@ public void testListStatusWriteBack() throws Exception {
// If we don't clean this up, the next test run will fail because it will
// have recorded /OnS3 being deleted even after it's written to noS3Guard.
getFileSystem().getMetadataStore().forgetMetadata(
new Path(directory, "OnS3"));
getFileSystem().getMetadataStore().forgetMetadata(onS3);
}
/**
@ -118,26 +120,33 @@ private S3AFileSystem createTestFS(URI fsURI, boolean disableS3Guard,
// Create a FileSystem that is S3-backed only
conf = createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
String host = fsURI.getHost();
if (disableS3Guard) {
conf.set(Constants.S3_METADATA_STORE_IMPL,
Constants.S3GUARD_METASTORE_NULL);
S3AUtils.setBucketOption(conf, host,
S3_METADATA_STORE_IMPL,
S3GUARD_METASTORE_NULL);
} else {
S3ATestUtils.maybeEnableS3Guard(conf);
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta);
S3AUtils.setBucketOption(conf, host,
METADATASTORE_AUTHORITATIVE,
Boolean.toString(authoritativeMeta));
S3AUtils.setBucketOption(conf, host,
S3_METADATA_STORE_IMPL,
conf.get(S3_METADATA_STORE_IMPL));
String metastore;
metastore = S3GUARD_METASTORE_NULL;
if (!disableS3Guard) {
// pick up the metadata store used by the main test
metastore = getFileSystem().getConf().get(S3_METADATA_STORE_IMPL);
assertNotEquals(S3GUARD_METASTORE_NULL, metastore);
}
FileSystem fs = FileSystem.get(fsURI, conf);
return asS3AFS(fs);
conf.set(Constants.S3_METADATA_STORE_IMPL, metastore);
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta);
S3AUtils.setBucketOption(conf, host,
METADATASTORE_AUTHORITATIVE,
Boolean.toString(authoritativeMeta));
S3AUtils.setBucketOption(conf, host,
S3_METADATA_STORE_IMPL, metastore);
S3AFileSystem fs = asS3AFS(FileSystem.newInstance(fsURI, conf));
// do a check to verify that everything got through
assertEquals("Metadata store should have been disabled: " + fs,
disableS3Guard, !fs.hasMetadataStore());
assertEquals("metastore option did not propagate",
metastore, fs.getConf().get(S3_METADATA_STORE_IMPL));
return fs;
}
private static S3AFileSystem asS3AFS(FileSystem fs) {

View File

@ -23,6 +23,7 @@
import java.net.URI;
import java.util.ArrayList;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.Region;
@ -34,8 +35,9 @@
public class MockS3ClientFactory implements S3ClientFactory {
@Override
public AmazonS3 createS3Client(URI name) {
String bucket = name.getHost();
public AmazonS3 createS3Client(URI name,
final String bucket,
final AWSCredentialsProvider credentialSet) {
AmazonS3 s3 = mock(AmazonS3.class);
when(s3.doesBucketExist(bucket)).thenReturn(true);
// this listing is used in startup if purging is enabled, so

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.util.Arrays;
import java.util.List;
@ -34,11 +35,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;
import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.*;
/**
@ -221,14 +226,13 @@ private void expectException(Class<? extends Throwable> exceptionClass,
}
private void expectProviderInstantiationFailure(String option,
String expectedErrorText) throws IOException {
String expectedErrorText) throws Exception {
Configuration conf = new Configuration();
conf.set(AWS_CREDENTIALS_PROVIDER, option);
Path testFile = new Path(
conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE));
expectException(IOException.class, expectedErrorText);
URI uri = testFile.toUri();
S3AUtils.createAWSCredentialProviderSet(uri, conf);
intercept(IOException.class, expectedErrorText,
() -> S3AUtils.createAWSCredentialProviderSet(testFile.toUri(), conf));
}
/**
@ -288,4 +292,68 @@ public void testAuthenticationContainsProbes() {
authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
}
@Test
public void testExceptionLogic() throws Throwable {
AWSCredentialProviderList providers
= new AWSCredentialProviderList();
// verify you can't get credentials from it
NoAuthWithAWSException noAuth = intercept(NoAuthWithAWSException.class,
AWSCredentialProviderList.NO_AWS_CREDENTIAL_PROVIDERS,
() -> providers.getCredentials());
// but that it closes safely
providers.close();
S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration());
assertEquals("Expected no retry on auth failure",
RetryPolicy.RetryAction.FAIL.action,
retryPolicy.shouldRetry(noAuth, 0, 0, true).action);
try {
throw S3AUtils.translateException("login", "", noAuth);
} catch (AccessDeniedException expected) {
// this is what we want; other exceptions will be passed up
assertEquals("Expected no retry on AccessDeniedException",
RetryPolicy.RetryAction.FAIL.action,
retryPolicy.shouldRetry(expected, 0, 0, true).action);
}
}
@Test
public void testRefCounting() throws Throwable {
AWSCredentialProviderList providers
= new AWSCredentialProviderList();
assertEquals("Ref count for " + providers,
1, providers.getRefCount());
AWSCredentialProviderList replicate = providers.share();
assertEquals(providers, replicate);
assertEquals("Ref count after replication for " + providers,
2, providers.getRefCount());
assertFalse("Was closed " + providers, providers.isClosed());
providers.close();
assertFalse("Was closed " + providers, providers.isClosed());
assertEquals("Ref count after close() for " + providers,
1, providers.getRefCount());
// this should now close it
providers.close();
assertTrue("Was not closed " + providers, providers.isClosed());
assertEquals("Ref count after close() for " + providers,
0, providers.getRefCount());
assertEquals("Ref count after second close() for " + providers,
0, providers.getRefCount());
intercept(IllegalStateException.class, "closed",
() -> providers.share());
// final call harmless
providers.close();
assertEquals("Ref count after close() for " + providers,
0, providers.getRefCount());
providers.refresh();
intercept(NoAuthWithAWSException.class,
AWSCredentialProviderList.CREDENTIALS_REQUESTED_WHEN_CLOSED,
() -> providers.getCredentials());
}
}

View File

@ -61,6 +61,7 @@
import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.LambdaTestUtils.*;
/**
@ -85,6 +86,24 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
*/
private S3AFileSystem roleFS;
/**
* Duration range exception text on SDKs which check client-side.
*/
protected static final String E_DURATION_RANGE_1
= "Assume Role session duration should be in the range of 15min - 1Hr";
/**
* Duration range too high text on SDKs which check on the server.
*/
protected static final String E_DURATION_RANGE_2
= "Member must have value less than or equal to 43200";
/**
* Duration range too low text on SDKs which check on the server.
*/
protected static final String E_DURATION_RANGE_3
= "Member must have value greater than or equal to 900";
@Override
public void setup() throws Exception {
super.setup();
@ -112,13 +131,14 @@ private String getAssumedRoleARN() {
* @param clazz class of exception to expect
* @param text text in exception
* @param <E> type of exception as inferred from clazz
* @return the caught exception if it was of the expected type and contents
* @throws Exception if the exception was the wrong class
*/
private <E extends Throwable> void expectFileSystemCreateFailure(
private <E extends Throwable> E expectFileSystemCreateFailure(
Configuration conf,
Class<E> clazz,
String text) throws Exception {
interceptClosing(clazz,
return interceptClosing(clazz,
text,
() -> new Path(getFileSystem().getUri()).getFileSystem(conf));
}
@ -246,6 +266,60 @@ public void testAssumeRoleBadSession() throws Exception {
"Member must satisfy regular expression pattern");
}
/**
* A duration >1h is forbidden client-side in AWS SDK 1.11.271;
* with the ability to extend durations deployed in March 2018,
* duration checks will need to go server-side, and, presumably,
* later SDKs will remove the client side checks.
* This code exists to see when this happens.
*/
@Test
public void testAssumeRoleThreeHourSessionDuration() throws Exception {
describe("Try to authenticate with a long session duration");
Configuration conf = createAssumedRoleConfig();
// add a duration of three hours
conf.setInt(ASSUMED_ROLE_SESSION_DURATION, 3 * 60 * 60);
try {
new Path(getFileSystem().getUri()).getFileSystem(conf).close();
LOG.info("Successfully created token of a duration >3h");
} catch (IOException ioe) {
assertExceptionContains(E_DURATION_RANGE_1, ioe);
}
}
/**
* A duration >1h is forbidden client-side in AWS SDK 1.11.271;
* with the ability to extend durations deployed in March 2018.
* with the later SDKs, the checks go server-side and
* later SDKs will remove the client side checks.
* This test asks for a duration which will still be rejected, and
* looks for either of the error messages raised.
*/
@Test
public void testAssumeRoleThirtySixHourSessionDuration() throws Exception {
describe("Try to authenticate with a long session duration");
Configuration conf = createAssumedRoleConfig();
conf.setInt(ASSUMED_ROLE_SESSION_DURATION, 36 * 60 * 60);
IOException ioe = expectFileSystemCreateFailure(conf,
IOException.class, null);
assertIsRangeException(ioe);
}
/**
* Look for either the client-side or STS-side range exception
* @param e exception
* @throws Exception the exception, if its text doesn't match
*/
private void assertIsRangeException(final Exception e) throws Exception {
String message = e.toString();
if (!message.contains(E_DURATION_RANGE_1)
&& !message.contains(E_DURATION_RANGE_2)
&& !message.contains(E_DURATION_RANGE_3)) {
throw e;
}
}
/**
* Create the assumed role configuration.
@ -280,11 +354,11 @@ public void testAssumedIllegalDuration() throws Throwable {
describe("Expect the constructor to fail if the session is to short");
Configuration conf = new Configuration();
conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
interceptClosing(IllegalArgumentException.class, "",
Exception ex = interceptClosing(Exception.class, "",
() -> new AssumedRoleCredentialProvider(uri, conf));
assertIsRangeException(ex);
}
@Test
public void testAssumeRoleCreateFS() throws IOException {
describe("Create an FS client with the role and do some basic IO");
@ -296,24 +370,32 @@ public void testAssumeRoleCreateFS() throws IOException {
conf.get(ACCESS_KEY), roleARN);
try (FileSystem fs = path.getFileSystem(conf)) {
fs.getFileStatus(new Path("/"));
fs.getFileStatus(ROOT);
fs.mkdirs(path("testAssumeRoleFS"));
}
}
@Test
public void testAssumeRoleRestrictedPolicyFS() throws Exception {
describe("Restrict the policy for this session; verify that reads fail");
describe("Restrict the policy for this session; verify that reads fail.");
// there's some special handling of S3Guard here as operations
// which only go to DDB don't fail the way S3 would reject them.
Configuration conf = createAssumedRoleConfig();
bindRolePolicy(conf, RESTRICTED_POLICY);
Path path = new Path(getFileSystem().getUri());
boolean guarded = getFileSystem().hasMetadataStore();
try (FileSystem fs = path.getFileSystem(conf)) {
forbidden("getFileStatus",
() -> fs.getFileStatus(new Path("/")));
forbidden("getFileStatus",
() -> fs.listStatus(new Path("/")));
forbidden("getFileStatus",
if (!guarded) {
// when S3Guard is enabled, the restricted policy still
// permits S3Guard record lookup, so getFileStatus calls
// will work iff the record is in the database.
forbidden("getFileStatus",
() -> fs.getFileStatus(ROOT));
}
forbidden("",
() -> fs.listStatus(ROOT));
forbidden("",
() -> fs.mkdirs(path("testAssumeRoleFS")));
}
}
@ -333,7 +415,11 @@ public void testAssumeRolePoliciesOverrideRolePerms() throws Throwable {
Configuration conf = createAssumedRoleConfig();
bindRolePolicy(conf,
policy(statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT_TORRENT)));
policy(
statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT_TORRENT),
ALLOW_S3_GET_BUCKET_LOCATION,
STATEMENT_S3GUARD_CLIENT,
STATEMENT_ALLOW_SSE_KMS_RW));
Path path = path("testAssumeRoleStillIncludesRolePerms");
roleFS = (S3AFileSystem) path.getFileSystem(conf);
assertTouchForbidden(roleFS, path);
@ -342,6 +428,8 @@ public void testAssumeRolePoliciesOverrideRolePerms() throws Throwable {
/**
* After blocking all write verbs used by S3A, try to write data (fail)
* and read data (succeed).
* For S3Guard: full DDB RW access is retained.
* SSE-KMS key access is set to decrypt only.
*/
@Test
public void testReadOnlyOperations() throws Throwable {
@ -352,7 +440,9 @@ public void testReadOnlyOperations() throws Throwable {
bindRolePolicy(conf,
policy(
statement(false, S3_ALL_BUCKETS, S3_PATH_WRITE_OPERATIONS),
STATEMENT_ALL_S3, STATEMENT_ALL_DDB));
STATEMENT_ALL_S3,
STATEMENT_S3GUARD_CLIENT,
STATEMENT_ALLOW_SSE_KMS_READ));
Path path = methodPath();
roleFS = (S3AFileSystem) path.getFileSystem(conf);
// list the root path, expect happy
@ -399,8 +489,9 @@ public void testRestrictedWriteSubdir() throws Throwable {
Configuration conf = createAssumedRoleConfig();
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
STATEMENT_S3GUARD_CLIENT,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
STATEMENT_ALLOW_SSE_KMS_RW,
new Statement(Effects.Allow)
.addActions(S3_ALL_OPERATIONS)
.addResources(directory(restrictedDir)));
@ -447,7 +538,7 @@ public void testRestrictedSingleDeleteRename() throws Throwable {
}
/**
* Execute a sequence of rename operations.
* Execute a sequence of rename operations with access locked down.
* @param conf FS configuration
*/
public void executeRestrictedRename(final Configuration conf)
@ -461,7 +552,8 @@ public void executeRestrictedRename(final Configuration conf)
fs.delete(basePath, true);
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
STATEMENT_S3GUARD_CLIENT,
STATEMENT_ALLOW_SSE_KMS_RW,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new Statement(Effects.Allow)
.addActions(S3_PATH_RW_OPERATIONS)
@ -502,6 +594,25 @@ public void testRestrictedRenameReadOnlySingleDelete() throws Throwable {
executeRenameReadOnlyData(conf);
}
/**
* Without simulation of STS failures, and with STS overload likely to
* be very rare, there'll be no implicit test coverage of
* {@link AssumedRoleCredentialProvider#operationRetried(String, Exception, int, boolean)}.
* This test simply invokes the callback for both the first and second retry event.
*
* If the handler ever adds more than logging, this test ensures that things
* don't break.
*/
@Test
public void testAssumedRoleRetryHandler() throws Throwable {
try(AssumedRoleCredentialProvider provider
= new AssumedRoleCredentialProvider(getFileSystem().getUri(),
createAssumedRoleConfig())) {
provider.operationRetried("retry", new IOException("failure"), 0, true);
provider.operationRetried("retry", new IOException("failure"), 1, true);
}
}
/**
* Execute a sequence of rename operations where the source
* data is read only to the client calling rename().
@ -534,7 +645,7 @@ public void executeRenameReadOnlyData(final Configuration conf)
touch(fs, readOnlyFile);
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
STATEMENT_S3GUARD_CLIENT,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new Statement(Effects.Allow)
.addActions(S3_PATH_RW_OPERATIONS)
@ -614,7 +725,8 @@ public void testRestrictedCommitActions() throws Throwable {
fs.mkdirs(readOnlyDir);
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
STATEMENT_S3GUARD_CLIENT,
STATEMENT_ALLOW_SSE_KMS_RW,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new Statement(Effects.Allow)
.addActions(S3_PATH_RW_OPERATIONS)
@ -752,7 +864,8 @@ public void executePartialDelete(final Configuration conf)
fs.delete(destDir, true);
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
STATEMENT_S3GUARD_CLIENT,
STATEMENT_ALLOW_SSE_KMS_RW,
statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
new Statement(Effects.Deny)
.addActions(S3_PATH_WRITE_OPERATIONS)

View File

@ -72,7 +72,8 @@ public void setup() throws Exception {
Configuration conf = newAssumedRoleConfig(getConfiguration(),
getAssumedRoleARN());
bindRolePolicyStatements(conf,
STATEMENT_ALL_DDB,
STATEMENT_S3GUARD_CLIENT,
STATEMENT_ALLOW_SSE_KMS_RW,
statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
new RoleModel.Statement(RoleModel.Effects.Allow)
.addActions(S3_PATH_RW_OPERATIONS)
@ -81,7 +82,6 @@ public void setup() throws Exception {
roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf);
}
@Override
public void teardown() throws Exception {
S3AUtils.closeAll(LOG, roleFS);
@ -122,7 +122,6 @@ protected Path path(String filepath) throws IOException {
return new Path(restrictedDir, filepath);
}
private String getAssumedRoleARN() {
return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
}

View File

@ -58,14 +58,23 @@ public final class RoleTestUtils {
/** Deny GET requests to all buckets. */
public static final Statement DENY_GET_ALL =
public static final Statement DENY_S3_GET_OBJECT =
statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT);
/**
* This is AWS policy removes read access.
*/
public static final Policy RESTRICTED_POLICY = policy(DENY_GET_ALL);
public static final Statement ALLOW_S3_GET_BUCKET_LOCATION
= statement(true, S3_ALL_BUCKETS, S3_GET_BUCKET_LOCATION);
/**
* This is AWS policy removes read access from S3, leaves S3Guard access up.
* This will allow clients to use S3Guard list/HEAD operations, even
* the ability to write records, but not actually access the underlying
* data.
* The client does need {@link RolePolicies#S3_GET_BUCKET_LOCATION} to
* get the bucket location.
*/
public static final Policy RESTRICTED_POLICY = policy(
DENY_S3_GET_OBJECT, STATEMENT_ALL_DDB, ALLOW_S3_GET_BUCKET_LOCATION
);
/**
* Error message to get from the AWS SDK if you can't assume the role.
@ -145,7 +154,7 @@ public static Configuration newAssumedRoleConfig(
Configuration conf = new Configuration(srcConf);
conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
conf.set(ASSUMED_ROLE_ARN, roleARN);
conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
conf.set(ASSUMED_ROLE_SESSION_NAME, "test");
conf.set(ASSUMED_ROLE_SESSION_DURATION, "15m");
disableFilesystemCaching(conf);
return conf;
@ -163,9 +172,8 @@ public static <T> AccessDeniedException forbidden(
String contained,
Callable<T> eval)
throws Exception {
AccessDeniedException ex = intercept(AccessDeniedException.class, eval);
GenericTestUtils.assertExceptionContains(contained, ex);
return ex;
return intercept(AccessDeniedException.class,
contained, eval);
}
}

View File

@ -32,6 +32,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.StopWatch;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileSystem;
@ -51,6 +52,7 @@
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_NAME_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_METASTORE_NULL;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
@ -144,8 +146,11 @@ public void setup() throws Exception {
// Also create a "raw" fs without any MetadataStore configured
Configuration conf = new Configuration(getConfiguration());
conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL);
URI fsUri = getFileSystem().getUri();
conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL);
S3AUtils.setBucketOption(conf,fsUri.getHost(),
METADATASTORE_AUTHORITATIVE,
S3GUARD_METASTORE_NULL);
rawFs = (S3AFileSystem) FileSystem.newInstance(fsUri, conf);
}

View File

@ -40,8 +40,10 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
@ -80,81 +82,102 @@ private void deleteTable(DynamoDB db, String tableName) throws
@Test
public void testConcurrentTableCreations() throws Exception {
final Configuration conf = getConfiguration();
S3AFileSystem fs = getFileSystem();
final Configuration conf = fs.getConf();
Assume.assumeTrue("Test only applies when DynamoDB is used for S3Guard",
conf.get(Constants.S3_METADATA_STORE_IMPL).equals(
Constants.S3GUARD_METASTORE_DYNAMO));
AWSCredentialProviderList sharedCreds =
fs.shareCredentials("testConcurrentTableCreations");
// close that shared copy.
sharedCreds.close();
// this is the original reference count.
int originalRefCount = sharedCreds.getRefCount();
//now init the store; this should increment the ref count.
DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
ms.initialize(getFileSystem());
DynamoDB db = ms.getDynamoDB();
ms.initialize(fs);
String tableName = "testConcurrentTableCreations" + new Random().nextInt();
conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
// the ref count should have gone up
assertEquals("Credential Ref count unchanged after initializing metastore "
+ sharedCreds,
originalRefCount + 1, sharedCreds.getRefCount());
try {
DynamoDB db = ms.getDynamoDB();
String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
if (StringUtils.isEmpty(region)) {
// no region set, so pick it up from the test bucket
conf.set(S3GUARD_DDB_REGION_KEY, getFileSystem().getBucketLocation());
}
int concurrentOps = 16;
int iterations = 4;
String tableName = "testConcurrentTableCreations" + new Random().nextInt();
conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
failIfTableExists(db, tableName);
String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
if (StringUtils.isEmpty(region)) {
// no region set, so pick it up from the test bucket
conf.set(S3GUARD_DDB_REGION_KEY, fs.getBucketLocation());
}
int concurrentOps = 16;
int iterations = 4;
for (int i = 0; i < iterations; i++) {
ExecutorService executor = Executors.newFixedThreadPool(
concurrentOps, new ThreadFactory() {
private AtomicInteger count = new AtomicInteger(0);
failIfTableExists(db, tableName);
public Thread newThread(Runnable r) {
return new Thread(r,
"testConcurrentTableCreations" + count.getAndIncrement());
for (int i = 0; i < iterations; i++) {
ExecutorService executor = Executors.newFixedThreadPool(
concurrentOps, new ThreadFactory() {
private AtomicInteger count = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r,
"testConcurrentTableCreations" + count.getAndIncrement());
}
});
((ThreadPoolExecutor) executor).prestartAllCoreThreads();
Future<Exception>[] futures = new Future[concurrentOps];
for (int f = 0; f < concurrentOps; f++) {
final int index = f;
futures[f] = executor.submit(new Callable<Exception>() {
@Override
public Exception call() throws Exception {
ContractTestUtils.NanoTimer timer =
new ContractTestUtils.NanoTimer();
Exception result = null;
try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
store.initialize(conf);
} catch (Exception e) {
LOG.error(e.getClass() + ": " + e.getMessage());
result = e;
}
timer.end("Parallel DynamoDB client creation %d", index);
LOG.info("Parallel DynamoDB client creation {} ran from {} to {}",
index, timer.getStartTime(), timer.getEndTime());
return result;
}
});
((ThreadPoolExecutor) executor).prestartAllCoreThreads();
Future<Exception>[] futures = new Future[concurrentOps];
for (int f = 0; f < concurrentOps; f++) {
final int index = f;
futures[f] = executor.submit(new Callable<Exception>() {
@Override
public Exception call() throws Exception {
ContractTestUtils.NanoTimer timer =
new ContractTestUtils.NanoTimer();
Exception result = null;
try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
store.initialize(conf);
} catch (Exception e) {
LOG.error(e.getClass() + ": " + e.getMessage());
result = e;
}
timer.end("Parallel DynamoDB client creation %d", index);
LOG.info("Parallel DynamoDB client creation {} ran from {} to {}",
index, timer.getStartTime(), timer.getEndTime());
return result;
}
List<Exception> exceptions = new ArrayList<>(concurrentOps);
for (int f = 0; f < concurrentOps; f++) {
Exception outcome = futures[f].get();
if (outcome != null) {
exceptions.add(outcome);
}
});
}
List<Exception> exceptions = new ArrayList<>(concurrentOps);
for (int f = 0; f < concurrentOps; f++) {
Exception outcome = futures[f].get();
if (outcome != null) {
exceptions.add(outcome);
}
deleteTable(db, tableName);
int exceptionsThrown = exceptions.size();
if (exceptionsThrown > 0) {
// at least one exception was thrown. Fail the test & nest the first
// exception caught
throw new AssertionError(exceptionsThrown + "/" + concurrentOps +
" threads threw exceptions while initializing on iteration " + i,
exceptions.get(0));
}
}
deleteTable(db, tableName);
int exceptionsThrown = exceptions.size();
if (exceptionsThrown > 0) {
// at least one exception was thrown. Fail the test & nest the first
// exception caught
throw new AssertionError(exceptionsThrown + "/" + concurrentOps +
" threads threw exceptions while initializing on iteration " + i,
exceptions.get(0));
}
} finally {
ms.close();
}
assertEquals("Credential Ref count unchanged after closing metastore: "
+ sharedCreds,
originalRefCount, sharedCreds.getRefCount());
}
}